From 2b5ea14b6111fd69b1bd5375e1300c6a7a6d2e5f Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 9 Jan 2023 09:32:09 +0530
Subject: [PATCH v61 5/7] DDL messaging infrastructure for DDL replication.

DDL messaging infrastructure for DDL replication.
---
 contrib/test_decoding/expected/ddl.out        |  26 ++++
 contrib/test_decoding/sql/ddl.sql             |   5 +
 contrib/test_decoding/test_decoding.c         |  48 +++++++
 src/backend/access/rmgrdesc/Makefile          |   1 +
 .../access/rmgrdesc/logicalddlmsgdesc.c       |  52 +++++++
 src/backend/access/rmgrdesc/meson.build       |   1 +
 src/backend/access/transam/rmgr.c             |   1 +
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/ddlmessage.c  |  84 ++++++++++++
 src/backend/replication/logical/decode.c      |  41 ++++++
 src/backend/replication/logical/logical.c     |  44 ++++++
 .../replication/logical/logicalfuncs.c        |  24 ++++
 src/backend/replication/logical/meson.build   |   1 +
 .../replication/logical/reorderbuffer.c       | 127 ++++++++++++++++++
 src/bin/pg_waldump/rmgrdesc.c                 |   1 +
 src/include/access/rmgrlist.h                 |   1 +
 src/include/catalog/pg_proc.dat               |   8 ++
 src/include/replication/ddlmessage.h          |  60 +++++++++
 src/include/replication/decode.h              |   1 +
 src/include/replication/output_plugin.h       |  14 ++
 src/include/replication/reorderbuffer.h       |  27 ++++
 21 files changed, 568 insertions(+)
 create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c
 create mode 100644 src/backend/replication/logical/ddlmessage.c
 create mode 100644 src/include/replication/ddlmessage.h

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 9a28b5ddc5..0f51f2b41a 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -831,6 +831,32 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 ------
 (0 rows)
 
+SELECT 'ddl msg1' FROM pg_logical_emit_ddl_message('ddl msg1', 16394, 1, 'msg1');
+ ?column? 
+----------
+ ddl msg1
+(1 row)
+
+SELECT 'ddl msg2' FROM pg_logical_emit_ddl_message('ddl msg2', 16394, 1, '{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""}');
+ ?column? 
+----------
+ ddl msg2
+(1 row)
+
+SELECT 'ddl msg3' FROM pg_logical_emit_ddl_message('ddl msg3', 16394, 1, '{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "foo", "schemaname": "element_test"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}}');
+ ?column? 
+----------
+ ddl msg3
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                data                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ message: prefix: ddl msg1, relid: 16394, cmdtype: Drop start, sz: 4 content:msg1
+ message: prefix: ddl msg2, relid: 16394, cmdtype: Drop start, sz: 217 content:{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""}
+ message: prefix: ddl msg3, relid: 16394, cmdtype: Drop start, sz: 1396 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "foo", "schemaname": "element_test"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}}
+(3 rows)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index 4f76bed72c..1ea5a4b271 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -431,6 +431,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 -- done, free logical replication slot
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
+SELECT 'ddl msg1' FROM pg_logical_emit_ddl_message('ddl msg1', 16394, 1, 'msg1');
+SELECT 'ddl msg2' FROM pg_logical_emit_ddl_message('ddl msg2', 16394, 1, '{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I %{authorization}s", "name": "foo", "authorization": {"fmt": "AUTHORIZATION %{authorization_role}I", "present": false, "authorization_role": null}, "if_not_exists": ""}');
+SELECT 'ddl msg3' FROM pg_logical_emit_ddl_message('ddl msg3', 16394, 1, '{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "foo", "schemaname": "element_test"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}}');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
 SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e523d22eba..cabf0bc1d9 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -14,9 +14,11 @@
 
 #include "catalog/pg_type.h"
 
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
 
