public inbox for pgsql-general@postgresql.org  
help / color / mirror / Atom feed
From: Zheng Li <zhengli10@gmail.com>
To: Japin Li <japinli@hotmail.com>
Cc: Alvaro Herrera <alvherre@alvh.no-ip.org>
Cc: Dilip Kumar <dilipbalaut@gmail.com>
Cc: rajesh.rs0541@gmail.com
Cc: PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>
Subject: Re: Support logical replication of DDLs
Date: Thu, 17 Mar 2022 20:18:18 -0400
Message-ID: <CAAD30UKTp87+kvGZYL3M2Suxq=WEvFUG24ZRT0yT9rqdkP=uMA@mail.gmail.com> (raw)
In-Reply-To: <CAAD30UKRUusq8JyyHzAv71=ncN22OE8OkOOyAWvRHW3wXNjyyA@mail.gmail.com>
References: <CAAD30ULtoGp8L_GKbV15Wnm+X5r=SE7MOnYHuqBr396m26jJSA@mail.gmail.com>
	<202203162206.7spggyktx63e@alvherre.pgsql>
	<CAAD30UKRUusq8JyyHzAv71=ncN22OE8OkOOyAWvRHW3wXNjyyA@mail.gmail.com>

Hello,

Attached please find the broken down patch set. Also fixed the failing
TAP tests Japin reported.

Regards,
Zheng Li
Amazon RDS/Aurora for PostgreSQL


Attachments:

  [application/octet-stream] 0001-syntax-pg_publication-pg_dump-ddl_replication.patch (62.6K, 2-0001-syntax-pg_publication-pg_dump-ddl_replication.patch)
  download | inline diff:
commit 3caa9210974f749e7bf82d80de57fe9d208984b4
Author: Zheng (Zane) Li <zhelli@amazon.com>
Date:   Fri Mar 18 00:01:34 2022 +0000

    0001-syntax-pg_publication-pg_dump-ddl_replication.patch
    1. Syntax, pg_publication and pg_dump change:
    Allows the user to configure either database level or table level
    DDL replication via the CREATE PUBLICATION command as proposed
    in my first email. Two new columns are added to the pg_publication
    catalog to show the DDL replication levels, test output
    publication.out is updated accordingly. pg_dump is also modified
    to accommodate the pg_publication catalog change.

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 789b895db8..00b5673b8f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -857,6 +857,8 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
 	pub->pubviaroot = pubform->pubviaroot;
+	pub->pubactions.pubddl_database = pubform->pubddl_database;
+	pub->pubactions.pubddl_table = pubform->pubddl_table;
 
 	ReleaseSysCache(tup);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 1aad2e769c..7d767c2bc4 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -80,10 +80,12 @@ static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
 static void
 parse_publication_options(ParseState *pstate,
 						  List *options,
+						  bool for_all_tables,
 						  bool *publish_given,
 						  PublicationActions *pubactions,
 						  bool *publish_via_partition_root_given,
-						  bool *publish_via_partition_root)
+						  bool *publish_via_partition_root,
+						  bool *ddl_level_given)
 {
 	ListCell   *lc;
 
@@ -96,6 +98,16 @@ parse_publication_options(ParseState *pstate,
 	pubactions->pubdelete = true;
 	pubactions->pubtruncate = true;
 	*publish_via_partition_root = false;
+	if (for_all_tables)
+	{
+		pubactions->pubddl_database = true;
+		pubactions->pubddl_table = true;
+	}
+	else
+	{
+		pubactions->pubddl_database = false;
+		pubactions->pubddl_table = false;
+	}
 
 	/* Parse options */
 	foreach(lc, options)
@@ -154,6 +166,57 @@ parse_publication_options(ParseState *pstate,
 			*publish_via_partition_root_given = true;
 			*publish_via_partition_root = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ddl") == 0)
+		{
+			char	   *ddl_level;
+			List	   *ddl_list;
+			ListCell   *lc;
+
+			if (*ddl_level_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			/*
+			 * If publish option was given only the explicitly listed actions
+			 * should be published.
+			 */
+			pubactions->pubddl_database = false;
+			pubactions->pubddl_table = false;
+
+			*ddl_level_given = true;
+			ddl_level = defGetString(defel);
+
+			if (!SplitIdentifierString(ddl_level, ',', &ddl_list))
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("invalid list syntax for \"ddl\" option")));
+
+			/* Process the option list. */
+			foreach(lc, ddl_list)
+			{
+				char	   *publish_opt = (char *) lfirst(lc);
+
+				if (strcmp(publish_opt, "database") == 0)
+				{
+					if (!for_all_tables)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("DDL replication on the database level is only supported in FOR ALL TABLES publication")));
+					else
+					{
+						pubactions->pubddl_database = true;
+						pubactions->pubddl_table = true;
+					}
+				}
+				else if (strcmp(publish_opt, "table") == 0)
+					pubactions->pubddl_table = true;
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("unrecognized \"ddl\" value: \"%s\"", publish_opt)));
+			}
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -622,6 +685,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	Datum		values[Natts_pg_publication];
 	HeapTuple	tup;
 	bool		publish_given;
+	bool		ddl_level_given;
 	PublicationActions pubactions;
 	bool		publish_via_partition_root_given;
 	bool		publish_via_partition_root;
@@ -664,9 +728,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 
 	parse_publication_options(pstate,
 							  stmt->options,
+							  stmt->for_all_tables,
 							  &publish_given, &pubactions,
 							  &publish_via_partition_root_given,
-							  &publish_via_partition_root);
+							  &publish_via_partition_root,
+							  &ddl_level_given);
 
 	puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
 								Anum_pg_publication_oid);
@@ -683,6 +749,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubtruncate);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
+	values[Anum_pg_publication_pubddl_database - 1] =
+		BoolGetDatum(pubactions.pubddl_database);
+	values[Anum_pg_publication_pubddl_table - 1] =
+		BoolGetDatum(pubactions.pubddl_table);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -766,6 +836,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 	bool		replaces[Natts_pg_publication];
 	Datum		values[Natts_pg_publication];
 	bool		publish_given;
+	bool        ddl_level_given;
 	PublicationActions pubactions;
 	bool		publish_via_partition_root_given;
 	bool		publish_via_partition_root;
@@ -774,11 +845,15 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 	List	   *root_relids = NIL;
 	ListCell   *lc;
 
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
 	parse_publication_options(pstate,
 							  stmt->options,
+							  pubform->puballtables,
 							  &publish_given, &pubactions,
 							  &publish_via_partition_root_given,
-							  &publish_via_partition_root);
+							  &publish_via_partition_root,
+							  &ddl_level_given);
 
 	pubform = (Form_pg_publication) GETSTRUCT(tup);
 
@@ -859,6 +934,15 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
 	}
 
+	if (ddl_level_given)
+	{
+		values[Anum_pg_publication_pubddl_database - 1] = BoolGetDatum(pubactions.pubddl_database);
+		replaces[Anum_pg_publication_pubddl_database - 1] = true;
+
+		values[Anum_pg_publication_pubddl_table - 1] = BoolGetDatum(pubactions.pubddl_table);
+		replaces[Anum_pg_publication_pubddl_table - 1] = true;
+	}
+
 	if (publish_via_partition_root_given)
 	{
 		values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index fccffce572..d88b2aa642 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5606,6 +5606,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
 		pubdesc->pubactions.pubupdate |= pubform->pubupdate;
 		pubdesc->pubactions.pubdelete |= pubform->pubdelete;
 		pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
+		pubdesc->pubactions.pubddl_database |= pubform->pubddl_database;
+		pubdesc->pubactions.pubddl_table |= pubform->pubddl_table;
 
 		/*
 		 * Check if all columns referenced in the filter expression are part of
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 725cd2e4eb..5e06cf4d3a 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3819,6 +3819,8 @@ getPublications(Archive *fout, int *numPublications)
 	int			i_pubdelete;
 	int			i_pubtruncate;
 	int			i_pubviaroot;
+	int			i_pubddl_database;
+	int			i_pubddl_table;
 	int			i,
 				ntups;
 
@@ -3833,23 +3835,29 @@ getPublications(Archive *fout, int *numPublications)
 	resetPQExpBuffer(query);
 
 	/* Get the publications. */
-	if (fout->remoteVersion >= 130000)
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBuffer(query,
+						  "SELECT p.tableoid, p.oid, p.pubname, "
+						  "p.pubowner, "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubddl_database, p.pubddl_table "
+						  "FROM pg_publication p");
+	else if (fout->remoteVersion >= 130000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "p.pubowner, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, false AS pubddl_database, false AS pubddl_table "
 						  "FROM pg_publication p");
 	else if (fout->remoteVersion >= 110000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "p.pubowner, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot, false AS pubddl_database, false AS pubddl_table "
 						  "FROM pg_publication p");
 	else
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "p.pubowner, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot, false AS pubddl_database, false AS pubddl_table "
 						  "FROM pg_publication p");
 
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -3866,6 +3874,8 @@ getPublications(Archive *fout, int *numPublications)
 	i_pubdelete = PQfnumber(res, "pubdelete");
 	i_pubtruncate = PQfnumber(res, "pubtruncate");
 	i_pubviaroot = PQfnumber(res, "pubviaroot");
+	i_pubddl_database = PQfnumber(res, "pubddl_database");
+	i_pubddl_table = PQfnumber(res, "pubddl_table");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3890,6 +3900,10 @@ getPublications(Archive *fout, int *numPublications)
 			(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
 		pubinfo[i].pubviaroot =
 			(strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
+		pubinfo[i].pubddl_database =
+			(strcmp(PQgetvalue(res, i, i_pubddl_database), "t") == 0);
+		pubinfo[i].pubddl_table =
+			(strcmp(PQgetvalue(res, i, i_pubddl_table), "t") == 0);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(pubinfo[i].dobj), fout);
@@ -3969,6 +3983,29 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
 
 	appendPQExpBufferStr(query, "'");
 
+	appendPQExpBufferStr(query, ", ddl = '");
+	first = true;
+
+	if (pubinfo->pubddl_database)
+	{
+		if (!first)
+			appendPQExpBufferStr(query, ", ");
+
+		appendPQExpBufferStr(query, "database");
+		first = false;
+	}
+
+	if (pubinfo->pubddl_table)
+	{
+		if (!first)
+			appendPQExpBufferStr(query, ", ");
+
+		appendPQExpBufferStr(query, "table");
+		first = false;
+	}
+
+	appendPQExpBufferStr(query, "'");
+
 	if (pubinfo->pubviaroot)
 		appendPQExpBufferStr(query, ", publish_via_partition_root = true");
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 772dc0cf7a..17789ea271 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -620,6 +620,8 @@ typedef struct _PublicationInfo
 	bool		pubdelete;
 	bool		pubtruncate;
 	bool		pubviaroot;
+	bool		pubddl_database;
+	bool		pubddl_table;
 } PublicationInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 991bfc1546..7a7d4a50e2 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5820,7 +5820,7 @@ listPublications(const char *pattern)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5855,6 +5855,15 @@ listPublications(const char *pattern)
 		appendPQExpBuffer(&buf,
 						  ",\n  pubviaroot AS \"%s\"",
 						  gettext_noop("Via root"));
