From 9dc28ae310ae0bd8c7668a08b601931cc4faed46 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 29 Sep 2022 09:10:44 -0400
Subject: [PATCH v25 2/4] Support DDL replication.

To support DDL replication, we use event trigger and DDL deparsing
facilities. While creating a publication, we register a command end
trigger that deparses the DDL as a JSON blob, and WAL logs it. The event
trigger is automatically removed at the time of drop publication. The
WALSender decodes the WAL and sends it downstream similar to other DML
commands. The subscriber then converts JSON back to the DDL command string
and executes it. In the subscriber, we also add the newly added rel to
pg_subscription_rel so that the DML changes on the new table can be
replicated without having to manually run
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION".

This is a POC patch to show how using event triggers and DDL deparsing
facilities we can implement DDL replication. So, the implementation is
restricted to CREATE TABLE/ALTER TABLE/DROP TABLE commands.

- For non-rewrite ALTER object command and
-     CREATE object command:
we deparse the command at ddl_command_end event trigger and WAL log the
deparsed json string. The WALSender decodes the WAL and sends it to
subscriber if the created/altered table is published. It supports most of
ALTER TABLE command except some commands(DDL related to PARTITIONED TABLE
...) that introduced recently which haven't been supported by the current
ddl_deparser, we will support that later.

Note that the replication for ALTER INDEX command is still under
progress.

- For DROP object:
The 'command start' event handler logs a ddl message with the relids of
the tables that are dropped which the output plugin (pgoutput) stores in
its internal data structure after verifying that it is for a table that is
part of the publication. Later the 'command end' event handler sends the
actual drop message. Pgoutput on receiving the command end, only sends out
the drop command only if it is for one of the relids marked for deleting.
The reason we have to do this is because, once the logical decoder
receives the 'command end' message,  the relid of the table is no longer
valid as it has been deleted as part of invalidations received for the
drop table command. It is no longer possible to verify if the table is
part of the publication list or not. To make this possible, I have added
two more elements to the ddl xlog and ddl message, (relid and cmdtype).

We could have also handled all this on the subscriber side as well, but
that would mean sending spurious ddl messages for tables that are not part
of the publication.

- For table_rewrite ALTER TABLE command:
(ALTER COLUMN TYPE, ADD COLUMN DEFAULT, SET LOGGED, SET ACCESS METHOD)

we deparse the command and WAL log the deparsed json string at
table_rewrite event trigger. The WALSender decodes the WAL and sends it to
subscriber if the altered table is published. Then, the WALSender will
convert the upcoming rewrite INSERTs to UPDATEs and send them to
subscriber so that the data between publisher and subscriber can always be
consistent. Note that the tables that publish rewrite ddl must have a
replica identity configured in order to be able to replicate the upcoming
rewrite UPDATEs.

We do this way because of two reason:
(1) The data before the rewrite ddl could already be different among
publisher and subscriber. To make sure the extra data in subscriber which
doesn't exist in publisher also get rewritten, we need to let the
subscriber execute the original rewrite ddl to rewrite all the data at
first.

(2) the data after executing rewrite ddl could be different among
publisher and subscriber(due to different functions/operators used during
rewrite), so we need to replicate the rewrite UPDATEs to keep the data
consistent.

TO IMPROVE:
This approach could be improved by letting the subscriber try to update
the extra data itself instead of doing fully rewrite ddl and use the
upcoming rewrite UPDATEs to rewrite the rest data. To achieve this, we
could modify the deparsed json string to temporarily remove the rewrite
part and add some logic in subscriber to update the extra data.
Besides, we may not need to send rewrite changes for all type of rewrite
ddl, for example, it seems fine to skip sending rewrite changes for ALTER
TABLE SET LOGGED as the data in the table doesn't actually be changed. We
could use the deparser and event trigger to filter these ddls and skip
sending rewrite changes for them.
---
 src/backend/access/rmgrdesc/Makefile          |   1 +
 .../access/rmgrdesc/logicalddlmsgdesc.c       |  52 +++
 src/backend/access/rmgrdesc/meson.build       |   1 +
 src/backend/catalog/pg_publication.c          |   1 +
 src/backend/commands/event_trigger.c          | 250 ++++++++++-
 src/backend/commands/publicationcmds.c        | 157 +++++++
 src/backend/commands/tablecmds.c              |   2 +-
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/ddlmessage.c  |  86 ++++
 src/backend/replication/logical/decode.c      |  41 ++
 src/backend/replication/logical/logical.c     |  93 ++++
 src/backend/replication/logical/meson.build   |   1 +
 src/backend/replication/logical/proto.c       |  48 ++
 .../replication/logical/reorderbuffer.c       | 136 ++++++
 src/backend/replication/logical/worker.c      | 232 ++++++++++
 src/backend/replication/pgoutput/pgoutput.c   | 192 +++++++-
 src/backend/utils/cache/relcache.c            |   1 +
 src/bin/pg_dump/pg_dump.c                     |  21 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/pg_waldump/rmgrdesc.c                 |   1 +
 src/bin/psql/describe.c                       |  17 +-
 src/include/access/rmgrlist.h                 |   1 +
 src/include/catalog/pg_proc.dat               |   9 +
 src/include/catalog/pg_publication.h          |   4 +
 src/include/commands/event_trigger.h          |   3 +-
 src/include/replication/ddlmessage.h          |  60 +++
 src/include/replication/decode.h              |   1 +
 src/include/replication/logicalproto.h        |   7 +-
 src/include/replication/output_plugin.h       |  27 ++
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/reorderbuffer.h       |  39 ++
 src/test/regress/expected/psql.out            |   6 +-
 src/test/regress/expected/publication.out     | 420 +++++++++---------
 33 files changed, 1684 insertions(+), 229 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c
 create mode 100644 src/backend/replication/logical/ddlmessage.c
 create mode 100644 src/include/replication/ddlmessage.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..b8e29e8df3 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -19,6 +19,7 @@ OBJS = \
 	hashdesc.o \
 	heapdesc.o \
 	logicalmsgdesc.o \
+	logicalddlmsgdesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
 	relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
new file mode 100644
index 0000000000..81dee529d0
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -0,0 +1,52 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalddlmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/ddlmessage.c
+ *
+ * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/ddlmessage.h"
+
+void
+logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_DDL_MESSAGE)
+	{
+		xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec;
+		char	   *prefix = xlrec->message;
+		char	   *message = xlrec->message + xlrec->prefix_size;
+		char	   *sep = "";
+
+		Assert(prefix[xlrec->prefix_size] != '\0');
+
+		appendStringInfo(buf, "prefix \"%s\"; payload (%zu bytes): ",
+						 prefix, xlrec->message_size);
+		appendStringInfo(buf, "relid %u cmdtype %u", xlrec->relid, xlrec->cmdtype);
+		/* Write message payload as a series of hex bytes */
+		for (int cnt = 0; cnt < xlrec->message_size; cnt++)
+		{
+			appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]);
+			sep = " ";
+		}
+	}
+}
+
+const char *
+logicalddlmsg_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+		return "DDL MESSAGE";
+
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index f3a6e0a571..3a70c974de 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -9,6 +9,7 @@ rmgr_desc_sources = files(
   'gistdesc.c',
   'hashdesc.c',
   'heapdesc.c',
+  'logicalddlmsgdesc.c',
   'logicalmsgdesc.c',
   'mxactdesc.c',
   'nbtdesc.c',
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 59967098b3..721d023aa5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1005,6 +1005,7 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubupdate = pubform->pubupdate;
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
+	pub->pubactions.pubddl = pubform->pubddl;
 	pub->pubviaroot = pubform->pubviaroot;
 
 	ReleaseSysCache(tup);
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 441f29d684..7c8c1f5394 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -37,8 +37,11 @@
 #include "miscadmin.h"
 #include "parser/parse_func.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
+#include "replication/message.h"
 #include "tcop/deparse_utility.h"
 #include "tcop/utility.h"
+#include "tcop/ddl_deparse.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/evtcache.h"
@@ -1537,6 +1540,7 @@ EventTriggerAlterTableStart(Node *parsetree)
 
 	command->d.alterTable.classId = RelationRelationId;
 	command->d.alterTable.objectId = InvalidOid;
+	command->d.alterTable.rewrite = false;
 	command->d.alterTable.subcmds = NIL;
 	command->parsetree = copyObject(parsetree);
 
@@ -1570,7 +1574,7 @@ EventTriggerAlterTableRelid(Oid objectId)
  * internally, so that's all that this code needs to handle at the moment.
  */
 void
-EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address)
+EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address, bool rewrite)
 {
 	MemoryContext oldcxt;
 	CollectedATSubcmd *newsub;
@@ -1590,6 +1594,7 @@ EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address)
 	newsub->address = address;
 	newsub->parsetree = copyObject(subcmd);
 