+#include "tcop/ddl_deparse.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -72,6 +74,12 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr lsn,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
+static void pg_decode_ddl_message(LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn,
+								  XLogRecPtr message_lsn,
+								  const char *prefix, Oid relid,
+								  DeparsedCommandType cmdtype,
+								  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
 									 TransactionId xid,
 									 const char *gid);
@@ -137,6 +145,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->ddl_cb = pg_decode_ddl_message;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
 	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
 	cb->prepare_cb = pg_decode_prepare_txn;
@@ -752,6 +761,45 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_ddl_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					  XLogRecPtr message_lsn, const char *prefix, Oid relid,
+					  DeparsedCommandType cmdtype, Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "message: prefix: %s, relid: %u, ",
+					 prefix, relid);
+
+	switch(cmdtype)
+	{
+	case DCT_SimpleCmd:
+		appendStringInfo(ctx->out, "cmdtype: Simple, ");
+		break;
+	case DCT_TableDropStart:
+		appendStringInfo(ctx->out, "cmdtype: Drop start, ");
+		break;
+	case DCT_TableDropEnd:
+		appendStringInfo(ctx->out, "cmdtype: Drop end, ");
+		break;
+	case DCT_TableAlter:
+		appendStringInfo(ctx->out, "cmdtype: Alter table, ");
+		break;
+	case DCT_ObjectCreate:
+		appendStringInfo(ctx->out, "cmdtype: Create, ");
+		break;
+	case DCT_ObjectDrop:
+		appendStringInfo(ctx->out, "cmdtype: Drop, ");
+		break;
+	default:
+		appendStringInfo(ctx->out, "cmdtype: Invalid, ");
+		break;
+	}
+
+	appendStringInfo(ctx->out, "sz: %zu content:", sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..2ff01e69bf 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	gistdesc.o \
 	hashdesc.o \
 	heapdesc.o \
+	logicalddlmsgdesc.o \
 	logicalmsgdesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
new file mode 100644
index 0000000000..05e930c90c
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -0,0 +1,52 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalddlmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/ddlmessage.c
+ *
+ * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/ddlmessage.h"
+
+void
+logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_DDL_MESSAGE)
+	{
+		xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec;
+		char	   *prefix = xlrec->message;
+		char	   *message = xlrec->message + xlrec->prefix_size;
+		char	   *sep = "";
+
+		Assert(prefix[xlrec->prefix_size] != '\0');
+
+		appendStringInfo(buf, "prefix \"%s\"; payload (%zu bytes): ",
+						 prefix, xlrec->message_size);
+		appendStringInfo(buf, "relid %u cmdtype %u", xlrec->relid, xlrec->cmdtype);
+		/* Write message payload as a series of hex bytes */
+		for (int cnt = 0; cnt < xlrec->message_size; cnt++)
+		{
+			appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]);
+			sep = " ";
+		}
+	}
+}
+
+const char *
+logicalddlmsg_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+		return "DDL";
+
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 166cee67b6..781e2d7713 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -11,6 +11,7 @@ rmgr_desc_sources = files(
   'gistdesc.c',
   'hashdesc.c',
   'heapdesc.c',
+  'logicalddlmsgdesc.c',
   'logicalmsgdesc.c',
   'mxactdesc.c',
   'nbtdesc.c',
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7d67eda5f7..678e81ae01 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -27,6 +27,7 @@
 #include "fmgr.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "replication/ddlmessage.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..d3680e9bb5 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = \
 	applyparallelworker.o \
+	ddlmessage.o \
 	decode.o \
 	launcher.o \
 	logical.o \
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
new file mode 100644
index 0000000000..e11e56e5a3
--- /dev/null
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -0,0 +1,84 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddlmessage.c
+ *	  Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/ddlmessage.c
+ *
+ * NOTES
+ *
+ * Logical DDL messages allow XLOG logging of DDL command strings that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * Unlike generic logical messages, these DDL messages have only transactional
+ * mode. Note by default DDLs in PostgreSQL are transactional.
+ *
+ * These messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/ddlmessage.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding DDL message into XLog.
+ */
+XLogRecPtr
+LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					 const char *message, size_t size)
+{
+	xl_logical_ddl_message xlrec;
+
+	 /* Ensure we have a valid transaction id. */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	xlrec.dbId = MyDatabaseId;
+	/* Trailing zero is critical; see logicalddlmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+	xlrec.relid = relid;
+	xlrec.cmdtype = cmdtype;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* Allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding ddl messages.
+ */
+void
+logicalddlmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index a53e23c679..4151aa730e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -36,6 +36,7 @@
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
 #include "replication/decode.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -603,6 +604,46 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 							  message->message + message->prefix_size);
 }
 
