From 8ef65da8dadcc037e11e4e9356e945ec725c3e90 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 29 Jun 2022 17:04:32 +0800 Subject: [PATCH] Support CREATE TABLE AS SELECT INTO The main idea of replicating CREATE TABLE AS is that we deparse the CREATE TABLE AS into a simple CREATE TABLE (without subquery) command, WAL log it after creating the table and before writing data into the table, and replicate the incoming writes later as normal INSERTs. In this approach, we don't execute the subquery on the subscriber, so we don't need to make sure that all the objects referenced in the subquery also exist on the subscriber. This approach also works for all kinds of commands (e.g. CREATE TABLE AS [SELECT][EXECUTE][VALUES]). This introduces a new type of event trigger, "table_init_write", which is fired for CREATE TABLE AS/SELECT INTO after creating the table and before any other modification. We deparse the command in the table_init_write event trigger and WAL log the deparsed JSON string. The walsender will send the string to the subscriber, and incoming INSERTs will also be replicated. 
--- src/backend/commands/createas.c | 10 ++ src/backend/commands/ddl_deparse.c | 18 ++++ src/backend/commands/event_trigger.c | 167 ++++++++++++++++++++++++++++++++- src/backend/commands/publicationcmds.c | 9 ++ src/backend/tcop/utility.c | 2 + src/backend/utils/cache/evtcache.c | 2 + src/include/catalog/pg_proc.dat | 3 + src/include/commands/event_trigger.h | 4 + src/include/tcop/deparse_utility.h | 9 +- src/include/utils/evtcache.h | 3 +- 10 files changed, 221 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 9abbb6b..989e894 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -34,6 +34,7 @@ #include "catalog/namespace.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/event_trigger.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" @@ -143,6 +144,15 @@ create_ctas_internal(List *attrList, IntoClause *into) StoreViewQuery(intoRelationAddr.objectId, query, false); CommandCounterIncrement(); } + else + { + /* + * Fire the trigger for table_init_write after creating the table so + * that we can access the catalog info about the newly created table in + * the trigger function. + */ + EventTriggerTableInitWrite((Node *) create, intoRelationAddr); + } return intoRelationAddr; } diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c index 2aadd3a..91e43fd 100644 --- a/src/backend/commands/ddl_deparse.c +++ b/src/backend/commands/ddl_deparse.c @@ -3308,6 +3308,21 @@ removed feature return alterTableStmt; } +/* Deparse a collected CREATE TABLE AS/SELECT INTO via deparse_CreateStmt. */ +static ObjTree * +deparse_CreateTableAsStmt(CollectedCommand *cmd) +{ + Oid objectId; + Node *parsetree; + + Assert(cmd->type == SCT_CreateTableAs); + + parsetree = cmd->d.ctas.real_create; + objectId = cmd->d.ctas.address.objectId; + + return deparse_CreateStmt(objectId, parsetree); +} + /* * Handle deparsing of simple commands. 
* @@ -3498,6 +3513,9 @@ deparse_utility_command(CollectedCommand *cmd) case SCT_AlterTable: tree = deparse_AlterTableStmt(cmd); break; + case SCT_CreateTableAs: + tree = deparse_CreateTableAsStmt(cmd); + break; default: elog(ERROR, "unexpected deparse node type %d", cmd->type); } diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index ddb84d4..7b5c7a0 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -133,7 +133,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) if (strcmp(stmt->eventname, "ddl_command_start") != 0 && strcmp(stmt->eventname, "ddl_command_end") != 0 && strcmp(stmt->eventname, "sql_drop") != 0 && - strcmp(stmt->eventname, "table_rewrite") != 0) + strcmp(stmt->eventname, "table_rewrite") != 0 && + strcmp(stmt->eventname, "table_init_write") != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized event name \"%s\"", @@ -159,7 +160,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) /* Validate tag list, if any. 
*/ if ((strcmp(stmt->eventname, "ddl_command_start") == 0 || strcmp(stmt->eventname, "ddl_command_end") == 0 || - strcmp(stmt->eventname, "sql_drop") == 0) + strcmp(stmt->eventname, "sql_drop") == 0 || + strcmp(stmt->eventname, "table_init_write") == 0) && tags != NULL) validate_ddl_tags("tag", tags); else if (strcmp(stmt->eventname, "table_rewrite") == 0 @@ -586,7 +588,8 @@ EventTriggerCommonSetup(Node *parsetree, dbgtag = CreateCommandTag(parsetree); if (event == EVT_DDLCommandStart || event == EVT_DDLCommandEnd || - event == EVT_SQLDrop) + event == EVT_SQLDrop || + event == EVT_TableInitWrite) { if (!command_tag_event_trigger_ok(dbgtag)) elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag)); @@ -869,6 +872,158 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason) CommandCounterIncrement(); } + +/* + * EventTriggerTableInitWriteStart + * Prepare to receive data on a CREATE TABLE AS/SELECT INTO command about + * to be executed. + */ +void +EventTriggerTableInitWriteStart(Node *parsetree) +{ + MemoryContext oldcxt; + CollectedCommand *command; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + command = palloc(sizeof(CollectedCommand)); + + command->type = SCT_CreateTableAs; + command->in_extension = creating_extension; + command->d.ctas.address = InvalidObjectAddress; + command->d.ctas.real_create = NULL; + command->parsetree = copyObject(parsetree); + + command->parent = currentEventTriggerState->currentCommand; + currentEventTriggerState->currentCommand = command; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * EventTriggerTableInitWriteEnd + * Finish up saving a CREATE TABLE AS/SELECT INTO command. + * + * FIXME this API isn't considering the possibility that an xact/subxact is + * aborted partway through. 
Probably it's best to add an + * AtEOSubXact_EventTriggers() to fix this. + */ +void +EventTriggerTableInitWriteEnd(void) +{ + CollectedCommand *parent; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + parent = currentEventTriggerState->currentCommand->parent; + + pfree(currentEventTriggerState->currentCommand); + + currentEventTriggerState->currentCommand = parent; +} + +/* + * publication_deparse_table_init_write + * + * Deparse the ddl table create command and log it. + */ +Datum +publication_deparse_table_init_write(PG_FUNCTION_ARGS) +{ + char relpersist; + CollectedCommand *cmd; + char *json_string; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + elog(ERROR, "not fired by event trigger manager"); + + cmd = currentEventTriggerState->currentCommand; + Assert(cmd && cmd->type == SCT_CreateTableAs); + + relpersist = get_rel_persistence(cmd->d.ctas.address.objectId); + + /* + * Do not generate wal log for commands whose target table is a + * temporary table. + * + * TO CHECK: Do we need to generate wal log for unlogged table ? For + * now, we still generate wal for unlogged table like what we do for + * DDLs for unlogged table in streaming replication. + */ + if (relpersist == RELPERSISTENCE_TEMP) + return PointerGetDatum(NULL); + + /* Deparse the DDL command and WAL log it to allow decoding of the same. */ + json_string = deparse_utility_command(cmd); + + if (json_string != NULL) + LogLogicalDDLMessage("deparse", cmd->d.ctas.address.objectId, DCT_SimpleCmd, + json_string, strlen(json_string) + 1); + + return PointerGetDatum(NULL); +} + +/* + * Fire table_init_write triggers for the table created by CTAS/SELECT INTO. + */ +void +EventTriggerTableInitWrite(Node *real_create, ObjectAddress address) +{ + List *runlist; + EventTriggerData trigdata; + CollectedCommand *command; + + /* + * See EventTriggerDDLCommandStart for a discussion about why event + * triggers are disabled in single user mode. 
+ */ + if (!IsUnderPostmaster) + return; + + /* + * Do nothing if our state isn't set up: EventTriggerCommonSetup might find + * triggers that didn't exist when the command started. Likewise if no + * CREATE TABLE AS command was collected (e.g. collection is inhibited). + */ + if (!currentEventTriggerState) + return; + + command = currentEventTriggerState->currentCommand; + if (command == NULL || command->type != SCT_CreateTableAs) + return; + + runlist = EventTriggerCommonSetup(command->parsetree, + EVT_TableInitWrite, + "table_init_write", + &trigdata); + if (runlist == NIL) + return; + + /* Set the real CreateTable statement and object address */ + command->d.ctas.real_create = real_create; + command->d.ctas.address = address; + + /* Run the triggers. */ + EventTriggerInvoke(runlist, &trigdata); + + /* Cleanup. */ + list_free(runlist); + + /* + * Make sure anything the event triggers did will be visible to the main + * command. + */ + CommandCounterIncrement(); +} + /* * Invoke each event trigger in a list of event triggers. 
*/ @@ -1149,7 +1304,8 @@ trackDroppedObjectsNeeded(void) */ return list_length(EventCacheLookup(EVT_SQLDrop)) > 0 || list_length(EventCacheLookup(EVT_TableRewrite)) > 0 || - list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0; + list_length(EventCacheLookup(EVT_DDLCommandEnd)) > 0 || + list_length(EventCacheLookup(EVT_TableInitWrite)) > 0; } /* @@ -1873,6 +2029,7 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) case SCT_AlterOpFamily: case SCT_CreateOpClass: case SCT_AlterTSConfig: + case SCT_CreateTableAs: { char *identity; char *type; @@ -1890,6 +2047,8 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) addr = cmd->d.createopc.address; else if (cmd->type == SCT_AlterTSConfig) addr = cmd->d.atscfg.address; + else if (cmd->type == SCT_CreateTableAs) + addr = cmd->d.ctas.address; /* * If an object was dropped in the same command we may end diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 12d4b2e..ff75999 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -966,6 +966,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CMDTAG_ALTER_INDEX }; + CommandTag init_commands[] = { + CMDTAG_CREATE_TABLE_AS, + CMDTAG_SELECT_INTO + }; + /* Create the ddl_command_end event trigger */ CreateDDLReplicaEventTrigger("ddl_command_end", end_commands, lengthof(end_commands), myself, puboid); @@ -977,6 +982,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Create the table_rewrite event trigger */ CreateDDLReplicaEventTrigger("table_rewrite", rewrite_commands, lengthof(rewrite_commands), myself, puboid); + + /* Create the table_init_write event trigger */ + CreateDDLReplicaEventTrigger("table_init_write", init_commands, + lengthof(init_commands), myself, puboid); } table_close(rel, RowExclusiveLock); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 6a5bcde..4d003de 100644 --- a/src/backend/tcop/utility.c +++ 
b/src/backend/tcop/utility.c @@ -1665,8 +1665,10 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_CreateTableAsStmt: + EventTriggerTableInitWriteStart(parsetree); address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree, params, queryEnv, qc); + EventTriggerTableInitWriteEnd(); break; case T_RefreshMatViewStmt: diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c index 3a9c9f0..467cd65 100644 --- a/src/backend/utils/cache/evtcache.c +++ b/src/backend/utils/cache/evtcache.c @@ -167,6 +167,8 @@ BuildEventTriggerCache(void) event = EVT_SQLDrop; else if (strcmp(evtevent, "table_rewrite") == 0) event = EVT_TableRewrite; + else if (strcmp(evtevent, "table_init_write") == 0) + event = EVT_TableInitWrite; else continue; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index bb98c17..5fe28c9 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11899,4 +11899,7 @@ { oid => '4646', descr => 'trigger for ddl command deparse table rewrite', proname => 'publication_deparse_table_rewrite', prorettype => 'event_trigger', proargtypes => '', prosrc => 'publication_deparse_table_rewrite' }, +{ oid => '4647', descr => 'trigger for ddl command deparse table init', + proname => 'publication_deparse_table_init_write', prorettype => 'event_trigger', + proargtypes => '', prosrc => 'publication_deparse_table_init_write' }, ] diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h index fd2ee7f..a9e0f71 100644 --- a/src/include/commands/event_trigger.h +++ b/src/include/commands/event_trigger.h @@ -55,6 +55,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree); extern void EventTriggerSQLDrop(Node *parsetree); extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason); +extern void EventTriggerTableInitWriteStart(Node *parsetree); +extern void EventTriggerTableInitWrite(Node *real_create, ObjectAddress address); +extern 
void EventTriggerTableInitWriteEnd(void); + extern bool EventTriggerBeginCompleteQuery(void); extern void EventTriggerEndCompleteQuery(void); extern bool trackDroppedObjectsNeeded(void); diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h index b53294b..3d294a0 100644 --- a/src/include/tcop/deparse_utility.h +++ b/src/include/tcop/deparse_utility.h @@ -29,7 +29,8 @@ typedef enum CollectedCommandType SCT_AlterOpFamily, SCT_AlterDefaultPrivileges, SCT_CreateOpClass, - SCT_AlterTSConfig + SCT_AlterTSConfig, + SCT_CreateTableAs } CollectedCommandType; /* @@ -101,6 +102,12 @@ typedef struct CollectedCommand { ObjectType objtype; } defprivs; + + struct + { + ObjectAddress address; + Node *real_create; + } ctas; } d; struct CollectedCommand *parent; /* when nested */ diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h index ddb67a6..1e64831 100644 --- a/src/include/utils/evtcache.h +++ b/src/include/utils/evtcache.h @@ -22,7 +22,8 @@ typedef enum EVT_DDLCommandStart, EVT_DDLCommandEnd, EVT_SQLDrop, - EVT_TableRewrite + EVT_TableRewrite, + EVT_TableInitWrite } EventTriggerEvent; typedef struct -- 2.7.2.windows.1