From ba128ff1de0f99f6f2ed113d821e20bb6e762acf Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sat, 20 Dec 2025 15:20:09 +0530
Subject: [PATCH v19 2/6] Implement the conflict insertion infrastructure for
 the conflict log table

This patch introduces the core logic to populate the conflict log table whenever
a logical replication conflict is detected. It captures the remote transaction
details along with the corresponding local state at the time of the conflict.

Handling Multi-row Conflicts: A single remote tuple may conflict with multiple
local tuples (e.g., in the case of multiple_unique_conflicts). To handle this,
the infrastructure creates a single row in the conflict log table for each
remote tuple. The details of all conflicting local rows are aggregated into a
single JSON array in the local_conflicts column.

The JSON array uses the following structured format:
[ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1",
"key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ]

Example of querying the structured conflict data:

SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
       local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;

 remote_xid | relname  | remote_origin | local_xid |     local_tuple
------------+----------+---------------+-----------+---------------------
        760 | test     | pg_16406      | 771       | {"a":1,"b":10}
        765 | conf_tab | pg_16406      | 775       | {"a":2,"b":2,"c":2}
---
 src/backend/replication/logical/conflict.c   | 548 +++++++++++++++++--
 src/backend/replication/logical/launcher.c   |   1 +
 src/backend/replication/logical/worker.c     |  31 +-
 src/include/replication/conflict.h           |   4 +-
 src/include/replication/worker_internal.h    |   7 +
 src/test/subscription/t/037_conflict_dest.pl | 181 ++++++
 6 files changed, 728 insertions(+), 44 deletions(-)
 create mode 100644 src/test/subscription/t/037_conflict_dest.pl

diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 93222ee3b88..e23ff0b70cf 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,20 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
+#include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/jsonb.h"
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
@@ -34,6 +41,19 @@ static const char *const ConflictTypeNames[] = {
 	[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
 
+/* Schema for the elements within the 'local_conflicts' JSON array */
+static const ConflictLogColumnDef LocalConflictSchema[] =
+{
+	{ .attname = "xid",       .atttypid = XIDOID },
+	{ .attname = "commit_ts", .atttypid = TIMESTAMPTZOID },
+	{ .attname = "origin",    .atttypid = TEXTOID },
+	{ .attname = "key",       .atttypid = JSONOID },
+	{ .attname = "tuple",     .atttypid = JSONOID }
+};
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS \
+	(sizeof(LocalConflictSchema) / sizeof(LocalConflictSchema[0]))
+
 static int	errcode_apply_conflict(ConflictType type);
 static void errdetail_apply_conflict(EState *estate,
 									 ResultRelInfo *relinfo,
@@ -50,8 +70,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 									   TupleTableSlot *localslot,
 									   TupleTableSlot *remoteslot,
 									   Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+										 TupleTableSlot *slot,
+										 Relation indexDesc, Datum *values,
+										 bool *isnull);
 static char *build_index_value_desc(EState *estate, Relation localrel,
 									TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+											   Relation localrel,
+											   Oid replica_index,
+											   TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+											  ConflictType conflict_type,
+											  List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+									   Relation conflictlogrel,
+									   ConflictType conflict_type,
+									   TupleTableSlot *searchslot,
+									   List *conflicttuples,
+									   TupleTableSlot *remoteslot);
 
 /*
  * Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -105,30 +144,83 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 					ConflictType type, TupleTableSlot *searchslot,
 					TupleTableSlot *remoteslot, List *conflicttuples)
 {
-	Relation	localrel = relinfo->ri_RelationDesc;
-	StringInfoData err_detail;
+	Relation		localrel = relinfo->ri_RelationDesc;
+	ConflictLogDest	dest;
+	Relation		conflictlogrel;
 
-	initStringInfo(&err_detail);
+	/*
+	 * Get both the conflict log destination and the opened conflict log
+	 * relation for insertion.
+	 */
+	conflictlogrel = GetConflictLogTableInfo(&dest);
 
-	/* Form errdetail message by combining conflicting tuples information. */
-	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
-		errdetail_apply_conflict(estate, relinfo, type, searchslot,
-								 conflicttuple->slot, remoteslot,
-								 conflicttuple->indexoid,
-								 conflicttuple->xmin,
-								 conflicttuple->origin,
-								 conflicttuple->ts,
-								 &err_detail);
+	/* Insert to table if destination is 'table' or 'all' */
+	if ((dest & CONFLICT_LOG_DEST_TABLE) != 0)
+	{
+		Assert(conflictlogrel != NULL);
+
+		/*
+		 * Prepare the conflict log tuple. If the error level is below ERROR,
+		 * insert it immediately. Otherwise, defer the insertion to a new
+		 * transaction after the current one aborts, ensuring the insertion of
+		 * the log tuple is not rolled back.
+		 */
+		prepare_conflict_log_tuple(estate,
+								   relinfo->ri_RelationDesc,
+								   conflictlogrel,
+								   type,
+								   searchslot,
+								   conflicttuples,
+								   remoteslot);
+		if (elevel < ERROR)
+			InsertConflictLogTuple(conflictlogrel);
+
+		table_close(conflictlogrel, RowExclusiveLock);
+	}
 
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
-	ereport(elevel,
-			errcode_apply_conflict(type),
-			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
-				   get_namespace_name(RelationGetNamespace(localrel)),
-				   RelationGetRelationName(localrel),
-				   ConflictTypeNames[type]),
-			errdetail_internal("%s", err_detail.data));
+	/* Decide what detail to show in server logs. */
+	if ((dest & CONFLICT_LOG_DEST_LOG) != 0)
+	{
+		StringInfoData	err_detail;
+
+		initStringInfo(&err_detail);
+
+		/* Form errdetail message by combining conflicting tuples information. */
+		foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+			errdetail_apply_conflict(estate, relinfo, type, searchslot,
+									conflicttuple->slot, remoteslot,
+									conflicttuple->indexoid,
+									conflicttuple->xmin,
+									conflicttuple->origin,
+									conflicttuple->ts,
+									&err_detail);
+
+		/* Standard reporting with full internal details. */
+		ereport(elevel,
+				errcode_apply_conflict(type),
+				errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+					   get_namespace_name(RelationGetNamespace(localrel)),
+					   RelationGetRelationName(localrel),
+					   ConflictTypeNames[type]),
+				errdetail_internal("%s", err_detail.data));
+	}
+	else
+	{
+		/*
+		 * 'table' only: Report the error msg but omit raw tuple data from
+		 * server logs since it's already captured in the internal table.
+		 */
+		ereport(elevel,
+				errcode_apply_conflict(type),
+				errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+					   get_namespace_name(RelationGetNamespace(localrel)),
+					   RelationGetRelationName(localrel),
+					   ConflictTypeNames[type]),
+				errdetail("Conflict details logged to internal table with OID %u.",
+						  MySubscription->conflictlogrelid));
+	}
 }
 
 /*
@@ -162,6 +254,67 @@ InitConflictIndexes(ResultRelInfo *relInfo)
 	relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
 }
 
+/*
+ * GetConflictLogTableInfo
+ *
+ * Fetches conflict logging metadata from the cached MySubscription pointer.
+ * Sets the destination enum in *log_dest and, if applicable, opens and
+ * returns the relation handle for the internal log table.
+ */
+Relation
+GetConflictLogTableInfo(ConflictLogDest *log_dest)
+{
+	Oid			conflictlogrelid;
+	Relation	conflictlogrel = NULL;
+
+	/*
+	 * Convert the text log destination to the internal enum.  MySubscription
+	 * already contains the data from pg_subscription.
+	 */
+	*log_dest = GetLogDestination(MySubscription->conflictlogdest);
+	conflictlogrelid = MySubscription->conflictlogrelid;
+
+	/* If destination is 'log' only, no table to open. */
+	if (*log_dest == CONFLICT_LOG_DEST_LOG)
+		return NULL;
+
+	Assert(OidIsValid(conflictlogrelid));
+
+	conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+
+	/* Conflict log table is dropped or not accessible. */
+	if (conflictlogrel == NULL)
+		ereport(WARNING,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("conflict log table with OID %u does not exist",
+						conflictlogrelid)));
+
+	return conflictlogrel;
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert conflict log tuple into the conflict log table. It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple
+ * inserted into the conflict log table.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+	int			options = HEAP_INSERT_NO_LOGICAL;
+
+	/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+	Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+	heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+				GetCurrentCommandId(true), options, NULL);
+
+	/* Free conflict log tuple. */
+	heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+	MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
 /*
  * Add SQLSTATE error code to the current conflict report.
  */