+/*
+ * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+void
+logicalddl_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	xl_logical_ddl_message *message;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info);
+
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding ddl messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	message = (xl_logical_ddl_message *) XLogRecGetData(r);
+
+	if (message->dbId != ctx->slot->data.database ||
+		FilterByOrigin(ctx, origin_id))
+		return;
+
+	if (SnapBuildProcessChange(builder, xid, buf->origptr))
+		ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr,
+									 message->message, /* first part of message is prefix */
+									 message->message_size,
+									 message->message + message->prefix_size,
+									 message->relid, message->cmdtype);
+}
+
 /*
  * Consolidated commit record handling between the different form of commit
  * records.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1a58dd7649..d5afae9d90 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -33,6 +33,7 @@
 #include "fmgr.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
@@ -73,6 +74,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void ddl_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+								  XLogRecPtr message_lsn, const char *prefix,
+								  Oid relid, DeparsedCommandType cmdtype,
+								  Size message_size, const char *message);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -218,6 +223,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->ddl = ddl_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -1222,6 +1228,44 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+ddl_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+					  XLogRecPtr message_lsn,
+					  const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					  Size message_size,
+					  const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.ddl_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "ddl";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.ddl_cb(ctx, txn, message_lsn, prefix, relid, cmdtype,
+								 message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..92f5f9357e 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -29,6 +29,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/ddlmessage.h"
 #include "replication/message.h"
 #include "storage/fd.h"
 #include "utils/array.h"
@@ -388,3 +389,26 @@ pg_logical_emit_message_text(PG_FUNCTION_ARGS)
 	/* bytea and text are compatible */
 	return pg_logical_emit_message_bytea(fcinfo);
 }
+
+/*
+ * SQL function for writing logical decoding DDL message into WAL.
+ */
+Datum
+pg_logical_emit_ddl_message_bytea(PG_FUNCTION_ARGS)
+{
+	char	   *prefix = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	Oid			relid = PG_GETARG_OID(1);
+	DeparsedCommandType cmdtype = PG_GETARG_INT16(2);
+	char	   *data = text_to_cstring(PG_GETARG_TEXT_PP(3));
+	XLogRecPtr	lsn;
+
+	lsn = LogLogicalDDLMessage(prefix, relid, cmdtype, data, strlen(data));
+	PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_emit_ddl_message_text(PG_FUNCTION_ARGS)
+{
+	/* bytea and text are compatible */
+	return pg_logical_emit_ddl_message_bytea(fcinfo);
+}
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..99c608d03f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -2,6 +2,7 @@
 
 backend_sources += files(
   'applyparallelworker.c',
+  'ddlmessage.c',
   'decode.c',
   'launcher.c',
   'logical.c',
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 54ee824e6c..9a7dc9e404 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -94,6 +94,7 @@
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
@@ -516,6 +517,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				pfree(change->data.msg.message);
 			change->data.msg.message = NULL;
 			break;
+		case REORDER_BUFFER_CHANGE_DDL:
+			if (change->data.ddl.prefix != NULL)
+				pfree(change->data.ddl.prefix);
+			change->data.ddl.prefix = NULL;
+			if (change->data.ddl.message != NULL)
+				pfree(change->data.ddl.message);
+			change->data.ddl.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			if (change->data.inval.invalidations)
 				pfree(change->data.inval.invalidations);
@@ -894,6 +903,36 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * A transactional DDL message is queued to be processed upon commit.
+ */
+void
+ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid,
+							 XLogRecPtr lsn, const char *prefix,
+							 Size message_size, const char *message,
+							 Oid relid, DeparsedCommandType cmdtype)
+{
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
+
+	Assert(TransactionIdIsValid(xid));
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_DDL;
+	change->data.ddl.prefix = pstrdup(prefix);
+	change->data.ddl.relid = relid;
+	change->data.ddl.cmdtype = cmdtype;
+	change->data.ddl.message_size = message_size;
+	change->data.ddl.message = palloc(message_size);
+	memcpy(change->data.ddl.message, message, message_size);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -2002,6 +2041,21 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					change->data.msg.message);
 }
 
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the DDL message.
+ */
+static inline void
+ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 ReorderBufferChange *change, bool streaming)
+{
+	rb->ddl(rb, txn, change->lsn,
+			change->data.ddl.prefix,
+			change->data.ddl.relid,
+			change->data.ddl.cmdtype,
+			change->data.ddl.message_size,
+			change->data.ddl.message);
+}
+
 /*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
@@ -2382,6 +2436,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					ReorderBufferApplyMessage(rb, txn, change, streaming);
 					break;
 
+				case REORDER_BUFFER_CHANGE_DDL:
+					ReorderBufferApplyDDLMessage(rb, txn, change, streaming);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
 					/* Execute the invalidation messages locally */
 					ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