+	currentEventTriggerState->currentCommand->d.alterTable.rewrite |= rewrite;
 	currentEventTriggerState->currentCommand->d.alterTable.subcmds =
 		lappend(currentEventTriggerState->currentCommand->d.alterTable.subcmds, newsub);
 
@@ -2175,3 +2180,246 @@ stringify_adefprivs_objtype(ObjectType objtype)
 
 	return "???";				/* keep compiler quiet */
 }
+
+/*
+ * publication_deparse_ddl_command_start
+ *
+ * Deparse the ddl command and log it.
+ */
+Datum
+publication_deparse_ddl_command_start(PG_FUNCTION_ARGS)
+{
+	EventTriggerData *trigdata;
+	char		*command = psprintf("Drop table command start");
+	DropStmt	*stmt;
+	ListCell	*cell1;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	trigdata = (EventTriggerData *) fcinfo->context;
+	stmt	 = (DropStmt *) trigdata->parsetree;
+
+	/* extract the relid from the parse tree */
+	foreach(cell1, stmt->objects)
+	{
+		char	relpersist;
+		Node	*object = lfirst(cell1);
+		ObjectAddress address;
+		Relation relation = NULL;
+
+		address = get_object_address(stmt->removeType,
+									 object,
+									 &relation,
+									 AccessExclusiveLock,
+									 true);
+
+		relpersist = get_rel_persistence(address.objectId);
+
+		/*
+		 * Do not generate wal log for commands whose target table is a
+		 * temporary table.
+		 *
+		 * We will generate wal logs for unlogged tables so that unlogged
+		 * tables can also be created and altered on the subscriber side. This
+		 * makes it possible to directly replay the SET LOGGED command and the
+		 * incoming rewrite message without creating a new table.
+		 */
+		if (relpersist == RELPERSISTENCE_TEMP)
+		{
+			table_close(relation, NoLock);
+			continue;
+		}
+
+		LogLogicalDDLMessage("deparse", address.objectId, DCT_TableDropStart,
+							 command, strlen(command) + 1);
+
+		if (relation)
+			table_close(relation, NoLock);
+	}
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * publication_deparse_table_rewrite
+ *
+ * Deparse the ddl table rewrite command and log it.
+ */
+Datum
+publication_deparse_table_rewrite(PG_FUNCTION_ARGS)
+{
+	char				relpersist;
+	CollectedCommand   *cmd;
+	char			   *json_string;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	cmd = currentEventTriggerState->currentCommand;
+
+	Assert(cmd && cmd->d.alterTable.rewrite);
+
+	relpersist = get_rel_persistence(cmd->d.alterTable.objectId);
+
+	/*
+	 * Do not generate wal log for commands whose target table is a temporary
+	 * table.
+	 *
+	 * We will generate wal logs for unlogged tables so that unlogged tables
+	 * can also be created and altered on the subscriber side. This makes it
+	 * possible to directly replay the SET LOGGED command and the incoming
+	 * rewrite message without creating a new table.
+	 */
+	if (relpersist == RELPERSISTENCE_TEMP)
+		return PointerGetDatum(NULL);
+
+	/* Deparse the DDL command and WAL log it to allow decoding of the same. */
+	json_string = deparse_utility_command(cmd, false);
+
+	if (json_string != NULL)
+		LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter,
+							 json_string, strlen(json_string) + 1);
+
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * publication_deparse_ddl_command_end
+ *
+ * Deparse the ddl command and log it.
+ */
+Datum
+publication_deparse_ddl_command_end(PG_FUNCTION_ARGS)
+{
+	ListCell   *lc;
+	slist_iter  iter;
+	DeparsedCommandType type;
+	Oid relid;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	foreach(lc, currentEventTriggerState->commandList)
+	{
+		char				relpersist = RELPERSISTENCE_PERMANENT;
+		CollectedCommand   *cmd = lfirst(lc);
+		char			   *json_string;
+
+		/* Rewrite DDL has been handled in table_rewrite trigger */
+		if (cmd->d.alterTable.rewrite)
+		{
+			RenameStmt *renameStmt = (RenameStmt *)cmd->parsetree;
+
+			if (renameStmt && renameStmt->relationType != OBJECT_TYPE &&
+				renameStmt->relationType != OBJECT_TABLE)
+				continue;
+		}
+
+		if (cmd->type == SCT_Simple &&
+			!OidIsValid(cmd->d.simple.address.objectId))
+			continue;
+
+		if (cmd->type == SCT_AlterTable)
+		{
+			relid = cmd->d.alterTable.objectId;
+			type = DCT_TableAlter;
+		}
+		else
+		{
+			/* Only SCT_Simple for now */
+			relid = cmd->d.simple.address.objectId;
+			type = DCT_SimpleCmd;
+		}
+
+		if (get_rel_relkind(relid))
+			relpersist = get_rel_persistence(relid);
+
+		/*
+		 * Do not generate wal log for commands whose target table is a
+		 * temporary table.
+		 *
+		 * We will generate wal logs for unlogged tables so that unlogged tables
+		 * can also be created and altered on the subscriber side. This makes it
+		 * possible to directly replay the SET LOGGED command and the incoming
+		 * rewrite message without creating a new table.
+		 */
+		if (relpersist == RELPERSISTENCE_TEMP)
+			continue;
+
+		/*
+		 * Deparse the DDL command and WAL log it to allow decoding of the
+		 * same.
+		 */
+		json_string = deparse_utility_command(cmd, false);
+
+		if (json_string == NULL)
+			continue;
+
+		LogLogicalDDLMessage("deparse", relid, type, json_string,
+							 strlen(json_string) + 1);
+	}
+
+	slist_foreach(iter, &(currentEventTriggerState->SQLDropList))
+	{
+		volatile SQLDropObject *obj;
+		DropStmt			   *stmt;
+		EventTriggerData 	   *trigdata;
+		char				   *command;
+		DeparsedCommandType		cmdtype;
+		const char			   *tmptype;
+
+		trigdata = (EventTriggerData *) fcinfo->context;
+		stmt	 = (DropStmt *) trigdata->parsetree;
+
+		obj = slist_container(SQLDropObject, next, iter.cur);
+
+		if (strcmp(obj->objecttype, "table") == 0)
+			cmdtype = DCT_TableDropEnd;
+		else if (strcmp(obj->objecttype, "sequence") == 0 ||
+				 strcmp(obj->objecttype, "schema") == 0 ||
+				 strcmp(obj->objecttype, "index") == 0 ||
+				 strcmp(obj->objecttype, "function") == 0 ||
+				 strcmp(obj->objecttype, "procedure") == 0 ||
+				 strcmp(obj->objecttype, "operator") == 0 ||
+				 strcmp(obj->objecttype, "operator class") == 0 ||
+				 strcmp(obj->objecttype, "operator family") == 0 ||
+				 strcmp(obj->objecttype, "cast") == 0 ||
+				 strcmp(obj->objecttype, "type") == 0 ||
+				 strcmp(obj->objecttype, "domain") == 0 ||
+				 strcmp(obj->objecttype, "trigger") == 0 ||
+				 strcmp(obj->objecttype, "conversion") == 0 ||
+				 strcmp(obj->objecttype, "policy") == 0	||
+				 strcmp(obj->objecttype, "rule") == 0 ||
+				 strcmp(obj->objecttype, "extension") == 0 ||
+				 strcmp(obj->objecttype, "foreign-data wrapper") == 0 ||
+				 strcmp(obj->objecttype, "text search configuration") == 0 ||
+				 strcmp(obj->objecttype, "text search dictionary") == 0 ||
+				 strcmp(obj->objecttype, "text search parser") == 0 ||
+				 strcmp(obj->objecttype, "text search template") == 0 ||
+				 strcmp(obj->objecttype, "transform") == 0)
+			cmdtype = DCT_ObjectDrop;
+		else
+			continue;
+
+		/*
+		 * Change foreign-data wrapper to foreign data wrapper.
+		 */
+		if (strncmp(obj->objecttype, "foreign-data wrapper", 20) == 0)
+		{
+			tmptype = pstrdup("foreign data wrapper");
+			command = deparse_drop_command(obj->objidentity, tmptype,
+										   stmt->behavior);
+		}
+		else
+			command = deparse_drop_command(obj->objidentity, obj->objecttype,
+									   stmt->behavior);
+
+		if (command == NULL)
+			continue;
+
+		LogLogicalDDLMessage("deparse", obj->address.objectId, cmdtype,
+							 command, strlen(command) + 1);
+	}
+
+	return PointerGetDatum(NULL);
+}
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index a8b75eb1be..5a5b979b03 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -37,10 +37,12 @@
 #include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_clause.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_relation.h"
+#include "parser/parser.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/array.h"
@@ -96,6 +98,7 @@ parse_publication_options(ParseState *pstate,
 	pubactions->pubupdate = true;
 	pubactions->pubdelete = true;
 	pubactions->pubtruncate = true;
+	pubactions->pubddl = false;
 	*publish_via_partition_root = false;
 
 	/* Parse options */
@@ -143,6 +146,8 @@ parse_publication_options(ParseState *pstate,
 					pubactions->pubdelete = true;
 				else if (strcmp(publish_opt, "truncate") == 0)
 					pubactions->pubtruncate = true;
+				else if (strcmp(publish_opt, "ddl") == 0)
+					pubactions->pubddl = true;
 				else
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
@@ -727,6 +732,53 @@ CheckPubRelationColumnList(char *pubname, List *tables,
 	}
 }
 
+/*
+ * Create event trigger which is used for DDL replication.
+ */
+static void
+CreateDDLReplicaEventTrigger(char *eventname, CommandTag *commands,
+							 int ncommands, ObjectAddress pubaddress,
+							 Oid puboid)
+{
+	int					i;
+	List			   *tags = NIL;
+	Oid					trigger_id;
+	ObjectAddress		referenced;
+	CreateEventTrigStmt *ddl_trigger;
+	char				trigger_name[NAMEDATALEN];
+	char				trigger_func_name[NAMEDATALEN];
+	static const char   *trigger_name_prefix = "pg_deparse_trig_%s_%u";
+	static const char   *trigger_func_prefix = "publication_deparse_%s";
+
+	ddl_trigger = makeNode(CreateEventTrigStmt);
+
+	snprintf(trigger_name, sizeof(trigger_name), trigger_name_prefix,
+			 eventname, puboid);
+	snprintf(trigger_func_name, sizeof(trigger_func_name), trigger_func_prefix,
+			 eventname);
+
+	ddl_trigger->trigname = pstrdup(trigger_name);
+	ddl_trigger->eventname = eventname;
+	ddl_trigger->funcname = SystemFuncName(trigger_func_name);
+
+	for (i = 0; i < ncommands; i++)
+	{
+		String *tag = makeString(pstrdup(GetCommandTagName(commands[i])));
+		tags = lappend(tags, tag);
+	}
+
+	ddl_trigger->whenclause = list_make1(makeDefElem("tag", (Node *) tags, -1));
+
+	trigger_id = CreateEventTrigger(ddl_trigger);
+
+	/*
+	 * Register the event triggers as internally dependent on the
+	 * publication.
+	 */
+	ObjectAddressSet(referenced, EventTriggerRelationId, trigger_id);
+	recordDependencyOn(&referenced, &pubaddress, DEPENDENCY_INTERNAL);
+}
+
 /*
  * Create new publication.
  */
@@ -797,6 +849,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubdelete);
 	values[Anum_pg_publication_pubtruncate - 1] =
 		BoolGetDatum(pubactions.pubtruncate);
+	values[Anum_pg_publication_pubddl - 1] =
+		BoolGetDatum(pubactions.pubddl);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
 
@@ -857,6 +911,106 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		}
 	}
 