@@ -472,6 +625,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 	return tuple_value.data;
 }
 
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ * This is the reusable core logic.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+							 TupleTableSlot *slot,
+							 Relation indexDesc, Datum *values,
+							 bool *isnull)
+{
+	TupleTableSlot *tableslot = slot;
+
+	/*
+	 * If the slot is a virtual slot, copy it into a heap tuple slot as
+	 * FormIndexDatum only works with heap tuple slots.
+	 */
+	if (TTS_IS_VIRTUAL(slot))
+	{
+		/* Slot is created within the EState's tuple table */
+		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+		tableslot = ExecCopySlot(tableslot, slot);
+	}
+
+	/*
+	 * Initialize ecxt_scantuple for potential use in FormIndexDatum
+	 */
+	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+	/* Form the index datums */
+	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+				   isnull);
+}
+
 /*
  * Helper functions to construct a string describing the contents of an index
  * entry. See BuildIndexValueDescription for details.
@@ -487,41 +674,318 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
 	Relation	indexDesc;
 	Datum		values[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
-	TupleTableSlot *tableslot = slot;
 
-	if (!tableslot)
+	if (!slot)
 		return NULL;
 
 	Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
 	indexDesc = index_open(indexoid, NoLock);
 
-	/*
-	 * If the slot is a virtual slot, copy it into a heap tuple slot as
-	 * FormIndexDatum only works with heap tuple slots.
-	 */
-	if (TTS_IS_VIRTUAL(slot))
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+
+	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+	index_close(indexDesc, NoLock);
+
+	return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to Jsonb.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple;
+	Datum		datum;
+	Datum		json;
+
+	Assert(slot != NULL);
+
+	tuple = ExecCopySlotHeapTuple(slot);
+	datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Fetch replica identity key from the tuple table slot and convert into a
+ * jsonb datum.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+								  Oid indexid, TupleTableSlot *slot)
+{
+	Relation	indexDesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	HeapTuple	tuple;
+	TupleDesc	tupdesc;
+	Datum		datum;
+
+	Assert(slot != NULL);
+
+	Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+	indexDesc = index_open(indexid, NoLock);
+
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+	tupdesc = RelationGetDescr(indexDesc);
+
+	/* Bless the tupdesc so it can be looked up by row_to_json. */
+	BlessTupleDesc(tupdesc);
+
+	/* Form the replica identity tuple. */
+	tuple = heap_form_tuple(tupdesc, values, isnull);
+	datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+	index_close(indexDesc, NoLock);
+
+	/* Convert to a JSONB datum. */
+	return DirectFunctionCall1(row_to_json, datum);
+}
+
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+	TupleDesc   tupdesc;
+
+	tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+	for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++)
+		TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1),
+						   LocalConflictSchema[i].attname,
+						   LocalConflictSchema[i].atttypid,
+						   -1, 0);
+
+	BlessTupleDesc(tupdesc);
+
+	return tupdesc;
+}
+
+/*
+ * Builds the local conflicts JSONB array column from the list of
+ * ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+								 ConflictType conflict_type,
+								 List *conflicttuples)
+{
+	ListCell   *lc;
+	List	   *json_datums = NIL; /* List to hold the row_to_json results (type json) */
+	Datum	   *json_datum_array;
+	bool	   *json_null_array;
+	Datum		json_array_datum;
+	int			num_conflicts;
+	int			i;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	TupleDesc	tupdesc;
+
+	/* Build local conflicts tuple descriptor. */
+	tupdesc = build_conflict_tupledesc();
+
+	/* Process local conflict tuple list and prepare an array of JSON. */
+	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
 	{
-		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
-		tableslot = ExecCopySlot(tableslot, slot);
+		Datum		values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		bool		nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		char	   *origin_name = NULL;
+		HeapTuple	tuple;
+		Datum		json_datum;
+		int			attno;
+
+		attno = 0;
+		values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+		if (conflicttuple->ts)
+			values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+		else
+			nulls[attno++] = true;
+
+		if (conflicttuple->origin != InvalidRepOriginId)
+			replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+		/* Store empty string if origin name for the tuple is NULL. */
+		if (origin_name != NULL)
+			values[attno++] = CStringGetTextDatum(origin_name);
+		else
+			nulls[attno++] = true;
+
+		/*
+		 * Add the conflicting key values in the case of a unique constraint
+		 * violation.
+		 */
+		if (conflict_type == CT_INSERT_EXISTS ||
+			conflict_type == CT_UPDATE_EXISTS ||
+			conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+		{
+			Oid	indexoid = conflicttuple->indexoid;
+
+			Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+				   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+											  true));
+			values[attno++] =
+					tuple_table_slot_to_indextup_json(estate, rel,
+													  indexoid,
+													  conflicttuple->slot);
+		}
+		else
+			nulls[attno++] = true;
+
+		/* Convert conflicting tuple to JSON datum. */
+		if (conflicttuple->slot)
+			values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+		else
+			nulls[attno] = true;
+
+		Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+
+		json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+		/*
+		 * Build the higher level JSON datum in format described in function
+		 * header.
+		 */
+		json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+		/* Done with the temporary tuple. */
+		heap_freetuple(tuple);
+
+		/* Add to the array element. */
+		json_datums = lappend(json_datums, (void *) json_datum);
 	}
 