@@ -3821,6 +3879,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDL:
+			{
+				char	   *data;
+				Size		prefix_size = strlen(change->data.ddl.prefix) + 1;
+
+				sz += prefix_size + change->data.ddl.message_size +
+					sizeof(Size) + sizeof(Oid) + sizeof(int) + sizeof(Size);
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* write the prefix, relid and cmdtype including the size */
+				memcpy(data, &prefix_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, &change->data.ddl.relid, sizeof(Oid));
+				data += sizeof(Oid);
+				memcpy(data, &change->data.ddl.cmdtype, sizeof(DeparsedCommandType));
+				data += sizeof(int);
+				memcpy(data, change->data.ddl.prefix, prefix_size);
+				data += prefix_size;
+
+				/* write the message including the size */
+				memcpy(data, &change->data.ddl.message_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddl.message,
+					   change->data.ddl.message_size);
+				data += change->data.ddl.message_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4135,6 +4226,15 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 				sz += prefix_size + change->data.msg.message_size +
 					sizeof(Size) + sizeof(Size);
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDL:
+			{
+				Size		prefix_size = strlen(change->data.ddl.prefix) + 1;
+
+				sz += prefix_size + change->data.ddl.message_size +
+					sizeof(Size) + sizeof(Size) + sizeof(Oid) + sizeof(DeparsedCommandType);
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4412,6 +4512,33 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					   change->data.msg.message_size);
 				data += change->data.msg.message_size;
 
+				break;
+			}
+		case REORDER_BUFFER_CHANGE_DDL:
+			{
+				Size		prefix_size;
+
+				/* read prefix */
+				memcpy(&prefix_size, data, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(&change->data.ddl.relid, data, sizeof(Oid));
+				data += sizeof(Oid);
+				memcpy(&change->data.ddl.cmdtype, data, sizeof(DeparsedCommandType));
+				data += sizeof(int);
+				change->data.ddl.prefix = MemoryContextAlloc(rb->context, prefix_size);
+				memcpy(change->data.ddl.prefix, data, prefix_size);
+				Assert(change->data.ddl.prefix[prefix_size - 1] == '\0');
+				data += prefix_size;
+
+				/* read the message */
+				memcpy(&change->data.msg.message_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.msg.message = MemoryContextAlloc(rb->context,
+															  change->data.msg.message_size);
+				memcpy(change->data.msg.message, data,
+					   change->data.msg.message_size);
+				data += change->data.msg.message_size;
+
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17bb4c..daf1730252 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,6 +26,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/ddlmessage.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 463bcb67c5..abcbe97593 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddl_decode)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7a9d846303..f5d1b0101b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11004,6 +11004,14 @@
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
   prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
   prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '3813', descr => 'emit a textual logical decoding message',
+  proname => 'pg_logical_emit_ddl_message', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'bool text text',
+  prosrc => 'pg_logical_emit_ddl_message_text' },
+{ oid => '3814', descr => 'emit a binary logical decoding message',
+  proname => 'pg_logical_emit_ddl_message', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'text regclass int4 text',
+  prosrc => 'pg_logical_emit_ddl_message_bytea' },
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
new file mode 100644
index 0000000000..77df6ea11a
--- /dev/null
+++ b/src/include/replication/ddlmessage.h
@@ -0,0 +1,60 @@
+/*-------------------------------------------------------------------------
+ * ddlmessage.h
+ *	   Exports from replication/logical/ddlmessage.c
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/ddlmessage.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_DDL_MESSAGE_H
+#define PG_LOGICAL_DDL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "nodes/nodes.h"
+
+
+/*
+ * Support for keeping track of deparsed commands.
+ */
+typedef enum DeparsedCommandType
+{
+	DCT_SimpleCmd,
+	DCT_TableDropStart,
+	DCT_TableDropEnd,
+	DCT_TableAlter,
+	DCT_ObjectCreate,
+	DCT_ObjectDrop
+} DeparsedCommandType;
+
+/*
+ * Generic logical decoding DDL message wal record.
+ */
+typedef struct xl_logical_ddl_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	Size		prefix_size;	/* length of prefix including null terminator */
+	Oid			relid;			/* id of the table */
+	DeparsedCommandType cmdtype;	/* type of sql command */
+	Size		message_size;	/* size of the message */
+
+	/*
+	 * payload, including null-terminated prefix of length prefix_size
+	 */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;
+
+#define SizeOfLogicalDDLMessage	(offsetof(xl_logical_ddl_message, message))
+
+extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+									   const char *ddl_message, size_t size);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_DDL_MESSAGE	0x00
+void		logicalddlmsg_redo(XLogReaderState *record);
+void		logicalddlmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalddlmsg_identify(uint8 info);
+
+#endif
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 14fa921ab4..c9ac708d32 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalddl_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2d89d26586..5ed5e6a7bc 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -9,6 +9,7 @@
 #ifndef OUTPUT_PLUGIN_H
 #define OUTPUT_PLUGIN_H
 