+	/*
+	 * Create an event trigger to allow logging of DDL statements.
+	 *
+	 * TODO: We need to find a better syntax to allow replication of DDL
+	 * statements.
+	 *
+	 * XXX: This code is just to show the replication of CREATE/ALTER/DROP
+	 * TABLE works. We need to enhance this once the approach for DDL
+	 * replication is finalized.
+	 */
+	if (pubactions.pubddl)
+	{
+		CommandTag	start_commands[] = {CMDTAG_DROP_TABLE};
+		CommandTag	rewrite_commands[] = {CMDTAG_ALTER_TABLE};
+
+		CommandTag	end_commands[] = {
+			CMDTAG_CREATE_TRANSFORM,
+			CMDTAG_DROP_TRANSFORM,
+			CMDTAG_CREATE_FOREIGN_DATA_WRAPPER,
+			CMDTAG_ALTER_FOREIGN_DATA_WRAPPER,
+			CMDTAG_DROP_FOREIGN_DATA_WRAPPER,
+			CMDTAG_CREATE_EXTENSION,
+			CMDTAG_ALTER_EXTENSION,
+			CMDTAG_DROP_EXTENSION,
+			CMDTAG_CREATE_TEXT_SEARCH_CONFIGURATION,
+			CMDTAG_ALTER_TEXT_SEARCH_CONFIGURATION,
+			CMDTAG_DROP_TEXT_SEARCH_CONFIGURATION,
+			CMDTAG_CREATE_TEXT_SEARCH_DICTIONARY,
+			CMDTAG_ALTER_TEXT_SEARCH_DICTIONARY,
+			CMDTAG_DROP_TEXT_SEARCH_DICTIONARY,
+			CMDTAG_CREATE_TEXT_SEARCH_PARSER,
+			CMDTAG_DROP_TEXT_SEARCH_PARSER,
+			CMDTAG_CREATE_TEXT_SEARCH_TEMPLATE,
+			CMDTAG_DROP_TEXT_SEARCH_TEMPLATE,
+			CMDTAG_CREATE_POLICY,
+			CMDTAG_ALTER_POLICY,
+			CMDTAG_DROP_POLICY,
+			CMDTAG_CREATE_CONVERSION,
+			CMDTAG_ALTER_CONVERSION,
+			CMDTAG_DROP_CONVERSION,
+			CMDTAG_CREATE_DOMAIN,
+			CMDTAG_ALTER_DOMAIN,
+			CMDTAG_DROP_DOMAIN,
+			CMDTAG_ALTER_INDEX,
+			CMDTAG_CREATE_TYPE,
+			CMDTAG_DROP_TYPE,
+			CMDTAG_ALTER_TYPE,
+			CMDTAG_CREATE_CAST,
+			CMDTAG_ALTER_CAST,
+			CMDTAG_DROP_CAST,
+			CMDTAG_CREATE_OPERATOR,
+			CMDTAG_CREATE_OPERATOR_CLASS,
+			CMDTAG_CREATE_OPERATOR_FAMILY,
+			CMDTAG_ALTER_OPERATOR_FAMILY,
+			CMDTAG_ALTER_OPERATOR_CLASS,
+			CMDTAG_ALTER_OPERATOR,
+			CMDTAG_DROP_OPERATOR,
+			CMDTAG_DROP_OPERATOR_CLASS,
+			CMDTAG_DROP_OPERATOR_FAMILY,
+			CMDTAG_CREATE_PROCEDURE,
+			CMDTAG_ALTER_PROCEDURE,
+			CMDTAG_DROP_PROCEDURE,
+			CMDTAG_CREATE_FUNCTION,
+			CMDTAG_ALTER_FUNCTION,
+			CMDTAG_DROP_FUNCTION,
+			CMDTAG_CREATE_TRIGGER,
+			CMDTAG_ALTER_TRIGGER,
+			CMDTAG_DROP_TABLE,
+			CMDTAG_CREATE_TABLE,
+			CMDTAG_ALTER_TABLE,
+			CMDTAG_CREATE_SEQUENCE,
+			CMDTAG_ALTER_SEQUENCE,
+			CMDTAG_DROP_SEQUENCE,
+			CMDTAG_CREATE_SCHEMA,
+			CMDTAG_ALTER_SCHEMA,
+			CMDTAG_DROP_SCHEMA,
+			CMDTAG_CREATE_INDEX,
+			CMDTAG_DROP_INDEX,
+			CMDTAG_ALTER_INDEX,
+			CMDTAG_GRANT,
+			CMDTAG_REVOKE,
+			CMDTAG_CREATE_RULE,
+			CMDTAG_ALTER_RULE,
+			CMDTAG_DROP_RULE,
+			CMDTAG_REFRESH_MATERIALIZED_VIEW
+		};
+
+		/* Create the ddl_command_end event trigger */
+		CreateDDLReplicaEventTrigger("ddl_command_end", end_commands,
+									 lengthof(end_commands), myself, puboid);
+
+		/* Create the ddl_command_start event trigger */
+		CreateDDLReplicaEventTrigger("ddl_command_start", start_commands,
+									 lengthof(start_commands), myself, puboid);
+
+		/* Create the table_rewrite event trigger */
+		CreateDDLReplicaEventTrigger("table_rewrite", rewrite_commands,
+									 lengthof(rewrite_commands), myself, puboid);
+	}
+
 	table_close(rel, RowExclusiveLock);
 
 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
@@ -995,6 +1149,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 
 		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
+
+		values[Anum_pg_publication_pubddl - 1] = BoolGetDatum(pubactions.pubddl);
+		replaces[Anum_pg_publication_pubddl - 1] = true;
 	}
 
 	if (publish_via_partition_root_given)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 1f774ac065..7fc6f41601 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5253,7 +5253,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 	 * Report the subcommand to interested event triggers.
 	 */
 	if (cmd)