-	/*
-	 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
-	 * index expressions are present.
-	 */
-	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+	num_conflicts = list_length(json_datums);
 
-	/*
-	 * The values/nulls arrays passed to BuildIndexValueDescription should be
-	 * the results of FormIndexDatum, which are the "raw" input to the index
-	 * AM.
-	 */
-	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+	json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum));
+	json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool));
 
-	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+	i = 0;
+	foreach(lc, json_datums)
+	{
+		json_datum_array[i] = (Datum) lfirst(lc);
+		i++;
+	}
 
-	index_close(indexDesc, NoLock);
+	/* Construct the json[] array Datum. */
+	get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+	json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+													   num_conflicts,
+													   JSONOID,
+													   typlen,
+													   typbyval,
+													   typalign));
+	pfree(json_datum_array);
+	pfree(json_null_array);
+
+	return json_array_datum;
+}
 
-	return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+						   Relation conflictlogrel,
+						   ConflictType conflict_type,
+						   TupleTableSlot *searchslot,
+						   List *conflicttuples,
+						   TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM] = {0};
+	bool		nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+	int			attno;
+	char	   *remote_origin = NULL;
+	MemoryContext	oldctx;
+
+	Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+	/* Populate the values and nulls arrays. */
+	attno = 0;
+	values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+	values[attno++] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+	values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+	values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+	if (TransactionIdIsValid(remote_xid))
+		values[attno++] = TransactionIdGetDatum(remote_xid);
+	else
+		nulls[attno++] = true;
+
+	values[attno++] = LSNGetDatum(remote_final_lsn);
+
+	if (remote_commit_ts > 0)
+		values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+	else
+		nulls[attno++] = true;
+
+	if (replorigin_session_origin != InvalidRepOriginId)
+		replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+	if (remote_origin != NULL)
+		values[attno++] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(searchslot))
+	{
+		Oid		replica_index = GetRelationIdentityOrPK(rel);
+
+		/*
+		 * If the table has a valid replica identity index, build the index
+		 * json datum from key value. Otherwise, construct it from the complete
+		 * tuple in REPLICA IDENTITY FULL cases.
+		 */
+		if (OidIsValid(replica_index))
+			values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+																replica_index,
+																searchslot);
+		else
+			values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+	}
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(remoteslot))
+		values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+	else
+		nulls[attno++] = true;
+
+	values[attno] = build_local_conflicts_json_array(estate, rel,
+													 conflict_type,
+													 conflicttuples);
+
+	Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+	MyLogicalRepWorker->conflict_log_tuple =
+		heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3ed86480be2..2dda5a44218 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
 	worker->oldest_nonremovable_xid = retain_dead_tuples
 		? MyReplicationSlot->data.xmin
 		: InvalidTransactionId;