+	if (pset.sversion >= 140000)
+	{
+		appendPQExpBuffer(&buf,
+						  ",\n  pubddl_database AS \"%s\"",
+						  gettext_noop("Database level DDLs"));
+		appendPQExpBuffer(&buf,
+						  ",\n  pubddl_table AS \"%s\"",
+						  gettext_noop("Table level DDLs"));
+	}
 
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
@@ -5936,6 +5945,7 @@ describePublications(const char *pattern)
 	PGresult   *res;
 	bool		has_pubtruncate;
 	bool		has_pubviaroot;
+	bool		has_pubddl;
 
 	PQExpBufferData title;
 	printTableContent cont;
@@ -5952,6 +5962,7 @@ describePublications(const char *pattern)
 
 	has_pubtruncate = (pset.sversion >= 110000);
 	has_pubviaroot = (pset.sversion >= 130000);
+	has_pubddl = (pset.sversion >= 150000);
 
 	initPQExpBuffer(&buf);
 
@@ -5965,6 +5976,11 @@ describePublications(const char *pattern)
 	if (has_pubviaroot)
 		appendPQExpBufferStr(&buf,
 							 ", pubviaroot");
+	if (has_pubddl)
+	{
+		appendPQExpBufferStr(&buf,
+							 ", pubddl_database, pubddl_table");
+	}
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -6011,6 +6027,8 @@ describePublications(const char *pattern)
 			ncols++;
 		if (has_pubviaroot)
 			ncols++;
+		if (has_pubddl)
+			ncols += 2;
 
 		initPQExpBuffer(&title);
 		printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -6025,6 +6043,11 @@ describePublications(const char *pattern)
 			printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
 		if (has_pubviaroot)
 			printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+		if (has_pubddl)
+		{
+			printTableAddHeader(&cont, gettext_noop("Database level DDL"), true, align);
+			printTableAddHeader(&cont, gettext_noop("Table level DDL"), true, align);
+		}
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6035,6 +6058,11 @@ describePublications(const char *pattern)
 			printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
 		if (has_pubviaroot)
 			printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+		if (has_pubddl)
+		{
+			printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false);
+			printTableAddCell(&cont, PQgetvalue(res, i, 10), false, false);
+		}
 
 		if (!puballtables)
 		{
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index fe773cf9b7..b86b49aafc 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -20,6 +20,7 @@
 #include "catalog/genbki.h"
 #include "catalog/objectaddress.h"
 #include "catalog/pg_publication_d.h"
+#include "tcop/utility.h"
 
 /* ----------------
  *		pg_publication definition.  cpp turns this into
@@ -54,6 +55,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
+
+	/* true if database level ddls are published */
+	bool		pubddl_database;
+
+	/* true if table level ddls are published */
+	bool		pubddl_table;
 } FormData_pg_publication;
 
 /* ----------------
@@ -72,6 +79,8 @@ typedef struct PublicationActions
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubddl_database;
+	bool		pubddl_table;
 } PublicationActions;
 
 typedef struct PublicationDesc
@@ -146,5 +155,6 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
+extern bool ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel);
 
 #endif							/* PG_PUBLICATION_H */
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index 56d2bb6616..073f46ef1a 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -15,6 +15,7 @@
 #define DEFREM_H
 
 #include "catalog/objectaddress.h"
+#include "catalog/pg_publication.h"
 #include "nodes/params.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 4e191c120a..6761e3bbc0 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -30,20 +30,20 @@ ERROR:  conflicting or redundant options
 LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi...
                                                              ^
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
+                                                                   List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f                   | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f        | f                   | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                                   List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f                   | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f        | f                   | f
 (2 rows)
 
 --- adding tables
@@ -87,10 +87,10 @@ RESET client_min_messages;
 -- should be able to add schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "public.testpub_tbl1"
 Tables from schemas:
@@ -99,20 +99,20 @@ Tables from schemas:
 -- should be able to drop schema from 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "public.testpub_tbl1"
 
 -- should be able to set schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test"
 
@@ -134,10 +134,10 @@ ERROR:  relation "testpub_nopk" is not part of the publication
 -- should be able to set table to schema publication
 ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                   Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "pub_test.testpub_nopk"
 
@@ -159,10 +159,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                              Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t          | t       | t       | f       | f         | f
+                                                 Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | t          | t       | t       | f       | f         | f        | t                  | t
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -174,19 +174,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                                    Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                       Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                                    Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                       Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "public.testpub_tbl3"
 
@@ -207,10 +207,10 @@ UPDATE testpub_parted1 SET a = 1;
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                   Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "public.testpub_parted"
 
@@ -223,10 +223,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | t
+                                                   Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | t        | f                  | f
 Tables:
     "public.testpub_parted"
 
@@ -255,10 +255,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f                  | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -271,10 +271,10 @@ Tables:
 
 ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f                  | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -290,10 +290,10 @@ Publications:
 
 ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f                  | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
@@ -301,10 +301,10 @@ Tables:
 -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
 ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f                  | f
 Tables:
     "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
 
@@ -337,10 +337,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax1
-                                Publication testpub_syntax1
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                                    Publication testpub_syntax1
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f                  | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE (e < 999)
@@ -350,10 +350,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax2
-                                Publication testpub_syntax2
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                                    Publication testpub_syntax2
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f                  | f
 Tables:
     "public.testpub_rf_tbl1"
     "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
@@ -658,10 +658,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                                 Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                    Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -699,10 +699,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                                    Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f                  | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -780,10 +780,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 DROP TABLE testpub_parted;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                                    Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f                  | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -793,20 +793,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                           List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
--------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                               List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f        | f                   | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                             List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
------------------+---------------------------+------------+---------+---------+---------+-----------+----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
+                                                                  List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f        | f                   | f
 (1 row)
 
 -- adding schemas and tables
@@ -822,19 +822,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int);
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
 
 CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -848,44 +848,44 @@ CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA",
 CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA";
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "public"
 
 \dRp+ testpub4_forschema
-                               Publication testpub4_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub4_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "CURRENT_SCHEMA"
 
 \dRp+ testpub5_forschema
-                               Publication testpub5_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub5_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub6_forschema
-                               Publication testpub6_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub6_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "CURRENT_SCHEMA.CURRENT_SCHEMA"
 
@@ -919,10 +919,10 @@ ERROR:  schema "testpub_view" does not exist
 -- dropping the schema should reflect the change in publication
 DROP SCHEMA pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -930,20 +930,20 @@ Tables from schemas:
 -- renaming the schema should reflect the change in publication
 ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1_renamed"
     "pub_test2"
 
 ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -951,10 +951,10 @@ Tables from schemas:
 -- alter publication add schema
 ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -963,10 +963,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -975,10 +975,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1;
 ERROR:  schema "pub_test1" is already member of publication "testpub1_forschema"
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -986,10 +986,10 @@ Tables from schemas:
 -- alter publication drop schema
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
 
@@ -997,10 +997,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2;
 ERROR:  tables from schema "pub_test2" are not part of the publication
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
 
@@ -1008,29 +1008,29 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
 
 -- drop all schemas
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 (1 row)
 
 -- alter publication set multiple schema
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1039,10 +1039,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1051,10 +1051,10 @@ Tables from schemas:
 -- removing the duplicate schemas
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
 
@@ -1124,18 +1124,18 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub3_forschema;
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 (1 row)
 
 ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables from schemas:
     "pub_test1"
 
@@ -1145,20 +1145,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1
 CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1;
 RESET client_min_messages;
 \dRp+ testpub_forschema_fortable
-                           Publication testpub_forschema_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                              Publication testpub_forschema_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
     "pub_test1"
 
 \dRp+ testpub_fortable_forschema
-                           Publication testpub_fortable_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                              Publication testpub_fortable_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL 
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f                  | f
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:


  [application/octet-stream] 0002-logical_decoding_ddl_message.patch (57.5K, 3-0002-logical_decoding_ddl_message.patch)
  download | inline diff:
commit b2c35793dd28b9c5c3c06f0d81d1981009b478e1
Author: Zheng (Zane) Li <zhelli@amazon.com>
Date:   Fri Mar 18 00:04:35 2022 +0000

    0002-logical_decoding_ddl_message.patch
    2. Logical logging change
    a. A new WAL record type xl_logical_ddl_message is introduced to
    support logical logging of DDL command. xl_logical_ddl_message is
    similar to the existing xl_logical_message for generic message
    logging. The reason for not using xl_logical_message directly as
    proposed initially is I found out we need to log more information
    (such as user role, search path and potentially more in the future)
    than just one string, and we don’t want to make too much changes to
    the existing xl_logical_message which may break its current
    consumers.
    b. The logging of DDL command string is processed in function
    LogLogicalDDLCommand. We categorize DDL command types into three
    categories in this function:
      1. replicated in database level replication only (such as CREATE
    TABLE, CREATE FUNCTION).
      2. replicated in database or table level replication depending on
    the configuration (such as ALTER TABLE).
      3. not supported for replication or pending investigation.
    
    3. Logical decoding and Reorderbuffer change
    Supports logical decoding of the new WAL record
    xl_logical_ddl_message. This is similar to the logical decoding of
    xl_logical_message. Tests for this change are added in the
    test_decoding plugin.

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 36929dd97d..d5091c568d 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -4,7 +4,7 @@ MODULES = test_decoding
 PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time messages \
