From: houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com>
To: Masahiko Sawada <sawada.mshk@gmail.com>
To: Amit Kapila <amit.kapila16@gmail.com>
Cc: Zheng Li <zhengli10@gmail.com>
Cc: Japin Li <japinli@hotmail.com>
Cc: Alvaro Herrera <alvherre@alvh.no-ip.org>
Cc: Dilip Kumar <dilipbalaut@gmail.com>
Cc: rajesh singarapu <rajesh.rs0541@gmail.com>
Cc: PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>
Subject: RE: Support logical replication of DDLs
Date: Thu, 2 Jun 2022 12:14:00 +0000
Message-ID: <OS0PR01MB571695EDF9EAB2422FBF2C1094DE9@OS0PR01MB5716.jpnprd01.prod.outlook.com> (raw)
In-Reply-To: <CAD21AoCnGwx2F+Ph3dpoJVq0YR8ke3P59XCs439pW=BRfdzgTQ@mail.gmail.com>
References: <CAAD30ULtoGp8L_GKbV15Wnm+X5r=SE7MOnYHuqBr396m26jJSA@mail.gmail.com>
	<202203162206.7spggyktx63e@alvherre.pgsql>
	<CAAD30UKRUusq8JyyHzAv71=ncN22OE8OkOOyAWvRHW3wXNjyyA@mail.gmail.com>
	<CAAD30UKTp87+kvGZYL3M2Suxq=WEvFUG24ZRT0yT9rqdkP=uMA@mail.gmail.com>
	<MEYP282MB1669863D5C31D7F6A1D996D8B6139@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM>
	<CAAD30UKc=GiGQzE8H7+Ofo18hwMOfK4qUm_KUyw6c09q4JvA5Q@mail.gmail.com>
	<MEYP282MB16691E383140844437FB0633B6139@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM>
	<CAAD30U+ZTBXLH0wWsW9+Zu2RECGKeaQNynLs7wKA0o86w8C-fw@mail.gmail.com>
	<MEYP282MB166926E46397CBFC113B4A7EB6189@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM>
	<CAA4eK1J4AekmEKgmfp6e-zZz4M02m7w7uxvC2tjqmjF-LDSGDA@mail.gmail.com>
	<CAAD30UKvv5=k6BY+JAF1fWzrYNbGcB0DEdNi1FMokULzOwSxcQ@mail.gmail.com>
	<CAAD30U+CRgUgkAg33KzNBKwCbsgiSc5z3NYvxNzEfS0Zg2S1WA@mail.gmail.com>
	<CAD21AoAv_wsBEK8jcqjBpatspiP=5E+qLokw9zCESBSvCAiRMg@mail.gmail.com>
	<CAAD30UK6T8bfW1JMaSSRDSynB6W05HjNrmvSp+tvXp-jdu9xFQ@mail.gmail.com>
	<CAA4eK1JQhz4y-1rYxwFxHYEAN-1JKeO0iT+Nip0N7jJUj_g7RA@mail.gmail.com>
	<CAD21AoCnGwx2F+Ph3dpoJVq0YR8ke3P59XCs439pW=BRfdzgTQ@mail.gmail.com>

On Monday, May 30, 2022 2:52 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
> 
> On Fri, May 27, 2022 at 11:03 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> > On Fri, May 27, 2022 at 3:49 AM Zheng Li <zhengli10@gmail.com> wrote:
> > >
> > > Hi Masahiko,
> > >
> > > > Thank you for updating the patches!
> > > >
> > > > I've not looked at these patches in-depth yet but with this approach,
> > > > what do you think we can handle the DDL syntax differences between
> > > > major versions? DDL syntax or behavior could be changed by future
> > > > changes and I think we need to somehow deal with the differences. For
> > >
> > > > example, if the user uses logical replication for major version
> > > > upgrade, the publisher is older than the subscriber. We might have to
> > > > rewrite the DDL before applying to the subscriber because the DDL
> > > > executed on the publisher no longer work on a new PostgreSQL version
> > >
> > > I don't think we will allow this kind of situation to happen in the
> > > first place for
> > > backward compatibility. If a DDL no longer works on a new version of
> > > PostgreSQL, the user will have to change the application code as well.
> > > So even if it happens for
> > > whatever reason, we could either
> > > 1. fail the apply worker and let the user fix such DDL because they'll
> > > have to fix the application code anyway when this happens.
> > > 2. add guard rail logic in the apply worker to automatically fix such
> > > DDL if possible, knowing the version of the source and target. Similar
> > > logic must have been implemented for pg_dump/restore/upgrade.
> > >
> > > > or we might have to add some options to the DDL before the application
> > > > in order to keep the same behavior. This seems to require a different
> > > > solution from what the patch does for the problem you mentioned such
> > >
> > > > as "DDL involving multiple tables where only some tables are
> > > > replicated”.
> > >
> > > First of all, this case can only happen when the customer chooses to
> > > only replicate a subset of the tables in a database in which case
> > > table level DDL replication is chosen instead of database level DDL
> > > replication (where all tables
> > > and DDLs are replicated). I think the solution would be:
> > > 1. make best effort to detect such DDLs on the publisher and avoid
> > > logging of such DDLs in table level DDL replication.
> > > 2. apply worker will fail to replay such command due to missing
> > > objects if such DDLs didn't get filtered on the publisher for some
> > > reason. This should be rare and I think it's OK even if it happens,
> > > we'll find out
> > > why and fix it.
> > >
> >
> > FWIW, both these cases could be handled with the deparsing approach,
> > and the handling related to the drop of multiple tables where only a
> > few are published is already done in the last POC patch shared by Ajin
> > [1].
> >
> 
> Right. So I'm inclined to think that deparsing approach is better from
> this point as well as the point mentioned by Álvaro before[1].

I agree. One more point in favor of the deparsing approach is that it can
also help replicate CREATE TABLE AS/SELECT INTO in a better way.

The main idea for replicating CREATE TABLE AS is to deparse it into a simple
CREATE TABLE command (without the subquery), WAL-log that command after the
table is created but before any data is written into it, and then replicate
the incoming writes as normal INSERTs. With this approach we don't execute
the subquery on the subscriber, so we don't need to make sure that all the
objects referenced in the subquery also exist on the subscriber. This approach
works for all forms of the command (e.g. CREATE TABLE AS [SELECT][EXECUTE][VALUES]).

One problem with this approach is that we cannot use the existing event
triggers to deparse or WAL-log the CREATE TABLE, because no event trigger is
fired after the table is created and before the data is inserted. To solve
this, one idea is to add code directly at the end of create_ctas_internal() to
deparse and WAL-log the command. Alternatively, we could introduce a new type
of event trigger (table_create) that fires at the required point, so that we
can use the trigger function to deparse and WAL-log. I am not sure which way is
better. For now I use the second idea, which introduces a new event trigger
type, in the 0003 POC patch.

In the POC patch, we deparse the command in the table_create event trigger and
WAL-log the deparsed JSON string. The walsender sends the string to the
subscriber, and the incoming INSERTs are replicated as well.
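
For readers unfamiliar with the deparsed JSON format: per the 0001 patch
description, each blob carries a "fmt" string that is later expanded into a
plain command by sprintf-like substitution of the blob's other members. Below
is a rough standalone sketch of that idea only. It is not code from the patch:
ToyParam, toy_lookup and toy_expand are hypothetical names, the parameters are
a flat array rather than nested JSONB, only the "s" and "I" conversions are
handled, and "I" is assumed to mean "quote as an identifier" (done naively
here, always adding double quotes and not escaping embedded quotes).

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical flat name/value pair; the patch uses a nested ObjTree/JSONB. */
typedef struct
{
	const char *name;
	const char *value;
} ToyParam;

/* Find the value for a name given as (pointer, length); NULL if absent. */
static const char *
toy_lookup(const ToyParam *params, int nparams, const char *name, size_t len)
{
	for (int i = 0; i < nparams; i++)
	{
		if (strlen(params[i].name) == len &&
			strncmp(params[i].name, name, len) == 0)
			return params[i].value;
	}
	return NULL;
}

/*
 * Expand "%{name}s" (value copied verbatim) and "%{name}I" (value wrapped in
 * double quotes) from fmt into out.  Returns 0 on success, or -1 on a
 * malformed specifier, an unknown name, or insufficient space in out.
 */
int
toy_expand(const char *fmt, const ToyParam *params, int nparams,
		   char *out, size_t outsz)
{
	size_t		used = 0;

	assert(fmt != NULL && out != NULL && outsz > 0);

	while (*fmt)
	{
		const char *end;
		const char *val;
		char		conv;
		int			n;

		/* Ordinary character: copy it, leaving room for the terminator */
		if (fmt[0] != '%' || fmt[1] != '{')
		{
			if (used + 1 >= outsz)
				return -1;
			out[used++] = *fmt++;
			continue;
		}

		/* Parse "%{name}X", where X is the conversion character */
		end = strchr(fmt + 2, '}');
		if (end == NULL || end[1] == '\0')
			return -1;
		conv = end[1];
		val = toy_lookup(params, nparams, fmt + 2, (size_t) (end - (fmt + 2)));
		if (val == NULL || (conv != 's' && conv != 'I'))
			return -1;

		n = snprintf(out + used, outsz - used,
					 conv == 'I' ? "\"%s\"" : "%s", val);
		if (n < 0 || (size_t) n >= outsz - used)
			return -1;
		used += (size_t) n;
		fmt = end + 2;
	}

	out[used] = '\0';
	return 0;
}
```

With parameters name = "a" and coltype = "integer", expanding the format
"%{name}I %{coltype}s" with toy_expand produces the string "a" integer (with
the double quotes around a). The actual expansion in the patch lives in
ddl_json.c and handles more conversions (such as %{}T, %{}D and %{}O) as well
as nested objects and arrays.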

Best regards,
Hou zj





Attachments:

  [application/octet-stream] v6-0001-Functions-to-deparse-DDL-commands.patch (85.8K, 2-v6-0001-Functions-to-deparse-DDL-commands.patch)
From 275544b1a7f7e5d68f22c66ffd21b0c3e2dccc07 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Fri, 27 May 2022 13:47:05 +0800
Subject: [PATCH] Functions to deparse DDL commands.

This patch provides JSON blobs representing DDL commands, which can
later be re-processed into plain strings by well-defined sprintf-like
expansion. These JSON objects are intended to allow for machine-editing of
the commands, by replacing certain nodes within the objects.

Much of the information in the output blob actually comes from system
catalogs, not from the command parse node, as it is impossible to reliably
construct a fully-specified command (i.e. one not dependent on search_path
etc) looking only at the parse node.

This provides a base for logical replication of DDL statements. Currently,
this provides support for CREATE TABLE/ALTER TABLE/DROP TABLE specific
functions. Note that some recently introduced DDLs (e.g. DDLs related to
PARTITIONED TABLE) are unsupported. We can extend it as we need more
functionality for DDL replication.
---
 src/backend/commands/Makefile      |    2 +
 src/backend/commands/ddl_deparse.c | 2180 ++++++++++++++++++++++++++++++++++++
 src/backend/commands/ddl_json.c    |  749 +++++++++++++
 src/backend/utils/adt/ruleutils.c  |    9 +
 src/include/catalog/pg_proc.dat    |    7 +-
 src/include/tcop/ddl_deparse.h     |   10 +
 src/include/utils/ruleutils.h      |    1 +
 7 files changed, 2957 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/commands/ddl_deparse.c
 create mode 100644 src/backend/commands/ddl_json.c
 create mode 100644 src/include/tcop/ddl_deparse.h

diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348..171dfb2 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -29,6 +29,8 @@ OBJS = \
 	copyto.o \
 	createas.o \
 	dbcommands.o \
+	ddl_deparse.o \
+	ddl_json.o \
 	define.o \
 	discard.o \
 	dropcmds.o \
diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c
new file mode 100644
index 0000000..60c077f
--- /dev/null
+++ b/src/backend/commands/ddl_deparse.c
@@ -0,0 +1,2180 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddl_deparse.c
+ *	  Functions to convert utility commands to machine-parseable
+ *	  representation
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * NOTES
+ *
+ * This is intended to provide JSON blobs representing DDL commands, which can
+ * later be re-processed into plain strings by well-defined sprintf-like
+ * expansion.  These JSON objects are intended to allow for machine-editing of
+ * the commands, by replacing certain nodes within the objects.
+ *
+ * Much of the information in the output blob actually comes from system
+ * catalogs, not from the command parse node, as it is impossible to reliably
+ * construct a fully-specified command (i.e. one not dependent on search_path
+ * etc) looking only at the parse node.
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/ddl_deparse.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "tcop/ddl_deparse.h"
+
+#include "access/amapi.h"
+#include "access/table.h"
+#include "access/relation.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_attribute.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_collation.h"
+#include "catalog/pg_constraint.h"
+#include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+#include "commands/defrem.h"
+#include "lib/ilist.h"
+#include "rewrite/rewriteHandler.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/jsonb.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/ruleutils.h"
+#include "utils/syscache.h"
+
+
+/*
+ * Before they are turned into JSONB representation, each command is
+ * represented as an object tree, using the structs below.
+ */
+typedef enum
+{
+	ObjTypeNull,
+	ObjTypeBool,
+	ObjTypeString,
+	ObjTypeArray,
+	ObjTypeInteger,
+	ObjTypeFloat,
+	ObjTypeObject
+} ObjType;
+
+typedef struct ObjTree
+{
+	slist_head	params;
+	int			numParams;
+} ObjTree;
+
+typedef struct ObjElem
+{
+	char	   *name;
+	ObjType		objtype;
+
+	union
+	{
+		bool		boolean;
+		char	   *string;
+		int64		integer;
+		float8		flt;
+		ObjTree	   *object;
+		List	   *array;
+	} value;
+	slist_node	node;
+} ObjElem;
+
+static ObjElem *new_null_object(void);
+static ObjElem *new_bool_object(bool value);
+static ObjElem *new_string_object(char *value);
+static ObjElem *new_object_object(ObjTree *value);
+static ObjElem *new_array_object(List *array);
+static ObjElem *new_integer_object(int64 value);
+static ObjElem *new_float_object(float8 value);
+static void append_null_object(ObjTree *tree, char *name);
+static void append_bool_object(ObjTree *tree, char *name, bool value);
+static void append_string_object(ObjTree *tree, char *name, char *value);
+static void append_object_object(ObjTree *tree, char *name, ObjTree *value);
+static void append_array_object(ObjTree *tree, char *name, List *array);
+static inline void append_premade_object(ObjTree *tree, ObjElem *elem);
+static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state);
+static void format_type_detailed(Oid type_oid, int32 typemod,
+					 Oid *nspid, char **typname, char **typemodstr,
+					 bool *typarray);
+static char *printTypmod(const char *typname, int32 typmod, Oid typmodout);
+
+static char *RelationGetColumnDefault(Relation rel, AttrNumber attno, List *dpcontext);
+
+/*
+ * Similar to format_type_internal, except we return each bit of information
+ * separately:
+ *
+ * - nspid is the schema OID.  For certain SQL-standard types which have weird
+ *   typmod rules, we return InvalidOid; caller is expected to not schema-
+ *   qualify the name nor add quotes to the type name in this case.
+ *
+ * - typname is set to the type name, without quotes
+ *
+ * - typemodstr is set to the typemod, if any, as a string with parens
+ *
+ * - typarray indicates whether []s must be added
+ *
+ * We don't try to decode type names to their standard-mandated names, except
+ * in the cases of types with unusual typmod rules.
+ */
+static void
+format_type_detailed(Oid type_oid, int32 typemod,
+					 Oid *nspid, char **typname, char **typemodstr,
+					 bool *typarray)
+{
+	HeapTuple	tuple;
+	Form_pg_type typeform;
+	Oid			array_base_type;
+
+	tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid));
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for type %u", type_oid);
+
+	typeform = (Form_pg_type) GETSTRUCT(tuple);
+
+	/*
+	 * Special-case crock for types with strange typmod rules.
+	 */
+	if (type_oid == INTERVALOID ||
+		type_oid == TIMESTAMPOID ||
+		type_oid == TIMESTAMPTZOID ||
+		type_oid == TIMEOID ||
+		type_oid == TIMETZOID)
+	{
+		*typarray = false;
+
+peculiar_typmod:
+		switch (type_oid)
+		{
+			case INTERVALOID:
+				*typname = pstrdup("INTERVAL");
+				break;
+			case TIMESTAMPTZOID:
+				/* with a typmod, WITH TIME ZONE is added by typmodout */
+				if (typemod < 0)
+					*typname = pstrdup("TIMESTAMP WITH TIME ZONE");
+				else
+					*typname = pstrdup("TIMESTAMP");
+				break;
+			case TIMESTAMPOID:
+				*typname = pstrdup("TIMESTAMP");
+				break;
+			case TIMETZOID:
+				/* with a typmod, WITH TIME ZONE is added by typmodout */
+				if (typemod < 0)
+					*typname = pstrdup("TIME WITH TIME ZONE");
+				else
+					*typname = pstrdup("TIME");
+				break;
+			case TIMEOID:
+				*typname = pstrdup("TIME");
+				break;
+		}
+		*nspid = InvalidOid;
+
+		if (typemod >= 0)
+			*typemodstr = printTypmod("", typemod, typeform->typmodout);
+		else
+			*typemodstr = pstrdup("");
+
+		ReleaseSysCache(tuple);
+		return;
+	}
+
+	/*
+	 * Check if it's a regular (variable length) array type.  As above,
+	 * fixed-length array types such as "name" shouldn't get deconstructed.
+	 */
+	array_base_type = typeform->typelem;
+
+	if (array_base_type != InvalidOid &&
+		typeform->typstorage != 'p')
+	{
+		/* Switch our attention to the array element type */
+		ReleaseSysCache(tuple);
+		tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(array_base_type));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for type %u", array_base_type);
+
+		typeform = (Form_pg_type) GETSTRUCT(tuple);
+		type_oid = array_base_type;
+		*typarray = true;
+
+		/*
+		 * If it's an array of one of the types with special typmod rules,
+		 * have the element type be processed as above, but now with typarray
+		 * set to true.
+		 */
+		if (type_oid == INTERVALOID ||
+			type_oid == TIMESTAMPTZOID ||
+			type_oid == TIMESTAMPOID ||
+			type_oid == TIMETZOID ||
+			type_oid == TIMEOID)
+			goto peculiar_typmod;
+	}
+	else
+		*typarray = false;
+
+	*nspid = typeform->typnamespace;
+	*typname = pstrdup(NameStr(typeform->typname));
+
+	if (typemod >= 0)
+		*typemodstr = printTypmod("", typemod, typeform->typmodout);
+	else
+		*typemodstr = pstrdup("");
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * Add typmod decoration to the basic type name
+ */
+static char *
+printTypmod(const char *typname, int32 typmod, Oid typmodout)
+{
+	char	   *res;
+
+	/* Shouldn't be called if typmod is -1 */
+	Assert(typmod >= 0);
+
+	if (typmodout == InvalidOid)
+	{
+		/* Default behavior: just print the integer typmod with parens */
+		res = psprintf("%s(%d)", typname, (int) typmod);
+	}
+	else
+	{
+		/* Use the type-specific typmodout procedure */
+		char	   *tmstr;
+
+		tmstr = DatumGetCString(OidFunctionCall1(typmodout,
+												 Int32GetDatum(typmod)));
+		res = psprintf("%s%s", typname, tmstr);
+	}
+
+	return res;
+}
+
+/*
+ * Obtain the deparsed default value for the given column of the given table.
+ *
+ * Caller must have set a correct deparse context.
+ */
+static char *
+RelationGetColumnDefault(Relation rel, AttrNumber attno, List *dpcontext)
+{
+	Node *defval;
+	char *defstr;
+
+	defval = build_column_default(rel, attno);
+	defstr = deparse_expression(defval, dpcontext, false, false);
+
+	return defstr;
+}
+
+/*
+ * Allocate a new object tree to store parameter values.
+ */
+static ObjTree *
+new_objtree(void)
+{
+	ObjTree    *params;
+
+	params = palloc(sizeof(ObjTree));
+	params->numParams = 0;
+	slist_init(&params->params);
+
+	return params;
+}
+
+/*
+ * Allocate a new object tree to store parameter values -- varargs version.
+ *
+ * The "fmt" argument is used to append as a "fmt" element in the output blob.
+ * numobjs indicates the number of extra elements to append; for each one, a
+ * name (string), type (from the ObjType enum) and value must be supplied.  The
+ * value must match the type given; for instance, ObjTypeInteger requires an
+ * int64, ObjTypeString requires a char *, ObjTypeArray requires a list (of
+ * ObjElem), ObjTypeObject requires an ObjTree, and so on.  Each element type
+ * must match the conversion specifier given in the format string, as described
+ * in ddl_deparse_expand_command, q.v.
+ *
+ * Note we don't have the luxury of sprintf-like compiler warnings for
+ * malformed argument lists.
+ */
+static ObjTree *
+new_objtree_VA(char *fmt, int numobjs,...)
+{
+	ObjTree    *tree;
+	va_list		args;
+	int			i;
+
+	/* Set up the toplevel object and its "fmt" */
+	tree = new_objtree();
+	append_string_object(tree, "fmt", fmt);
+
+	/* And process the given varargs */
+	va_start(args, numobjs);
+	for (i = 0; i < numobjs; i++)
+	{
+		char	   *name;
+		ObjType		type;
+		ObjElem	   *elem;
+
+		name = va_arg(args, char *);
+		type = va_arg(args, ObjType);
+
+		/*
+		 * Every param type except ObjTypeNull takes a value from the varargs.
+		 * Fetch it and add the fully formed subobject into the main object.
+		 */
+		switch (type)
+		{
+			case ObjTypeBool:
+				elem = new_bool_object(va_arg(args, int));
+				break;
+			case ObjTypeString:
+				elem = new_string_object(va_arg(args, char *));
+				break;
+			case ObjTypeObject:
+				elem = new_object_object(va_arg(args, ObjTree *));
+				break;
+			case ObjTypeArray:
+				elem = new_array_object(va_arg(args, List *));
+				break;
+			case ObjTypeInteger:
+				elem = new_integer_object(va_arg(args, int64));
+				break;
+			case ObjTypeFloat:
+				elem = new_float_object(va_arg(args, double));
+				break;
+			case ObjTypeNull:
+				/* Null params don't have a value (obviously) */
+				elem = new_null_object();
+				break;
+			default:
+				elog(ERROR, "invalid ObjTree element type %d", type);
+		}
+
+		elem->name = name;
+		append_premade_object(tree, elem);
+	}
+
+	va_end(args);
+	return tree;
+}
+
+/* Allocate a new parameter with a NULL value */
+static ObjElem *
+new_null_object(void)
+{
+	ObjElem    *param;
+
+	param = palloc0(sizeof(ObjElem));
+
+	param->name = NULL;
+	param->objtype = ObjTypeNull;
+
+	return param;
+}
+
+/* Append a NULL object to a tree */
+static void
+append_null_object(ObjTree *tree, char *name)
+{
+	ObjElem    *param;
+
+	param = new_null_object();
+	param->name = name;
+	append_premade_object(tree, param);
+}
+
+/* Allocate a new boolean parameter */
+static ObjElem *
+new_bool_object(bool value)
+{
+	ObjElem    *param;
+
+	param = palloc0(sizeof(ObjElem));
+	param->name = NULL;
+	param->objtype = ObjTypeBool;
+	param->value.boolean = value;
+
+	return param;
+}
+
+/* Append a boolean parameter to a tree */
+static void
+append_bool_object(ObjTree *tree, char *name, bool value)
+{
+	ObjElem    *param;
+
+	param = new_bool_object(value);
+	param->name = name;
+	append_premade_object(tree, param);
+}
+
+/* Allocate a new string object */
+static ObjElem *
+new_string_object(char *value)
+{
+	ObjElem    *param;
+
+	Assert(value);
+
+	param = palloc0(sizeof(ObjElem));
+	param->name = NULL;
+	param->objtype = ObjTypeString;
+	param->value.string = value;
+
+	return param;
+}
+
+/*
+ * Append a string parameter to a tree.
+ */
+static void
+append_string_object(ObjTree *tree, char *name, char *value)
+{
+	ObjElem	   *param;
+
+	Assert(name);
+	param = new_string_object(value);
+	param->name = name;
+	append_premade_object(tree, param);
+}
+
+static ObjElem *
+new_integer_object(int64 value)
+{
+	ObjElem	   *param;
+
+	param = palloc0(sizeof(ObjElem));
+	param->name = NULL;
+	param->objtype = ObjTypeInteger;
+	param->value.integer = value;
+
+	return param;
+}
+
+static ObjElem *
+new_float_object(float8 value)
+{
+	ObjElem	   *param;
+
+	param = palloc0(sizeof(ObjElem));
+	param->name = NULL;
+	param->objtype = ObjTypeFloat;
+	param->value.flt = value;
+
+	return param;
+}
+
+/* Allocate a new object parameter */
+static ObjElem *
+new_object_object(ObjTree *value)
+{
+	ObjElem    *param;
+
+	param = palloc0(sizeof(ObjElem));
+	param->name = NULL;
+	param->objtype = ObjTypeObject;
+	param->value.object = value;
+
+	return param;
+}
+
+/* Append an object parameter to a tree */
+static void
+append_object_object(ObjTree *tree, char *name, ObjTree *value)
+{
+	ObjElem    *param;
+
+	Assert(name);
+	param = new_object_object(value);
+	param->name = name;
+	append_premade_object(tree, param);
+}
+
+/* Allocate a new array parameter */
+static ObjElem *
+new_array_object(List *array)
+{
+	ObjElem    *param;
+
+	param = palloc0(sizeof(ObjElem));
+	param->name = NULL;
+	param->objtype = ObjTypeArray;
+	param->value.array = array;
+
+	return param;
+}
+
+/* Append an array parameter to a tree */
+static void
+append_array_object(ObjTree *tree, char *name, List *array)
+{
+	ObjElem    *param;
+
+	param = new_array_object(array);
+	param->name = name;
+	append_premade_object(tree, param);
+}
+
+/* Append a preallocated parameter to a tree */
+static inline void
+append_premade_object(ObjTree *tree, ObjElem *elem)
+{
+	slist_push_head(&tree->params, &elem->node);
+	tree->numParams++;
+}
+
+/*
+ * Helper for objtree_to_jsonb: process an individual element from an object or
+ * an array into the output parse state.
+ */
+static void
+objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object,
+						 JsonbIteratorToken elem_token)
+{
+	ListCell   *cell;
+	JsonbValue	val;
+
+	switch (object->objtype)
+	{
+		case ObjTypeNull:
+			val.type = jbvNull;
+			pushJsonbValue(&state, elem_token, &val);
+			break;
+
+		case ObjTypeString:
+			val.type = jbvString;
+			val.val.string.len = strlen(object->value.string);
+			val.val.string.val = object->value.string;
+			pushJsonbValue(&state, elem_token, &val);
+			break;
+
+		case ObjTypeInteger:
+			val.type = jbvNumeric;
+			val.val.numeric = (Numeric)
+				DatumGetNumeric(DirectFunctionCall1(int8_numeric,
+													Int64GetDatum(object->value.integer)));
+			pushJsonbValue(&state, elem_token, &val);
+			break;
+
+		case ObjTypeFloat:
+			val.type = jbvNumeric;
+			val.val.numeric = (Numeric)
+				DatumGetNumeric(DirectFunctionCall1(float8_numeric,
+													Float8GetDatum(object->value.flt)));
+			pushJsonbValue(&state, elem_token, &val);
+			break;
+
+		case ObjTypeBool:
+			val.type = jbvBool;
+			val.val.boolean = object->value.boolean;
+			pushJsonbValue(&state, elem_token, &val);
+			break;
+
+		case ObjTypeObject:
+			/* recursively add the object into the existing parse state */
+			objtree_to_jsonb_rec(object->value.object, state);
+			break;
+
+		case ObjTypeArray:
+			pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
+			foreach(cell, object->value.array)
+			{
+				ObjElem   *elem = lfirst(cell);
+
+				objtree_to_jsonb_element(state, elem, WJB_ELEM);
+			}
+			pushJsonbValue(&state, WJB_END_ARRAY, NULL);
+			break;
+
+		default:
+			elog(ERROR, "unrecognized object type %d", object->objtype);
+			break;
+	}
+}
+
+/*
+ * Recursive helper for objtree_to_jsonb
+ */
+static JsonbValue *
+objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state)
+{
+	slist_iter	iter;
+
+	pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
+
+	slist_foreach(iter, &tree->params)
+	{
+		ObjElem    *object = slist_container(ObjElem, node, iter.cur);
+		JsonbValue	key;
+
+		/* Push the key first */
+		key.type = jbvString;
+		key.val.string.len = strlen(object->name);
+		key.val.string.val = object->name;
+		pushJsonbValue(&state, WJB_KEY, &key);
+
+		/* Then process the value according to its type */
+		objtree_to_jsonb_element(state, object, WJB_VALUE);
+	}
+
+	return pushJsonbValue(&state, WJB_END_OBJECT, NULL);
+}
+
+/*
+ * Create a JSONB representation from an ObjTree.
+ */
+static Jsonb *
+objtree_to_jsonb(ObjTree *tree)
+{
+	JsonbValue *value;
+
+	value = objtree_to_jsonb_rec(tree, NULL);
+	return JsonbValueToJsonb(value);
+}
+
+/*
+ * A helper routine to setup %{}T elements.
+ */
+static ObjTree *
+new_objtree_for_type(Oid typeId, int32 typmod)
+{
+	ObjTree    *typeParam;
+	Oid			typnspid;
+	char	   *typnsp;
+	char	   *typename = NULL;
+	char	   *typmodstr;
+	bool		typarray;
+
+	format_type_detailed(typeId, typmod,
+						 &typnspid, &typename, &typmodstr, &typarray);
+
+	if (!OidIsValid(typnspid))
+		typnsp = pstrdup("");
+	else if (isAnyTempNamespace(typnspid))
+		typnsp = pstrdup("pg_temp");
+	else
+		typnsp = get_namespace_name(typnspid);
+
+	/* We don't use new_objtree_VA here because types don't have a "fmt" */
+	typeParam = new_objtree();
+	append_string_object(typeParam, "schemaname", typnsp);
+	append_string_object(typeParam, "typename", typename);
+	append_string_object(typeParam, "typmod", typmodstr);
+	append_bool_object(typeParam, "typarray", typarray);
+
+	return typeParam;
+}
+
+/*
+ * A helper routine to setup %{}D and %{}O elements
+ *
+ * Elements "schemaname" and "objname" are set.  If the namespace OID
+ * corresponds to a temp schema, that's set to "pg_temp".
+ *
+ * The difference between those two element types is whether the objname will
+ * be quoted as an identifier or not, which is not something that this routine
+ * concerns itself with; that will be up to the expand function.
+ */
+static ObjTree *
+new_objtree_for_qualname(Oid nspid, char *name)
+{
+	ObjTree    *qualified;
+	char	   *namespace;
+
+	/*
+	 * We don't use new_objtree_VA here because these names don't have a "fmt"
+	 */
+	qualified = new_objtree();
+	if (isAnyTempNamespace(nspid))
+		namespace = pstrdup("pg_temp");
+	else
+		namespace = get_namespace_name(nspid);
+	append_string_object(qualified, "schemaname", namespace);
+	append_string_object(qualified, "objname", pstrdup(name));
+
+	return qualified;
+}
+
+/*
+ * A helper routine to setup %{}D and %{}O elements, with the object specified
+ * by classId/objId
+ *
+ * Elements "schemaname" and "objname" are set.  If the object is a temporary
+ * object, the schema name is set to "pg_temp".
+ */
+static ObjTree *
+new_objtree_for_qualname_id(Oid classId, Oid objectId)
+{
+	ObjTree    *qualified;
+	Relation	catalog;
+	HeapTuple	catobj;
+	Datum		objnsp;
+	Datum		objname;
+	AttrNumber	Anum_name;
+	AttrNumber	Anum_namespace;
+	AttrNumber	Anum_oid = get_object_attnum_oid(classId);
+	bool		isnull;
+
+	catalog = table_open(classId, AccessShareLock);
+
+	catobj = get_catalog_object_by_oid(catalog, Anum_oid, objectId);
+	if (!catobj)
+		elog(ERROR, "cache lookup failed for object %u of catalog \"%s\"",
+			 objectId, RelationGetRelationName(catalog));
+	Anum_name = get_object_attnum_name(classId);
+	Anum_namespace = get_object_attnum_namespace(classId);
+
+	objnsp = heap_getattr(catobj, Anum_namespace, RelationGetDescr(catalog),
+						  &isnull);
+	if (isnull)
+		elog(ERROR, "unexpected NULL namespace");
+	objname = heap_getattr(catobj, Anum_name, RelationGetDescr(catalog),
+						   &isnull);
+	if (isnull)
+		elog(ERROR, "unexpected NULL name");
+
+	qualified = new_objtree_for_qualname(DatumGetObjectId(objnsp),
+										 NameStr(*DatumGetName(objname)));
+	table_close(catalog, AccessShareLock);
+
+	return qualified;
+}
+
+/*
+ * Return the string representation of the given RELPERSISTENCE value
+ */
+static char *
+get_persistence_str(char persistence)
+{
+	switch (persistence)
+	{
+		case RELPERSISTENCE_TEMP:
+			return "TEMPORARY";
+		case RELPERSISTENCE_UNLOGGED:
+			return "UNLOGGED";
+		case RELPERSISTENCE_PERMANENT:
+			return "";
+		default:
+			elog(ERROR, "unexpected persistence marking %c", persistence);
+			return "";		/* make compiler happy */
+	}
+}
+
+/*
+ * deparse_ColumnDef
+ *		Subroutine for CREATE TABLE deparsing
+ *
+ * Deparse a ColumnDef node within a regular (non typed) table creation.
+ *
+ * NOT NULL constraints in the column definition are emitted directly in the
+ * column definition by this routine; other constraints must be emitted
+ * elsewhere (the info in the parse node is incomplete anyway.)
+ */
+static ObjTree *
+deparse_ColumnDef(Relation relation, List *dpcontext, bool composite,
+				  ColumnDef *coldef, bool is_alter)
+{
+	ObjTree    *column;
+	ObjTree    *tmp;
+	Oid			relid = RelationGetRelid(relation);
+	HeapTuple	attrTup;
+	Form_pg_attribute attrForm;
+	Oid			typid;
+	int32		typmod;
+	Oid			typcollation;
+	bool		saw_notnull;
+	ListCell   *cell;
+
+	/*
+	 * Inherited columns without local definitions must not be emitted. XXX --
+	 * maybe it is useful to have them with "present = false" or some such?
+	 */
+	if (!coldef->is_local)
+		return NULL;
+
+	attrTup = SearchSysCacheAttName(relid, coldef->colname);
+	if (!HeapTupleIsValid(attrTup))
+		elog(ERROR, "could not find cache entry for column \"%s\" of relation %u",
+			 coldef->colname, relid);
+	attrForm = (Form_pg_attribute) GETSTRUCT(attrTup);
+
+	get_atttypetypmodcoll(relid, attrForm->attnum,
+						  &typid, &typmod, &typcollation);
+
+	/* Composite types use a slightly simpler format string */
+	if (composite)
+		column = new_objtree_VA("%{name}I %{coltype}T %{collation}s",
+								3,
+								"type", ObjTypeString, "column",
+								"name", ObjTypeString, coldef->colname,
+								"coltype", ObjTypeObject,
+								new_objtree_for_type(typid, typmod));
+	else
+		column = new_objtree_VA("%{name}I %{coltype}T %{default}s %{not_null}s %{collation}s",
+								3,
+								"type", ObjTypeString, "column",
+								"name", ObjTypeString, coldef->colname,
+								"coltype", ObjTypeObject,
+								new_objtree_for_type(typid, typmod));
+
+	tmp = new_objtree_VA("COLLATE %{name}D", 0);
+	if (OidIsValid(typcollation))
+	{
+		ObjTree *collname;
+
+		collname = new_objtree_for_qualname_id(CollationRelationId,
+											   typcollation);
+		append_object_object(tmp, "name", collname);
+	}
+	else
+		append_bool_object(tmp, "present", false);
+	append_object_object(column, "collation", tmp);
+
+	if (!composite)
+	{
+		/*
+		 * Emit a NOT NULL declaration if necessary.  Note that we cannot trust
+		 * pg_attribute.attnotnull here, because that bit is also set when
+		 * primary keys are specified; and we must not emit a NOT NULL
+		 * constraint in that case, unless explicitly specified.  Therefore,
+		 * we scan the list of constraints attached to this column to determine
+		 * whether we need to emit anything.
+		 * (Fortunately, NOT NULL constraints cannot be table constraints.)
+		 *
+		 * In the ALTER TABLE cases, we also add a NOT NULL if the colDef is
+		 * marked is_not_null.
+		 */
+		saw_notnull = false;
+		foreach(cell, coldef->constraints)
+		{
+			Constraint *constr = (Constraint *) lfirst(cell);
+
+			if (constr->contype == CONSTR_NOTNULL)
+				saw_notnull = true;
+		}
+		if (is_alter && coldef->is_not_null)
+			saw_notnull = true;
+
+		if (saw_notnull)
+			append_string_object(column, "not_null", "NOT NULL");
+		else
+			append_string_object(column, "not_null", "");
+
+		tmp = new_objtree_VA("DEFAULT %{default}s", 0);
+		if (attrForm->atthasdef)
+		{
+			char *defstr;
+
+			defstr = RelationGetColumnDefault(relation, attrForm->attnum,
+											  dpcontext);
+
+			append_string_object(tmp, "default", defstr);
+		}
+		else
+			append_bool_object(tmp, "present", false);
+		append_object_object(column, "default", tmp);
+	}
+
+	ReleaseSysCache(attrTup);
+
+	return column;
+}
+
+/*
+ * deparse_ColumnDef_typed
+ *		Subroutine for CREATE TABLE OF deparsing
+ *
+ * Deparse a ColumnDef node within a typed table creation.  This is simpler
+ * than the regular case, because we don't have to emit the type declaration,
+ * collation, or default.  Here we only return something if the column is being
+ * declared NOT NULL.
+ *
+ * As in deparse_ColumnDef, any other constraint is processed elsewhere.
+ *
+ * FIXME --- actually, what about default values?
+ */
+static ObjTree *
+deparse_ColumnDef_typed(Relation relation, List *dpcontext, ColumnDef *coldef)
+{
+	ObjTree    *column = NULL;
+	Oid			relid = RelationGetRelid(relation);
+	HeapTuple	attrTup;
+	Form_pg_attribute attrForm;
+	Oid			typid;
+	int32		typmod;
+	Oid			typcollation;
+	bool		saw_notnull;
+	ListCell   *cell;
+
+	attrTup = SearchSysCacheAttName(relid, coldef->colname);
+	if (!HeapTupleIsValid(attrTup))
+		elog(ERROR, "could not find cache entry for column \"%s\" of relation %u",
+			 coldef->colname, relid);
+	attrForm = (Form_pg_attribute) GETSTRUCT(attrTup);
+
+	get_atttypetypmodcoll(relid, attrForm->attnum,
+						  &typid, &typmod, &typcollation);
+
+	/*
+	 * Search for a NOT NULL declaration.  As in deparse_ColumnDef, we rely on
+	 * finding a constraint on the column rather than coldef->is_not_null.
+	 * (This routine is never used for ALTER cases.)
+	 */
+	saw_notnull = false;
+	foreach(cell, coldef->constraints)
+	{
+		Constraint *constr = (Constraint *) lfirst(cell);
+
+		if (constr->contype == CONSTR_NOTNULL)
+		{
+			saw_notnull = true;
+			break;
+		}
+	}
+
+	if (saw_notnull)
+		column = new_objtree_VA("%{name}I WITH OPTIONS NOT NULL", 2,
+								"type", ObjTypeString, "column_notnull",
+								"name", ObjTypeString, coldef->colname);
+
+	ReleaseSysCache(attrTup);
+
+	return column;
+}
+
+/*
+ * deparseTableElements
+ *		Subroutine for CREATE TABLE deparsing
+ *
+ * Deal with all the table elements (columns and constraints).
+ *
+ * Note we ignore constraints in the parse node here; they are extracted from
+ * system catalogs instead.
+ */
+static List *
+deparseTableElements(Relation relation, List *tableElements, List *dpcontext,
+					 bool typed, bool composite)
+{
+	List	   *elements = NIL;
+	ListCell   *lc;
+
+	foreach(lc, tableElements)
+	{
+		Node	   *elt = (Node *) lfirst(lc);
+
+		switch (nodeTag(elt))
+		{
+			case T_ColumnDef:
+				{
+					ObjTree	   *tree;
+
+					tree = typed ?
+						deparse_ColumnDef_typed(relation, dpcontext,
+												(ColumnDef *) elt) :
+						deparse_ColumnDef(relation, dpcontext,
+										  composite, (ColumnDef *) elt,
+										  false);
+					if (tree != NULL)
+					{
+						ObjElem    *column;
+
+						column = new_object_object(tree);
+						elements = lappend(elements, column);
+					}
+				}
+				break;
+			case T_Constraint:
+				break;
+			default:
+				elog(ERROR, "invalid node type %d", nodeTag(elt));
+		}
+	}
+
+	return elements;
+}
+
+/*
+ * obtainConstraints
+ *		Subroutine for CREATE TABLE/CREATE DOMAIN deparsing
+ *
+ * Given a table OID or domain OID, obtain its constraints and append them to
+ * the given elements list.  The updated list is returned.
+ *
+ * This works for typed tables, regular tables, and domains.
+ *
+ * Note that CONSTRAINT_FOREIGN constraints are always ignored.
+ */
+static List *
+obtainConstraints(List *elements, Oid relationId, Oid domainId)
+{
+	Relation	conRel;
+	ScanKeyData key;
+	SysScanDesc scan;
+	HeapTuple	tuple;
+	ObjTree    *tmp;
+
+	/* only one may be valid */
+	Assert(OidIsValid(relationId) ^ OidIsValid(domainId));
+
+	/*
+	 * Scan pg_constraint to fetch all constraints linked to the given
+	 * relation or domain.
+	 */
+	conRel = table_open(ConstraintRelationId, AccessShareLock);
+	if (OidIsValid(relationId))
+	{
+		ScanKeyInit(&key,
+					Anum_pg_constraint_conrelid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(relationId));
+		scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId,
+								  true, NULL, 1, &key);
+	}
+	else
+	{
+		Assert(OidIsValid(domainId));
+		ScanKeyInit(&key,
+					Anum_pg_constraint_contypid,
+					BTEqualStrategyNumber, F_OIDEQ,
+					ObjectIdGetDatum(domainId));
+		scan = systable_beginscan(conRel, ConstraintTypidIndexId,
+								  true, NULL, 1, &key);
+	}
+
+	/*
+	 * For each constraint, add a node to the list of table elements.  In
+	 * these nodes we include not only the printable information ("fmt"), but
+	 * also separate attributes to indicate the type of constraint, for
+	 * automatic processing.
+	 */
+	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+	{
+		Form_pg_constraint constrForm;
+		char	   *contype;
+
+		constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+		switch (constrForm->contype)
+		{
+			case CONSTRAINT_CHECK:
+				contype = "check";
+				break;
+			case CONSTRAINT_FOREIGN:
+				continue;	/* not here */
+			case CONSTRAINT_PRIMARY:
+				contype = "primary key";
+				break;
+			case CONSTRAINT_UNIQUE:
+				contype = "unique";
+				break;
+			case CONSTRAINT_TRIGGER:
+				contype = "trigger";
+				break;
+			case CONSTRAINT_EXCLUSION:
+				contype = "exclusion";
+				break;
+			default:
+				elog(ERROR, "unrecognized constraint type");
+		}
+
+		/*
+		 * "type" and "contype" are not part of the printable output, but are
+		 * useful to programmatically distinguish these from columns and among
+		 * different constraint types.
+		 *
+		 * XXX it might be useful to also list the column names in a PK, etc.
+		 */
+		tmp = new_objtree_VA("CONSTRAINT %{name}I %{definition}s",
+							 4,
+							 "type", ObjTypeString, "constraint",
+							 "contype", ObjTypeString, contype,
+						 "name", ObjTypeString, NameStr(constrForm->conname),
+							 "definition", ObjTypeString,
+						  pg_get_constraintdef_command_simple(constrForm->oid));
+		elements = lappend(elements, new_object_object(tmp));
+	}
+
+	systable_endscan(scan);
+	table_close(conRel, AccessShareLock);
+
+	return elements;
+}
+
+/*
+ * Deparse the ON COMMIT ... clause for CREATE ... TEMPORARY ...
+ */
+static ObjTree *
+deparse_OnCommitClause(OnCommitAction option)
+{
+	ObjTree	   *tmp;
+
+	tmp = new_objtree_VA("ON COMMIT %{on_commit_value}s", 0);
+	switch (option)
+	{
+		case ONCOMMIT_DROP:
+			append_string_object(tmp, "on_commit_value", "DROP");
+			break;
+
+		case ONCOMMIT_DELETE_ROWS:
+			append_string_object(tmp, "on_commit_value", "DELETE ROWS");
+			break;
+
+		case ONCOMMIT_PRESERVE_ROWS:
+			append_string_object(tmp, "on_commit_value", "PRESERVE ROWS");
+			break;
+
+		case ONCOMMIT_NOOP:
+			append_null_object(tmp, "on_commit_value");
+			append_bool_object(tmp, "present", false);
+			break;
+	}
+
+	return tmp;
+}
+
+/*
+ * Deparse DefElems, as used e.g. by ALTER COLUMN ... SET, into a list of SET
+ * (...)  or RESET (...) contents.
+ */
+static ObjTree *
+deparse_DefElem(DefElem *elem, bool is_reset)
+{
+	ObjTree	   *set;
+	ObjTree	   *optname;
+
+	if (elem->defnamespace != NULL)
+		optname = new_objtree_VA("%{schema}I.%{label}I", 1,
+								 "schema", ObjTypeString, elem->defnamespace);
+	else
+		optname = new_objtree_VA("%{label}I", 0);
+
+	append_string_object(optname, "label", elem->defname);
+
+	if (is_reset)
+		set = new_objtree_VA("%{label}s", 0);
+	else
+		set = new_objtree_VA("%{label}s = %{value}L", 1,
+							 "value", ObjTypeString,
+							 elem->arg ? defGetString(elem) :
+							 defGetBoolean(elem) ? "TRUE" : "FALSE");
+
+	append_object_object(set, "label", optname);
+	return set;
+}
+
+/*
+ * ... ALTER COLUMN ... SET/RESET (...)
+ */
+static ObjTree *
+deparse_ColumnSetOptions(AlterTableCmd *subcmd)
+{
+	List	   *sets = NIL;
+	ListCell   *cell;
+	ObjTree    *tmp;
+	bool		is_reset = subcmd->subtype == AT_ResetOptions;
+
+	if (is_reset)
+		tmp = new_objtree_VA("ALTER COLUMN %{column}I RESET (%{options:, }s)", 0);
+	else
+		tmp = new_objtree_VA("ALTER COLUMN %{column}I SET (%{options:, }s)", 0);
+
+	append_string_object(tmp, "column", subcmd->name);
+
+	foreach(cell, (List *) subcmd->def)
+	{
+		DefElem	   *elem;
+		ObjTree	   *set;
+
+		elem = (DefElem *) lfirst(cell);
+		set = deparse_DefElem(elem, is_reset);
+		sets = lappend(sets, new_object_object(set));
+	}
+
+	append_array_object(tmp, "options", sets);
+
+	return tmp;
+}
+
+/*
+ * ... SET/RESET (...) for relation-level options
+ */
+static ObjTree *
+deparse_RelSetOptions(AlterTableCmd *subcmd)
+{
+	List	   *sets = NIL;
+	ListCell   *cell;
+	ObjTree    *tmp;
+	bool		is_reset = subcmd->subtype == AT_ResetRelOptions;
+
+	if (is_reset)
+		tmp = new_objtree_VA("RESET (%{options:, }s)", 0);
+	else
+		tmp = new_objtree_VA("SET (%{options:, }s)", 0);
+
+	foreach(cell, (List *) subcmd->def)
+	{
+		DefElem	   *elem;
+		ObjTree	   *set;
+
+		elem = (DefElem *) lfirst(cell);
+		set = deparse_DefElem(elem, is_reset);
+		sets = lappend(sets, new_object_object(set));
+	}
+
+	append_array_object(tmp, "options", sets);
+
+	return tmp;
+}
+
+/*
+ * deparse_CreateStmt
+ *		Deparse a CreateStmt (CREATE TABLE)
+ *
+ * Given a table OID and the parsetree that created it, return an ObjTree
+ * representing the creation command.
+ */
+static ObjTree *
+deparse_CreateStmt(Oid objectId, Node *parsetree)
+{
+	CreateStmt *node = (CreateStmt *) parsetree;
+	Relation	relation = relation_open(objectId, AccessShareLock);
+	List	   *dpcontext;
+	ObjTree    *createStmt;
+	ObjTree    *tmp;
+	List	   *list;
+	ListCell   *cell;
+	char	   *fmtstr;
+
+	/*
+	 * Typed tables use a slightly different format string: we must not put
+	 * table_elements with parens directly in the fmt string, because if
+	 * there are no options the parens must not be emitted; and also, typed
+	 * tables do not allow for inheritance.
+	 */
+	if (node->ofTypename)
+		fmtstr = "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D "
+			"OF %{of_type}T %{table_elements}s "
+			"%{with_clause}s %{on_commit}s %{tablespace}s";
+	else
+		fmtstr = "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D "
+			"(%{table_elements:, }s) %{inherits}s "
+			"%{with_clause}s %{on_commit}s %{tablespace}s";
+
+	createStmt =
+		new_objtree_VA(fmtstr, 1,
+					   "persistence", ObjTypeString,
+					   get_persistence_str(relation->rd_rel->relpersistence));
+
+	tmp = new_objtree_for_qualname(relation->rd_rel->relnamespace,
+								   RelationGetRelationName(relation));
+	append_object_object(createStmt, "identity", tmp);
+
+	append_string_object(createStmt, "if_not_exists",
+						 node->if_not_exists ? "IF NOT EXISTS" : "");
+
+	dpcontext = deparse_context_for(RelationGetRelationName(relation),
+									objectId);
+
+	if (node->ofTypename)
+	{
+		List	   *tableelts = NIL;
+
+		/*
+		 * We can't put table elements directly in the fmt string as an array
+		 * surrounded by parens here, because an empty clause would cause a
+		 * syntax error.  Therefore, we use an indirection element and set
+		 * present=false when there are no elements.
+		 */
+		append_string_object(createStmt, "table_kind", "typed");
+
+		tmp = new_objtree_for_type(relation->rd_rel->reloftype, -1);
+		append_object_object(createStmt, "of_type", tmp);
+
+		tableelts = deparseTableElements(relation, node->tableElts, dpcontext,
+										 true,		/* typed table */
+										 false);	/* not composite */
+		tableelts = obtainConstraints(tableelts, objectId, InvalidOid);
+		if (tableelts == NIL)
+			tmp = new_objtree_VA("", 1,
+								 "present", ObjTypeBool, false);
+		else
+			tmp = new_objtree_VA("(%{elements:, }s)", 1,
+								 "elements", ObjTypeArray, tableelts);
+		append_object_object(createStmt, "table_elements", tmp);
+	}
+	else
+	{
+		List	   *tableelts = NIL;
+
+		/*
+		 * There is no need to process LIKE clauses separately; they have
+		 * already been transformed into columns and constraints.
+		 */
+		append_string_object(createStmt, "table_kind", "plain");
+
+		/*
+		 * Process table elements: column definitions and constraints.  Only
+		 * the column definitions are obtained from the parse node itself.  To
+		 * get constraints we rely on pg_constraint, because the parse node
+		 * might be missing some things such as the name of the constraints.
+		 */
+		tableelts = deparseTableElements(relation, node->tableElts, dpcontext,
+										 false,		/* not typed table */
+										 false);	/* not composite */
+		tableelts = obtainConstraints(tableelts, objectId, InvalidOid);
+
+		append_array_object(createStmt, "table_elements", tableelts);
+
+		/*
+		 * Add inheritance specification.  We cannot simply scan the list of
+		 * parents from the parser node, because that may lack the actual
+		 * qualified names of the parent relations.  Rather than trying to
+		 * re-resolve them from the information in the parse node, it seems
+		 * more accurate and convenient to grab it from pg_inherits.
+		 */
+		tmp = new_objtree_VA("INHERITS (%{parents:, }D)", 0);
+		if (list_length(node->inhRelations) > 0)
+		{
+			List	   *parents = NIL;
+			Relation	inhRel;
+			SysScanDesc scan;
+			ScanKeyData key;
+			HeapTuple	tuple;
+
+			/* pg_inherits is only read here, so AccessShareLock suffices */
+			inhRel = table_open(InheritsRelationId, AccessShareLock);
+
+			ScanKeyInit(&key,
+						Anum_pg_inherits_inhrelid,
+						BTEqualStrategyNumber, F_OIDEQ,
+						ObjectIdGetDatum(objectId));
+
+			scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId,
+									  true, NULL, 1, &key);
+
+			while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+			{
+				ObjTree    *parent;
+				Form_pg_inherits formInh = (Form_pg_inherits) GETSTRUCT(tuple);
+
+				parent = new_objtree_for_qualname_id(RelationRelationId,
+													 formInh->inhparent);
+				parents = lappend(parents, new_object_object(parent));
+			}
+
+			systable_endscan(scan);
+			table_close(inhRel, AccessShareLock);
+
+			append_array_object(tmp, "parents", parents);
+		}
+		else
+		{
+			append_null_object(tmp, "parents");
+			append_bool_object(tmp, "present", false);
+		}
+		append_object_object(createStmt, "inherits", tmp);
+	}
+
+	tmp = new_objtree_VA("TABLESPACE %{tablespace}I", 0);
+	if (node->tablespacename)
+		append_string_object(tmp, "tablespace", node->tablespacename);
+	else
+	{
+		append_null_object(tmp, "tablespace");
+		append_bool_object(tmp, "present", false);
+	}
+	append_object_object(createStmt, "tablespace", tmp);
+
+	append_object_object(createStmt, "on_commit",
+						 deparse_OnCommitClause(node->oncommit));
+
+	/* WITH clause */
+	tmp = new_objtree_VA("WITH (%{with:, }s)", 0);
+	list = NIL;
+
+	foreach(cell, node->options)
+	{
+		ObjTree	   *tmp2;
+		DefElem	*opt = (DefElem *) lfirst(cell);
+
+		tmp2 = deparse_DefElem(opt, false);
+		list = lappend(list, new_object_object(tmp2));
+	}
+
+	if (list)
+		append_array_object(tmp, "with", list);
+	else
+		append_bool_object(tmp, "present", false);
+
+	append_object_object(createStmt, "with_clause", tmp);
+
+	relation_close(relation, AccessShareLock);
+
+	return createStmt;
+}
+
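+/*
+ * deparse_AlterTableStmt
+ *		Deparse a collected ALTER TABLE command
+ *
+ * Given a CollectedCommand of type SCT_AlterTable, return an ObjTree
+ * representing the ALTER command.  Each supported subcommand is deparsed
+ * into one element of the subcmds list; internal subcommands are skipped.
+ */
+static ObjTree *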
+deparse_AlterTableStmt(CollectedCommand *cmd)
+{
+	ObjTree	   *alterTableStmt;
+	ObjTree	   *tmp;
+	ObjTree	   *tmp2;
+	List	   *dpcontext;
+	Relation	rel;
+	List	   *subcmds = NIL;
+	ListCell   *cell;
+	char	   *fmtstr;
+	const char *reltype;
+	bool		istype = false;
+
+	Assert(cmd->type == SCT_AlterTable);
+
+	rel = relation_open(cmd->d.alterTable.objectId, AccessShareLock);
+	dpcontext = deparse_context_for(RelationGetRelationName(rel),
+									cmd->d.alterTable.objectId);
+
+	switch (rel->rd_rel->relkind)
+	{
+		case RELKIND_RELATION:
+			reltype = "TABLE";
+			break;
+		case RELKIND_INDEX:
+			reltype = "INDEX";
+			break;
+		case RELKIND_VIEW:
+			reltype = "VIEW";
+			break;
+		case RELKIND_COMPOSITE_TYPE:
+			reltype = "TYPE";
+			istype = true;
+			break;
+		case RELKIND_FOREIGN_TABLE:
+			reltype = "FOREIGN TABLE";
+			break;
+
+		/* TODO support for partitioned table */
+
+		default:
+			elog(ERROR, "unexpected relkind %d", rel->rd_rel->relkind);
+			reltype = NULL;
+	}
+
+	fmtstr = psprintf("ALTER %s %%{identity}D %%{subcmds:, }s", reltype);
+	alterTableStmt = new_objtree_VA(fmtstr, 0);
+
+	tmp = new_objtree_for_qualname(rel->rd_rel->relnamespace,
+								   RelationGetRelationName(rel));
+	append_object_object(alterTableStmt, "identity", tmp);
+
+	foreach(cell, cmd->d.alterTable.subcmds)
+	{
+		CollectedATSubcmd *sub = (CollectedATSubcmd *) lfirst(cell);
+		AlterTableCmd	*subcmd = (AlterTableCmd *) sub->parsetree;
+		ObjTree	   *tree;
+
+		Assert(IsA(subcmd, AlterTableCmd));
+
+		switch (subcmd->subtype)
+		{
+			case AT_AddColumn:
+			case AT_AddColumnRecurse:
+				/* XXX need to set the "recurse" bit somewhere? */
+				Assert(IsA(subcmd->def, ColumnDef));
+				tree = deparse_ColumnDef(rel, dpcontext, false,
+										 (ColumnDef *) subcmd->def, true);
+				fmtstr = psprintf("ADD %s %%{definition}s",
+								  istype ? "ATTRIBUTE" : "COLUMN");
+				tmp = new_objtree_VA(fmtstr, 2,
+									 "type", ObjTypeString, "add column",
+									 "definition", ObjTypeObject, tree);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_AddIndexConstraint:
+				{
+					IndexStmt  *istmt;
+					Relation	idx;
+					const char *idxname;
+					Oid			constrOid = sub->address.objectId;
+
+					Assert(IsA(subcmd->def, IndexStmt));
+					istmt = (IndexStmt *) subcmd->def;
+
+					Assert(istmt->isconstraint && istmt->unique);
+
+					idx = relation_open(istmt->indexOid, AccessShareLock);
+					idxname = RelationGetRelationName(idx);
+
+					tmp = new_objtree_VA("ADD CONSTRAINT %{name}I %{constraint_type}s USING INDEX %{index_name}I %{deferrable}s %{init_deferred}s",
+										 4, "type", ObjTypeString, "add constraint using index",
+										 "name", ObjTypeString, get_constraint_name(constrOid),
+										 "constraint_type", ObjTypeString,
+										 istmt->primary ? "PRIMARY KEY" : "UNIQUE",
+										 "index_name", ObjTypeString, idxname);
+
+					append_string_object(tmp, "deferrable", istmt->deferrable ?
+										 "DEFERRABLE" : "NOT DEFERRABLE");
+					append_string_object(tmp, "init_deferred", istmt->initdeferred ?
+										 "INITIALLY DEFERRED" : "INITIALLY IMMEDIATE");
+
+					subcmds = lappend(subcmds, new_object_object(tmp));
+
+					relation_close(idx, AccessShareLock);
+				}
+				break;
+
+			case AT_ReAddIndex:
+			case AT_ReAddConstraint:
+			case AT_ReAddComment:
+			case AT_ReplaceRelOptions:
+				/* Subtypes used for internal operations; nothing to do here */
+				break;
+
+			case AT_AddColumnToView:
+				/* CREATE OR REPLACE VIEW -- nothing to do here */
+				break;
+
+			case AT_ColumnDefault:
+				if (subcmd->def == NULL)
+				{
+					tmp = new_objtree_VA("ALTER COLUMN %{column}I DROP DEFAULT",
+										 1, "type", ObjTypeString, "drop default");
+				}
+				else
+				{
+					HeapTuple	attrtup;
+					AttrNumber	attno;
+
+					tmp = new_objtree_VA("ALTER COLUMN %{column}I SET DEFAULT %{definition}s",
+										 1, "type", ObjTypeString, "set default");
+
+					/* reuse the function-level dpcontext already built for rel */
+					attrtup = SearchSysCacheAttName(RelationGetRelid(rel), subcmd->name);
+					if (!HeapTupleIsValid(attrtup))
+						elog(ERROR, "could not find cache entry for column \"%s\" of relation %u",
+							 subcmd->name, RelationGetRelid(rel));
+					attno = ((Form_pg_attribute) GETSTRUCT(attrtup))->attnum;
+					append_string_object(tmp, "definition",
+										 RelationGetColumnDefault(rel, attno, dpcontext));
+					ReleaseSysCache(attrtup);
+				}
+				append_string_object(tmp, "column", subcmd->name);
+
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DropNotNull:
+				tmp = new_objtree_VA("ALTER COLUMN %{column}I DROP NOT NULL",
+									 1, "type", ObjTypeString, "drop not null");
+				append_string_object(tmp, "column", subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_SetNotNull:
+				tmp = new_objtree_VA("ALTER COLUMN %{column}I SET NOT NULL",
+									 1, "type", ObjTypeString, "set not null");
+				append_string_object(tmp, "column", subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_SetStatistics:
+				{
+					Assert(IsA(subcmd->def, Integer));
+					tmp = new_objtree_VA("ALTER COLUMN %{column}I SET STATISTICS %{statistics}n",
+										 3, "type", ObjTypeString, "set statistics",
+										 "column", ObjTypeString, subcmd->name,
+										 "statistics", ObjTypeInteger,
+										 intVal((Integer *) subcmd->def));
+					subcmds = lappend(subcmds, new_object_object(tmp));
+				}
+				break;
+
+			case AT_SetOptions:
+			case AT_ResetOptions:
+				subcmds = lappend(subcmds, new_object_object(
+									  deparse_ColumnSetOptions(subcmd)));
+				break;
+
+			case AT_SetStorage:
+				Assert(IsA(subcmd->def, String));
+				tmp = new_objtree_VA("ALTER COLUMN %{column}I SET STORAGE %{storage}s",
+									 3, "type", ObjTypeString, "set storage",
+									 "column", ObjTypeString, subcmd->name,
+									 "storage", ObjTypeString,
+									 strVal((String *) subcmd->def));
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DropColumnRecurse:
+			case AT_DropColumn:
+				fmtstr = psprintf("DROP %s %%{column}I %%{cascade}s",
+								  istype ? "ATTRIBUTE" : "COLUMN");
+				tmp = new_objtree_VA(fmtstr, 2,
+									 "type", ObjTypeString, "drop column",
+									 "column", ObjTypeString, subcmd->name);
+				tmp2 = new_objtree_VA("CASCADE", 1,
+									  "present", ObjTypeBool,
+									  subcmd->behavior == DROP_CASCADE);
+				append_object_object(tmp, "cascade", tmp2);
+
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_AddIndex:
+				{
+					Oid			idxOid = sub->address.objectId;
+					IndexStmt  *istmt;
+					Relation	idx;
+					const char *idxname;
+					Oid			constrOid;
+
+					Assert(IsA(subcmd->def, IndexStmt));
+					istmt = (IndexStmt *) subcmd->def;
+
+					if (!istmt->isconstraint)
+						break;
+
+					idx = relation_open(idxOid, AccessShareLock);
+					idxname = RelationGetRelationName(idx);
+
+					constrOid = get_relation_constraint_oid(
+						cmd->d.alterTable.objectId, idxname, false);
+
+					tmp = new_objtree_VA("ADD CONSTRAINT %{name}I %{definition}s",
+										 3, "type", ObjTypeString, "add constraint",
+										 "name", ObjTypeString, idxname,
+										 "definition", ObjTypeString,
+										 pg_get_constraintdef_command_simple(constrOid));
+					subcmds = lappend(subcmds, new_object_object(tmp));
+
+					relation_close(idx, AccessShareLock);
+				}
+				break;
+
+			case AT_AddConstraint:
+			case AT_AddConstraintRecurse:
+				{
+					/* XXX need to set the "recurse" bit somewhere? */
+					Oid			constrOid = sub->address.objectId;
+
+					tmp = new_objtree_VA("ADD CONSTRAINT %{name}I %{definition}s",
+										 3, "type", ObjTypeString, "add constraint",
+										 "name", ObjTypeString, get_constraint_name(constrOid),
+										 "definition", ObjTypeString,
+										 pg_get_constraintdef_command_simple(constrOid));
+					subcmds = lappend(subcmds, new_object_object(tmp));
+				}
+				break;
+
+			case AT_AlterConstraint:
+				{
+					Oid		constrOid = sub->address.objectId;
+					Constraint *c = (Constraint *) subcmd->def;
+
+					/* if no constraint was altered, silently skip it */
+					if (!OidIsValid(constrOid))
+						break;
+
+					Assert(IsA(c, Constraint));
+					tmp = new_objtree_VA("ALTER CONSTRAINT %{name}I %{deferrable}s %{init_deferred}s",
+										 2, "type", ObjTypeString, "alter constraint",
+										 "name", ObjTypeString, get_constraint_name(constrOid));
+					append_string_object(tmp, "deferrable", c->deferrable ?
+										 "DEFERRABLE" : "NOT DEFERRABLE");
+					append_string_object(tmp, "init_deferred", c->initdeferred ?
+										 "INITIALLY DEFERRED" : "INITIALLY IMMEDIATE");
+					subcmds = lappend(subcmds, new_object_object(tmp));
+				}
+				break;
+
+			case AT_ValidateConstraintRecurse:
+			case AT_ValidateConstraint:
+				tmp = new_objtree_VA("VALIDATE CONSTRAINT %{constraint}I", 2,
+									 "type", ObjTypeString, "validate constraint",
+									 "constraint", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DropConstraintRecurse:
+			case AT_DropConstraint:
+				tmp = new_objtree_VA("DROP CONSTRAINT %{constraint}I", 2,
+									 "type", ObjTypeString, "drop constraint",
+									 "constraint", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_AlterColumnType:
+				{
+					TupleDesc tupdesc = RelationGetDescr(rel);
+					Form_pg_attribute att;
+					ColumnDef	   *def;
+
+					att = &(tupdesc->attrs[sub->address.objectSubId - 1]);
+					def = (ColumnDef *) subcmd->def;
+					Assert(IsA(def, ColumnDef));
+
+					fmtstr = psprintf("ALTER %s %%{column}I SET DATA TYPE %%{datatype}T %%{collation}s %s",
+									  istype ? "ATTRIBUTE" : "COLUMN",
+									  istype ? "%{cascade}s" : "%{using}s");
+
+					tmp = new_objtree_VA(fmtstr, 2,
+										 "type", ObjTypeString, "alter column type",
+										 "column", ObjTypeString, subcmd->name);
+					/* add the TYPE clause */
+					append_object_object(tmp, "datatype",
+										 new_objtree_for_type(att->atttypid,
+															  att->atttypmod));
+
+					/* add a COLLATE clause, if needed */
+					tmp2 = new_objtree_VA("COLLATE %{name}D", 0);
+					if (OidIsValid(att->attcollation))
+					{
+						ObjTree *collname;
+
+						collname = new_objtree_for_qualname_id(CollationRelationId,
+															   att->attcollation);
+						append_object_object(tmp2, "name", collname);
+					}
+					else
+						append_bool_object(tmp2, "present", false);
+					append_object_object(tmp, "collation", tmp2);
+
+					/* if not a composite type, add the USING clause */
+					if (!istype)
+					{
+						/*
+						 * If there's a USING clause, transformAlterTableStmt
+						 * ran it through transformExpr and stored the
+						 * resulting node in cooked_default, which we can use
+						 * here.
+						 */
+						tmp2 = new_objtree_VA("USING %{expression}s", 0);
+						if (def->raw_default)
+						{
+							Datum	deparsed;
+							char   *defexpr;
+
+							defexpr = nodeToString(def->cooked_default);
+							deparsed = DirectFunctionCall2(pg_get_expr,
+														   CStringGetTextDatum(defexpr),
+														   ObjectIdGetDatum(RelationGetRelid(rel)));
+							append_string_object(tmp2, "expression",
+												 TextDatumGetCString(deparsed));
+						}
+						else
+							append_bool_object(tmp2, "present", false);
+						append_object_object(tmp, "using", tmp2);
+					}
+
+					/* if it's a composite type, add the CASCADE clause */
+					if (istype)
+					{
+						tmp2 = new_objtree_VA("CASCADE", 0);
+						if (subcmd->behavior != DROP_CASCADE)
+							append_bool_object(tmp2, "present", false);
+						append_object_object(tmp, "cascade", tmp2);
+					}
+
+					subcmds = lappend(subcmds, new_object_object(tmp));
+				}
+				break;
+
+#ifdef TODOLIST
+			case AT_AlterColumnGenericOptions:
+				tmp = deparse_FdwOptions((List *) subcmd->def,
+										 subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+#endif
+			case AT_ChangeOwner:
+				tmp = new_objtree_VA("OWNER TO %{owner}I",
+									 2, "type", ObjTypeString, "change owner",
+									 "owner",  ObjTypeString,
+									 get_rolespec_name(subcmd->newowner));
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_ClusterOn:
+				tmp = new_objtree_VA("CLUSTER ON %{index}I", 2,
+									 "type", ObjTypeString, "cluster on",
+									 "index", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DropCluster:
+				tmp = new_objtree_VA("SET WITHOUT CLUSTER", 1,
+									 "type", ObjTypeString, "set without cluster");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_SetLogged:
+				tmp = new_objtree_VA("SET LOGGED", 1,
+									 "type", ObjTypeString, "set logged");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_SetUnLogged:
+				tmp = new_objtree_VA("SET UNLOGGED", 1,
+									 "type", ObjTypeString, "set unlogged");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+/*
+removed feature
+			case AT_AddOidsRecurse:
+			case AT_AddOids:
+				tmp = new_objtree_VA("SET WITH OIDS", 1,
+									 "type", ObjTypeString, "set with oids");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+*/
+			case AT_DropOids:
+				tmp = new_objtree_VA("SET WITHOUT OIDS", 1,
+									 "type", ObjTypeString, "set without oids");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+			case AT_SetAccessMethod:
+				tmp = new_objtree_VA("SET ACCESS METHOD %{access_method}I", 2,
+									 "type", ObjTypeString, "set access method",
+									 "access_method", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+			case AT_SetTableSpace:
+				tmp = new_objtree_VA("SET TABLESPACE %{tablespace}I", 2,
+									 "type", ObjTypeString, "set tablespace",
+									 "tablespace", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_SetRelOptions:
+			case AT_ResetRelOptions:
+				subcmds = lappend(subcmds, new_object_object(
+									  deparse_RelSetOptions(subcmd)));
+				break;
+
+			case AT_EnableTrig:
+				tmp = new_objtree_VA("ENABLE TRIGGER %{trigger}I", 2,
+									 "type", ObjTypeString, "enable trigger",
+									 "trigger", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableAlwaysTrig:
+				tmp = new_objtree_VA("ENABLE ALWAYS TRIGGER %{trigger}I", 2,
+									 "type", ObjTypeString, "enable always trigger",
+									 "trigger", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableReplicaTrig:
+				tmp = new_objtree_VA("ENABLE REPLICA TRIGGER %{trigger}I", 2,
+									 "type", ObjTypeString, "enable replica trigger",
+									 "trigger", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DisableTrig:
+				tmp = new_objtree_VA("DISABLE TRIGGER %{trigger}I", 2,
+									 "type", ObjTypeString, "disable trigger",
+									 "trigger", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableTrigAll:
+				tmp = new_objtree_VA("ENABLE TRIGGER ALL", 1,
+									 "type", ObjTypeString, "enable trigger all");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DisableTrigAll:
+				tmp = new_objtree_VA("DISABLE TRIGGER ALL", 1,
+									 "type", ObjTypeString, "disable trigger all");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableTrigUser:
+				tmp = new_objtree_VA("ENABLE TRIGGER USER", 1,
+									 "type", ObjTypeString, "enable trigger user");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DisableTrigUser:
+				tmp = new_objtree_VA("DISABLE TRIGGER USER", 1,
+									 "type", ObjTypeString, "disable trigger user");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableRule:
+				tmp = new_objtree_VA("ENABLE RULE %{rule}I", 2,
+									 "type", ObjTypeString, "enable rule",
+									 "rule", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableAlwaysRule:
+				tmp = new_objtree_VA("ENABLE ALWAYS RULE %{rule}I", 2,
+									 "type", ObjTypeString, "enable always rule",
+									 "rule", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableReplicaRule:
+				tmp = new_objtree_VA("ENABLE REPLICA RULE %{rule}I", 2,
+									 "type", ObjTypeString, "enable replica rule",
+									 "rule", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DisableRule:
+				tmp = new_objtree_VA("DISABLE RULE %{rule}I", 2,
+									 "type", ObjTypeString, "disable rule",
+									 "rule", ObjTypeString, subcmd->name);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_AddInherit:
+				tmp = new_objtree_VA("INHERIT %{parent}D",
+									 2, "type", ObjTypeString, "inherit",
+									 "parent", ObjTypeObject,
+									 new_objtree_for_qualname_id(RelationRelationId,
+																 sub->address.objectId));
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DropInherit:
+				tmp = new_objtree_VA("NO INHERIT %{parent}D",
+									 2, "type", ObjTypeString, "drop inherit",
+									 "parent", ObjTypeObject,
+									 new_objtree_for_qualname_id(RelationRelationId,
+																 sub->address.objectId));
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_AddOf:
+				tmp = new_objtree_VA("OF %{type_of}T",
+									 2, "type", ObjTypeString, "add of",
+									 "type_of", ObjTypeObject,
+									 new_objtree_for_type(sub->address.objectId, -1));
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DropOf:
+				tmp = new_objtree_VA("NOT OF",
+									 1, "type", ObjTypeString, "not of");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_ReplicaIdentity:
+				tmp = new_objtree_VA("REPLICA IDENTITY %{ident}s", 1,
+									 "type", ObjTypeString, "replica identity");
+				switch (((ReplicaIdentityStmt *) subcmd->def)->identity_type)
+				{
+					case REPLICA_IDENTITY_DEFAULT:
+						append_string_object(tmp, "ident", "DEFAULT");
+						break;
+					case REPLICA_IDENTITY_FULL:
+						append_string_object(tmp, "ident", "FULL");
+						break;
+					case REPLICA_IDENTITY_NOTHING:
+						append_string_object(tmp, "ident", "NOTHING");
+						break;
+					case REPLICA_IDENTITY_INDEX:
+						tmp2 = new_objtree_VA("USING INDEX %{index}I", 1,
+											  "index", ObjTypeString,
+											  ((ReplicaIdentityStmt *) subcmd->def)->name);
+						append_object_object(tmp, "ident", tmp2);
+						break;
+				}
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_EnableRowSecurity:
+				tmp = new_objtree_VA("ENABLE ROW LEVEL SECURITY", 1,
+									 "type", ObjTypeString, "enable row security");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+
+			case AT_DisableRowSecurity:
+				tmp = new_objtree_VA("DISABLE ROW LEVEL SECURITY", 1,
+									 "type", ObjTypeString, "disable row security");
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+#ifdef TODOLIST
+			case AT_GenericOptions:
+				tmp = deparse_FdwOptions((List *) subcmd->def, NULL);
+				subcmds = lappend(subcmds, new_object_object(tmp));
+				break;
+#endif
+			default:
+				elog(WARNING, "unsupported alter table subtype %d",
+					 subcmd->subtype);
+				break;
+		}
+	}
+
+	table_close(rel, AccessShareLock);
+
+	if (list_length(subcmds) == 0)
+		return NULL;
+
+	append_array_object(alterTableStmt, "subcmds", subcmds);
+	return alterTableStmt;
+}
+
+/*
+ * Handle deparsing of simple commands.
+ *
+ * This function should cover all cases handled in ProcessUtilitySlow.
+ */
+static ObjTree *
+deparse_simple_command(CollectedCommand *cmd)
+{
+	Oid			objectId;
+	Node	   *parsetree;
+	ObjTree	   *command;
+
+	Assert(cmd->type == SCT_Simple);
+
+	parsetree = cmd->parsetree;
+	objectId = cmd->d.simple.address.objectId;
+
+	/* This switch needs to handle everything that ProcessUtilitySlow does */
+	switch (nodeTag(parsetree))
+	{
+		case T_CreateStmt:
+			command = deparse_CreateStmt(objectId, parsetree);
+			break;
+
+		default:
+			command = NULL;
+			elog(LOG, "unrecognized node type: %d",
+				 (int) nodeTag(parsetree));
+	}
+
+	return command;
+}
+
+char *
+deparse_drop_table(const char *objidentity)
+{
+	StringInfoData  str;
+	char           *command;
+	ObjTree		   *stmt;
+	char		   *fmt;
+	Jsonb		   *jsonb;
+
+	initStringInfo(&str);
+	fmt = psprintf("DROP TABLE IF EXISTS %%{objidentity}s");
+
+	stmt = new_objtree_VA(fmt, 1, "objidentity", ObjTypeString,
+						  objidentity);
+	jsonb = objtree_to_jsonb(stmt);
+	command = JsonbToCString(&str, &jsonb->root, 128);
+
+	return command;
+}
+
+char *
+deparse_utility_command(CollectedCommand *cmd)
+{
+	OverrideSearchPath *overridePath;
+	MemoryContext	oldcxt;
+	MemoryContext	tmpcxt;
+	ObjTree		   *tree;
+	char		   *command;
+	StringInfoData  str;
+
+	/*
+	 * Allocate everything done by the deparsing routines into a temp context,
+	 * to avoid having to sprinkle them with memory handling code; but allocate
+	 * the output StringInfo before switching.
+	 */
+	initStringInfo(&str);
+	tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
+								   "deparse ctx",
+								   ALLOCSET_DEFAULT_MINSIZE,
+								   ALLOCSET_DEFAULT_INITSIZE,
+								   ALLOCSET_DEFAULT_MAXSIZE);
+	oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+	/*
+	 * Many routines underlying this one will invoke ruleutils.c functionality
+	 * in order to obtain deparsed versions of expressions.  In such results,
+	 * we want all object names to be qualified, so that results are "portable"
+	 * to environments with different search_path settings.  Rather than inject
+	 * what would be repetitive calls to override search path all over the
+	 * place, we do it centrally here.
+	 */
+	overridePath = GetOverrideSearchPath(CurrentMemoryContext);
+	overridePath->schemas = NIL;
+	overridePath->addCatalog = false;
+	overridePath->addTemp = true;
+	PushOverrideSearchPath(overridePath);
+
+	switch (cmd->type)
+	{
+		case SCT_Simple:
+			tree = deparse_simple_command(cmd);
+			break;
+		case SCT_AlterTable:
+			tree = deparse_AlterTableStmt(cmd);
+			break;
+		default:
+			elog(ERROR, "unexpected deparse node type %d", cmd->type);
+	}
+
+	PopOverrideSearchPath();
+
+	if (tree)
+	{
+		Jsonb *jsonb;
+
+		jsonb = objtree_to_jsonb(tree);
+		command = JsonbToCString(&str, &jsonb->root, 128);
+	}
+	else
+		command = NULL;
+
+	/*
+	 * Clean up.  Note that since we created the StringInfo in the caller's
+	 * context, the output string is not deleted here.
+	 */
+	MemoryContextSwitchTo(oldcxt);
+	MemoryContextDelete(tmpcxt);
+
+	return command;
+}
+
+/*
+ * Given a CollectedCommand, return a JSON representation of it.
+ *
+ * The command is expanded fully, so that there are no ambiguities even in the
+ * face of search_path changes.
+ */
+Datum
+ddl_deparse_to_json(PG_FUNCTION_ARGS)
+{
+	CollectedCommand *cmd = (CollectedCommand *) PG_GETARG_POINTER(0);
+	char		   *command;
+
+	command = deparse_utility_command(cmd);
+
+	if (command)
+		PG_RETURN_TEXT_P(CStringGetTextDatum(command));
+	else
+		PG_RETURN_NULL();
+}
diff --git a/src/backend/commands/ddl_json.c b/src/backend/commands/ddl_json.c
new file mode 100644
index 0000000..95ed18f
--- /dev/null
+++ b/src/backend/commands/ddl_json.c
@@ -0,0 +1,749 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddl_json.c
+ *	  JSON code related to DDL command deparsing
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/ddl_json.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "lib/stringinfo.h"
+#include "utils/builtins.h"
+#include "utils/jsonb.h"
+
+
+typedef enum
+{
+	SpecTypename,
+	SpecOperatorname,
+	SpecDottedName,
+	SpecString,
+	SpecNumber,
+	SpecStringLiteral,
+	SpecIdentifier,
+	SpecRole
+} convSpecifier;
+
+typedef enum
+{
+	tv_absent,
+	tv_true,
+	tv_false
+} trivalue;
+
+static bool expand_one_jsonb_element(StringInfo out, char *param,
+						 JsonbValue *jsonval, convSpecifier specifier,
+						 const char *fmt);
+static void expand_jsonb_array(StringInfo out, char *param,
+				   JsonbValue *jsonarr, char *arraysep,
+				   convSpecifier specifier, const char *fmt);
+static void fmtstr_error_callback(void *arg);
+char *ddl_deparse_json_to_string(char *jsonb);
+
+static trivalue
+find_bool_in_jsonbcontainer(JsonbContainer *container, char *keyname)
+{
+	JsonbValue	key;
+	JsonbValue *value;
+	trivalue	result;
+
+	key.type = jbvString;
+	key.val.string.val = keyname;
+	key.val.string.len = strlen(keyname);
+	value = findJsonbValueFromContainer(container,
+										JB_FOBJECT, &key);
+	if (value == NULL)
+		return tv_absent;
+	if (value->type != jbvBool)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("element \"%s\" is not of type boolean",
+						keyname)));
+	result = value->val.boolean ? tv_true : tv_false;
+	pfree(value);
+
+	return result;
+}
+
+/*
+ * Given a JsonbContainer, find the JsonbValue with the given key name in it.
+ * If it's of a type other than jbvString, an error is raised.  If it doesn't
+ * exist, NULL is returned if missing_ok; otherwise an error is raised.
+ *
+ * If it exists and is a string, a freshly palloc'ed copy is returned.
+ *
+ * If *length is not NULL, it is set to the length of the string.
+ */
+static char *
+find_string_in_jsonbcontainer(JsonbContainer *container, char *keyname,
+							  bool missing_ok, int *length)
+{
+	JsonbValue	key;
+	JsonbValue *value;
+	char	   *str;
+
+	/* XXX verify that this is an object, not an array */
+
+	key.type = jbvString;
+	key.val.string.val = keyname;
+	key.val.string.len = strlen(keyname);
+	value = findJsonbValueFromContainer(container,
+										JB_FOBJECT, &key);
+	if (value == NULL)
+	{
+		if (missing_ok)
+			return NULL;
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("missing element \"%s\" in json object", keyname)));
+	}
+
+	str = pnstrdup(value->val.string.val, value->val.string.len);
+	if (length)
+		*length = value->val.string.len;
+	pfree(value);
+	return str;
+}
+
+#define ADVANCE_PARSE_POINTER(ptr,end_ptr) \
+	do { \
+		if (++(ptr) >= (end_ptr)) \
+			ereport(ERROR, \
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE), \
+					 errmsg("unterminated format specifier"))); \
+	} while (0)
+
+/*
+ * Recursive helper for ddl_deparse_expand_command
+ *
+ * Find the "fmt" element in the given container, and expand it into the
+ * provided StringInfo.
+ */
+static void
+expand_fmt_recursive(JsonbContainer *container, StringInfo out)
+{
+	JsonbValue	key;
+	JsonbValue *value;
+	const char *cp;
+	const char *start_ptr;
+	const char *end_ptr;
+	int			len;
+
+	start_ptr = find_string_in_jsonbcontainer(container, "fmt", false, &len);
+	end_ptr = start_ptr + len;
+
+	for (cp = start_ptr; cp < end_ptr; cp++)
+	{
+		convSpecifier specifier;
+		bool		is_array;
+		char	   *param = NULL;
+		char	   *arraysep = NULL;
+
+		if (*cp != '%')
+		{
+			appendStringInfoCharMacro(out, *cp);
+			continue;
+		}
+
+		is_array = false;
+
+		ADVANCE_PARSE_POINTER(cp, end_ptr);
+
+		/* Easy case: %% outputs a single % */
+		if (*cp == '%')
+		{
+			appendStringInfoCharMacro(out, *cp);
+			continue;
+		}
+
+		/*
+		 * Scan the mandatory element name.  Allow for an array separator
+		 * (which may be the empty string) to be specified after colon.
+		 */
+		if (*cp == '{')
+		{
+			StringInfoData parbuf;
+			StringInfoData arraysepbuf;
+			StringInfo	appendTo;
+
+			initStringInfo(&parbuf);
+			appendTo = &parbuf;
+
+			ADVANCE_PARSE_POINTER(cp, end_ptr);
+			for (; cp < end_ptr;)
+			{
+				if (*cp == ':')
+				{
+					/*
+					 * found array separator delimiter; element name is now
+					 * complete, start filling the separator.
+					 */
+					initStringInfo(&arraysepbuf);
+					appendTo = &arraysepbuf;
+					is_array = true;
+					ADVANCE_PARSE_POINTER(cp, end_ptr);
+					continue;
+				}
+
+				if (*cp == '}')
+				{
+					ADVANCE_PARSE_POINTER(cp, end_ptr);
+					break;
+				}
+				appendStringInfoCharMacro(appendTo, *cp);
+				ADVANCE_PARSE_POINTER(cp, end_ptr);
+			}
+			param = parbuf.data;
+			if (is_array)
+				arraysep = arraysepbuf.data;
+		}
+		if (param == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("missing conversion name in conversion specifier")));
+
+		switch (*cp)
+		{
+			case 'I':
+				specifier = SpecIdentifier;
+				break;
+			case 'D':
+				specifier = SpecDottedName;
+				break;
+			case 's':
+				specifier = SpecString;
+				break;
+			case 'L':
+				specifier = SpecStringLiteral;
+				break;
+			case 'T':
+				specifier = SpecTypename;
+				break;
+			case 'O':
+				specifier = SpecOperatorname;
+				break;
+			case 'n':
+				specifier = SpecNumber;
+				break;
+			case 'R':
+				specifier = SpecRole;
+				break;
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid conversion specifier \"%c\"", *cp)));
+		}
+
+		/*
+		 * Obtain the element to be expanded.
+		 */
+		key.type = jbvString;
+		key.val.string.val = param;
+		key.val.string.len = strlen(param);
+
+		value = findJsonbValueFromContainer(container, JB_FOBJECT, &key);
+
+		/* Validate that we got an array if the format string specified one. */
+
+		/* And finally print out the data */
+		if (is_array)
+			expand_jsonb_array(out, param, value, arraysep, specifier, start_ptr);
+		else
+			expand_one_jsonb_element(out, param, value, specifier, start_ptr);
+	}
+}
+
+/*
+ * Expand a json value as an identifier.  The value must be of type string.
+ */
+static void
+expand_jsonval_identifier(StringInfo buf, JsonbValue *jsonval)
+{
+	char	   *str;
+
+	Assert(jsonval->type == jbvString);
+
+	str = pnstrdup(jsonval->val.string.val,
+				   jsonval->val.string.len);
+	appendStringInfoString(buf, quote_identifier(str));
+	pfree(str);
+}
+
+/*
+ * Expand a json value as a dot-separated-name.  The value must be of type
+ * object and must contain elements "schemaname" (optional), "objname"
+ * (mandatory), "attrname" (optional).  Double quotes are added to each element
+ * as necessary, and dot separators where needed.
+ *
+ * One day we might need a "catalog" element as well, but no current use case
+ * needs that.
+ */
+static void
+expand_jsonval_dottedname(StringInfo buf, JsonbValue *jsonval)
+{
+	char	   *str;
+
+	str = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+										"schemaname", true, NULL);
+	if (str)
+	{
+		appendStringInfo(buf, "%s.", quote_identifier(str));
+		pfree(str);
+	}
+
+	str = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+										"objname", false, NULL);
+	appendStringInfo(buf, "%s", quote_identifier(str));
+	pfree(str);
+
+	str = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+										"attrname", true, NULL);
+	if (str)
+	{
+		appendStringInfo(buf, ".%s", quote_identifier(str));
+		pfree(str);
+	}
+}
+
+/*
+ * Expand a json value as a type name.
+ */
+static void
+expand_jsonval_typename(StringInfo buf, JsonbValue *jsonval)
+{
+	char	   *schema = NULL;
+	char	   *typename;
+	char	   *typmodstr;
+	trivalue	is_array;
+	char	   *array_decor;
+
+	/*
+	 * We omit schema-qualifying the output name if the schema element is
+	 * either the empty string or NULL; the difference between those two cases
+	 * is that in the latter we quote the type name, in the former we don't.
+	 * This allows for types with special typmod needs, such as interval and
+	 * timestamp (see format_type_detailed), while at the same time allowing
+	 * for the schema name to be omitted from type names that require quotes
+	 * but are to be obtained from a user schema.
+	 */
+
+	schema = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+										   "schemaname", true, NULL);
+	typename = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+											 "typename", false, NULL);
+	typmodstr = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+											  "typmod", true, NULL);
+	is_array = find_bool_in_jsonbcontainer(jsonval->val.binary.data,
+										   "typarray");
+	switch (is_array)
+	{
+		default:
+		case tv_absent:
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("missing typarray element")));
+			break;
+		case tv_true:
+			array_decor = "[]";
+			break;
+		case tv_false:
+			array_decor = "";
+			break;
+	}
+
+	if (schema == NULL)
+		appendStringInfo(buf, "%s%s%s",
+						 quote_identifier(typename),
+						 typmodstr ? typmodstr : "",
+						 array_decor);
+	else if (schema[0] == '\0')
+		appendStringInfo(buf, "%s%s%s",
+						 typename,
+						 typmodstr ? typmodstr : "",
+						 array_decor);
+	else
+		appendStringInfo(buf, "%s.%s%s%s",
+						 quote_identifier(schema),
+						 quote_identifier(typename),
+						 typmodstr ? typmodstr : "",
+						 array_decor);
+}
+
+/*
+ * Expand a json value as an operator name
+ */
+static void
+expand_jsonval_operator(StringInfo buf, JsonbValue *jsonval)
+{
+	char	   *str;
+
+	str = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+										"schemaname", true, NULL);
+	/* schema might be NULL or empty */
+	if (str != NULL && str[0] != '\0')
+		appendStringInfo(buf, "%s.", quote_identifier(str));
+
+	str = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+										"objname", false, NULL);
+	appendStringInfoString(buf, str);
+}
+
+/*
+ * Expand a json value as a string.  The value must be of type string or of
+ * type object.  In the latter case it must contain a "fmt" element which will
+ * be recursively expanded; also, if the object contains an element "present"
+ * and it is set to false, the expansion is the empty string.
+ *
+ * Returns false if no actual expansion was made due to the "present" flag
+ * being set to "false".
+ */
+static bool
+expand_jsonval_string(StringInfo buf, JsonbValue *jsonval)
+{
+	if (jsonval->type == jbvString)
+	{
+		appendBinaryStringInfo(buf, jsonval->val.string.val,
+							   jsonval->val.string.len);
+	}
+	else if (jsonval->type == jbvBinary)
+	{
+		trivalue	present;
+
+		present = find_bool_in_jsonbcontainer(jsonval->val.binary.data,
+											  "present");
+		/*
+		 * If "present" is set to false, this element expands to empty;
+		 * otherwise (either true or absent), fall through to expand "fmt".
+		 */
+		if (present == tv_false)
+			return false;
+
+		expand_fmt_recursive(jsonval->val.binary.data, buf);
+	}
+	return true;
+}
+
+/*
+ * Expand a json value as a string literal
+ */
+static void
+expand_jsonval_strlit(StringInfo buf, JsonbValue *jsonval)
+{
+	char   *str;
+	StringInfoData dqdelim;
+	static const char dqsuffixes[] = "_XYZZYX_";
+	int         dqnextchar = 0;
+
+	str = pnstrdup(jsonval->val.string.val, jsonval->val.string.len);
+
+	/* easy case: if there are no ' and no \, just use single quotes */
+	if (strchr(str, '\'') == NULL &&
+		strchr(str, '\\') == NULL)
+	{
+		appendStringInfo(buf, "'%s'", str);
+		pfree(str);
+		return;
+	}
+
+	/* Otherwise need to find a useful dollar-quote delimiter */
+	initStringInfo(&dqdelim);
+	appendStringInfoString(&dqdelim, "$");
+	while (strstr(str, dqdelim.data) != NULL)
+	{
+		appendStringInfoChar(&dqdelim, dqsuffixes[dqnextchar++]);
+		dqnextchar %= sizeof(dqsuffixes) - 1;
+	}
+	/* add trailing $ */
+	appendStringInfoChar(&dqdelim, '$');
+
+	/* And finally produce the quoted literal into the output StringInfo */
+	appendStringInfo(buf, "%s%s%s", dqdelim.data, str, dqdelim.data);
+	pfree(dqdelim.data);
+	pfree(str);
+}
+
+/*
+ * Expand a json value as a numeric quantity
+ */
+static void
+expand_jsonval_number(StringInfo buf, JsonbValue *jsonval)
+{
+	char *strdatum;
+
+	strdatum = DatumGetCString(DirectFunctionCall1(numeric_out,
+												   NumericGetDatum(jsonval->val.numeric)));
+	appendStringInfoString(buf, strdatum);
+}
+
+/*
+ * Expand a json value as a role name.  If the is_public element is set to
+ * true, PUBLIC is expanded (no quotes); otherwise, expand the given role name,
+ * quoting as an identifier.
+ */
+static void
+expand_jsonval_role(StringInfo buf, JsonbValue *jsonval)
+{
+	trivalue	is_public;
+
+	is_public = find_bool_in_jsonbcontainer(jsonval->val.binary.data,
+											"is_public");
+	if (is_public == tv_true)
+		appendStringInfoString(buf, "PUBLIC");
+	else
+	{
+		char *rolename;
+
+		rolename = find_string_in_jsonbcontainer(jsonval->val.binary.data,
+												 "rolename", false, NULL);
+		appendStringInfoString(buf, quote_identifier(rolename));
+	}
+}
+
+/*
+ * Expand one json element into the output StringInfo according to the
+ * conversion specifier.  The element type is validated, and an error is raised
+ * if it doesn't match what we expect for the conversion specifier.
+ *
+ * Returns false if no actual expansion was made (due to the "present" flag
+ * being set to "false" in formatted string expansion).
+ */
+static bool
+expand_one_jsonb_element(StringInfo out, char *param, JsonbValue *jsonval,
+						 convSpecifier specifier, const char *fmt)
+{
+	bool result = true;
+	ErrorContextCallback sqlerrcontext;
+
+	/* If we were given a format string, setup an ereport() context callback */
+	if (fmt)
+	{
+		sqlerrcontext.callback = fmtstr_error_callback;
+		sqlerrcontext.arg = (void *) fmt;
+		sqlerrcontext.previous = error_context_stack;
+		error_context_stack = &sqlerrcontext;
+	}
+
+	if (!jsonval)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("element \"%s\" not found", param)));
+
+	switch (specifier)
+	{
+		case SpecIdentifier:
+			if (jsonval->type != jbvString)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON string for %%I element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_identifier(out, jsonval);
+			break;
+
+		case SpecDottedName:
+			if (jsonval->type != jbvBinary)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON object for %%D element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_dottedname(out, jsonval);
+			break;
+
+		case SpecString:
+			if (jsonval->type != jbvString &&
+				jsonval->type != jbvBinary)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON string or object for %%s element \"%s\", got %d",
+								param, jsonval->type)));
+			result = expand_jsonval_string(out, jsonval);
+			break;
+
+		case SpecStringLiteral:
+			if (jsonval->type != jbvString)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON string for %%L element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_strlit(out, jsonval);
+			break;
+
+		case SpecTypename:
+			if (jsonval->type != jbvBinary)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON object for %%T element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_typename(out, jsonval);
+			break;
+
+		case SpecOperatorname:
+			if (jsonval->type != jbvBinary)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON object for %%O element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_operator(out, jsonval);
+			break;
+
+		case SpecNumber:
+			if (jsonval->type != jbvNumeric)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON numeric for %%n element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_number(out, jsonval);
+			break;
+
+		case SpecRole:
+			if (jsonval->type != jbvBinary)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("expected JSON object for %%R element \"%s\", got %d",
+								param, jsonval->type)));
+			expand_jsonval_role(out, jsonval);
+			break;
+	}
+
+	if (fmt)
+		error_context_stack = sqlerrcontext.previous;
+
+	return result;
+}
+
+/*
+ * Iterate on the elements of a JSON array, expanding each one into the output
+ * StringInfo per the given conversion specifier, separated by the given
+ * separator.
+ */
+static void
+expand_jsonb_array(StringInfo out, char *param,
+				   JsonbValue *jsonarr, char *arraysep, convSpecifier specifier,
+				   const char *fmt)
+{
+	ErrorContextCallback sqlerrcontext;
+	JsonbContainer *container;
+	JsonbIterator  *it;
+	JsonbValue	v;
+	int			type;
+	size_t		arrayseplen;
+	bool		first = true;
+
+	/* If we were given a format string, setup an ereport() context callback */
+	if (fmt)
+	{
+		sqlerrcontext.callback = fmtstr_error_callback;
+		sqlerrcontext.arg = (void *) fmt;
+		sqlerrcontext.previous = error_context_stack;
+		error_context_stack = &sqlerrcontext;
+	}
+
+	if (jsonarr == NULL || jsonarr->type != jbvBinary)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("element \"%s\" is not a JSON array", param)));
+
+	container = jsonarr->val.binary.data;
+	if ((container->header & JB_FARRAY) == 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("element \"%s\" is not a JSON array", param)));
+
+	arrayseplen = strlen(arraysep);
+
+	it = JsonbIteratorInit(container);
+	while ((type = JsonbIteratorNext(&it, &v, true)) != WJB_DONE)
+	{
+		switch (type)
+		{
+			case WJB_ELEM:
+				if (!first)
+					appendStringInfoString(out, arraysep);
+
+				if (expand_one_jsonb_element(out, param, &v, specifier, NULL))
+					first = false;
+				else
+				{
+					if (!first)
+					{
+						/* remove the array separator */
+						out->len -= arrayseplen;
+						out->data[out->len] = '\0';
+					}
+				}
+				break;
+		}
+	}
+
+	if (fmt)
+		error_context_stack = sqlerrcontext.previous;
+}
+
+char *
+ddl_deparse_json_to_string(char *json_str)
+{
+	Datum		d;
+	Jsonb	   *jsonb;
+	StringInfo out = (StringInfo) palloc0(sizeof(StringInfoData));
+
+	initStringInfo(out);
+
+	d = DirectFunctionCall1(jsonb_in,
+							PointerGetDatum(json_str));
+	jsonb = (Jsonb *) DatumGetPointer(d);
+
+	expand_fmt_recursive(&jsonb->root, out);
+
+	return out->data;
+}
+
+/*------
+ * Returns a formatted string from a JSON object.
+ *
+ * The starting point is the element named "fmt" (which must be a string).
+ * This format string may contain zero or more %-escapes, which consist of an
+ * element name enclosed in { }, possibly followed by a conversion modifier,
+ * followed by a conversion specifier.	Possible conversion specifiers are:
+ *
+ * %		expand to a literal %.
+ * I		expand as a single, non-qualified identifier
+ * D		expand as a possibly-qualified identifier
+ * T		expand as a type name
+ * O		expand as an operator name
+ * L		expand as a string literal (single quotes, or dollar quotes if needed)
+ * s		expand as a simple string (no quoting)
+ * n		expand as a simple number (no quoting)
+ * R		expand as a role name (possibly quoted name, or PUBLIC)
+ *
+ * The element name may have an optional separator specification preceded
+ * by a colon.	Its presence indicates that the element is expected to be
+ * an array; the specified separator is used to join the array elements.
+ *------
+ */
+Datum
+ddl_deparse_expand_command(PG_FUNCTION_ARGS)
+{
+	text	   *json = PG_GETARG_TEXT_P(0);
+	char	   *json_str;
+
+	json_str = TextDatumGetCString(json);
+
+	PG_RETURN_TEXT_P(CStringGetTextDatum(ddl_deparse_json_to_string(json_str)));
+}
+
+/*
+ * Error context callback for JSON format string expansion.
+ *
+ * Possible improvement: indicate which element we're expanding, if applicable
+ */
+static void
+fmtstr_error_callback(void *arg)
+{
+	errcontext("while expanding format string \"%s\"", (char *) arg);
+
+}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index c3937a6..2c4bcdc 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -2143,6 +2143,15 @@ pg_get_constraintdef_ext(PG_FUNCTION_ARGS)
 }
 
 /*
+ * Internal version that returns definition of a CONSTRAINT command
+ */
+char *
+pg_get_constraintdef_command_simple(Oid constraintId)
+{
+	return pg_get_constraintdef_worker(constraintId, false, 0, false);
+}
+
+/*
  * Internal version that returns a full ALTER TABLE ... ADD CONSTRAINT command
  */
 char *
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 87aa571..8aa636c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11884,5 +11884,10 @@
   proname => 'brin_minmax_multi_summary_send', provolatile => 's',
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
-
+{ oid => '4642', descr => 'ddl deparse',
+  proname => 'ddl_deparse_to_json', prorettype => 'text',
+  proargtypes => 'pg_ddl_command', prosrc => 'ddl_deparse_to_json' },
+{ oid => '4643', descr => 'json to string',
+  proname => 'ddl_deparse_expand_command', prorettype => 'text',
+  proargtypes => 'text', prosrc => 'ddl_deparse_expand_command' },
 ]
diff --git a/src/include/tcop/ddl_deparse.h b/src/include/tcop/ddl_deparse.h
new file mode 100644
index 0000000..fb174d2
--- /dev/null
+++ b/src/include/tcop/ddl_deparse.h
@@ -0,0 +1,10 @@
+#ifndef DDL_DEPARSE_H
+#define DDL_DEPARSE_H
+
+#include "tcop/deparse_utility.h"
+
+extern char *deparse_utility_command(CollectedCommand *cmd);
+extern char *ddl_deparse_json_to_string(char *jsonb);
+extern char *deparse_drop_table(const char *objidentity);
+
+#endif		/* DDL_DEPARSE_H */
diff --git a/src/include/utils/ruleutils.h b/src/include/utils/ruleutils.h
index 7d48971..467de7c 100644
--- a/src/include/utils/ruleutils.h
+++ b/src/include/utils/ruleutils.h
@@ -29,6 +29,7 @@ extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
 extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
 
 extern char *pg_get_constraintdef_command(Oid constraintId);
+extern char *pg_get_constraintdef_command_simple(Oid constraintId);
 extern char *deparse_expression(Node *expr, List *dpcontext,
 								bool forceprefix, bool showimplicit);
 extern List *deparse_context_for(const char *aliasname, Oid relid);
-- 
2.7.2.windows.1
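
The format-string expansion in ddl_json.c above can be tried in isolation
with a small standalone sketch. This is not code from the patch: kv_pair,
kv_lookup, put, put_ident and expand_fmt_sketch are hypothetical names, the
JSON object is replaced by a flat key/value array, only the %%, %{name}I
and %{name}s forms are handled, and identifiers are always double-quoted
(the real quote_identifier() quotes only when needed).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical flat stand-in for the JSON object holding the elements. */
typedef struct
{
	const char *key;
	const char *val;
} kv_pair;

/* Find the value for a key given as (pointer, length); NULL if absent. */
static const char *
kv_lookup(const kv_pair *kv, size_t nkv, const char *key, size_t keylen)
{
	for (size_t i = 0; i < nkv; i++)
		if (strlen(kv[i].key) == keylen &&
			strncmp(kv[i].key, key, keylen) == 0)
			return kv[i].val;
	return NULL;
}

/* Append one char, keeping room for the terminator; 0 on overflow. */
static int
put(char *out, size_t outsz, size_t *len, char c)
{
	if (*len + 1 >= outsz)
		return 0;
	out[(*len)++] = c;
	return 1;
}

/* Simplified quoting: always double-quote, doubling embedded quotes. */
static int
put_ident(char *out, size_t outsz, size_t *len, const char *s)
{
	if (!put(out, outsz, len, '"'))
		return 0;
	for (; *s; s++)
	{
		if (*s == '"' && !put(out, outsz, len, '"'))
			return 0;
		if (!put(out, outsz, len, *s))
			return 0;
	}
	return put(out, outsz, len, '"');
}

/*
 * Expand fmt into out.  Returns 0 on success, or -1 on a malformed
 * specifier, an unknown element name, or output overflow.
 */
int
expand_fmt_sketch(const char *fmt, const kv_pair *kv, size_t nkv,
				  char *out, size_t outsz)
{
	size_t		len = 0;

	assert(fmt != NULL && out != NULL && outsz > 0);

	for (const char *cp = fmt; *cp; cp++)
	{
		const char *name;
		const char *val;

		if (*cp != '%')
		{
			if (!put(out, outsz, &len, *cp))
				return -1;
			continue;
		}
		cp++;
		if (*cp == '%')			/* %% outputs a single % */
		{
			if (!put(out, outsz, &len, '%'))
				return -1;
			continue;
		}
		if (*cp != '{')			/* element name is mandatory */
			return -1;
		name = ++cp;
		while (*cp && *cp != '}')
			cp++;
		if (*cp != '}')			/* unterminated specifier */
			return -1;
		val = kv_lookup(kv, nkv, name, (size_t) (cp - name));
		if (val == NULL)
			return -1;
		cp++;					/* now at the conversion specifier */
		if (*cp == 'I')
		{
			if (!put_ident(out, outsz, &len, val))
				return -1;
		}
		else if (*cp == 's')
		{
			for (; *val; val++)
				if (!put(out, outsz, &len, *val))
					return -1;
		}
		else
			return -1;			/* unsupported or missing specifier */
	}
	out[len] = '\0';
	return 0;
}
```

With trigger set to my trig, expanding "ENABLE ALWAYS TRIGGER %{trigger}I"
yields ENABLE ALWAYS TRIGGER "my trig". Where the sketch returns -1, the
patch raises an error through ereport() instead.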



  [application/octet-stream] v6-0002-Support-DDL-replication.patch (127.5K, 3-v6-0002-Support-DDL-replication.patch)
From 5a10fd3e92c86b6cd926a6bfafc10b31a1c560e1 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Fri, 27 May 2022 14:00:07 +0800
Subject: [PATCH] Support DDL replication.

To support DDL replication, we use event triggers and the DDL deparsing
facilities. While creating a publication, we register a command end
trigger that deparses the DDL as a JSON blob, and WAL logs it. The event
trigger is automatically removed at the time of drop publication. The
WALSender decodes the WAL and sends it downstream similar to other DML
commands. The subscriber then converts JSON back to the DDL command string
and executes it. In the subscriber, we also add the newly added rel to
pg_subscription_rel so that the DML changes on the new table can be
replicated without having to manually run
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION".

This is a POC patch to show how DDL replication can be implemented using
event triggers and the DDL deparsing facilities, so the implementation is
restricted to CREATE TABLE/ALTER TABLE/DROP TABLE commands.

- For non-rewrite ALTER TABLE command and CREATE TABLE:
We deparse the command in the ddl_command_end event trigger and WAL log
the deparsed JSON string. The WALSender decodes the WAL and sends it to
the subscriber if the created/altered table is published. Most ALTER
TABLE subcommands are supported; some recently introduced ones (DDL
related to PARTITIONED TABLE ...) are not yet handled by the current
ddl_deparser and will be supported later.

- For DROP TABLE:
The 'command start' event handler logs a DDL message with the relids of
the tables being dropped. The output plugin (pgoutput) stores each relid
in its internal data structure after verifying that it belongs to a table
that is part of the publication. Later, the 'command end' event handler
logs the actual drop message. On receiving the command end, pgoutput sends
out the drop command only if it is for one of the relids marked for
deletion. We have to do this because, by the time the logical decoder
receives the 'command end' message, the relid of the table is no longer
valid: the table has been deleted as part of the invalidations received
for the drop table command, so it is no longer possible to verify whether
the table is part of the publication. To make this possible, I have added
two more elements, relid and cmdtype, to the DDL xlog record and the DDL
message.

We could also have handled all this on the subscriber side, but that would
mean sending spurious DDL messages for tables that are not part of the
publication.
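
The two-phase DROP TABLE handling described above can be sketched roughly
as follows. This is only a standalone illustration, not the pgoutput code
in this patch: DropTracker, tracker_on_drop_start and tracker_on_drop_end
are hypothetical names, the publication is modelled as a plain array of
relids, and Oid is a local stand-in for the PostgreSQL type.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */

#define MAX_PENDING 64			/* fixed size keeps the sketch short */

/* Hypothetical per-plugin state: relids seen at 'command start'. */
typedef struct DropTracker
{
	Oid			pending[MAX_PENDING];
	size_t		npending;
} DropTracker;

static bool
oid_in(const Oid *list, size_t n, Oid relid)
{
	for (size_t i = 0; i < n; i++)
		if (list[i] == relid)
			return true;
	return false;
}

/*
 * 'command start': the relid is still valid here, so publication
 * membership can be checked. Returns true if the relid was remembered.
 */
bool
tracker_on_drop_start(DropTracker *t, const Oid *published, size_t npub,
					  Oid relid)
{
	assert(t != NULL);

	if (!oid_in(published, npub, relid))
		return false;			/* not published: nothing to remember */
	if (oid_in(t->pending, t->npending, relid))
		return true;			/* already marked */
	if (t->npending == MAX_PENDING)
		return false;			/* sketch only: real code would grow the set */

	t->pending[t->npending++] = relid;
	return true;
}

/*
 * 'command end': the table is gone, so only the remembered relids can
 * tell us whether to send the drop. Returns true if it should be sent.
 */
bool
tracker_on_drop_end(DropTracker *t, Oid relid)
{
	assert(t != NULL);

	for (size_t i = 0; i < t->npending; i++)
	{
		if (t->pending[i] == relid)
		{
			/* forget the relid by moving the last entry into its slot */
			t->pending[i] = t->pending[--t->npending];
			return true;
		}
	}
	return false;
}
```

A caller would invoke tracker_on_drop_start() for each DCT_DropStart
message and forward a DCT_DropEnd message only when tracker_on_drop_end()
returns true.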

- For table_rewrite ALTER TABLE commands:
(ALTER COLUMN TYPE, ADD COLUMN DEFAULT, SET LOGGED, SET ACCESS METHOD)

We deparse the command and WAL-log the deparsed JSON string in the
table_rewrite event trigger. The WALSender decodes the WAL and sends it to
the subscriber if the altered table is published. Then, the WALSender
converts the subsequent rewrite INSERTs to UPDATEs and sends them to the
subscriber so that the data on the publisher and the subscriber stays
consistent. Note that tables that publish rewrite DDL must have a replica
identity configured; otherwise the subsequent rewrite UPDATEs cannot be
replicated.

We do it this way for two reasons:
(1) The data before the rewrite DDL could already differ between the
publisher and the subscriber. To make sure that extra data which exists
only on the subscriber also gets rewritten, we need to let the subscriber
execute the original rewrite DDL first, so that all of its data is
rewritten.

(2) The data after executing the rewrite DDL could differ between the
publisher and the subscriber (due to different functions/operators being
used during the rewrite), so we need to replicate the rewrite UPDATEs to
keep the data consistent.

TO IMPROVE:
This approach could be improved by letting the subscriber update only its
extra data itself, instead of executing the full rewrite DDL, and then
using the subsequent rewrite UPDATEs to rewrite the rest of the data. To
achieve this, we could modify the deparsed JSON string to temporarily
remove the rewrite part and add some logic on the subscriber to update the
extra data.
Besides, we may not need to send rewrite changes for every type of rewrite
DDL. For example, it seems fine to skip sending rewrite changes for ALTER
TABLE SET LOGGED, as the data in the table is not actually changed. We
could use the deparser and the event trigger to filter out these DDLs and
skip sending rewrite changes for them.
---
 src/backend/access/rmgrdesc/Makefile            |   1 +
 src/backend/access/rmgrdesc/logicalddlmsgdesc.c |  52 ++++
 src/backend/access/transam/rmgr.c               |   1 +
 src/backend/catalog/pg_publication.c            |   1 +
 src/backend/commands/event_trigger.c            | 146 ++++++++-
 src/backend/commands/publicationcmds.c          | 104 +++++++
 src/backend/commands/tablecmds.c                |   2 +-
 src/backend/replication/logical/Makefile        |   1 +
 src/backend/replication/logical/ddlmessage.c    |  86 ++++++
 src/backend/replication/logical/decode.c        |  41 +++
 src/backend/replication/logical/logical.c       |  93 ++++++
 src/backend/replication/logical/proto.c         |  52 +++-
 src/backend/replication/logical/reorderbuffer.c | 139 ++++++++-
 src/backend/replication/logical/worker.c        | 232 ++++++++++++++
 src/backend/replication/pgoutput/pgoutput.c     | 164 +++++++++-
 src/backend/utils/adt/ri_triggers.c             |   2 +
 src/backend/utils/cache/relcache.c              |   1 +
 src/bin/pg_dump/pg_dump.c                       |  27 +-
 src/bin/pg_dump/pg_dump.h                       |   1 +
 src/bin/pg_waldump/rmgrdesc.c                   |   1 +
 src/bin/psql/describe.c                         |  17 +-
 src/include/access/rmgrlist.h                   |   1 +
 src/include/catalog/pg_proc.dat                 |   9 +
 src/include/catalog/pg_publication.h            |   4 +
 src/include/commands/event_trigger.h            |   3 +-
 src/include/replication/ddlmessage.h            |  57 ++++
 src/include/replication/decode.h                |   1 +
 src/include/replication/logicalproto.h          |   7 +-
 src/include/replication/output_plugin.h         |  27 ++
 src/include/replication/pgoutput.h              |   1 +
 src/include/replication/reorderbuffer.h         |  39 +++
 src/include/tcop/deparse_utility.h              |   1 +
 src/test/regress/expected/psql.out              |   6 +-
 src/test/regress/expected/publication.out       | 388 ++++++++++++------------
 src/test/subscription/t/032_ddl_replication.pl  |  86 ++++++
 35 files changed, 1577 insertions(+), 217 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c
 create mode 100644 src/backend/replication/logical/ddlmessage.c
 create mode 100644 src/include/replication/ddlmessage.h
 create mode 100644 src/test/subscription/t/032_ddl_replication.pl

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72f..b8e29e8 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -19,6 +19,7 @@ OBJS = \
 	hashdesc.o \
 	heapdesc.o \
 	logicalmsgdesc.o \
+	logicalddlmsgdesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
 	relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
new file mode 100644
index 0000000..81dee52
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -0,0 +1,52 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalddlmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/ddlmessage.c
+ *
+ * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/ddlmessage.h"
+
+void
+logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_DDL_MESSAGE)
+	{
+		xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec;
+		char	   *prefix = xlrec->message;
+		char	   *message = xlrec->message + xlrec->prefix_size;
+		char	   *sep = "";
+
+		Assert(prefix[xlrec->prefix_size - 1] == '\0');
+
+		appendStringInfo(buf, "prefix \"%s\"; relid %u cmdtype %u; ",
+						 prefix, xlrec->relid, xlrec->cmdtype);
+		appendStringInfo(buf, "payload (%zu bytes): ", xlrec->message_size);
+		/* Write message payload as a series of hex bytes */
+		for (int cnt = 0; cnt < xlrec->message_size; cnt++)
+		{
+			appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]);
+			sep = " ";
+		}
+	}
+}
+
+const char *
+logicalddlmsg_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+		return "DDL MESSAGE";
+
+	return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 8ed6924..312f117 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -27,6 +27,7 @@
 #include "fmgr.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "replication/ddlmessage.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 8c7fca6..20bf8c1 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1005,6 +1005,7 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubupdate = pubform->pubupdate;
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
+	pub->pubactions.pubddl = pubform->pubddl;
 	pub->pubviaroot = pubform->pubviaroot;
 
 	ReleaseSysCache(tup);
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 4642527..9bc2145 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -37,8 +37,11 @@
 #include "miscadmin.h"
 #include "parser/parse_func.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
+#include "replication/message.h"
 #include "tcop/deparse_utility.h"
 #include "tcop/utility.h"
+#include "tcop/ddl_deparse.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/evtcache.h"
@@ -1540,6 +1543,7 @@ EventTriggerAlterTableStart(Node *parsetree)
 
 	command->d.alterTable.classId = RelationRelationId;
 	command->d.alterTable.objectId = InvalidOid;
+	command->d.alterTable.rewrite = false;
 	command->d.alterTable.subcmds = NIL;
 	command->parsetree = copyObject(parsetree);
 
@@ -1573,7 +1577,7 @@ EventTriggerAlterTableRelid(Oid objectId)
  * internally, so that's all that this code needs to handle at the moment.
  */
 void
-EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address)
+EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address, bool rewrite)
 {
 	MemoryContext oldcxt;
 	CollectedATSubcmd *newsub;
@@ -1593,6 +1597,7 @@ EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address)
 	newsub->address = address;
 	newsub->parsetree = copyObject(subcmd);
 
+	currentEventTriggerState->currentCommand->d.alterTable.rewrite |= rewrite;
 	currentEventTriggerState->currentCommand->d.alterTable.subcmds =
 		lappend(currentEventTriggerState->currentCommand->d.alterTable.subcmds, newsub);
 
@@ -2180,3 +2185,142 @@ stringify_adefprivs_objtype(ObjectType objtype)
 
 	return "???";				/* keep compiler quiet */
 }
+
+/*
+ * publication_ddl_deparse_start
+ *
+ * Log the start of a DROP TABLE command for each table being dropped.
+ */
+Datum
+publication_ddl_deparse_start(PG_FUNCTION_ARGS)
+{
+	EventTriggerData *trigdata;
+	char		*command = psprintf("Drop table command start");
+	DropStmt	*stmt;
+	ListCell	*cell1;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	trigdata = (EventTriggerData *) fcinfo->context;
+	stmt	 = (DropStmt *) trigdata->parsetree;
+
+	/* extract the relid from the parse tree */
+	foreach(cell1, stmt->objects)
+	{
+		Node	*object = lfirst(cell1);
+		ObjectAddress address;
+		Relation relation = NULL;
+
+		address = get_object_address(stmt->removeType,
+									 object,
+									 &relation,
+									 AccessExclusiveLock,
+									 true);
+		LogLogicalDDLMessage("deparse", address.objectId, DCT_DropStart,
+							 command, strlen(command) + 1);
+
+		if (relation)
+			table_close(relation, NoLock);
+	}
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * publication_ddl_deparse_table_rewrite
+ *
+ * Deparse the ddl table rewrite command and log it.
+ */
+Datum
+publication_ddl_deparse_table_rewrite(PG_FUNCTION_ARGS)
+{
+	CollectedCommand *cmd;
+	char        *json_string;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	cmd = currentEventTriggerState->currentCommand;
+
+	Assert(cmd && cmd->d.alterTable.rewrite);
+
+	/* Deparse the DDL command and WAL log it to allow decoding of the same. */
+	json_string = deparse_utility_command(cmd);
+
+	if (json_string != NULL)
+		LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_Alter,
+							 json_string, strlen(json_string) + 1);
+
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * publication_ddl_deparse
+ *
+ * Deparse the ddl command and log it.
+ */
+Datum
+publication_ddl_deparse(PG_FUNCTION_ARGS)
+{
+	ListCell   *lc;
+	slist_iter  iter;
+	DeparsedCommandType type;
+	Oid relid;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	foreach(lc, currentEventTriggerState->commandList)
+	{
+		CollectedCommand *cmd = lfirst(lc);
+		char             *json_string;
+
+		/* Rewrite DDL has been handled in table_rewrite trigger */
+		if (cmd->type == SCT_AlterTable && cmd->d.alterTable.rewrite)
+			continue;
+
+		if (cmd->type == SCT_Simple &&
+			!OidIsValid(cmd->d.simple.address.objectId))
+			continue;
+
+		/* Deparse the DDL command and WAL log it to allow decoding of the same. */
+		json_string = deparse_utility_command(cmd);
+
+		if (json_string == NULL)
+			continue;
+
+		if (cmd->type == SCT_AlterTable)
+		{
+			relid = cmd->d.alterTable.objectId;
+			type = DCT_Alter;
+		}
+		else
+		{
+			/* Only SCT_Simple for now */
+			relid = cmd->d.simple.address.objectId;
+			type = DCT_Create;
+		}
+
+		LogLogicalDDLMessage("deparse", relid, type, json_string,
+							 strlen(json_string) + 1);
+	}
+
+	slist_foreach(iter, &(currentEventTriggerState->SQLDropList))
+	{
+		SQLDropObject *obj;
+
+		obj = slist_container(SQLDropObject, next, iter.cur);
+
+		if (strcmp(obj->objecttype, "table") == 0)
+		{
+			char    *command;
+
+			command = deparse_drop_table(obj->objidentity);
+
+			LogLogicalDDLMessage("deparse", obj->address.objectId, DCT_DropEnd,
+								 command, strlen(command) + 1);
+		}
+	}
+
+	return PointerGetDatum(NULL);
+}
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 8e64574..63ffc62 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -37,10 +37,12 @@
 #include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_clause.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_relation.h"
+#include "parser/parser.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/array.h"
@@ -95,6 +97,7 @@ parse_publication_options(ParseState *pstate,
 	pubactions->pubupdate = true;
 	pubactions->pubdelete = true;
 	pubactions->pubtruncate = true;
+	pubactions->pubddl = false;
 	*publish_via_partition_root = false;
 
 	/* Parse options */
@@ -141,6 +144,8 @@ parse_publication_options(ParseState *pstate,
 					pubactions->pubdelete = true;
 				else if (strcmp(publish_opt, "truncate") == 0)
 					pubactions->pubtruncate = true;
+				else if (strcmp(publish_opt, "ddl") == 0)
+					pubactions->pubddl = true;
 				else
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
@@ -756,6 +761,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 {
 	Relation	rel;
 	ObjectAddress myself;
+	ObjectAddress referenced_start, referenced_end,
+				  referenced_table_rewrite;
 	Oid			puboid;
 	bool		nulls[Natts_pg_publication];
 	Datum		values[Natts_pg_publication];
@@ -820,6 +827,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		BoolGetDatum(pubactions.pubdelete);
 	values[Anum_pg_publication_pubtruncate - 1] =
 		BoolGetDatum(pubactions.pubtruncate);
+	values[Anum_pg_publication_pubddl - 1] =
+		BoolGetDatum(pubactions.pubddl);
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
 
@@ -882,6 +891,98 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		}
 	}
 
+	/*
+	 * Create an event trigger to allow logging of DDL statements.
+	 *
+	 * TODO: We need to find a better syntax to allow replication of DDL
+	 * statements.
+	 *
+	 * XXX: This code is just to show the replication of CREATE TABLE works.
+	 * We need to enhance this once the approach for DDL replication is
+	 * finalized.
+	 */
+	if (pubactions.pubddl)
+	{
+		CreateEventTrigStmt *ddl_trigg_start, *ddl_trigg_end,
+							*ddl_trigg_table_rewrite;
+		Node				*end_arg1 = NULL;
+		Node				*start_arg1 = NULL;
+		Node				*table_rewrite_arg1 = NULL;
+		Node				*arg2 = NULL;
+		Node				*arg3 = NULL;
+		List				*end_tags = NIL;
+		List				*start_tags = NIL;
+		List				*table_rewrite_tags = NIL;
+		Oid					event_trig_start_id, event_trig_end_id,
+							event_trig_table_rewrite_id;
+		char				trigger_name_start[NAMEDATALEN];
+		char				trigger_name_end[NAMEDATALEN];
+		char				trigger_name_table_rewrite[NAMEDATALEN];
+
+		ddl_trigg_end = makeNode(CreateEventTrigStmt);
+
+		snprintf(trigger_name_end, sizeof(trigger_name_end), "pg_deparse_trig_end_%u",
+				 puboid);
+		ddl_trigg_end->trigname = pstrdup(trigger_name_end);
+		ddl_trigg_end->eventname = "ddl_command_end";
+		ddl_trigg_end->funcname = SystemFuncName("publication_ddl_deparse");
+
+		end_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_DROP_TABLE)));
+		end_tags = list_make1(end_arg1);
+
+		/* support create table only when publication is for all tables */
+		if (stmt->for_all_tables)
+		{
+			arg2 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_CREATE_TABLE)));
+			end_tags = lappend(end_tags, arg2);
+		}
+
+		arg3 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_ALTER_TABLE)));
+		end_tags = lappend(end_tags, arg3);
+
+		ddl_trigg_end->whenclause = list_make1(makeDefElem("tag", (Node *) end_tags, -1));
+		event_trig_end_id = CreateEventTrigger(ddl_trigg_end);
+
+		ddl_trigg_start = makeNode(CreateEventTrigStmt);
+
+		snprintf(trigger_name_start, sizeof(trigger_name_start), "pg_deparse_trig_start_%u",
+				 puboid);
+		ddl_trigg_start->trigname = pstrdup(trigger_name_start);
+		ddl_trigg_start->eventname = "ddl_command_start";
+		ddl_trigg_start->funcname = SystemFuncName("publication_ddl_deparse_start");
+
+		start_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_DROP_TABLE)));
+		start_tags = list_make1(start_arg1);
+		ddl_trigg_start->whenclause = list_make1(makeDefElem("tag", (Node *) start_tags, -1));
+		event_trig_start_id = CreateEventTrigger(ddl_trigg_start);
+
+		ddl_trigg_table_rewrite = makeNode(CreateEventTrigStmt);
+
+		snprintf(trigger_name_table_rewrite, sizeof(trigger_name_table_rewrite),
+				 "pg_deparse_trig_table_rewrite_%u", puboid);
+		ddl_trigg_table_rewrite->trigname = pstrdup(trigger_name_table_rewrite);
+		ddl_trigg_table_rewrite->eventname = "table_rewrite";
+		ddl_trigg_table_rewrite->funcname = SystemFuncName("publication_ddl_deparse_table_rewrite");
+
+		table_rewrite_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_ALTER_TABLE)));
+		table_rewrite_tags = list_make1(table_rewrite_arg1);
+		ddl_trigg_table_rewrite->whenclause = list_make1(makeDefElem("tag", (Node *) table_rewrite_tags, -1));
+		event_trig_table_rewrite_id = CreateEventTrigger(ddl_trigg_table_rewrite);
+
+		/*
+		 * Register the event triggers as internally dependent on the
+		 * publication.
+		 */
+		ObjectAddressSet(referenced_end, EventTriggerRelationId, event_trig_end_id);
+		recordDependencyOn(&referenced_end, &myself, DEPENDENCY_INTERNAL);
+
+		ObjectAddressSet(referenced_start, EventTriggerRelationId, event_trig_start_id);
+		recordDependencyOn(&referenced_start, &myself, DEPENDENCY_INTERNAL);
+
+		ObjectAddressSet(referenced_table_rewrite, EventTriggerRelationId, event_trig_table_rewrite_id);
+		recordDependencyOn(&referenced_table_rewrite, &myself, DEPENDENCY_INTERNAL);
+	}
+
 	table_close(rel, RowExclusiveLock);
 
 	InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
@@ -1022,6 +1123,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 
 		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
+
+		values[Anum_pg_publication_pubddl - 1] = BoolGetDatum(pubactions.pubddl);
+		replaces[Anum_pg_publication_pubddl - 1] = true;
 	}
 
 	if (publish_via_partition_root_given)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 2de0eba..74108b9 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5218,7 +5218,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 	 * Report the subcommand to interested event triggers.
 	 */
 	if (cmd)
-		EventTriggerCollectAlterTableSubcmd((Node *) cmd, address);
+		EventTriggerCollectAlterTableSubcmd((Node *) cmd, address, tab->rewrite);
 
 	/*
 	 * Bump the command counter to ensure the next subcommand in the sequence
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fde..f3eeb67 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = \
 	decode.o \
+	ddlmessage.o \
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
new file mode 100644
index 0000000..5093523
--- /dev/null
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddlmessage.c
+ *	  Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/ddlmessage.c
+ *
+ * NOTES
+ *
+ * Logical DDL messages allow XLOG logging of DDL command strings that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are the same as NOOP.
+ *
+ * Unlike generic logical messages, these DDL messages have only transactional
+ * mode. Note that by default DDLs in PostgreSQL are transactional.
+ *
+ * These messages are part of the current transaction and will be sent to
+ * the decoding plugin in the same way as DML operations.
+ *
+ * Every message carries a prefix to avoid conflicts between different decoding
+ * plugins. Plugin authors must take extra care to use a unique prefix; a
+ * good option seems to be, for example, the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/ddlmessage.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding DDL message into XLog.
+ */
+XLogRecPtr
+LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					 const char *message, size_t size)
+{
+	xl_logical_ddl_message xlrec;
+
+	/*
+	 * Ensure we have a valid transaction id.
+	 */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	xlrec.dbId = MyDatabaseId;
+	/* trailing zero is critical; see logicalddlmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+	xlrec.relid = relid;
+	xlrec.cmdtype = cmdtype;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding ddl messages.
+ */
+void
+logicalddlmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index aa2427b..255b0eb 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -36,6 +36,7 @@
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
 #include "replication/decode.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -604,6 +605,46 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 }
 
 /*
+ * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+void
+logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	xl_logical_ddl_message *message;
+
+	if (info != XLOG_LOGICAL_DDL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info);
+
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in decoding ddl messages.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	message = (xl_logical_ddl_message *) XLogRecGetData(r);
+
+	if (message->dbId != ctx->slot->data.database ||
+		FilterByOrigin(ctx, origin_id))
+		return;
+
+	if (SnapBuildProcessChange(builder, xid, buf->origptr))
+		ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr,
+									 message->message, /* first part of message is prefix */
+									 message->message_size,
+									 message->message + message->prefix_size,
+									 message->relid, message->cmdtype);
+}
+
+/*
  * Consolidated commit record handling between the different form of commit
  * records.
  *
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 625a7f4..98969c7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -33,6 +33,7 @@
 #include "fmgr.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
@@ -73,6 +74,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+								  XLogRecPtr message_lsn, const char *prefix,
+								  Oid relid, DeparsedCommandType cmdtype,
+								  Size message_size, const char *message);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +95,11 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
 static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									  XLogRecPtr message_lsn, bool transactional,
 									  const char *prefix, Size message_size, const char *message);
+static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+										 XLogRecPtr message_lsn,
+										 const char *prefix,
+										 Oid relid, DeparsedCommandType cmdtype,
+										 Size message_size, const char *message);
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
@@ -218,6 +228,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
+	ctx->reorder->ddlmessage = ddlmessage_cb_wrapper;
 
 	/*
 	 * To support streaming, we require start/stop/abort/commit/change
@@ -234,6 +245,7 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_commit_cb != NULL) ||
 		(ctx->callbacks.stream_change_cb != NULL) ||
 		(ctx->callbacks.stream_message_cb != NULL) ||
+		(ctx->callbacks.stream_ddlmessage_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
 	/*
@@ -251,6 +263,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
 	ctx->reorder->stream_change = stream_change_cb_wrapper;
 	ctx->reorder->stream_message = stream_message_cb_wrapper;
+	ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper;
 	ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
 
@@ -1221,6 +1234,44 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+					  XLogRecPtr message_lsn,
+					  const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					  Size message_size,
+					  const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (ctx->callbacks.ddlmessage_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "ddlmessage";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, prefix, relid, cmdtype,
+								 message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						XLogRecPtr first_lsn)
 {
@@ -1536,6 +1587,48 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							 XLogRecPtr message_lsn,
+							 const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+							 Size message_size,
+							 const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/* We're only supposed to call this when streaming is supported. */
+	Assert(ctx->streaming);
+
+	/* this callback is optional */
+	if (ctx->callbacks.stream_ddlmessage_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_ddlmessage";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, prefix, relid,
+										cmdtype, message_size, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						   int nrelations, Relation relations[],
 						   ReorderBufferChange *change)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ff8513e..eaec031 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -640,8 +640,8 @@ logicalrep_read_truncate(StringInfo in,
  */
 void
 logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
-						 bool transactional, const char *prefix, Size sz,
-						 const char *message)
+						 bool transactional, const char *prefix,
+						 Size sz, const char *message)
 {
 	uint8		flags = 0;
 
@@ -663,6 +663,52 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 }
 
 /*
+ * Read DDL MESSAGE from stream
+ */
+char *
+logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn,
+						   const char **prefix,
+						   Size *sz)
+{
+	uint8 flags;
+	char *msg;
+
+	/* TODO: double-check when we need to read the TransactionId. */
+
+	flags = pq_getmsgint(in, 1);
+	if (flags != 0)
+		elog(ERROR, "unrecognized flags %u in ddl message", flags);
+	*lsn = pq_getmsgint64(in);
+	*prefix = pq_getmsgstring(in);
+	*sz = pq_getmsgint(in, 4);
+	msg = (char *) pq_getmsgbytes(in, *sz);
+
+	return msg;
+}
+
+/*
+ * Write DDL MESSAGE to stream
+ */
+void
+logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+							const char *prefix, Size sz, const char *message)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE);
+
+	/* transaction ID (if not valid, we're not streaming) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+
+	pq_sendint8(out, flags);
+	pq_sendint64(out, lsn);
+	pq_sendstring(out, prefix);
+	pq_sendint32(out, sz);
+	pq_sendbytes(out, message, sz);
+}
+
+/*
  * Write relation description to the output stream.
  */
 void
@@ -1218,6 +1264,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "TYPE";
 		case LOGICAL_REP_MSG_MESSAGE:
 			return "MESSAGE";
+		case LOGICAL_REP_MSG_DDLMESSAGE:
+			return "DDL";
 		case LOGICAL_REP_MSG_BEGIN_PREPARE:
 			return "BEGIN PREPARE";
 		case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8da5f90..346e19e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -94,6 +94,7 @@
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/ddlmessage.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
@@ -512,6 +513,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 				pfree(change->data.msg.message);
 			change->data.msg.message = NULL;
 			break;
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			if (change->data.ddlmsg.prefix != NULL)
+				pfree(change->data.ddlmsg.prefix);
+			change->data.ddlmsg.prefix = NULL;
+			if (change->data.ddlmsg.message != NULL)
+				pfree(change->data.ddlmsg.message);
+			change->data.ddlmsg.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			if (change->data.inval.invalidations)
 				pfree(change->data.inval.invalidations);
@@ -867,6 +876,36 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * A transactional DDL message is queued to be processed upon commit.
+ */
+void
+ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid,
+							 XLogRecPtr lsn, const char *prefix,
+							 Size message_size, const char *message,
+							 Oid relid, DeparsedCommandType cmdtype)
+{
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
+
+	Assert(xid != InvalidTransactionId);
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE;
+	change->data.ddlmsg.prefix = pstrdup(prefix);
+	change->data.ddlmsg.relid = relid;
+	change->data.ddlmsg.cmdtype = cmdtype;
+	change->data.ddlmsg.message_size = message_size;
+	change->data.ddlmsg.message = palloc(message_size);
+	memcpy(change->data.ddlmsg.message, message, message_size);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
  *
@@ -1958,6 +1997,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
 }
 
 /*
+ * Helper function for ReorderBufferProcessTXN for applying the DDL message.
+ */
+static inline void
+ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 ReorderBufferChange *change, bool streaming)
+{
+	if (streaming)
+		rb->stream_ddlmessage(rb, txn, change->lsn,
+							  change->data.ddlmsg.prefix,
+							  change->data.ddlmsg.relid,
+							  change->data.ddlmsg.cmdtype,
+							  change->data.ddlmsg.message_size,
+							  change->data.ddlmsg.message);
+	else
+		rb->ddlmessage(rb, txn, change->lsn,
+					   change->data.ddlmsg.prefix,
+					   change->data.ddlmsg.relid,
+					   change->data.ddlmsg.cmdtype,
+					   change->data.ddlmsg.message_size,
+					   change->data.ddlmsg.message);
+}
+
+/*
  * Function to store the command id and snapshot at the end of the current
  * stream so that we can reuse the same while sending the next stream.
  */
@@ -2335,6 +2397,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					ReorderBufferApplyMessage(rb, txn, change, streaming);
 					break;
 
+				case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+					ReorderBufferApplyDDLMessage(rb, txn, change, streaming);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
 					/* Execute the invalidation messages locally */
 					ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
@@ -3710,6 +3776,40 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				char	   *data;
+				Size		prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+
+				sz += prefix_size + change->data.ddlmsg.message_size +
+					sizeof(Size) + sizeof(Oid) + sizeof(int) + sizeof(Size);
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* write the prefix, relid and cmdtype including the size */
+				memcpy(data, &prefix_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, &change->data.ddlmsg.relid, sizeof(Oid));
+				data += sizeof(Oid);
+				memcpy(data, &change->data.ddlmsg.cmdtype, sizeof(int));
+				data += sizeof(int);
+				memcpy(data, change->data.ddlmsg.prefix,
+					   prefix_size);
+				data += prefix_size;
+
+				/* write the message including the size */
+				memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(data, change->data.ddlmsg.message,
+					   change->data.ddlmsg.message_size);
+				data += change->data.ddlmsg.message_size;
+
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			{
 				char	   *data;
@@ -4024,6 +4124,15 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				Size		prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+
+				sz += prefix_size + change->data.ddlmsg.message_size +
+					sizeof(Size) + sizeof(Size) + sizeof(Oid) + sizeof(int);
+
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			{
 				sz += sizeof(SharedInvalidationMessage) *
@@ -4282,8 +4391,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				/* read prefix */
 				memcpy(&prefix_size, data, sizeof(Size));
 				data += sizeof(Size);
-				change->data.msg.prefix = MemoryContextAlloc(rb->context,
-															 prefix_size);
+				change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size);
 				memcpy(change->data.msg.prefix, data, prefix_size);
 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
 				data += prefix_size;
@@ -4299,6 +4407,33 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+			{
+				Size		prefix_size;
+
+				/* read prefix */
+				memcpy(&prefix_size, data, sizeof(Size));
+				data += sizeof(Size);
+				memcpy(&change->data.ddlmsg.relid, data, sizeof(Oid));
+				data += sizeof(Oid);
+				memcpy(&change->data.ddlmsg.cmdtype, data, sizeof(int));
+				data += sizeof(int);
+				change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size);
+				memcpy(change->data.ddlmsg.prefix, data, prefix_size);
+				Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0');
+				data += prefix_size;
+
+				/* read the message */
+				memcpy(&change->data.ddlmsg.message_size, data, sizeof(Size));
+				data += sizeof(Size);
+				change->data.ddlmsg.message = MemoryContextAlloc(rb->context,
+																 change->data.ddlmsg.message_size);
+				memcpy(change->data.ddlmsg.message, data,
+					   change->data.ddlmsg.message_size);
+				data += change->data.ddlmsg.message_size;
+
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			{
 				Size		inval_size = sizeof(SharedInvalidationMessage) *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc210a9..233abcd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,6 +156,7 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "optimizer/optimizer.h"
+#include "parser/analyze.h"
 #include "pgstat.h"
 #include "postmaster/bgworker.h"
 #include "postmaster/interrupt.h"
@@ -179,7 +180,10 @@
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "tcop/ddl_deparse.h"
+#include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
+#include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -2445,6 +2449,230 @@ apply_handle_truncate(StringInfo s)
 	end_replication_step();
 }
 
+/* Remove the data population from the command */
+static void
+preprocess_create_table(RawStmt *command)
+{
+	CommandTag	commandTag;
+
+	commandTag = CreateCommandTag((Node *) command);
+
+	switch (commandTag)
+	{
+		case CMDTAG_CREATE_TABLE_AS:
+		case CMDTAG_SELECT_INTO:
+			{
+				CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;
+				if (castmt->objtype == OBJECT_TABLE)
+				{
+					/*
+					 * Force skipping data population to avoid data
+					 * inconsistency. Data should be replicated from the
+					 * publisher instead.
+					 */
+					castmt->into->skipData = true;
+				}
+			}
+			break;
+		case CMDTAG_SELECT:
+			{
+				SelectStmt *sstmt = (SelectStmt *) command->stmt;
+
+				if (sstmt->intoClause != NULL)
+				{
+					/*
+					 * Force skipping data population to avoid data
+					 * inconsistency. Data should be replicated from the
+					 * publisher instead.
+					 */
+					sstmt->intoClause->skipData = true;
+				}
+			}
+			break;
+		default:
+			break;
+	}
+}
+
+/*
+ * Handle CREATE TABLE command
+ *
+ * Call AddSubscriptionRelState for a CREATE TABLE command to set the relstate
+ * to SUBREL_STATE_READY, so DML changes on the new table can be replicated
+ * without having to manually run ALTER SUBSCRIPTION ... REFRESH PUBLICATION.
+ */
+static void
+handle_create_table(RawStmt *command)
+{
+	CommandTag	commandTag;
+	RangeVar   *rv = NULL;
+	Oid			relid;
+	Oid			relnamespace = InvalidOid;
+	char	   *schemaname = NULL;
+	char	   *relname = NULL;
+
+	commandTag = CreateCommandTag((Node *) command);
+
+	switch (commandTag)
+	{
+		case CMDTAG_CREATE_TABLE:
+			{
+				CreateStmt *cstmt = (CreateStmt *) command->stmt;
+				rv = cstmt->relation;
+			}
+			break;
+		default:
+			break;
+	}
+
+	if (!rv)
+		return;
+
+	schemaname = rv->schemaname;
+	relname = rv->relname;
+
+	if (schemaname != NULL)
+		relnamespace = get_namespace_oid(schemaname, false);
+
+	if (relnamespace != InvalidOid)
+		relid = get_relname_relid(relname, relnamespace);
+	else
+		relid = RelnameGetRelid(relname);
+
+	if (relid != InvalidOid)
+	{
+		AddSubscriptionRelState(MySubscription->oid, relid,
+								SUBREL_STATE_READY,
+								InvalidXLogRecPtr);
+		ereport(DEBUG1,
+				(errmsg_internal("table \"%s\" added to subscription \"%s\"",
+								 relname, MySubscription->name)));
+	}
+}
+
+static void
+apply_handle_ddl(StringInfo s)
+{
+	XLogRecPtr lsn;
+	const char *prefix = NULL;
+	char *message = NULL;
+	char	   *ddl_command;
+	Size		sz;
+	List	   *parsetree_list;
+	ListCell   *parsetree_item;
+	DestReceiver *receiver;
+	MemoryContext oldcontext;
+	const char *save_debug_query_string = debug_query_string;
+
+	message = logicalrep_read_ddlmessage(s, &lsn, &prefix, &sz);
+
+	/* Make sure we are in a transaction command */
+	begin_replication_step();
+
+	ddl_command = ddl_deparse_json_to_string(message);
+	debug_query_string = ddl_command;
+
+	/* DestNone for logical replication */
+	receiver = CreateDestReceiver(DestNone);
+	parsetree_list = pg_parse_query(ddl_command);
+
+	foreach(parsetree_item, parsetree_list)
+	{
+		List	   *plantree_list;
+		List	   *querytree_list;
+		RawStmt	   *command = (RawStmt *) lfirst(parsetree_item);
+		CommandTag	commandTag;
+		MemoryContext per_parsetree_context = NULL;
+		Portal		portal;
+		bool		 snapshot_set = false;
+
+		commandTag = CreateCommandTag((Node *) command);
+
+		/* If we got a cancel signal in parsing or prior command, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/* Remove data population from the command */
+		preprocess_create_table(command);
+
+		/*
+		 * Set up a snapshot if parse analysis/planning will need one.
+		 */
+		if (analyze_requires_snapshot(command))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * We do the work for each parsetree in a short-lived context, to
+		 * limit the memory used when there are many commands in the string.
+		 */
+		per_parsetree_context =
+			AllocSetContextCreate(CurrentMemoryContext,
+								  "apply_handle_ddl per-statement context",
+								  ALLOCSET_DEFAULT_SIZES);
+		oldcontext = MemoryContextSwitchTo(per_parsetree_context);
+
+		querytree_list = pg_analyze_and_rewrite_fixedparams(command,
+															ddl_command,
+															NULL, 0, NULL);
+
+		plantree_list = pg_plan_queries(querytree_list, ddl_command, 0, NULL);
+
+		/* Done with the snapshot used for parsing/planning */
+		if (snapshot_set)
+			PopActiveSnapshot();
+
+		portal = CreatePortal("logical replication", true, true);
+
+		/*
+		 * We don't have to copy anything into the portal, because everything
+		 * we are passing here is in ApplyMessageContext or the
+		 * per_parsetree_context, and so will outlive the portal anyway.
+		 */
+		PortalDefineQuery(portal,
+						  NULL,
+						  ddl_command,
+						  commandTag,
+						  plantree_list,
+						  NULL);
+
+		/*
+		 * Start the portal.  No parameters here.
+		 */
+		PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+		/*
+		 * Switch back to transaction context for execution.
+		 */
+		MemoryContextSwitchTo(oldcontext);
+
+		(void) PortalRun(portal,
+						 FETCH_ALL,
+						 true,
+						 true,
+						 receiver,
+						 receiver,
+						 NULL);
+
+		PortalDrop(portal, false);
+
+		CommandCounterIncrement();
+
+		/*
+		 * Table created by DDL replication (database level) is automatically
+		 * added to the subscription here.
+		 */
+		handle_create_table(command);
+
+		/* Now we may drop the per-parsetree context, if one was created. */
+		MemoryContextDelete(per_parsetree_context);
+	}
+
+	debug_query_string = save_debug_query_string;
+	end_replication_step();
+}
+
 
 /*
  * Logical replication protocol message dispatcher.
@@ -2510,6 +2738,10 @@ apply_dispatch(StringInfo s)
 			 */
 			break;
 
+		case LOGICAL_REP_MSG_DDLMESSAGE:
+			apply_handle_ddl(s);
+			break;
+
 		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
 			break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 42c06af..8b2648b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -54,6 +54,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							 bool transactional, const char *prefix,
 							 Size sz, const char *message);
+static void pgoutput_ddlmessage(LogicalDecodingContext *ctx,
+								ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+								const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+								Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -256,6 +260,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->message_cb = pgoutput_message;
+	cb->ddlmessage_cb = pgoutput_ddlmessage;
 	cb->commit_cb = pgoutput_commit_txn;
 
 	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -272,6 +277,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_commit_cb = pgoutput_stream_commit;
 	cb->stream_change_cb = pgoutput_change;
 	cb->stream_message_cb = pgoutput_message;
+	cb->stream_ddlmessage_cb = pgoutput_ddlmessage;
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
 	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -407,6 +413,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 
 	/* This plugin uses binary protocol. */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+	opt->receive_rewrites = true;
 
 	/*
 	 * This is replication start and not slot initialization.
@@ -480,6 +487,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 
 		/* Init publication state. */
 		data->publications = NIL;
+		data->deleted_relids = NIL;
 		publications_valid = false;
 		CacheRegisterSyscacheCallback(PUBLICATIONOID,
 									  publication_invalidation_cb,
@@ -1362,9 +1370,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	bool			table_rewrite = false;
 
 	update_replication_progress(ctx, false);
 
+	/*
+	 * Changes to a rewrite heap may need to be replicated if the table being
+	 * rewritten publishes DDL. So look up the actual relation here and check
+	 * its pubactions later.
+	 */
+	if (relation->rd_rel->relrewrite)
+	{
+		table_rewrite = true;
+		relation = RelationIdGetRelation(relation->rd_rel->relrewrite);
+		targetrel = relation;
+	}
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1398,6 +1419,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * Don't publish table rewrite changes unless we also publish the rewrite
+	 * DDL message.
+	 */
+	if (table_rewrite && !relentry->pubactions.pubddl)
+		return;
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1427,8 +1455,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			}
 
 			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
+			if (!table_rewrite &&
+				!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, &action))
 				break;
 
 			/*
@@ -1448,8 +1476,19 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			maybe_send_schema(ctx, change, relation, relentry);
 
 			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+
+			/*
+			 * Convert the rewrite inserts to updates so that the subscriber
+			 * can replay them. This is needed to keep the data on the
+			 * publisher and the subscriber consistent.
+			 */
+			if (table_rewrite)
+				logicalrep_write_update(ctx->out, xid, targetrel,
+										NULL, new_slot, data->binary,
+										relentry->columns);
+			else
+				logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+										data->binary, relentry->columns);
 			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -1579,6 +1618,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		ancestor = NULL;
 	}
 
+	if (table_rewrite)
+		RelationClose(relation);
+
 	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
@@ -1656,8 +1698,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 static void
 pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
-				 const char *message)
+				 XLogRecPtr message_lsn, bool transactional,
+				 const char *prefix, Size sz, const char *message)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
@@ -1697,6 +1739,110 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+static void
+pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr message_lsn,
+					const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+					Size sz, const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	Relation	relation = NULL;
+	TransactionId xid = InvalidTransactionId;
+	RelationSyncEntry *relentry;
+
+	/*
+	 * Remember the xid for the message in streaming mode. See
+	 * pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	/*
+	 * On DROP start, add the relid to the deleted_relids list if the relid
+	 * is part of a publication that publishes DDL. We need this because on
+	 * DROP end the relid is no longer valid. Later, on DROP end, verify that
+	 * the drop is for a relid on the deleted_relids list, and only then
+	 * send the DDL message.
+	 */
+
+	if (cmdtype == DCT_DropStart)
+	{
+		relation = RelationIdGetRelation(relid);
+		Assert(relation);
+		relentry = get_rel_sync_entry(data, relation);
+		if (relentry->pubactions.pubddl)
+			data->deleted_relids = lappend_oid(data->deleted_relids, relid);
+
+		/* Release the relcache reference taken above */
+		RelationClose(relation);
+		return;
+	}
+	else if (cmdtype == DCT_DropEnd)
+	{
+		/* Only send the drop if DROP start recorded this relid */
+		if (!list_member_oid(data->deleted_relids, relid))
+			return;
+
+		data->deleted_relids = list_delete_oid(data->deleted_relids, relid);
+	}
+	else if (cmdtype == DCT_Alter)
+	{
+		/*
+		 * For a table-rewrite DDL, we first send the original DDL message to
+		 * the subscriber, then convert the upcoming rewrite INSERTs to
+		 * UPDATEs and send them as well, so that the data on the publisher
+		 * and the subscriber stays consistent.
+		 *
+		 * We do it this way for two reasons:
+		 *
+		 * (1) The data before the rewrite DDL could already differ between
+		 * publisher and subscriber. To make sure that extra rows on the
+		 * subscriber, which don't exist on the publisher, also get rewritten,
+		 * the subscriber must first execute the original rewrite DDL to
+		 * rewrite all of its data.
+		 *
+		 * (2) The data after executing the rewrite DDL could differ between
+		 * publisher and subscriber (due to different functions/operators used
+		 * during the rewrite), so we need to replicate the rewrite UPDATEs to
+		 * keep the data consistent.
+		 *
+		 * TO IMPROVE: We could let the subscriber rewrite only the extra data
+		 * instead of doing a full rewrite, and use the upcoming rewrite
+		 * UPDATEs to rewrite the rest of the data. Besides, we may not need
+		 * to send rewrite changes for every type of rewrite DDL; for
+		 * example, it seems fine to skip sending rewrite changes for ALTER
+		 * TABLE SET LOGGED, as the data in the table is not actually
+		 * changed.
+		 */
+		relation = RelationIdGetRelation(relid);
+		Assert(relation);
+		relentry = get_rel_sync_entry(data, relation);
+		RelationClose(relation);
+
+		/*
+		 * Skip sending this DDL if we don't publish DDL messages or if the
+		 * DDL needs to be published via its root relation.
+		 */
+		if (!relentry->pubactions.pubddl ||
+			relentry->publish_as_relid != relid)
+			return;
+	}
+
+	/* Send BEGIN if we haven't yet */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_ddlmessage(ctx->out,
+								xid,
+								message_lsn,
+								prefix,
+								sz,
+								message);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
@@ -1982,7 +2128,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+			entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+			entry->pubactions.pubddl = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
 		memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2040,6 +2187,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
+		entry->pubactions.pubddl = false;
 
 		/*
 		 * Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2153,6 +2301,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubactions.pubddl |= pub->pubactions.pubddl;
 
 				/*
 				 * We want to publish the changes as the top-most ancestor
@@ -2338,6 +2487,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	{
 		entry->replicate_valid = false;
 	}
+
 }
 
 /* Send Replication origin */
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 51b3fdc..7d60aac 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -33,6 +33,7 @@
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
+#include "commands/event_trigger.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
@@ -40,6 +41,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_relation.h"
 #include "storage/bufmgr.h"
+#include "tcop/ddl_deparse.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 60e72f9..7b51fb4 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5633,6 +5633,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
 		pubdesc->pubactions.pubupdate |= pubform->pubupdate;
 		pubdesc->pubactions.pubdelete |= pubform->pubdelete;
 		pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
+		pubdesc->pubactions.pubddl |= pubform->pubddl;
 
 		/*
 		 * Check if all columns referenced in the filter expression are part
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7cc9c72..030d491 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3867,6 +3867,7 @@ getPublications(Archive *fout, int *numPublications)
 	int			i_pubupdate;
 	int			i_pubdelete;
 	int			i_pubtruncate;
+	int			i_pubddl;
 	int			i_pubviaroot;
 	int			i,
 				ntups;
@@ -3882,23 +3883,29 @@ getPublications(Archive *fout, int *numPublications)
 	resetPQExpBuffer(query);
 
 	/* Get the publications. */
-	if (fout->remoteVersion >= 130000)
+	if (fout->remoteVersion >= 150000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "p.pubowner, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubddl, p.pubviaroot "
+						  "FROM pg_publication p");
+	else if (fout->remoteVersion >= 130000)
+		appendPQExpBuffer(query,
+						  "SELECT p.tableoid, p.oid, p.pubname, "
+						  "p.pubowner, "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubddl, p.pubviaroot "
 						  "FROM pg_publication p");
 	else if (fout->remoteVersion >= 110000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "p.pubowner, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubddl, false AS pubviaroot "
 						  "FROM pg_publication p");
 	else
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "p.pubowner, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubddl, false AS pubviaroot "
 						  "FROM pg_publication p");
 
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -3914,6 +3921,7 @@ getPublications(Archive *fout, int *numPublications)
 	i_pubupdate = PQfnumber(res, "pubupdate");
 	i_pubdelete = PQfnumber(res, "pubdelete");
 	i_pubtruncate = PQfnumber(res, "pubtruncate");
+	i_pubddl = PQfnumber(res, "pubddl");
 	i_pubviaroot = PQfnumber(res, "pubviaroot");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
@@ -3937,6 +3945,8 @@ getPublications(Archive *fout, int *numPublications)
 			(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
 		pubinfo[i].pubtruncate =
 			(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
+		pubinfo[i].pubddl =
+			(strcmp(PQgetvalue(res, i, i_pubddl), "t") == 0);
 		pubinfo[i].pubviaroot =
 			(strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
 
@@ -4016,6 +4026,15 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
 		first = false;
 	}
 
+	if (pubinfo->pubddl)
+	{
+		if (!first)
+			appendPQExpBufferStr(query, ", ");
+
+		appendPQExpBufferStr(query, "ddl");
+		first = false;
+	}
+
 	appendPQExpBufferStr(query, "'");
 
 	if (pubinfo->pubviaroot)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1d21c29..69b94ae 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -620,6 +620,7 @@ typedef struct _PublicationInfo
 	bool		pubdelete;
 	bool		pubtruncate;
 	bool		pubviaroot;
+	bool		pubddl;
 } PublicationInfo;
 
 /*
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17b..792f438 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -27,6 +27,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/message.h"
+#include "replication/ddlmessage.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 1a5d924..fe28abb 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6050,7 +6050,7 @@ listPublications(const char *pattern)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6085,6 +6085,10 @@ listPublications(const char *pattern)
 		appendPQExpBuffer(&buf,
 						  ",\n  pubviaroot AS \"%s\"",
 						  gettext_noop("Via root"));
+	if (pset.sversion >= 150000)
+		appendPQExpBuffer(&buf,
+						  ",\n  pubddl AS \"%s\"",
+						  gettext_noop("DDLs"));
 
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
@@ -6172,6 +6176,7 @@ describePublications(const char *pattern)
 	PGresult   *res;
 	bool		has_pubtruncate;
 	bool		has_pubviaroot;
+	bool		has_pubddl;
 
 	PQExpBufferData title;
 	printTableContent cont;
@@ -6188,6 +6193,7 @@ describePublications(const char *pattern)
 
 	has_pubtruncate = (pset.sversion >= 110000);
 	has_pubviaroot = (pset.sversion >= 130000);
+	has_pubddl = (pset.sversion >= 150000);
 
 	initPQExpBuffer(&buf);
 
@@ -6201,6 +6207,9 @@ describePublications(const char *pattern)
 	if (has_pubviaroot)
 		appendPQExpBufferStr(&buf,
 							 ", pubviaroot");
+	if (has_pubddl)
+		appendPQExpBufferStr(&buf,
+							 ", pubddl");
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -6249,6 +6258,8 @@ describePublications(const char *pattern)
 			ncols++;
 		if (has_pubviaroot)
 			ncols++;
+		if (has_pubddl)
+			ncols++;
 
 		initPQExpBuffer(&title);
 		printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -6263,6 +6274,8 @@ describePublications(const char *pattern)
 			printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
 		if (has_pubviaroot)
 			printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+		if (has_pubddl)
+			printTableAddHeader(&cont, gettext_noop("DDLs"), true, align);
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6273,6 +6286,8 @@ describePublications(const char *pattern)
 			printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
 		if (has_pubviaroot)
 			printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+		if (has_pubddl)
+			printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false);
 
 		if (!puballtables)
 		{
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 9a74721..9de3b8f 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 8aa636c..6396566 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11890,4 +11890,13 @@
 { oid => '4643', descr => 'json to string',
   proname => 'ddl_deparse_expand_command', prorettype => 'text',
   proargtypes => 'text', prosrc => 'ddl_deparse_expand_command' },
+{ oid => '4644', descr => 'trigger for ddl command deparse',
+  proname => 'publication_ddl_deparse', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_ddl_deparse' },
+{ oid => '4645', descr => 'trigger for ddl command deparse start',
+  proname => 'publication_ddl_deparse_start', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_ddl_deparse_start' },
+{ oid => '4646', descr => 'trigger for ddl command deparse table rewrite',
+  proname => 'publication_ddl_deparse_table_rewrite', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_ddl_deparse_table_rewrite' },
 ]
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 48205ba..606c009 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -54,6 +54,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
+
+	/* true if DDL commands are published */
+	bool		pubddl;
 } FormData_pg_publication;
 
 /* ----------------
@@ -72,6 +75,7 @@ typedef struct PublicationActions
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubddl;
 } PublicationActions;
 
 typedef struct PublicationDesc
diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h
index 10091c3..fd2ee7f 100644
--- a/src/include/commands/event_trigger.h
+++ b/src/include/commands/event_trigger.h
@@ -71,7 +71,8 @@ extern void EventTriggerCollectSimpleCommand(ObjectAddress address,
 extern void EventTriggerAlterTableStart(Node *parsetree);
 extern void EventTriggerAlterTableRelid(Oid objectId);
 extern void EventTriggerCollectAlterTableSubcmd(Node *subcmd,
-												ObjectAddress address);
+												ObjectAddress address,
+												bool rewrite);
 extern void EventTriggerAlterTableEnd(void);
 
 extern void EventTriggerCollectGrant(InternalGrant *istmt);
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
new file mode 100644
index 0000000..172d091
--- /dev/null
+++ b/src/include/replication/ddlmessage.h
@@ -0,0 +1,57 @@
+/*-------------------------------------------------------------------------
+ * ddlmessage.h
+ *	   Exports from replication/logical/ddlmessage.c
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/ddlmessage.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_DDL_MESSAGE_H
+#define PG_LOGICAL_DDL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "nodes/nodes.h"
+
+
+/*
+ * Support for keeping track of deparsed commands.
+ */
+typedef enum DeparsedCommandType
+{
+	DCT_Create,
+	DCT_DropStart,
+	DCT_DropEnd,
+	DCT_Alter
+} DeparsedCommandType;
+
+/*
+ * Generic logical decoding DDL message WAL record.
+ */
+typedef struct xl_logical_ddl_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	Size		prefix_size;	/* length of prefix */
+	Oid			relid;			/* id of the table */
+	NodeTag		cmdtype;		/* type of SQL command */
+	Size		message_size;	/* size of the message */
+	/*
+	 * payload, including null-terminated prefix of length prefix_size
+	 */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;
+
+#define SizeOfLogicalDDLMessage	(offsetof(xl_logical_ddl_message, message))
+
+extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+									   const char *ddl_message, size_t size);
+
+/* RMGR API */
+#define XLOG_LOGICAL_DDL_MESSAGE	0x00
+void		logicalddlmsg_redo(XLogReaderState *record);
+void		logicalddlmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalddlmsg_identify(uint8 info);
+
+#endif							/* PG_LOGICAL_DDL_MESSAGE_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 741bf65..427a7b9 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a771ab8..2abccb8 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_DDLMESSAGE = 'L',
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -229,7 +230,11 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
-									 bool transactional, const char *prefix, Size sz, const char *message);
+									 bool transactional, const char *prefix,
+									 Size sz, const char *message);
+extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+										const char *prefix, Size sz, const char *message);
+extern char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix, Size *sz);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 539dc8e..933d297 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -89,6 +89,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 										const char *message);
 
 /*
+ * Called for the logical decoding DDL messages.
+ */
+typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   const char *prefix,
+										   Oid relid,
+										   DeparsedCommandType cmdtype,
+										   Size message_size,
+										   const char *message);
+
+/*
  * Filter changes by origin.
  */
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
@@ -200,6 +212,19 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
 											  const char *message);
 
 /*
+ * Callback for streaming logical decoding DDL messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr message_lsn,
+												 const char *prefix,
+												 Oid relid,
+												 DeparsedCommandType cmdtype,
+												 Size message_size,
+												 const char *message);
+
+/*
  * Callback for streaming truncates from in-progress transactions.
  */
 typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
@@ -219,6 +244,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeDDLMessageCB ddlmessage_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
@@ -237,6 +263,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeStreamCommitCB stream_commit_cb;
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
+	LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb;
 	LogicalDecodeStreamTruncateCB stream_truncate_cb;
 } OutputPluginCallbacks;
 
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd6..8ebcc12 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 	uint32		protocol_version;
 	List	   *publication_names;
 	List	   *publications;
+	List	   *deleted_relids;
 	bool		binary;
 	bool		streaming;
 	bool		messages;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4a01f87..3664873 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -11,6 +11,8 @@
 
 #include "access/htup_details.h"
 #include "lib/ilist.h"
+#include "nodes/nodes.h"
+#include "replication/ddlmessage.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
@@ -56,6 +58,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_DDLMESSAGE,
 	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INVALIDATION,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
@@ -130,6 +133,16 @@ typedef struct ReorderBufferChange
 			char	   *message;
 		}			msg;
 
+		/* DDL Message. */
+		struct
+		{
+			char	   *prefix;
+			Size		message_size;
+			char	   *message;
+			Oid			relid;
+			NodeTag		cmdtype;
+		}			ddlmsg;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -430,6 +443,16 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* DDL message callback signature */
+typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb,
+										   ReorderBufferTXN *txn,
+										   XLogRecPtr message_lsn,
+										   const char *prefix,
+										   Oid relid,
+										   DeparsedCommandType cmdtype,
+										   Size sz,
+										   const char *message);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -496,6 +519,17 @@ typedef void (*ReorderBufferStreamMessageCB) (
 											  const char *prefix, Size sz,
 											  const char *message);
 
+/* stream DDL message callback signature */
+typedef void (*ReorderBufferStreamDDLMessageCB) (
+												 ReorderBuffer *rb,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr message_lsn,
+												 const char *prefix,
+												 Oid relid,
+												 DeparsedCommandType cmdtype,
+												 Size sz,
+												 const char *message);
+
 /* stream truncate callback signature */
 typedef void (*ReorderBufferStreamTruncateCB) (
 											   ReorderBuffer *rb,
@@ -541,6 +575,7 @@ struct ReorderBuffer
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
+	ReorderBufferDDLMessageCB ddlmessage;
 
 	/*
 	 * Callbacks to be called when streaming a transaction at prepare time.
@@ -560,6 +595,7 @@ struct ReorderBuffer
 	ReorderBufferStreamCommitCB stream_commit;
 	ReorderBufferStreamChangeCB stream_change;
 	ReorderBufferStreamMessageCB stream_message;
+	ReorderBufferStreamDDLMessageCB stream_ddlmessage;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
 	/*
@@ -635,6 +671,9 @@ extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
 extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+										 const char *prefix, Size message_size,
+										 const char *message, Oid relid, DeparsedCommandType cmdtype);
 extern void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h
index 94de13d..b53294b 100644
--- a/src/include/tcop/deparse_utility.h
+++ b/src/include/tcop/deparse_utility.h
@@ -62,6 +62,7 @@ typedef struct CollectedCommand
 		{
 			Oid			objectId;
 			Oid			classId;
+			bool		rewrite;
 			List	   *subcmds;
 		}			alterTable;
 
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 2a38a93..b1921cc 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -5969,9 +5969,9 @@ List of schemas
 (0 rows)
 
 \dRp "no.such.publication"
-                              List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root 
-------+-------+------------+---------+---------+---------+-----------+----------
+                                 List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+------+-------+------------+---------+---------+---------+-----------+----------+------
 (0 rows)
 
 \dRs "no.such.subscription"
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 274b37d..09b14e0 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -30,20 +30,20 @@ ERROR:  conflicting or redundant options
 LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi...
                                                              ^
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
+                                                  List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f        | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                  List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f        | f
 (2 rows)
 
 --- adding tables
@@ -87,10 +87,10 @@ RESET client_min_messages;
 -- should be able to add schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl1"
 Tables from schemas:
@@ -99,20 +99,20 @@ Tables from schemas:
 -- should be able to drop schema from 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl1"
 
 -- should be able to set schema to 'FOR TABLE' publication
 ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test;
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test"
 
@@ -134,10 +134,10 @@ ERROR:  relation "testpub_nopk" is not part of the publication
 -- should be able to set table to schema publication
 ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk;
 \dRp+ testpub_forschema
-                               Publication testpub_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test.testpub_nopk"
 
@@ -159,10 +159,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                              Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t          | t       | t       | f       | f         | f
+                                 Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | t          | t       | t       | f       | f         | f        | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -174,19 +174,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                                    Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                       Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                                    Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                       Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl3"
 
@@ -207,10 +207,10 @@ UPDATE testpub_parted1 SET a = 1;
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_parted"
 
@@ -223,10 +223,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | t
+                                   Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | t        | f
 Tables:
     "public.testpub_parted"
 
@@ -255,10 +255,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -271,10 +271,10 @@ Tables:
 
 ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -290,10 +290,10 @@ Publications:
 
 ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
@@ -301,10 +301,10 @@ Tables:
 -- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
 ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
 \dRp+ testpub5
-                                    Publication testpub5
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                       Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
 
@@ -337,10 +337,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax1
-                                Publication testpub_syntax1
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                    Publication testpub_syntax1
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "public.testpub_rf_tbl3" WHERE (e < 999)
@@ -350,10 +350,10 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
 RESET client_min_messages;
 \dRp+ testpub_syntax2
-                                Publication testpub_syntax2
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | f         | f
+                                    Publication testpub_syntax2
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | f         | f        | f
 Tables:
     "public.testpub_rf_tbl1"
     "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
@@ -676,10 +676,10 @@ CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
 RESET client_min_messages;
 ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a);		-- ok
 \dRp+ testpub_table_ins
-                               Publication testpub_table_ins
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | f       | f       | t         | f
+                                   Publication testpub_table_ins
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | f       | f       | t         | f        | f
 Tables:
     "public.testpub_tbl5" (a)
 
@@ -821,10 +821,10 @@ CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
 ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
 ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
 \dRp+ testpub_both_filters
-                              Publication testpub_both_filters
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                 Publication testpub_both_filters
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1)
 
@@ -1029,10 +1029,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                                 Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                    Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -1070,10 +1070,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                    Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -1151,10 +1151,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
 DROP TABLE testpub_parted;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                    Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -1164,20 +1164,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                           List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
--------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
+                                              List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f        | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                             List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
------------------+---------------------------+------------+---------+---------+---------+-----------+----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
+                                                 List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f        | f
 (1 row)
 
 -- adding schemas and tables
@@ -1193,19 +1193,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int);
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
 CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1219,44 +1219,44 @@ CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA",
 CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA";
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "public"
 
 \dRp+ testpub4_forschema
-                               Publication testpub4_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub4_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "CURRENT_SCHEMA"
 
 \dRp+ testpub5_forschema
-                               Publication testpub5_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub5_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub6_forschema
-                               Publication testpub6_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub6_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "CURRENT_SCHEMA"
     "public"
 
 \dRp+ testpub_fortable
-                                Publication testpub_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                   Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "CURRENT_SCHEMA.CURRENT_SCHEMA"
 
@@ -1290,10 +1290,10 @@ ERROR:  schema "testpub_view" does not exist
 -- dropping the schema should reflect the change in publication
 DROP SCHEMA pub_test3;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1301,20 +1301,20 @@ Tables from schemas:
 -- renaming the schema should reflect the change in publication
 ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1_renamed"
     "pub_test2"
 
 ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
 \dRp+ testpub2_forschema
-                               Publication testpub2_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub2_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1322,10 +1322,10 @@ Tables from schemas:
 -- alter publication add schema
 ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1334,10 +1334,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1346,10 +1346,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1;
 ERROR:  schema "pub_test1" is already member of publication "testpub1_forschema"
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1357,10 +1357,10 @@ Tables from schemas:
 -- alter publication drop schema
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1368,10 +1368,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2;
 ERROR:  tables from schema "pub_test2" are not part of the publication
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1379,29 +1379,29 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
 -- drop all schemas
 ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 (1 row)
 
 -- alter publication set multiple schema
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1410,10 +1410,10 @@ Tables from schemas:
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema;
 ERROR:  schema "non_existent_schema" does not exist
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
     "pub_test2"
@@ -1422,10 +1422,10 @@ Tables from schemas:
 -- removing the duplicate schemas
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
-                               Publication testpub1_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub1_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1504,18 +1504,18 @@ SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub3_forschema;
 RESET client_min_messages;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 (1 row)
 
 ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1;
 \dRp+ testpub3_forschema
-                               Publication testpub3_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                  Publication testpub3_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables from schemas:
     "pub_test1"
 
@@ -1525,20 +1525,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1
 CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1;
 RESET client_min_messages;
 \dRp+ testpub_forschema_fortable
-                           Publication testpub_forschema_fortable
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                              Publication testpub_forschema_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
     "pub_test1"
 
 \dRp+ testpub_fortable_forschema
-                           Publication testpub_fortable_forschema
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                              Publication testpub_fortable_forschema
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs 
+--------------------------+------------+---------+---------+---------+-----------+----------+------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | f
 Tables:
     "pub_test2.tbl1"
 Tables from schemas:
diff --git a/src/test/subscription/t/032_ddl_replication.pl b/src/test/subscription/t/032_ddl_replication.pl
new file mode 100644
index 0000000..c36a7c0
--- /dev/null
+++ b/src/test/subscription/t/032_ddl_replication.pl
@@ -0,0 +1,86 @@
+# Copyright (c) 2022, PostgreSQL Global Development Group
+# Regression tests for logical replication of DDLs
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'autovacuum = off');
+$node_subscriber->start;
+
+my $ddl = "CREATE TABLE test_rep(id int primary key, name varchar);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (1, 'data1');");
+$node_subscriber->safe_psql('postgres', $ddl);
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION mypub FOR ALL TABLES with (publish = 'insert, update, delete, ddl');");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->wait_for_catchup('mysub');
+
+# Test simple CREATE TABLE command is replicated to subscriber
+# Test ALTER TABLE command is replicated on table test_rep
+$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD c3 int;");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (2, 'data2', 2);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t1");
+is($result, qq(0), 'CREATE of t1 replicated to subscriber');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep WHERE c3 =2;");
+is($result, qq(1), 'ALTER test_rep ADD replicated');
+
+# Test ALTER TABLE DROP
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;");
+$node_publisher->safe_psql('postgres', "DELETE FROM test_rep where id = 2;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;");
+is($result, qq(1), 'ALTER test_rep DROP replicated');
+$node_publisher->wait_for_catchup('mysub');
+
+# Test CREATE TABLE of a new table t4 is replicated
+$node_publisher->safe_psql('postgres', "CREATE TABLE t4 (a int, b varchar);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't4';");
+is($result, qq(1), 'CREATE t4 is replicated');
+
+# Test DML changes on the new table t4 are replicated
+$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (1, 'a')");
+$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (2, 'b')");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t4;");
+is($result, qq(2), 'DML Changes to t4 are replicated');
+
+# Test DROP TABLE
+$node_publisher->safe_psql('postgres', "DROP TABLE t4;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't4';");
+is($result, qq(0), 'TABLE t4 is dropped');
+
+pass "DDL replication tests passed";
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
-- 
2.7.2.windows.1



  [application/octet-stream] v6-0003-support-CREATE-TABLE-AS-SELECT-INTO.patch (14.9K, 4-v6-0003-support-CREATE-TABLE-AS-SELECT-INTO.patch)
From cdd09f1ac681b3d0d5bf88c0bfa920896dd4b3be Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Thu, 2 Jun 2022 13:32:39 +0800
Subject: [PATCH] support CREATE TABLE AS/SELECT INTO

The main idea of replicating CREATE TABLE AS is that we deparse the CREATE
TABLE AS into a simple CREATE TABLE (without the subquery) command, WAL log it
after creating the table and before writing data into the table, and replicate
the incoming writes later as normal INSERTs. In this approach, we don't execute
the subquery on the subscriber, so we don't need to make sure that all the
objects referenced in the subquery also exist on the subscriber. This approach
works for all kinds of commands (e.g. CREATE TABLE AS [SELECT][EXECUTE][VALUES]).

Introduce a new type of event trigger, "table_create", which is fired
for CREATE TABLE/CREATE TABLE AS/SELECT INTO after creating the table and
before any other modification. We deparse the command in the table_create event
trigger and WAL log the deparsed JSON string. The walsender will send the
string to the subscriber, and incoming INSERTs will also be replicated.

---
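Illustration only, not part of the patch: the ordering described in the commit
message can be sketched with a toy log. Everything below (ToyLog, LogRecord,
log_append, publisher_ctas, the table name dst) is hypothetical and does not
correspond to PostgreSQL's WAL format or to any function in this patch. It
only shows that the subquery-free CREATE TABLE is logged first and the rows
produced by the query follow as ordinary INSERTs, so the subscriber never has
to evaluate the subquery.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical record kinds; these are not PostgreSQL WAL record types. */
typedef enum { REC_DDL, REC_INSERT } RecKind;

typedef struct
{
	RecKind		kind;
	char		text[128];
} LogRecord;

#define MAX_RECORDS 16

typedef struct
{
	LogRecord	recs[MAX_RECORDS];
	int			n;
} ToyLog;

/* Append one record to the toy log. */
static void
log_append(ToyLog *log, RecKind kind, const char *text)
{
	assert(log->n < MAX_RECORDS);
	log->recs[log->n].kind = kind;
	snprintf(log->recs[log->n].text, sizeof(log->recs[log->n].text),
			 "%s", text);
	log->n++;
}

/*
 * Model of "CREATE TABLE dst AS SELECT ..." on the publisher: the plain
 * CREATE TABLE (no subquery) is logged right after the table exists and
 * before any data is written; each row produced by the query is then
 * logged as a normal INSERT.
 */
static void
publisher_ctas(ToyLog *log, const int *rows, int nrows)
{
	char		buf[128];
	int			i;

	log_append(log, REC_DDL, "CREATE TABLE dst (a int)");
	for (i = 0; i < nrows; i++)
	{
		snprintf(buf, sizeof(buf), "INSERT INTO dst VALUES (%d)", rows[i]);
		log_append(log, REC_INSERT, buf);
	}
}
```

Calling publisher_ctas() with the rows {2, 3} leaves three records in the toy
log, in this order: the CREATE TABLE, then one INSERT per row. A subscriber
replaying them in order needs only the target table definition, not the
objects the original subquery referenced.
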
 src/backend/commands/createas.c        |   3 +
 src/backend/commands/event_trigger.c   | 161 ++++++++++++++++++++++++++++++++-
 src/backend/commands/publicationcmds.c |  33 ++++++-
 src/backend/tcop/utility.c             |   7 ++
 src/backend/utils/cache/evtcache.c     |   2 +
 src/include/catalog/pg_proc.dat        |   3 +
 src/include/commands/event_trigger.h   |   4 +
 src/include/utils/evtcache.h           |   3 +-
 8 files changed, 208 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 9abbb6b..ae25d2a 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -34,6 +34,7 @@
 #include "catalog/namespace.h"
 #include "catalog/toasting.h"
 #include "commands/createas.h"
+#include "commands/event_trigger.h"
 #include "commands/matview.h"
 #include "commands/prepare.h"
 #include "commands/tablecmds.h"
@@ -143,6 +144,8 @@ create_ctas_internal(List *attrList, IntoClause *into)
 		StoreViewQuery(intoRelationAddr.objectId, query, false);
 		CommandCounterIncrement();
 	}
+	else
+		EventTriggerTableCreate((Node *) create, intoRelationAddr);
 
 	return intoRelationAddr;
 }
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 9bc2145..e77f01b 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -133,7 +133,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt)
 	if (strcmp(stmt->eventname, "ddl_command_start") != 0 &&
 		strcmp(stmt->eventname, "ddl_command_end") != 0 &&
 		strcmp(stmt->eventname, "sql_drop") != 0 &&
-		strcmp(stmt->eventname, "table_rewrite") != 0)
+		strcmp(stmt->eventname, "table_rewrite") != 0 &&
+		strcmp(stmt->eventname, "table_create") != 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("unrecognized event name \"%s\"",
@@ -159,7 +160,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt)
 	/* Validate tag list, if any. */
 	if ((strcmp(stmt->eventname, "ddl_command_start") == 0 ||
 		 strcmp(stmt->eventname, "ddl_command_end") == 0 ||
-		 strcmp(stmt->eventname, "sql_drop") == 0)
+		 strcmp(stmt->eventname, "sql_drop") == 0 ||
+		 strcmp(stmt->eventname, "table_create") == 0)
 		&& tags != NULL)
 		validate_ddl_tags("tag", tags);
 	else if (strcmp(stmt->eventname, "table_rewrite") == 0
@@ -586,7 +588,8 @@ EventTriggerCommonSetup(Node *parsetree,
 		dbgtag = CreateCommandTag(parsetree);
 		if (event == EVT_DDLCommandStart ||
 			event == EVT_DDLCommandEnd ||
-			event == EVT_SQLDrop)
+			event == EVT_SQLDrop ||
+			event == EVT_TableCreate)
 		{
 			if (!command_tag_event_trigger_ok(dbgtag))
 				elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag));
@@ -869,6 +872,155 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason)
 	CommandCounterIncrement();
 }
 
+
+/*
+ * EventTriggerCreateTableStart
+ *		Prepare to receive data on a CREATE TABLE [AS] command about to be
+ *		executed.
+ */
+void
+EventTriggerCreateTableStart(Node *parsetree)
+{
+	MemoryContext oldcxt;
+	CollectedCommand *command;
+
+	/* ignore if event trigger context not set, or collection disabled */
+	if (!currentEventTriggerState ||
+		currentEventTriggerState->commandCollectionInhibited)
+		return;
+
+	oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt);
+
+	command = palloc(sizeof(CollectedCommand));
+
+	command->type = SCT_Simple;
+	command->in_extension = creating_extension;
+	command->d.simple.address = InvalidObjectAddress;
+	command->d.simple.secondaryObject = InvalidObjectAddress;
+	command->parsetree = copyObject(parsetree);
+
+	command->parent = currentEventTriggerState->currentCommand;
+	currentEventTriggerState->currentCommand = command;
+
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * EventTriggerCreateTableEnd
+ *		Finish up saving a CREATE TABLE [AS] command.
+ *
+ * FIXME this API isn't considering the possibility that an xact/subxact is
+ * aborted partway through.  Probably it's best to add an
+ * AtEOSubXact_EventTriggers() to fix this.
+ */
+void
+EventTriggerCreateTableEnd(void)
+{
+	CollectedCommand *parent;
+
+	/* ignore if event trigger context not set, or collection disabled */
+	if (!currentEventTriggerState ||
+		currentEventTriggerState->commandCollectionInhibited)
+		return;
+
+	parent = currentEventTriggerState->currentCommand->parent;
+
+	pfree(currentEventTriggerState->currentCommand);
+
+	currentEventTriggerState->currentCommand = parent;
+}
+
+/*
+ * publication_ddl_deparse_table_create
+ *
+ * Deparse the DDL table create command and WAL log it.
+ */
+Datum
+publication_ddl_deparse_table_create(PG_FUNCTION_ARGS)
+{
+	CollectedCommand *cmd;
+	char        *json_string;
+
+	if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
+		elog(ERROR, "not fired by event trigger manager");
+
+	cmd = currentEventTriggerState->currentCommand;
+	Assert(cmd);
+
+	/* Deparse the DDL command and WAL log it to allow decoding of the same. */
+	json_string = deparse_utility_command(cmd);
+
+	if (json_string != NULL)
+		LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_Create,
+							 json_string, strlen(json_string) + 1);
+
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * Fire table_create triggers.
+ */
+void
+EventTriggerTableCreate(Node *parsetree, ObjectAddress address)
+{
+	List	   *runlist;
+	Node	   *old_parsetree;
+	EventTriggerData trigdata;
+
+	/*
+	 * See EventTriggerDDLCommandStart for a discussion about why event
+	 * triggers are disabled in single user mode.
+	 */
+	if (!IsUnderPostmaster)
+		return;
+
+	/*
+	 * Also do nothing if our state isn't set up, which it won't be if there
+	 * weren't any relevant event triggers at the start of the current DDL
+	 * command.  This test might therefore seem optional, but it's
+	 * *necessary*, because EventTriggerCommonSetup might find triggers that
+	 * didn't exist at the time the command started.
+	 */
+	if (!currentEventTriggerState)
+		return;
+
+	runlist = EventTriggerCommonSetup(currentEventTriggerState->currentCommand->parsetree,
+									  EVT_TableCreate,
+									  "table_create",
+									  &trigdata);
+	if (runlist == NIL)
+		return;
+
+	/*
+	 * Use PG_TRY to ensure parsetree is reset even when one trigger
+	 * fails. (This is perhaps not necessary, as the currentState variable will
+	 * be removed shortly by our caller, but it seems better to play safe.)
+	 */
+	old_parsetree = currentEventTriggerState->currentCommand->parsetree;
+	currentEventTriggerState->currentCommand->d.simple.address = address;
+	currentEventTriggerState->currentCommand->parsetree = parsetree;
+
+	/* Run the triggers. */
+	PG_TRY();
+	{
+		EventTriggerInvoke(runlist, &trigdata);
+	}
+	PG_FINALLY();
+	{
+		currentEventTriggerState->currentCommand->parsetree = old_parsetree;
+	}
+	PG_END_TRY();
+
+	/* Cleanup. */
+	list_free(runlist);
+
+	/*
+	 * Make sure anything the event triggers did will be visible to the main
+	 * command.
+	 */
+	CommandCounterIncrement();
+}
+
 /*
  * Invoke each event trigger in a list of event triggers.
  */
@@ -1149,7 +1301,8 @@ trackDroppedObjectsNeeded(void)
 	 */
 	return list_length(EventCacheLookup(EVT_SQLDrop)) > 0 ||
 		list_length(EventCacheLookup(EVT_TableRewrite)) > 0 ||
-		list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0;
+		list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0 ||
+		list_length(EventCacheLookup(EVT_TableCreate)) > 0;
 }
 
 /*
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 63ffc62..187e326 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -762,7 +762,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	Relation	rel;
 	ObjectAddress myself;
 	ObjectAddress referenced_start, referenced_end,
-				  referenced_table_rewrite;
+				  referenced_table_rewrite,
+				  referenced_table_create;
 	Oid			puboid;
 	bool		nulls[Natts_pg_publication];
 	Datum		values[Natts_pg_publication];
@@ -904,20 +905,25 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	if (pubactions.pubddl)
 	{
 		CreateEventTrigStmt *ddl_trigg_start, *ddl_trigg_end,
-							*ddl_trigg_table_rewrite;
+							*ddl_trigg_table_rewrite,
+							*ddl_trigg_table_create;
 		Node				*end_arg1 = NULL;
 		Node				*start_arg1 = NULL;
 		Node				*table_rewrite_arg1 = NULL;
+		Node				*table_create_arg1 = NULL;
 		Node				*arg2 = NULL;
 		Node				*arg3 = NULL;
 		List				*end_tags = NIL;
 		List				*start_tags = NIL;
 		List				*table_rewrite_tags = NIL;
+		List				*table_create_tags = NIL;
 		Oid					event_trig_start_id, event_trig_end_id,
-							event_trig_table_rewrite_id;
+							event_trig_table_rewrite_id,
+							event_trig_table_create_id;
 		char				trigger_name_start[NAMEDATALEN];
 		char				trigger_name_end[NAMEDATALEN];
 		char				trigger_name_table_rewrite[NAMEDATALEN];
+		char				trigger_name_table_create[NAMEDATALEN];
 
 		ddl_trigg_end = makeNode(CreateEventTrigStmt);
 
@@ -969,6 +975,24 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 		ddl_trigg_table_rewrite->whenclause = list_make1(makeDefElem("tag", (Node *) table_rewrite_tags, -1));
 		event_trig_table_rewrite_id = CreateEventTrigger(ddl_trigg_table_rewrite);
 
+		/* Trigger for table_create */
+		ddl_trigg_table_create = makeNode(CreateEventTrigStmt);
+
+		snprintf(trigger_name_table_create, sizeof(trigger_name_table_create),
+				 "pg_deparse_trig_table_create_%u", puboid);
+		ddl_trigg_table_create->trigname = pstrdup(trigger_name_table_create);
+		ddl_trigg_table_create->eventname = "table_create";
+		ddl_trigg_table_create->funcname = SystemFuncName("publication_ddl_deparse_table_create");
+
+		table_create_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_CREATE_TABLE_AS)));
+		table_create_tags = list_make1(table_create_arg1);
+		table_create_arg1 = (Node *) makeString(pstrdup(GetCommandTagName(CMDTAG_SELECT_INTO)));
+		table_create_tags = lappend(table_create_tags, table_create_arg1);
+
+		ddl_trigg_table_create->whenclause = list_make1(makeDefElem("tag", (Node *) table_create_tags, -1));
+		event_trig_table_create_id = CreateEventTrigger(ddl_trigg_table_create);
+
+
 		/*
 		 * Register the event triggers as internally dependent on the
 		 * publication.
@@ -981,6 +1005,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 
 		ObjectAddressSet(referenced_table_rewrite, EventTriggerRelationId, event_trig_table_rewrite_id);
 		recordDependencyOn(&referenced_table_rewrite, &myself, DEPENDENCY_INTERNAL);
+
+		ObjectAddressSet(referenced_table_create, EventTriggerRelationId, event_trig_table_create_id);
+		recordDependencyOn(&referenced_table_create, &myself, DEPENDENCY_INTERNAL);
 	}
 
 	table_close(rel, RowExclusiveLock);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6a5bcde..c16f804 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1164,6 +1164,8 @@ ProcessUtilitySlow(ParseState *pstate,
 							Datum		toast_options;
 							static char *validnsps[] = HEAP_RELOPT_NAMESPACES;
 
+							EventTriggerCreateTableStart(parsetree);
+
 							/* Remember transformed RangeVar for LIKE */
 							table_rv = cstmt->relation;
 
@@ -1198,6 +1200,9 @@ ProcessUtilitySlow(ParseState *pstate,
 
 							NewRelationCreateToastTable(address.objectId,
 														toast_options);
+
+							EventTriggerTableCreate((Node *) cstmt, address);
+							EventTriggerCreateTableEnd();
 						}
 						else if (IsA(stmt, CreateForeignTableStmt))
 						{
@@ -1665,8 +1670,10 @@ ProcessUtilitySlow(ParseState *pstate,
 				break;
 
 			case T_CreateTableAsStmt:
+				EventTriggerCreateTableStart(parsetree);
 				address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree,
 											params, queryEnv, qc);
+				EventTriggerCreateTableEnd();
 				break;
 
 			case T_RefreshMatViewStmt:
diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c
index 3a9c9f0..f4c8b73 100644
--- a/src/backend/utils/cache/evtcache.c
+++ b/src/backend/utils/cache/evtcache.c
@@ -167,6 +167,8 @@ BuildEventTriggerCache(void)
 			event = EVT_SQLDrop;
 		else if (strcmp(evtevent, "table_rewrite") == 0)
 			event = EVT_TableRewrite;
+		else if (strcmp(evtevent, "table_create") == 0)
+			event = EVT_TableCreate;
 		else
 			continue;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6396566..f55f455 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11899,4 +11899,7 @@
 { oid => '4646', descr => 'trigger for ddl command deparse table rewrite',
   proname => 'publication_ddl_deparse_table_rewrite', prorettype => 'event_trigger',
   proargtypes => '', prosrc => 'publication_ddl_deparse_table_rewrite' },
+{ oid => '4647', descr => 'trigger for ddl command deparse table create',
+  proname => 'publication_ddl_deparse_table_create', prorettype => 'event_trigger',
+  proargtypes => '', prosrc => 'publication_ddl_deparse_table_create' },
 ]
diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h
index fd2ee7f..d617e94 100644
--- a/src/include/commands/event_trigger.h
+++ b/src/include/commands/event_trigger.h
@@ -55,6 +55,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree);
 extern void EventTriggerSQLDrop(Node *parsetree);
 extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason);
 
+extern void EventTriggerCreateTableStart(Node *parsetree);
+extern void EventTriggerTableCreate(Node *parsetree, ObjectAddress address);
+extern void EventTriggerCreateTableEnd(void);
+
 extern bool EventTriggerBeginCompleteQuery(void);
 extern void EventTriggerEndCompleteQuery(void);
 extern bool trackDroppedObjectsNeeded(void);
diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h
index ddb67a6..070c6b9 100644
--- a/src/include/utils/evtcache.h
+++ b/src/include/utils/evtcache.h
@@ -22,7 +22,8 @@ typedef enum
 	EVT_DDLCommandStart,
 	EVT_DDLCommandEnd,
 	EVT_SQLDrop,
-	EVT_TableRewrite
+	EVT_TableRewrite,
+	EVT_TableCreate
 } EventTriggerEvent;
 
 typedef struct
-- 
2.7.2.windows.1