+	worker->conflict_log_tuple = NULL;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ad281e7069b..5ac826c279f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,27 @@ start_apply(XLogRecPtr origin_startpos)
 			pgstat_report_subscription_error(MySubscription->oid,
 											 MyLogicalRepWorker->type);
 
+			/*
+			 * Insert any pending conflict log tuple under a new transaction.
+			 */
+			if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+			{
+				Relation	conflictlogrel;
+				ConflictLogDest	dest;
+
+				StartTransactionCommand();
+				PushActiveSnapshot(GetTransactionSnapshot());
+
+				/* Open conflict log table and insert the tuple. */
+				conflictlogrel = GetConflictLogTableInfo(&dest);
+				Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0);
+				InsertConflictLogTuple(conflictlogrel);
+				table_close(conflictlogrel, RowExclusiveLock);
+
+				PopActiveSnapshot();
+				CommitTransactionCommand();
+			}
+
 			PG_RE_THROW();
 		}
 	}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index af6deaa4297..b60c0b03e26 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -132,7 +132,6 @@ static const ConflictLogColumnDef ConflictLogSchema[] =
 	{ .attname = "local_conflicts",  .atttypid = JSONARRAYOID }
 };
 
-/* Define the count using the array size */
 #define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0]))
 
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
@@ -145,4 +144,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c1285fdd1bc..5bedfc5450f 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
 	 */
 	TransactionId oldest_nonremovable_xid;
 