+	decoding_into_rel binary prepared replorigin time messages ddlmessages\
 	spill slot truncate stream stats twophase twophase_stream \
 	sequence
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out
new file mode 100644
index 0000000000..a92a959cad
--- /dev/null
+++ b/contrib/test_decoding/expected/ddlmessages.out
@@ -0,0 +1,48 @@
+-- predictability
+SET synchronous_commit = on;
+-- turn on logical ddl message logging
+CREATE publication mypub FOR ALL TABLES with (ddl = 'database');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE tab1 (id serial unique, data int);
+ALTER TABLE tab1 add c3 varchar;
+ALTER TABLE tab1 drop c3;
+DROP TABLE tab1;
+BEGIN;
+CREATE TABLE tab1 (id serial unique, data int);
+ALTER TABLE tab1 add c3 varchar;
+ROLLBACK;
+BEGIN;
+CREATE TABLE tab1 (id serial unique, data int);
+ALTER TABLE tab1 add c3 varchar;
+COMMIT;
+\o | sed 's/role.*search_path/role: redacted, search_path/g'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+DROP TABLE tab1;
+DROP publication mypub;
+                                                                       data                                                                        
+---------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: transactional: 1 prefix:  role: redacted, search_path: "$user", public, sz: 47 content:CREATE TABLE tab1 (id serial unique, data int);
+ BEGIN
+ sequence public.tab1_id_seq: transactional:1 last_value: 1 log_cnt: 0 is_called:0
+ COMMIT
+ DDL message: transactional: 1 prefix:  role: redacted, search_path: "$user", public, sz: 32 content:ALTER TABLE tab1 add c3 varchar;
+ DDL message: transactional: 1 prefix:  role: redacted, search_path: "$user", public, sz: 25 content:ALTER TABLE tab1 drop c3;
+ DDL message: transactional: 1 prefix:  role: redacted, search_path: "$user", public, sz: 16 content:DROP TABLE tab1;
+ DDL message: transactional: 1 prefix:  role: redacted, search_path: "$user", public, sz: 47 content:CREATE TABLE tab1 (id serial unique, data int);
+ BEGIN
+ sequence public.tab1_id_seq: transactional:1 last_value: 1 log_cnt: 0 is_called:0
+ DDL message: transactional: 1 prefix:  role: redacted, search_path: "$user", public, sz: 32 content:ALTER TABLE tab1 add c3 varchar;
+ COMMIT
+(12 rows)
+
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql
new file mode 100644
index 0000000000..e4fc3d89bd
--- /dev/null
+++ b/contrib/test_decoding/sql/ddlmessages.sql
@@ -0,0 +1,28 @@
+-- predictability
+SET synchronous_commit = on;
+-- turn on logical ddl message logging
+CREATE publication mypub FOR ALL TABLES with (ddl = 'database');
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE tab1 (id serial unique, data int);
+ALTER TABLE tab1 add c3 varchar;
+ALTER TABLE tab1 drop c3;
+DROP TABLE tab1;
+
+BEGIN;
+CREATE TABLE tab1 (id serial unique, data int);
+ALTER TABLE tab1 add c3 varchar;
+ROLLBACK;
+
+BEGIN;
+CREATE TABLE tab1 (id serial unique, data int);
+ALTER TABLE tab1 add c3 varchar;
+COMMIT;
+
+\o | sed 's/role.*search_path/role: redacted, search_path/g'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+DROP TABLE tab1;
+DROP publication mypub;
+
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ea22649e41..5e02b274c5 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,11 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_ddlmessage(LogicalDecodingContext *ctx,
+								 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+								 bool transactional, const char *prefix,
+								 const char *role, const char *search_path,
+								 Size sz, const char *message);
 static void pg_decode_sequence(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
 							  Relation rel, bool transactional,
@@ -121,6 +126,11 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
 									 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 									 bool transactional, const char *prefix,
 									 Size sz, const char *message);
+static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+										bool transactional, const char *prefix,
+										const char *role, const char *search_path,
+										Size sz, const char *message);
 static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
 									  Relation rel, bool transactional,
@@ -150,6 +160,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->ddlmessage_cb = pg_decode_ddlmessage;
 	cb->sequence_cb = pg_decode_sequence;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
@@ -163,6 +174,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pg_decode_stream_commit;
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
+	cb->stream_ddlmessage_cb = pg_decode_stream_ddlmessage;
 	cb->stream_sequence_cb = pg_decode_stream_sequence;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
 }
@@ -758,7 +770,8 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 static void
 pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
-				  const char *prefix, Size sz, const char *message)
+				  const char *prefix, Size sz,
+				  const char *message)
 {
 	OutputPluginPrepareWrite(ctx, true);
 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
@@ -799,6 +812,19 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_ddlmessage(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+				  const char *prefix, const char *role, const char *search_path,
+				  Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:",
+					 transactional, prefix, role, search_path, sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
@@ -979,7 +1005,8 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 static void
 pg_decode_stream_message(LogicalDecodingContext *ctx,
 						 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
-						 const char *prefix, Size sz, const char *message)
+						 const char *prefix, Size sz,
+						 const char *message)
 {
 	OutputPluginPrepareWrite(ctx, true);
 
@@ -991,7 +1018,35 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
 	else
 	{
 		appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
-						 transactional, prefix, sz);
+							 transactional, prefix, sz);
+		appendBinaryStringInfo(ctx->out, message, sz);
+	}
+
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * In streaming mode, we don't display the contents for transactional messages
+ * as the transaction can abort at a later point in time.  We don't want users to
+ * see the message contents until the transaction is committed.
+ */
+static void
+pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx,
+							ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+							const char *prefix, const char * role, const char * search_path,
+							Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+
+	if (transactional)
+	{
+		appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu",
+						 transactional, prefix, role, search_path, sz);
+	}
+	else
+	{
+		appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:",
+						 transactional, prefix, role, search_path, sz);
 		appendBinaryStringInfo(ctx->out, message, sz);
 	}
 
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..b8e29e8df3 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -19,6 +19,7 @@ OBJS = \
 	hashdesc.o \
 	heapdesc.o \
 	logicalmsgdesc.o \
+	logicalddlmsgdesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
 	relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
new file mode 100644
index 0000000000..7a352d540a
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -0,0 +1,54 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalddlmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/ddlmessage.c
+ *
+ * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/ddlmessage.h"
+
+void
+logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_DDL_MESSAGE)
+	{
+		xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec;
+		char	   *prefix = xlrec->message;
+		char       *role = xlrec->message + xlrec->prefix_size;
+		char       *search_path = xlrec->message + xlrec->prefix_size + xlrec->role_size;
+		char	   *message = xlrec->message + xlrec->prefix_size + xlrec->role_size + xlrec->search_path_size;
+		char	   *sep = "";
+
+		Assert(prefix[xlrec->prefix_size] != '\0');
+
+		appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ",
+						 xlrec->transactional ? "transactional" : "non-transactional",
+						 prefix, role, search_path, xlrec->message_size);
+		/* Write message payload as a series of hex bytes */
+		for (int cnt = 0; cnt < xlrec->message_size; cnt++)
+		{
+			appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]);
+			sep = " ";
+		}
+	}
+}
+
+const char *
+logicalddlmsg_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+		return "DDL MESSAGE";
+
+	return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aeb..b761662d28 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -26,6 +26,7 @@
 #include "commands/tablespace.h"
 #include "replication/decode.h"
 #include "replication/message.h"
+#include "replication/ddlmessage.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 00b5673b8f..f2d11c3847 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1007,3 +1007,55 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Checks if DDL on relation (relid) need xlog for logical replication
+ */
+bool
+ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel)
+{
+	List *allTablePubs = NIL;
+	List *tablePubs = NIL;
+	ListCell *lc;
+
+	/* Only replicate toplevel DDL command */
+	if (!isTopLevel)
+		return false;
+	if (relid == InvalidOid && !forAllTabPubOnly)
+		return false;
+
+	/*
+	 * Log the DDL command if
+	 * there is any FOR ALL TABLES publication with pubddl_database on
+	 * or
+	 * this TABLE belongs to any non FOR ALL publications with pubddl_table on
+	 */
+	allTablePubs = GetAllTablesPublications();
+	foreach(lc, allTablePubs)
+	{
+		Oid pubid = lfirst_oid(lc);
+		Publication *pub = GetPublication(pubid);
+
+		if (pub->pubactions.pubddl_database)
+			return true;
+	}
+
+	/*
+	 * If forAllTabPubOnly is true (i.e. database level replication is required for the DDL
+	 * to be logged), we can bail now since no publication has been found with pubddl_database on
+	 */
+	if (forAllTabPubOnly)
+		return false;
+
+	tablePubs = GetRelationPublications(relid);
+	foreach(lc, tablePubs)
+	{
+		Oid pubid = lfirst_oid(lc);
+		Publication *pub = GetPublication(pubid);
+
+		if (pub->pubactions.pubddl_table)
+			return true;
+	}
+
+	return false;
+}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index dc5872f988..f56ea4c49c 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -79,6 +79,7 @@
 #include "partitioning/partbounds.h"
 #include "partitioning/partdesc.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
@@ -1325,13 +1326,14 @@ DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind)
  *		DROP MATERIALIZED VIEW, DROP FOREIGN TABLE
  */
 void