-		EventTriggerCollectAlterTableSubcmd((Node *) cmd, address);
+		EventTriggerCollectAlterTableSubcmd((Node *) cmd, address, tab->rewrite);
 
 	/*
 	 * Bump the command counter to ensure the next subcommand in the sequence
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb71..f3eeb67312 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = \
 	decode.o \
+	ddlmessage.o\
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
new file mode 100644
index 0000000000..5093523e9a
--- /dev/null
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddlmessage.c
+ *	  Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/ddlmessage.c
+ *
+ * NOTES
+ *
+ * Logical DDL messages allow XLOG logging of DDL command strings that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * Unlike generic logical messages, these DDL messages have only transactional
+ * mode.Note by default DDLs in PostgreSQL are transactional.
+ *
+ * These messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/ddlmessage.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding DDL message into XLog.
+ */
+XLogRecPtr
+LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					 const char *message, size_t size)
+{
+	xl_logical_ddl_message xlrec;
+
+	/*
+	 * Ensure we have a valid transaction id.
+	 */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	xlrec.dbId = MyDatabaseId;
+	/* trailing zero is critical; see logicalddlmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+	xlrec.relid = relid;
+	xlrec.cmdtype = cmdtype;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding ddl messages.
+ */
+void
+logicalddlmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 2cc0ac9eb0..1f2c751759 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -36,6 +36,7 @@
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
 #include "replication/decode.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -603,6 +604,46 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 							  message->message + message->prefix_size);
 }
 
+/*
+ * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+void
+logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	xl_logical_ddl_message *message;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info);
+
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding ddl messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	message = (xl_logical_ddl_message *) XLogRecGetData(r);
+
+	if (message->dbId != ctx->slot->data.database ||
+		FilterByOrigin(ctx, origin_id))
+		return;
+
+	if (SnapBuildProcessChange(builder, xid, buf->origptr))
+		ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr,
+									 message->message, /* first part of message is prefix */
+									 message->message_size,
+									 message->message + message->prefix_size,
+									 message->relid, message->cmdtype);
+}
+
 /*
  * Consolidated commit record handling between the different form of commit
  * records.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 625a7f4273..98969c7aec 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -33,6 +33,7 @@
 #include "fmgr.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
@@ -73,6 +74,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+								  XLogRecPtr message_lsn, const char *prefix,
+								  Oid relid, DeparsedCommandType cmdtype,
+								  Size message_size, const char *message);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +95,11 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
 static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									  XLogRecPtr message_lsn, bool transactional,
 									  const char *prefix, Size message_size, const char *message);
+static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+										 XLogRecPtr message_lsn,
+										 const char *prefix,
+										 Oid relid, DeparsedCommandType cmdtype,
+										 Size message_size, const char *message);
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
@@ -218,6 +228,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->ddlmessage = ddlmessage_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -234,6 +245,7 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_commit_cb != NULL) ||
 		(ctx->callbacks.stream_change_cb != NULL) ||
 		(ctx->callbacks.stream_message_cb != NULL) ||
+		(ctx->callbacks.stream_ddlmessage_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
 	/*
@@ -251,6 +263,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
 	ctx->reorder->stream_change = stream_change_cb_wrapper;
 	ctx->reorder->stream_message = stream_message_cb_wrapper;
+	ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper;
 	ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
 
@@ -1220,6 +1233,44 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+					  XLogRecPtr message_lsn,
+					  const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					  Size message_size,
+					  const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.ddlmessage_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "ddlmessage";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, prefix, relid, cmdtype,
+								 message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
@@ -1535,6 +1586,48 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							 XLogRecPtr message_lsn,
+							 const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+							 Size message_size,
+							 const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (ctx->callbacks.stream_ddlmessage_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_ddlmessage";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, prefix, relid,
+										cmdtype, message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						   int nrelations, Relation relations[],
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 773583a12b..e7c70aa261 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -1,4 +1,5 @@
 backend_sources += files(
+  'ddlmessage.c',
   'decode.c',
   'launcher.c',
   'logical.c',
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ff8513e2d2..f35406275b 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -662,6 +662,52 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Read DDL MESSAGE from stream
+ */
+char *
+logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn,
+						   const char **prefix,
+						   Size *sz)
+{
+	uint8 flags;
+	char *msg;
+
+	//TODO double check when do we need to get TransactionId.
+
+	flags = pq_getmsgint(in, 1);
+	if (flags != 0)
+		elog(ERROR, "unrecognized flags %u in ddl message", flags);
+	*lsn = pq_getmsgint64(in);
+	*prefix = pq_getmsgstring(in);
+	*sz = pq_getmsgint(in, 4);
+	msg = (char *) pq_getmsgbytes(in, *sz);
+
+	return msg;
+}
+
+/*
+ * Write DDL MESSAGE to stream
+ */
+void
+logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+							const char *prefix, Size sz, const char *message)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE);
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
 /*
  * Write relation description to the output stream.
  */
@@ -1218,6 +1264,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "TYPE";
 		case LOGICAL_REP_MSG_MESSAGE:
 			return "MESSAGE";
+		case LOGICAL_REP_MSG_DDLMESSAGE:
+			return "DDL";
 		case LOGICAL_REP_MSG_BEGIN_PREPARE:
 			return "BEGIN PREPARE";
 		case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 6dff9915a5..3c37690c32 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -94,6 +94,7 @@
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
@@ -515,6 +516,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				pfree(change->data.msg.message);
 			change->data.msg.message = NULL;
 			break;
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			if (change->data.ddlmsg.prefix != NULL)
+				pfree(change->data.ddlmsg.prefix);
+			change->data.ddlmsg.prefix = NULL;
+			if (change->data.ddlmsg.message != NULL)
+				pfree(change->data.ddlmsg.message);
+			change->data.ddlmsg.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			if (change->data.inval.invalidations)
 				pfree(change->data.inval.invalidations);
@@ -869,6 +878,36 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * A transactional DDL message is queued to be processed upon commit.
+ */
+void
+ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid,
+							 XLogRecPtr lsn, const char *prefix,
+							 Size message_size, const char *message,
+							 Oid relid, DeparsedCommandType cmdtype)
+{
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
+
+	Assert(xid != InvalidTransactionId);
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE;
+	change->data.ddlmsg.prefix = pstrdup(prefix);
+	change->data.ddlmsg.relid = relid;
+	change->data.ddlmsg.cmdtype = cmdtype;
+	change->data.ddlmsg.message_size = message_size;
+	change->data.ddlmsg.message = palloc(message_size);
+	memcpy(change->data.ddlmsg.message, message, message_size);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1968,6 +2007,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the DDL message.
+ */
+static inline void
+ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 ReorderBufferChange *change, bool streaming)
+{
+	if (streaming)
+		rb->stream_ddlmessage(rb, txn, change->lsn,
+							  change->data.ddlmsg.prefix,
+							  change->data.ddlmsg.relid,
+							  change->data.ddlmsg.cmdtype,
+							  change->data.ddlmsg.message_size,
+							  change->data.ddlmsg.message);
+	else
+		rb->ddlmessage(rb, txn, change->lsn,
+					   change->data.ddlmsg.prefix,
+					   change->data.ddlmsg.relid,
+					   change->data.ddlmsg.cmdtype,
+					   change->data.ddlmsg.message_size,
+					   change->data.ddlmsg.message);
+}
+
 /*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
@@ -2348,6 +2410,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					ReorderBufferApplyMessage(rb, txn, change, streaming);
 					break;
 
+				case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+					ReorderBufferApplyDDLMessage(rb, txn, change, streaming);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
 					/* Execute the invalidation messages locally */
 					ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