+	/* A conflict log tuple that is prepared but not yet inserted. */
+	HeapTuple	conflict_log_tuple;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId	remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
diff --git a/src/test/subscription/t/037_conflict_dest.pl b/src/test/subscription/t/037_conflict_dest.pl
new file mode 100644
index 00000000000..d33993530a4
--- /dev/null
+++ b/src/test/subscription/t/037_conflict_dest.pl
@@ -0,0 +1,181 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflicts in logical replication
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"
+);
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE conf_tab (a int PRIMARY key, b int UNIQUE, c int UNIQUE);");
+
+$node_subscriber->safe_psql(
+	'postgres', qq[
+	 CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int, c int, unique(a,b)) PARTITION BY RANGE (a);
+	 CREATE TABLE conf_tab_2_p1 PARTITION OF conf_tab_2 FOR VALUES FROM (MINVALUE) TO (100);
+]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub_tab FOR TABLE conf_tab, conf_tab_2");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION sub_tab
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION pub_tab WITH (conflict_log_destination=table)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+###############################################################################
+# Test conflict insertion into the internal conflict log table
+###############################################################################
+
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (10, 10, 10);");
+
+# Get the internally generated table name
+my $subid = $node_subscriber->safe_psql('postgres',
+	"SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
+my $conflict_table = "pg_conflict.pg_conflict_$subid";
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab VALUES (10, 20, 30);");
+
+# Wait for the conflict to be logged
+my $log_check = $node_subscriber->poll_query_until(
+    'postgres',
+    "SELECT count(*) > 0 FROM $conflict_table;"
+);
+
+is($log_check, 1, 'Conflict was successfully logged to the internal table');
+
+my $json_query = qq[
+    SELECT string_agg((unnested.j::json)->'key'->>'a', ',')
+    FROM (
+        SELECT unnest(local_conflicts) AS j
+        FROM $conflict_table
+    ) AS unnested;
+];
+
+my $all_keys = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that '10' is present in the resulting string
+like($all_keys, qr/10/, 'Verified that key 10 exists in the local_conflicts log');
+
+pass('Conflict type and data successfully validated in internal table');
+
+# Final cleanup for subsequent bidirectional tests in the script
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: update_missing
+###############################################################################
+
+# Sync a row, then delete it locally on subscriber
+$node_publisher->safe_psql('postgres', "INSERT INTO conf_tab VALUES (50, 50, 50);");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE a = 50;");
+
+# Trigger conflict by updating that row on publisher
+$node_publisher->safe_psql('postgres', "UPDATE conf_tab SET b = 500 WHERE a = 50;");
+
+# Wait for the apply worker to detect the missing row and log it
+$node_subscriber->poll_query_until('postgres',
+    "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'update_missing';"
+) or die "Timed out waiting for update_missing conflict";
+
+my $upd_miss_check = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM $conflict_table WHERE conflict_type = 'update_missing';");
+is($upd_miss_check, 1, 'Verified update_missing conflict logged to internal table');
+
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: insert_exists (via secondary unique index)
+###############################################################################
+
+# 1. Subscriber has a row with b=100
+$node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab VALUES (100, 100, 100);");
+
+# 2. Publisher inserts a NEW PK (101) but a DUPLICATE 'b' (100)
+$node_publisher->safe_psql('postgres', "INSERT INTO conf_tab VALUES (101, 100, 101);");
+
+# 3. Verify it appears as 'insert_exists' in your log table
+$node_subscriber->poll_query_until('postgres',
+    "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'insert_exists' AND local_conflicts::text LIKE '%100%';"
+) or die "Timed out waiting for secondary index insert_exists conflict";
+
+pass('Logged insert_exists triggered by secondary unique index violation');
+
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# CASE 3: Switching Destination to 'log' (Server Log Verification)
+###############################################################################
+
+# Switch destination
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION sub_tab SET (conflict_log_destination = 'all');");
+
+$node_subscriber->safe_psql('postgres', "DELETE FROM $conflict_table;");
+# Trigger a conflict for server log (insert_exists)
+$node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab VALUES (600, 600, 600);");
+$node_publisher->safe_psql('postgres', "INSERT INTO conf_tab VALUES (600, 700, 700);");
+
+# Wait for table log
+$node_subscriber->poll_query_until('postgres', "SELECT count(*) > 0 FROM $conflict_table;")
+    or die "Timed out waiting for insert_exists conflict";
+
+# Check subscriber server log
+my $log_found = $node_subscriber->wait_for_log(
+    qr/conflict detected on relation "public.conf_tab": conflict=insert_exists/
+);
+ok($log_found, 'Conflict correctly directed to server stderr log');
+
+# Verify table count DID NOT increase for this conflict
+my $table_check = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM $conflict_table WHERE local_conflicts::text LIKE '%600%';");
+is($table_check, 1, 'Table log was bypassed when destination set to log');
+
+done_testing();
-- 
2.43.0