-RemoveRelations(DropStmt *drop)
+RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel)
 {
 	ObjectAddresses *objects;
 	char		relkind;
 	ListCell   *cell;
 	int			flags = 0;
 	LOCKMODE	lockmode = AccessExclusiveLock;
+	bool		ddlxlog = XLogLogicalInfoActive();
 
 	/* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */
 	if (drop->concurrent)
@@ -1427,10 +1429,37 @@ RemoveRelations(DropStmt *drop)
 		/* Not there? */
 		if (!OidIsValid(relOid))
 		{
+			ddlxlog = false;
 			DropErrorMsgNonExistent(rel, relkind, drop->missing_ok);
 			continue;
 		}
 
+		/*
+		 * Only log DROP RELATION cmd for logical replication if
+		 * there is any FOR ALL TABLES publication with pubddl_database on or
+		 * every relation to be dropped belongs to any non FOR ALL publications with pubddl_table on
+		 */
+		if (ddlxlog)
+		{
+			Oid tableOid = InvalidOid;
+
+			if (relkind == RELKIND_RELATION)
+				tableOid = relOid;
+			else if (relkind == RELKIND_INDEX)
+				tableOid = IndexGetRelation(relOid, true);
+			/*
+			 * Other relation types require database level ddl replication and are
+			 * already logged in LogLogicalDDLCommand() if needed.
+			 */
+			else
+				ddlxlog = false;
+
+			/* DROP RELATION or INDEX are allowed in table level DDL replication */
+			if (tableOid != InvalidOid &&
+				!ddl_need_xlog(tableOid, false, isTopLevel))
+				ddlxlog = false;
+		}
+
 		/*
 		 * Decide if concurrent mode needs to be used here or not.  The
 		 * relation persistence cannot be known without its OID.
@@ -1462,6 +1491,18 @@ RemoveRelations(DropStmt *drop)
 		add_exact_object_address(&obj, objects);
 	}
 
+	/* Log the Drop command for logical replication */
+	if (ddlxlog)
+	{
+		bool transactional = true;
+		const char* prefix = "";
+		LogLogicalDDLMessage(prefix,
+							 GetUserId(),
+							 pstate->p_sourcetext,
+							 strlen(pstate->p_sourcetext),
+							 transactional);
+	}
+
 	performMultipleDeletions(objects, drop->behavior, flags);
 
 	free_object_addresses(objects);
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb71..f3eeb67312 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = \
 	decode.o \
+	ddlmessage.o\
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
new file mode 100644
index 0000000000..f93573079a
--- /dev/null
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -0,0 +1,99 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddlmessage.c
+ *	  Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/ddlmessage.c
+ *
+ * NOTES
+ *
+ * Logical DDL messages allow XLOG logging of DDL command strings that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * Simiarl to the generic logical messages, These DDL messages can be either
+ * transactional or non-transactional. Note by default DDLs in PostgreSQL are
+ * transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will always be delivered.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/ddlmessage.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding DDL message into XLog.
+ */
+XLogRecPtr
+LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message,
+					 size_t size, bool transactional)
+{
+	xl_logical_ddl_message xlrec;
+	const char *role;
+
+	role =  GetUserNameFromId(roleoid, false);
+
+	/*
+	 * Force xid to be allocated if we're emitting a transactional message.
+	 */
+	if (transactional)
+	{
+		Assert(IsTransactionState());
+		GetCurrentTransactionId();
+	}
+
+	xlrec.dbId = MyDatabaseId;
+	xlrec.transactional = transactional;
+	/* trailing zero is critical; see logicalddlmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.role_size = strlen(role) + 1;
+	xlrec.search_path_size = strlen(namespace_search_path) + 1;
+	xlrec.message_size = size;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, role), xlrec.role_size);
+	XLogRegisterData(namespace_search_path, xlrec.search_path_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding ddl messages.
+ */
+void
+logicalddlmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8c00a73cb9..b9f3086e2c 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -36,6 +36,7 @@
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
 #include "replication/decode.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -605,6 +606,61 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 							  message->message + message->prefix_size);
 }
 
+/*
+ * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+void
+logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	Snapshot	snapshot;
+	xl_logical_ddl_message *message;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info);
+
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding ddl messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	message = (xl_logical_ddl_message *) XLogRecGetData(r);
+
+	if (message->dbId != ctx->slot->data.database ||
+		FilterByOrigin(ctx, origin_id))
+		return;
+
+	if (message->transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+	else if (!message->transactional &&
+			 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+			  SnapBuildXactNeedsSkip(builder, buf->origptr)))
+		return;
+
+	snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+	ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr,
+							  message->transactional,
+							  message->message,
+							  /* first part of message is prefix */
+							  message->message + message->prefix_size,
+							  /* Second part of message is role*/
+							  message->message + message->prefix_size + message->role_size,
+							  /* Third part of message is search_path */
+							  message->message_size,
+							  message->message + message->prefix_size +
+							  message->role_size + message->search_path_size);
+}
+
 /*
  * Consolidated commit record handling between the different form of commit
  * records.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..4e84705007 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+								  XLogRecPtr message_lsn, bool transactional,
+								  const char *prefix, const char *role, const char *search_path,
+								  Size message_size, const char *message);
 static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 								XLogRecPtr sequence_lsn, Relation rel,
 								bool transactional,
@@ -94,6 +98,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
 static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									  XLogRecPtr message_lsn, bool transactional,
 									  const char *prefix, Size message_size, const char *message);
+static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+										 XLogRecPtr message_lsn, bool transactional,
+										 const char *prefix, const char *role, const char *search_path,
+										 Size message_size, const char *message);
 static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   XLogRecPtr sequence_lsn, Relation rel,
 									   bool transactional,
@@ -226,6 +234,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->ddlmessage = ddlmessage_cb_wrapper;
 	ctx->reorder->sequence = sequence_cb_wrapper;
 
 	/*
@@ -243,6 +252,7 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_commit_cb != NULL) ||
 		(ctx->callbacks.stream_change_cb != NULL) ||
 		(ctx->callbacks.stream_message_cb != NULL) ||
+		(ctx->callbacks.stream_ddlmessage_cb != NULL) ||
 		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
@@ -261,6 +271,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
 	ctx->reorder->stream_change = stream_change_cb_wrapper;
 	ctx->reorder->stream_message = stream_message_cb_wrapper;
+	ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper;
 	ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
 	ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
@@ -1250,6 +1261,44 @@ sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+					  XLogRecPtr message_lsn, bool transactional,
+					  const char *prefix, const char *role,
+					  const char *search_path, Size message_size,
+					  const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.ddlmessage_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "ddlmessage";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix,
+								 role, search_path, message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
@@ -1596,6 +1645,48 @@ stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							 XLogRecPtr message_lsn, bool transactional,
+							 const char *prefix, const char *role,
+							 const char* search_path, Size message_size,
+							 const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (ctx->callbacks.stream_ddlmessage_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_ddlmessage";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix,
+										role, search_path, message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						   int nrelations, Relation relations[],
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fa..599dcef6bf 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -555,6 +555,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				pfree(change->data.msg.message);
 			change->data.msg.message = NULL;
 			break;
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			if (change->data.ddlmsg.prefix != NULL)
+				pfree(change->data.ddlmsg.prefix);
+			change->data.ddlmsg.prefix = NULL;
+			if (change->data.ddlmsg.role != NULL)
+				pfree(change->data.ddlmsg.role);
+			change->data.ddlmsg.role = NULL;
+			if (change->data.ddlmsg.search_path != NULL)
+				pfree(change->data.ddlmsg.search_path);
+			change->data.ddlmsg.search_path = NULL;
+			if (change->data.ddlmsg.message != NULL)
+				pfree(change->data.ddlmsg.message);
+			change->data.ddlmsg.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			if (change->data.inval.invalidations)
 				pfree(change->data.inval.invalidations);
@@ -1140,6 +1154,64 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * A transactional DDL message is queued to be processed upon commit and a
+ * non-transactional DDL message gets processed immediately.
+ */
+void
+ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid,
+						  Snapshot snapshot, XLogRecPtr lsn,
+						  bool transactional, const char *prefix,
+						  const char *role, const char *search_path,
+						  Size message_size, const char *message)
+{
+	if (transactional)
+	{
+		MemoryContext oldcontext;
+		ReorderBufferChange *change;
+
+		Assert(xid != InvalidTransactionId);
+
+		oldcontext = MemoryContextSwitchTo(rb->context);
+
+		change = ReorderBufferGetChange(rb);
+		change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE;
+		change->data.ddlmsg.prefix = pstrdup(prefix);
+		change->data.ddlmsg.role = pstrdup(role);
+		change->data.ddlmsg.search_path = pstrdup(search_path);
+		change->data.ddlmsg.message_size = message_size;
+		change->data.ddlmsg.message = palloc(message_size);
+		memcpy(change->data.ddlmsg.message, message, message_size);
+
+		ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	else
+	{
+		ReorderBufferTXN *txn = NULL;
+		volatile Snapshot snapshot_now = snapshot;
+
+		if (xid != InvalidTransactionId)
+			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+		/* setup snapshot to allow catalog access */
+		SetupHistoricSnapshot(snapshot_now, NULL);
+		PG_TRY();
+		{
+			rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message);
+
+			TeardownHistoricSnapshot(false);
+		}
+		PG_CATCH();
+		{
+			TeardownHistoricSnapshot(true);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+	}
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -2234,6 +2306,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the DDL message.
+ */
+static inline void
+ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 ReorderBufferChange *change, bool streaming)
+{
+	if (streaming)
+		rb->stream_ddlmessage(rb, txn, change->lsn, true,
+							  change->data.ddlmsg.prefix,
+							  change->data.ddlmsg.role,
+							  change->data.ddlmsg.search_path,
+							  change->data.ddlmsg.message_size,
+							  change->data.ddlmsg.message);
+	else
+		rb->ddlmessage(rb, txn, change->lsn, true,
+					   change->data.ddlmsg.prefix,
+					   change->data.ddlmsg.role,
+					   change->data.ddlmsg.search_path,
+					   change->data.ddlmsg.message_size,
+					   change->data.ddlmsg.message);
+}
+
 /*
  * Helper function for ReorderBufferProcessTXN for applying sequences.
  */
@@ -2635,6 +2730,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					ReorderBufferApplyMessage(rb, txn, change, streaming);
 					break;
 
+				case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+					ReorderBufferApplyDDLMessage(rb, txn, change, streaming);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
 					/* Execute the invalidation messages locally */
 					ReorderBufferExecuteInvalidations(
@@ -4034,6 +4133,53 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				char	   *data;
+				Size		prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+				Size		role_size = strlen(change->data.ddlmsg.role) + 1;
+				Size		search_path_size = strlen(change->data.ddlmsg.search_path) + 1;
+
+				sz += prefix_size + role_size + search_path_size +
+					  change->data.ddlmsg.message_size +
+					  sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size);
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* write the prefix including the size */
+				memcpy(data, &prefix_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddlmsg.prefix,
+					   prefix_size);
+				data += prefix_size;
+
+				/* write the role including the size */
+				memcpy(data, &role_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddlmsg.role,
+					   role_size);
+				data += role_size;
+
+				/* write the search_path including the size */
+				memcpy(data, &search_path_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddlmsg.search_path,
+					   search_path_size);
+				data += search_path_size;
+
+				/* write the message including the size */
+				memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddlmsg.message,
+					   change->data.ddlmsg.message_size);
+				data += change->data.ddlmsg.message_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4381,6 +4527,18 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 				sz += prefix_size + change->data.msg.message_size +
 					sizeof(Size) + sizeof(Size);
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				Size		prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+				Size		role_size = strlen(change->data.ddlmsg.role) + 1;
+				Size		search_path_size = strlen(change->data.ddlmsg.search_path) + 1;
+
+				sz += prefix_size + role_size + search_path_size +
+					change->data.ddlmsg.message_size +
+					sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size);
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4657,8 +4815,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				/* read prefix */
 				memcpy(&prefix_size, data, sizeof(Size));
 				data += sizeof(Size);