@@ -3771,6 +3837,40 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				char	   *data;
+				Size		prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+
+				sz += prefix_size + change->data.ddlmsg.message_size +
+					sizeof(Size) + sizeof(Oid) + sizeof(int) + sizeof(Size);
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* write the prefix, relid and cmdtype including the size */
+				memcpy(data, &prefix_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, &change->data.ddlmsg.relid, sizeof(Oid));
+				data += sizeof(Oid);
+				memcpy(data, &change->data.ddlmsg.cmdtype, sizeof(int));
+				data += sizeof(int);
+				memcpy(data, change->data.ddlmsg.prefix,
+					   prefix_size);
+				data += prefix_size;
+
+				/* write the message including the size */
+				memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddlmsg.message,
+					   change->data.ddlmsg.message_size);
+				data += change->data.ddlmsg.message_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4085,6 +4185,15 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 				sz += prefix_size + change->data.msg.message_size +
 					sizeof(Size) + sizeof(Size);
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				Size		prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+
+				sz += prefix_size + change->data.ddlmsg.message_size +
+					sizeof(Size) + sizeof(Size) + sizeof(Oid) + sizeof(int);
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4360,6 +4469,33 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				Size		prefix_size;
+
+				/* read prefix */
+				memcpy(&prefix_size, data, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(&change->data.ddlmsg.relid, data, sizeof(Oid));
+				data += sizeof(Oid);
+				memcpy(&change->data.ddlmsg.cmdtype, data, sizeof(int));
+				data += sizeof(int);
+				change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size);
+				memcpy(change->data.ddlmsg.prefix, data, prefix_size);
+				Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0');
+				data += prefix_size;
+
+				/* read the message */
+				memcpy(&change->data.msg.message_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.msg.message = MemoryContextAlloc(rb->context,
+															  change->data.msg.message_size);
+				memcpy(change->data.msg.message, data,
+					   change->data.msg.message_size);
+				data += change->data.msg.message_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 207a5805ba..21b82f344f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,6 +156,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
+#include "parser/analyze.h"
 #include "pgstat.h"
 #include "postmaster/bgworker.h"
 #include "postmaster/interrupt.h"
@@ -179,7 +180,10 @@
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "tcop/ddl_deparse.h"
+#include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
+#include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -2464,6 +2468,230 @@ apply_handle_truncate(StringInfo s)
 	end_replication_step();
 }
 
+/* Remove the data population from the command */
+static void
+preprocess_create_table(RawStmt *command)
+{
+	CommandTag	commandTag;
+
+	commandTag = CreateCommandTag((Node *)command);
+
+	switch (commandTag)
+	{
+		case CMDTAG_CREATE_TABLE_AS:
+		case CMDTAG_SELECT_INTO:
+			{
+				CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;
+				if (castmt->objtype == OBJECT_TABLE)
+				{
+					/*
+					 * Force skipping data population to avoid data
+					 * inconsistency. Data should be replicated from the
+					 * publisher instead.
+					 */
+					castmt->into->skipData = true;
+				}
+			}
+			break;
+		case CMDTAG_SELECT:
+			{
+				SelectStmt *sstmt = (SelectStmt *) command->stmt;
+
+				if (sstmt->intoClause != NULL)
+				{
+					/*
+					 * Force skipping data population to avoid data
+					 * inconsistency. Data should be replicated from the
+					 * publisher instead.
+					 */
+					sstmt->intoClause->skipData = true;
+				}
+			}
+			break;
+	default:
+		break;
+	}
+}
+
+/*
+ * Handle CREATE TABLE command
+ *
+ * Call AddSubscriptionRelState for CREATE TABEL command to set the relstate to
+ * SUBREL_STATE_READY so DML changes on this new table can be replicated without
+ * having to manually run "alter subscription ... refresh publication"
+ */
+static void
+handle_create_table(RawStmt *command)
+{
+	CommandTag	commandTag;
+	RangeVar	 *rv = NULL;
+	Oid			relid;
+	Oid			relnamespace = InvalidOid;
+	char 		 *schemaname = NULL;
+	char		 *relname = NULL;
+
+	commandTag = CreateCommandTag((Node *) command);
+
+	switch (commandTag)
+	{
+		case CMDTAG_CREATE_TABLE:
+			{
+				CreateStmt *cstmt = (CreateStmt *) command->stmt;
+				rv = cstmt->relation;
+			}
+			break;
+		default:
+			break;
+	}
+
+	if (!rv)
+		return;
+
+	schemaname = rv->schemaname;
+	relname = rv->relname;
+
+	if (schemaname != NULL)
+		relnamespace = get_namespace_oid(schemaname, false);
+
+	if (relnamespace != InvalidOid)
+		relid = get_relname_relid(relname, relnamespace);
+	else
+		relid = RelnameGetRelid(relname);
+
+	if (relid != InvalidOid)
+	{
+		AddSubscriptionRelState(MySubscription->oid, relid,
+								SUBREL_STATE_READY,
+								InvalidXLogRecPtr);
+		ereport(DEBUG1,
+				(errmsg_internal("table \"%s\" added to subscription \"%s\"",
+								 relname, MySubscription->name)));
+	}
+}
+
+static void
+apply_handle_ddl(StringInfo s)
+{
+	XLogRecPtr lsn;
+	const char *prefix = NULL;
+	char *message = NULL;
+	char	   *ddl_command;
+	Size		sz;
+	List	   *parsetree_list;
+	ListCell   *parsetree_item;
+	DestReceiver *receiver;
+	MemoryContext oldcontext;
+	const char *save_debug_query_string = debug_query_string;
+
+	message = logicalrep_read_ddlmessage(s, &lsn, &prefix, &sz);
+
+	/* Make sure we are in a transaction command */
+	begin_replication_step();
+
+	ddl_command = ddl_deparse_json_to_string(message);
+	debug_query_string = ddl_command;
+
+	/* DestNone for logical replication */
+	receiver = CreateDestReceiver(DestNone);
+	parsetree_list = pg_parse_query(ddl_command);
+
+	foreach(parsetree_item, parsetree_list)
+	{
+		List	   *plantree_list;
+		List	   *querytree_list;
+		RawStmt	   *command = (RawStmt *) lfirst(parsetree_item);
+		CommandTag	commandTag;
+		MemoryContext per_parsetree_context = NULL;
+		Portal		portal;
+		bool		 snapshot_set = false;
+
+		commandTag = CreateCommandTag((Node *) command);
+
+		/* If we got a cancel signal in parsing or prior command, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Remove data population from the command */
+		preprocess_create_table(command);
+
+		/*
+		 * Set up a snapshot if parse analysis/planning will need one.
+		 */
+		if (analyze_requires_snapshot(command))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * We do the work for each parsetree in a short-lived context, to
+		 * limit the memory used when there are many commands in the string.
+		 */
+		per_parsetree_context =
+			AllocSetContextCreate(CurrentMemoryContext,
+								  "execute_sql_string per-statement context",
+								  ALLOCSET_DEFAULT_SIZES);
+		oldcontext = MemoryContextSwitchTo(per_parsetree_context);
+
+		querytree_list = pg_analyze_and_rewrite_fixedparams(command,
+															ddl_command,
+															NULL, 0, NULL);
+
+		plantree_list = pg_plan_queries(querytree_list, ddl_command, 0, NULL);
+
+		/* Done with the snapshot used for parsing/planning */
+		if (snapshot_set)
+			PopActiveSnapshot();
+
+		portal = CreatePortal("logical replication", true, true);
+
+		/*
+		 * We don't have to copy anything into the portal, because everything
+		 * we are passing here is in ApplyMessageContext or the
+		 * per_parsetree_context, and so will outlive the portal anyway.
+		 */
+		PortalDefineQuery(portal,
+						  NULL,
+						  ddl_command,
+						  commandTag,
+						  plantree_list,
+						  NULL);
+
+		/*
+		 * Start the portal.  No parameters here.
+		 */
+		PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+		/*
+		 * Switch back to transaction context for execution.
+		 */
+		MemoryContextSwitchTo(oldcontext);
+
+		(void) PortalRun(portal,
+						 FETCH_ALL,
+						 true,
+						 true,
+						 receiver,
+						 receiver,
+						 NULL);
+
+		PortalDrop(portal, false);
+
+		CommandCounterIncrement();
+
+		/*
+		 * Table created by DDL replication (database level) is automatically
+		 * added to the subscription here.
+		 */
+		handle_create_table(command);
+
+		/* Now we may drop the per-parsetree context, if one was created. */
+		MemoryContextDelete(per_parsetree_context);
+	}
+
+	debug_query_string = save_debug_query_string;
+	end_replication_step();
+}
+
 
 /*
  * Logical replication protocol message dispatcher.
@@ -2529,6 +2757,10 @@ apply_dispatch(StringInfo s)
 			 */
 			break;
 
+		case LOGICAL_REP_MSG_DDLMESSAGE:
+			apply_handle_ddl(s);
+			break;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2ecaa5b907..5e5b1ca12f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -53,6 +53,11 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							 bool transactional, const char *prefix,
 							 Size sz, const char *message);
