From 006e999ab88a962a3ac8967d25f0bee4b421c745 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 20 May 2026 10:13:28 +0530 Subject: [PATCH v38 06/10] Review comment fixes for Implement the conflict insertion infrastructure for the conflict log table Review comment fixes for Implement the conflict insertion infrastructure for the conflict log table --- src/backend/replication/logical/conflict.c | 150 ++++++++++++++------- src/backend/replication/logical/worker.c | 32 +---- src/include/replication/conflict.h | 1 + src/test/subscription/t/030_origin.pl | 4 +- src/test/subscription/t/035_conflicts.pl | 4 +- 5 files changed, 111 insertions(+), 80 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 2dc10d80bf2..db09480ef49 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -31,10 +31,8 @@ #include "storage/lmgr.h" #include "utils/array.h" #include "utils/builtins.h" -#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/pg_lsn.h" -#include "utils/json.h" /* * String representations for the supported conflict logging destinations. @@ -48,7 +46,6 @@ const char *const ConflictLogDestNames[] = { StaticAssertDecl(lengthof(ConflictLogDestNames) == 3, "ConflictLogDestNames length mismatch"); - /* Structure to hold metadata for one column of the conflict log table */ typedef struct ConflictLogColumnDef { @@ -80,17 +77,6 @@ static const ConflictLogColumnDef ConflictLogSchema[] = { #define NUM_CONFLICT_ATTRS lengthof(ConflictLogSchema) -static const char *const ConflictTypeNames[] = { - [CT_INSERT_EXISTS] = "insert_exists", - [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", - [CT_UPDATE_EXISTS] = "update_exists", - [CT_UPDATE_MISSING] = "update_missing", - [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", - [CT_UPDATE_DELETED] = "update_deleted", - [CT_DELETE_MISSING] = "delete_missing", - [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" -}; - /* Schema for the elements within the 'local_conflicts' JSON array */ static const ConflictLogColumnDef LocalConflictSchema[] = { @@ -101,7 +87,18 @@ static const ConflictLogColumnDef LocalConflictSchema[] = { .attname = "tuple", .atttypid = JSONOID } }; -#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema) +#define NUM_LOCAL_CONFLICT_ATTRS lengthof(LocalConflictSchema) + +static const char *const ConflictTypeNames[] = { + [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", + [CT_UPDATE_EXISTS] = "update_exists", + [CT_UPDATE_MISSING] = "update_missing", + [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", + [CT_UPDATE_DELETED] = "update_deleted", + [CT_DELETE_MISSING] = "delete_missing", + [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" +}; static int errcode_apply_conflict(ConflictType type); static void errdetail_apply_conflict(EState *estate, @@ -340,7 +337,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, Relation localrel = relinfo->ri_RelationDesc; ConflictLogDest dest; Relation conflictlogrel; - bool log_dest_clt = false; + bool log_dest_table; bool log_dest_logfile; pgstat_report_subscription_conflict(MySubscription->oid, type); @@ -351,13 +348,11 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, */ conflictlogrel = GetConflictLogDestAndTable(&dest); - if (dest == CONFLICT_LOG_DEST_TABLE || dest == CONFLICT_LOG_DEST_ALL) - log_dest_clt = true; - if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL) - log_dest_logfile = true; + log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest); + log_dest_logfile = CONFLICTS_LOGGED_TO_FILE(dest); /* Insert to table if requested. */ - if (log_dest_clt) + if (log_dest_table) { Assert(conflictlogrel != NULL); @@ -386,9 +381,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, */ ereport(elevel, errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), ConflictTypeNames[type]), errdetail("Conflict details are logged to the conflict log table: %s", RelationGetRelationName(conflictlogrel))); @@ -417,14 +411,54 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, /* Standard reporting with full internal details. */ ereport(elevel, errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), ConflictTypeNames[type]), errdetail_internal("%s", err_detail.data)); } } +/* + * ProcessPendingConflictLogTuple + * Insert any deferred conflict log tuple in a separate transaction. + * + * For conflicts raised at ERROR level, the conflict log tuple cannot be + * inserted immediately because the surrounding transaction will abort. + * To ensure that conflict information is not lost, such tuples are prepared + * during error processing (see prepare_conflict_log_tuple()) but their + * insertion is deferred. + * + * This function is responsible for completing that deferred insertion after + * the failing transaction has been aborted and the system has returned to an + * idle state. It executes the insertion in a new, independent transaction, + * ensuring that the conflict log entry is durable and not rolled back + * together with the failed apply transaction. + */ +void +ProcessPendingConflictLogTuple(void) +{ + Relation conflictlogrel; + ConflictLogDest dest; + + /* Nothing to do */ + if (MyLogicalRepWorker->conflict_log_tuple == NULL) + return; + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Open conflict log table and insert the tuple */ + conflictlogrel = GetConflictLogDestAndTable(&dest); + Assert(conflictlogrel); + + InsertConflictLogTuple(conflictlogrel); + + table_close(conflictlogrel, RowExclusiveLock); + + PopActiveSnapshot(); + CommitTransactionCommand(); +} + /* * Find all unique indexes to check for a conflict and store them into * ResultRelInfo. @@ -475,7 +509,7 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest) *log_dest = GetLogDestination(MySubscription->conflictlogdest); /* Quick exit if a conflict log table was not requested. */ - if (*log_dest == CONFLICT_LOG_DEST_LOG) + if (!CONFLICTS_LOGGED_TO_TABLE(*log_dest)) return NULL; conflictlogrelid = MySubscription->conflictlogrelid; @@ -495,13 +529,11 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest) void InsertConflictLogTuple(Relation conflictlogrel) { - int options = HEAP_INSERT_NO_LOGICAL; - /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, - GetCurrentCommandId(true), options, NULL); + GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL); /* Free conflict log tuple. */ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); @@ -1056,7 +1088,7 @@ tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, isnull); - tupdesc = RelationGetDescr(indexDesc); + tupdesc = CreateTupleDescCopy(RelationGetDescr(indexDesc)); /* Bless the tupdesc so it can be looked up by row_to_json. */ BlessTupleDesc(tupdesc); @@ -1065,8 +1097,9 @@ tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, tuple = heap_form_tuple(tupdesc, values, isnull); datum = heap_copy_tuple_as_datum(tuple, tupdesc); - index_close(indexDesc, NoLock); heap_freetuple(tuple); + FreeTupleDesc(tupdesc); + index_close(indexDesc, NoLock); /* Convert to a JSON datum. */ return DirectFunctionCall1(row_to_json, datum); @@ -1075,26 +1108,41 @@ tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, /* * build_conflict_tupledesc * - * Build and bless a tuple descriptor for the internal conflict log table - * based on the predefined LocalConflictSchema. + * Build and bless a tuple descriptor for the conflict log table based on the + * predefined LocalConflictSchema. */ static TupleDesc build_conflict_tupledesc(void) { - TupleDesc tupdesc; + static TupleDesc cached_tupdesc = NULL; - tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + if (cached_tupdesc == NULL) + { + MemoryContext oldcxt; - for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++) - TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1), - LocalConflictSchema[i].attname, - LocalConflictSchema[i].atttypid, - -1, 0); + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - TupleDescFinalize(tupdesc); - BlessTupleDesc(tupdesc); + cached_tupdesc = CreateTemplateTupleDesc(NUM_LOCAL_CONFLICT_ATTRS); - return tupdesc; + for (int i = 0; i < NUM_LOCAL_CONFLICT_ATTRS; i++) + TupleDescInitEntry(cached_tupdesc, + (AttrNumber) (i + 1), + LocalConflictSchema[i].attname, + LocalConflictSchema[i].atttypid, + -1, 0); + + TupleDescFinalize(cached_tupdesc); + + /* + * Bless once so it can be used as a RECORD type (e.g. for + * row_to_json or other record-based operations). + */ + BlessTupleDesc(cached_tupdesc); + + MemoryContextSwitchTo(oldcxt); + } + + return cached_tupdesc; } /* @@ -1126,8 +1174,8 @@ build_local_conflicts_json_array(EState *estate, Relation rel, /* Process local conflict tuple list and prepare an array of JSON. */ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; - bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; + Datum values[NUM_LOCAL_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_LOCAL_CONFLICT_ATTRS] = {0}; char *origin_name = NULL; HeapTuple tuple; Datum json_datum; @@ -1177,7 +1225,7 @@ build_local_conflicts_json_array(EState *estate, Relation rel, else nulls[attno] = true; - Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + Assert(attno + 1 == NUM_LOCAL_CONFLICT_ATTRS); tuple = heap_form_tuple(tupdesc, values, nulls); @@ -1236,8 +1284,8 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, List *conflicttuples, TupleTableSlot *remoteslot) { - Datum values[MAX_CONFLICT_ATTR_NUM] = {0}; - bool nulls[MAX_CONFLICT_ATTR_NUM] = {0}; + Datum values[NUM_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_CONFLICT_ATTRS] = {0}; int attno; char *remote_origin = NULL; MemoryContext oldctx; @@ -1303,7 +1351,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, conflict_type, conflicttuples); - Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + Assert(attno + 1 == NUM_CONFLICT_ATTRS); oldctx = MemoryContextSwitchTo(ApplyContext); MyLogicalRepWorker->conflict_log_tuple = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 469451c736a..70ae38a7bd1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1766,15 +1766,15 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); - remote_xid = stream_xid; - remote_final_lsn = InvalidXLogRecPtr; - remote_commit_ts = 0; - if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); /* Try to allocate a worker for the streaming transaction. */ @@ -5674,27 +5674,7 @@ start_apply(XLogRecPtr origin_startpos) */ AbortOutOfAnyTransaction(); pgstat_report_subscription_error(MySubscription->oid); - - /* - * Insert any pending conflict log tuple under a new transaction. - */ - if (MyLogicalRepWorker->conflict_log_tuple != NULL) - { - Relation conflictlogrel; - ConflictLogDest dest; - - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* Open conflict log table and insert the tuple. */ - conflictlogrel = GetConflictLogDestAndTable(&dest); - Assert(dest != CONFLICT_LOG_DEST_LOG); - InsertConflictLogTuple(conflictlogrel); - table_close(conflictlogrel, RowExclusiveLock); - - PopActiveSnapshot(); - CommitTransactionCommand(); - } + ProcessPendingConflictLogTuple(); PG_RE_THROW(); } @@ -6069,6 +6049,8 @@ DisableSubscriptionAndExit(void) */ pgstat_report_subscription_error(MyLogicalRepWorker->subid); + ProcessPendingConflictLogTuple(); + /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 6dcb7970bb7..8829f6c6378 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -115,6 +115,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples); +extern void ProcessPendingConflictLogTuple(void); extern void InitConflictIndexes(ResultRelInfo *relInfo); extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest); extern void InsertConflictLogTuple(Relation conflictlogrel); diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 6bc6b7874c2..5f4d00bdd33 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -166,7 +166,7 @@ is($result, qq(32), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "UPDATE $tab SET a = 33 WHERE a = 32;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ + qr/conflict detected on relation "public.$tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ ); $node_B->safe_psql('postgres', "DELETE FROM $tab;"); @@ -182,7 +182,7 @@ is($result, qq(33), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "DELETE FROM $tab WHERE a = 33;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ + qr/conflict detected on relation "public.$tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ ); # The remaining tests no longer test conflict detection. diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 05c2179b9a8..4f3880e5b83 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -85,11 +85,11 @@ $node_subscriber->wait_for_log( $log_offset); # Verify the contents of the Conflict Log Table (CLT) -# This section ensures that the clt contains the expected +# This section ensures that the CLT contains the expected # type and specific key data. my $subid = $node_subscriber->safe_psql('postgres', "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); -my $clt = "pg_conflict.pg_conflict_log_$subid"; +my $clt = "pg_conflict.pg_conflict_log_for_subid_$subid"; # Wait for the conflict to be logged in the CLT my $log_check = $node_subscriber->poll_query_until( -- 2.53.0