-				change->data.msg.prefix = MemoryContextAlloc(rb->context,
-															 prefix_size);
+				change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size);
 				memcpy(change->data.msg.prefix, data, prefix_size);
 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
 				data += prefix_size;
@@ -4672,6 +4829,49 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				Size		prefix_size;
+				Size		role_size;
+				Size		search_path_size;
+
+				/* read prefix */
+				memcpy(&prefix_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size);
+				memcpy(change->data.ddlmsg.prefix, data, prefix_size);
+				Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0');
+				data += prefix_size;
+
+				/* read role */
+				memcpy(&role_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.ddlmsg.role = MemoryContextAlloc(rb->context,
+															  role_size);
+				memcpy(change->data.ddlmsg.role, data, role_size);
+				Assert(change->data.ddlmsg.role[role_size - 1] == '\0');
+				data += role_size;
+
+				/* read search_path */
+				memcpy(&search_path_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.ddlmsg.search_path = MemoryContextAlloc(rb->context,
+																	 search_path_size);
+				memcpy(change->data.ddlmsg.search_path, data, search_path_size);
+				Assert(change->data.ddlmsg.search_path[search_path_size - 1] == '\0');
+				data += search_path_size;
+
+				/* read the message */
+				memcpy(&change->data.msg.message_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.msg.message = MemoryContextAlloc(rb->context,
+															  change->data.msg.message_size);
+				memcpy(change->data.msg.message, data,
+					   change->data.msg.message_size);
+				data += change->data.msg.message_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 3780c6e812..e5ec5713fa 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -62,6 +62,7 @@
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
+#include "replication/ddlmessage.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteRemove.h"
 #include "storage/fd.h"
@@ -86,7 +87,7 @@ static void ProcessUtilitySlow(ParseState *pstate,
 							   QueryEnvironment *queryEnv,
 							   DestReceiver *dest,
 							   QueryCompletion *qc);
-static void ExecDropStmt(DropStmt *stmt, bool isTopLevel);
+static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel);
 
 /*
  * CommandIsReadOnly: is an executable query read-only?
@@ -986,7 +987,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 									   context, params, queryEnv,
 									   dest, qc);
 				else
-					ExecDropStmt(stmt, isTopLevel);
+					ExecDropStmt(pstate, stmt, isTopLevel);
 			}
 			break;
 
@@ -1086,6 +1087,154 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 	CommandCounterIncrement();
 }
 
+/*
+ * Log a DDL command for logical replication
+ * Some DDLs are only replicated in Database Level DDL replication
+ * Some can be replicated in Table Level DDL replication.
+ *
+ * Currently we focus on supporting Database Level DDL replication
+ */
+static void
+LogLogicalDDLCommand(Node *parsetree, const char *queryString)
+{
+	switch (nodeTag(parsetree))
+	{
+		/* Fisrtly, commands that are only supported in Database level DDL replication */
+		case T_CreateSchemaStmt:
+		case T_CreateStmt:
+		case T_CreateForeignTableStmt:
+		case T_AlterDomainStmt:
+		case T_DefineStmt:
+		case T_CompositeTypeStmt:
+		case T_CreateEnumStmt:
+		case T_CreateRangeStmt:
+		case T_AlterEnumStmt:
+		case T_ViewStmt:
+		case T_CreateFunctionStmt:
+		case T_AlterFunctionStmt:
+		case T_CreateTrigStmt:
+		case T_CreateDomainStmt:
+		case T_CreateCastStmt:
+		case T_CreateOpClassStmt:
+		case T_CreateOpFamilyStmt:
+		case T_AlterOpFamilyStmt:
+		case T_AlterOperatorStmt:
+		case T_AlterTypeStmt:
+		case T_GrantStmt:
+		case T_AlterCollationStmt:
+			/*
+			 * Log these stmt for logical replication if
+			 * there is any FOR ALL TABLES publication with pubddl_database on.
+			 * i.e. Database level DDL replication is on for some publication.
+			 */
+			if (ddl_need_xlog(InvalidOid, true, true))
+			{
+				bool transactional = true;
+				const char* prefix = "";
+				LogLogicalDDLMessage(prefix,
+									 GetUserId(),
+									 queryString,
+									 strlen(queryString),
+									 transactional);
+			}
+			break;
+
+		/*
+		 * Secondly, commands that may be allowed in Table level DDL replication.
+		 * These are currently handled in the later execution path of the command.
+		 * Because we need to get the relation id which readily available in later
+		 * code path.
+		 */
+		case T_AlterTableStmt:
+		case T_IndexStmt:
+		case T_RenameStmt: /* TODO */
+		case T_AlterOwnerStmt: /* TODO */
+			break;
+
+		/* DropStmt depends on the removeType */
+		case T_DropStmt:
+		{
+			DropStmt* stmt = (DropStmt*) parsetree;
+			switch (stmt->removeType)
+			{
+				/* Maybe allowed in Table level DDL replication, handled in later code path */
+				case OBJECT_INDEX:
+				case OBJECT_TABLE:
+					break;
+				/* Drop of sequence is by logical replication of sequences separately */
+				case OBJECT_SEQUENCE:
+					break;
+				/* Drop of other objects are allowed in Database level DDL replication only */
+				case OBJECT_VIEW:
+				case OBJECT_MATVIEW:
+				case OBJECT_FOREIGN_TABLE:
+				default:
+					/*
+					 * Log these DropStmt for logical replication if
+					 * there is any FOR ALL TABLES publication with pubddl_database on.
+					 * i.e. Database level DDL replication is on for some publication.
+					 */
+					if (ddl_need_xlog(InvalidOid, true, true))
+					{
+						bool transactional = true;
+						const char* prefix = "";
+						LogLogicalDDLMessage(prefix,
+											 GetUserId(),
+											 queryString,
+											 strlen(queryString),
+											 transactional);
+					}
+					break;
+			}
+		}
+		/*
+		 * Lastly, rule out DDLs we don't replicate yet in DDL replication
+		 * Some of these can be supported, we just need to investigate and run tests.
+		 */
+		case T_CreateExtensionStmt:
+		case T_AlterExtensionStmt:
+		case T_AlterExtensionContentsStmt:
+		case T_CreateFdwStmt:
+		case T_AlterFdwStmt:
+		case T_CreateForeignServerStmt:
+		case T_AlterForeignServerStmt:
+		case T_CreateUserMappingStmt:
+		case T_AlterUserMappingStmt:
+		case T_DropUserMappingStmt:
+		case T_ImportForeignSchemaStmt:
+		case T_RuleStmt:
+		case T_CreateSeqStmt:
+		case T_AlterSeqStmt:
+		case T_CreateTableAsStmt:
+		case T_RefreshMatViewStmt:
+		case T_CreatePLangStmt:
+		case T_CreateConversionStmt:
+		case T_CreateTransformStmt:
+		case T_AlterTSDictionaryStmt:
+		case T_AlterTSConfigurationStmt:
+		case T_AlterTableMoveAllStmt:
+		case T_AlterObjectDependsStmt:
+		case T_AlterObjectSchemaStmt:
+		case T_CommentStmt:
+		case T_DropOwnedStmt:
+		case T_AlterDefaultPrivilegesStmt:
+		case T_CreatePolicyStmt:
+		case T_AlterPolicyStmt:
+		case T_SecLabelStmt:
+		case T_CreateAmStmt:
+		case T_CreatePublicationStmt:
+		case T_AlterPublicationStmt:
+		case T_CreateSubscriptionStmt:
+		case T_AlterSubscriptionStmt:
+		case T_DropSubscriptionStmt:
+		case T_CreateStatsStmt:
+		case T_AlterStatsStmt:
+			break;
+		default:
+			break;
+	}
+}
+
 /*
  * The "Slow" variant of ProcessUtility should only receive statements
  * supported by the event triggers facility.  Therefore, we always
@@ -1118,6 +1267,13 @@ ProcessUtilitySlow(ParseState *pstate,
 		if (isCompleteQuery)
 			EventTriggerDDLCommandStart(parsetree);
 
+		/*
+		 * Consider logging the DDL command if logical logging is enabled and this is
+		 * a top level query.
+		 */
+		if (XLogLogicalInfoActive() && isTopLevel)
+			LogLogicalDDLCommand(parsetree, queryString);
+
 		switch (nodeTag(parsetree))
 		{
 				/*
@@ -1320,6 +1476,23 @@ ProcessUtilitySlow(ParseState *pstate,
 						EventTriggerAlterTableStart(parsetree);
 						EventTriggerAlterTableRelid(relid);
 
+						/*
+						 * Log the ALTER TABLE command if
+						 * There is any publication with database level ddl on or
+						 * this TABLE belongs to any publication with table level ddl on
+						 */
+						if (XLogLogicalInfoActive() &&
+							ddl_need_xlog(relid, false, isTopLevel))
+						{
+							bool transactional = true;
+							const char* prefix = "";
+							LogLogicalDDLMessage(prefix,
+												 GetUserId(),
+												 queryString,
+												 strlen(queryString),
+												 transactional);
+						}
+
 						/* ... and do it */
 						AlterTable(atstmt, lockmode, &atcontext);
 
@@ -1538,6 +1711,24 @@ ProcessUtilitySlow(ParseState *pstate,
 
 					/* ... and do it */
 					EventTriggerAlterTableStart(parsetree);
+
+					/*
+					 * Log CREATE INDEX cmd for logical replication if
+					 * there is any publication with database level ddl on or
+					 * this TABLE belongs to any publication with table level ddl on.
+					 */
+					if (XLogLogicalInfoActive() &&
+						ddl_need_xlog(relid, false, isTopLevel))
+					{
+						bool transactional = true;
+						const char* prefix = "";
+						LogLogicalDDLMessage(prefix,
+											 GetUserId(),
+											 queryString,
+											 strlen(queryString),
+											 transactional);
+					}
+
 					address =
 						DefineIndex(relid,	/* OID of heap relation */
 									stmt,
@@ -1754,7 +1945,7 @@ ProcessUtilitySlow(ParseState *pstate,
 				break;
 
 			case T_DropStmt:
-				ExecDropStmt((DropStmt *) parsetree, isTopLevel);
+				ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel);
 				/* no commands stashed for DROP */
 				commandCollected = true;
 				break;
@@ -1975,7 +2166,7 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context)
  * Dispatch function for DropStmt
  */
 static void
-ExecDropStmt(DropStmt *stmt, bool isTopLevel)
+ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel)
 {
 	switch (stmt->removeType)
 	{
@@ -1990,7 +2181,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel)
 		case OBJECT_VIEW:
 		case OBJECT_MATVIEW:
 		case OBJECT_FOREIGN_TABLE:
-			RemoveRelations(stmt);
+			RemoveRelations(pstate, stmt, isTopLevel);
 			break;
 		default:
 			RemoveObjects(stmt);
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310..c38c163f28 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -27,6 +27,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/message.h"
+#include "replication/ddlmessage.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index cf8b6d4819..5ffdc16cca 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode)
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 5d4037f26e..68781365de 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -18,6 +18,7 @@
 #include "catalog/dependency.h"
 #include "catalog/objectaddress.h"
 #include "nodes/parsenodes.h"
+#include "parser/parse_node.h"
 #include "storage/lock.h"
 #include "utils/relcache.h"
 
@@ -27,7 +28,7 @@ struct AlterTableUtilityContext;	/* avoid including tcop/utility.h here */
 extern ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 									ObjectAddress *typaddress, const char *queryString);
 
-extern void RemoveRelations(DropStmt *drop);
+extern void RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel);
 
 extern Oid	AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode);
 
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
new file mode 100644
index 0000000000..1e8ef22296
--- /dev/null
+++ b/src/include/replication/ddlmessage.h
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ * ddlmessage.h
+ *	   Exports from replication/logical/ddlmessage.c
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/ddlmessage.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_DDL_MESSAGE_H
+#define PG_LOGICAL_DDL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding DDL message wal record.
+ */
+typedef struct xl_logical_ddl_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	bool		transactional;	/* is message transactional? */
+	Size		prefix_size;	/* length of prefix */
+	Size		role_size;      /* length of the role that executes the DDL command */
+	Size		search_path_size; /* length of the search path */
+	Size		message_size;	  /* size of the message */
+	/*
+	 * payload, including null-terminated prefix of length prefix_size
+	 * and null-terminated role of length role_size
+	 * and null-terminated search_path of length search_path_size
+	 */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;
+
+#define SizeOfLogicalDDLMessage	(offsetof(xl_logical_ddl_message, message))
+
+extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message,
+									   size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_DDL_MESSAGE	0x00
+void		logicalddlmsg_redo(XLogReaderState *record);
+void		logicalddlmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalddlmsg_identify(uint8 info);
+
+#endif							/* PG_LOGICAL_DDL_MESSAGE_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 8e07bb7409..ea0f7e5cb9 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -28,6 +28,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..f0fd1ea895 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for the logical decoding DDL messages.
+ */
+typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   bool transactional,
+										   const char *prefix,
+										   const char *role,
+										   const char *search_path,
+										   Size message_size,
+										   const char *message);
+
 /*
  * Called for the generic logical decoding sequences.
  */