+static void pgoutput_ddlmessage(LogicalDecodingContext *ctx,
+								ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+								const char *prefix, Oid relid,
+								DeparsedCommandType cmdtype,
+								Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -256,6 +261,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->message_cb = pgoutput_message;
+	cb->ddlmessage_cb = pgoutput_ddlmessage;
 	cb->commit_cb = pgoutput_commit_txn;
 
 	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -272,6 +278,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
 	cb->stream_message_cb = pgoutput_message;
+	cb->stream_ddlmessage_cb = pgoutput_ddlmessage;
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
 	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -426,6 +433,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 
 	/* This plugin uses binary protocol. */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+	opt->receive_rewrites = true;
 
 	/*
 	 * This is replication start and not slot initialization.
@@ -499,6 +507,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 
 		/* Init publication state. */
 		data->publications = NIL;
+		data->deleted_relids = NIL;
 		publications_valid = false;
 		CacheRegisterSyscacheCallback(PUBLICATIONOID,
 									  publication_invalidation_cb,
@@ -1377,9 +1386,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	bool			table_rewrite = false;
 
 	update_replication_progress(ctx, false);
 
+	/*
+	 * For heap rewrites, we might need to replicate them if the rewritten
+	 * table publishes rewrite ddl message. So get the actual relation here and
+	 * check the pubaction later.
+	 */
+	if (relation->rd_rel->relrewrite)
+	{
+		table_rewrite = true;
+		relation = RelationIdGetRelation(relation->rd_rel->relrewrite);
+		targetrel = relation;
+	}
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1413,6 +1435,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * We don't publish table rewrite change unless we publish the rewrite ddl
+	 * message.
+	 */
+	if (table_rewrite && !relentry->pubactions.pubddl)
+		return;
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1442,8 +1471,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			}
 
 			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
+			if (!table_rewrite &&
+				!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, &action))
 				break;
 
 			/*
@@ -1463,8 +1492,19 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			maybe_send_schema(ctx, change, relation, relentry);
 
 			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+
+			/*
+			 * Convert the rewrite inserts to updates so that the subscriber
+			 * can replay it. This is needed to make sure the data between
+			 * publisher and subscriber is consistent.
+			 */
+			if (table_rewrite)
+				logicalrep_write_update(ctx->out, xid, targetrel,
+										NULL, new_slot, data->binary,
+										relentry->columns);
+			else
+				logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+										data->binary, relentry->columns);
 			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -1594,6 +1634,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		ancestor = NULL;
 	}
 
+	if (table_rewrite)
+		RelationClose(relation);
+
 	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
@@ -1671,8 +1714,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 static void
 pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
-				 const char *message)
+				 XLogRecPtr message_lsn, bool transactional,
+				 const char *prefix, Size sz, const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
@@ -1712,6 +1755,137 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr message_lsn,
+					const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					Size sz, const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	Relation	relation = NULL;
+	TransactionId xid = InvalidTransactionId;
+	RelationSyncEntry *relentry;
+
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	switch (cmdtype)
+	{
+		case DCT_TableDropStart:
+			/*
+			 * On DROP start, add the relid to a deleted_relid list if the
+			 * relid is part of a publication that supports ddl publication. We
+			 * need this because on DROP end, the relid will no longer be
+			 * valid. Later on Drop end, verify that the drop is for a relid
+			 * that is on the deleted_rid list, and only then send the ddl
+			 * message.
+			 */
+			relation = RelationIdGetRelation(relid);
+
+			Assert(relation);
+			relentry = get_rel_sync_entry(data, relation);
+
+			if (relentry->pubactions.pubddl)
+				data->deleted_relids = lappend_oid(data->deleted_relids, relid);
+
+			RelationClose(relation);
+			return;
+		case DCT_TableDropEnd:
+			if (!list_member_oid(data->deleted_relids, relid))
+				return;
+			else
+				data->deleted_relids = list_delete_oid(data->deleted_relids, relid);
+			break;
+		case DCT_TableAlter:
+			/*
+			 * For table rewrite ddl, we first send the original ddl message to
+			 * subscriber, then convert the upcoming rewrite INSERT to UPDATE and
+			 * send them to subscriber so that the data between publisher and
+			 * subscriber can always be consistent.
+			 *
+			 * We do this way because of two reason:
+			 *
+			 * (1) The data before the rewrite ddl could already be different among
+			 * publisher and subscriber. To make sure the extra data in subscriber
+			 * which doesn't exist in publisher also get rewritten, we need to let
+			 * the subscriber execute the original rewrite ddl to rewrite all the
+			 * data at first.
+			 *
+			 * (2) the data after executing rewrite ddl could be different among
+			 * publisher and subscriber(due to different functions/operators used
+			 * during rewrite), so we need to replicate the rewrite UPDATEs to keep
+			 * the data consistent.
+			 *
+			 * TO IMPROVE: We could improve this by letting the subscriber only
+			 * rewrite the extra data instead of doing fully rewrite and use the
+			 * upcoming rewrite UPDATEs to rewrite the rest data. Besides, we may
+			 * not need to send rewrite changes for all type of rewrite ddl, for
+			 * example, it seems fine to skip sending rewrite changes for ALTER
+			 * TABLE SET LOGGED as the data in the table doesn't actually be
+			 * changed.
+			 */
+			relation = RelationIdGetRelation(relid);
+			Assert(relation);
+
+			relentry = get_rel_sync_entry(data, relation);
+
+			/*
+			 * Skip sending this ddl if we don't publish ddl message or the ddl
+			 * need to be published via its root relation.
+			 */
+			if (!relentry->pubactions.pubddl ||
+				relentry->publish_as_relid != relid)
+			{
+				RelationClose(relation);
+				return;
+			}
+
+			break;
+		case DCT_SimpleCmd:
+			relation = RelationIdGetRelation(relid);
+
+			if (relation == NULL)
+				break;
+
+			relentry = get_rel_sync_entry(data, relation);
+
+			if (!relentry->pubactions.pubddl)
+			{
+				RelationClose(relation);
+				return;
+			}
+
+			break;
+		case DCT_ObjectDrop:
+			/* do nothing */
+			break;
+		default:
+			elog(ERROR, "unsupported type %d", cmdtype);
+			break;
+	}
+
+	/* Send BEGIN if we haven't yet */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_ddlmessage(ctx->out,
+								xid,
+								message_lsn,
+								prefix,
+								sz,
+								message);
+	OutputPluginWrite(ctx, true);
+
+	if (relation)
+		RelationClose(relation);
+}
+
 /*
  * Return true if the data is associated with an origin and the user has
  * requested the changes that don't have an origin, false otherwise.
@@ -1993,7 +2167,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+			entry->pubactions.pubddl = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
 		memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2051,6 +2226,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->pubactions.pubddl = false;
 
 		/*
 		 * Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2164,6 +2340,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubactions.pubddl |= pub->pubactions.pubddl;
 
 				/*
 				 * We want to publish the changes as the top-most ancestor
@@ -2349,6 +2526,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	{
 		entry->replicate_valid = false;
 	}
+
 }
 
 /* Send Replication origin */
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 00dc0f2403..f1b4d093ce 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5683,6 +5683,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
 		pubdesc->pubactions.pubupdate |= pubform->pubupdate;
 		pubdesc->pubactions.pubdelete |= pubform->pubdelete;
 		pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
+		pubdesc->pubactions.pubddl |= pubform->pubddl;
 
 		/*
 		 * Check if all columns referenced in the filter expression are part
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bd9b066e4e..97f434a7e8 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3897,6 +3897,7 @@ getPublications(Archive *fout, int *numPublications)
 	int			i_pubupdate;
 	int			i_pubdelete;
 	int			i_pubtruncate;
+	int			i_pubddl;
 	int			i_pubviaroot;
 	int			i,
 				ntups;
@@ -3912,23 +3913,23 @@ getPublications(Archive *fout, int *numPublications)
 	resetPQExpBuffer(query);
 
 	/* Get the publications. */
-	if (fout->remoteVersion >= 130000)
+	if (fout->remoteVersion >= 150000)
 		appendPQExpBufferStr(query,
 							 "SELECT p.tableoid, p.oid, p.pubname, "
 							 "p.pubowner, "
-							 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+							 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubddl, p.pubviaroot "
 							 "FROM pg_publication p");
 	else if (fout->remoteVersion >= 110000)
 		appendPQExpBufferStr(query,
 							 "SELECT p.tableoid, p.oid, p.pubname, "
 							 "p.pubowner, "
-							 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
+							 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as p.pubddl, false AS pubviaroot "
 							 "FROM pg_publication p");
 	else
 		appendPQExpBufferStr(query,
 							 "SELECT p.tableoid, p.oid, p.pubname, "
 							 "p.pubowner, "
-							 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
+							 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false as p.pubddl, false AS pubviaroot "
 							 "FROM pg_publication p");
 
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -3944,6 +3945,7 @@ getPublications(Archive *fout, int *numPublications)
 	i_pubupdate = PQfnumber(res, "pubupdate");
 	i_pubdelete = PQfnumber(res, "pubdelete");
 	i_pubtruncate = PQfnumber(res, "pubtruncate");
+	i_pubddl = PQfnumber(res, "pubddl");
 	i_pubviaroot = PQfnumber(res, "pubviaroot");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
