From: Zheng Li <zhengli10@gmail.com>
To: Amit Kapila <amit.kapila16@gmail.com>
Cc: houzj.fnst@fujitsu.com <houzj.fnst@fujitsu.com>
Cc: Masahiko Sawada <sawada.mshk@gmail.com>
Cc: Japin Li <japinli@hotmail.com>
Cc: Alvaro Herrera <alvherre@alvh.no-ip.org>
Cc: Dilip Kumar <dilipbalaut@gmail.com>
Cc: rajesh singarapu <rajesh.rs0541@gmail.com>
Cc: PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>
Subject: Re: Support logical replication of DDLs
Date: Fri, 17 Jun 2022 15:38:03 -0400
Message-ID: <CAAD30U+wDPDFzUoPkSg2WYMNCXWNc8wa7GYB1Tzh_2PNUBsEHA@mail.gmail.com>
In-Reply-To: <CAA4eK1JKK9LACPovjogS-LThQBscwkrxBy9RuA6aHFP=vTGjtg@mail.gmail.com>
References: <CAAD30ULtoGp8L_GKbV15Wnm+X5r=SE7MOnYHuqBr396m26jJSA@mail.gmail.com>
<202203162206.7spggyktx63e@alvherre.pgsql>
<CAAD30UKRUusq8JyyHzAv71=ncN22OE8OkOOyAWvRHW3wXNjyyA@mail.gmail.com>
<CAAD30UKTp87+kvGZYL3M2Suxq=WEvFUG24ZRT0yT9rqdkP=uMA@mail.gmail.com>
<MEYP282MB1669863D5C31D7F6A1D996D8B6139@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM>
<CAAD30UKc=GiGQzE8H7+Ofo18hwMOfK4qUm_KUyw6c09q4JvA5Q@mail.gmail.com>
<MEYP282MB16691E383140844437FB0633B6139@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM>
<CAAD30U+ZTBXLH0wWsW9+Zu2RECGKeaQNynLs7wKA0o86w8C-fw@mail.gmail.com>
<MEYP282MB166926E46397CBFC113B4A7EB6189@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM>
<CAA4eK1J4AekmEKgmfp6e-zZz4M02m7w7uxvC2tjqmjF-LDSGDA@mail.gmail.com>
<CAAD30UKvv5=k6BY+JAF1fWzrYNbGcB0DEdNi1FMokULzOwSxcQ@mail.gmail.com>
<CAAD30U+CRgUgkAg33KzNBKwCbsgiSc5z3NYvxNzEfS0Zg2S1WA@mail.gmail.com>
<CAD21AoAv_wsBEK8jcqjBpatspiP=5E+qLokw9zCESBSvCAiRMg@mail.gmail.com>
<CAAD30UK6T8bfW1JMaSSRDSynB6W05HjNrmvSp+tvXp-jdu9xFQ@mail.gmail.com>
<CAA4eK1JQhz4y-1rYxwFxHYEAN-1JKeO0iT+Nip0N7jJUj_g7RA@mail.gmail.com>
<CAD21AoCnGwx2F+Ph3dpoJVq0YR8ke3P59XCs439pW=BRfdzgTQ@mail.gmail.com>
<OS0PR01MB571695EDF9EAB2422FBF2C1094DE9@OS0PR01MB5716.jpnprd01.prod.outlook.com>
<CAAD30ULe-cZTELQJbcAahsOFoUO-ftMxorwBTmj7uYK=_=mwxg@mail.gmail.com>
<OS0PR01MB571684CBF660D05B63B4412C94AB9@OS0PR01MB5716.jpnprd01.prod.outlook.com>
<CAAD30U+oi6e6Vh_zAzhuXzkqUhagmLGD+_iyn2N9w_sNRKsoag@mail.gmail.com>
<CAA4eK1JKK9LACPovjogS-LThQBscwkrxBy9RuA6aHFP=vTGjtg@mail.gmail.com>
On Wed, Jun 15, 2022 at 12:00 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Jun 15, 2022 at 5:44 AM Zheng Li <zhengli10@gmail.com> wrote:
> >
> >
> > While I agree that the deparser is needed to handle the potential
> > syntax differences between
> > the pub/sub, I think it's only relevant for the use cases where only a
> > subset of tables in the database
> > are replicated. For other use cases where all tables, functions and
> > other objects need to be replicated,
> > (for example, creating a logical replica for major version upgrade)
> > there won't be any syntax difference to
> > handle and the schemas are supposed to match exactly between the
> > pub/sub. In other words the user seeks to create an identical replica
> > of the source database and the DDLs should be replicated
> > as is in this case.
> >
>
> I think even for database-level replication we can't assume that
> source and target will always have the same data in which case "Create
> Table As ..", "Alter Table .. " kind of statements can't be replicated
> as it is because that can lead to different results.
"Create Table As .." is already handled by setting the skipData flag of
the statement's parse tree before replay:
/*
* Force skipping data population to avoid data inconsistency.
* Data should be replicated from the publisher instead.
*/
castmt->into->skipData = true;
"Alter Table .. " commands that rewrite the table using volatile expressions
can also be handled without any syntax change, by enabling table rewrite
replication and converting the rewrite inserts to updates. ZJ's patch
introduced this solution.
I've also adopted this approach in my latest patch
0012-Support-replication-of-ALTER-TABLE-commands-that-rew.patch
> The other point
> is tomorrow we can extend the database level option/syntax to exclude
> a few objects (something like [1]) as well in which case we again need
> to filter at the publisher level
I think such cases are no longer full database replication, and we could treat
them as table level DDL replication, i.e. use the deparser format.
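To make the proposed rule concrete, here is a minimal sketch of how the logging
format could be chosen from the DDL replication level. All type and function
names below are hypothetical and do not appear in the patch set; the rule itself
is only the one argued for in this thread (plain query string plus search_path
for full database replication, deparser format otherwise).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical names for illustration only; not part of the patch set. */
typedef enum DdlReplLevel
{
	DDL_REPL_NONE,				/* DDL replication disabled */
	DDL_REPL_DATABASE,			/* replicate DDL for the whole database */
	DDL_REPL_TABLE				/* replicate DDL for published tables only */
} DdlReplLevel;

typedef enum DdlLogFormat
{
	DDL_LOG_NOTHING,			/* do not log the command */
	DDL_LOG_QUERY_STRING,		/* original command text plus search_path */
	DDL_LOG_DEPARSED			/* deparser output, schema-qualified */
} DdlLogFormat;

/*
 * Pick the logging format. A database level publication that excludes some
 * objects is treated like table level replication, because the publisher
 * then has to filter and the schemas may no longer match exactly.
 */
DdlLogFormat
choose_ddl_log_format(DdlReplLevel level, bool has_excluded_objects)
{
	switch (level)
	{
		case DDL_REPL_NONE:
			return DDL_LOG_NOTHING;
		case DDL_REPL_DATABASE:
			return has_excluded_objects ? DDL_LOG_DEPARSED
				: DDL_LOG_QUERY_STRING;
		case DDL_REPL_TABLE:
			return DDL_LOG_DEPARSED;
	}

	/* Not reached for valid enum values. */
	assert(false);
	return DDL_LOG_NOTHING;
}
```

With this rule, only a database level publication without exclusions would log
the raw query string; every other enabled configuration falls back to the
deparser format.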
> > So I think it's better to define DDL replication levels [1] to tailor
> > for the two different use cases. We can use different logging format
> > based on the DDL replication level. For example,
> > we can simply log the DDL query string and the search_path for
> > database level DDL replication. But for table level DDL replication we
> > need to use the deparser format in order to
> > handle the potential syntax differences and schema mapping requests.
> >
>
> I think having different logging formats is worth considering but I am
> not sure we can distinguish it for database and table level
> replication because of the reasons mentioned above. One thing which
> may need a different format is the replication of global objects like
> roles, tablespace, etc. but we haven't analyzed them in detail about
> those. I feel we may also need a different syntax altogether to
> replicate such objects.
Yes, global objects are not schema qualified so we probably don't need to
use the deparser format for these. We plan to do some evaluation on replication
of global objects.
> Also, I think we may want to optimize the
> current format in some cases so that the WAL amount could be reduced.
>
> I feel if we think that deparsing is required for this project then
> probably at this stage it would be a good idea to explore ways to have
> independent ways to test it. One way is to do testing via the logical
> replication of DDL (aka via this patch) and the other is to write an
> independent test suite as Sawada-San seems to be speculating above
> [2]. I am not sure if there is any progress yet on the independent
> test suite front yet.
Testing of DDL deparsing support has been discussed before in [1]; we
will also take a close look at it.
[1] https://www.postgresql.org/message-id/5477FD78.1060306%402ndquadrant.com
Regards,
Zheng
Attachments:
[application/octet-stream] 0003-Logical-replication-of-DDL-messages.patch (41.2K, 2-0003-Logical-replication-of-DDL-messages.patch)
From b193e29c362aaf379012c2fc6c2e4e648d1c0bb0 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 18 Mar 2022 17:17:39 +0000
Subject: [PATCH 03/12] Logical replication of DDL messages
Integration with pgoutput:
Supports sending and receiving the DDL message using the logical
replication wire protocol. A new LogicalRepMsgType is introduced
for this purpose: LOGICAL_REP_MSG_DDLMESSAGE = 'L'.
Logical replication worker change:
Supports execution of the DDL command in the original user role and
search_path. For any new table created this way, we also set its
srsubstate in the pg_subscription_rel catalog to SUBREL_STATE_INIT,
so that DML replication can progress on this new table without
manually running "ALTER SUBSCRIPTION ... REFRESH PUBLICATION".
TAP test:
A new TAP test 030_rep_ddls.pl is added. We mainly focused on
testing the happy path of database level replication so far.
Corner case DDLs and table level DDL replication are still to be
carefully tested.
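
For readers who want to see the 'L' message layout outside the backend, the
following standalone sketch mirrors the field order used by
logicalrep_write_ddlmessage() and logicalrep_read_ddlmessage() in this patch
for the non-streaming case (no leading xid). It is only an illustration: the
Buf and DdlMessage types and the encode/decode function names are hypothetical,
the flag value is assumed to match MESSAGE_TRANSACTIONAL, and the big-endian
integers stand in for what the pq_sendint*/pq_getmsgint* routines produce.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DDL_MSG_TYPE 'L'			/* LOGICAL_REP_MSG_DDLMESSAGE */
#define DDL_FLAG_TRANSACTIONAL 0x01	/* assumed value of MESSAGE_TRANSACTIONAL */

typedef struct Buf { uint8_t *data; size_t len; size_t cap; } Buf;

typedef struct DdlMessage
{
	bool		transactional;
	uint64_t	lsn;
	const char *prefix;
	const char *role;
	const char *search_path;
	uint32_t	sz;
	const uint8_t *payload;		/* sz bytes, not NUL-terminated */
} DdlMessage;

static void
put_bytes(Buf *b, const void *src, size_t n)
{
	assert(b->len + n <= b->cap);
	memcpy(b->data + b->len, src, n);
	b->len += n;
}

/* Append an unsigned integer in network (big-endian) byte order. */
static void
put_uint(Buf *b, uint64_t v, int nbytes)
{
	for (int i = nbytes - 1; i >= 0; i--)
	{
		uint8_t		byte = (uint8_t) (v >> (8 * i));

		put_bytes(b, &byte, 1);
	}
}

static uint64_t
get_uint(const uint8_t **p, int nbytes)
{
	uint64_t	v = 0;

	for (int i = 0; i < nbytes; i++)
		v = (v << 8) | *(*p)++;
	return v;
}

static const char *
get_cstring(const uint8_t **p)
{
	const char *s = (const char *) *p;

	*p += strlen(s) + 1;		/* skip the string and its terminator */
	return s;
}

/* Returns the number of bytes written to out. */
size_t
encode_ddl_message(uint8_t *out, size_t cap, bool transactional, uint64_t lsn,
				   const char *prefix, const char *role,
				   const char *search_path, const char *msg, uint32_t sz)
{
	Buf			b = {out, 0, cap};

	put_uint(&b, DDL_MSG_TYPE, 1);
	put_uint(&b, transactional ? DDL_FLAG_TRANSACTIONAL : 0, 1);
	put_uint(&b, lsn, 8);
	put_bytes(&b, prefix, strlen(prefix) + 1);
	put_bytes(&b, role, strlen(role) + 1);
	put_bytes(&b, search_path, strlen(search_path) + 1);
	put_uint(&b, sz, 4);
	put_bytes(&b, msg, sz);
	return b.len;
}

/* Decodes a well-formed buffer; no bounds checking is done in this sketch. */
DdlMessage
decode_ddl_message(const uint8_t *in)
{
	DdlMessage	m;
	const uint8_t *p = in;

	assert(*p == DDL_MSG_TYPE);
	p++;
	m.transactional = (get_uint(&p, 1) & DDL_FLAG_TRANSACTIONAL) != 0;
	m.lsn = get_uint(&p, 8);
	m.prefix = get_cstring(&p);
	m.role = get_cstring(&p);
	m.search_path = get_cstring(&p);
	m.sz = (uint32_t) get_uint(&p, 4);
	m.payload = p;
	return m;
}
```

The decoded string pointers refer into the input buffer, much as the values
returned by pq_getmsgstring() point into the message being read. In the real
code the type byte is consumed by apply_dispatch() before the read function
is called.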
---
.../test_decoding/expected/ddlmessages.out | 26 +-
contrib/test_decoding/sql/ddlmessages.sql | 5 +-
src/backend/replication/logical/proto.c | 63 ++++-
src/backend/replication/logical/worker.c | 264 ++++++++++++++++++
src/backend/replication/pgoutput/pgoutput.c | 82 +++++-
src/backend/tcop/utility.c | 14 +-
src/include/replication/logicalproto.h | 10 +-
src/test/subscription/t/004_sync.pl | 2 +-
src/test/subscription/t/006_rewrite.pl | 2 +-
src/test/subscription/t/008_diff_schema.pl | 2 +-
src/test/subscription/t/009_matviews.pl | 2 +-
src/test/subscription/t/012_collation.pl | 2 +-
src/test/subscription/t/013_partition.pl | 8 +-
src/test/subscription/t/030_rep_ddls.pl | 237 ++++++++++++++++
14 files changed, 684 insertions(+), 35 deletions(-)
create mode 100644 src/test/subscription/t/030_rep_ddls.pl
diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out
index 79284f9def..0376f36c24 100644
--- a/contrib/test_decoding/expected/ddlmessages.out
+++ b/contrib/test_decoding/expected/ddlmessages.out
@@ -2,6 +2,9 @@
SET synchronous_commit = on;
-- turn on logical ddl message logging
CREATE publication mypub FOR ALL TABLES with (ddl = 'database');
+-- SET USER
+CREATE ROLE ddl_replication_user LOGIN SUPERUSER;
+SET SESSION AUTHORIZATION 'ddl_replication_user';
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
@@ -20,23 +23,22 @@ BEGIN;
CREATE TABLE test_ddlmessage (id serial unique, data int);
ALTER TABLE test_ddlmessage add c3 varchar;
COMMIT;
-\o | sed 's/role.*search_path/role: redacted, search_path/g'
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-SELECT pg_drop_replication_slot('regression_slot');
-DROP TABLE test_ddlmessage;
-DROP publication mypub;
- data
---------------------------------------------------------------------------------------------------------------------------------------------------------------
- DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
- DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
- DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3;
- DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage;
- DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
- DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
+ data
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3;
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage;
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
(6 rows)
+SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
+DROP TABLE test_ddlmessage;
+DROP publication mypub;
diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql
index 211497ee22..c23610f9b4 100644
--- a/contrib/test_decoding/sql/ddlmessages.sql
+++ b/contrib/test_decoding/sql/ddlmessages.sql
@@ -3,6 +3,10 @@ SET synchronous_commit = on;
-- turn on logical ddl message logging
CREATE publication mypub FOR ALL TABLES with (ddl = 'database');
+-- SET USER
+CREATE ROLE ddl_replication_user LOGIN SUPERUSER;
+SET SESSION AUTHORIZATION 'ddl_replication_user';
+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE test_ddlmessage (id serial unique, data int);
@@ -20,7 +24,6 @@ CREATE TABLE test_ddlmessage (id serial unique, data int);
ALTER TABLE test_ddlmessage add c3 varchar;
COMMIT;
-\o | sed 's/role.*search_path/role: redacted, search_path/g'
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
DROP TABLE test_ddlmessage;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ff8513e2d2..3cfa94dd8c 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -640,8 +640,8 @@ logicalrep_read_truncate(StringInfo in,
*/
void
logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
- bool transactional, const char *prefix, Size sz,
- const char *message)
+ bool transactional, const char *prefix,
+ Size sz, const char *message)
{
uint8 flags = 0;
@@ -662,6 +662,63 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
+/*
+ * Read DDL MESSAGE from stream
+ */
+const char *
+logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn,
+ const char **prefix,
+ const char **role,
+ const char **search_path,
+ bool *transactional,
+ Size *sz)
+{
+ uint8 flags;
+ const char *msg;
+
+ /* TODO: double check when we need to get the TransactionId. */
+
+ flags = pq_getmsgint(in, 1);
+ *transactional = (flags & MESSAGE_TRANSACTIONAL) > 0;
+ *lsn = pq_getmsgint64(in);
+ *prefix = pq_getmsgstring(in);
+ *role = pq_getmsgstring(in);
+ *search_path = pq_getmsgstring(in);
+ *sz = pq_getmsgint(in, 4);
+ msg = pq_getmsgbytes(in, *sz);
+
+ return msg;
+}
+
+/*
+ * Write DDL MESSAGE to stream
+ */
+void
+logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, const char *role,
+ const char *search_path, Size sz, const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE);
+
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+ pq_sendstring(out, prefix);
+ pq_sendstring(out, role);
+ pq_sendstring(out, search_path);
+ pq_sendint32(out, sz);
+ pq_sendbytes(out, message, sz);
+}
+
/*
* Write relation description to the output stream.
*/
@@ -1218,6 +1275,8 @@ logicalrep_message_type(LogicalRepMsgType action)
return "TYPE";
case LOGICAL_REP_MSG_MESSAGE:
return "MESSAGE";
+ case LOGICAL_REP_MSG_DDLMESSAGE:
+ return "DDL";
case LOGICAL_REP_MSG_BEGIN_PREPARE:
return "BEGIN PREPARE";
case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 607f719fd6..2149a9894e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -156,6 +156,7 @@
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
+#include "parser/analyze.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
@@ -180,6 +181,8 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
@@ -346,6 +349,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
CmdType operation);
+static void apply_execute_sql_command(const char *cmdstr,
+ const char* role,
+ const char* search_path,
+ bool isTopLevel);
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
@@ -2448,6 +2455,259 @@ apply_handle_truncate(StringInfo s)
end_replication_step();
}
+/*
+ * Handle DDL messages.
+ */
+static void
+apply_handle_ddlmessage(StringInfo s)
+{
+ XLogRecPtr lsn;
+ bool transactional;
+ Size sz;
+ const char *prefix;
+ const char *role;
+ const char *search_path;
+ const char *msg;
+
+ msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &transactional, &sz);
+
+ apply_execute_sql_command(msg, role, search_path, true);
+}
+
+/*
+ * Add context to the errors produced by apply_execute_sql_command().
+ */
+static void
+execute_sql_command_error_cb(void *arg)
+{
+ errcontext("during execution of SQL statement: %s", (char *) arg);
+}
+
+/*
+ * Execute an SQL command. This can be multiple queries.
+ * This is modified based on pglogical_execute_sql_command().
+ */
+static void
+apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path,
+ bool isTopLevel)
+{
+ const char *save_debug_query_string = debug_query_string;
+ List *parsetree_list;
+ ListCell *parsetree_item;
+ MemoryContext oldcontext;
+ ErrorContextCallback errcallback;
+ int save_nestlevel;
+
+ /*
+ * Switch to appropriate context for constructing parsetrees.
+ */
+ oldcontext = MemoryContextSwitchTo(ApplyMessageContext);
+ begin_replication_step();
+
+ /*
+ * Set the current role to the user that executed the command on the
+ * publication server.
+ * Set the current search_path to the search_path on the publication
+ * server when the command was executed.
+ */
+ save_nestlevel = NewGUCNestLevel();
+ SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE);
+ SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE);
+
+ errcallback.callback = execute_sql_command_error_cb;
+ errcallback.arg = (char *) cmdstr;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ debug_query_string = cmdstr;
+
+ parsetree_list = pg_parse_query(cmdstr);
+
+ /*
+ * Do a limited amount of safety checking against CONCURRENTLY commands
+ * executed in situations where they aren't allowed. The sender side should
+ * provide protection, but better be safe than sorry.
+ */
+ isTopLevel = isTopLevel && (list_length(parsetree_list) == 1);
+
+ /*
+ * Switch back to transaction context to enter the loop.
+ */
+ MemoryContextSwitchTo(oldcontext);
+
+ foreach(parsetree_item, parsetree_list)
+ {
+ List *plantree_list;
+ List *querytree_list;
+ RawStmt *command = (RawStmt *) lfirst(parsetree_item);
+ CommandTag commandTag;
+ MemoryContext per_parsetree_context = NULL;
+ Portal portal;
+ DestReceiver *receiver;
+ bool snapshot_set = false;
+ char *schemaname = NULL; /* For CREATE TABLE stmt only */
+ char *relname = NULL; /* For CREATE TABLE stmt only */
+
+ commandTag = CreateCommandTag((Node *)command);
+
+ /*
+ * Remember the schemaname and relname if it's a CREATE TABLE stmt
+ * because we will need them for some post-processing after we
+ * execute the stmt. At that point, CreateStmt may have been freed.
+ */
+ if (commandTag == CMDTAG_CREATE_TABLE)
+ {
+ CreateStmt *cstmt = (CreateStmt *)command->stmt;
+ RangeVar *rv = cstmt->relation;
+ schemaname = rv->schemaname;
+ relname = rv->relname;
+ }
+
+ /*
+ * Set up a snapshot if parse analysis/planning will need one.
+ */
+ if (analyze_requires_snapshot(command))
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+
+ /*
+ * OK to analyze, rewrite, and plan this query.
+ *
+ * Switch to appropriate context for constructing query and plan trees
+ * (these can't be in the transaction context, as that will get reset
+ * when the command is COMMIT/ROLLBACK). If we have multiple
+ * parsetrees, we use a separate context for each one, so that we can
+ * free that memory before moving on to the next one. But for the
+ * last (or only) parsetree, just use MessageContext, which will be
+ * reset shortly after completion anyway. In event of an error, the
+ * per_parsetree_context will be deleted when MessageContext is reset.
+ */
+ if (lnext(parsetree_list, parsetree_item) != NULL)
+ {
+ per_parsetree_context =
+ AllocSetContextCreate(MessageContext,
+ "per-parsetree message context",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(per_parsetree_context);
+ }
+ else
+ oldcontext = MemoryContextSwitchTo(ApplyMessageContext);
+
+ querytree_list = pg_analyze_and_rewrite_fixedparams(
+ command,
+ cmdstr,
+ NULL, 0, NULL);
+
+ plantree_list = pg_plan_queries(
+ querytree_list, cmdstr, 0, NULL);
+
+ /*
+ * Done with the snapshot used for parsing/planning.
+ *
+ * While it looks promising to reuse the same snapshot for query
+ * execution (at least for simple protocol), unfortunately it causes
+ * execution to use a snapshot that has been acquired before locking
+ * any of the tables mentioned in the query. This creates user-
+ * visible anomalies, so refrain. Refer to
+ * https://postgr.es/m/flat/5075D8DF.6050500@fuzzy.cz for details.
+ */
+ if (snapshot_set)
+ PopActiveSnapshot();
+
+ portal = CreatePortal("logical replication", true, true);
+
+ /*
+ * We don't have to copy anything into the portal, because everything
+ * we are passing here is in MessageContext or the
+ * per_parsetree_context, and so will outlive the portal anyway.
+ */
+ PortalDefineQuery(portal,
+ NULL,
+ cmdstr,
+ commandTag,
+ plantree_list,
+ NULL);
+
+ /*
+ * Start the portal. No parameters here.
+ */
+ PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+ /* DestNone for logical replication */
+ receiver = CreateDestReceiver(DestNone);
+
+ /*
+ * Switch back to transaction context for execution.
+ */
+ MemoryContextSwitchTo(oldcontext);
+
+ (void) PortalRun(portal,
+ FETCH_ALL,
+ isTopLevel,
+ true,
+ receiver,
+ receiver,
+ NULL);
+ (*receiver->rDestroy) (receiver);
+
+ PortalDrop(portal, false);
+
+ CommandCounterIncrement();
+
+ /*
+ * Table created by DDL replication (database level) is automatically
+ * added to the subscription here.
+ *
+ * Call AddSubscriptionRelState for CREATE TABLE command to set
+ * the relstate to SUBREL_STATE_INIT so DML changes on this
+ * new table can be replicated without having to manually run
+ * "alter subscription ... refresh publication"
+ */
+ if (commandTag == CMDTAG_CREATE_TABLE)
+ {
+ Oid relid;
+ Oid relnamespace = InvalidOid;
+
+ if (schemaname != NULL)
+ relnamespace = get_namespace_oid(schemaname, false);
+ if (relnamespace != InvalidOid)
+ relid = get_relname_relid(relname, relnamespace);
+ else
+ {
+ /*
+ * Try to resolve unqualified relname.
+ * Notice we have set the search_path to the original search_path on the publisher
+ * at the beginning of this function.
+ */
+ relid = RelnameGetRelid(relname);
+ }
+
+ if (relid != InvalidOid)
+ {
+ AddSubscriptionRelState(MySubscription->oid, relid,
+ SUBREL_STATE_INIT,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s\" added to subscription \"%s\"",
+ relname, MySubscription->name)));
+ }
+ }
+ }
+
+ /*
+ * Restore the GUC variables we set above.
+ */
+ AtEOXact_GUC(true, save_nestlevel);
+
+ /* protect against stack resets during CONCURRENTLY processing */
+ if (error_context_stack == &errcallback)
+ error_context_stack = errcallback.previous;
+
+ debug_query_string = save_debug_query_string;
+ end_replication_step();
+}
/*
* Logical replication protocol message dispatcher.
@@ -2513,6 +2773,10 @@ apply_dispatch(StringInfo s)
*/
break;
+ case LOGICAL_REP_MSG_DDLMESSAGE:
+ apply_handle_ddlmessage(s);
+ break;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 8deae57143..db221c7e8d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -54,6 +54,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pgoutput_ddlmessage(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix, const char *role,
+ const char *search_path, Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -256,6 +260,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
+ cb->ddlmessage_cb = pgoutput_ddlmessage;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -272,6 +277,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
+ cb->stream_ddlmessage_cb = pgoutput_ddlmessage;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -1656,8 +1662,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
static void
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
- const char *message)
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size sz, const char *message)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
@@ -1697,6 +1703,70 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static void
+pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, const char * role,
+ const char *search_path, Size sz, const char *message)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+ ListCell *lc;
+
+ /* Reload publications if needed before use. */
+ if (!publications_valid)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ if (data->publications)
+ list_free_deep(data->publications);
+
+ data->publications = LoadPublications(data->publication_names);
+ MemoryContextSwitchTo(oldctx);
+ publications_valid = true;
+ }
+
+ /* Check if ddl replication is turned on for the publications */
+ foreach(lc, data->publications)
+ {
+ Publication *pub = (Publication *) lfirst(lc);
+ /* TODO need to check relid for table level DDLs */
+ if (!pub->pubactions.pubddl_database && !pub->pubactions.pubddl_table)
+ return;
+ }
+
+ /*
+ * Remember the xid for the message in streaming mode. See
+ * pgoutput_change.
+ */
+ if (in_streaming)
+ xid = txn->xid;
+
+ /*
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
+ * messages.
+ */
+ if (transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_ddlmessage(ctx->out,
+ xid,
+ message_lsn,
+ transactional,
+ prefix,
+ role,
+ search_path,
+ sz,
+ message);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/
@@ -1982,7 +2052,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->schema_sent = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+ entry->pubactions.pubddl_database =
+ entry->pubactions.pubddl_table = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2040,6 +2112,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubddl_database = false;
+ entry->pubactions.pubddl_table = false;
/*
* Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2153,6 +2227,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+ entry->pubactions.pubddl_database |= pub->pubactions.pubddl_database;
+ entry->pubactions.pubddl_table |= pub->pubactions.pubddl_table;
/*
* We want to publish the changes as the top-most ancestor
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 5fe54f742a..d0bb232af6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1268,13 +1268,6 @@ ProcessUtilitySlow(ParseState *pstate,
if (isCompleteQuery)
EventTriggerDDLCommandStart(parsetree);
- /*
- * Consider logging the DDL command if logical logging is enabled and this is
- * a top level query.
- */
- if (XLogLogicalInfoActive() && isTopLevel)
- LogLogicalDDLCommand(parsetree, queryString);
-
switch (nodeTag(parsetree))
{
/*
@@ -2110,6 +2103,13 @@ ProcessUtilitySlow(ParseState *pstate,
if (isCompleteQuery)
{
+ /*
+ * Consider logging the DDL command if logical logging is enabled and this is
+ * a complete top level query.
+ */
+ if (XLogLogicalInfoActive() && isTopLevel)
+ LogLogicalDDLCommand(parsetree, queryString);
+
EventTriggerSQLDrop(parsetree);
EventTriggerDDLCommandEnd(parsetree);
}
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a771ab8ff3..28ff562d62 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_DDLMESSAGE = 'L',
LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
LOGICAL_REP_MSG_PREPARE = 'P',
LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -229,7 +230,14 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
- bool transactional, const char *prefix, Size sz, const char *message);
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
+extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, const char *role,
+ const char *search_path, Size sz, const char *message);
+extern const char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix,
+ const char **role, const char **search_path,
+ bool *transactional, Size *sz);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel, Bitmapset *columns);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index cf61fc1e0f..698c5114e6 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -33,7 +33,7 @@ $node_subscriber->safe_psql('postgres',
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR ALL TABLES");
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES WITH (ddl = '')");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl
index c924ff35f7..6c9b055eb0 100644
--- a/src/test/subscription/t/006_rewrite.pl
+++ b/src/test/subscription/t/006_rewrite.pl
@@ -23,7 +23,7 @@ $node_subscriber->safe_psql('postgres', $ddl);
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION mypub FOR ALL TABLES;");
+ "CREATE PUBLICATION mypub FOR ALL TABLES WITH (ddl = '');");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
);
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
index 67b4026afa..2902e6fc34 100644
--- a/src/test/subscription/t/008_diff_schema.pl
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -32,7 +32,7 @@ $node_subscriber->safe_psql('postgres',
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR ALL TABLES");
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES WITH (ddl = '')");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl
index 1ce696d4a4..6c586877a1 100644
--- a/src/test/subscription/t/009_matviews.pl
+++ b/src/test/subscription/t/009_matviews.pl
@@ -19,7 +19,7 @@ $node_subscriber->start;
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION mypub FOR ALL TABLES;");
+ "CREATE PUBLICATION mypub FOR ALL TABLES WITH (ddl = '');");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
);
diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl
index 2182f7948e..e1b5050663 100644
--- a/src/test/subscription/t/012_collation.pl
+++ b/src/test/subscription/t/012_collation.pl
@@ -76,7 +76,7 @@ $node_subscriber->safe_psql('postgres',
# set up publication, subscription
$node_publisher->safe_psql('postgres',
- q{CREATE PUBLICATION pub1 FOR ALL TABLES});
+ q{CREATE PUBLICATION pub1 FOR ALL TABLES WITH (ddl = '')});
$node_subscriber->safe_psql('postgres',
qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)}
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 69f4009a14..af028e0258 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -25,9 +25,9 @@ $node_subscriber2->start;
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
# publisher
-$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 WITH (ddl = '')");
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub_all FOR ALL TABLES");
+ "CREATE PUBLICATION pub_all FOR ALL TABLES WITH (ddl = '')");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
$node_publisher->safe_psql('postgres',
@@ -425,12 +425,12 @@ $node_publisher->safe_psql('postgres',
# and child tables are present but changes will be replicated via the parent's
# identity and only once.
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
+ "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true, ddl = '')"
);
# for tab4, we publish changes through the "middle" partitioned table
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)"
+ "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true, ddl = '')"
);
# prepare data for the initial sync
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
new file mode 100644
index 0000000000..c88c4ea1c0
--- /dev/null
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -0,0 +1,237 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Regression tests for logical replication of DDLs
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'autovacuum = off');
+$node_subscriber->start;
+
+my $node_subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$node_subscriber2->init(allows_streaming => 'logical');
+$node_subscriber2->append_conf('postgresql.conf', 'autovacuum = off');
+$node_subscriber2->start;
+
+my $ddl = "CREATE TABLE test_rep(id int primary key, name varchar);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (1, 'data1');");
+$node_subscriber->safe_psql('postgres', $ddl);
+$node_subscriber2->safe_psql('postgres', $ddl);
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# mypub has pubddl_database on
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR ALL TABLES;");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+# mypub2 has pubddl_database off
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub2 FOR ALL TABLES with (ddl = '');");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' PUBLICATION mypub2;"
+);
+
+$node_publisher->wait_for_catchup('mysub');
+
+# Test simple CREATE TABLE command is replicated to subscriber
+# Test same simple CREATE TABLE command is not replicated to subscriber2 (ddl off)
+# Test ALTER TABLE command is replicated on table test_rep
+# Test CREATE INDEX is replicated to subscriber
+# Test CREATE FUNCTION command is replicated to subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD c3 int;");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (2, 'data2', 2);");
+$node_publisher->safe_psql('postgres', "CREATE INDEX nameindex on test_rep (name)");
+$node_publisher->safe_psql('postgres', qq{CREATE OR REPLACE FUNCTION totalRecords()
+RETURNS integer AS \$total\$
+declare
+ total integer;
+BEGIN
+ SELECT count(*) into total FROM test_rep;
+ RETURN total;
+END;
+\$total\$ LANGUAGE plpgsql;});
+
+$node_publisher->wait_for_catchup('mysub');
+
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t1");
+is($result, qq(0), 'CREATE of t1 replicated to subscriber');
+$result = $node_subscriber2->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't1';");
+is($result, qq(0), 'CREATE of t1 is not replicated to subscriber2');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep WHERE c3 =2;");
+is($result, qq(1), 'ALTER test_rep ADD replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_class where relname = 'nameindex'");
+is($result, qq(1), 'CREATE INDEX nameindex replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT totalRecords();");
+is($result, qq(2), 'CREATE of function totalRecords replicated to subscriber');
+$result = $node_subscriber2->safe_psql('postgres', "SELECT count(*) FROM pg_proc where proname = 'totalrecords';");
+is($result, qq(0), 'CREATE FUNCTION totalrecords is not replicated to subscriber2');
+
+# Test ALTER TABLE DROP
+# Test DROP INDEX
+# Test DROP FUNCTION
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;");
+$node_publisher->safe_psql('postgres', "DELETE FROM test_rep where id = 2;");
+$node_publisher->safe_psql('postgres', "DROP INDEX nameindex;");
+$node_publisher->safe_psql('postgres', "DROP FUNCTION totalRecords;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;");
+is($result, qq(1), 'ALTER test_rep DROP replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_class where relname = 'nameindex'");
+is($result, qq(0), 'DROP INDEX nameindex replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_proc where proname = 'totalrecords';");
+is($result, qq(0), 'DROP FUNCTION totalrecords replicated');
+
+
+# TODO figure out how to set ON_ERROR_STOP = 0 in this test
+# Test failed CREATE/ALTER TABLE on publisher doesn't break replication
+# Table t1 already exists so expect the command to fail
+#$node_publisher->safe_psql('postgres', "CREATE TABLE t1 (a int, b varchar);");
+#$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep DROP c3;");
+#$node_publisher->safe_psql('postgres', "INSERT INTO test_rep VALUES (103, 'data103', 1013);");
+
+#$node_publisher->wait_for_catchup('mysub');
+# Verify replication still works
+#$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep;");
+#is($result, qq(1), 'DELETE from test_rep replicated');
+
+# Test DDLs inside txn block
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+CREATE TABLE t2 (a int, b varchar);
+ALTER TABLE test_rep ADD c3 int;
+INSERT INTO test_rep VALUES (3, 'data3', 3);
+COMMIT;});
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t2;");
+is($result, qq(0), 'CREATE t2 replicated');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_rep;");
+is($result, qq(2), 'ALTER test_rep ADD replicated');
+
+# Test toggling pubddl_database option off
+$node_publisher->safe_psql('postgres', "ALTER PUBLICATION mypub set (ddl = '');");
+$result = $node_publisher->safe_psql('postgres', "SELECT pubddl_database, pubddl_table from pg_publication where pubname = 'mypub';");
+is($result, qq(f|f), 'pubddl_database turned off on mypub');
+$node_publisher->safe_psql('postgres', "CREATE TABLE t3 (a int, b varchar);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't3';");
+is($result, qq(0), 'CREATE t3 is not replicated');
+
+# Test toggling pubddl_database option on
+$node_publisher->safe_psql('postgres', "ALTER PUBLICATION mypub set (ddl = 'database');");
+$result = $node_publisher->safe_psql('postgres', "SELECT pubddl_database, pubddl_table from pg_publication where pubname = 'mypub';");
+is($result, qq(t|t), 'pubddl_database turned on on mypub');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE t4 (a int, b varchar);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_tables where tablename = 't4';");
+is($result, qq(1), 'CREATE t4 is replicated');
+
+# Test DML changes on the new table t4 are replicated
+$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (1, 'a')");
+$node_publisher->safe_psql('postgres', "INSERT INTO t4 values (2, 'b')");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from t4;");
+is($result, qq(2), 'DML Changes to t4 are replicated');
+
+# A somewhat complicated test in plpgsql block with trigger
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+CREATE TABLE foo (a int);
+CREATE INDEX foo_idx ON foo (a);
+ALTER TABLE foo ADD COLUMN b timestamptz;
+CREATE FUNCTION foo_ts()
+RETURNS trigger AS $$
+BEGIN
+NEW.b := current_timestamp;
+RETURN NEW;
+END;
+$$
+LANGUAGE plpgsql;
+CREATE TRIGGER foo_ts BEFORE INSERT OR UPDATE ON foo
+FOR EACH ROW EXECUTE FUNCTION foo_ts();
+INSERT INTO foo VALUES (1);
+COMMIT;});
+$result = $node_publisher->safe_psql('postgres', "SELECT b from foo where a = 1;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+my $result_sub = $node_subscriber->safe_psql('postgres', "SELECT b from foo where a = 1;");
+is($result, qq($result_sub), 'timestamp of insert matches');
+
+# Test CREATE SCHEMA stmt is replicated
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA s1");
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 's1';");
+is($result, qq(1), 'CREATE SCHEMA s1 is replicated');
+
+# Test CREATE TABLE in new schema s1 followed by insert
+$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t1 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "INSERT INTO s1.t1 VALUES (1, 'a');");
+$node_publisher->safe_psql('postgres', "INSERT INTO s1.t1 VALUES (2, 'b');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.t1;");
+is($result, qq(2), 'CREATE TABLE s1.t1 is replicated');
+
+# Test replication works as expected with mismatched search_path on publisher and subscriber
+$node_publisher->append_conf('postgresql.conf', 'search_path = \'s1, public\'');
+$node_publisher->restart;
+# CREATE unqualified table t2, it is s1.t2 under the modified search_path
+$node_publisher->safe_psql('postgres', "CREATE TABLE t2 (a int, b varchar);");
+$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (1, 'a');");
+$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (2, 'b');");
+$node_publisher->safe_psql('postgres', "INSERT INTO t2 VALUES (3, 'c');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.t2;");
+is($result, qq(3), 'CREATE TABLE s1.t2 is replicated');
+
+# Test owner of new table on subscriber matches the owner on publisher
+$node_publisher->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;");
+
+$node_subscriber->safe_psql('postgres', "CREATE ROLE ddl_replication_user LOGIN SUPERUSER;");
+
+$node_publisher->safe_psql('postgres', "SET SESSION AUTHORIZATION 'ddl_replication_user'; CREATE TABLE t5 (a int, b varchar);");
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT tableowner from pg_catalog.pg_tables where tablename = 't5';");
+is($result, qq(ddl_replication_user), 'Owner of t5 is correct');
+
+#TODO TEST certain DDLs are not replicated
+
+pass "DDL replication tests passed!";
+
+$node_subscriber->stop;
+$node_subscriber2->stop;
+$node_publisher->stop;
+
+done_testing();
--
2.32.0
[application/octet-stream] 0005-Support-replication-of-CREATE-.-AS-.-and-SELECT-.-IN.patch (8.0K, 3-0005-Support-replication-of-CREATE-.-AS-.-and-SELECT-.-IN.patch)
From 4938badf12bebf461dae27cd76b6015c27fb6f53 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Thu, 24 Mar 2022 17:46:55 +0000
Subject: [PATCH 05/12] Support replication of CREATE ... AS .. and SELECT ...
INTO ... statements.
The idea is to skip the direct data population on the subscriber for
these commands (which could cause a data mismatch with the publisher)
by forcing the skipData flag in the intoClause of the parse tree after
the logical replication worker parses the command. The data sync is then
taken care of by DML replication after the DDL command has been
replicated.
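As a rough illustration of the approach described above, the following standalone sketch shows the shape of the check. It is not the patch code: the Sketch* types and sketch_force_skip_data are hypothetical, simplified stand-ins for the real parse nodes (CreateTableAsStmt, SelectStmt, IntoClause) and carry only the fields needed here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for the INTO clause of a parse tree. */
typedef struct SketchIntoClause
{
    const char *relname;    /* name of the table to be created */
    bool        skipData;   /* true behaves like WITH NO DATA */
} SketchIntoClause;

/* Hypothetical command kinds; the real code switches on the command tag. */
typedef enum SketchCmdKind
{
    SKETCH_CREATE_TABLE_AS,
    SKETCH_SELECT_INTO,
    SKETCH_OTHER
} SketchCmdKind;

typedef struct SketchCommand
{
    SketchCmdKind     kind;
    SketchIntoClause *into;     /* NULL when the command has no INTO clause */
} SketchCommand;

/*
 * Force skipData on commands that would otherwise populate a new table on
 * the subscriber. Returns the name of the table that will be created, or
 * NULL if the command does not create one through an INTO clause.
 */
const char *
sketch_force_skip_data(SketchCommand *cmd)
{
    assert(cmd != NULL);

    if ((cmd->kind == SKETCH_CREATE_TABLE_AS ||
         cmd->kind == SKETCH_SELECT_INTO) &&
        cmd->into != NULL)
    {
        /* Rows are expected to arrive later through DML replication. */
        cmd->into->skipData = true;
        return cmd->into->relname;
    }

    return NULL;
}
```

Returning the relation name mirrors the way the worker remembers the new table so that it can be added to the subscription after the command has run.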
---
src/backend/replication/logical/worker.c | 51 ++++++++++++++++++++----
src/backend/tcop/utility.c | 41 +++++++++++--------
src/test/subscription/t/030_rep_ddls.pl | 24 +++++++++++
3 files changed, 92 insertions(+), 24 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2149a9894e..72711b06d6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2545,23 +2545,58 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
Portal portal;
DestReceiver *receiver;
bool snapshot_set = false;
- char *schemaname = NULL; /* For CREATE TABLE stmt only */
- char *relname = NULL; /* For CREATE TABLE stmt only */
+ char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
+ char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
commandTag = CreateCommandTag((Node *)command);
/*
- * Remember the schemaname and relname if it's a CREATE TABLE stmt
+ * Remember the schemaname and relname if the cmd is going to create a table
* because we will need them for some post-processing after we
- * execute the stmt. At that point, CreateStmt may have been freeed up.
+ * execute the stmt. At that point, command->stmt may have been freed up.
*/
if (commandTag == CMDTAG_CREATE_TABLE)
{
- CreateStmt *cstmt = (CreateStmt *)command->stmt;
+ CreateStmt *cstmt = (CreateStmt *) command->stmt;
RangeVar *rv = cstmt->relation;
schemaname = rv->schemaname;
relname = rv->relname;
}
+ else if (commandTag == CMDTAG_CREATE_TABLE_AS)
+ {
+ CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;
+
+ if (castmt->objtype == OBJECT_TABLE)
+ {
+ RangeVar *rv = castmt->into->rel;
+ schemaname = rv->schemaname;
+ relname = rv->relname;
+
+ /*
+ * Force skipping data population to avoid data inconsistency.
+ * Data should be replicated from the publisher instead.
+ */
+ castmt->into->skipData = true;
+ }
+ }
+ /* SELECT INTO */
+ else if (commandTag == CMDTAG_SELECT)
+ {
+ SelectStmt *sstmt = (SelectStmt *) command->stmt;
+
+ if (sstmt->intoClause != NULL)
+ {
+ RangeVar *rv = sstmt->intoClause->rel;
+ schemaname = rv->schemaname;
+ relname = rv->relname;
+
+ /*
+ * Force skipping data population to avoid data inconsistency.
+ * Data should be replicated from the publisher instead.
+ */
+ sstmt->intoClause->skipData = true;
+ }
+ }
/*
* Set up a snapshot if parse analysis/planning will need one.
@@ -2660,12 +2695,12 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
* Table created by DDL replication (database level) is automatically
* added to the subscription here.
*
- * Call AddSubscriptionRelState for CREATE TABEL command to set
- * the relstate to SUBREL_STATE_INIT so DML changes on this
+ * Call AddSubscriptionRelState for CREATE TABLE and CREATE TABLE AS
+ * commands to set the relstate to SUBREL_STATE_INIT so DML changes on this
* new table can be replicated without having to manually run
* "alter subscription ... refresh publication"
*/
- if (commandTag == CMDTAG_CREATE_TABLE)
+ if (relname != NULL)
{
Oid relid;
Oid relnamespace = InvalidOid;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 79b5faccc0..29381b53b6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1142,24 +1142,31 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
/*
* CreateTableAsStmt can create either a table a materialized view
- * and they are handled differently.
*/
case T_CreateTableAsStmt:
{
CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
+
switch(stmt->objtype)
{
+ /*
+ * Either CREATE TABLE AS or SELECT ... INTO stmt
+ * The statement is logged as is, but when we apply the
+ * CREATE TABLE AS or SELECT ... INTO statements on the logical
+ * replication worker, we will force the skipData flag in the
+ * intoClause. This way we avoid direct data population on the
+ * subscriber with the execution of these commands which can
+ * potentially cause data mismatch with the publisher.
+ *
+ * The data sync will be handled by DML replication after the
+ * target table has been created.
+ */
case OBJECT_TABLE:
- /*
- * FIXME CREATE TABLE AS stmt needs to be broken down into two parts
- * 1. A normal CREATE TABLE string that get's logged and replicated via
- * DDL replication.
- * 2. Insertions that get replicated by DML replication.
- */
- break;
+
+ /* CREATE MATERIALIZED VIEW */
case OBJECT_MATVIEW:
/*
- * Log CREATE MATERIALIZED VIEW AS stmt for logical replication if
+ * Log these stmts for logical replication if
* there is any FOR ALL TABLES publication with pubddl_database on.
* i.e. Database level DDL replication is on for some publication.
*/
@@ -1304,8 +1311,17 @@ ProcessUtilitySlow(ParseState *pstate,
PG_TRY();
{
if (isCompleteQuery)
+ {
EventTriggerDDLCommandStart(parsetree);
+ /*
+ * Consider logging the DDL command if logical logging is enabled and this is
+ * a complete top level query.
+ */
+ if (XLogLogicalInfoActive() && isTopLevel)
+ LogLogicalDDLCommand(parsetree, queryString);
+ }
+
switch (nodeTag(parsetree))
{
/*
@@ -2141,13 +2157,6 @@ ProcessUtilitySlow(ParseState *pstate,
if (isCompleteQuery)
{
- /*
- * Consider logging the DDL command if logical logging is enabled and this is
- * a complete top level query.
- */
- if (XLogLogicalInfoActive() && isTopLevel)
- LogLogicalDDLCommand(parsetree, queryString);
-
EventTriggerSQLDrop(parsetree);
EventTriggerDDLCommandEnd(parsetree);
}
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index 562efe2cf7..3b15c6d9f0 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -244,6 +244,30 @@ $node_publisher->wait_for_catchup('mysub');
$result_sub = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.view1;");
is($result, qq($result_sub), 'CREATE of s1.view1 is replicated');
+# TEST CREATE TABLE AS stmt
+$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t3 AS SELECT a, b from s1.t1;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t3;");
+is($result, qq(2), 'CREATE TABLE s1.t3 AS is replicated with data');
+
+# TEST CREATE TABLE AS stmt ... WITH NO DATA
+$node_publisher->safe_psql('postgres', "CREATE TABLE s1.t4 AS SELECT a, b from s1.t1 WITH NO DATA;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t4;");
+is($result, qq(0), 'CREATE TABLE s1.t4 AS is replicated with no data');
+
+# TEST SELECT INTO stmt
+$node_publisher->safe_psql('postgres', "SELECT b into s1.t6 from s1.t1 where a > 1");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t6;");
+is($result, qq(1), 'SELECT INTO s1.t6 is replicated with data');
+
#TODO TEST certain DDLs are not replicated
pass "DDL replication tests passed!";
--
2.32.0
[application/octet-stream] 0002-Support-logical-logging-and-decoding-of-DDL-command-.patch (61.4K, 4-0002-Support-logical-logging-and-decoding-of-DDL-command-.patch)
From 7ab1fd432489f3dbd056fa897786cc5315c7d8ac Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 18 Mar 2022 16:57:23 +0000
Subject: [PATCH 02/12] Support logical logging and decoding of DDL command
string.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
A new WAL record type xl_logical_ddl_message is introduced to
support logical logging of DDL commands. xl_logical_ddl_message is
similar to the existing xl_logical_message for generic message
logging. The reason for not using xl_logical_message directly, as
initially proposed, is that we need to log more information
(such as user role, search path and potentially more in the future)
than just one string, and we do not want to make too many changes to
the existing xl_logical_message, which may break its current consumers.
The logging of the DDL command string is done in the function
LogLogicalDDLCommand. We categorize DDL command types into three
categories in this function:
1. replicated in database level replication only (such as CREATE
TABLE, CREATE FUNCTION).
2. replicated in database or table level replication depending on
the configuration (such as ALTER TABLE).
3. not supported for replication or pending investigation.
Support logical decoding of the new WAL record xl_logical_ddl_message.
This is similar to the logical decoding of xl_logical_message. Tests
for this change are added in the test_decoding plugin.
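The three categories listed above can be pictured with the small standalone sketch below. It is only an illustration under stated assumptions: the Sketch* names and both functions are hypothetical, the sketch matches on command-tag strings whereas LogLogicalDDLCommand works on the parse tree, and the exact conditions the patch checks may differ. The two booleans are named after the pubddl_database and pubddl_table publication columns.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical labels for the three categories in the commit message. */
typedef enum SketchDdlCategory
{
    SKETCH_DDL_DATABASE_ONLY,       /* e.g. CREATE TABLE, CREATE FUNCTION */
    SKETCH_DDL_DATABASE_OR_TABLE,   /* e.g. ALTER TABLE */
    SKETCH_DDL_UNSUPPORTED          /* not supported or pending investigation */
} SketchDdlCategory;

/* Map a command tag to a category; only the tags named above are covered. */
SketchDdlCategory
sketch_classify_ddl(const char *command_tag)
{
    if (strcmp(command_tag, "CREATE TABLE") == 0 ||
        strcmp(command_tag, "CREATE FUNCTION") == 0)
        return SKETCH_DDL_DATABASE_ONLY;

    if (strcmp(command_tag, "ALTER TABLE") == 0)
        return SKETCH_DDL_DATABASE_OR_TABLE;

    return SKETCH_DDL_UNSUPPORTED;
}

/*
 * Decide whether a command in the given category would be logged, given
 * which DDL replication levels some publication has enabled (assumed logic).
 */
bool
sketch_should_log_ddl(SketchDdlCategory category,
                      bool pubddl_database, bool pubddl_table)
{
    switch (category)
    {
        case SKETCH_DDL_DATABASE_ONLY:
            return pubddl_database;
        case SKETCH_DDL_DATABASE_OR_TABLE:
            return pubddl_database || pubddl_table;
        case SKETCH_DDL_UNSUPPORTED:
            break;
    }
    return false;
}
```

Keeping the classification separate from the logging decision makes it easy to move a command between categories as more DDL types are investigated.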
---
contrib/test_decoding/Makefile | 4 +-
.../test_decoding/expected/ddlmessages.out | 42 ++++
contrib/test_decoding/sql/ddlmessages.sql | 28 +++
contrib/test_decoding/test_decoding.c | 61 +++++-
src/backend/access/rmgrdesc/Makefile | 1 +
.../access/rmgrdesc/logicalddlmsgdesc.c | 54 +++++
src/backend/access/transam/rmgr.c | 1 +
src/backend/catalog/pg_publication.c | 52 +++++
src/backend/commands/tablecmds.c | 43 +++-
src/backend/replication/logical/Makefile | 1 +
src/backend/replication/logical/ddlmessage.c | 99 +++++++++
src/backend/replication/logical/decode.c | 56 +++++
src/backend/replication/logical/logical.c | 91 ++++++++
.../replication/logical/reorderbuffer.c | 204 +++++++++++++++++-
src/backend/tcop/utility.c | 201 ++++++++++++++++-
src/bin/pg_waldump/rmgrdesc.c | 1 +
src/include/access/rmgrlist.h | 1 +
src/include/commands/tablecmds.h | 3 +-
src/include/replication/ddlmessage.h | 47 ++++
src/include/replication/decode.h | 1 +
src/include/replication/output_plugin.h | 29 +++
src/include/replication/reorderbuffer.h | 39 ++++
src/test/regress/expected/publication.out | 16 +-
23 files changed, 1054 insertions(+), 21 deletions(-)
create mode 100644 contrib/test_decoding/expected/ddlmessages.out
create mode 100644 contrib/test_decoding/sql/ddlmessages.sql
create mode 100644 src/backend/access/rmgrdesc/logicalddlmsgdesc.c
create mode 100644 src/backend/replication/logical/ddlmessage.c
create mode 100644 src/include/replication/ddlmessage.h
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index b220906479..e58a76d3f1 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,9 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats twophase twophase_stream
+ spill slot truncate stream stats twophase twophase_stream \
+ ddlmessages
+
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error
diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out
new file mode 100644
index 0000000000..79284f9def
--- /dev/null
+++ b/contrib/test_decoding/expected/ddlmessages.out
@@ -0,0 +1,42 @@
+-- predictability
+SET synchronous_commit = on;
+-- turn on logical ddl message logging
+CREATE publication mypub FOR ALL TABLES with (ddl = 'database');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE test_ddlmessage (id serial unique, data int);
+ALTER TABLE test_ddlmessage add c3 varchar;
+ALTER TABLE test_ddlmessage drop c3;
+DROP TABLE test_ddlmessage;
+BEGIN;
+CREATE TABLE test_ddlmessage (id serial unique, data int);
+ALTER TABLE test_ddlmessage add c3 varchar;
+ROLLBACK;
+BEGIN;
+CREATE TABLE test_ddlmessage (id serial unique, data int);
+ALTER TABLE test_ddlmessage add c3 varchar;
+COMMIT;
+\o | sed 's/role.*search_path/role: redacted, search_path/g'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+DROP TABLE test_ddlmessage;
+DROP publication mypub;
+ data
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
+ DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
+ DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3;
+ DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage;
+ DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
+ DDL message: transactional: 1 prefix: role: redacted, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
+(6 rows)
+
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql
new file mode 100644
index 0000000000..211497ee22
--- /dev/null
+++ b/contrib/test_decoding/sql/ddlmessages.sql
@@ -0,0 +1,28 @@
+-- predictability
+SET synchronous_commit = on;
+-- turn on logical ddl message logging
+CREATE publication mypub FOR ALL TABLES with (ddl = 'database');
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE test_ddlmessage (id serial unique, data int);
+ALTER TABLE test_ddlmessage add c3 varchar;
+ALTER TABLE test_ddlmessage drop c3;
+DROP TABLE test_ddlmessage;
+
+BEGIN;
+CREATE TABLE test_ddlmessage (id serial unique, data int);
+ALTER TABLE test_ddlmessage add c3 varchar;
+ROLLBACK;
+
+BEGIN;
+CREATE TABLE test_ddlmessage (id serial unique, data int);
+ALTER TABLE test_ddlmessage add c3 varchar;
+COMMIT;
+
+\o | sed 's/role.*search_path/role: redacted, search_path/g'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+DROP TABLE test_ddlmessage;
+DROP publication mypub;
+
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 3736da6784..a44e1f79e3 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -76,6 +76,11 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_ddlmessage(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ const char *role, const char *search_path,
+ Size sz, const char *message);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -116,6 +121,11 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ const char *role, const char *search_path,
+ Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
@@ -141,6 +151,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->ddlmessage_cb = pg_decode_ddlmessage;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -153,6 +164,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_ddlmessage_cb = pg_decode_stream_ddlmessage;
cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -747,7 +759,8 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
static void
pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
- const char *prefix, Size sz, const char *message)
+ const char *prefix, Size sz,
+ const char *message)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
@@ -756,6 +769,19 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_ddlmessage(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ const char *prefix, const char *role, const char *search_path,
+ Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:",
+ transactional, prefix, role, search_path, sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ OutputPluginWrite(ctx, true);
+}
+
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
@@ -936,7 +962,8 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
static void
pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
- const char *prefix, Size sz, const char *message)
+ const char *prefix, Size sz,
+ const char *message)
{
OutputPluginPrepareWrite(ctx, true);
@@ -948,7 +975,35 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
else
{
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
- transactional, prefix, sz);
+ transactional, prefix, sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ }
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * In streaming mode, we don't display the contents for transactional messages
+ * as the transaction can abort at a later point in time. We don't want users to
+ * see the message contents until the transaction is committed.
+ */
+static void
+pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ const char *prefix, const char *role, const char *search_path,
+ Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (transactional)
+ {
+ appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu",
+ transactional, prefix, role, search_path, sz);
+ }
+ else
+ {
+ appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:",
+ transactional, prefix, role, search_path, sz);
appendBinaryStringInfo(ctx->out, message, sz);
}
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd86..b8e29e8df3 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -19,6 +19,7 @@ OBJS = \
hashdesc.o \
heapdesc.o \
logicalmsgdesc.o \
+ logicalddlmsgdesc.o \
mxactdesc.o \
nbtdesc.o \
relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
new file mode 100644
index 0000000000..7a352d540a
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -0,0 +1,54 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalddlmsgdesc.c
+ * rmgr descriptor routines for replication/logical/ddlmessage.c
+ *
+ * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/ddlmessage.h"
+
+void
+logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_LOGICAL_DDL_MESSAGE)
+ {
+ xl_logical_ddl_message *xlrec = (xl_logical_ddl_message *) rec;
+ char *prefix = xlrec->message;
+ char *role = xlrec->message + xlrec->prefix_size;
+ char *search_path = xlrec->message + xlrec->prefix_size + xlrec->role_size;
+ char *message = xlrec->message + xlrec->prefix_size + xlrec->role_size + xlrec->search_path_size;
+ char *sep = "";
+
+ Assert(prefix[xlrec->prefix_size - 1] == '\0');
+
+ appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ",
+ xlrec->transactional ? "transactional" : "non-transactional",
+ prefix, role, search_path, xlrec->message_size);
+ /* Write message payload as a series of hex bytes */
+ for (int cnt = 0; cnt < xlrec->message_size; cnt++)
+ {
+ appendStringInfo(buf, "%s%02X", sep, (unsigned char) message[cnt]);
+ sep = " ";
+ }
+ }
+}
+
+const char *
+logicalddlmsg_identify(uint8 info)
+{
+ if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+ return "DDL MESSAGE";
+
+ return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 8ed69244e3..6db9a593b8 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -29,6 +29,7 @@
#include "miscadmin.h"
#include "replication/decode.h"
#include "replication/message.h"
+#include "replication/ddlmessage.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/builtins.h"
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index b8ab1d8141..fad21a31d0 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1206,3 +1206,55 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Check whether DDL on relation (relid) needs to be WAL-logged for logical replication.
+ */
+bool
+ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel)
+{
+ List *allTablePubs = NIL;
+ List *tablePubs = NIL;
+ ListCell *lc;
+
+ /* Only replicate toplevel DDL command */
+ if (!isTopLevel)
+ return false;
+ if (relid == InvalidOid && !forAllTabPubOnly)
+ return false;
+
+ /*
+ * Log the DDL command if
+ * there is any FOR ALL TABLES publication with pubddl_database on
+ * or
+ * this table belongs to any non-FOR ALL TABLES publication with pubddl_table on.
+ */
+ allTablePubs = GetAllTablesPublications();
+ foreach(lc, allTablePubs)
+ {
+ Oid pubid = lfirst_oid(lc);
+ Publication *pub = GetPublication(pubid);
+
+ if (pub->pubactions.pubddl_database)
+ return true;
+ }
+
+ /*
+ * If forAllTabPubOnly is true (i.e. database level replication is required for the DDL
+ * to be logged), we can bail out now since no publication has been found with pubddl_database on.
+ */
+ if (forAllTabPubOnly)
+ return false;
+
+ tablePubs = GetRelationPublications(relid);
+ foreach(lc, tablePubs)
+ {
+ Oid pubid = lfirst_oid(lc);
+ Publication *pub = GetPublication(pubid);
+
+ if (pub->pubactions.pubddl_table)
+ return true;
+ }
+
+ return false;
+}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 2de0ebacec..6d1487951f 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -80,6 +80,7 @@
#include "partitioning/partbounds.h"
#include "partitioning/partdesc.h"
#include "pgstat.h"
+#include "replication/ddlmessage.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
@@ -1332,13 +1333,14 @@ DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind)
* DROP MATERIALIZED VIEW, DROP FOREIGN TABLE
*/
void
-RemoveRelations(DropStmt *drop)
+RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel)
{
ObjectAddresses *objects;
char relkind;
ListCell *cell;
int flags = 0;
LOCKMODE lockmode = AccessExclusiveLock;
+ bool ddlxlog = XLogLogicalInfoActive();
/* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */
if (drop->concurrent)
@@ -1437,10 +1439,37 @@ RemoveRelations(DropStmt *drop)
/* Not there? */
if (!OidIsValid(relOid))
{
+ ddlxlog = false;
DropErrorMsgNonExistent(rel, relkind, drop->missing_ok);
continue;
}
+ /*
+ * Only log DROP RELATION cmd for logical replication if
+ * there is any FOR ALL TABLES publication with pubddl_database on or
+ * every relation to be dropped belongs to a non-FOR ALL TABLES publication with pubddl_table on.
+ */
+ if (ddlxlog)
+ {
+ Oid tableOid = InvalidOid;
+
+ if (relkind == RELKIND_RELATION)
+ tableOid = relOid;
+ else if (relkind == RELKIND_INDEX)
+ tableOid = IndexGetRelation(relOid, true);
+ /*
+ * Other relation types require database level ddl replication and are
+ * already logged in LogLogicalDDLCommand() if needed.
+ */
+ else
+ ddlxlog = false;
+
+ /* DROP RELATION or INDEX are allowed in table level DDL replication */
+ if (tableOid != InvalidOid &&
+ !ddl_need_xlog(tableOid, false, isTopLevel))
+ ddlxlog = false;
+ }
+
/*
* Decide if concurrent mode needs to be used here or not. The
* callback retrieved the rel's persistence for us.
@@ -1484,6 +1513,18 @@ RemoveRelations(DropStmt *drop)
add_exact_object_address(&obj, objects);
}
+ /* Log the Drop command for logical replication */
+ if (ddlxlog)
+ {
+ bool transactional = true;
+ const char *prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ pstate->p_sourcetext,
+ strlen(pstate->p_sourcetext),
+ transactional);
+ }
+
performMultipleDeletions(objects, drop->behavior, flags);
free_object_addresses(objects);
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb71..f3eeb67312 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -16,6 +16,7 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = \
decode.o \
+ ddlmessage.o \
launcher.o \
logical.o \
logicalfuncs.o \
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
new file mode 100644
index 0000000000..f93573079a
--- /dev/null
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -0,0 +1,99 @@
+/*-------------------------------------------------------------------------
+ *
+ * ddlmessage.c
+ * Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/ddlmessage.c
+ *
+ * NOTES
+ *
+ * Logical DDL messages allow XLOG logging of DDL command strings that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are the same as NOOP.
+ *
+ * Similar to the generic logical messages, these DDL messages can be either
+ * transactional or non-transactional. Note that by default DDLs in PostgreSQL
+ * are transactional.
+ * Transactional messages are part of the current transaction and will be sent
+ * to the decoding plugin in the same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional ones will always be delivered.
+ *
+ * Every message carries a prefix to avoid conflicts between different decoding
+ * plugins. Plugin authors must take extra care to use a unique prefix; a
+ * good option is, for example, the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/ddlmessage.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding DDL message into XLog.
+ */
+XLogRecPtr
+LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message,
+ size_t size, bool transactional)
+{
+ xl_logical_ddl_message xlrec;
+ const char *role;
+
+ role = GetUserNameFromId(roleoid, false);
+
+ /*
+ * Force xid to be allocated if we're emitting a transactional message.
+ */
+ if (transactional)
+ {
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+ }
+
+ xlrec.dbId = MyDatabaseId;
+ xlrec.transactional = transactional;
+ /* trailing zero is critical; see logicalddlmsg_desc */
+ xlrec.prefix_size = strlen(prefix) + 1;
+ xlrec.role_size = strlen(role) + 1;
+ xlrec.search_path_size = strlen(namespace_search_path) + 1;
+ xlrec.message_size = size;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalDDLMessage);
+ XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+ XLogRegisterData(unconstify(char *, role), xlrec.role_size);
+ XLogRegisterData(namespace_search_path, xlrec.search_path_size);
+ XLogRegisterData(unconstify(char *, message), size);
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ return XLogInsert(RM_LOGICALDDLMSG_ID, XLOG_LOGICAL_DDL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding ddl messages.
+ */
+void
+logicalddlmsg_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info != XLOG_LOGICAL_DDL_MESSAGE)
+ elog(PANIC, "logicalddlmsg_redo: unknown op code %u", info);
+
+ /* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index aa2427ba73..034c7f2413 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -36,6 +36,7 @@
#include "access/xlogutils.h"
#include "catalog/pg_control.h"
#include "replication/decode.h"
+#include "replication/ddlmessage.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "replication/origin.h"
@@ -603,6 +604,61 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message->message + message->prefix_size);
}
+/*
+ * Handle rmgr LOGICALDDLMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+void
+logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ Snapshot snapshot;
+ xl_logical_ddl_message *message;
+
+ if (info != XLOG_LOGICAL_DDL_MESSAGE)
+ elog(ERROR, "unexpected RM_LOGICALDDLMSG_ID record type: %u", info);
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding ddl messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ message = (xl_logical_ddl_message *) XLogRecGetData(r);
+
+ if (message->dbId != ctx->slot->data.database ||
+ FilterByOrigin(ctx, origin_id))
+ return;
+
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!message->transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr,
+ message->transactional,
+ /* first part of message is prefix */
+ message->message,
+ /* second part of message is role */
+ message->message + message->prefix_size,
+ /* third part of message is search_path */
+ message->message + message->prefix_size + message->role_size,
+ message->message_size,
+ message->message + message->prefix_size +
+ message->role_size + message->search_path_size);
+}
+
/*
* Consolidated commit record handling between the different form of commit
* records.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 625a7f4273..3004f02433 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, const char *role, const char *search_path,
+ Size message_size, const char *message);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, const char *role, const char *search_path,
+ Size message_size, const char *message);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -218,6 +226,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ ctx->reorder->ddlmessage = ddlmessage_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -234,6 +243,7 @@ StartupDecodingContext(List *output_plugin_options,
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_ddlmessage_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
@@ -251,6 +261,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_ddlmessage = stream_ddlmessage_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
@@ -1220,6 +1231,44 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, const char *role,
+ const char *search_path, Size message_size,
+ const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (ctx->callbacks.ddlmessage_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "ddlmessage";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix,
+ role, search_path, message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
@@ -1535,6 +1584,48 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, const char *role,
+ const char *search_path, Size message_size,
+ const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (ctx->callbacks.stream_ddlmessage_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_ddlmessage";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix,
+ role, search_path, message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8da5f9089c..ca01336604 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -512,6 +512,20 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
pfree(change->data.msg.message);
change->data.msg.message = NULL;
break;
+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ if (change->data.ddlmsg.prefix != NULL)
+ pfree(change->data.ddlmsg.prefix);
+ change->data.ddlmsg.prefix = NULL;
+ if (change->data.ddlmsg.role != NULL)
+ pfree(change->data.ddlmsg.role);
+ change->data.ddlmsg.role = NULL;
+ if (change->data.ddlmsg.search_path != NULL)
+ pfree(change->data.ddlmsg.search_path);
+ change->data.ddlmsg.search_path = NULL;
+ if (change->data.ddlmsg.message != NULL)
+ pfree(change->data.ddlmsg.message);
+ change->data.ddlmsg.message = NULL;
+ break;
case REORDER_BUFFER_CHANGE_INVALIDATION:
if (change->data.inval.invalidations)
pfree(change->data.inval.invalidations);
@@ -866,6 +880,64 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
+/*
+ * A transactional DDL message is queued to be processed upon commit and a
+ * non-transactional DDL message gets processed immediately.
+ */
+void
+ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ const char *role, const char *search_path,
+ Size message_size, const char *message)
+{
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ Assert(xid != InvalidTransactionId);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE;
+ change->data.ddlmsg.prefix = pstrdup(prefix);
+ change->data.ddlmsg.role = pstrdup(role);
+ change->data.ddlmsg.search_path = pstrdup(search_path);
+ change->data.ddlmsg.message_size = message_size;
+ change->data.ddlmsg.message = palloc(message_size);
+ memcpy(change->data.ddlmsg.message, message, message_size);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+ PG_TRY();
+ {
+ rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message);
+
+ TeardownHistoricSnapshot(false);
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
@@ -1957,6 +2029,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message);
}
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the DDL message.
+ */
+static inline void
+ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferChange *change, bool streaming)
+{
+ if (streaming)
+ rb->stream_ddlmessage(rb, txn, change->lsn, true,
+ change->data.ddlmsg.prefix,
+ change->data.ddlmsg.role,
+ change->data.ddlmsg.search_path,
+ change->data.ddlmsg.message_size,
+ change->data.ddlmsg.message);
+ else
+ rb->ddlmessage(rb, txn, change->lsn, true,
+ change->data.ddlmsg.prefix,
+ change->data.ddlmsg.role,
+ change->data.ddlmsg.search_path,
+ change->data.ddlmsg.message_size,
+ change->data.ddlmsg.message);
+}
+
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
@@ -2335,6 +2430,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferApplyMessage(rb, txn, change, streaming);
break;
+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ ReorderBufferApplyDDLMessage(rb, txn, change, streaming);
+ break;
+
case REORDER_BUFFER_CHANGE_INVALIDATION:
/* Execute the invalidation messages locally */
ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
@@ -3708,6 +3807,53 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message_size);
data += change->data.msg.message_size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ {
+ char *data;
+ Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+ Size role_size = strlen(change->data.ddlmsg.role) + 1;
+ Size search_path_size = strlen(change->data.ddlmsg.search_path) + 1;
+
+ sz += prefix_size + role_size + search_path_size +
+ change->data.ddlmsg.message_size +
+ sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size);
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+ /* write the prefix including the size */
+ memcpy(data, &prefix_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.ddlmsg.prefix,
+ prefix_size);
+ data += prefix_size;
+
+ /* write the role including the size */
+ memcpy(data, &role_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.ddlmsg.role,
+ role_size);
+ data += role_size;
+
+ /* write the search_path including the size */
+ memcpy(data, &search_path_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.ddlmsg.search_path,
+ search_path_size);
+ data += search_path_size;
+
+ /* write the message including the size */
+ memcpy(data, &change->data.ddlmsg.message_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.ddlmsg.message,
+ change->data.ddlmsg.message_size);
+ data += change->data.ddlmsg.message_size;
+
break;
}
case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4022,6 +4168,18 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
sz += prefix_size + change->data.msg.message_size +
sizeof(Size) + sizeof(Size);
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ {
+ Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+ Size role_size = strlen(change->data.ddlmsg.role) + 1;
+ Size search_path_size = strlen(change->data.ddlmsg.search_path) + 1;
+
+ sz += prefix_size + role_size + search_path_size +
+ change->data.ddlmsg.message_size +
+ sizeof(Size) + sizeof(Size) + sizeof(Size) + sizeof(Size);
+
break;
}
case REORDER_BUFFER_CHANGE_INVALIDATION:
@@ -4282,8 +4440,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* read prefix */
memcpy(&prefix_size, data, sizeof(Size));
data += sizeof(Size);
- change->data.msg.prefix = MemoryContextAlloc(rb->context,
- prefix_size);
+ change->data.msg.prefix = MemoryContextAlloc(rb->context, prefix_size);
memcpy(change->data.msg.prefix, data, prefix_size);
Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
data += prefix_size;
@@ -4297,6 +4454,49 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
change->data.msg.message_size);
data += change->data.msg.message_size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ {
+ Size prefix_size;
+ Size role_size;
+ Size search_path_size;
+
+ /* read prefix */
+ memcpy(&prefix_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size);
+ memcpy(change->data.ddlmsg.prefix, data, prefix_size);
+ Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0');
+ data += prefix_size;
+
+ /* read role */
+ memcpy(&role_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.ddlmsg.role = MemoryContextAlloc(rb->context,
+ role_size);
+ memcpy(change->data.ddlmsg.role, data, role_size);
+ Assert(change->data.ddlmsg.role[role_size - 1] == '\0');
+ data += role_size;
+
+ /* read search_path */
+ memcpy(&search_path_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.ddlmsg.search_path = MemoryContextAlloc(rb->context,
+ search_path_size);
+ memcpy(change->data.ddlmsg.search_path, data, search_path_size);
+ Assert(change->data.ddlmsg.search_path[search_path_size - 1] == '\0');
+ data += search_path_size;
+
+ /* read the message */
+ memcpy(&change->data.ddlmsg.message_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.ddlmsg.message = MemoryContextAlloc(rb->context,
+ change->data.ddlmsg.message_size);
+ memcpy(change->data.ddlmsg.message, data,
+ change->data.ddlmsg.message_size);
+ data += change->data.ddlmsg.message_size;
+
break;
}
case REORDER_BUFFER_CHANGE_INVALIDATION:
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 6a5bcded55..5fe54f742a 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -62,6 +62,7 @@
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
+#include "replication/ddlmessage.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteRemove.h"
#include "storage/fd.h"
@@ -86,7 +87,7 @@ static void ProcessUtilitySlow(ParseState *pstate,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc);
-static void ExecDropStmt(DropStmt *stmt, bool isTopLevel);
+static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel);
/*
* CommandIsReadOnly: is an executable query read-only?
@@ -987,7 +988,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
context, params, queryEnv,
dest, qc);
else
- ExecDropStmt(stmt, isTopLevel);
+ ExecDropStmt(pstate, stmt, isTopLevel);
}
break;
@@ -1087,6 +1088,154 @@ standard_ProcessUtility(PlannedStmt *pstmt,
CommandCounterIncrement();
}
+/*
+ * Log a DDL command for logical replication.
+ * Some DDLs are only replicated in database level DDL replication;
+ * some can also be replicated in table level DDL replication.
+ *
+ * Currently we focus on supporting database level DDL replication.
+ */
+static void
+LogLogicalDDLCommand(Node *parsetree, const char *queryString)
+{
+ switch (nodeTag(parsetree))
+ {
+ /* Firstly, commands that are only supported in Database level DDL replication */
+ case T_CreateSchemaStmt:
+ case T_CreateStmt:
+ case T_CreateForeignTableStmt:
+ case T_AlterDomainStmt:
+ case T_DefineStmt:
+ case T_CompositeTypeStmt:
+ case T_CreateEnumStmt:
+ case T_CreateRangeStmt:
+ case T_AlterEnumStmt:
+ case T_ViewStmt:
+ case T_CreateFunctionStmt:
+ case T_AlterFunctionStmt:
+ case T_CreateTrigStmt:
+ case T_CreateDomainStmt:
+ case T_CreateCastStmt:
+ case T_CreateOpClassStmt:
+ case T_CreateOpFamilyStmt:
+ case T_AlterOpFamilyStmt:
+ case T_AlterOperatorStmt:
+ case T_AlterTypeStmt:
+ case T_GrantStmt:
+ case T_AlterCollationStmt:
+ /*
+ * Log these statements for logical replication if there is any
+ * FOR ALL TABLES publication with pubddl_database on, i.e. if
+ * database-level DDL replication is enabled for some publication.
+ */
+ if (ddl_need_xlog(InvalidOid, true, true))
+ {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ queryString,
+ strlen(queryString),
+ transactional);
+ }
+ break;
+
+ /*
+ * Second, commands that may be allowed in table-level DDL replication.
+ * These are currently handled later in the execution path of the
+ * command, because we need the relation OID, which is readily
+ * available there.
+ */
+ case T_AlterTableStmt:
+ case T_IndexStmt:
+ case T_RenameStmt: /* TODO */
+ case T_AlterOwnerStmt: /* TODO */
+ break;
+
+ /* DropStmt depends on the removeType */
+ case T_DropStmt:
+ {
+ DropStmt* stmt = (DropStmt*) parsetree;
+ switch (stmt->removeType)
+ {
+ /* May be allowed in table-level DDL replication; handled in a later code path */
+ case OBJECT_INDEX:
+ case OBJECT_TABLE:
+ break;
+ /* Dropping a sequence is handled separately by logical replication of sequences */
+ case OBJECT_SEQUENCE:
+ break;
+ /* Dropping other objects is allowed in database-level DDL replication only */
+ case OBJECT_VIEW:
+ case OBJECT_MATVIEW:
+ case OBJECT_FOREIGN_TABLE:
+ default:
+ /*
+ * Log these DropStmts for logical replication if there is any FOR ALL TABLES
+ * publication with pubddl_database on, i.e. database-level DDL replication is on.
+ */
+ if (ddl_need_xlog(InvalidOid, true, true))
+ {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ queryString,
+ strlen(queryString),
+ transactional);
+ }
+ break;
+ }
+ break;
+ }
+ /*
+ * Lastly, rule out DDLs that we don't replicate yet. Some of these
+ * could be supported; we just need to investigate and run tests.
+ */
+ case T_CreateExtensionStmt:
+ case T_AlterExtensionStmt:
+ case T_AlterExtensionContentsStmt:
+ case T_CreateFdwStmt:
+ case T_AlterFdwStmt:
+ case T_CreateForeignServerStmt:
+ case T_AlterForeignServerStmt:
+ case T_CreateUserMappingStmt:
+ case T_AlterUserMappingStmt:
+ case T_DropUserMappingStmt:
+ case T_ImportForeignSchemaStmt:
+ case T_RuleStmt:
+ case T_CreateSeqStmt:
+ case T_AlterSeqStmt:
+ case T_CreateTableAsStmt:
+ case T_RefreshMatViewStmt:
+ case T_CreatePLangStmt:
+ case T_CreateConversionStmt:
+ case T_CreateTransformStmt:
+ case T_AlterTSDictionaryStmt:
+ case T_AlterTSConfigurationStmt:
+ case T_AlterTableMoveAllStmt:
+ case T_AlterObjectDependsStmt:
+ case T_AlterObjectSchemaStmt:
+ case T_CommentStmt:
+ case T_DropOwnedStmt:
+ case T_AlterDefaultPrivilegesStmt:
+ case T_CreatePolicyStmt:
+ case T_AlterPolicyStmt:
+ case T_SecLabelStmt:
+ case T_CreateAmStmt:
+ case T_CreatePublicationStmt:
+ case T_AlterPublicationStmt:
+ case T_CreateSubscriptionStmt:
+ case T_AlterSubscriptionStmt:
+ case T_DropSubscriptionStmt:
+ case T_CreateStatsStmt:
+ case T_AlterStatsStmt:
+ break;
+ default:
+ break;
+ }
+}
+
/*
* The "Slow" variant of ProcessUtility should only receive statements
* supported by the event triggers facility. Therefore, we always
@@ -1119,6 +1268,13 @@ ProcessUtilitySlow(ParseState *pstate,
if (isCompleteQuery)
EventTriggerDDLCommandStart(parsetree);
+ /*
+ * Consider logging the DDL command if logical WAL logging is enabled
+ * (wal_level = logical) and this is a top-level query.
+ */
+ if (XLogLogicalInfoActive() && isTopLevel)
+ LogLogicalDDLCommand(parsetree, queryString);
+
switch (nodeTag(parsetree))
{
/*
@@ -1321,6 +1477,23 @@ ProcessUtilitySlow(ParseState *pstate,
EventTriggerAlterTableStart(parsetree);
EventTriggerAlterTableRelid(relid);
+ /*
+ * Log the ALTER TABLE command if there is any publication
+ * with database-level DDL replication on, or if this table belongs
+ * to any publication with table-level DDL replication on.
+ */
+ if (XLogLogicalInfoActive() &&
+ ddl_need_xlog(relid, false, isTopLevel))
+ {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ queryString,
+ strlen(queryString),
+ transactional);
+ }
+
/* ... and do it */
AlterTable(atstmt, lockmode, &atcontext);
@@ -1539,6 +1712,24 @@ ProcessUtilitySlow(ParseState *pstate,
/* ... and do it */
EventTriggerAlterTableStart(parsetree);
+
+ /*
+ * Log the CREATE INDEX command for logical replication if there is
+ * any publication with database-level DDL replication on, or if this
+ * table belongs to any publication with table-level DDL replication on.
+ */
+ if (XLogLogicalInfoActive() &&
+ ddl_need_xlog(relid, false, isTopLevel))
+ {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ queryString,
+ strlen(queryString),
+ transactional);
+ }
+
address =
DefineIndex(relid, /* OID of heap relation */
stmt,
@@ -1761,7 +1952,7 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_DropStmt:
- ExecDropStmt((DropStmt *) parsetree, isTopLevel);
+ ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel);
/* no commands stashed for DROP */
commandCollected = true;
break;
@@ -1982,7 +2173,7 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context)
* Dispatch function for DropStmt
*/
static void
-ExecDropStmt(DropStmt *stmt, bool isTopLevel)
+ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel)
{
switch (stmt->removeType)
{
@@ -1997,7 +2188,7 @@ ExecDropStmt(DropStmt *stmt, bool isTopLevel)
case OBJECT_VIEW:
case OBJECT_MATVIEW:
case OBJECT_FOREIGN_TABLE:
- RemoveRelations(stmt);
+ RemoveRelations(pstate, stmt, isTopLevel);
break;
default:
RemoveObjects(stmt);
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6b8c17bb4c..792f438959 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -27,6 +27,7 @@
#include "commands/sequence.h"
#include "commands/tablespace.h"
#include "replication/message.h"
+#include "replication/ddlmessage.h"
#include "replication/origin.h"
#include "rmgrdesc.h"
#include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 9a74721c97..9de3b8f2eb 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICALDDLMSG_ID, "LogicalDDLMessage", logicalddlmsg_redo, logicalddlmsg_desc, logicalddlmsg_identify, NULL, NULL, NULL, logicalddlmsg_decode)
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 5d4037f26e..68781365de 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -18,6 +18,7 @@
#include "catalog/dependency.h"
#include "catalog/objectaddress.h"
#include "nodes/parsenodes.h"
+#include "parser/parse_node.h"
#include "storage/lock.h"
#include "utils/relcache.h"
@@ -27,7 +28,7 @@ struct AlterTableUtilityContext; /* avoid including tcop/utility.h here */
extern ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
ObjectAddress *typaddress, const char *queryString);
-extern void RemoveRelations(DropStmt *drop);
+extern void RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel);
extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode);
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
new file mode 100644
index 0000000000..1e8ef22296
--- /dev/null
+++ b/src/include/replication/ddlmessage.h
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ * ddlmessage.h
+ * Exports from replication/logical/ddlmessage.c
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/ddlmessage.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_DDL_MESSAGE_H
+#define PG_LOGICAL_DDL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding DDL message WAL record.
+ */
+typedef struct xl_logical_ddl_message
+{
+ Oid dbId; /* database Oid emitted from */
+ bool transactional; /* is message transactional? */
+ Size prefix_size; /* length of prefix */
+ Size role_size; /* length of the role that executes the DDL command */
+ Size search_path_size; /* length of the search path */
+ Size message_size; /* size of the message */
+ /*
+ * Payload: includes the null-terminated prefix of length prefix_size,
+ * the null-terminated role of length role_size, and the null-terminated
+ * search_path of length search_path_size, in addition to the message.
+ */
+ char message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;
+
+#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message))
+
+extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message,
+ size_t size, bool transactional);
+
+/* RMGR API */
+#define XLOG_LOGICAL_DDL_MESSAGE 0x00
+void logicalddlmsg_redo(XLogReaderState *record);
+void logicalddlmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalddlmsg_identify(uint8 info);
+
+#endif /* PG_LOGICAL_DDL_MESSAGE_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 741bf65cf7..427a7b997d 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 539dc8e697..5b1c245b72 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,19 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
Size message_size,
const char *message);
+/*
+ * Called for the logical decoding DDL messages.
+ */
+typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ const char *role,
+ const char *search_path,
+ Size message_size,
+ const char *message);
+
/*
* Filter changes by origin.
*/
@@ -199,6 +212,20 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
Size message_size,
const char *message);
+/*
+ * Callback for streaming logical decoding DDL messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ const char *role,
+ const char *search_path,
+ Size message_size,
+ const char *message);
+
/*
* Callback for streaming truncates from in-progress transactions.
*/
@@ -219,6 +246,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeDDLMessageCB ddlmessage_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
@@ -237,6 +265,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamDDLMessageCB stream_ddlmessage_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4a01f877e5..dd89e08efc 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -56,6 +56,7 @@ typedef enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_DDLMESSAGE,
REORDER_BUFFER_CHANGE_MESSAGE,
REORDER_BUFFER_CHANGE_INVALIDATION,
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
@@ -130,6 +131,16 @@ typedef struct ReorderBufferChange
char *message;
} msg;
+ /* DDL Message. */
+ struct
+ {
+ char *prefix;
+ char *role;
+ char *search_path;
+ Size message_size;
+ char *message;
+ } ddlmsg;
+
/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
Snapshot snapshot;
@@ -430,6 +441,17 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* DDL message callback signature */
+typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ const char *role,
+ const char *search_path,
+ Size sz,
+ const char *message);
+
/* begin prepare callback signature */
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
@@ -496,6 +518,18 @@ typedef void (*ReorderBufferStreamMessageCB) (
const char *prefix, Size sz,
const char *message);
+/* stream DDL message callback signature */
+typedef void (*ReorderBufferStreamDDLMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ const char *role,
+ const char *search_path,
+ Size sz,
+ const char *message);
+
/* stream truncate callback signature */
typedef void (*ReorderBufferStreamTruncateCB) (
ReorderBuffer *rb,
@@ -541,6 +575,7 @@ struct ReorderBuffer
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
+ ReorderBufferDDLMessageCB ddlmessage;
/*
* Callbacks to be called when streaming a transaction at prepare time.
@@ -560,6 +595,7 @@ struct ReorderBuffer
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamDDLMessageCB stream_ddlmessage;
ReorderBufferStreamTruncateCB stream_truncate;
/*
@@ -635,6 +671,9 @@ extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
+extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix, const char *role,
+ const char *search_path, Size message_size, const char *message);
extern void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index e9cea11a5b..5b4bee74cb 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -676,10 +676,10 @@ CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate');
RESET client_min_messages;
ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok
\dRp+ testpub_table_ins
- Publication testpub_table_ins
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | t | f
+ Publication testpub_table_ins
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | t | f | f | f
Tables:
"public.testpub_tbl5" (a)
@@ -821,10 +821,10 @@ CREATE TABLE testpub_tbl_both_filters (a int, b int, c int, PRIMARY KEY (a,c));
ALTER TABLE testpub_tbl_both_filters REPLICA IDENTITY USING INDEX testpub_tbl_both_filters_pkey;
ALTER PUBLICATION testpub_both_filters ADD TABLE testpub_tbl_both_filters (a,c) WHERE (c != 1);
\dRp+ testpub_both_filters
- Publication testpub_both_filters
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_both_filters
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"public.testpub_tbl_both_filters" (a, c) WHERE (c <> 1)
--
2.32.0
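The payload comment in xl_logical_ddl_message (ddlmessage.h above) can be illustrated with a small standalone sketch. It assumes, without confirmation from this excerpt, that the parts are stored back to back in the order prefix, role, search_path, message, and that each of the three string sizes counts its terminating NUL. DdlPayloadView and ddl_payload_split are hypothetical names used only for illustration; they are not part of the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical read-only view over the payload bytes (not from the patch). */
typedef struct DdlPayloadView
{
	const char *prefix;			/* NUL-terminated */
	const char *role;			/* NUL-terminated */
	const char *search_path;	/* NUL-terminated */
	const char *message;		/* message_size bytes, may lack a NUL */
} DdlPayloadView;

/*
 * Split a payload into its four parts.
 *
 * Assumption: layout is prefix | role | search_path | message, and each
 * string size includes the terminating NUL byte.
 *
 * Returns 0 on success, -1 if the sizes do not match the payload.
 */
static int
ddl_payload_split(const char *payload, size_t payload_len,
				  size_t prefix_size, size_t role_size,
				  size_t search_path_size, size_t message_size,
				  DdlPayloadView *view)
{
	const size_t sizes[3] = {prefix_size, role_size, search_path_size};
	const char *parts[3];
	size_t		off = 0;
	int			i;

	assert(payload != NULL && view != NULL);

	for (i = 0; i < 3; i++)
	{
		/* Each string needs at least its NUL and must fit in what is left. */
		if (sizes[i] == 0 || sizes[i] > payload_len - off)
			return -1;
		if (payload[off + sizes[i] - 1] != '\0')
			return -1;
		parts[i] = payload + off;
		off += sizes[i];
	}

	/* Whatever remains must be exactly the message. */
	if (message_size != payload_len - off)
		return -1;

	view->prefix = parts[0];
	view->role = parts[1];
	view->search_path = parts[2];
	view->message = payload + off;
	return 0;
}
```

Carrying the role and search_path next to the command text is what would let a consumer of the record run the DDL under the same role and name-resolution context as the original session; the sketch only shows how such a record could be taken apart under the stated layout assumption.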
[application/octet-stream] 0001-Define-DDL-replication-levels-via-the-CREATE-PUBLICA.patch (65.0K, 5-0001-Define-DDL-replication-levels-via-the-CREATE-PUBLICA.patch)
From 0188dbbe250a4e809edf556fad34dd37ca46a4eb Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 18 Mar 2022 16:47:16 +0000
Subject: [PATCH 01/12] Define DDL replication levels via the CREATE
PUBLICATION command, currently allow Database or Table level DDL replication.
Allow the user to configure either database level or table level
DDL replication via the CREATE PUBLICATION command, as described in
my first email. Two new columns are added to the pg_publication
catalog to show the DDL replication levels, and the test output
publication.out is updated accordingly. pg_dump is also modified to
accommodate the pg_publication catalog change.
---
src/backend/catalog/pg_publication.c | 2 +
src/backend/commands/publicationcmds.c | 89 +++++-
src/backend/utils/cache/relcache.c | 2 +
src/bin/pg_dump/pg_dump.c | 45 ++-
src/bin/pg_dump/pg_dump.h | 2 +
src/bin/pg_dump/t/002_pg_dump.pl | 10 +-
src/bin/psql/describe.c | 30 +-
src/include/catalog/pg_publication.h | 10 +
src/include/commands/defrem.h | 1 +
src/test/regress/expected/publication.out | 372 +++++++++++-----------
10 files changed, 364 insertions(+), 199 deletions(-)
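
As a rough illustration of the option semantics this patch introduces (not code from the patch), the following standalone C sketch mirrors how the ddl option of CREATE PUBLICATION maps onto the two new flags: "database" is accepted only for FOR ALL TABLES publications and implies "table", an explicit ddl value resets both flags first, and when the option is absent both flags follow the FOR ALL TABLES setting. The names DdlLevels, ddl_levels_default and ddl_levels_parse are hypothetical. The tokenizer is a deliberate simplification: the patch itself uses SplitIdentifierString and reports problems via ereport(ERROR) rather than a return value.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-in for the two new PublicationActions flags. */
typedef struct DdlLevels
{
	bool		pubddl_database;
	bool		pubddl_table;
} DdlLevels;

/* Defaults used when no ddl option is given. */
static void
ddl_levels_default(bool for_all_tables, DdlLevels *out)
{
	assert(out != NULL);
	out->pubddl_database = for_all_tables;
	out->pubddl_table = for_all_tables;
}

/*
 * Parse a ddl option value such as "", "table" or "database, table".
 *
 * Returns false for an unrecognized value, or for "database" on a
 * publication that is not FOR ALL TABLES; *out is unspecified then.
 */
static bool
ddl_levels_parse(const char *spec, bool for_all_tables, DdlLevels *out)
{
	const char *p = spec;

	assert(spec != NULL && out != NULL);

	/* An explicit option replaces the defaults entirely. */
	out->pubddl_database = false;
	out->pubddl_table = false;

	while (*p != '\0')
	{
		size_t		len;

		p += strspn(p, ", \t");		/* skip separators */
		if (*p == '\0')
			break;
		len = strcspn(p, ", \t");	/* length of the next token */

		if (len == 8 && strncmp(p, "database", 8) == 0)
		{
			if (!for_all_tables)
				return false;
			/* database-level replication implies table-level */
			out->pubddl_database = true;
			out->pubddl_table = true;
		}
		else if (len == 5 && strncmp(p, "table", 5) == 0)
			out->pubddl_table = true;
		else
			return false;

		p += len;
	}
	return true;
}
```

For example, ddl_levels_parse("table", false, &lv) leaves only pubddl_table set, while ddl_levels_parse("database", false, &lv) is rejected, which corresponds to the "only supported in FOR ALL TABLES publication" error in the patch.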
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 8c7fca62de..b8ab1d8141 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1006,6 +1006,8 @@ GetPublication(Oid pubid)
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
pub->pubviaroot = pubform->pubviaroot;
+ pub->pubactions.pubddl_database = pubform->pubddl_database;
+ pub->pubactions.pubddl_table = pubform->pubddl_table;
ReleaseSysCache(tup);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 8e645741e4..edceaa0c4e 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -80,15 +80,18 @@ static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
static void
parse_publication_options(ParseState *pstate,
List *options,
+ bool for_all_tables,
bool *publish_given,
PublicationActions *pubactions,
bool *publish_via_partition_root_given,
- bool *publish_via_partition_root)
+ bool *publish_via_partition_root,
+ bool *ddl_level_given)
{
ListCell *lc;
*publish_given = false;
*publish_via_partition_root_given = false;
+ *ddl_level_given = false;
/* defaults */
pubactions->pubinsert = true;
@@ -96,6 +99,16 @@ parse_publication_options(ParseState *pstate,
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
*publish_via_partition_root = false;
+ if (for_all_tables)
+ {
+ pubactions->pubddl_database = true;
+ pubactions->pubddl_table = true;
+ }
+ else
+ {
+ pubactions->pubddl_database = false;
+ pubactions->pubddl_table = false;
+ }
/* Parse options */
foreach(lc, options)
@@ -154,6 +167,55 @@ parse_publication_options(ParseState *pstate,
*publish_via_partition_root_given = true;
*publish_via_partition_root = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "ddl") == 0)
+ {
+ char *ddl_level;
+ List *ddl_list;
+ ListCell *lc;
+
+ if (*ddl_level_given)
+ errorConflictingDefElem(defel, pstate);
+
+ /*
+ * If the ddl option was given, only the explicitly listed DDL
+ * levels should be published.
+ */
+ pubactions->pubddl_database = false;
+ pubactions->pubddl_table = false;
+
+ *ddl_level_given = true;
+ ddl_level = defGetString(defel);
+
+ if (!SplitIdentifierString(ddl_level, ',', &ddl_list))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid list syntax for \"ddl\" option")));
+
+ /* Process the option list. */
+ foreach(lc, ddl_list)
+ {
+ char *publish_opt = (char *) lfirst(lc);
+
+ if (strcmp(publish_opt, "database") == 0)
+ {
+ if (!for_all_tables)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("DDL replication on the database level is only supported in FOR ALL TABLES publications")));
+ else
+ {
+ pubactions->pubddl_database = true;
+ pubactions->pubddl_table = true;
+ }
+ }
+ else if (strcmp(publish_opt, "table") == 0)
+ pubactions->pubddl_table = true;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized \"ddl\" value: \"%s\"", publish_opt)));
+ }
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -761,6 +823,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
Datum values[Natts_pg_publication];
HeapTuple tup;
bool publish_given;
+ bool ddl_level_given;
PublicationActions pubactions;
bool publish_via_partition_root_given;
bool publish_via_partition_root;
@@ -803,9 +866,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
parse_publication_options(pstate,
stmt->options,
+ stmt->for_all_tables,
&publish_given, &pubactions,
&publish_via_partition_root_given,
- &publish_via_partition_root);
+ &publish_via_partition_root,
+ &ddl_level_given);
puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
Anum_pg_publication_oid);
@@ -822,6 +887,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum(pubactions.pubtruncate);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
+ values[Anum_pg_publication_pubddl_database - 1] =
+ BoolGetDatum(pubactions.pubddl_database);
+ values[Anum_pg_publication_pubddl_table - 1] =
+ BoolGetDatum(pubactions.pubddl_table);
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
@@ -908,6 +977,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
bool replaces[Natts_pg_publication];
Datum values[Natts_pg_publication];
bool publish_given;
+ bool ddl_level_given;
PublicationActions pubactions;
bool publish_via_partition_root_given;
bool publish_via_partition_root;
@@ -916,11 +986,15 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
List *root_relids = NIL;
ListCell *lc;
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+
parse_publication_options(pstate,
stmt->options,
+ pubform->puballtables,
&publish_given, &pubactions,
&publish_via_partition_root_given,
- &publish_via_partition_root);
+ &publish_via_partition_root,
+ &ddl_level_given);
pubform = (Form_pg_publication) GETSTRUCT(tup);
@@ -1024,6 +1098,15 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
replaces[Anum_pg_publication_pubtruncate - 1] = true;
}
+ if (ddl_level_given)
+ {
+ values[Anum_pg_publication_pubddl_database - 1] = BoolGetDatum(pubactions.pubddl_database);
+ replaces[Anum_pg_publication_pubddl_database - 1] = true;
+
+ values[Anum_pg_publication_pubddl_table - 1] = BoolGetDatum(pubactions.pubddl_table);
+ replaces[Anum_pg_publication_pubddl_table - 1] = true;
+ }
+
if (publish_via_partition_root_given)
{
values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 0e8fda97f8..fcc651dae5 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5627,6 +5627,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
pubdesc->pubactions.pubupdate |= pubform->pubupdate;
pubdesc->pubactions.pubdelete |= pubform->pubdelete;
pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
+ pubdesc->pubactions.pubddl_database |= pubform->pubddl_database;
+ pubdesc->pubactions.pubddl_table |= pubform->pubddl_table;
/*
* Check if all columns referenced in the filter expression are part
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7cc9c72e49..8860510ac1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3868,6 +3868,8 @@ getPublications(Archive *fout, int *numPublications)
int i_pubdelete;
int i_pubtruncate;
int i_pubviaroot;
+ int i_pubddl_database;
+ int i_pubddl_table;
int i,
ntups;
@@ -3882,23 +3884,29 @@ getPublications(Archive *fout, int *numPublications)
resetPQExpBuffer(query);
/* Get the publications. */
- if (fout->remoteVersion >= 130000)
+ if (fout->remoteVersion >= 150000)
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "p.pubowner, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubddl_database, p.pubddl_table "
+ "FROM pg_publication p");
+ else if (fout->remoteVersion >= 130000)
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot, false AS pubddl_database, false AS pubddl_table "
"FROM pg_publication p");
else if (fout->remoteVersion >= 110000)
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot, false AS pubddl_database, false AS pubddl_table "
"FROM pg_publication p");
else
appendPQExpBuffer(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot, false AS pubddl_database, false AS pubddl_table "
"FROM pg_publication p");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -3915,6 +3923,8 @@ getPublications(Archive *fout, int *numPublications)
i_pubdelete = PQfnumber(res, "pubdelete");
i_pubtruncate = PQfnumber(res, "pubtruncate");
i_pubviaroot = PQfnumber(res, "pubviaroot");
+ i_pubddl_database = PQfnumber(res, "pubddl_database");
+ i_pubddl_table = PQfnumber(res, "pubddl_table");
pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
@@ -3939,6 +3949,10 @@ getPublications(Archive *fout, int *numPublications)
(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
pubinfo[i].pubviaroot =
(strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
+ pubinfo[i].pubddl_database =
+ (strcmp(PQgetvalue(res, i, i_pubddl_database), "t") == 0);
+ pubinfo[i].pubddl_table =
+ (strcmp(PQgetvalue(res, i, i_pubddl_table), "t") == 0);
/* Decide whether we want to dump it */
selectDumpableObject(&(pubinfo[i].dobj), fout);
@@ -4018,6 +4032,29 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
appendPQExpBufferStr(query, "'");
+ appendPQExpBufferStr(query, ", ddl = '");
+ first = true;
+
+ if (pubinfo->pubddl_database)
+ {
+ if (!first)
+ appendPQExpBufferStr(query, ", ");
+
+ appendPQExpBufferStr(query, "database");
+ first = false;
+ }
+
+ if (pubinfo->pubddl_table)
+ {
+ if (!first)
+ appendPQExpBufferStr(query, ", ");
+
+ appendPQExpBufferStr(query, "table");
+ first = false;
+ }
+
+ appendPQExpBufferStr(query, "'");
+
if (pubinfo->pubviaroot)
appendPQExpBufferStr(query, ", publish_via_partition_root = true");
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1d21c2906f..8c5ef01cb0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -620,6 +620,8 @@ typedef struct _PublicationInfo
bool pubdelete;
bool pubtruncate;
bool pubviaroot;
+ bool pubddl_database;
+ bool pubddl_table;
} PublicationInfo;
/*
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 1f08716f69..cbfcabcfa8 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -2420,7 +2420,7 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE PUBLICATION pub1;',
regexp => qr/^
- \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E
+ \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate', ddl = '');\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -2429,9 +2429,9 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE PUBLICATION pub2
FOR ALL TABLES
- WITH (publish = \'\');',
+ WITH (publish = \'\', ddl = \'\');',
regexp => qr/^
- \QCREATE PUBLICATION pub2 FOR ALL TABLES WITH (publish = '');\E
+ \QCREATE PUBLICATION pub2 FOR ALL TABLES WITH (publish = '', ddl = '');\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -2440,7 +2440,7 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE PUBLICATION pub3;',
regexp => qr/^
- \QCREATE PUBLICATION pub3 WITH (publish = 'insert, update, delete, truncate');\E
+ \QCREATE PUBLICATION pub3 WITH (publish = 'insert, update, delete, truncate', ddl = '');\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
@@ -2449,7 +2449,7 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE PUBLICATION pub4;',
regexp => qr/^
- \QCREATE PUBLICATION pub4 WITH (publish = 'insert, update, delete, truncate');\E
+ \QCREATE PUBLICATION pub4 WITH (publish = 'insert, update, delete, truncate', ddl = '');\E
/xm,
like => { %full_runs, section_post_data => 1, },
},
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 1a5d924a23..b07c6b1841 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6050,7 +6050,7 @@ listPublications(const char *pattern)
PQExpBufferData buf;
PGresult *res;
printQueryOpt myopt = pset.popt;
- static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
+ static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6085,6 +6085,15 @@ listPublications(const char *pattern)
appendPQExpBuffer(&buf,
",\n pubviaroot AS \"%s\"",
gettext_noop("Via root"));
+ if (pset.sversion >= 150000)
+ {
+ appendPQExpBuffer(&buf,
+ ",\n pubddl_database AS \"%s\"",
+ gettext_noop("Database level DDLs"));
+ appendPQExpBuffer(&buf,
+ ",\n pubddl_table AS \"%s\"",
+ gettext_noop("Table level DDLs"));
+ }
appendPQExpBufferStr(&buf,
"\nFROM pg_catalog.pg_publication\n");
@@ -6172,6 +6181,7 @@ describePublications(const char *pattern)
PGresult *res;
bool has_pubtruncate;
bool has_pubviaroot;
+ bool has_pubddl;
PQExpBufferData title;
printTableContent cont;
@@ -6188,6 +6198,7 @@ describePublications(const char *pattern)
has_pubtruncate = (pset.sversion >= 110000);
has_pubviaroot = (pset.sversion >= 130000);
+ has_pubddl = (pset.sversion >= 150000);
initPQExpBuffer(&buf);
@@ -6201,6 +6212,11 @@ describePublications(const char *pattern)
if (has_pubviaroot)
appendPQExpBufferStr(&buf,
", pubviaroot");
+ if (has_pubddl)
+ {
+ appendPQExpBufferStr(&buf,
+ ", pubddl_database, pubddl_table");
+ }
appendPQExpBufferStr(&buf,
"\nFROM pg_catalog.pg_publication\n");
@@ -6249,6 +6265,8 @@ describePublications(const char *pattern)
ncols++;
if (has_pubviaroot)
ncols++;
+ if (has_pubddl)
+ ncols += 2;
initPQExpBuffer(&title);
printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -6263,6 +6281,11 @@ describePublications(const char *pattern)
printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
if (has_pubviaroot)
printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+ if (has_pubddl)
+ {
+ printTableAddHeader(&cont, gettext_noop("Database level DDL"), true, align);
+ printTableAddHeader(&cont, gettext_noop("Table level DDL"), true, align);
+ }
printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6273,6 +6296,11 @@ describePublications(const char *pattern)
printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
if (has_pubviaroot)
printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+ if (has_pubddl)
+ {
+ printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false);
+ printTableAddCell(&cont, PQgetvalue(res, i, 10), false, false);
+ }
if (!puballtables)
{
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 48205ba429..a94e152256 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -20,6 +20,7 @@
#include "catalog/genbki.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_publication_d.h"
+#include "tcop/utility.h"
/* ----------------
* pg_publication definition. cpp turns this into
@@ -54,6 +55,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
/* true if partition changes are published using root schema */
bool pubviaroot;
+
+ /* true if database-level DDLs are published */
+ bool pubddl_database;
+
+ /* true if table-level DDLs are published */
+ bool pubddl_table;
} FormData_pg_publication;
/* ----------------
@@ -72,6 +79,8 @@ typedef struct PublicationActions
bool pubupdate;
bool pubdelete;
bool pubtruncate;
+ bool pubddl_database;
+ bool pubddl_table;
} PublicationActions;
typedef struct PublicationDesc
@@ -157,5 +166,6 @@ extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols,
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
+extern bool ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel);
#endif /* PG_PUBLICATION_H */
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index 56d2bb6616..073f46ef1a 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -15,6 +15,7 @@
#define DEFREM_H
#include "catalog/objectaddress.h"
+#include "catalog/pg_publication.h"
#include "nodes/params.h"
#include "parser/parse_node.h"
#include "tcop/dest.h"
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 274b37dfe5..e9cea11a5b 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -30,20 +30,20 @@ ERROR: conflicting or redundant options
LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi...
^
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f
- testpub_default | regress_publication_user | f | f | t | f | f | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | f | f
+ testpub_default | regress_publication_user | f | f | t | f | f | f | f | f
(2 rows)
ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
\dRp
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f
- testpub_default | regress_publication_user | f | t | t | t | f | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | f | f
+ testpub_default | regress_publication_user | f | t | t | t | f | f | f | f
(2 rows)
--- adding tables
@@ -87,10 +87,10 @@ RESET client_min_messages;
-- should be able to add schema to 'FOR TABLE' publication
ALTER PUBLICATION testpub_fortable ADD ALL TABLES IN SCHEMA pub_test;
\dRp+ testpub_fortable
- Publication testpub_fortable
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_fortable
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"public.testpub_tbl1"
Tables from schemas:
@@ -99,20 +99,20 @@ Tables from schemas:
-- should be able to drop schema from 'FOR TABLE' publication
ALTER PUBLICATION testpub_fortable DROP ALL TABLES IN SCHEMA pub_test;
\dRp+ testpub_fortable
- Publication testpub_fortable
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_fortable
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"public.testpub_tbl1"
-- should be able to set schema to 'FOR TABLE' publication
ALTER PUBLICATION testpub_fortable SET ALL TABLES IN SCHEMA pub_test;
\dRp+ testpub_fortable
- Publication testpub_fortable
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_fortable
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test"
@@ -134,10 +134,10 @@ ERROR: relation "testpub_nopk" is not part of the publication
-- should be able to set table to schema publication
ALTER PUBLICATION testpub_forschema SET TABLE pub_test.testpub_nopk;
\dRp+ testpub_forschema
- Publication testpub_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"pub_test.testpub_nopk"
@@ -159,10 +159,10 @@ Publications:
"testpub_foralltables"
\dRp+ testpub_foralltables
- Publication testpub_foralltables
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t | t | t | f | f | f
+ Publication testpub_foralltables
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | t | t | t | f | f | f | t | t
(1 row)
DROP TABLE testpub_tbl2;
@@ -174,19 +174,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
RESET client_min_messages;
\dRp+ testpub3
- Publication testpub3
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub3
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"public.testpub_tbl3"
"public.testpub_tbl3a"
\dRp+ testpub4
- Publication testpub4
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub4
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"public.testpub_tbl3"
@@ -207,10 +207,10 @@ UPDATE testpub_parted1 SET a = 1;
-- only parent is listed as being in publication, not the partition
ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
\dRp+ testpub_forparted
- Publication testpub_forparted
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_forparted
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"public.testpub_parted"
@@ -223,10 +223,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
UPDATE testpub_parted1 SET a = 1;
ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
\dRp+ testpub_forparted
- Publication testpub_forparted
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | t
+ Publication testpub_forparted
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | t | f | f
Tables:
"public.testpub_parted"
@@ -255,10 +255,10 @@ SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub5
- Publication testpub5
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | f | f
+ Publication testpub5
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -271,10 +271,10 @@ Tables:
ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
\dRp+ testpub5
- Publication testpub5
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | f | f
+ Publication testpub5
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
@@ -290,10 +290,10 @@ Publications:
ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
\dRp+ testpub5
- Publication testpub5
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | f | f
+ Publication testpub5
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
@@ -301,10 +301,10 @@ Tables:
-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
\dRp+ testpub5
- Publication testpub5
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | f | f
+ Publication testpub5
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | f | f | f | f
Tables:
"public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
@@ -337,10 +337,10 @@ SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub_syntax1
- Publication testpub_syntax1
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | f | f
+ Publication testpub_syntax1
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"public.testpub_rf_tbl3" WHERE (e < 999)
@@ -350,10 +350,10 @@ SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
RESET client_min_messages;
\dRp+ testpub_syntax2
- Publication testpub_syntax2
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | f | f | f | f
+ Publication testpub_syntax2
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | f | f | f | f | f | f
Tables:
"public.testpub_rf_tbl1"
"testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
@@ -1029,10 +1029,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl
CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
ERROR: publication "testpub_fortbl" already exists
\dRp+ testpub_fortbl
- Publication testpub_fortbl
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_fortbl
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
@@ -1070,10 +1070,10 @@ Publications:
"testpub_fortbl"
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | f | f
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | f | f | f | f
Tables:
"pub_test.testpub_nopk"
"public.testpub_tbl1"
@@ -1151,10 +1151,10 @@ REVOKE CREATE ON DATABASE regression FROM regress_publication_user2;
DROP TABLE testpub_parted;
DROP TABLE testpub_tbl1;
\dRp+ testpub_default
- Publication testpub_default
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | f | f
+ Publication testpub_default
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | f | f | f | f
(1 row)
-- fail - must be owner of publication
@@ -1164,20 +1164,20 @@ ERROR: must be owner of publication testpub_default
RESET ROLE;
ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
\dRp testpub_foo
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
--------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpub_foo | regress_publication_user | f | t | t | t | f | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpub_foo | regress_publication_user | f | t | t | t | f | f | f | f
(1 row)
-- rename back to keep the rest simple
ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
\dRp testpub_default
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
------------------+---------------------------+------------+---------+---------+---------+-----------+----------
- testpub_default | regress_publication_user2 | f | t | t | t | f | f
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
+ testpub_default | regress_publication_user2 | f | t | t | t | f | f | f | f
(1 row)
-- adding schemas and tables
@@ -1193,19 +1193,19 @@ CREATE TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA"(id int);
SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub1_forschema FOR ALL TABLES IN SCHEMA pub_test1;
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
CREATE PUBLICATION testpub2_forschema FOR ALL TABLES IN SCHEMA pub_test1, pub_test2, pub_test3;
\dRp+ testpub2_forschema
- Publication testpub2_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub2_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1219,44 +1219,44 @@ CREATE PUBLICATION testpub6_forschema FOR ALL TABLES IN SCHEMA "CURRENT_SCHEMA",
CREATE PUBLICATION testpub_fortable FOR TABLE "CURRENT_SCHEMA"."CURRENT_SCHEMA";
RESET client_min_messages;
\dRp+ testpub3_forschema
- Publication testpub3_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub3_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"public"
\dRp+ testpub4_forschema
- Publication testpub4_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub4_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"CURRENT_SCHEMA"
\dRp+ testpub5_forschema
- Publication testpub5_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub5_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"CURRENT_SCHEMA"
"public"
\dRp+ testpub6_forschema
- Publication testpub6_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub6_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"CURRENT_SCHEMA"
"public"
\dRp+ testpub_fortable
- Publication testpub_fortable
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_fortable
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"CURRENT_SCHEMA.CURRENT_SCHEMA"
@@ -1290,10 +1290,10 @@ ERROR: schema "testpub_view" does not exist
-- dropping the schema should reflect the change in publication
DROP SCHEMA pub_test3;
\dRp+ testpub2_forschema
- Publication testpub2_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub2_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1301,20 +1301,20 @@ Tables from schemas:
-- renaming the schema should reflect the change in publication
ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed;
\dRp+ testpub2_forschema
- Publication testpub2_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub2_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1_renamed"
"pub_test2"
ALTER SCHEMA pub_test1_renamed RENAME to pub_test1;
\dRp+ testpub2_forschema
- Publication testpub2_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub2_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1322,10 +1322,10 @@ Tables from schemas:
-- alter publication add schema
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test2;
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1334,10 +1334,10 @@ Tables from schemas:
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA non_existent_schema;
ERROR: schema "non_existent_schema" does not exist
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1346,10 +1346,10 @@ Tables from schemas:
ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA pub_test1;
ERROR: schema "pub_test1" is already member of publication "testpub1_forschema"
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1357,10 +1357,10 @@ Tables from schemas:
-- alter publication drop schema
ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2;
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
@@ -1368,10 +1368,10 @@ Tables from schemas:
ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test2;
ERROR: tables from schema "pub_test2" are not part of the publication
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
@@ -1379,29 +1379,29 @@ Tables from schemas:
ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA non_existent_schema;
ERROR: schema "non_existent_schema" does not exist
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
-- drop all schemas
ALTER PUBLICATION testpub1_forschema DROP ALL TABLES IN SCHEMA pub_test1;
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
(1 row)
-- alter publication set multiple schema
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test2;
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1410,10 +1410,10 @@ Tables from schemas:
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schema;
ERROR: schema "non_existent_schema" does not exist
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
"pub_test2"
@@ -1422,10 +1422,10 @@ Tables from schemas:
-- removing the duplicate schemas
ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
\dRp+ testpub1_forschema
- Publication testpub1_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub1_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
@@ -1504,18 +1504,18 @@ SET client_min_messages = 'ERROR';
CREATE PUBLICATION testpub3_forschema;
RESET client_min_messages;
\dRp+ testpub3_forschema
- Publication testpub3_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub3_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
(1 row)
ALTER PUBLICATION testpub3_forschema SET ALL TABLES IN SCHEMA pub_test1;
\dRp+ testpub3_forschema
- Publication testpub3_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub3_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables from schemas:
"pub_test1"
@@ -1525,20 +1525,20 @@ CREATE PUBLICATION testpub_forschema_fortable FOR ALL TABLES IN SCHEMA pub_test1
CREATE PUBLICATION testpub_fortable_forschema FOR TABLE pub_test2.tbl1, ALL TABLES IN SCHEMA pub_test1;
RESET client_min_messages;
\dRp+ testpub_forschema_fortable
- Publication testpub_forschema_fortable
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_forschema_fortable
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"pub_test2.tbl1"
Tables from schemas:
"pub_test1"
\dRp+ testpub_fortable_forschema
- Publication testpub_fortable_forschema
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f | t | t | t | t | f
+ Publication testpub_fortable_forschema
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDL | Table level DDL
+--------------------------+------------+---------+---------+---------+-----------+----------+--------------------+-----------------
+ regress_publication_user | f | t | t | t | t | f | f | f
Tables:
"pub_test2.tbl1"
Tables from schemas:
--
2.32.0
[application/octet-stream] 0004-Enable-replication-of-CREATE-MATERIALIZED-VIEW-AS-st.patch (4.1K, 6-0004-Enable-replication-of-CREATE-MATERIALIZED-VIEW-AS-st.patch)
From b9b3ca9f7e5749ca0c89a88ab5b5bd990a8598a0 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Tue, 22 Mar 2022 23:47:52 +0000
Subject: [PATCH 04/12] Enable replication of CREATE MATERIALIZED VIEW AS stmt.
Add tests for both CREATE MATERIALIZED VIEW and CREATE VIEW.
---
src/backend/tcop/utility.c | 42 +++++++++++++++++++++++--
src/test/subscription/t/030_rep_ddls.pl | 18 +++++++++++
2 files changed, 58 insertions(+), 2 deletions(-)
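Reviewer note (not part of the patch): the dispatch added for T_CreateTableAsStmt can be read as a small decision function. The following is a minimal standalone sketch of that reading, not the patch's code. All names prefixed with sketch_ or SKETCH_ are hypothetical stand-ins, and the boolean parameter stands in for whatever ddl_need_xlog() reports for database level DDL replication.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the object types a CreateTableAsStmt can carry. */
typedef enum
{
    SKETCH_OBJECT_TABLE,
    SKETCH_OBJECT_MATVIEW,
    SKETCH_OBJECT_OTHER
} SketchObjectType;

/*
 * Returns true when the statement text would be logged as a DDL message.
 * database_level_ddl_on models "some FOR ALL TABLES publication has
 * database level DDL replication enabled".
 */
bool
sketch_should_log_ctas(SketchObjectType objtype, bool database_level_ddl_on)
{
    switch (objtype)
    {
        case SKETCH_OBJECT_TABLE:
            /* Not logged yet: the patch leaves CREATE TABLE AS as a FIXME. */
            return false;
        case SKETCH_OBJECT_MATVIEW:
            /* Logged only under database level DDL replication. */
            return database_level_ddl_on;
        default:
            return false;
    }
}
```

Under these assumptions, only a CREATE MATERIALIZED VIEW AS statement on a publisher with database level DDL replication enabled produces a logged message.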
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index d0bb232af6..79b5faccc0 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1140,6 +1140,45 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
}
break;
+ /*
+ * CreateTableAsStmt can create either a table or a materialized view,
+ * and they are handled differently.
+ */
+ case T_CreateTableAsStmt:
+ {
+ CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
+ switch(stmt->objtype)
+ {
+ case OBJECT_TABLE:
+ /*
+ * FIXME: CREATE TABLE AS stmt needs to be broken down into two parts:
+ * 1. A normal CREATE TABLE string that gets logged and replicated via
+ * DDL replication.
+ * 2. Insertions that get replicated by DML replication.
+ */
+ break;
+ case OBJECT_MATVIEW:
+ /*
+ * Log CREATE MATERIALIZED VIEW AS stmt for logical replication if
+ * there is any FOR ALL TABLES publication with pubddl_database on.
+ * That is, database level DDL replication is on for some publication.
+ */
+ if (ddl_need_xlog(InvalidOid, true, true))
+ {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ queryString,
+ strlen(queryString),
+ transactional);
+ }
+ default:
+ break;
+ }
+ break;
+ }
+
/*
* Secondly, commands that may be allowed in Table level DDL replication.
* These are currently handled in the later execution path of the command.
@@ -1155,7 +1194,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
/* DropStmt depends on the removeType */
case T_DropStmt:
{
- DropStmt* stmt = (DropStmt*) parsetree;
+ DropStmt *stmt = (DropStmt *) parsetree;
switch (stmt->removeType)
{
/* Maybe allowed in Table level DDL replication, handled in later code path */
@@ -1206,7 +1245,6 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
case T_RuleStmt:
case T_CreateSeqStmt:
case T_AlterSeqStmt:
- case T_CreateTableAsStmt:
case T_RefreshMatViewStmt:
case T_CreatePLangStmt:
case T_CreateConversionStmt:
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index c88c4ea1c0..562efe2cf7 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -226,6 +226,24 @@ $node_publisher->wait_for_catchup('mysub');
$result = $node_subscriber->safe_psql('postgres', "SELECT tableowner from pg_catalog.pg_tables where tablename = 't5';");
is($result, qq(ddl_replication_user), 'Owner of t5 is correct');
+# Test CREATE MATERIALIZED VIEW stmt is replicated
+$node_publisher->safe_psql('postgres', "CREATE MATERIALIZED VIEW s1.matview1 AS SELECT a, b from s1.t1;");
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) from s1.matview1;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result_sub = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.matview1;");
+is($result, qq($result_sub), 'CREATE of s1.matview1 is replicated');
+
+# Test CREATE VIEW stmt is replicated
+$node_publisher->safe_psql('postgres', "CREATE VIEW s1.view1 AS SELECT a, b from s1.t1;");
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) from s1.view1;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result_sub = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.view1;");
+is($result, qq($result_sub), 'CREATE of s1.view1 is replicated');
+
#TODO TEST certain DDLs are not replicated
pass "DDL replication tests passed!";
--
2.32.0
[application/octet-stream] 0008-Fail-replication-worker-on-DDL-command-that-rewrites.patch (4.8K, 7-0008-Fail-replication-worker-on-DDL-command-that-rewrites.patch)
From 1c240d98130bd67568b631d3c06b0877ae82e9f8 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 29 Apr 2022 16:57:03 +0000
Subject: [PATCH 08/12] Fail replication worker on DDL command that rewrites
table using volatile functions, such as ALTER TABLE tab ADD COLUMN col
DEFAULT volatile_expr. This avoids data mismatch between the subscriber and
the publisher. We can potentially unblock this type of command once table
rewrite is supported in logical replication.
---
src/backend/replication/logical/worker.c | 44 ++++++++++++++++++++++++
src/test/subscription/t/030_rep_ddls.pl | 22 ++++++++++++
2 files changed, 66 insertions(+)
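Reviewer note (not part of the patch): the apply-side check walks the ALTER TABLE subcommands and rejects the statement as soon as an ADD COLUMN carries a DEFAULT whose expression is volatile. A minimal standalone sketch of that walk follows. The Sk* types are hypothetical simplifications of AlterTableCmd and Constraint, and the default_is_volatile flag stands in for the result of contain_volatile_functions() on the transformed expression.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of an ALTER TABLE subcommand. */
typedef enum
{
    SK_AT_ADD_COLUMN,
    SK_AT_DROP_COLUMN,
    SK_AT_ALTER_COLUMN_TYPE
} SkAlterSubtype;

typedef struct
{
    SkAlterSubtype subtype;
    bool           has_default;          /* column definition has a DEFAULT */
    bool           default_is_volatile;  /* e.g. DEFAULT 3 * random() */
} SkAlterCmd;

/*
 * Returns true when the apply worker would raise an error for this
 * statement, i.e. when any ADD COLUMN subcommand has a volatile DEFAULT.
 */
bool
sk_alter_table_is_rejected(const SkAlterCmd *cmds, size_t ncmds)
{
    for (size_t i = 0; i < ncmds; i++)
    {
        const SkAlterCmd *cmd = &cmds[i];

        if (cmd->subtype == SK_AT_ADD_COLUMN &&
            cmd->has_default &&
            cmd->default_is_volatile)
            return true;
    }
    return false;
}
```

In this model, ADD COLUMN ... DEFAULT 0.01 passes, while ADD COLUMN ... DEFAULT 3 * random() is rejected, which matches the two cases exercised in 030_rep_ddls.pl.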
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 72711b06d6..39702f4a0f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -157,6 +157,7 @@
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/analyze.h"
+#include "parser/parse_expr.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
@@ -2550,6 +2551,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
commandTag = CreateCommandTag((Node *)command);
+ /* The following DDL commands need special handling */
+
/*
* Remember the schemaname and relname if the cmd is going to create a table
* because we will need them for some post-processing after we
@@ -2597,6 +2600,47 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
sstmt->intoClause->skipData = true;
}
}
+ /*
+ * ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported
+ * until we support logical replication of table rewrite. See
+ * ATRewriteTables() for details on table rewrite.
+ */
+ else if (commandTag == CMDTAG_ALTER_TABLE)
+ {
+ AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt;
+ ListCell *lc;
+
+ foreach(lc, atstmt->cmds)
+ {
+ AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc);
+
+ if (cmd->subtype == AT_AddColumn)
+ {
+ ColumnDef *colDef;
+ ListCell *c;
+
+ colDef = castNode(ColumnDef, cmd->def);
+ foreach(c, colDef->constraints)
+ {
+ Constraint *con = lfirst_node(Constraint, c);
+
+ if (con->contype == CONSTR_DEFAULT)
+ {
+ Node *expr;
+ ParseState *pstate = make_parsestate(NULL);
+
+ expr = transformExpr(pstate, copyObject(con->raw_expr), EXPR_KIND_COLUMN_DEFAULT);
+ if (contain_volatile_functions(expr))
+ {
+ elog(ERROR,
+ "Do not support replication of DDL statement that rewrites table using volatile functions: %s",
+ cmdstr);
+ }
+ }
+ }
+ }
+ }
+ }
/*
* Set up a snapshot if parse analysis/planning will need one.
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index 34b9d51eb1..b4df1bfefd 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -386,7 +386,29 @@ $node_publisher->wait_for_catchup('mysub');
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.proc_table where c3 = 22;");
is($result, qq(1), 'DDLs in procedure are replicated');
+# Test Alter table alter column type stmt is replicated
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ALTER COLUMN name type TEXT;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT data_type FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'name';");
+is($result, qq(text), 'Alter table column type stmt is replicated');
+
+# Test Alter table add column default 0.01 is replicated
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN non_volatile double precision DEFAULT 0.01;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep where non_volatile = 0.01;");
+is($result, qq(2), 'Alter table add column default 0.01 is replicated');
+
#TODO TEST certain DDLs are not replicated
+# Test that a DDL statement that rewrites a table using volatile functions is not replicated
+$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();");
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'volatile';");
+is($result, qq(1), 'Alter table add column default random() is executed on the publisher DB.');
+
+$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result);
pass "DDL replication tests passed!";
--
2.32.0
[application/octet-stream] 0009-Support-replication-of-DDL-type-T_RenameStmt-table-r.patch (8.3K, 8-0009-Support-replication-of-DDL-type-T_RenameStmt-table-r.patch)
From ac6778fa3348db8522c3177839659a2c38484497 Mon Sep 17 00:00:00 2001
From: PonyboyYbr <94borelyang@gmail.com>
Date: Wed, 11 May 2022 15:46:40 -0700
Subject: [PATCH 09/12] Support replication of DDL type T_RenameStmt: table
rename is allowed in both database level and table level DDL replication.
Rename of other objects is only allowed in database level DDL replication.
Co-authored-by: Borui Yang <boruiyan@amazon.com>
---
src/backend/commands/alter.c | 5 ++--
src/backend/commands/tablecmds.c | 15 ++++++++++-
src/backend/tcop/utility.c | 20 ++++++++++++---
src/include/commands/alter.h | 3 ++-
src/include/commands/tablecmds.h | 2 +-
src/test/subscription/t/030_rep_ddls.pl | 33 +++++++++++++++++++++++--
6 files changed, 67 insertions(+), 11 deletions(-)
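Reviewer note (not part of the patch): the rule in the commit message can be summarized as a two-input decision. The sketch below is a standalone model of that rule as stated, with hypothetical names throughout. It does not reproduce ddl_need_xlog(), whose internals are not shown in this patch.

```c
#include <stdbool.h>

/*
 * Models whether a RENAME statement is logged for DDL replication.
 *
 * renames_relation:         the target is a table (or other relation)
 * database_level_ddl_on:    a publication with database level DDL exists
 * relation_table_level_ddl: the renamed relation belongs to a publication
 *                           with table level DDL enabled
 */
bool
sk_rename_is_logged(bool renames_relation,
                    bool database_level_ddl_on,
                    bool relation_table_level_ddl)
{
    if (renames_relation)
        return database_level_ddl_on || relation_table_level_ddl;

    /* Functions and other non-relation objects: database level only. */
    return database_level_ddl_on;
}
```

This corresponds to the expectations in the new test: renaming t7 (published with ddl = 'table') is replicated, renaming t8 (not published) is not, and ALTER FUNCTION ... RENAME relies on database level DDL replication.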
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 5456b8222b..1ca8f99e0c 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -328,7 +328,8 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
* Return value is the address of the renamed object.
*/
ObjectAddress
-ExecRenameStmt(RenameStmt *stmt)
+ExecRenameStmt(ParseState *pstate, RenameStmt *stmt,
+ bool isCompleteQuery)
{
switch (stmt->renameType)
{
@@ -354,7 +355,7 @@ ExecRenameStmt(RenameStmt *stmt)
case OBJECT_MATVIEW:
case OBJECT_INDEX:
case OBJECT_FOREIGN_TABLE:
- return RenameRelation(stmt);
+ return RenameRelation(pstate, stmt, isCompleteQuery);
case OBJECT_COLUMN:
case OBJECT_ATTRIBUTE:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3c9b6409ca..65386f2641 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -3820,11 +3820,12 @@ RenameConstraint(RenameStmt *stmt)
* RENAME
*/
ObjectAddress
-RenameRelation(RenameStmt *stmt)
+RenameRelation(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery)
{
bool is_index_stmt = stmt->renameType == OBJECT_INDEX;
Oid relid;
ObjectAddress address;
+ bool ddlxlog = XLogLogicalInfoActive() && isCompleteQuery;
/*
* Grab an exclusive lock on the target table, index, sequence, view,
@@ -3872,6 +3873,18 @@ RenameRelation(RenameStmt *stmt)
is_index_stmt = obj_is_index;
}
+ if (ddlxlog &&
+ ddl_need_xlog(relid, false))
+ {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ pstate->p_sourcetext,
+ strlen(pstate->p_sourcetext),
+ transactional);
+ }
+
/* Do the work */
RenameRelationInternal(relid, stmt->newname, false, is_index_stmt);
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index e9e7567209..1abd77a60e 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1002,7 +1002,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
context, params, queryEnv,
dest, qc);
else
- ExecRenameStmt(stmt);
+ ExecRenameStmt(pstate, stmt, context != PROCESS_UTILITY_SUBCOMMAND);
}
break;
@@ -1195,8 +1195,20 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
*/
case T_AlterTableStmt:
case T_IndexStmt:
- case T_RenameStmt: /* TODO */
- case T_AlterOwnerStmt: /* TODO */
+ case T_RenameStmt:
+ {
+ RenameStmt *stmt = (RenameStmt *) parsetree;
+ if (!stmt->relation && ddl_need_xlog(InvalidOid, true)) {
+ bool transactional = true;
+ const char* prefix = "";
+ LogLogicalDDLMessage(prefix,
+ GetUserId(),
+ queryString,
+ strlen(queryString),
+ transactional);
+ }
+ }
+ case T_AlterOwnerStmt: /* TODO: this is a data control case; left for a later update */
break;
/* DropStmt depends on the removeType */
@@ -2008,7 +2020,7 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_RenameStmt:
- address = ExecRenameStmt((RenameStmt *) parsetree);
+ address = ExecRenameStmt(pstate, (RenameStmt *) parsetree, isCompleteQuery);
break;
case T_AlterObjectDependsStmt:
diff --git a/src/include/commands/alter.h b/src/include/commands/alter.h
index 52f5e6f6d2..df9333eb93 100644
--- a/src/include/commands/alter.h
+++ b/src/include/commands/alter.h
@@ -17,9 +17,10 @@
#include "catalog/dependency.h"
#include "catalog/objectaddress.h"
#include "nodes/parsenodes.h"
+#include "parser/parse_node.h"
#include "utils/relcache.h"
-extern ObjectAddress ExecRenameStmt(RenameStmt *stmt);
+extern ObjectAddress ExecRenameStmt(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery);
extern ObjectAddress ExecAlterObjectDependsStmt(AlterObjectDependsStmt *stmt,
ObjectAddress *refAddress);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 24106de2e5..f96bb57d56 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -74,7 +74,7 @@ extern ObjectAddress renameatt(RenameStmt *stmt);
extern ObjectAddress RenameConstraint(RenameStmt *stmt);
-extern ObjectAddress RenameRelation(RenameStmt *stmt);
+extern ObjectAddress RenameRelation(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery);
extern void RenameRelationInternal(Oid myrelid,
const char *newrelname, bool is_internal,
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index b4df1bfefd..51126b489b 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -30,7 +30,6 @@ $node_subscriber->safe_psql('postgres', $ddl);
$node_subscriber2->safe_psql('postgres', $ddl);
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-
# mypub has pubddl_database on
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION mypub FOR ALL TABLES;");
@@ -269,7 +268,7 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t6;")
is($result, qq(1), 'SELECT INTO s1.t6 is replicated with data');
# TEST Create DomainStmt
-$node_publisher->safe_psql('postgres', "CREATE DOMAIN s1.space_check AS VARCHAR NOT NULL CHECK (value !~ '\s');");
+$node_publisher->safe_psql('postgres', "CREATE DOMAIN s1.space_check AS VARCHAR NOT NULL CHECK (value !~ '\\s');");
$node_publisher->wait_for_catchup('mysub');
@@ -348,6 +347,36 @@ $node_publisher->wait_for_catchup('mysub');
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_cast c, pg_catalog.pg_proc p WHERE p.proname='add' AND c.castfunc=p.oid;");
is($result, qq(1), 'CreateCast Stmt is replicated');
+#TEST RenameStmt for FUNCTION
+$node_publisher->safe_psql('postgres', "ALTER FUNCTION add RENAME TO plus;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_proc p where p.proname='plus';");
+is($result, qq(1), 'RENAME FUNCTION Stmt is replicated');
+
+#TEST RenameStmt for table
+$node_publisher->safe_psql('postgres', "CREATE DATABASE db1;");
+$node_publisher->safe_psql('db1', "CREATE TABLE t7 (id int primary key, name varchar);");
+$node_publisher->safe_psql('db1', "CREATE TABLE t8 (id int primary key, name varchar);");
+$node_publisher->safe_psql('db1',
+ "CREATE PUBLICATION mypub3 FOR TABLE t7 with (ddl = 'table');");
+my $publisher_connstr_db1 = $node_publisher->connstr . ' dbname=db1';
+$node_subscriber->safe_psql('postgres', "CREATE DATABASE db1;");
+$node_subscriber->safe_psql('db1', "CREATE TABLE t7 (id int primary key, name varchar);");
+$node_subscriber->safe_psql('db1', "CREATE TABLE t8 (id int primary key, name varchar);");
+$node_subscriber->safe_psql('db1',
+ "CREATE SUBSCRIPTION mysub3 CONNECTION '$publisher_connstr_db1' PUBLICATION mypub3;"
+);
+$node_publisher->wait_for_catchup('mysub3');
+$node_publisher->safe_psql('db1', "ALTER TABLE t7 RENAME TO newt7;");
+$node_publisher->safe_psql('db1', "ALTER TABLE t8 RENAME TO newt8;");
+$node_publisher->wait_for_catchup('mysub3');
+$result = $node_subscriber->safe_psql('db1', "SELECT count(*) from pg_tables where tablename = 'newt7';");
+is($result, qq(1), 'Rename t7 to newt7 is replicated');
+$result = $node_subscriber->safe_psql('db1', "SELECT count(*) from pg_tables where tablename = 'newt8';");
+is($result, qq(0), 'Rename t8 to newt8 is not replicated');
+
#TEST DDL in function
$node_publisher->safe_psql('postgres', qq{
CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20))
--
2.32.0
[application/octet-stream] 0007-Enable-logging-and-replication-of-DDLs-executed-insi.patch (17.5K, 9-0007-Enable-logging-and-replication-of-DDLs-executed-insi.patch)
From 22ebcce07b52dcfde005564932273b21f1d4baf8 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 15 Apr 2022 02:02:10 +0000
Subject: [PATCH 07/12] Enable logging and replication of DDLs executed inside
functions and procedures. Also fixed a bug where DDL with SUBCOMMAND gets
logged twice.
---
.../test_decoding/expected/ddlmessages.out | 47 +++++++++++++++++--
contrib/test_decoding/sql/ddlmessages.sql | 19 +++++++-
src/backend/catalog/pg_publication.c | 5 +-
src/backend/commands/tablecmds.c | 6 +--
src/backend/tcop/utility.c | 26 +++++-----
src/include/catalog/pg_publication.h | 2 +-
src/include/commands/tablecmds.h | 3 +-
src/test/subscription/t/030_rep_ddls.pl | 38 +++++++++++++++
8 files changed, 121 insertions(+), 25 deletions(-)
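Reviewer note (not part of the patch): this patch replaces the isTopLevel gate with isCompleteQuery, which the callers compute as context != PROCESS_UTILITY_SUBCOMMAND. The sketch below contrasts the two gates in a standalone form. The Sk* enum is a hypothetical, reduced stand-in for ProcessUtilityContext, and treating the old gate as "context is top level" is an assumption made for illustration.

```c
#include <stdbool.h>

/* Hypothetical, reduced stand-in for the utility execution context. */
typedef enum
{
    SK_UTILITY_TOPLEVEL,    /* statement sent directly by the client */
    SK_UTILITY_QUERY,       /* complete statement run inside a function or procedure */
    SK_UTILITY_SUBCOMMAND   /* piece of a larger utility command */
} SkUtilityContext;

/* Gate before this patch: only top-level statements are logged. */
bool
sk_logged_before(SkUtilityContext context)
{
    return context == SK_UTILITY_TOPLEVEL;
}

/* Gate after this patch: every complete query is logged, subcommands are not. */
bool
sk_logged_after(SkUtilityContext context)
{
    return context != SK_UTILITY_SUBCOMMAND;
}
```

Under this model, a CREATE TABLE issued through EXECUTE in a PL/pgSQL function is skipped by the old gate and logged by the new one, while subcommands stay unlogged so that the enclosing statement is recorded only once.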
diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out
index 0376f36c24..823029d03d 100644
--- a/contrib/test_decoding/expected/ddlmessages.out
+++ b/contrib/test_decoding/expected/ddlmessages.out
@@ -11,7 +11,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
init
(1 row)
-CREATE TABLE test_ddlmessage (id serial unique, data int);
+CREATE TABLE test_ddlmessage (id serial unique primary key, data int);
ALTER TABLE test_ddlmessage add c3 varchar;
ALTER TABLE test_ddlmessage drop c3;
DROP TABLE test_ddlmessage;
@@ -24,9 +24,9 @@ CREATE TABLE test_ddlmessage (id serial unique, data int);
ALTER TABLE test_ddlmessage add c3 varchar;
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
+ data
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int);
DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3;
DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage;
@@ -34,6 +34,44 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
(6 rows)
+-- Test logging DDL in function
+CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20))
+RETURNS VOID AS $$
+BEGIN
+ execute format('CREATE TABLE %I(id int primary key, name varchar);', tname);
+ execute format('ALTER TABLE %I ADD c3 int', tname);
+ execute format('INSERT INTO %I VALUES (1, ''a'');', tname);
+ execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname);
+END;
+$$
+LANGUAGE plpgsql;
+SELECT func_ddl ('tab_from_func');
+ func_ddl
+----------
+
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) +
+ RETURNS VOID AS $$ +
+ BEGIN +
+ execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); +
+ execute format('ALTER TABLE %I ADD c3 int', tname); +
+ execute format('INSERT INTO %I VALUES (1, ''a'');', tname); +
+ execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +
+ END; +
+ $$ +
+ LANGUAGE plpgsql;
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar);
+ DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int
+ BEGIN
+ table public.tab_from_func: INSERT: id[integer]:1 name[character varying]:'a' c3[integer]:null
+ table public.tab_from_func: INSERT: id[integer]:2 name[character varying]:'b' c3[integer]:22
+ COMMIT
+(7 rows)
+
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
@@ -41,4 +79,5 @@ SELECT pg_drop_replication_slot('regression_slot');
(1 row)
DROP TABLE test_ddlmessage;
+DROP TABLE tab_from_func;
DROP publication mypub;
diff --git a/contrib/test_decoding/sql/ddlmessages.sql b/contrib/test_decoding/sql/ddlmessages.sql
index c23610f9b4..3082671488 100644
--- a/contrib/test_decoding/sql/ddlmessages.sql
+++ b/contrib/test_decoding/sql/ddlmessages.sql
@@ -9,7 +9,7 @@ SET SESSION AUTHORIZATION 'ddl_replication_user';
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-CREATE TABLE test_ddlmessage (id serial unique, data int);
+CREATE TABLE test_ddlmessage (id serial unique primary key, data int);
ALTER TABLE test_ddlmessage add c3 varchar;
ALTER TABLE test_ddlmessage drop c3;
DROP TABLE test_ddlmessage;
@@ -25,7 +25,24 @@ ALTER TABLE test_ddlmessage add c3 varchar;
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test logging DDL in function
+CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20))
+RETURNS VOID AS $$
+BEGIN
+ execute format('CREATE TABLE %I(id int primary key, name varchar);', tname);
+ execute format('ALTER TABLE %I ADD c3 int', tname);
+ execute format('INSERT INTO %I VALUES (1, ''a'');', tname);
+ execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname);
+END;
+$$
+LANGUAGE plpgsql;
+
+SELECT func_ddl ('tab_from_func');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
SELECT pg_drop_replication_slot('regression_slot');
DROP TABLE test_ddlmessage;
+DROP TABLE tab_from_func;
DROP publication mypub;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index fad21a31d0..d5e4f3713d 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1211,15 +1211,12 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* Checks if DDL on relation (relid) need xlog for logical replication
*/
bool
-ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel)
+ddl_need_xlog(Oid relid, bool forAllTabPubOnly)
{
List *allTablePubs = NIL;
List *tablePubs = NIL;
ListCell *lc;
- /* Only replicate toplevel DDL command */
- if (!isTopLevel)
- return false;
if (relid == InvalidOid && !forAllTabPubOnly)
return false;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 6d1487951f..3c9b6409ca 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1333,14 +1333,14 @@ DropErrorMsgWrongType(const char *relname, char wrongkind, char rightkind)
* DROP MATERIALIZED VIEW, DROP FOREIGN TABLE
*/
void
-RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel)
+RemoveRelations(ParseState *pstate, DropStmt *drop, bool isCompleteQuery)
{
ObjectAddresses *objects;
char relkind;
ListCell *cell;
int flags = 0;
LOCKMODE lockmode = AccessExclusiveLock;
- bool ddlxlog = XLogLogicalInfoActive();
+ bool ddlxlog = XLogLogicalInfoActive() && isCompleteQuery;
/* DROP CONCURRENTLY uses a weaker lock, and has some restrictions */
if (drop->concurrent)
@@ -1466,7 +1466,7 @@ RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel)
/* DROP RELATION or INDEX are allowed in table level DDL replication */
if (tableOid != InvalidOid &&
- !ddl_need_xlog(tableOid, false, isTopLevel))
+ !ddl_need_xlog(tableOid, false))
ddlxlog = false;
}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 29381b53b6..e9e7567209 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -87,7 +87,8 @@ static void ProcessUtilitySlow(ParseState *pstate,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc);
-static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel);
+static void ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel,
+ bool isCompleteQuery);
/*
* CommandIsReadOnly: is an executable query read-only?
@@ -988,7 +989,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
context, params, queryEnv,
dest, qc);
else
- ExecDropStmt(pstate, stmt, isTopLevel);
+ ExecDropStmt(pstate, stmt, isTopLevel, context != PROCESS_UTILITY_SUBCOMMAND);
}
break;
@@ -1128,7 +1129,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
* there is any FOR ALL TABLES publication with pubddl_database on.
* i.e. Database level DDL replication is on for some publication.
*/
- if (ddl_need_xlog(InvalidOid, true, true))
+ if (ddl_need_xlog(InvalidOid, true))
{
bool transactional = true;
const char* prefix = "";
@@ -1170,7 +1171,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
* there is any FOR ALL TABLES publication with pubddl_database on.
* i.e. Database level DDL replication is on for some publication.
*/
- if (ddl_need_xlog(InvalidOid, true, true))
+ if (ddl_need_xlog(InvalidOid, true))
{
bool transactional = true;
const char* prefix = "";
@@ -1221,7 +1222,7 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
* there is any FOR ALL TABLES publication with pubddl_database on.
* i.e. Database level DDL replication is on for some publication.
*/
- if (ddl_need_xlog(InvalidOid, true, true))
+ if (ddl_need_xlog(InvalidOid, true))
{
bool transactional = true;
const char* prefix = "";
@@ -1318,7 +1319,7 @@ ProcessUtilitySlow(ParseState *pstate,
* Consider logging the DDL command if logical logging is enabled and this is
* a complete top level query.
*/
- if (XLogLogicalInfoActive() && isTopLevel)
+ if (XLogLogicalInfoActive())
LogLogicalDDLCommand(parsetree, queryString);
}
@@ -1530,7 +1531,8 @@ ProcessUtilitySlow(ParseState *pstate,
* this TABLE belongs to any publication with table level ddl on
*/
if (XLogLogicalInfoActive() &&
- ddl_need_xlog(relid, false, isTopLevel))
+ isCompleteQuery &&
+ ddl_need_xlog(relid, false))
{
bool transactional = true;
const char* prefix = "";
@@ -1766,7 +1768,8 @@ ProcessUtilitySlow(ParseState *pstate,
* this TABLE belongs to any publication with table level ddl on.
*/
if (XLogLogicalInfoActive() &&
- ddl_need_xlog(relid, false, isTopLevel))
+ isCompleteQuery &&
+ ddl_need_xlog(relid, false))
{
bool transactional = true;
const char* prefix = "";
@@ -1999,7 +2002,7 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_DropStmt:
- ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel);
+ ExecDropStmt(pstate, (DropStmt *) parsetree, isTopLevel, isCompleteQuery);
/* no commands stashed for DROP */
commandCollected = true;
break;
@@ -2220,7 +2223,8 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context)
* Dispatch function for DropStmt
*/
static void
-ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel)
+ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel,
+ bool isCompleteQuery)
{
switch (stmt->removeType)
{
@@ -2235,7 +2239,7 @@ ExecDropStmt(ParseState *pstate, DropStmt *stmt, bool isTopLevel)
case OBJECT_VIEW:
case OBJECT_MATVIEW:
case OBJECT_FOREIGN_TABLE:
- RemoveRelations(pstate, stmt, isTopLevel);
+ RemoveRelations(pstate, stmt, isCompleteQuery);
break;
default:
RemoveObjects(stmt);
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index a94e152256..8c114b2447 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -166,6 +166,6 @@ extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols,
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
extern char *get_publication_name(Oid pubid, bool missing_ok);
-extern bool ddl_need_xlog(Oid relid, bool forAllTabPubOnly, bool isTopLevel);
+extern bool ddl_need_xlog(Oid relid, bool forAllTabPubOnly);
#endif /* PG_PUBLICATION_H */
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 68781365de..24106de2e5 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -28,7 +28,8 @@ struct AlterTableUtilityContext; /* avoid including tcop/utility.h here */
extern ObjectAddress DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
ObjectAddress *typaddress, const char *queryString);
-extern void RemoveRelations(ParseState *pstate, DropStmt *drop, bool isTopLevel);
+extern void RemoveRelations(ParseState *pstate, DropStmt *drop,
+ bool isCompleteQuery);
extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode);
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index 4e05212068..34b9d51eb1 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -348,6 +348,44 @@ $node_publisher->wait_for_catchup('mysub');
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_cast c, pg_catalog.pg_proc p WHERE p.proname='add' AND c.castfunc=p.oid;");
is($result, qq(1), 'CreateCast Stmt is replicated');
+#TEST DDL in function
+$node_publisher->safe_psql('postgres', qq{
+CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20))
+RETURNS VOID AS \$\$
+BEGIN
+ execute format('CREATE TABLE %I(id int primary key, name varchar);', tname);
+ execute format('ALTER TABLE %I ADD c3 int', tname);
+ execute format('INSERT INTO %I VALUES (1, ''a'');', tname);
+ execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname);
+END;
+\$\$
+LANGUAGE plpgsql;});
+
+$node_publisher->safe_psql('postgres', "SELECT func_ddl('func_table');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.func_table where c3 = 22;");
+is($result, qq(1), 'DDLs in function are replicated');
+
+#TEST DDL in procedure
+$node_publisher->safe_psql('postgres', qq{
+CREATE OR REPLACE procedure proc_ddl (tname varchar(20))
+LANGUAGE plpgsql AS \$\$
+BEGIN
+ execute format('CREATE TABLE %I(id int primary key, name varchar);', tname);
+ execute format('ALTER TABLE %I ADD c3 int', tname);
+ execute format('INSERT INTO %I VALUES (1, ''a'');', tname);
+ execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname);
+END \$\$;});
+
+$node_publisher->safe_psql('postgres', "CALL proc_ddl('proc_table');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM s1.proc_table where c3 = 22;");
+is($result, qq(1), 'DDLs in procedure are replicated');
+
#TODO TEST certain DDLs are not replicated
pass "DDL replication tests passed!";
--
2.32.0
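
A note for readers of the patch above: it replaces the isTopLevel gate with isCompleteQuery when deciding whether table-level DDL is logged. The sketch below is a standalone model of that decision, not code from the patch. The enum mirrors ProcessUtilityContext from tcop/utility.h; logs_ddl_before_patch and logs_ddl_after_patch are hypothetical helpers, and the "published" flag stands in for the result of ddl_need_xlog(). It assumes that utility statements run through EXECUTE in PL/pgSQL arrive with PROCESS_UTILITY_QUERY.

```c
#include <stdbool.h>

/* Mirrors the ProcessUtilityContext values from tcop/utility.h. */
typedef enum
{
	PROCESS_UTILITY_TOPLEVEL,			/* toplevel interactive command */
	PROCESS_UTILITY_QUERY,				/* a complete query, but not toplevel */
	PROCESS_UTILITY_QUERY_NONATOMIC,	/* same, in a nonatomic context */
	PROCESS_UTILITY_SUBCOMMAND			/* a portion of a query */
} ProcessUtilityContext;

/* Same derivation ProcessUtilitySlow() uses for its two flags. */
bool
is_top_level(ProcessUtilityContext context)
{
	return context == PROCESS_UTILITY_TOPLEVEL;
}

bool
is_complete_query(ProcessUtilityContext context)
{
	return context != PROCESS_UTILITY_SUBCOMMAND;
}

/*
 * Hypothetical helper: the old gate. "published" stands in for
 * ddl_need_xlog(relid, false), which is not modeled here.
 */
bool
logs_ddl_before_patch(bool logical_active, ProcessUtilityContext context,
					  bool published)
{
	return logical_active && is_top_level(context) && published;
}

/* Hypothetical helper: the new gate, keyed on isCompleteQuery. */
bool
logs_ddl_after_patch(bool logical_active, ProcessUtilityContext context,
					 bool published)
{
	return logical_active && is_complete_query(context) && published;
}
```

Under the stated assumption, a CREATE TABLE issued through EXECUTE inside func_ddl() is skipped by the old check and logged by the new one, while subcommands stay unlogged in both cases.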
[application/octet-stream] 0010-Handle-partitioned-table-creation-on-the-apply-worke.patch (11.5K, 10-0010-Handle-partitioned-table-creation-on-the-apply-worke.patch)
From 19a3952e7db725a41d726bbfe54bbda6bf401975 Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Fri, 13 May 2022 19:47:44 +0000
Subject: [PATCH 10/12] Handle partitioned table creation on the apply worker:
whether a partitioned table should be added to pg_subscription_rel catalog
depends on the setting of publish_via_partition_root of the publication. Thus
we need to connect to the source DB and check whether the partitioned table
should be subscribed.
---
src/backend/commands/subscriptioncmds.c | 54 +++++++++++++++++++++-
src/backend/replication/logical/worker.c | 35 ++++++++++++---
src/include/commands/subscriptioncmds.h | 3 ++
src/test/regress/expected/psql.out | 6 +--
src/test/subscription/t/030_rep_ddls.pl | 57 ++++++++++++++++++++++++
5 files changed, 145 insertions(+), 10 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 83e6eae855..25d0a23db9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -23,7 +23,6 @@
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
-#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
@@ -746,6 +745,59 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
return myself;
}
+/*
+ * Check if a partitioned table is being published by any publication the subscription subscribes to.
+ * Whether a partitioned table is published also depends on the publication option
+ * publish_via_partition_root. The subscriber doesn't know the setting of publish_via_partition_root,
+ * so we need to check the source DB to decide whether to subscribe to the partitioned
+ * table (could be either root or leaf table) during replication of CREATE TABLE for a partitioned table.
+ */
+bool
+IsPartitionedTablePublishedOnSource(Subscription *sub, char *schema_name, char *table_name)
+{
+ char *err;
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ Oid tableRow[1] = {BOOLOID};
+ bool result;
+
+ WalReceiverConn *wrconn;
+ /* Load the library providing us libpq calls. */
+ load_file("libpqwalreceiver", false);
+
+ /* Try to connect to the publisher. */
+ wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+ if (!wrconn)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
+
+ PG_TRY();
+ {
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd, "SELECT TRUE\n"
+ " FROM pg_catalog.pg_publication_tables t\n"
+ " WHERE t.pubname IN (");
+ get_publications_str(sub->publications, &cmd, true);
+ appendStringInfoString(&cmd, ") AND t.schemaname = '");
+ appendStringInfoString(&cmd, schema_name);
+ appendStringInfoString(&cmd, "' AND t.tablename = '");
+ appendStringInfoString(&cmd, table_name);
+ appendStringInfoString(&cmd, "'");
+
+ res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+ pfree(cmd.data);
+ result = tuplestore_tuple_count(res->tuplestore) > 0;
+ }
+ PG_FINALLY();
+ {
+ walrcv_disconnect(wrconn);
+ }
+ PG_END_TRY();
+
+ return result;
+}
+
static void
AlterSubscription_refresh(Subscription *sub, bool copy_data,
List *validate_publications)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 39702f4a0f..1108030179 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -143,6 +143,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
+#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
@@ -2548,6 +2549,7 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
bool snapshot_set = false;
char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
+ bool is_partitioned_table = false;
commandTag = CreateCommandTag((Node *)command);
@@ -2564,6 +2566,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
RangeVar *rv = cstmt->relation;
schemaname = rv->schemaname;
relname = rv->relname;
+ if (cstmt->inhRelations != NIL || cstmt->partspec != NULL)
+ is_partitioned_table = true;
}
else if (commandTag == CMDTAG_CREATE_TABLE_AS)
{
@@ -2765,12 +2769,31 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
if (relid != InvalidOid)
{
- AddSubscriptionRelState(MySubscription->oid, relid,
- SUBREL_STATE_INIT,
- InvalidXLogRecPtr);
- ereport(DEBUG1,
- (errmsg_internal("table \"%s\" added to subscription \"%s\"",
- relname, MySubscription->name)));
+ bool subscribe_table = true;
+
+ if (is_partitioned_table)
+ {
+ Relation rel = RelationIdGetRelation(relid);
+ char *table_name = RelationGetRelationName(rel);
+ char *schema_name = get_namespace_name(RelationGetNamespace(rel));
+ /*
+ * Connect to the source DB and check whether the partitioned table should be subscribed.
+ * This depends on the setting of publish_via_partition_root, which the subscriber
+ * doesn't know.
+ */
+ subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name);
+ RelationClose(rel);
+ }
+
+ if (subscribe_table)
+ {
+ AddSubscriptionRelState(MySubscription->oid, relid,
+ SUBREL_STATE_INIT,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s\" added to subscription \"%s\"",
+ relname, MySubscription->name)));
+ }
}
}
}
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 2cbe7d7b65..451953af60 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -16,6 +16,7 @@
#define SUBSCRIPTIONCMDS_H
#include "catalog/objectaddress.h"
+#include "catalog/pg_subscription.h"
#include "parser/parse_node.h"
extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
@@ -26,4 +27,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
+extern bool IsPartitionedTablePublishedOnSource(Subscription *sub, char *schema_name, char *table_name);
+
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 60acbd1241..3429e34339 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -5969,9 +5969,9 @@ List of schemas
(0 rows)
\dRp "no.such.publication"
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
-------+-------+------------+---------+---------+---------+-----------+----------
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Database level DDLs | Table level DDLs
+------+-------+------------+---------+---------+---------+-----------+----------+---------------------+------------------
(0 rows)
\dRs "no.such.subscription"
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index 51126b489b..a03d598266 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -431,6 +431,63 @@ $node_publisher->wait_for_catchup('mysub');
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from test_rep where non_volatile = 0.01;");
is($result, qq(2), 'Alter table add column default 0.01 is replicated');
+# Test partitioned table creation is replicated based on the setting of publish_via_partition_root
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE s1.test_part_a (a int, b int, c int) PARTITION BY LIST (a);
+
+ CREATE TABLE s1.test_part_a_1 PARTITION OF s1.test_part_a FOR VALUES IN (1,2,3,4,5);
+ ALTER TABLE s1.test_part_a_1 ADD PRIMARY KEY (a);
+ ALTER TABLE s1.test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey;
+
+ CREATE TABLE s1.test_part_a_2 PARTITION OF s1.test_part_a FOR VALUES IN (6,7,8,9,10);
+ ALTER TABLE s1.test_part_a_2 ADD PRIMARY KEY (b);
+ ALTER TABLE s1.test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey;
+
+ -- initial data, one row in each partition
+ INSERT INTO s1.test_part_a VALUES (1, 3);
+ INSERT INTO s1.test_part_a VALUES (6, 4);
+));
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) from s1.test_part_a;");
+is($result, qq(2), 'Partitioned table is created and populated');
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.test_part_a;");
+is($result, qq(2), 'Partitioned table and data is replicated');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a'::regclass::oid;");
+is($result, qq(0), 'Root table of the partitioned table is not subscribed since publish_via_partition_root is false by default');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a_1'::regclass::oid OR srrelid = 's1.test_part_a_2'::regclass::oid;");
+is($result, qq(2), 'Only leaf tables of the partitioned table are subscribed since publish_via_partition_root is false by default');
+
+# Test only root partition table is subscribed when publish_via_partition_root is enabled
+$node_publisher->safe_psql('postgres', qq(
+ DROP TABLE s1.test_part_a;
+ ALTER PUBLICATION mypub SET (publish_via_partition_root = true);
+ CREATE TABLE s1.test_part_a (a int, b int, c int) PARTITION BY LIST (a);
+
+ CREATE TABLE s1.test_part_a_1 PARTITION OF s1.test_part_a FOR VALUES IN (1,2,3,4,5);
+ ALTER TABLE s1.test_part_a_1 ADD PRIMARY KEY (a);
+ ALTER TABLE s1.test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey;
+
+ CREATE TABLE s1.test_part_a_2 PARTITION OF s1.test_part_a FOR VALUES IN (6,7,8,9,10);
+ ALTER TABLE s1.test_part_a_2 ADD PRIMARY KEY (b);
+ ALTER TABLE s1.test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey;
+
+ -- initial data, one row in each partition
+ INSERT INTO s1.test_part_a VALUES (1, 3);
+ INSERT INTO s1.test_part_a VALUES (6, 4);
+));
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a'::regclass::oid;");
+is($result, qq(1), 'Only root table of the partitioned table is subscribed since publish_via_partition_root is enabled');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a_1'::regclass::oid OR srrelid = 's1.test_part_a_2'::regclass::oid;");
+is($result, qq(0), 'Leaf tables of the partitioned table are not subscribed since publish_via_partition_root is enabled');
+
#TODO TEST certain DDLs are not replicated
# Test DDL statement that rewrites table with volatile functions are not replicated
$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();");
--
2.32.0
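
A side note on IsPartitionedTablePublishedOnSource() in the patch above: it appends schema_name and table_name between single quotes as-is, so a name containing a quote character would produce a malformed query. In-tree code would presumably reach for quote_literal_cstr(); the standalone sketch below only illustrates the quoting idea. quote_sql_literal and build_table_filter are hypothetical names, and the sketch assumes standard_conforming_strings is on, so only single quotes need doubling.

```c
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper: return a malloc'd copy of s wrapped in single quotes,
 * with every embedded single quote doubled. Backslashes are left alone
 * (assumes standard_conforming_strings = on). Returns NULL on allocation
 * failure; the caller frees the result.
 */
char *
quote_sql_literal(const char *s)
{
	size_t		len = strlen(s);
	char	   *out = malloc(2 * len + 3);	/* worst case: all quotes */
	char	   *p = out;

	if (out == NULL)
		return NULL;

	*p++ = '\'';
	for (; *s != '\0'; s++)
	{
		if (*s == '\'')
			*p++ = '\'';		/* double the embedded quote */
		*p++ = *s;
	}
	*p++ = '\'';
	*p = '\0';
	return out;
}

/*
 * Hypothetical helper: write the schema/table filter of the catalog query
 * into buf. Returns the snprintf() result, or -1 on allocation failure.
 */
int
build_table_filter(char *buf, size_t buflen,
				   const char *schema_name, const char *table_name)
{
	char	   *qschema = quote_sql_literal(schema_name);
	char	   *qtable = quote_sql_literal(table_name);
	int			n = -1;

	if (qschema != NULL && qtable != NULL)
		n = snprintf(buf, buflen,
					 " AND t.schemaname = %s AND t.tablename = %s",
					 qschema, qtable);

	free(qschema);
	free(qtable);
	return n;
}
```

For example, a table named a'b in schema s1 would yield the filter " AND t.schemaname = 's1' AND t.tablename = 'a''b'", which the publisher can parse safely.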
[application/octet-stream] 0006-Add-couple-tests-for-DDL-replication-case.patch (4.5K, 11-0006-Add-couple-tests-for-DDL-replication-case.patch)
From 85f9568fb52490e8cef0f3fd4b7f5b2df0760aa3 Mon Sep 17 00:00:00 2001
From: Borui Yang <boruiyan@amazon.com>
Date: Mon, 11 Apr 2022 18:02:21 +0000
Subject: [PATCH 06/12] Add couple tests for DDL replication case
---
src/test/subscription/t/030_rep_ddls.pl | 80 +++++++++++++++++++++++++
1 file changed, 80 insertions(+)
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index 3b15c6d9f0..4e05212068 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -268,6 +268,86 @@ $node_publisher->wait_for_catchup('mysub');
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from s1.t6;");
is($result, qq(1), 'SELECT INTO s1.t6 is replicated with data');
+# TEST Create DomainStmt
+$node_publisher->safe_psql('postgres', "CREATE DOMAIN s1.space_check AS VARCHAR NOT NULL CHECK (value !~ '\s');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT t.typnotnull from pg_catalog.pg_type t where t.typname='space_check';");
+is($result, qq(t), 'CreateDomain Stmt is replicated');
+
+# TEST AlterDomainStmt
+$node_publisher->safe_psql('postgres', "Alter domain s1.space_check drop not null;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT t.typnotnull from pg_catalog.pg_type t where t.typname='space_check';");
+is($result, qq(f), 'ALTER DOMAIN Stmt is replicated');
+
+#TEST DEFINE Stmt
+$node_publisher->safe_psql('postgres', "CREATE AGGREGATE s1.inc_sum(int) (sfunc = int4pl,stype = int,initcond = 10);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_proc p where p.proname='inc_sum';");
+is($result, qq(1), 'Define stmt is replicated');
+
+#TEST CompositeTypeStmt
+$node_publisher->safe_psql('postgres', "CREATE TYPE s1.compfoo AS (f1 int, f2 text);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_type t where t.typname='compfoo';");
+is($result, qq(1), 'CompositeType Stmt is replicated');
+
+#TEST CreateEnum Stmt
+$node_publisher->safe_psql('postgres', "CREATE TYPE s1.mood AS ENUM ('sad', 'ok', 'happy');");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_type t where t.typname='mood';");
+is($result, qq(1), 'CreateEnumType Stmt is replicated');
+
+#TEST AlterEnum Stmt
+$node_publisher->safe_psql('postgres', "ALTER TYPE s1.mood RENAME VALUE 'sad' to 'fine';");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_enum e, pg_catalog.pg_type t WHERE e.enumtypid = t.oid AND t.typname='mood' AND e.enumlabel='fine';");
+is($result, qq(1), 'AlterEnumType Stmt is replicated');
+
+#TEST CreateRange Stmt
+$node_publisher->safe_psql('postgres', "CREATE TYPE floatrange AS RANGE (subtype = float8,subtype_diff = float8mi);");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_type t where t.typname='floatrange';");
+is($result, qq(1), 'CreateRange Stmt is replicated');
+
+#TEST VIEW Stmt
+$node_publisher->safe_psql('postgres', "CREATE VIEW s1.vista AS SELECT 'Hello World';");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_class c where c.relname='vista';");
+is($result, qq(1), 'VIEW Stmt is replicated');
+
+#TEST CreateFunction Stmt
+$node_publisher->safe_psql('postgres', "CREATE FUNCTION s1.add(a integer, b integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURN a + b;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_catalog.pg_proc p where p.proname='add';");
+is($result, qq(1), 'CreateFunction Stmt is replicated');
+
+#TEST CreateCast Stmt
+$node_publisher->safe_psql('postgres', "CREATE CAST (int AS int4) WITH FUNCTION add(int,int) AS ASSIGNMENT;");
+
+$node_publisher->wait_for_catchup('mysub');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_catalog.pg_cast c, pg_catalog.pg_proc p WHERE p.proname='add' AND c.castfunc=p.oid;");
+is($result, qq(1), 'CreateCast Stmt is replicated');
+
#TODO TEST certain DDLs are not replicated
pass "DDL replication tests passed!";
--
2.32.0
[application/octet-stream] 0012-Support-replication-of-ALTER-TABLE-commands-that-rew.patch (7.3K, 12-0012-Support-replication-of-ALTER-TABLE-commands-that-rew.patch)
From 7a94899f8d92b5878d9551e7c0b3eb620c9e8a4b Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Thu, 16 Jun 2022 05:25:40 +0000
Subject: [PATCH 12/12] Support replication of ALTER TABLE commands that
rewrite table with potentially volatile functions. This is done by enabling
logical replication of table rewrite and converting the rewrite inserts to
updates which can be replayed on the subscriber without violating primary key
constraint.
An improvement is to only replicate the rewrite insert/updates when
a volatile function is used to generate the rewritten value.
---
src/backend/replication/logical/worker.c | 41 -------------------
src/backend/replication/pgoutput/pgoutput.c | 45 +++++++++++++++++++--
src/test/subscription/t/030_rep_ddls.pl | 14 ++++---
3 files changed, 50 insertions(+), 50 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ef79d10115..698ae14270 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2548,47 +2548,6 @@ preprocess_ddl(RawStmt *command, char **schemaname, char **relname, bool *is_par
}
break;
}
- /*
- * ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported.
- * Until we support logical replication of table rewrite, see ATRewriteTables()
- * for details on table rewrite.
- */
- case T_AlterTableStmt:
- {
- AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt;
- ListCell *lc;
-
- foreach(lc, atstmt->cmds)
- {
- AlterTableCmd *cmd = lfirst_node(AlterTableCmd, lc);
-
- if (cmd->subtype == AT_AddColumn)
- {
- ColumnDef *colDef;
- ListCell *c;
-
- colDef = castNode(ColumnDef, cmd->def);
- foreach(c, colDef->constraints)
- {
- Constraint *con = lfirst_node(Constraint, c);
-
- if (con->contype == CONSTR_DEFAULT)
- {
- Node *expr;
- ParseState *pstate = make_parsestate(NULL);
-
- expr = transformExpr(pstate, copyObject(con->raw_expr), EXPR_KIND_COLUMN_DEFAULT);
- if (contain_volatile_functions(expr))
- {
- elog(ERROR,
- "Do not support replication of DDL statement that rewrites table using volatile functions");
- }
- }
- }
- }
- }
- break;
- }
case T_DropStmt:
{
DropStmt *dstmt = (DropStmt *) command->stmt;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index a66367fe7a..fd4140484c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -413,6 +413,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* This plugin uses binary protocol. */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+ opt->receive_rewrites = true;
/*
* This is replication start and not slot initialization.
@@ -1368,9 +1369,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
ReorderBufferChangeType action = change->action;
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
+ bool table_rewrite = false;
update_replication_progress(ctx, false);
+ /*
+ * For heap rewrites, we might need to replicate them if the rewritten
+ * table publishes rewrite ddl message. So get the actual relation here and
+ * check the pubaction later.
+ */
+ if (relation->rd_rel->relrewrite)
+ {
+ table_rewrite = true;
+ relation = RelationIdGetRelation(relation->rd_rel->relrewrite);
+ targetrel = relation;
+ }
+
if (!is_publishable_relation(relation))
return;
@@ -1404,6 +1418,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}
+ /*
+ * We don't publish table rewrite change unless we publish the rewrite ddl
+ * message.
+ */
+ if (table_rewrite &&
+ (!relentry->pubactions.pubddl_database || !relentry->pubactions.pubddl_table))
+ return;
+
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
@@ -1433,8 +1455,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/* Check row filter */
- if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
- &action))
+ if (!table_rewrite &&
+ !pgoutput_row_filter(targetrel, NULL, &new_slot, relentry, &action))
break;
/*
@@ -1454,8 +1476,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
- data->binary, relentry->columns);
+
+ /*
+ * Convert the rewrite inserts to updates so that the subscriber
+ * can replay it. This is needed to make sure the data between
+ * publisher and subscriber is consistent.
+ */
+ if (table_rewrite)
+ logicalrep_write_update(ctx->out, xid, targetrel,
+ NULL, new_slot, data->binary,
+ relentry->columns);
+ else
+ logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+ data->binary, relentry->columns);
+
OutputPluginWrite(ctx, true);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
@@ -1585,6 +1619,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
ancestor = NULL;
}
+ if (table_rewrite)
+ RelationClose(relation);
+
/* Cleanup */
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
diff --git a/src/test/subscription/t/030_rep_ddls.pl b/src/test/subscription/t/030_rep_ddls.pl
index a03d598266..f5c50ffe88 100644
--- a/src/test/subscription/t/030_rep_ddls.pl
+++ b/src/test/subscription/t/030_rep_ddls.pl
@@ -488,13 +488,17 @@ is($result, qq(1), 'Only root table of the partitioned table is subscribed since
$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) from pg_subscription_rel where srrelid = 's1.test_part_a_1'::regclass::oid OR srrelid = 's1.test_part_a_2'::regclass::oid;");
is($result, qq(0), 'Leaf tables of the partitioned table are not subscribed since publish_via_partition_root is enabled');
-#TODO TEST certain DDLs are not replicated
-# Test DDL statement that rewrites table with volatile functions are not replicated
+# Test that a DDL statement that rewrites a table using volatile functions is replicated with the same values as on the publisher
$node_publisher->safe_psql('postgres', "ALTER TABLE test_rep ADD COLUMN volatile double precision DEFAULT 3 * random();");
-$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM information_schema.columns WHERE table_name = 'test_rep' and column_name = 'volatile';");
-is($result, qq(1), 'Alter table add column default random() is executed on the publisher DB.');
+$result = $node_publisher->safe_psql('postgres', "SELECT avg(volatile) FROM test_rep;");
+
+$node_publisher->wait_for_catchup('mysub');
-$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result);
+$result_sub = $node_subscriber->safe_psql('postgres', "SELECT avg(volatile) FROM test_rep;");
+is($result, qq($result_sub), 'Alter table add column default random() is replicated correctly');
+
+#TODO TEST certain DDLs are not replicated
+#$result = $node_subscriber->wait_for_log("Do not support replication of DDL statement that rewrites table using volatile functions", $result);
pass "DDL replication tests passed!";
--
2.32.0
[application/octet-stream] 0011-Remove-non-transactional-ddl-message-decoding-becaus.patch (46.9K, 13-0011-Remove-non-transactional-ddl-message-decoding-becaus.patch)
From 4e9706bf9421ec24d7fe2e4b985d44d456fa5e3e Mon Sep 17 00:00:00 2001
From: "Zheng (Zane) Li" <zhelli@amazon.com>
Date: Tue, 24 May 2022 15:37:34 +0000
Subject: [PATCH 11/12] Remove non-transactional ddl message decoding because
the use case is unclear and it has unresolved issues under concurrency. Some
code cleanup.
---
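Note on the wire format: once the transactional flag is dropped, the DDL
message no longer carries a flags byte between the message tag and the LSN.
The following is a minimal standalone sketch of that layout, not the patch's
code. It uses plain byte buffers instead of StringInfo and the pq_* helpers,
and it omits the optional streaming xid. The tag value 'L', the names
SketchDDLMsg, sketch_write_ddlmessage and sketch_read_ddlmessage, and the
field order after "role" (search_path, a 4-byte size, then the payload) are
assumptions made for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical tag byte; the real one is LOGICAL_REP_MSG_DDLMESSAGE. */
#define SKETCH_MSG_DDL 'L'

typedef struct SketchDDLMsg
{
	uint64_t	lsn;
	const char *prefix;
	const char *role;
	const char *search_path;
	uint32_t	sz;
	const char *message;		/* sz bytes, not NUL-terminated */
} SketchDDLMsg;

/* Append a NUL-terminated string, including its terminator. */
static size_t
put_cstring(uint8_t *out, const char *s)
{
	size_t		n = strlen(s) + 1;

	memcpy(out, s, n);
	return n;
}

/* Serialize with no flags byte. Returns bytes written, or 0 if cap is too small. */
static size_t
sketch_write_ddlmessage(uint8_t *out, size_t cap, const SketchDDLMsg *m)
{
	size_t		need = 1 + 8 + strlen(m->prefix) + 1 + strlen(m->role) + 1 +
		strlen(m->search_path) + 1 + 4 + m->sz;
	size_t		pos = 0;
	int			i;

	if (need > cap)
		return 0;

	out[pos++] = SKETCH_MSG_DDL;
	for (i = 7; i >= 0; i--)	/* LSN in network byte order */
		out[pos++] = (uint8_t) (m->lsn >> (8 * i));
	pos += put_cstring(out + pos, m->prefix);
	pos += put_cstring(out + pos, m->role);
	pos += put_cstring(out + pos, m->search_path);
	for (i = 3; i >= 0; i--)	/* payload size in network byte order */
		out[pos++] = (uint8_t) (m->sz >> (8 * i));
	memcpy(out + pos, m->message, m->sz);
	return pos + m->sz;
}

/*
 * Parse a buffer produced by sketch_write_ddlmessage(). The pointers stored
 * in *m alias the input buffer. The input is trusted; no bounds checks.
 */
static void
sketch_read_ddlmessage(const uint8_t *in, SketchDDLMsg *m)
{
	size_t		pos = 1;		/* skip the tag byte */
	int			i;

	m->lsn = 0;
	for (i = 0; i < 8; i++)
		m->lsn = (m->lsn << 8) | in[pos++];
	m->prefix = (const char *) in + pos;
	pos += strlen(m->prefix) + 1;
	m->role = (const char *) in + pos;
	pos += strlen(m->role) + 1;
	m->search_path = (const char *) in + pos;
	pos += strlen(m->search_path) + 1;
	m->sz = 0;
	for (i = 0; i < 4; i++)
		m->sz = (m->sz << 8) | in[pos++];
	m->message = (const char *) in + pos;
}
```

Because the reader and writer must agree byte for byte, dropping the flags
byte on one side only would shift every later field. That is why the patch
changes logicalrep_write_ddlmessage() and logicalrep_read_ddlmessage()
together.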
.../test_decoding/expected/ddlmessages.out | 42 +--
contrib/test_decoding/test_decoding.c | 25 +-
.../access/rmgrdesc/logicalddlmsgdesc.c | 3 +-
src/backend/commands/tablecmds.c | 8 +-
src/backend/replication/logical/ddlmessage.c | 12 +-
src/backend/replication/logical/decode.c | 13 +-
src/backend/replication/logical/logical.c | 26 +-
src/backend/replication/logical/proto.c | 12 +-
.../replication/logical/reorderbuffer.c | 64 +---
src/backend/replication/logical/worker.c | 309 ++++++++++--------
src/backend/replication/pgoutput/pgoutput.c | 17 +-
src/backend/tcop/utility.c | 35 +-
src/include/replication/ddlmessage.h | 3 +-
src/include/replication/logicalproto.h | 4 +-
src/include/replication/output_plugin.h | 2 -
src/include/replication/reorderbuffer.h | 6 +-
16 files changed, 264 insertions(+), 317 deletions(-)
diff --git a/contrib/test_decoding/expected/ddlmessages.out b/contrib/test_decoding/expected/ddlmessages.out
index 823029d03d..56cbafa4cf 100644
--- a/contrib/test_decoding/expected/ddlmessages.out
+++ b/contrib/test_decoding/expected/ddlmessages.out
@@ -24,14 +24,14 @@ CREATE TABLE test_ddlmessage (id serial unique, data int);
ALTER TABLE test_ddlmessage add c3 varchar;
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int);
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3;
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage;
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
+ data
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 70 content:CREATE TABLE test_ddlmessage (id serial unique primary key, data int);
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE test_ddlmessage drop c3;
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 27 content:DROP TABLE test_ddlmessage;
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 58 content:CREATE TABLE test_ddlmessage (id serial unique, data int);
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 43 content:ALTER TABLE test_ddlmessage add c3 varchar;
(6 rows)
-- Test logging DDL in function
@@ -52,20 +52,20 @@ SELECT func_ddl ('tab_from_func');
(1 row)
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
- data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) +
- RETURNS VOID AS $$ +
- BEGIN +
- execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); +
- execute format('ALTER TABLE %I ADD c3 int', tname); +
- execute format('INSERT INTO %I VALUES (1, ''a'');', tname); +
- execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +
- END; +
- $$ +
+ data
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 375 content:CREATE OR REPLACE FUNCTION func_ddl (tname varchar(20)) +
+ RETURNS VOID AS $$ +
+ BEGIN +
+ execute format('CREATE TABLE %I(id int primary key, name varchar);', tname); +
+ execute format('ALTER TABLE %I ADD c3 int', tname); +
+ execute format('INSERT INTO %I VALUES (1, ''a'');', tname); +
+ execute format('INSERT INTO %I VALUES (2, ''b'', 22);', tname); +
+ END; +
+ $$ +
LANGUAGE plpgsql;
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar);
- DDL message: transactional: 1 prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 61 content:CREATE TABLE tab_from_func(id int primary key, name varchar);
+ DDL message: prefix: role: ddl_replication_user, search_path: "$user", public, sz: 36 content:ALTER TABLE tab_from_func ADD c3 int
BEGIN
table public.tab_from_func: INSERT: id[integer]:1 name[character varying]:'a' c3[integer]:null
table public.tab_from_func: INSERT: id[integer]:2 name[character varying]:'b' c3[integer]:22
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index a44e1f79e3..eb3dd76782 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -78,7 +78,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_ddlmessage(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
- bool transactional, const char *prefix,
+ const char *prefix,
const char *role, const char *search_path,
Size sz, const char *message);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
@@ -123,7 +123,7 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
- bool transactional, const char *prefix,
+ const char *prefix,
const char *role, const char *search_path,
Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
@@ -771,13 +771,13 @@ pg_decode_message(LogicalDecodingContext *ctx,
static void
pg_decode_ddlmessage(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ ReorderBufferTXN *txn, XLogRecPtr lsn,
const char *prefix, const char *role, const char *search_path,
Size sz, const char *message)
{
OutputPluginPrepareWrite(ctx, true);
- appendStringInfo(ctx->out, "DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:",
- transactional, prefix, role, search_path, sz);
+ appendStringInfo(ctx->out, "DDL message: prefix: %s role: %s, search_path: %s, sz: %zu content:",
+ prefix, role, search_path, sz);
appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true);
}
@@ -989,23 +989,14 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
*/
static void
pg_decode_stream_ddlmessage(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ ReorderBufferTXN *txn, XLogRecPtr lsn,
const char *prefix, const char * role, const char * search_path,
Size sz, const char *message)
{
OutputPluginPrepareWrite(ctx, true);
- if (transactional)
- {
- appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu",
- transactional, prefix, role, search_path, sz);
- }
- else
- {
- appendStringInfo(ctx->out, "streaming DDL message: transactional: %d prefix: %s role: %s, search_path: %s, sz: %zu content:",
- transactional, prefix, role, search_path, sz);
- appendBinaryStringInfo(ctx->out, message, sz);
- }
+ appendStringInfo(ctx->out, "streaming DDL message: prefix: %s role: %s, search_path: %s, sz: %zu",
+ prefix, role, search_path, sz);
OutputPluginWrite(ctx, true);
}
diff --git a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
index 7a352d540a..0aaebf8a08 100644
--- a/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicalddlmsgdesc.c
@@ -32,8 +32,7 @@ logicalddlmsg_desc(StringInfo buf, XLogReaderState *record)
Assert(prefix[xlrec->prefix_size] != '\0');
- appendStringInfo(buf, "%s, prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ",
- xlrec->transactional ? "transactional" : "non-transactional",
+ appendStringInfo(buf, "prefix \"%s\"; role \"%s\"; search_path \"%s\"; payload (%zu bytes): ",
prefix, role, search_path, xlrec->message_size);
/* Write message payload as a series of hex bytes */
for (int cnt = 0; cnt < xlrec->message_size; cnt++)
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 65386f2641..94e350b80d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1516,13 +1516,11 @@ RemoveRelations(ParseState *pstate, DropStmt *drop, bool isCompleteQuery)
/* Log the Drop command for logical replication */
if (ddlxlog)
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
pstate->p_sourcetext,
- strlen(pstate->p_sourcetext),
- transactional);
+ strlen(pstate->p_sourcetext));
}
performMultipleDeletions(objects, drop->behavior, flags);
@@ -3876,13 +3874,11 @@ RenameRelation(ParseState *pstate, RenameStmt *stmt, bool isCompleteQuery)
if (ddlxlog &&
ddl_need_xlog(relid, false))
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
pstate->p_sourcetext,
- strlen(pstate->p_sourcetext),
- transactional);
+ strlen(pstate->p_sourcetext));
}
/* Do the work */
diff --git a/src/backend/replication/logical/ddlmessage.c b/src/backend/replication/logical/ddlmessage.c
index f93573079a..5f89afec49 100644
--- a/src/backend/replication/logical/ddlmessage.c
+++ b/src/backend/replication/logical/ddlmessage.c
@@ -47,7 +47,7 @@
*/
XLogRecPtr
LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message,
- size_t size, bool transactional)
+ size_t size)
{
xl_logical_ddl_message xlrec;
const char *role;
@@ -55,16 +55,12 @@ LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *message,
role = GetUserNameFromId(roleoid, false);
/*
- * Force xid to be allocated if we're emitting a transactional message.
+ * Force xid to be allocated since we're emitting a transactional message.
*/
- if (transactional)
- {
- Assert(IsTransactionState());
- GetCurrentTransactionId();
- }
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
xlrec.dbId = MyDatabaseId;
- xlrec.transactional = transactional;
/* trailing zero is critical; see logicalddlmsg_desc */
xlrec.prefix_size = strlen(prefix) + 1;
xlrec.role_size = strlen(role) + 1;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 034c7f2413..ce5b595326 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -615,7 +615,6 @@ logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
RepOriginId origin_id = XLogRecGetOrigin(r);
- Snapshot snapshot;
xl_logical_ddl_message *message;
if (info != XLOG_LOGICAL_DDL_MESSAGE)
@@ -637,17 +636,7 @@ logicalddlmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
FilterByOrigin(ctx, origin_id))
return;
- if (message->transactional &&
- !SnapBuildProcessChange(builder, xid, buf->origptr))
- return;
- else if (!message->transactional &&
- (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
- SnapBuildXactNeedsSkip(builder, buf->origptr)))
- return;
-
- snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
- ReorderBufferQueueDDLMessage(ctx->reorder, xid, snapshot, buf->endptr,
- message->transactional,
+ ReorderBufferQueueDDLMessage(ctx->reorder, xid, buf->endptr,
message->message,
/* first part of message is prefix */
message->message + message->prefix_size,
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3004f02433..c02ea6fb99 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -74,8 +74,8 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
static void ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional,
- const char *prefix, const char *role, const char *search_path,
+ XLogRecPtr message_lsn, const char *prefix,
+ const char *role, const char *search_path,
Size message_size, const char *message);
/* streaming callbacks */
@@ -95,8 +95,8 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
static void stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional,
- const char *prefix, const char *role, const char *search_path,
+ XLogRecPtr message_lsn, const char *prefix,
+ const char *role, const char *search_path,
Size message_size, const char *message);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -1233,10 +1233,9 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void
ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional,
- const char *prefix, const char *role,
- const char *search_path, Size message_size,
- const char *message)
+ XLogRecPtr message_lsn, const char *prefix,
+ const char *role, const char *search_path,
+ Size message_size, const char *message)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
@@ -1262,7 +1261,7 @@ ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->write_location = message_lsn;
/* do the actual work: call callback */
- ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix,
+ ctx->callbacks.ddlmessage_cb(ctx, txn, message_lsn, prefix,
role, search_path, message_size, message);
/* Pop the error context stack */
@@ -1586,10 +1585,9 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void
stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional,
- const char *prefix, const char *role,
- const char* search_path, Size message_size,
- const char *message)
+ XLogRecPtr message_lsn, const char *prefix,
+ const char *role, const char* search_path,
+ Size message_size, const char *message)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
@@ -1619,7 +1617,7 @@ stream_ddlmessage_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->write_location = message_lsn;
/* do the actual work: call callback */
- ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, transactional, prefix,
+ ctx->callbacks.stream_ddlmessage_cb(ctx, txn, message_lsn, prefix,
role, search_path, message_size, message);
/* Pop the error context stack */
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3cfa94dd8c..2072207647 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -670,16 +670,12 @@ logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn,
const char **prefix,
const char **role,
const char **search_path,
- bool *transactional,
Size *sz)
{
- uint8 flags;
const char *msg;
//TODO double check when do we need to get TransactionId.
- flags = pq_getmsgint(in, 1);
- *transactional = (flags & MESSAGE_TRANSACTIONAL) > 0;
*lsn = pq_getmsgint64(in);
*prefix = pq_getmsgstring(in);
*role = pq_getmsgstring(in);
@@ -695,22 +691,16 @@ logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn,
*/
void
logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
- bool transactional, const char *prefix, const char *role,
+ const char *prefix, const char *role,
const char *search_path, Size sz, const char *message)
{
- uint8 flags = 0;
pq_sendbyte(out, LOGICAL_REP_MSG_DDLMESSAGE);
- /* encode and send message flags */
- if (transactional)
- flags |= MESSAGE_TRANSACTIONAL;
-
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
- pq_sendint8(out, flags);
pq_sendint64(out, lsn);
pq_sendstring(out, prefix);
pq_sendstring(out, role);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ca01336604..f5a3247348 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -881,61 +881,33 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
/*
- * A transactional DDL message is queued to be processed upon commit and a
- * non-transactional DDL message gets processed immediately.
+ * A DDL message is always transactional; it is queued to be processed upon commit.
*/
void
ReorderBufferQueueDDLMessage(ReorderBuffer *rb, TransactionId xid,
- Snapshot snapshot, XLogRecPtr lsn,
- bool transactional, const char *prefix,
+ XLogRecPtr lsn, const char *prefix,
const char *role, const char *search_path,
Size message_size, const char *message)
{
- if (transactional)
- {
- MemoryContext oldcontext;
- ReorderBufferChange *change;
-
- Assert(xid != InvalidTransactionId);
-
- oldcontext = MemoryContextSwitchTo(rb->context);
-
- change = ReorderBufferGetChange(rb);
- change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE;
- change->data.ddlmsg.prefix = pstrdup(prefix);
- change->data.ddlmsg.role = pstrdup(role);
- change->data.ddlmsg.search_path = pstrdup(search_path);
- change->data.ddlmsg.message_size = message_size;
- change->data.ddlmsg.message = palloc(message_size);
- memcpy(change->data.ddlmsg.message, message, message_size);
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
- ReorderBufferQueueChange(rb, xid, lsn, change, false);
+ Assert(xid != InvalidTransactionId);
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- ReorderBufferTXN *txn = NULL;
- volatile Snapshot snapshot_now = snapshot;
+ oldcontext = MemoryContextSwitchTo(rb->context);
- if (xid != InvalidTransactionId)
- txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_DDLMESSAGE;
+ change->data.ddlmsg.prefix = pstrdup(prefix);
+ change->data.ddlmsg.role = pstrdup(role);
+ change->data.ddlmsg.search_path = pstrdup(search_path);
+ change->data.ddlmsg.message_size = message_size;
+ change->data.ddlmsg.message = palloc(message_size);
+ memcpy(change->data.ddlmsg.message, message, message_size);
- /* setup snapshot to allow catalog access */
- SetupHistoricSnapshot(snapshot_now, NULL);
- PG_TRY();
- {
- rb->ddlmessage(rb, txn, lsn, false, prefix, role, search_path, message_size, message);
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
- TeardownHistoricSnapshot(false);
- }
- PG_CATCH();
- {
- TeardownHistoricSnapshot(true);
- PG_RE_THROW();
- }
- PG_END_TRY();
- }
+ MemoryContextSwitchTo(oldcontext);
}
/*
@@ -2037,14 +2009,14 @@ ReorderBufferApplyDDLMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *change, bool streaming)
{
if (streaming)
- rb->stream_ddlmessage(rb, txn, change->lsn, true,
+ rb->stream_ddlmessage(rb, txn, change->lsn,
change->data.ddlmsg.prefix,
change->data.ddlmsg.role,
change->data.ddlmsg.search_path,
change->data.ddlmsg.message_size,
change->data.ddlmsg.message);
else
- rb->ddlmessage(rb, txn, change->lsn, true,
+ rb->ddlmessage(rb, txn, change->lsn,
change->data.ddlmsg.prefix,
change->data.ddlmsg.role,
change->data.ddlmsg.search_path,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1108030179..ef79d10115 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2464,14 +2464,13 @@ static void
apply_handle_ddlmessage(StringInfo s)
{
XLogRecPtr lsn;
- bool transactional;
Size sz;
const char *prefix;
const char *role;
const char *search_path;
const char *msg;
- msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &transactional, &sz);
+ msg = logicalrep_read_ddlmessage(s, &lsn, &prefix, &role, &search_path, &sz);
apply_execute_sql_command(msg, role, search_path, true);
}
@@ -2486,98 +2485,41 @@ execute_sql_command_error_cb(void *arg)
}
/*
- * Execute an SQL command. This can be multiple queries.
- * This is modified based on pglogical_execute_sql_command().
+ * Preprocess certain DDL commands before applying them:
+ * - Skip data population for CREATE TABLE AS and SELECT INTO
+ * - Set missing_ok on DROP statements
+ * - Reject table rewrites that use volatile functions
*/
static void
-apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path,
- bool isTopLevel)
+preprocess_ddl(RawStmt *command, char **schemaname, char **relname, bool *is_partitioned_table)
{
- const char *save_debug_query_string = debug_query_string;
- List *parsetree_list;
- ListCell *parsetree_item;
- MemoryContext oldcontext;
- ErrorContextCallback errcallback;
- int save_nestlevel;
-
- /*
- * Switch to appropriate context for constructing parsetrees.
- */
- oldcontext = MemoryContextSwitchTo(ApplyMessageContext);
- begin_replication_step();
-
- /*
- * Set the current role to the user that executed the command on the
- * publication server.
- * Set the current search_path to the search_path on the publication
- * server when the command was executed.
- */
- save_nestlevel = NewGUCNestLevel();
- SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE);
- SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE);
-
- errcallback.callback = execute_sql_command_error_cb;
- errcallback.arg = (char *) cmdstr;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
- debug_query_string = cmdstr;
-
- parsetree_list = pg_parse_query(cmdstr);
-
- /*
- * Do a limited amount of safety checking against CONCURRENTLY commands
- * executed in situations where they aren't allowed. The sender side should
- * provide protection, but better be safe than sorry.
- */
- isTopLevel = isTopLevel && (list_length(parsetree_list) == 1);
-
- /*
- * Switch back to transaction context to enter the loop.
- */
- MemoryContextSwitchTo(oldcontext);
-
- foreach(parsetree_item, parsetree_list)
+ switch(nodeTag(command->stmt))
{
- List *plantree_list;
- List *querytree_list;
- RawStmt *command = (RawStmt *) lfirst(parsetree_item);
- CommandTag commandTag;
- MemoryContext per_parsetree_context = NULL;
- Portal portal;
- DestReceiver *receiver;
- bool snapshot_set = false;
- char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
- char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
- bool is_partitioned_table = false;
-
- commandTag = CreateCommandTag((Node *)command);
-
- /* The following DDL commands need special handling */
-
- /*
- * Remember the schemaname and relname if the cmd is going to create a table
- * because we will need them for some post-processing after we
- * execute the stmt. At that point, command->stmt may have been freeed up.
- */
- if (commandTag == CMDTAG_CREATE_TABLE)
+ case T_CreateStmt:
{
+ /*
+ * Remember the schemaname and relname if the command is going to create
+ * a table, because we need them for post-processing after the statement
+ * is executed. By then, command->stmt may have been freed.
+ */
CreateStmt *cstmt = (CreateStmt *) command->stmt;
RangeVar *rv = cstmt->relation;
- schemaname = rv->schemaname;
- relname = rv->relname;
+ *schemaname = rv->schemaname;
+ *relname = rv->relname;
if (cstmt->inhRelations != NIL || cstmt->partspec != NULL)
- is_partitioned_table = true;
+ *is_partitioned_table = true;
+
+ break;
}
- else if (commandTag == CMDTAG_CREATE_TABLE_AS)
+ case T_CreateTableAsStmt:
{
CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;
if (castmt->objtype == OBJECT_TABLE)
{
RangeVar *rv = castmt->into->rel;
- schemaname = rv->schemaname;
- relname = rv->relname;
+ *schemaname = rv->schemaname;
+ *relname = rv->relname;
/*
* Force skipping data population to avoid data inconsistency.
@@ -2585,17 +2527,18 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
*/
castmt->into->skipData = true;
}
+ break;
}
/* SELECT INTO */
- else if (commandTag == CMDTAG_SELECT)
+ case T_SelectStmt:
{
SelectStmt *sstmt = (SelectStmt *) command->stmt;
if (sstmt->intoClause != NULL)
{
RangeVar *rv = sstmt->intoClause->rel;
- schemaname = rv->schemaname;
- relname = rv->relname;
+ *schemaname = rv->schemaname;
+ *relname = rv->relname;
/*
* Force skipping data population to avoid data inconsistency.
@@ -2603,13 +2546,14 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
*/
sstmt->intoClause->skipData = true;
}
+ break;
}
/*
* ALTER TABLE ADD COLUMN col DEFAULT volatile_expr is not supported.
* Until we support logical replication of table rewrite, see ATRewriteTables()
* for details on table rewrite.
*/
- else if (commandTag == CMDTAG_ALTER_TABLE)
+ case T_AlterTableStmt:
{
AlterTableStmt *atstmt = (AlterTableStmt *) command->stmt;
ListCell *lc;
@@ -2637,15 +2581,153 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
if (contain_volatile_functions(expr))
{
elog(ERROR,
- "Do not support replication of DDL statement that rewrites table using volatile functions: %s",
- cmdstr);
+ "Do not support replication of DDL statement that rewrites table using volatile functions");
}
}
}
}
}
+ break;
+ }
+ case T_DropStmt:
+ {
+ DropStmt *dstmt = (DropStmt *) command->stmt;
+ dstmt->missing_ok = true;
+ break;
+ }
+ default:
+ break;
+ }
+}
+
+/*
+* Table created by DDL replication (database level) is automatically
+* added to the subscription here.
+*
+* Call AddSubscriptionRelState for CREATE TABLE and CREATE TABLE AS
+* commands to set the relstate to SUBREL_STATE_INIT, so that DML changes on
+* the new table are replicated without having to manually run
+* "ALTER SUBSCRIPTION ... REFRESH PUBLICATION".
+*/
+static void
+handle_create_table(char* relname, char* schemaname, bool is_partitioned_table)
+{
+ Oid relid;
+ Oid relnamespace = InvalidOid;
+
+ if (schemaname != NULL)
+ relnamespace = get_namespace_oid(schemaname, false);
+ if (relnamespace != InvalidOid)
+ relid = get_relname_relid(relname, relnamespace);
+ else
+ {
+ /*
+ * Try to resolve the unqualified relname.
+ * Note that the caller, apply_execute_sql_command(), has already set
+ * search_path to the publisher's original search_path.
+ */
+ relid = RelnameGetRelid(relname);
+ }
+
+ if (relid != InvalidOid)
+ {
+ bool subscribe_table = true;
+
+ if (is_partitioned_table)
+ {
+ Relation rel = RelationIdGetRelation(relid);
+ char *table_name = RelationGetRelationName(rel);
+ char *schema_name = get_namespace_name(RelationGetNamespace(rel));
+ /*
+ * Connect to the source DB and check whether the partitioned table should be
+ * subscribed. This depends on the publisher's publish_via_partition_root
+ * setting, which the subscription does not know.
+ */
+ subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name);
+ RelationClose(rel);
}
+ if (subscribe_table)
+ {
+ AddSubscriptionRelState(MySubscription->oid, relid,
+ SUBREL_STATE_INIT,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s\" added to subscription \"%s\"",
+ relname, MySubscription->name)));
+ }
+ }
+}
+
+/*
+ * Execute an SQL command. This can be multiple queries.
+ * This is modified based on pglogical_execute_sql_command().
+ */
+static void
+apply_execute_sql_command(const char *cmdstr, const char *role, const char *search_path,
+ bool isTopLevel)
+{
+ const char *save_debug_query_string = debug_query_string;
+ List *parsetree_list;
+ ListCell *parsetree_item;
+ MemoryContext oldcontext;
+ ErrorContextCallback errcallback;
+ int save_nestlevel;
+
+ /*
+ * Switch to appropriate context for constructing parsetrees.
+ */
+ oldcontext = MemoryContextSwitchTo(ApplyMessageContext);
+ begin_replication_step();
+
+ /*
+ * Set the current role to the user that executed the command on the
+ * publication server.
+ * Set the current search_path to the search_path on the publication
+ * server when the command was executed.
+ */
+ save_nestlevel = NewGUCNestLevel();
+ SetConfigOption("role", role, PGC_INTERNAL, PGC_S_OVERRIDE);
+ SetConfigOption("search_path", search_path, PGC_INTERNAL, PGC_S_OVERRIDE);
+
+ errcallback.callback = execute_sql_command_error_cb;
+ errcallback.arg = (char *) cmdstr;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ debug_query_string = cmdstr;
+
+ parsetree_list = pg_parse_query(cmdstr);
+
+ /*
+ * Do a limited amount of safety checking against CONCURRENTLY commands
+ * executed in situations where they aren't allowed. The sender side should
+ * provide protection, but better be safe than sorry.
+ */
+ isTopLevel = isTopLevel && (list_length(parsetree_list) == 1);
+
+ /*
+ * Switch back to transaction context to enter the loop.
+ */
+ MemoryContextSwitchTo(oldcontext);
+
+ foreach(parsetree_item, parsetree_list)
+ {
+ List *plantree_list;
+ List *querytree_list;
+ RawStmt *command = (RawStmt *) lfirst(parsetree_item);
+ CommandTag commandTag;
+ MemoryContext per_parsetree_context = NULL;
+ Portal portal;
+ DestReceiver *receiver;
+ bool snapshot_set = false;
+ char *schemaname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
+ char *relname = NULL; /* For CREATE TABLE and CREATE TABLE AS stmt only */
+ bool is_partitioned_table = false;
+
+ commandTag = CreateCommandTag((Node *)command);
+ preprocess_ddl(command, &schemaname, &relname, &is_partitioned_table);
+
/*
* Set up a snapshot if parse analysis/planning will need one.
*/
@@ -2739,63 +2821,8 @@ apply_execute_sql_command(const char *cmdstr, const char *role, const char *sear
CommandCounterIncrement();
- /*
- * Table created by DDL replication (database level) is automatically
- * added to the subscription here.
- *
- * Call AddSubscriptionRelState for CREATE TABEL and CREATE TABLE AS
- * command to set the relstate to SUBREL_STATE_INIT so DML changes on this
- * new table can be replicated without having to manually run
- * "alter subscription ... refresh publication"
- */
if (relname != NULL)
- {
- Oid relid;
- Oid relnamespace = InvalidOid;
-
- if (schemaname != NULL)
- relnamespace = get_namespace_oid(schemaname, false);
- if (relnamespace != InvalidOid)
- relid = get_relname_relid(relname, relnamespace);
- else
- {
- /*
- * Try to resolve unqualified relname.
- * Notice we have set the search_path to the original search_path on the publisher
- * at the beginning of this function.
- */
- relid = RelnameGetRelid(relname);
- }
-
- if (relid != InvalidOid)
- {
- bool subscribe_table = true;
-
- if (is_partitioned_table)
- {
- Relation rel = RelationIdGetRelation(relid);
- char *table_name = RelationGetRelationName(rel);
- char *schema_name = get_namespace_name(RelationGetNamespace(rel));
- /*
- * Connect to the source DB and check whehter the partitioned table should be subscribed.
- * Because it depends on the setting of publish_via_partition_root, which the subscription
- * doesn't know.
- */
- subscribe_table = IsPartitionedTablePublishedOnSource(MySubscription, schema_name, table_name);
- RelationClose(rel);
- }
-
- if (subscribe_table)
- {
- AddSubscriptionRelState(MySubscription->oid, relid,
- SUBREL_STATE_INIT,
- InvalidXLogRecPtr);
- ereport(DEBUG1,
- (errmsg_internal("table \"%s\" added to subscription \"%s\"",
- relname, MySubscription->name)));
- }
- }
- }
+ handle_create_table(relname, schemaname, is_partitioned_table);
}
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index db221c7e8d..a66367fe7a 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -56,7 +56,7 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pgoutput_ddlmessage(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
- bool transactional, const char *prefix, const char *role,
+ const char *prefix, const char *role,
const char *search_path, Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
@@ -1705,13 +1705,14 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
static void
pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional,
+ XLogRecPtr message_lsn,
const char *prefix, const char * role,
const char *search_path, Size sz, const char *message)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
ListCell *lc;
+ PGOutputTxnData *txndata;
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1745,20 +1746,16 @@ pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Output BEGIN if we haven't yet. Avoid for non-transactional
* messages.
*/
- if (transactional)
- {
- PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
- /* Send BEGIN if we haven't yet */
- if (txndata && !txndata->sent_begin_txn)
- pgoutput_send_begin(ctx, txn);
- }
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_ddlmessage(ctx->out,
xid,
message_lsn,
- transactional,
prefix,
role,
search_path,
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1abd77a60e..7cf8fe0ba3 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1131,13 +1131,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
*/
if (ddl_need_xlog(InvalidOid, true))
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
queryString,
- strlen(queryString),
- transactional);
+ strlen(queryString));
}
break;
@@ -1173,13 +1171,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
*/
if (ddl_need_xlog(InvalidOid, true))
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
queryString,
- strlen(queryString),
- transactional);
+ strlen(queryString));
}
default:
break;
@@ -1195,18 +1191,25 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
*/
case T_AlterTableStmt:
case T_IndexStmt:
+ break;
+ /*
+ * Renaming objects other than tables is allowed only in database-level
+ * replication.
+ * Renaming a table is allowed in both table-level and database-level
+ * replication.
+ */
case T_RenameStmt:
{
RenameStmt *stmt = (RenameStmt *) parsetree;
- if(!stmt->relation && ddl_need_xlog(InvalidOid, true)){
- bool transactional = true;
+ if(!stmt->relation && ddl_need_xlog(InvalidOid, true))
+ {
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
queryString,
- strlen(queryString),
- transactional);
+ strlen(queryString));
}
+ break;
}
case T_AlterOwnerStmt: /* TODO, it is data control case, save for later update */
break;
@@ -1236,13 +1239,11 @@ LogLogicalDDLCommand(Node *parsetree, const char *queryString)
*/
if (ddl_need_xlog(InvalidOid, true))
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
queryString,
- strlen(queryString),
- transactional);
+ strlen(queryString));
}
break;
}
@@ -1546,13 +1547,11 @@ ProcessUtilitySlow(ParseState *pstate,
isCompleteQuery &&
ddl_need_xlog(relid, false))
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
queryString,
- strlen(queryString),
- transactional);
+ strlen(queryString));
}
/* ... and do it */
@@ -1783,13 +1782,11 @@ ProcessUtilitySlow(ParseState *pstate,
isCompleteQuery &&
ddl_need_xlog(relid, false))
{
- bool transactional = true;
const char* prefix = "";
LogLogicalDDLMessage(prefix,
GetUserId(),
queryString,
- strlen(queryString),
- transactional);
+ strlen(queryString));
}
address =
diff --git a/src/include/replication/ddlmessage.h b/src/include/replication/ddlmessage.h
index 1e8ef22296..0418d16dca 100644
--- a/src/include/replication/ddlmessage.h
+++ b/src/include/replication/ddlmessage.h
@@ -20,7 +20,6 @@
typedef struct xl_logical_ddl_message
{
Oid dbId; /* database Oid emitted from */
- bool transactional; /* is message transactional? */
Size prefix_size; /* length of prefix */
Size role_size; /* length of the role that executes the DDL command */
Size search_path_size; /* length of the search path */
@@ -36,7 +35,7 @@ typedef struct xl_logical_ddl_message
#define SizeOfLogicalDDLMessage (offsetof(xl_logical_ddl_message, message))
extern XLogRecPtr LogLogicalDDLMessage(const char *prefix, Oid roleoid, const char *ddl_message,
- size_t size, bool transactional);
+ size_t size);
/* RMGR API*/
#define XLOG_LOGICAL_DDL_MESSAGE 0x00
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 28ff562d62..cc0ba60905 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -233,11 +233,11 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
bool transactional, const char *prefix,
Size sz, const char *message);
extern void logicalrep_write_ddlmessage(StringInfo out, TransactionId xid, XLogRecPtr lsn,
- bool transactional, const char *prefix, const char *role,
+ const char *prefix, const char *role,
const char *search_path, Size sz, const char *message);
extern const char *logicalrep_read_ddlmessage(StringInfo in, XLogRecPtr *lsn, const char **prefix,
const char **role, const char **search_path,
- bool *transactional, Size *sz);
+ Size *sz);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel, Bitmapset *columns);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 5b1c245b72..444b75bef0 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -94,7 +94,6 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
- bool transactional,
const char *prefix,
const char *role,
const char *search_path,
@@ -219,7 +218,6 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
- bool transactional,
const char *prefix,
const char *role,
const char *search_path,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dd89e08efc..cd8c69ceb8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -445,7 +445,6 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
typedef void (*ReorderBufferDDLMessageCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
- bool transactional,
const char *prefix,
const char *role,
const char *search_path,
@@ -523,7 +522,6 @@ typedef void (*ReorderBufferStreamDDLMessageCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
- bool transactional,
const char *prefix,
const char *role,
const char *search_path,
@@ -671,8 +669,8 @@ extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
-extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
- bool transactional, const char *prefix, const char *role,
+extern void ReorderBufferQueueDDLMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ const char *prefix, const char *role,
const char *search_path, Size message_size, const char *message);
extern void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
--
2.32.0