@@ -211,6 +224,20 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
 											  Size message_size,
 											  const char *message);
 
+/*
+ * Callback for streaming logical decoding DDL messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr message_lsn,
+												 bool transactional,
+												 const char *prefix,
+												 const char *role,
+												 const char *search_path,
+												 Size message_size,
+												 const char *message);
+
 /*
  * Called for the streaming generic logical decoding sequences from in-progress
  * transactions.
@@ -244,6 +271,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeDDLMessageCB ddlmessage_cb;
 	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
@@ -263,6 +291,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeStreamCommitCB stream_commit_cb;
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
+	LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb;
 	LogicalDecodeStreamSequenceCB stream_sequence_cb;
 	LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0bcc150b33..0819361d25 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -56,6 +56,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_DDLMESSAGE,
 	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INVALIDATION,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
@@ -131,6 +132,16 @@ typedef struct ReorderBufferChange
 			char	   *message;
 		}			msg;
 
+		/* DDL Message. */
+		struct
+		{
+			char	   *prefix;
+			char	   *role;
+			char	   *search_path;
+			Size		message_size;
+			char	   *message;
+		}			ddlmsg;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -438,6 +449,17 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* DDL message callback signature */
+typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   bool transactional,
+										   const char *prefix,
+										   const char *role,
+										   const char *search_path,
+										   Size sz,
+										   const char *message);
+
 /* sequence callback signature */
 typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
 										 ReorderBufferTXN *txn,
@@ -513,6 +535,18 @@ typedef void (*ReorderBufferStreamMessageCB) (
 											  const char *prefix, Size sz,
 											  const char *message);
 
+/* stream DDL message callback signature */
+typedef void (*ReorderBufferStreamDDLMessageCB) (
+												 ReorderBuffer *rb,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr message_lsn,
+												 bool transactional,
+												 const char *prefix,
+												 const char *role,
+												 const char *search_path,
+												 Size sz,
+												 const char *message);
+
 /* stream sequence callback signature */
 typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb,
 											   ReorderBufferTXN *txn,
@@ -573,6 +607,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferDDLMessageCB ddlmessage;
 	ReorderBufferSequenceCB sequence;
 
 	/*
@@ -593,6 +628,7 @@ struct ReorderBuffer
 	ReorderBufferStreamCommitCB stream_commit;
 	ReorderBufferStreamChangeCB stream_change;
 	ReorderBufferStreamMessageCB stream_message;
+	ReorderBufferStreamDDLMessageCB stream_ddlmessage;
 	ReorderBufferStreamSequenceCB stream_sequence;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
@@ -669,6 +705,9 @@ void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
 void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+void		ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+										 bool transactional, const char *prefix, const char *role,
+										 const char *search_path, Size message_size, const char *message);
 void		ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
 									   RelFileNode rnode, bool transactional, bool created,


  [application/octet-stream] 0003-pgoutput-worker-ddl_replication.patch (33.7K, 4-0003-pgoutput-worker-ddl_replication.patch)
  download | inline diff:
commit 1ada981927a841d433085a42e0c6a7f2cf9cde8d
Author: Zheng (Zane) Li <zhelli@amazon.com>
Date:   Fri Mar 18 00:06:40 2022 +0000

    0003-pgoutput-worker-ddl_replication.patch
    4. Integration with pgoutput
    Supports sending and receiving the DDL message using the logical
    replication wire protocol. A new LogicalRepMsgType is introduced
    for this purpose: LOGICAL_REP_MSG_DDLMESSAGE = 'L'.
    
    5. Logical replication worker change
    Supports execution of the DDL command in the original user role and
    search_path. For any new table created this way, we also set its
    srsubstate in the pg_subscription_rel catalog to
    SUBREL_STATE_INIT, So that DML replication can progress on this
    new table without manually running
    "ALTER SUBSCRIPTION ... REFRESH PUBLICATION".
    
    6. TAP test
    A new TAP test 030_rep_ddl.pl is added. We mainly focused on
    testing the happy path of database level replication so far.
    Corner case DDLs and table level DDL replication are still to be
    carefully tested.

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index c9b0eeefd7..762b897546 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -626,8 +626,8 @@ logicalrep_read_truncate(StringInfo in,
  */
 void
 logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
-						 bool transactional, const char *prefix, Size sz,
-						 const char *message)
+						 bool transactional, const char *prefix,
+						 Size sz, const char *message)
 {
 	uint8		flags = 0;
 
@@ -648,6 +648,63 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Read DDL MESSAGE from stream
+ */
+const char *
+logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn,
+						   const char **prefix,
+						   const char **role,
+						   const char **search_path,
+						   bool *transactional,
+						   Size *sz)
+{
+	uint8 flags;
+	const char *msg;
+
+	//TODO double check when do we need to get TransactionId.
+
+	flags = pq_getmsgint(in, 1);
+	*transactional = (flags & MESSAGE_TRANSACTIONAL) > 0;
+	*lsn = pq_getmsgint64(in);
+	*prefix = pq_getmsgstring(in);
+	*role = pq_getmsgstring(in);
+	*search_path = pq_getmsgstring(in);
+	*sz = pq_getmsgint(in, 4);
+	msg = pq_getmsgbytes(in, *sz);
+
+	return msg;
+}
+
+/*
+ * Write DDL MESSAGE to stream
+ */
+void
+logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+						 bool transactional, const char *prefix, const char *role,
+						 const char *search_path, Size sz, const char *message)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE);
+
+	/* encode and send message flags */
+	if (transactional)
+		flags |= MESSAGE_TRANSACTIONAL;
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+	pq_sendstring(out, role);
+	pq_sendstring(out, search_path);
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
 /*
  * Write relation description to the output stream.
  */
@@ -1185,6 +1242,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "TYPE";
 		case LOGICAL_REP_MSG_MESSAGE:
 			return "MESSAGE";
+		case LOGICAL_REP_MSG_DDLMESSAGE:
+			return "DDL";
 		case LOGICAL_REP_MSG_BEGIN_PREPARE:
 			return "BEGIN PREPARE";
 		case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 03e069c7cd..26fa58973f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -155,6 +155,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
+#include "parser/analyze.h"
 #include "pgstat.h"
 #include "postmaster/bgworker.h"
 #include "postmaster/interrupt.h"
@@ -179,6 +180,8 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -329,6 +332,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 									   TupleTableSlot *remoteslot,
 									   LogicalRepTupleData *newtup,
 									   CmdType operation);
+static void apply_execute_sql_command(const char *cmdstr,
+									  const char* role,
+									  const char* search_path,
+									  bool isTopLevel);
 
 /* Compute GID for two_phase transactions */
 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
@@ -2360,6 +2367,259 @@ apply_handle_truncate(StringInfo s)
 	end_replication_step();
 }
 