@@ -3967,6 +3969,8 @@ getPublications(Archive *fout, int *numPublications)
 			(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
 		pubinfo[i].pubtruncate =
 			(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
+		pubinfo[i].pubddl =
+			(strcmp(PQgetvalue(res, i, i_pubddl), "t") == 0);
 		pubinfo[i].pubviaroot =
 			(strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
 
@@ -4046,6 +4050,15 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
 		first = false;
 	}
 
+	if (pubinfo->pubddl)
+	{
+		if (!first)
+			appendPQExpBufferStr(query, ", ");
+
+		appendPQExpBufferStr(query, "ddl");
+		first = false;
+	}
+
 	appendPQExpBufferChar(query, '\'');
 
 	if (pubinfo->pubviaroot)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 427f5d45f6..685683eeb0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -620,6 +620,7 @@ typedef struct _PublicationInfo
 	bool		pubdelete;
 	bool		pubtruncate;
 	bool		pubviaroot;
+	bool		pubddl;
 } PublicationInfo;
 
 /*
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17bb4c..792f438959 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -27,6 +27,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/message.h"
+#include "replication/ddlmessage.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c645d66418..2e94fca744 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6159,7 +6159,7 @@ listPublications(const char *pattern)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6194,6 +6194,10 @@ listPublications(const char *pattern)
 		appendPQExpBuffer(&buf,
 						  ",\n  pubviaroot AS \"%s\"",
 						  gettext_noop("Via root"));
+	if (pset.sversion >= 140000)
+		appendPQExpBuffer(&buf,
+						  ",\n  pubddl AS \"%s\"",
+						  gettext_noop("DDLs"));
 
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
@@ -6284,6 +6288,7 @@ describePublications(const char *pattern)
 	PGresult   *res;
 	bool		has_pubtruncate;
 	bool		has_pubviaroot;
+	bool		has_pubddl;
 
 	PQExpBufferData title;
 	printTableContent cont;
@@ -6300,6 +6305,7 @@ describePublications(const char *pattern)
 
 	has_pubtruncate = (pset.sversion >= 110000);
 	has_pubviaroot = (pset.sversion >= 130000);
+	has_pubddl =  (pset.sversion >= 150000);
 
 	initPQExpBuffer(&buf);
 
@@ -6313,6 +6319,9 @@ describePublications(const char *pattern)
 	if (has_pubviaroot)
 		appendPQExpBufferStr(&buf,
 							 ", pubviaroot");
+	if (has_pubddl)
+		appendPQExpBufferStr(&buf,
+							 ", pubddl");
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -6364,6 +6373,8 @@ describePublications(const char *pattern)
 			ncols++;
 		if (has_pubviaroot)
 			ncols++;
+		if (has_pubddl)
+			ncols++;
 
 		initPQExpBuffer(&title);
 		printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -6378,6 +6389,8 @@ describePublications(const char *pattern)
 			printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
 		if (has_pubviaroot)
 			printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+		if (has_pubddl)
+			printTableAddHeader(&cont, gettext_noop("DDLs"), true, align);
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6388,6 +6401,8 @@ describePublications(const char *pattern)
 			printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
 		if (has_pubviaroot)
 			printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+		if (has_pubddl)
+			printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false);
 
 		if (!puballtables)
 		{
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 000bcbfdaf..37dfd451f6 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2c87106eef..ce1b17f5ab 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11829,4 +11829,13 @@
 { oid => '4643', descr => 'expand json format DDL to a plain DDL command',
   proname => 'ddl_deparse_expand_command', prorettype => 'text',
   proargtypes => 'text', prosrc => 'ddl_deparse_expand_command' },
+{ oid => '4644', descr => 'trigger for ddl command deparse',
+  proname => 'publication_deparse_ddl_command_end', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_deparse_ddl_command_end' },
+{ oid => '4645', descr => 'trigger for ddl command deparse start',
+  proname => 'publication_deparse_ddl_command_start', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_deparse_ddl_command_start' },
+{ oid => '4646', descr => 'trigger for ddl command deparse table rewrite',
+  proname => 'publication_deparse_table_rewrite', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_deparse_table_rewrite' },
 ]
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index ecf5a28e00..dafd48376a 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -54,6 +54,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
+
+	/* true if table creations are published */
+	bool		pubddl;
 } FormData_pg_publication;
 
 /* ----------------
@@ -72,6 +75,7 @@ typedef struct PublicationActions
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubddl;
 } PublicationActions;
 
 typedef struct PublicationDesc
diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h
index 10091c3aaf..fd2ee7ffe4 100644
--- a/src/include/commands/event_trigger.h
+++ b/src/include/commands/event_trigger.h
@@ -71,7 +71,8 @@ extern void EventTriggerCollectSimpleCommand(ObjectAddress address,
 extern void EventTriggerAlterTableStart(Node *parsetree);
 extern void EventTriggerAlterTableRelid(Oid objectId);
 extern void EventTriggerCollectAlterTableSubcmd(Node *subcmd,
-												ObjectAddress address);
+												ObjectAddress address,
+												bool rewrite);
 extern void EventTriggerAlterTableEnd(void);
 
 extern void EventTriggerCollectGrant(InternalGrant *istmt);
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
new file mode 100644
index 0000000000..a8dca863b5
--- /dev/null
+++ b/src/include/replication/ddlmessage.h
@@ -0,0 +1,60 @@
+/*-------------------------------------------------------------------------
+ * ddlmessage.h
+ *	   Exports from replication/logical/ddlmessage.c
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/ddlmessage.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_DDL_MESSAGE_H
+#define PG_LOGICAL_DDL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "nodes/nodes.h"
+
+
+/*
+ * Support for keeping track of deparsed commands.
+ */
+typedef enum DeparsedCommandType
+{
+	DCT_SimpleCmd,
+	DCT_TableDropStart,
+	DCT_TableDropEnd,
+	DCT_TableAlter,
+	DCT_ObjectCreate,
+	DCT_ObjectDrop
+} DeparsedCommandType;
+
+/*
+ * Generic logical decoding DDL message wal record.
+ */
+typedef struct xl_logical_ddl_message
+{
+	Oid						dbId;			/* database Oid emitted from */
+	Size					prefix_size;	/* length of prefix */
+	Oid						relid;			/* id of the table */
+	DeparsedCommandType		cmdtype;		/* type of sql command */
+	Size					message_size;	  /* size of the message */
+
+	/*
+	 * payload, including null-terminated prefix of length prefix_size
+	 */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;
+
+#define SizeOfLogicalDDLMessage	(offsetof(xl_logical_ddl_message, message))
+
+extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+									   const char *ddl_message, size_t size);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_DDL_MESSAGE	0x00
+void		logicalddlmsg_redo(XLogReaderState *record);
+void		logicalddlmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalddlmsg_identify(uint8 info);
+
+#endif
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 741bf65cf7..427a7b997d 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 7eaa4c97ed..5d617484fb 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_DDLMESSAGE = 'L',
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -229,7 +230,11 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
-									 bool transactional, const char *prefix, Size sz, const char *message);
+									 bool transactional, const char *prefix,
+									 Size sz, const char *message);
+extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+										const char *prefix, Size sz, const char *message);
+extern char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, Size *sz);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b7d28d7045..763e43f6be 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -90,6 +90,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for the logical decoding DDL messages.
+ */
+typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   const char *prefix,
+										   Oid relid,
+										   DeparsedCommandType cmdtype,
+										   Size message_size,
+										   const char *message);
+
 /*
  * Filter changes by origin.
  */
@@ -201,6 +213,19 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
 											  Size message_size,
 											  const char *message);
 
+/*
+ * Callback for streaming logical decoding DDL messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr message_lsn,
+												 const char *prefix,
+												 Oid relid,
+												 DeparsedCommandType cmdtype,
+												 Size message_size,
+												 const char *message);
+
 /*
  * Callback for streaming truncates from in-progress transactions.
  */
@@ -221,6 +246,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeDDLMessageCB ddlmessage_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
@@ -239,6 +265,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeStreamCommitCB stream_commit_cb;
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
+	LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb;
 	LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
 
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 02027550e2..83e0b1e2e5 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 	uint32		protocol_version;
 	List	   *publication_names;
 	List	   *publications;
+	List	   *deleted_relids;
 	bool		binary;
 	bool		streaming;
 	bool		messages;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 02b59a1931..0b1a9161c5 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -11,6 +11,8 @@
 
 #include "access/htup_details.h"
 #include "lib/ilist.h"
+#include "nodes/nodes.h"
+#include "replication/ddlmessage.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
@@ -56,6 +58,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_DDLMESSAGE,
 	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INVALIDATION,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
@@ -130,6 +133,16 @@ typedef struct ReorderBufferChange
 			char	   *message;
 		}			msg;
 
