From 2c4ae328ebd6d68bd7bf3c3d144ee7b42e7cf034 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Fri, 24 Mar 2023 17:50:27 +0530
Subject: [PATCH 8/8] Allow replicated objects to have the same owner from the
 publisher. Specifically, the changes include:

1. Change event trigger functions to collect the current role in CollectedCommand.

2. Change Deparser function deparse_utility_command to encode owner role in the top-level
element such as {myowner:role_name, fmt:..., identity:...} of the deparsed jsonb output
for commands that create database objects. Also change function deparse_ddl_json_to_string
to retrieve the myowner element from a jsonb string.

3. Introduce a new subscription option match_ddl_owner: when turned on, the apply worker
will apply DDL messages in the role retrieved from the "myowner" field of the deparsed
jsonb string. The default value of match_ddl_owner is on.
---
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/commands/ddl_deparse.c            |  86 +++++++++--
 src/backend/commands/ddl_json.c               |  25 ++-
 src/backend/commands/event_trigger.c          |   6 +
 src/backend/commands/subscriptioncmds.c       |  27 +++-
 src/backend/replication/logical/ddltrigger.c  |   6 +-
 src/backend/replication/logical/worker.c      |  22 ++-
 src/bin/pg_dump/pg_dump.c                     |  16 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   8 +-
 src/include/catalog/pg_subscription.h         |   5 +
 src/include/tcop/ddl_deparse.h                |   4 +-
 src/include/tcop/deparse_utility.h            |   1 +
 src/test/regress/expected/subscription.out    | 144 +++++++++---------
 .../subscription/t/033_ddl_replication.pl     |  17 +++
 15 files changed, 269 insertions(+), 100 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..022aab0fa9 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->matchddlowner = subform->submatchddlowner;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c
index bc2bb22cce..d4cec4c5c8 100644
--- a/src/backend/commands/ddl_deparse.c
+++ b/src/backend/commands/ddl_deparse.c
@@ -172,7 +172,7 @@ static ObjElem *new_object_object(ObjTree *value);
 static ObjTree *new_objtree_VA(char *fmt, int numobjs,...);
 static ObjTree *new_objtree(char *fmt);
 static ObjElem *new_string_object(char *value);
-static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state);
+static JsonbValue *objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state, char *owner);
 static void pg_get_indexdef_detailed(Oid indexrelid,
 									 char **index_am,
 									 char **definition,
@@ -993,14 +993,39 @@ objtree_fmt_to_jsonb_element(JsonbParseState *state, ObjTree *tree)
 }
 
 /*
- * Create a JSONB representation from an ObjTree.
+ * Process the role string into the output parse state.
+ */
+static void
+role_to_jsonb_element(JsonbParseState *state, char *owner)
+{
+	JsonbValue	key;
+	JsonbValue	val;
+
+	if (owner == NULL)
+		return;
+
+	/* Push the key first */
+	key.type = jbvString;
+	key.val.string.val = "myowner";
+	key.val.string.len = strlen(key.val.string.val);
+	pushJsonbValue(&state, WJB_KEY, &key);
+
+	/* Then process the role string */
+	val.type = jbvString;
+	val.val.string.len = strlen(owner);
+	val.val.string.val = owner;
+	pushJsonbValue(&state, WJB_VALUE, &val);
+}
+
+/*
+ * Create a JSONB representation from an ObjTree and its owner (if given).
  */
 static Jsonb *
-objtree_to_jsonb(ObjTree *tree)
+objtree_to_jsonb(ObjTree *tree, char *owner)
 {
 	JsonbValue *value;
 
-	value = objtree_to_jsonb_rec(tree, NULL);
+	value = objtree_to_jsonb_rec(tree, NULL, owner);
 	return JsonbValueToJsonb(value);
 }
 
@@ -1052,7 +1077,7 @@ objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object,
 
 		case ObjTypeObject:
 			/* Recursively add the object into the existing parse state */
-			objtree_to_jsonb_rec(object->value.object, state);
+			objtree_to_jsonb_rec(object->value.object, state, NULL);
 			break;
 
 		case ObjTypeArray:
@@ -1080,12 +1105,13 @@ objtree_to_jsonb_element(JsonbParseState *state, ObjElem *object,
  * Recursive helper for objtree_to_jsonb.
  */
 static JsonbValue *
-objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state)
+objtree_to_jsonb_rec(ObjTree *tree, JsonbParseState *state, char *owner)
 {
 	slist_iter	iter;
 
 	pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
 
+	role_to_jsonb_element(state, owner);
 	objtree_fmt_to_jsonb_element(state, tree);
 
 	slist_foreach(iter, &tree->params)
@@ -3696,7 +3722,7 @@ deparse_drop_command(const char *objidentity, const char *objecttype,
 							"present", ObjTypeBool, behavior == DROP_CASCADE);
 	append_object_object(stmt, "%{cascade}s", tmp_obj);
 
-	jsonb = objtree_to_jsonb(stmt);
+	jsonb = objtree_to_jsonb(stmt, NULL /* Owner/role can be skipped for drop command */);
 	command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN);
 
 	return command;
@@ -9813,7 +9839,7 @@ deparse_AlterPublicationDropStmt(SQLDropObject *obj)
 	append_object_object(alterpub, "%{drop_object}s", drop_object);
 
 	initStringInfo(&str);
-	jsonb = objtree_to_jsonb(alterpub);
+	jsonb = objtree_to_jsonb(alterpub, NULL /* Owner/role can be skipped for drop command */);
 	command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN);
 
 	return command;
@@ -9825,7 +9851,7 @@ deparse_AlterPublicationDropStmt(SQLDropObject *obj)
  * This function should cover all cases handled in ProcessUtilitySlow.
  */
 static ObjTree *
