From b9962108dbd00e9ca385cbae7a418772c433d267 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 21 Dec 2022 22:20:14 +0530
Subject: [PATCH v55 3/5] DDL messaging infrastructure for DDL replication.

DDL messaging infrastructure for DDL replication.
---
 src/backend/access/rmgrdesc/Makefile          |  1 +
 .../access/rmgrdesc/logicalddlmsgdesc.c       | 52 ++++++++++++
 src/backend/access/rmgrdesc/meson.build       |  1 +
 src/backend/access/transam/rmgr.c             |  1 +
 src/backend/replication/logical/Makefile      |  1 +
 src/backend/replication/logical/ddlmessage.c  | 84 +++++++++++++++++++
 src/backend/replication/logical/meson.build   |  1 +
 src/bin/pg_waldump/rmgrdesc.c                 |  1 +
 src/include/access/rmgrlist.h                 |  1 +
 src/include/replication/ddlmessage.h          | 60 +++++++++++++
 10 files changed, 203 insertions(+)
 create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c
 create mode 100644 src/backend/replication/logical/ddlmessage.c
 create mode 100644 src/include/replication/ddlmessage.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..2ff01e69bf 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	gistdesc.o \
 	hashdesc.o \
 	heapdesc.o \
+	logicalddlmsgdesc.o \
 	logicalmsgdesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
new file mode 100644
index 0000000000..05e930c90c
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -0,0 +1,52 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalddlmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/ddlmessage.c
+ *
+ * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/ddlmessage.h"
+
+void
+logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_DDL_MESSAGE)
+	{
+		xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec;
+		char	   *prefix = xlrec->message;
+		char	   *message = xlrec->message + xlrec->prefix_size;
+		char	   *sep = "";
+
+		Assert(prefix[xlrec->prefix_size] != '\0');
+
+		appendStringInfo(buf, "prefix \"%s\"; payload (%zu bytes): ",
+						 prefix, xlrec->message_size);
+		appendStringInfo(buf, "relid %u cmdtype %u", xlrec->relid, xlrec->cmdtype);
+		/* Write message payload as a series of hex bytes */
+		for (int cnt = 0; cnt < xlrec->message_size; cnt++)
+		{
+			appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]);
+			sep = " ";
+		}
+	}
+}
+
+const char *
+logicalddlmsg_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+		return "DDL";
+
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index a7a7baecd1..30520b05e0 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -11,6 +11,7 @@ rmgr_desc_sources = files(
   'gistdesc.c',
   'hashdesc.c',
   'heapdesc.c',
+  'logicalddlmsgdesc.c',
   'logicalmsgdesc.c',
   'mxactdesc.c',
   'nbtdesc.c',
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7d67eda5f7..678e81ae01 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -27,6 +27,7 @@
 #include "fmgr.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "replication/ddlmessage.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb71..d4f29f8ffc 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = \
+	ddlmessage.o\
 	decode.o \
 	launcher.o \
 	logical.o \
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
new file mode 100644
index 0000000000..e11e56e5a3
--- /dev/null
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -0,0 +1,84 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddlmessage.c
+ *	  Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/ddlmessage.c
+ *
+ * NOTES
+ *
+ * Logical DDL messages allow XLOG logging of DDL command strings that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * Unlike generic logical messages, these DDL messages have only transactional
+ * mode. Note by default DDLs in PostgreSQL are transactional.
+ *
+ * These messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/ddlmessage.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding DDL message into XLog.
+ */
+XLogRecPtr
+LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					 const char *message, size_t size)
+{
+	xl_logical_ddl_message xlrec;
+
+	 /* Ensure we have a valid transaction id. */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	xlrec.dbId = MyDatabaseId;
+	/* Trailing zero is critical; see logicalddlmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+	xlrec.relid = relid;
+	xlrec.cmdtype = cmdtype;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* Allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding ddl messages.
+ */
+void
+logicalddlmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 1b9f072edc..07756e90ae 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022, PostgreSQL Global Development Group
 
 backend_sources += files(
+  'ddlmessage.c',
   'decode.c',
   'launcher.c',
   'logical.c',
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17bb4c..daf1730252 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,6 +26,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/ddlmessage.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 000bcbfdaf..eefc13a1e1 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
new file mode 100644
index 0000000000..77df6ea11a
--- /dev/null
+++ b/src/include/replication/ddlmessage.h
@@ -0,0 +1,60 @@
+/*-------------------------------------------------------------------------
+ * ddlmessage.h
+ *	   Exports from replication/logical/ddlmessage.c
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/ddlmessage.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_DDL_MESSAGE_H
+#define PG_LOGICAL_DDL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "nodes/nodes.h"
+
+
+/*
+ * Support for keeping track of deparsed commands.
+ */
+typedef enum DeparsedCommandType
+{
+	DCT_SimpleCmd,
+	DCT_TableDropStart,
+	DCT_TableDropEnd,
+	DCT_TableAlter,
+	DCT_ObjectCreate,
+	DCT_ObjectDrop
+} DeparsedCommandType;
+
+/*
+ * Generic logical decoding DDL message wal record.
+ */
+typedef struct xl_logical_ddl_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	Size		prefix_size;	/* length of prefix including null terminator */
+	Oid			relid;			/* id of the table */
+	DeparsedCommandType cmdtype;	/* type of sql command */
+	Size		message_size;	/* size of the message */
+
+	/*
+	 * payload, including null-terminated prefix of length prefix_size
+	 */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;
+
+#define SizeOfLogicalDDLMessage	(offsetof(xl_logical_ddl_message, message))
+
+extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+									   const char *ddl_message, size_t size);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_DDL_MESSAGE	0x00
+void		logicalddlmsg_redo(XLogReaderState *record);
+void		logicalddlmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalddlmsg_identify(uint8 info);
+
+#endif
-- 
2.34.1