+#include "replication/ddlmessage.h"
 #include "replication/reorderbuffer.h"
 
 struct LogicalDecodingContext;
@@ -90,6 +91,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										Size message_size,
 										const char *message);
 
+/*
+ * Called for the logical decoding DDL messages.
+ */
+typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   const char *prefix,
+										   Oid relid,
+										   DeparsedCommandType cmdtype,
+										   Size message_size,
+										   const char *message);
+
 /*
  * Filter changes by origin.
  */
@@ -221,6 +234,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeDDLMessageCB ddl_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f6c4dd75db..7b6a6331bb 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -11,6 +11,8 @@
 
 #include "access/htup_details.h"
 #include "lib/ilist.h"
+#include "nodes/nodes.h"
+#include "replication/ddlmessage.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
@@ -64,6 +66,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_DDL,
 	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INVALIDATION,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
@@ -138,6 +141,16 @@ typedef struct ReorderBufferChange
 			char	   *message;
 		}			msg;
 
+		/* DDL */
+		struct
+		{
+			char	   *prefix;
+			Size		message_size;
+			char	   *message;
+			Oid			relid;
+			DeparsedCommandType cmdtype;
+		}			ddl;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -451,6 +464,16 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* DDL message callback signature */
+typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   const char *prefix,
+										   Oid relid,
+										   DeparsedCommandType cmdtype,
+										   Size sz,
+										   const char *message);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -567,6 +590,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferDDLMessageCB ddl;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -665,6 +689,9 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+										 const char *prefix, Size message_size,
+										 const char *message, Oid relid, DeparsedCommandType cmdtype);
 extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
-- 
2.34.1