+/*
+ * Handle generic messages.
+ */
+static void
+apply_handle_ddlmessage(StringInfo s)
+{
+	XLogRecPtr lsn;
+	bool transactional;
+	Size sz;
+	const char *prefix;
+	const char *role;
+	const char *search_path;
+	const char *msg;
+
+	msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &transactional, &sz);
+
+	apply_execute_sql_command(msg, role, search_path, true);
+}
+
+/*
+ * Add context to the errors produced by apply_execute_sql_command().
+ */
+static void
+execute_sql_command_error_cb(void *arg)
+{
+	errcontext("during execution of SQL statement: %s", (char *) arg);
+}
+
+/*
+ * Execute an SQL command. This can be multiple queries.
+ * This is modified based on pglogical_execute_sql_command().
+ */
+static void
+apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path,
+						  bool isTopLevel)
+{
+	const char *save_debug_query_string = debug_query_string;
+	List	   *parsetree_list;
+	ListCell   *parsetree_item;
+	MemoryContext oldcontext;
+	ErrorContextCallback errcallback;
+	int			save_nestlevel;
+
+	/*
+	 * Switch to appropriate context for constructing parsetrees.
+	 */
+	oldcontext = MemoryContextSwitchTo(ApplyMessageContext);
+	begin_replication_step();
+
+	/*
+	 * Set the current role to the user that executed the command on the
+	 * publication server.
+	 * Set the current search_path to the search_path on the publication
+	 * server when the command was executed.
+	 */
+	save_nestlevel = NewGUCNestLevel();
+	SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE);
+	SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE);
+
+	errcallback.callback = execute_sql_command_error_cb;
+	errcallback.arg = (char *) cmdstr;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	debug_query_string = cmdstr;
+
+	parsetree_list = pg_parse_query(cmdstr);
+
+	/*
+	 * Do a limited amount of safety checking against CONCURRENTLY commands
+	 * executed in situations where they aren't allowed. The sender side should
+	 * provide protection, but better be safe than sorry.
+	 */
+	isTopLevel = isTopLevel && (list_length(parsetree_list) == 1);
+
+	/*
+	 * Switch back to transaction context to enter the loop.
+	 */
+	MemoryContextSwitchTo(oldcontext);
+
+	foreach(parsetree_item, parsetree_list)
+	{
+		List	   *plantree_list;
+		List	   *querytree_list;
+		RawStmt	   *command = (RawStmt *) lfirst(parsetree_item);
+		CommandTag	commandTag;
+		MemoryContext per_parsetree_context = NULL;
+		Portal		portal;
+		DestReceiver *receiver;
+		bool		 snapshot_set = false;
+		char 		 *schemaname = NULL; /* For CREATE TABLE stmt only */
+		char		 *relname = NULL; /* For CREATE TABLE stmt only */
+
+		commandTag = CreateCommandTag((Node *)command);
+
+		/*
+		 * Remember the schemaname and relname if it's a CREATE TABLE stmt
+		 * because we will need them for some post-processing after we
+		 * execute the stmt. At that point, CreateStmt may have been freeed up.
+		 */
+		if (commandTag == CMDTAG_CREATE_TABLE)
+		{
+			CreateStmt *cstmt = (CreateStmt *)command->stmt;
+			RangeVar *rv = cstmt->relation;
+			schemaname = rv->schemaname;
+			relname = rv->relname;
+		}
+
+		/*
+		 * Set up a snapshot if parse analysis/planning will need one.
+		 */
+		if (analyze_requires_snapshot(command))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * OK to analyze, rewrite, and plan this query.
+		 *
+		 * Switch to appropriate context for constructing query and plan trees
+		 * (these can't be in the transaction context, as that will get reset
+		 * when the command is COMMIT/ROLLBACK).  If we have multiple
+		 * parsetrees, we use a separate context for each one, so that we can
+		 * free that memory before moving on to the next one.  But for the
+		 * last (or only) parsetree, just use MessageContext, which will be
+		 * reset shortly after completion anyway.  In event of an error, the
+		 * per_parsetree_context will be deleted when MessageContext is reset.
+		 */
+		if (lnext(parsetree_list, parsetree_item) != NULL)
+		{
+			per_parsetree_context =
+				AllocSetContextCreate(MessageContext,
+									  "per-parsetree message context",
+									  ALLOCSET_DEFAULT_SIZES);
+			oldcontext = MemoryContextSwitchTo(per_parsetree_context);
+		}
+		else
+			oldcontext = MemoryContextSwitchTo(ApplyMessageContext);
+
+		querytree_list = pg_analyze_and_rewrite_fixedparams(
+			command,
+			cmdstr,
+			NULL, 0, NULL);
+
+		plantree_list = pg_plan_queries(
+			querytree_list, cmdstr, 0, NULL);
+
+		/*
+		 * Done with the snapshot used for parsing/planning.
+		 *
+		 * While it looks promising to reuse the same snapshot for query
+		 * execution (at least for simple protocol), unfortunately it causes
+		 * execution to use a snapshot that has been acquired before locking
+		 * any of the tables mentioned in the query.  This creates user-
+		 * visible anomalies, so refrain.  Refer to
+		 * https://postgr.es/m/flat/5075D8DF.6050500@fuzzy.cz for details.
+		 */
+		if (snapshot_set)
+			PopActiveSnapshot();
+
+		portal = CreatePortal("logical replication", true, true);
+
+		/*
+		 * We don't have to copy anything into the portal, because everything
+		 * we are passing here is in MessageContext or the
+		 * per_parsetree_context, and so will outlive the portal anyway.
+		 */
+		PortalDefineQuery(portal,
+						  NULL,
+						  cmdstr,
+						  commandTag,
+						  plantree_list,
+						  NULL);
+
+		/*
+		 * Start the portal.  No parameters here.
+		 */
+		PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+		/* DestNone for logical replication */
+		receiver = CreateDestReceiver(DestNone);
+
+		/*
+		 * Switch back to transaction context for execution.
+		 */
+		MemoryContextSwitchTo(oldcontext);
+
+		(void) PortalRun(portal,
+						 FETCH_ALL,
+						 isTopLevel,
+						 true,
+						 receiver,
+						 receiver,
+						 NULL);
+		(*receiver->rDestroy) (receiver);
+
+		PortalDrop(portal, false);
+
+		CommandCounterIncrement();
+
+		/*
+		 * Table created by DDL replication (database level) is automatically
+		 * added to the subscription here.
+		 *
+		 * Call AddSubscriptionRelState for CREATE TABEL command to set
+		 * the relstate to SUBREL_STATE_INIT so DML changes on this
+		 * new table can be replicated without having to manually run
+		 * "alter subscription ... refresh publication"
+		 */
+		if (commandTag == CMDTAG_CREATE_TABLE)
+		{
+			Oid relid;
+			Oid relnamespace = InvalidOid;
+
+			if (schemaname != NULL)
+				relnamespace = get_namespace_oid(schemaname, false);
+			if (relnamespace != InvalidOid)
+				relid = get_relname_relid(relname, relnamespace);
+			else
+			{
+				/*
+				 * Try to resolve unqualified relname.
+				 * Notice we have set the search_path to the original search_path on the publisher
+				 * at the beginning of this function.
+				 */
+				relid = RelnameGetRelid(relname);
+			}
+
+			if (relid != InvalidOid)
+			{
+				AddSubscriptionRelState(MySubscription->oid, relid,
+										SUBREL_STATE_INIT,
+										InvalidXLogRecPtr);
+				ereport(DEBUG1,
+						(errmsg_internal("table \"%s\" added to subscription \"%s\"",
+										 relname, MySubscription->name)));
+			}
+		}
+	}
+
+	/*
+	 * Restore the GUC variables we set above.
+	 */
+	AtEOXact_GUC(true, save_nestlevel);
+
+	/* protect against stack resets during CONCURRENTLY processing */
+	if (error_context_stack == &errcallback)
+		error_context_stack = errcallback.previous;
+
+	debug_query_string = save_debug_query_string;
+	end_replication_step();
+}
 
 /*
  * Logical replication protocol message dispatcher.
@@ -2425,6 +2685,10 @@ apply_dispatch(StringInfo s)
 			 */
 			break;
 
+		case LOGICAL_REP_MSG_DDLMESSAGE:
+			apply_handle_ddlmessage(s);
+			break;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 5fddab3a3d..ae9c92af2c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -53,6 +53,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							 bool transactional, const char *prefix,
 							 Size sz, const char *message);
+static void pgoutput_ddlmessage(LogicalDecodingContext *ctx,
+								ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+								bool transactional, const char *prefix, const char *role,
+								const char *search_path, Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -208,6 +212,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->message_cb = pgoutput_message;
+	cb->ddlmessage_cb = pgoutput_ddlmessage;
 	cb->commit_cb = pgoutput_commit_txn;
 
 	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -224,6 +229,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
 	cb->stream_message_cb = pgoutput_message;
+	cb->stream_ddlmessage_cb = pgoutput_ddlmessage;
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
 	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -1413,8 +1419,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 static void
 pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
-				 const char *message)
+				 XLogRecPtr message_lsn, bool transactional,
+				 const char *prefix, Size sz, const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
@@ -1440,6 +1446,57 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 XLogRecPtr message_lsn, bool transactional,
+				 const char *prefix, const char * role,
+				 const char *search_path, Size sz, const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+	ListCell *lc;
+
+	/* Reload publications if needed before use. */
+	if (!publications_valid)
+	{
+		MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		if (data->publications)
+			list_free_deep(data->publications);
+
+		data->publications = LoadPublications(data->publication_names);
+		MemoryContextSwitchTo(oldctx);
+		publications_valid = true;
+	}
+
+	/* Check if ddl replication is turned on for the publications */
+	foreach(lc, data->publications)
+	{
+		Publication *pub = (Publication *) lfirst(lc);
+		/* TODO need to check relid for table level DDLs */
+		if (!pub->pubactions.pubddl_database && !pub->pubactions.pubddl_table)
+			return;
+	}
+
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_ddlmessage(ctx->out,
+							 xid,
+							 message_lsn,
+							 transactional,
+							 prefix,
+							 role,
+							 search_path,
+							 sz,
+							 message);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
@@ -1725,7 +1782,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+			entry->pubactions.pubddl_database =
+			entry->pubactions.pubddl_table = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
 		memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -1780,6 +1839,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->pubactions.pubddl_database = false;