+		/* DDL Message. */
+		struct
+		{
+			char	   *prefix;
+			Size		message_size;
+			char	   *message;
+			Oid			relid;
+			DeparsedCommandType	cmdtype;
+		}			ddlmsg;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -435,6 +448,16 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* DDL message callback signature */
+typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   const char *prefix,
+										   Oid relid,
+										   DeparsedCommandType cmdtype,
+										   Size sz,
+										   const char *message);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -501,6 +524,17 @@ typedef void (*ReorderBufferStreamMessageCB) (
 											  const char *prefix, Size sz,
 											  const char *message);
 
+/* stream DDL message callback signature */
+typedef void (*ReorderBufferStreamDDLMessageCB) (
+												 ReorderBuffer *rb,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr message_lsn,
+												 const char *prefix,
+												 Oid relid,
+												 DeparsedCommandType cmdtype,
+												 Size sz,
+												 const char *message);
+
 /* stream truncate callback signature */
 typedef void (*ReorderBufferStreamTruncateCB) (
 											   ReorderBuffer *rb,
@@ -552,6 +586,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferDDLMessageCB ddlmessage;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -571,6 +606,7 @@ struct ReorderBuffer
 	ReorderBufferStreamCommitCB stream_commit;
 	ReorderBufferStreamChangeCB stream_change;
 	ReorderBufferStreamMessageCB stream_message;
+	ReorderBufferStreamDDLMessageCB stream_ddlmessage;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
 	/*
@@ -650,6 +686,9 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+										  const char *prefix, Size message_size,
+										  const char *message, Oid relid, DeparsedCommandType cmdtype);
 extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index a7f5700edc..bfc73c2328 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -5970,9 +5970,9 @@ List of schemas
 (0 rows)
 
 \dRp "no.such.publication"
-                              List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root 
-------+-------+------------+---------+---------+---------+-----------+----------
+                                 List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+------+-------+------------+---------+---------+---------+-----------+----------+------
 (0 rows)
 
 \dRs "no.such.subscription"
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 427f87ea07..2b7cd0d596 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -30,20 +30,20 @@ ERROR:  conflicting or redundant options
 LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi...
                                                              ^
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
+                                                  List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f        | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                  List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f        | f
 (2 rows)
 
 --- adding tables
@@ -87,10 +87,10 @@ RESET client_min_messages;
 -- should be able to add schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable ADD TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl1"
 Tables from schemas:
@@ -99,20 +99,20 @@ Tables from schemas:
 -- should be able to drop schema from 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable DROP TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl1"
 
 -- should be able to set schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable SET TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test"
 
@@ -123,10 +123,10 @@ CREATE PUBLICATION testpub_forschema FOR TABLES IN SCHEMA pub_test;
 CREATE PUBLICATION testpub_for_tbl_schema FOR TABLES IN SCHEMA pub_test, TABLE pub_test.testpub_nopk;
 RESET client_min_messages;
 \dRp+ testpub_for_tbl_schema
-                             Publication testpub_for_tbl_schema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                Publication testpub_for_tbl_schema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test.testpub_nopk"
 Tables from schemas:
@@ -135,10 +135,10 @@ Tables from schemas:
 -- should be able to add a table of the same schema to the schema publication
 ALTER PUBLICATION testpub_forschema ADD TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test.testpub_nopk"
 Tables from schemas:
@@ -147,10 +147,10 @@ Tables from schemas:
 -- should be able to drop the table
 ALTER PUBLICATION testpub_forschema DROP TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test"
 
@@ -161,10 +161,10 @@ ERROR:  relation "testpub_nopk" is not part of the publication
 -- should be able to set table to schema publication
 ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test.testpub_nopk"
 
@@ -186,10 +186,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                              Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t          | t       | t       | f       | f         | f
+                                 Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | t          | t       | t       | f       | f         | f        | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -201,19 +201,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                                    Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                       Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                                    Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                       Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl3"
 
@@ -234,10 +234,10 @@ UPDATE testpub_parted1 SET a = 1;
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_parted"
 
@@ -252,10 +252,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | t
+                                   Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | t        | f
 Tables:
     "public.testpub_parted"
 
@@ -284,10 +284,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -300,10 +300,10 @@ Tables:
 
 ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -319,10 +319,10 @@ Publications:
 
 ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
@@ -330,10 +330,10 @@ Tables:
 -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
 ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
 
@@ -366,10 +366,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax1
-                                Publication testpub_syntax1
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                    Publication testpub_syntax1
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE (e < 999)
@@ -379,10 +379,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax2
-                                Publication testpub_syntax2
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                    Publication testpub_syntax2
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
@@ -497,10 +497,10 @@ CREATE PUBLICATION testpub6 FOR TABLES IN SCHEMA testpub_rf_schema2;
 ALTER PUBLICATION testpub6 SET TABLES IN SCHEMA testpub_rf_schema2, TABLE testpub_rf_schema2.testpub_rf_tbl6 WHERE (i < 99);
 RESET client_min_messages;
 \dRp+ testpub6
-                                    Publication testpub6
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                       Publication testpub6
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "testpub_rf_schema2.testpub_rf_tbl6" WHERE (i < 99)
 Tables from schemas:
@@ -714,10 +714,10 @@ CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
 RESET client_min_messages;
 ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a);		-- ok
 \dRp+ testpub_table_ins
-                               Publication testpub_table_ins
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | t         | f
+                                   Publication testpub_table_ins
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | t         | f        | f
 Tables:
     "public.testpub_tbl5" (a)
 
@@ -891,10 +891,10 @@ CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
 ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
 ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
 \dRp+ testpub_both_filters
-                              Publication testpub_both_filters
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                 Publication testpub_both_filters
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1)
 
@@ -1099,10 +1099,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                                 Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                    Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -1140,10 +1140,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                    Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -1221,10 +1221,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 DROP TABLE testpub_parted;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                    Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -1234,20 +1234,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                           List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
--------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
+                                              List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f        | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                             List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
------------------+---------------------------+------------+---------+---------+---------+-----------+----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
+                                                 List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f        | f
 (1 row)
 
 -- adding schemas and tables
@@ -1263,19 +1263,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int);
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub1_forschema FOR TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
 CREATE PUBLICATION testpub2_forschema FOR TABLES IN SCHEMA pub_test1, pub_test2, pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1289,44 +1289,44 @@ CREATE PUBLICATION testpub6_forschema FOR TABLES IN SCHEMA "CURRENT_SCHEMA", CUR
 CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA";
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "public"
 
 \dRp+ testpub4_forschema
-                               Publication testpub4_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub4_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "CURRENT_SCHEMA"
 
 \dRp+ testpub5_forschema
-                               Publication testpub5_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub5_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub6_forschema
-                               Publication testpub6_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub6_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "CURRENT_SCHEMA.CURRENT_SCHEMA"
 
@@ -1360,10 +1360,10 @@ ERROR:  schema "testpub_view" does not exist
 -- dropping the schema should reflect the change in publication
 DROP SCHEMA pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1371,20 +1371,20 @@ Tables from schemas:
 -- renaming the schema should reflect the change in publication
 ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1_renamed"
     "pub_test2"
 
 ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1392,10 +1392,10 @@ Tables from schemas:
 -- alter publication add schema
 ALTER PUBLICATION testpub1_forschema ADD TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1404,10 +1404,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1416,10 +1416,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD TABLES IN SCHEMA pub_test1;
 ERROR:  schema "pub_test1" is already member of publication "testpub1_forschema"
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1427,10 +1427,10 @@ Tables from schemas:
 -- alter publication drop schema
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1438,10 +1438,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA pub_test2;
 ERROR:  tables from schema "pub_test2" are not part of the publication
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1449,29 +1449,29 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
 -- drop all schemas
 ALTER PUBLICATION testpub1_forschema DROP TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 (1 row)
 
 -- alter publication set multiple schema
 ALTER PUBLICATION testpub1_forschema SET TABLES IN SCHEMA pub_test1, pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1480,10 +1480,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema SET TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1492,10 +1492,10 @@ Tables from schemas:
 -- removing the duplicate schemas
 ALTER PUBLICATION testpub1_forschema SET TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1574,18 +1574,18 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub3_forschema;
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 (1 row)
 
 ALTER PUBLICATION testpub3_forschema SET TABLES IN SCHEMA pub_test1;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1595,20 +1595,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR TABLES IN SCHEMA pub_test1, TA
 CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, TABLES IN SCHEMA pub_test1;
 RESET client_min_messages;
 \dRp+ testpub_forschema_fortable
-                           Publication testpub_forschema_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                              Publication testpub_forschema_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
     "pub_test1"
 
 \dRp+ testpub_fortable_forschema
-                           Publication testpub_fortable_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                              Publication testpub_fortable_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
-- 
2.32.0