-deparse_simple_command(CollectedCommand *cmd)
+deparse_simple_command(CollectedCommand *cmd, bool *include_owner)
 {
 	Oid			objectId;
 	Node	   *parsetree;
@@ -9842,64 +9868,83 @@ deparse_simple_command(CollectedCommand *cmd)
 	switch (nodeTag(parsetree))
 	{
 		case T_AlterCollationStmt:
+			*include_owner = false;
 			return deparse_AlterCollation(objectId, parsetree);
 
 		case T_AlterDomainStmt:
+			*include_owner = false;
 			return deparse_AlterDomainStmt(objectId, parsetree,
 										   cmd->d.simple.secondaryObject);
 
 		case T_AlterEnumStmt:
+			*include_owner = false;
 			return deparse_AlterEnumStmt(objectId, parsetree);
 
 		case T_AlterExtensionContentsStmt:
+			*include_owner = false;
 			return deparse_AlterExtensionContentsStmt(objectId, parsetree,
 													  cmd->d.simple.secondaryObject);
 
 		case T_AlterExtensionStmt:
+			*include_owner = false;
 			return deparse_AlterExtensionStmt(objectId, parsetree);
 
 		case T_AlterFdwStmt:
+			*include_owner = false;
 			return deparse_AlterFdwStmt(objectId, parsetree);
 
 		case T_AlterForeignServerStmt:
+			*include_owner = false;
 			return deparse_AlterForeignServerStmt(objectId, parsetree);
 
 		case T_AlterFunctionStmt:
+			*include_owner = false;
 			return deparse_AlterFunction(objectId, parsetree);
 
 		case T_AlterObjectDependsStmt:
+			*include_owner = false;
 			return deparse_AlterDependStmt(objectId, parsetree);
 
 		case T_AlterObjectSchemaStmt:
+			*include_owner = false;
 			return deparse_AlterObjectSchemaStmt(cmd->d.simple.address,
 												 parsetree,
 												 cmd->d.simple.secondaryObject);
 
 		case T_AlterOperatorStmt:
+			*include_owner = false;
 			return deparse_AlterOperatorStmt(objectId, parsetree);
 
 		case T_AlterOwnerStmt:
+			*include_owner = false;
 			return deparse_AlterOwnerStmt(cmd->d.simple.address, parsetree);
 
 		case T_AlterPolicyStmt:
+			*include_owner = false;
 			return deparse_AlterPolicyStmt(objectId, parsetree);
 
 		case T_AlterSeqStmt:
+			*include_owner = false;
 			return deparse_AlterSeqStmt(objectId, parsetree);
 
 		case T_AlterStatsStmt:
+			*include_owner = false;
 			return deparse_AlterStatsStmt(objectId, parsetree);
 
 		case T_AlterTSDictionaryStmt:
+			*include_owner = false;
 			return deparse_AlterTSDictionaryStmt(objectId, parsetree);
 
 		case T_AlterTypeStmt:
+			*include_owner = false;
 			return deparse_AlterTypeSetStmt(objectId, parsetree);
 
 		case T_AlterUserMappingStmt:
+			*include_owner = false;
 			return deparse_AlterUserMappingStmt(objectId, parsetree);
 
 		case T_CommentStmt:
+			*include_owner = false;
 			return deparse_CommentStmt(cmd->d.simple.address, parsetree);
 
 		case T_CompositeTypeStmt:
@@ -9979,9 +10024,11 @@ deparse_simple_command(CollectedCommand *cmd)
 			return deparse_IndexStmt(objectId, parsetree);
 
 		case T_RefreshMatViewStmt:
+			*include_owner = false;
 			return deparse_RefreshMatViewStmt(objectId, parsetree);
 
 		case T_RenameStmt:
+			*include_owner = false;
 			return deparse_RenameStmt(cmd->d.simple.address, parsetree);
 
 		case T_RuleStmt:
@@ -10006,9 +10053,15 @@ deparse_simple_command(CollectedCommand *cmd)
 
 /*
  * Workhorse to deparse a CollectedCommand.
+ *
+ * include_owner indicates if the owner/role of the command should be
+ * included in the deparsed Json output. It is set to false for any commands
+ * that don't CREATE database objects (ALTER commands for example), this is
+ * to avoid encoding and sending the owner to downstream for replay as it is
+ * unnecessary for such commands.
  */
 char *
-deparse_utility_command(CollectedCommand *cmd, bool verbose_mode)
+deparse_utility_command(CollectedCommand *cmd, bool include_owner, bool verbose_mode)
 {
 	OverrideSearchPath *overridePath;
 	MemoryContext oldcxt;
@@ -10049,30 +10102,36 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode)
 	switch (cmd->type)
 	{
 		case SCT_Simple:
-			tree = deparse_simple_command(cmd);
+			tree = deparse_simple_command(cmd, &include_owner);
 			break;
 		case SCT_AlterTable:
 			tree = deparse_AlterRelation(cmd);
+			include_owner = false;
 			break;
 		case SCT_Grant:
 			tree = deparse_GrantStmt(cmd);
+			include_owner = false;
 			break;
 		case SCT_CreateTableAs:
 			tree = deparse_CreateTableAsStmt(cmd);
 			break;
 		case SCT_AlterOpFamily:
+			include_owner = false;
 			tree = deparse_AlterOpFamily(cmd);
 			break;
 		case SCT_CreateOpClass:
 			tree = deparse_CreateOpClassStmt(cmd);
 			break;
 		case SCT_AlterDefaultPrivileges:
+			include_owner = false;
 			tree = deparse_AlterDefaultPrivilegesStmt(cmd);
 			break;
 		case SCT_AlterTSConfig:
+			include_owner = false;
 			tree = deparse_AlterTSConfigurationStmt(cmd);
 			break;
 		case SCT_SecurityLabel:
+			include_owner = false;
 			tree = deparse_SecLabelStmt(cmd);
 			break;
 		default:
@@ -10085,7 +10144,8 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode)
 	{
 		Jsonb	   *jsonb;
 
-		jsonb = objtree_to_jsonb(tree);
+		jsonb = include_owner ? objtree_to_jsonb(tree, cmd->role) :
+								objtree_to_jsonb(tree, NULL);
 		command = JsonbToCString(&str, &jsonb->root, JSONB_ESTIMATED_LEN);
 	}
 
@@ -10111,7 +10171,7 @@ ddl_deparse_to_json(PG_FUNCTION_ARGS)
 	CollectedCommand *cmd = (CollectedCommand *) PG_GETARG_POINTER(0);
 	char	   *command;
 
-	command = deparse_utility_command(cmd, true);
+	command = deparse_utility_command(cmd, false, true);
 
 	if (command)
 		PG_RETURN_TEXT_P(cstring_to_text(command));
diff --git a/src/backend/commands/ddl_json.c b/src/backend/commands/ddl_json.c
index 3a57d2697c..76cefb9487 100644
--- a/src/backend/commands/ddl_json.c
+++ b/src/backend/commands/ddl_json.c
@@ -718,7 +718,7 @@ expand_jsonb_array(StringInfo buf, char *param,
  * Workhorse for ddl_deparse_expand_command.
  */
 char *
-deparse_ddl_json_to_string(char *json_str)
+deparse_ddl_json_to_string(char *json_str, char** owner)
 {
 	Datum		d;
 	Jsonb	   *jsonb;
@@ -729,6 +729,27 @@ deparse_ddl_json_to_string(char *json_str)
 	d = DirectFunctionCall1(jsonb_in, PointerGetDatum(json_str));
 	jsonb = (Jsonb *) DatumGetPointer(d);
 
+	if (owner != NULL)
+	{
+		const char *key = "myowner";
+		JsonbValue *value;
+
+		value = getKeyJsonValueFromContainer(&jsonb->root, key, strlen(key), NULL);
+		if (value)
+		{
+			char *str;
+
+			/* value->val.string.val may not be NULL terminated */
+			str = palloc(value->val.string.len + 1);
+			memcpy(str, value->val.string.val, value->val.string.len);
+			str[value->val.string.len] = '\0';
+			*owner = str;
+		}
+		else
+			/* myowner is not given in this jsonb, e.g. for Drop Commands */
+			*owner = NULL;
+	}
+
 	expand_fmt_recursive(buf, &jsonb->root);
 
 	return buf->data;
@@ -765,7 +786,7 @@ ddl_deparse_expand_command(PG_FUNCTION_ARGS)
 
 	json_str = text_to_cstring(json);
 
-	PG_RETURN_TEXT_P(cstring_to_text(deparse_ddl_json_to_string(json_str)));
+	PG_RETURN_TEXT_P(cstring_to_text(deparse_ddl_json_to_string(json_str, NULL)));
 }
 
 /*
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index b3f6849005..275c4000c6 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -882,6 +882,7 @@ EventTriggerTableInitWriteStart(Node *parsetree)
 
 	command->type = (stmt->objtype == OBJECT_TABLE) ? SCT_CreateTableAs : SCT_Simple;
 	command->in_extension = creating_extension;
+	command->role = GetUserNameFromId(GetUserId(), false);
 	command->d.ctas.address = InvalidObjectAddress;
 	command->d.ctas.real_create = NULL;
 	command->parsetree = copyObject(parsetree);
@@ -1627,6 +1628,7 @@ EventTriggerCollectSimpleCommand(ObjectAddress address,
 
 	command->type = SCT_Simple;
 	command->in_extension = creating_extension;
+	command->role = GetUserNameFromId(GetUserId(), false);
 
 	command->d.simple.address = address;
 	command->d.simple.secondaryObject = secondaryObject;
@@ -1663,6 +1665,7 @@ EventTriggerAlterTableStart(Node *parsetree)
 
 	command->type = SCT_AlterTable;
 	command->in_extension = creating_extension;
+	command->role = GetUserNameFromId(GetUserId(), false);
 
 	command->d.alterTable.classId = RelationRelationId;
 	command->d.alterTable.objectId = InvalidOid;
@@ -1930,6 +1933,7 @@ EventTriggerCollectGrant(InternalGrant *istmt)
 	command = palloc(sizeof(CollectedCommand));
 	command->type = SCT_Grant;
 	command->in_extension = creating_extension;
+	command->role = GetUserNameFromId(GetUserId(), false);
 	command->d.grant.istmt = icopy;
 	command->parsetree = NULL;
 
@@ -1961,6 +1965,7 @@ EventTriggerCollectAlterOpFam(AlterOpFamilyStmt *stmt, Oid opfamoid,
 	command = palloc(sizeof(CollectedCommand));
 	command->type = SCT_AlterOpFamily;
 	command->in_extension = creating_extension;
+	command->role = GetUserNameFromId(GetUserId(), false);
 	ObjectAddressSet(command->d.opfam.address,
 					 OperatorFamilyRelationId, opfamoid);
 	command->d.opfam.operators = operators;
@@ -2091,6 +2096,7 @@ EventTriggerCollectSecLabel(ObjectAddress address, char *provider,
 	command = palloc0(sizeof(CollectedCommand));
 	command->type = SCT_SecurityLabel;
 	command->in_extension = creating_extension;
+	command->role = GetUserNameFromId(GetUserId(), false);
 	command->d.seclabel.address = address;
 	command->d.seclabel.provider = provider;
 	command->parsetree = (Node *) copyObject(stmt);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..f7bb73843e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MATCH_DDL_OWNER		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -89,6 +90,7 @@ typedef struct SubOpts
 	bool		twophase;
 	bool		disableonerr;
 	char	   *origin;
+	bool		matchddlowner;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MATCH_DDL_OWNER))
+		opts->matchddlowner = true;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MATCH_DDL_OWNER) &&
+				 strcmp(defel->defname, "match_ddl_owner") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MATCH_DDL_OWNER))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MATCH_DDL_OWNER;
+			opts->matchddlowner = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -560,7 +573,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MATCH_DDL_OWNER);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -649,6 +663,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -1054,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MATCH_DDL_OWNER);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1118,6 +1133,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MATCH_DDL_OWNER))
+				{
+					values[Anum_pg_subscription_submatchddlowner - 1]
+						= BoolGetDatum(opts.matchddlowner);
+					replaces[Anum_pg_subscription_submatchddlowner - 1]
+						= true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/ddltrigger.c b/src/backend/replication/logical/ddltrigger.c
index d68a50708d..5ccfefe089 100644
--- a/src/backend/replication/logical/ddltrigger.c
+++ b/src/backend/replication/logical/ddltrigger.c
@@ -122,7 +122,7 @@ publication_deparse_table_rewrite(PG_FUNCTION_ARGS)
 	if (relpersist != RELPERSISTENCE_TEMP)
 	{
 		/* Deparse the DDL command and WAL log it to allow decoding of the same. */
-		json_string = deparse_utility_command(cmd, false);
+		json_string = deparse_utility_command(cmd, true, false);
 
 		if (json_string != NULL)
 			LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter,
@@ -200,7 +200,7 @@ publication_deparse_ddl_command_end(PG_FUNCTION_ARGS)
 			 * Deparse the DDL command and WAL log it to allow decoding of the
 			 * same.
 			 */
-			json_string = deparse_utility_command(cmd, false);
+			json_string = deparse_utility_command(cmd, true, false);
 
 			if (json_string != NULL)
 				LogLogicalDDLMessage("deparse", relid, type, json_string,
@@ -355,7 +355,7 @@ publication_deparse_table_init_write(PG_FUNCTION_ARGS)
 		return PointerGetDatum(NULL);
 
 	/* Deparse the DDL command and WAL log it to allow decoding of the same. */
-	json_string = deparse_utility_command(cmd, false);
+	json_string = deparse_utility_command(cmd, true, false);
 
 	if (json_string != NULL)
 		LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_SimpleCmd,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9e053a3d83..6710893499 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3349,21 +3349,33 @@ apply_handle_ddl(StringInfo s)
 	const char *prefix = NULL;
 	char	   *message = NULL;
 	char	   *ddl_command;
+	char	   *owner;
 	Size		sz;
 	List	   *parsetree_list;
 	ListCell   *parsetree_item;
 	DestReceiver *receiver;
 	MemoryContext oldcontext;
 	const char *save_debug_query_string = debug_query_string;
+	int			save_nestlevel = 0;
 
 	message = logicalrep_read_ddl(s, &lsn, &prefix, &sz);
 
 	/* Make sure we are in a transaction command */
 	begin_replication_step();
 
-	ddl_command = deparse_ddl_json_to_string(message);
+	ddl_command = deparse_ddl_json_to_string(message, &owner);
 	debug_query_string = ddl_command;
 
+	if (MySubscription->matchddlowner && owner)
+	{
+		/*
+		 * Set the current role to the owner that executed the command on the
+		 * publication server.
+		 */
+		save_nestlevel = NewGUCNestLevel();
+		SetConfigOption("role", owner, PGC_INTERNAL, PGC_S_OVERRIDE);
+	}
+
 	/* DestNone for logical replication */
 	receiver = CreateDestReceiver(DestNone);
 	parsetree_list = pg_parse_query(ddl_command);
@@ -3461,6 +3473,14 @@ apply_handle_ddl(StringInfo s)
 		MemoryContextDelete(per_parsetree_context);
 	}
 
+	/*
+	 * Restore the GUC variables we set above.
+	 */
+	if (save_nestlevel > 0)
+	{
+		AtEOXact_GUC(true, save_nestlevel);
+	}
+
 	debug_query_string = save_debug_query_string;
 	end_replication_step();
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 292849364b..d2974595f6 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4634,6 +4634,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_submatchddlowner;
 	int			i,
 				ntups;
 
@@ -4686,9 +4687,14 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query,
+							 " s.suborigin,\n"
+							 " s.submatchddlowner\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query,
+						  " '%s' AS suborigin,\n"
+						  " false AS submatchddlowner\n",
+						  LOGICALREP_ORIGIN_ANY);
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4716,6 +4722,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_submatchddlowner = PQfnumber(res, "submatchddlowner");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4746,6 +4753,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].submatchddlowner =
+			pg_strdup(PQgetvalue(res, i, i_submatchddlowner));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4824,6 +4833,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
 		appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
 
+	if (strcmp(subinfo->submatchddlowner, "f") == 0)
+		appendPQExpBufferStr(query, ", match_ddl_owner = false");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ed23883cdb..d613ac559e 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -665,6 +665,7 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *submatchddlowner;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 48bf37a94f..69b37f2afc 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6521,7 +6521,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6578,8 +6578,10 @@ describeSubscriptions(const char *pattern, bool verbose)
 
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", submatchddlowner AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Match DDL owner"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..17af7c7750 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	bool		submatchddlowner;	/* True if replicated objects by DDL replication
+									 * should match the original owner on the publisher */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -137,6 +140,8 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	bool		matchddlowner;  /* Indicates if replicated objects by DDL replication
+								 * should match the original owner on the publisher */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
diff --git a/src/include/tcop/ddl_deparse.h b/src/include/tcop/ddl_deparse.h
index e261a3e621..581a470c09 100644
--- a/src/include/tcop/ddl_deparse.h
+++ b/src/include/tcop/ddl_deparse.h
@@ -15,8 +15,8 @@
 #include "commands/event_trigger.h"
 #include "tcop/deparse_utility.h"
 
-extern char *deparse_utility_command(CollectedCommand *cmd, bool verbose_mode);
-extern char *deparse_ddl_json_to_string(char *jsonb);
+extern char *deparse_utility_command(CollectedCommand *cmd, bool include_owner, bool verbose_mode);
+extern char *deparse_ddl_json_to_string(char *jsonb, char** owner);
 extern char *deparse_drop_command(const char *objidentity, const char *objecttype,
 								  DropBehavior behavior);
 extern char * deparse_AlterPublicationDropStmt(SQLDropObject *obj);
diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h
index fbd09763bf..cce3c07fc0 100644
--- a/src/include/tcop/deparse_utility.h
+++ b/src/include/tcop/deparse_utility.h
@@ -49,6 +49,7 @@ typedef struct CollectedCommand
 	CollectedCommandType type;
 
 	bool		in_extension;
+	char	   *role;
 	Node	   *parsetree;
 
 	union
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..af894f75b9 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                        List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | t               | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                          List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,18 +404,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Match DDL owner | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | t               | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/033_ddl_replication.pl b/src/test/subscription/t/033_ddl_replication.pl
index 4bc4ff2212..902d2034f5 100644
--- a/src/test/subscription/t/033_ddl_replication.pl
+++ b/src/test/subscription/t/033_ddl_replication.pl
@@ -457,6 +457,23 @@ is($result, qq(42), 'CREATE TABLE OF replicated');
 $node_publisher->safe_psql('postgres', "DROP TABLE tmp");
 $node_publisher->safe_psql('postgres', "DROP TYPE int42 cascade");
 
+# Test owner of replicated table on subscriber matches the owner on publisher when
+# the match_ddl_owner subscription option is enabled
+$node_publisher->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;");
+
+$node_subscriber->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION mysub SET (match_ddl_owner = true);");
+
+$node_publisher->safe_psql('postgres', "SET SESSION AUTHORIZATION 'ddl_replication_user'; CREATE TABLE tmp (a int, b varchar);");
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT tableowner from pg_catalog.pg_tables where tablename = 'tmp';");
+is($result, qq(ddl_replication_user), 'Owner of tmp is ddl_replication_user');
+$node_publisher->safe_psql('postgres', "DROP TABLE tmp");
+# reset match_ddl_owner
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION mysub SET (match_ddl_owner = false);");
+
+
 pass "DDL replication tests passed:";
 
 $node_subscriber->stop;
-- 
2.34.1