+		entry->pubactions.pubddl_table = false;
 
 		/*
 		 * Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -1889,6 +1950,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubactions.pubddl_database |= pub->pubactions.pubddl_database;
+				entry->pubactions.pubddl_table |= pub->pubactions.pubddl_table;
 
 				/*
 				 * We want to publish the changes as the top-most ancestor
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 4d2c881644..862ed467a6 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_DDLMESSAGE = 'L',
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -229,7 +230,14 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
-									 bool transactional, const char *prefix, Size sz, const char *message);
+									 bool transactional, const char *prefix,
+									 Size sz, const char *message);
+extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+										bool transactional, const char *prefix, const char *role,
+										const char *search_path, Size sz, const char *message);
+extern const char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix,
+											  const char **role, const char **search_path,
+											  bool *transactional, Size *sz);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index cf61fc1e0f..698c5114e6 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -33,7 +33,7 @@ $node_subscriber->safe_psql('postgres',
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION tap_pub FOR ALL TABLES");
+	"CREATE PUBLICATION tap_pub FOR ALL TABLES WITH (ddl = '')");
 
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl
index c924ff35f7..6c9b055eb0 100644
--- a/src/test/subscription/t/006_rewrite.pl
+++ b/src/test/subscription/t/006_rewrite.pl
@@ -23,7 +23,7 @@ $node_subscriber->safe_psql('postgres', $ddl);
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION mypub FOR ALL TABLES;");
+	"CREATE PUBLICATION mypub FOR ALL TABLES WITH (ddl = '');");
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
 );
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
index 67b4026afa..2902e6fc34 100644
--- a/src/test/subscription/t/008_diff_schema.pl
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -32,7 +32,7 @@ $node_subscriber->safe_psql('postgres',
 # Setup logical replication
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION tap_pub FOR ALL TABLES");
+	"CREATE PUBLICATION tap_pub FOR ALL TABLES WITH (ddl = '')");
 
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl
index 1ce696d4a4..6c586877a1 100644
--- a/src/test/subscription/t/009_matviews.pl
+++ b/src/test/subscription/t/009_matviews.pl
@@ -19,7 +19,7 @@ $node_subscriber->start;
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION mypub FOR ALL TABLES;");
+	"CREATE PUBLICATION mypub FOR ALL TABLES WITH (ddl = '');");
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
 );
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 66e63e755e..de967eaaf2 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -25,9 +25,9 @@ $node_subscriber2->start;
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 
 # publisher
-$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 WITH (ddl = '')");
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub_all FOR ALL TABLES");
+	"CREATE PUBLICATION pub_all FOR ALL TABLES WITH (ddl = '')");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
 $node_publisher->safe_psql('postgres',
@@ -424,12 +424,12 @@ $node_publisher->safe_psql('postgres',
 # and child tables are present but changes will be replicated via the parent's
 # identity and only once.
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
+	"CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true, ddl = '')"
 );
 
 # for tab4, we publish changes through the "middle" partitioned table
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)"
+	"CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true, ddl = '')"
 );
 
 # prepare data for the initial sync
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
new file mode 100644
index 0000000000..c88c4ea1c0
--- /dev/null
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -0,0 +1,237 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Regression tests for logical replication of DDLs
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'autovacuum = off');
+$node_subscriber->start;
+
+my $node_subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$node_subscriber2->init(allows_streaming => 'logical');
+$node_subscriber2->append_conf('postgresql.conf', 'autovacuum = off');
+$node_subscriber2->start;
+
+my $ddl = "CREATE TABLE test_rep(id int primary key, name varchar);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (1, 'data1');");
+$node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber2->safe_psql('postgres', $ddl);
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# mypub has pubddl_database on
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION mypub FOR ALL TABLES;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+# mypub2 has pubddl_database off
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION mypub2 FOR ALL TABLES with (ddl = '');");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' PUBLICATION mypub2;"
+);
+
+$node_publisher->wait_for_catchup('mysub');
+
+# Test simple CREATE TABLE command is replicated to subscriber
+# Test smae simple CREATE TABLE command is not replicated to subscriber2 (ddl off)
+# Test ALTER TABLE command is replicated on table test_rep
+# Test CREATE INDEX is replicated to subscriber
+# Test CREATE FUNCTION command is replicated to subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD c3 int;");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (2, 'data2', 2);");
+$node_publisher->safe_psql('postgres', "CREATE INDEX nameindex on test_rep (name)");
+$node_publisher->safe_psql('postgres', qq{CREATE OR REPLACE FUNCTION totalRecords()
+RETURNS integer AS \$total\$
+declare
+	total integer;
+BEGIN
+   SELECT count(*) into total FROM test_rep;
+   RETURN total;
+END;
+\$total\$ LANGUAGE plpgsql;});
+
+$node_publisher->wait_for_catchup('mysub');
+
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t1");
+is($result, qq(0), 'CREATE of t1 replicated to subscriber');
+$result = $node_subscriber2->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't1';");
+is($result, qq(0), 'CREATE of t1 is not replicated to subscriber2');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep WHERE c3 =2;");
+is($result, qq(1), 'ALTER test_rep ADD replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_class where relname = 'nameindex'");
+is($result, qq(1), 'CREATE INDEX nameindex replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT totalRecords();");
+is($result, qq(2), 'CREATE of function totalRecords replicated to subscriber');
+$result = $node_subscriber2->safe_psql('postgres', "SELECT count(*) FROM pg_proc where proname = 'totalrecords';");
+is($result, qq(0), 'CREATE FUNCTION totalrecords is not replicated to subscriber2');
+
+# Test ALTER TABLE DROP
+# Test DROP INDEX
+# Test DROP FUNCTION
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;");
+$node_publisher->safe_psql('postgres', "DELETE FROM test_rep where id = 2;");
+$node_publisher->safe_psql('postgres', "DROP INDEX nameindex;");
+$node_publisher->safe_psql('postgres', "DROP FUNCTION totalRecords;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;");
+is($result, qq(1), 'ALTER test_rep DROP replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_class where relname = 'nameindex'");
+is($result, qq(0), 'DROP INDEX nameindex replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_proc where proname = 'totalrecords';");
+is($result, qq(0), 'DROP FUNCTION totalrecords replicated');
+
+
+# TODO figure out how to set ON_ERROR_STOP = 0 in this test
+# Test failed CREATE/ALTER TABLE on publisher doesn't break replication
+# Table t1 already exits so expect the command to fail
+#$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);");
+#$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;");
+#$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (103, 'data103', 1013);");
+
+#$node_publisher->wait_for_catchup('mysub');
+# Verify replication still works
+#$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;");
+#is($result, qq(1), 'DELETE from test_rep replicated');
+
+# Test DDLs inside txn block
+$node_publisher->safe_psql(
+	'postgres', q{
+BEGIN;
+CREATE TABLE t2 (a int, b varchar);
+ALTER TABLE test_rep ADD c3 int;
+INSERT INTO test_rep VALUES (3, 'data3', 3);
+COMMIT;});
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t2;");
+is($result, qq(0), 'CREATE t2 replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep;");
+is($result, qq(2), 'ALTER test_rep ADD replicated');
+
+# Test toggling pubddl_database option off
+$node_publisher->safe_psql('postgres', "ALTER PUBLICATION mypub set (ddl = '');");
+$result = $node_publisher->safe_psql('postgres', "SELECT pubddl_database, pubddl_table from pg_publication where pubname = 'mypub';");
+is($result, qq(f|f), 'pubddl_database turned off on mypub');
+$node_publisher->safe_psql('postgres', "CREATE TABLE t3 (a int, b varchar);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't3';");
+is($result, qq(0), 'CREATE t3 is not replicated');
+
+# Test toggling pubddl_database option on
+$node_publisher->safe_psql('postgres', "ALTER PUBLICATION mypub set (ddl = 'database');");
+$result = $node_publisher->safe_psql('postgres', "SELECT pubddl_database, pubddl_table from pg_publication where pubname = 'mypub';");
+is($result, qq(t|t), 'pubddl_database turned on on mypub');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE t4 (a int, b varchar);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't4';");
+is($result, qq(1), 'CREATE t4 is replicated');
+
+# Test DML changes on the new table t4 are replicated
+$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (1, 'a')");
+$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (2, 'b')");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t4;");
+is($result, qq(2), 'DML Changes to t4 are replicated');
+
+# A somewhat complicated test in plpgsql block with trigger
+$node_publisher->safe_psql(
+	'postgres', q{
+BEGIN;
+CREATE TABLE foo (a int);
+CREATE INDEX foo_idx ON foo (a);
+ALTER TABLE foo ADD COLUMN b timestamptz;
+CREATE FUNCTION foo_ts()
+RETURNS trigger AS $$
+BEGIN
+NEW.b := current_timestamp;
+RETURN NEW;
+END;
+$$
+LANGUAGE plpgsql;
+CREATE TRIGGER foo_ts BEFORE INSERT OR UPDATE ON foo
+FOR EACH ROW EXECUTE FUNCTION foo_ts();
+INSERT INTO foo VALUES (1);
+COMMIT;});
+$result = $node_publisher->safe_psql('postgres', "SELECT b from foo where a = 1;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+my $result_sub = $node_subscriber->safe_psql('postgres', "SELECT b from foo where a = 1;");
+is($result, qq($result_sub), 'timestamp of insert matches');
+
+# Test CREATE SCHEMA stmt is replicated
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA s1");
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 's1';");
+is($result, qq(1), 'CREATE SCHEMA s1 is replicated');
+
+# Test CREATE TABLE in new schema s1 followed by insert
+$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t1 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "INSERT INTO s1.t1 VALUES (1, 'a');");
+$node_publisher->safe_psql('postgres', "INSERT INTO s1.t1 VALUES (2, 'b');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.t1;");
+is($result, qq(2), 'CREATE TABLE s1.t1 is replicated');
+
+# Test replication works as expected with mismatched search_path on publisher and subscriber
+$node_publisher->append_conf('postgresql.conf', 'search_path = \'s1, public\'');
+$node_publisher->restart;
+# CREATE unqualified table t2, it is s1.t2 under the modified search_path
+$node_publisher->safe_psql('postgres', "CREATE TABLE t2 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (1, 'a');");
+$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (2, 'b');");
+$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (3, 'c');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.t2;");
+is($result, qq(3), 'CREATE TABLE s1.t2 is replicated');
+
+# Test owner of new table on subscriber matches the owner on publisher
+$node_publisher->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;");
+
+$node_subscriber->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;");
+
+$node_publisher->safe_psql('postgres', "SET SESSION AUTHORIZATION 'ddl_replication_user'; CREATE TABLE t5 (a int, b varchar);");
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT tableowner from pg_catalog.pg_tables where tablename = 't5';");
+is($result, qq(ddl_replication_user), 'Owner of t5 is correct');
+
+#TODO TEST certain DDLs are not replicated
+
+pass "DDL replication tests passed!";
+
+$node_subscriber->stop;
+$node_subscriber2->stop;
+$node_publisher->stop;
+
+done_testing();


reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: pgsql-general@postgresql.org
  Cc: zhengli10@gmail.com, japinli@hotmail.com, alvherre@alvh.no-ip.org, dilipbalaut@gmail.com, rajesh.rs0541@gmail.com, pgsql-hackers@lists.postgresql.org
  Subject: Re: Support logical replication of DDLs
  In-Reply-To: <CAAD30UKTp87+kvGZYL3M2Suxq=WEvFUG24ZRT0yT9rqdkP=uMA@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox