From: Dilip Kumar <dilipbalaut@gmail.com>
To: Peter Smith <smithpb2250@gmail.com>
Cc: Amit Kapila <amit.kapila16@gmail.com>
Cc: shveta malik <shveta.malik@gmail.com>
Cc: vignesh C <vignesh21@gmail.com>
Cc: Masahiko Sawada <sawada.mshk@gmail.com>
Cc: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Cc: PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>
Subject: Re: Proposal: Conflict log history table for Logical Replication
Date: Mon, 19 Jan 2026 10:56:44 +0530
Message-ID: <CAFiTN-v9i9RmDvdUmtMUow4=b+nr0k7LKMyEQ+6ZF=EVdfBhBA@mail.gmail.com>
In-Reply-To: <CAHut+PsWms218ENALnytLEV4NpxjOrAYhChLDaMaeE65-vNgrQ@mail.gmail.com>
References: <CAFiTN-u5D5o_AGNbHRZHaOqAMWkxLf+hSk_r9X3gv6HbLOB5+g@mail.gmail.com>
<CALj2ACViThGQDYi-yeqUeHqG2Pozn2AiyvtDtjE6zhhbM0KsEA@mail.gmail.com>
<CAA4eK1+44b3vd_OWfiaVNtjf5Njb5cek09pmKRmttBByeg0NoA@mail.gmail.com>
<CAFiTN-v3L0WacCDx5dkOSonaZQbJfstXL4HrCPD1ahRdUsRnSg@mail.gmail.com>
<CALj2ACW63uuxh0fSoxEAF8OMWhz1dJKSkp268WJDzf5BUqCf5g@mail.gmail.com>
<CAFiTN-s9WWLOhW1TO27NtJwGf0bh2+MWyp3NEkZFeN_S5_p_rA@mail.gmail.com>
<CAA4eK1LxnsEx5sMbQkK5MHAgXKPROMQXQ0n=fKMwz+UsfKQaMQ@mail.gmail.com>
<CAD21AoDj+c4LXf2y4ESR-gVyv9d8V0G4R8R9pn-PcmT5zPzYcg@mail.gmail.com>
<CAA4eK1KokmAwNOL6bS-ip_E3F96PiQTjC4j-M+5vD1T6uUyi3Q@mail.gmail.com>
<CAFiTN-vFKE8E_N6h+peX9DP92mxCeFdm5A9Esn4DkLmNcZ-dOA@mail.gmail.com>
<CAFiTN-shLYf-fOTQ_dBf3Xfx05gxs_8d93MHZXyyz6w2Bg5geQ@mail.gmail.com>
<CAFiTN-tEgkKQHUikn6iBFCYf7XOObR7ncUq=OVh7WEk=6P4ymw@mail.gmail.com>
<CAFiTN-tQiakd8m+-d6WN6RpJXSv_JcropZ2oGzme4d1JudQhYg@mail.gmail.com>
<CAJpy0uDKbYWt+YPADj=4fHEvrGEWgnG1n_YsiGT_EZiZf0VSAw@mail.gmail.com>
<CAFiTN-t82BiXen+HfdR9jZyOpuSO92xonnUK=khXsiZWBfOxMA@mail.gmail.com>
<CAJpy0uAu2paxGAEffD=vaBTW9Jqbtxxawb8K8FgiASfeKPnGog@mail.gmail.com>
<CAJpy0uC0ZWgHOivJ102A1fMkppwK3RuSMafRPKyjwkmJrjhVUw@mail.gmail.com>
<CAFiTN-vFV9-zajrwjYHYyFnyQsooOAXW4CpxB5f-iT3APjOtoQ@mail.gmail.com>
<CAJpy0uBeU1dZgaqsSVKc=P=EVUKxRgVuHR8jDXFL-HLibbE-kQ@mail.gmail.com>
<CAA4eK1+FOkOxhzVLAnDymoNjp4i98H-L1+ZsWDgJEv-ndnTzTA@mail.gmail.com>
<CAFiTN-sVK6Bp+BawCJU_WpAXQSTX4OkKmce5EE4YNBgD-XSjZw@mail.gmail.com>
<CAA4eK1LbjV0bctib9wUnBpEkC+2rZFPnGuRtrKuc5AtUAzum+A@mail.gmail.com>
<CAFiTN-vq50N3QP9p3_SH+tJ8Pn=uRDb0X4qEcQZYcGW9AX88rQ@mail.gmail.com>
<CAFiTN-u3+zRGPESP5kUUfa6NxaWh1HL-gd1225KJ0Uvzi1urow@mail.gmail.com>
<CAA4eK1L4iNk6mNTC83PbYrRfUdtivH4U961PkdFfOO7mvc=USg@mail.gmail.com>
<CAFiTN-v+Mh64UfR5zb5rwgyGm6HS80XRSZ_XeaWkg8=+s9o3Kg@mail.gmail.com>
<CAFiTN-s3ZFHteQsiC3H4=AjTWxuwN-w69XQ3xL5X6YOMTua4pA@mail.gmail.com>
<CAJpy0uDe724nY59j-8hMapZ_Fru1Wo-NucF4Ea1B3Jrw=+J+UQ@mail.gmail.com>
<CAFiTN-uR=86L_5tyiA7n73EXCSCuDfQKfL5O=c8n7zZom8_ONQ@mail.gmail.com>
<CAD21AoDfOS-J0M9WbM3D20eGbSPzbfLQ-9XoYkxO4AZ9twqyvg@mail.gmail.com>
<CAFiTN-vMTg2X7vwfHLr5Gvy8ViV63_iaEcpHmM8V5GpA9-u8cg@mail.gmail.com>
<CAA4eK1+b2Ws0e_ZYJsgZAPn7VWndxAK_YM_QMKcfXst3e7F6Jg@mail.gmail.com>
<CAFiTN-v6hFKMPrSyTBsz=AtEETYMbOxrqvhZJsPQqKgQc4WCLw@mail.gmail.com>
<CAA4eK1KV3rYkaxys5fh-PtE9kq5xrFbiaRpOSPoRgQG494ek+g@mail.gmail.com>
<CAFiTN-utvu=QjY1QQ1a_TvkpkpvesMWo9M8wTFYLaOTPdpOJvw@mail.gmail.com>
<CAA4eK1+HoSOEqNwT3twArPNx4_D7hSUoEg2LnYhX8n9iUwhXgQ@mail.gmail.com>
<CAFiTN-tqmsfW0Sk=1RhzuduxqLrf9KEc8VOvBae+4aYxWTJwuA@mail.gmail.com>
<CAA4eK1JmCQ=DHe3HsqpX+P3mGDUd_Z7E7oAxdstK6822W6tuCw@mail.gmail.com>
<CAFiTN-uE4eAUYewuq3c5deAt3TtVork+H6rkUHRv68cOGr5rmQ@mail.gmail.com>
<CAFiTN-sJbhPX+LbA8YuQeYJpfGA2XA+OKXf8jCm04RoJOyzLvw@mail.gmail.com>
<CAJpy0uBPOyWj9itFjHzGXfrUuYS8KGmAvgdcV_9FPjWZ0EZz_w@mail.gmail.com>
<CAFiTN-s=iLE4qM4qmw9yXKqW09R_c_HqaSGeZXJ2EaTVfXss+g@mail.gmail.com>
<CAA4eK1KYo0vZpPSRc_4gVpa06-J39gxjs3tHFyckgkBfYJSfFA@mail.gmail.com>
<CAFiTN-vrKc6OWzrg6yvpwYcj79k=zkrDp3uwiZzjwrWLJAq6tw@mail.gmail.com>
<CAA4eK1LmvrfEgn1NUZZ=E3yMCjQdNZ5=_SBEry73-EmF6jM_PQ@mail.gmail.com>
<CAFiTN-vjfub5b3PqPQzfOw9BSjm8jt28ott+Hoz9CrRxJHzYkg@mail.gmail.com>
<CAFiTN-v=ANapYvRK+SOy2wJb4CSuD6Vb6_bTGuReM9Dv+3tucA@mail.gmail.com>
<CALDaNm1zEYoSdf2Ns-=UJRw95E5sbfpB0oaNUWtRJN27Q1Knhw@mail.gmail.com>
<CALDaNm3USsXVNBsfdpkp60HVgrTV4taWMk1xZYNBa7QUF=V0jg@mail.gmail.com>
<CAFiTN-sNg9ghLNkB2Kn0SwBGOub9acc99XZZU_d5NAcyW-yrEg@mail.gmail.com>
<CAJpy0uAF3EYcYdpTHdKMeXfvaPbNvnWrZUATrSLL1hqjao=33A@mail.gmail.com>
<CAFiTN-uikggCKp2LscTorKY5d3KF9j93DW0xebDcRX86G+ZsSw@mail.gmail.com>
<CAJpy0uDaOoVK8S3_xxTAcTDpfK1AY7tApw7nPOZG_gUz+DMi=Q@mail.gmail.com>
<CAA4eK1+AdeC5B9xrAXSKWGtTh-0d8xdD=fZttmOBm+c8o8thAQ@mail.gmail.com>
<CAFiTN-skBQAeuzuUd+PDK0Gqc8g+4x9ypBMwJhOrmW8ZCFKGSA@mail.gmail.com>
<CAJpy0uCdrsW5T+okq7xTOVxagje7FW3DOeY5B0CGKYa5VqF_tQ@mail.gmail.com>
<CAFiTN-u+_mFj9caYYFO7=_YHFXk5y=vvOm2H2=5hctYktmAVGA@mail.gmail.com>
<CALDaNm1aivk9KgQ5daeF6YZzuE+0wWc2yb7wb6qikNyvfPN0Sg@mail.gmail.com>
<CAJpy0uD6fTEUYJx3+yDbvB=VW7c5AaGoeSd7iwHdYYO=kYGn3g@mail.gmail.com>
<CALDaNm2YOOdJ25X1sJ+DYz37K6Qi4g0ZNFHb_pQMF9UqancnEA@mail.gmail.com>
<CAHut+PtMS5bENS0DVtBj+s3kUEOq61+hSkqLODjFB78egB0imQ@mail.gmail.com>
<CAFiTN-s_M83sfs+MHHbUrMesjsCPN4JWxY5MChCEiY1U-u7=9g@mail.gmail.com>
<CAFiTN-vj8NTm9w_L2XdhxJCub_RZw__YVUgfXa1B1kJzJctRNw@mail.gmail.com>
<CAJpy0uBDLnfhuSiev8W9ZMFNTzUmqhds2dKayUpLoN-z1dtsLA@mail.gmail.com>
<CAFiTN-uL9f0X+=Ep4BbAPvaTJA7S4XHM--G4BsnPJw4uJW7EGQ@mail.gmail.com>
<CAJpy0uDG=t-y_m8t1zpBzfz9viP3K8dyQgkruaraVT85UtTkrg@mail.gmail.com>
<CAFiTN-tR8Rhs8uhfbck0Ac4dd1MopvvYgjK39nWyNXRp9Z3Qww@mail.gmail.com>
<CAA4eK1Kf15UpNmpTTE2XyX=9PE_oTpOoy5xqg3rFWbxwwP4Rbg@mail.gmail.com>
<CAFiTN-tNqb0vjuadDz-as67ksSXa=aEK+JW=4b54RVmkUK1m2Q@mail.gmail.com>
<CAFiTN-vDCxx6ydUFo59L8qNBbierg4as3TGPPiavR7UZjYurzA@mail.gmail.com>
<CAHut+PsWms218ENALnytLEV4NpxjOrAYhChLDaMaeE65-vNgrQ@mail.gmail.com>
On Mon, Jan 19, 2026 at 9:42 AM Peter Smith <smithpb2250@gmail.com> wrote:
>
> Some review comments for v22-0003.
>
> ======
>
> 1.
> It looks like none of my previous v20-0003 review comments [1] have
> been addressed. Maybe accidentally overlooked?
>
> ======
>
> 2.
> + <caution>
> + <para>
> + The internal conflict logging table is strictly tied to the lifecycle of the
> + subscription or the <literal>conflict_log_destination</literal> setting. If
> + the subscription is dropped, or if the destination is changed to
> + <literal>log</literal>, the table and all its recorded conflict data are
> + <emphasis>permanently deleted</emphasis>. To perform a post-mortem analysis
> + after removing a subscription, users must manually back up or rename the
> + conflict table before the deletion occurs.
> + </para>
> + </caution>
>
> 2a.
> Let's consistently call this the "Conflict log table", same as
> everywhere else, not "logging table".
>
> ~
>
> 2b.
> This is only a caution for the CLT, so I felt it's better to put this
> in the scope of the 'table' param value.
>
> ~~~
>
> 3.
> + analysis of conflicts. This table is automatically dropped when the
> + subscription is removed.
>
> If you move the <caution> to this scope, as suggested above in #2b,
> then you can remove the sentence "This table is automatically dropped
> when the subscription is removed", because that is duplicate
> information you already wrote in the caution.
The attached patch fixes the above comments, along with the other comments
reported on v22-0001 and v22-0002.
> ======
> [1] v20 docs review -
> https://www.postgresql.org/message-id/CAHut%2BPuzB4gNYvqX9hb28KE0RK_xhU%2B2-%3DwUfL5OEVUCi92Hqw%40ma...
I think I missed them, so I will respond to them separately.
--
Regards,
Dilip Kumar
Google
Attachments:
[application/octet-stream] v23-0001-Add-configurable-conflict-log-table-for-Logical-.patch (111.6K, 2-v23-0001-Add-configurable-conflict-log-table-for-Logical-.patch)
From 71ccecd7bcb5fbea2fe282bc2d461e739b5a6cb4 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Wed, 17 Dec 2025 11:53:47 +0530
Subject: [PATCH v23 1/3] Add configurable conflict log table for Logical
Replication
This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.
This patch addresses these limitations by introducing a configurable
conflict_log_destination=('log'/'table'/'all') option in the CREATE SUBSCRIPTION
command.
If the user chooses to enable logging to a table (by selecting 'table' or 'all'),
a conflict log table named pg_conflict_<subid> is automatically
created within a dedicated, system-managed namespace named pg_conflict. This table
is linked to the subscription via an internal dependency, ensuring it is
automatically dropped when the subscription is removed.
The conflict details, including the original and remote tuples, are stored in JSON
columns, providing a flexible format to accommodate different table schemas.
The log table captures essential attributes such as local and remote transaction IDs,
LSNs, commit timestamps, and conflict type, providing a complete record for post-mortem
analysis.
This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.
The conflict log tables will not be included in a publication, even if the publication
is configured to include ALL TABLES or ALL TABLES IN SCHEMA.
---
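For readers of this patch, the table lifecycle described above, together with
the ALTER SUBSCRIPTION handling further down, can be sketched as a small
standalone decision function. This is only a sketch under assumptions: the
bit-flag values of ConflictLogDest are guessed from the way the patch applies
IsSet() to them (the real enum is in conflict.h, which is not shown here), and
TableAction and conflict_table_action() are hypothetical names that do not
exist in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * ASSUMPTION: bit-flag layout inferred from the IsSet() checks in the patch.
 * The real definition lives in replication/conflict.h.
 */
typedef enum ConflictLogDest
{
	CONFLICT_LOG_DEST_LOG = 1 << 0,
	CONFLICT_LOG_DEST_TABLE = 1 << 1,
	CONFLICT_LOG_DEST_ALL = CONFLICT_LOG_DEST_LOG | CONFLICT_LOG_DEST_TABLE
} ConflictLogDest;

/* Same macro as in subscriptioncmds.c. */
#define IsSet(val, bits) (((val) & (bits)) == (bits))

/* Hypothetical result type, for illustration only. */
typedef enum TableAction
{
	TABLE_KEEP,					/* nothing to create or drop */
	TABLE_CREATE,				/* new destination needs a table */
	TABLE_DROP					/* table is no longer wanted */
} TableAction;

/*
 * Decide what happens to the conflict log table when the destination
 * changes from old_dest to new_dest.
 */
TableAction
conflict_table_action(ConflictLogDest old_dest, ConflictLogDest new_dest)
{
	bool		has_oldtable;
	bool		want_table;

	/* Under the assumed layout, a destination names at least one sink. */
	assert(old_dest != 0 && new_dest != 0);

	has_oldtable = IsSet(old_dest, CONFLICT_LOG_DEST_TABLE);
	want_table = IsSet(new_dest, CONFLICT_LOG_DEST_TABLE);

	if (want_table && !has_oldtable)
		return TABLE_CREATE;
	if (!want_table && has_oldtable)
		return TABLE_DROP;
	return TABLE_KEEP;
}
```

Under these assumptions, switching between 'table' and 'all' keeps the existing
table, while switching to 'log' is the only transition that drops it.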
src/backend/catalog/catalog.c | 27 +-
src/backend/catalog/heap.c | 3 +-
src/backend/catalog/namespace.c | 8 +-
src/backend/catalog/pg_publication.c | 14 +-
src/backend/catalog/pg_subscription.c | 7 +
src/backend/commands/subscriptioncmds.c | 239 ++++++++++-
src/backend/commands/tablecmds.c | 6 +-
src/backend/executor/execMain.c | 15 +
src/bin/psql/describe.c | 16 +-
src/bin/psql/tab-complete.in.c | 8 +-
src/include/catalog/catalog.h | 2 +
src/include/catalog/pg_namespace.dat | 3 +
src/include/catalog/pg_subscription.h | 11 +
src/include/commands/subscriptioncmds.h | 3 +
src/include/replication/conflict.h | 55 +++
src/test/regress/expected/subscription.out | 444 +++++++++++++++++----
src/test/regress/sql/subscription.sql | 217 ++++++++++
src/tools/pgindent/typedefs.list | 2 +
18 files changed, 978 insertions(+), 102 deletions(-)
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 7be49032934..d438dc682ec 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -86,7 +86,8 @@ bool
IsSystemClass(Oid relid, Form_pg_class reltuple)
{
/* IsCatalogRelationOid is a bit faster, so test that first */
- return (IsCatalogRelationOid(relid) || IsToastClass(reltuple));
+ return (IsCatalogRelationOid(relid) || IsToastClass(reltuple)
+ || IsConflictClass(reltuple));
}
/*
@@ -230,6 +231,18 @@ IsToastClass(Form_pg_class reltuple)
return IsToastNamespace(relnamespace);
}
+/*
+ * IsConflictClass - Check if the given pg_class tuple belongs to the conflict
+ * namespace.
+ */
+bool
+IsConflictClass(Form_pg_class reltuple)
+{
+ Oid relnamespace = reltuple->relnamespace;
+
+ return IsConflictNamespace(relnamespace);
+}
+
/*
* IsCatalogNamespace
* True iff namespace is pg_catalog.
@@ -264,6 +277,18 @@ IsToastNamespace(Oid namespaceId)
isTempToastNamespace(namespaceId);
}
+/*
+ * IsConflictNamespace
+ * True iff namespace is pg_conflict.
+ *
+ * Does not perform any catalog accesses.
+ */
+bool
+IsConflictNamespace(Oid namespaceId)
+{
+ return namespaceId == PG_CONFLICT_NAMESPACE;
+}
+
/*
* IsReservedName
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 606434823cf..10dadf378a4 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -314,7 +314,8 @@ heap_create(const char *relname,
*/
if (!allow_system_table_mods &&
((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
- IsToastNamespace(relnamespace)) &&
+ IsToastNamespace(relnamespace) ||
+ IsConflictNamespace(relnamespace)) &&
IsNormalProcessingMode())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index c3b79a2ba48..400292fd06b 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -3523,7 +3523,7 @@ LookupCreationNamespace(const char *nspname)
*
* We complain if either the old or new namespaces is a temporary schema
* (or temporary toast schema), or if either the old or new namespaces is the
- * TOAST schema.
+ * TOAST schema or the CONFLICT schema.
*/
void
CheckSetNamespace(Oid oldNspOid, Oid nspOid)
@@ -3539,6 +3539,12 @@ CheckSetNamespace(Oid oldNspOid, Oid nspOid)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move objects into or out of TOAST schema")));
+
+ /* similarly for CONFLICT schema */
+ if (nspOid == PG_CONFLICT_NAMESPACE || oldNspOid == PG_CONFLICT_NAMESPACE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move objects into or out of CONFLICT schema")));
}
/*
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9a4791c573e..a33c33efe0d 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -31,6 +31,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/publicationcmds.h"
+#include "commands/subscriptioncmds.h"
#include "funcapi.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -85,6 +86,15 @@ check_publication_add_relation(Relation targetrel)
errmsg("cannot add relation \"%s\" to publication",
RelationGetRelationName(targetrel)),
errdetail("This operation is not supported for unlogged tables.")));
+
+ /* Can't be conflict log table */
+ if (IsConflictNamespace(RelationGetNamespace(targetrel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add relation \"%s.%s\" to publication",
+ get_namespace_name(RelationGetNamespace(targetrel)),
+ RelationGetRelationName(targetrel)),
+ errdetail("This operation is not supported for conflict log tables.")));
}
/*
@@ -95,7 +105,8 @@ static void
check_publication_add_schema(Oid schemaid)
{
/* Can't be system namespace */
- if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
+ if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid) ||
+ IsConflictNamespace(schemaid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add schema \"%s\" to publication",
@@ -139,6 +150,7 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
reltuple->relkind == RELKIND_SEQUENCE) &&
!IsCatalogRelationOid(relid) &&
+ !IsConflictClass(reltuple) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
}
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 2b103245290..285a598497d 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -106,6 +106,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->retaindeadtuples = subform->subretaindeadtuples;
sub->maxretention = subform->submaxretention;
sub->retentionactive = subform->subretentionactive;
+ sub->conflictlogrelid = subform->subconflictlogrelid;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
@@ -141,6 +142,12 @@ GetSubscription(Oid subid, bool missing_ok)
Anum_pg_subscription_suborigin);
sub->origin = TextDatumGetCString(datum);
+ /* Get conflict log destination */
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconflictlogdest);
+ sub->conflictlogdest = TextDatumGetCString(datum);
+
/* Is the subscription owner a superuser? */
sub->ownersuperuser = superuser_arg(sub->owner);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d6674f20fc2..de1977f81ef 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -15,25 +15,31 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
+#include "catalog/heap.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
+#include "catalog/pg_am_d.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
+#include "catalog/pg_namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "commands/comment.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
@@ -51,6 +57,7 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/regproc.h"
#include "utils/syscache.h"
/*
@@ -75,6 +82,7 @@
#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
#define SUBOPT_LSN 0x00010000
#define SUBOPT_ORIGIN 0x00020000
+#define SUBOPT_CONFLICT_LOG_DEST 0x00040000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -103,6 +111,7 @@ typedef struct SubOpts
bool retaindeadtuples;
int32 maxretention;
char *origin;
+ ConflictLogDest conflictlogdest;
XLogRecPtr lsn;
} SubOpts;
@@ -135,7 +144,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
static void CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel);
-
+static Oid create_conflict_log_table(Oid subid, char *subname);
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -191,6 +200,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->maxretention = 0;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+ if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST))
+ opts->conflictlogdest = CONFLICT_LOG_DEST_LOG;
/* Parse options */
foreach(lc, stmt_options)
@@ -402,6 +413,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_LSN;
opts->lsn = lsn;
}
+ else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST) &&
+ strcmp(defel->defname, "conflict_log_destination") == 0)
+ {
+ char *val;
+
+ if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST))
+ errorConflictingDefElem(defel, pstate);
+
+ val = defGetString(defel);
+ opts->conflictlogdest = GetLogDestination(val);
+ opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -599,6 +622,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
bits32 supported_opts;
SubOpts opts = {0};
AclResult aclresult;
+ Oid logrelid = InvalidOid;
/*
* Parse and check options.
@@ -612,7 +636,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
- SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+ SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+ SUBOPT_CONFLICT_LOG_DEST);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -747,6 +772,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
+ /* Always set the destination, default will be 'log'. */
+ values[Anum_pg_subscription_subconflictlogdest - 1] =
+ CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]);
+
+ /* If logging to a table is required, physically create the table. */
+ if (IsSet(opts.conflictlogdest, CONFLICT_LOG_DEST_TABLE))
+ logrelid = create_conflict_log_table(subid, stmt->subname);
+
+ /* Store table OID in the catalog. */
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(logrelid);
+
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
@@ -1410,7 +1447,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
SUBOPT_MAX_RETENTION_DURATION |
- SUBOPT_ORIGIN);
+ SUBOPT_ORIGIN |
+ SUBOPT_CONFLICT_LOG_DEST);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1665,6 +1703,63 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
origin = opts.origin;
}
+ if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST))
+ {
+ ConflictLogDest old_dest =
+ GetLogDestination(sub->conflictlogdest);
+
+ if (opts.conflictlogdest != old_dest)
+ {
+ bool want_table = IsSet(opts.conflictlogdest,
+ CONFLICT_LOG_DEST_TABLE);
+ bool has_oldtable =
+ IsSet(old_dest, CONFLICT_LOG_DEST_TABLE);
+
+ values[Anum_pg_subscription_subconflictlogdest - 1] =
+ CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]);
+ replaces[Anum_pg_subscription_subconflictlogdest - 1] = true;
+
+ if (want_table && !has_oldtable)
+ {
+ Oid relid;
+
+ relid = create_conflict_log_table(subid, sub->name);
+
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(relid);
+ replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
+ true;
+ }
+ else if (!want_table && has_oldtable)
+ {
+ ObjectAddress object;
+
+ /*
+ * Conflict log tables are recorded as internal
+ * dependencies of the subscription. Drop the
+ * table if it is not required anymore to avoid
+ * stale or orphaned relations.
+ *
+ * XXX: At present, only conflict log tables are
+ * managed this way. In future if we introduce
+ * additional internal dependencies, we may need
+ * a targeted deletion to avoid deletion of any
+ * other objects.
+ */
+ ObjectAddressSet(object, SubscriptionRelationId,
+ subid);
+ performDeletion(&object, DROP_CASCADE,
+ PERFORM_DELETION_INTERNAL |
+ PERFORM_DELETION_SKIP_ORIGINAL);
+
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(InvalidOid);
+ replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
+ true;
+ }
+ }
+ }
+
update_tuple = true;
break;
}
@@ -2027,6 +2122,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
Form_pg_subscription form;
List *rstates;
bool must_use_password;
+ ObjectAddress object;
/*
* The launcher may concurrently start a new worker for this subscription.
@@ -2184,6 +2280,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+ /*
+ * Conflict log tables are recorded as internal dependencies of the
+ * subscription. We must drop the dependent objects before the
+ * subscription itself is removed. By using
+ * PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the conflict log
+ * table is reaped while the subscription remains for the final deletion
+ * step.
+ */
+ ObjectAddressSet(object, SubscriptionRelationId, subid);
+ performDeletion(&object, DROP_CASCADE,
+ PERFORM_DELETION_INTERNAL |
+ PERFORM_DELETION_SKIP_ORIGINAL);
+
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
@@ -3190,3 +3299,127 @@ defGetStreamingMode(DefElem *def)
def->defname)));
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
}
+
+/*
+ * Builds the TupleDesc for the conflict log table.
+ */
+static TupleDesc
+create_conflict_log_table_tupdesc(void)
+{
+ TupleDesc tupdesc;
+
+ tupdesc = CreateTemplateTupleDesc(MAX_CONFLICT_ATTR_NUM);
+
+ for (int i = 0; i < MAX_CONFLICT_ATTR_NUM; i++)
+ {
+ Oid type_oid = ConflictLogSchema[i].atttypid;
+
+ /*
+ * Special handling for the JSON array type for proper
+ * TupleDescInitEntry call.
+ */
+ if (type_oid == JSONARRAYOID)
+ type_oid = get_array_type(JSONOID);
+
+ TupleDescInitEntry(tupdesc, i + 1,
+ ConflictLogSchema[i].attname,
+ type_oid,
+ -1, 0);
+ }
+
+ return BlessTupleDesc(tupdesc);
+}
+
+/*
+ * Create a structured conflict log table for a subscription.
+ *
+ * The table is created within the system-managed 'pg_conflict' namespace.
+ * The table name is generated automatically using the subscription's OID
+ * (e.g., "pg_conflict_<subid>") to ensure uniqueness within the cluster and
+ * to avoid collisions during subscription renames.
+ */
+static Oid
+create_conflict_log_table(Oid subid, char *subname)
+{
+ TupleDesc tupdesc;
+ Oid relid;
+ ObjectAddress myself;
+ ObjectAddress subaddr;
+ char relname[NAMEDATALEN];
+
+ snprintf(relname, NAMEDATALEN, "pg_conflict_%u", subid);
+
+ /* There can not be an existing table with the same name. */
+ Assert(!OidIsValid(get_relname_relid(relname, PG_CONFLICT_NAMESPACE)));
+
+ /* Build the tuple descriptor for the new table. */
+ tupdesc = create_conflict_log_table_tupdesc();
+
+ /* Create conflict log table. */
+ relid = heap_create_with_catalog(relname,
+ PG_CONFLICT_NAMESPACE,
+ 0, /* tablespace */
+ InvalidOid, /* relid */
+ InvalidOid, /* reltypeid */
+ InvalidOid, /* reloftypeid */
+ GetUserId(),
+ HEAP_TABLE_AM_OID,
+ tupdesc,
+ NIL,
+ RELKIND_RELATION,
+ RELPERSISTENCE_PERMANENT,
+ false, /* shared_relation */
+ false, /* mapped_relation */
+ ONCOMMIT_NOOP,
+ (Datum) 0, /* reloptions */
+ false, /* use_user_acl */
+ true, /* allow_system_table_mods */
+ true, /* is_internal */
+ InvalidOid, /* relrewrite */
+ NULL); /* typaddress */
+
+ /*
+ * Establish an internal dependency between the conflict log table and
+ * the subscription.
+ *
+ * We use DEPENDENCY_INTERNAL to signify that the table's lifecycle is
+ * strictly tied to the subscription, similar to how a TOAST table relates
+ * to its main table or a sequence relates to an identity column.
+ *
+ * This ensures the conflict log table is automatically reaped during a
+ * DROP SUBSCRIPTION via performDeletion().
+ */
+ ObjectAddressSet(myself, RelationRelationId, relid);
+ ObjectAddressSet(subaddr, SubscriptionRelationId, subid);
+ recordDependencyOn(&myself, &subaddr, DEPENDENCY_INTERNAL);
+
+ /* Release tuple descriptor memory. */
+ FreeTupleDesc(tupdesc);
+
+ return relid;
+}
+
+/*
+ * GetLogDestination
+ *
+ * Convert string to enum by comparing against standardized labels.
+ */
+ConflictLogDest
+GetLogDestination(const char *dest)
+{
+ /* Empty string or NULL defaults to LOG. */
+ if (dest == NULL || dest[0] == '\0' || pg_strcasecmp(dest, "log") == 0)
+ return CONFLICT_LOG_DEST_LOG;
+
+ if (pg_strcasecmp(dest, "table") == 0)
+ return CONFLICT_LOG_DEST_TABLE;
+
+ if (pg_strcasecmp(dest, "all") == 0)
+ return CONFLICT_LOG_DEST_ALL;
+
+ /* Unrecognized string. */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized conflict_log_destination value: \"%s\"", dest),
+ errhint("Valid values are \"log\", \"table\", and \"all\".")));
+}
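The option parsing and table naming in the subscriptioncmds.c changes above can
be tried outside the backend with a minimal standalone sketch. It makes several
substitutions that are assumptions, not the patch's code: a tolower()-based
comparison stands in for pg_strcasecmp(), a sentinel return value stands in for
ereport(ERROR), and the SketchDest enum and all sketch_* names are hypothetical.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for ConflictLogDest; values are illustrative only. */
typedef enum SketchDest
{
	SKETCH_DEST_INVALID = -1,	/* the patch raises ereport(ERROR) instead */
	SKETCH_DEST_LOG,
	SKETCH_DEST_TABLE,
	SKETCH_DEST_ALL
} SketchDest;

/* Case-insensitive equality for ASCII option labels. */
int
sketch_equals_ignore_case(const char *a, const char *b)
{
	while (*a != '\0' && *b != '\0')
	{
		if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
			return 0;
		a++;
		b++;
	}
	/* Equal only if both strings ended at the same position. */
	return *a == *b;
}

/* Mirrors GetLogDestination(): NULL and the empty string default to 'log'. */
SketchDest
sketch_parse_dest(const char *dest)
{
	if (dest == NULL || dest[0] == '\0' ||
		sketch_equals_ignore_case(dest, "log"))
		return SKETCH_DEST_LOG;
	if (sketch_equals_ignore_case(dest, "table"))
		return SKETCH_DEST_TABLE;
	if (sketch_equals_ignore_case(dest, "all"))
		return SKETCH_DEST_ALL;
	return SKETCH_DEST_INVALID;
}

/* Builds "pg_conflict_<subid>", the format create_conflict_log_table() uses. */
int
sketch_conflict_log_table_name(char *buf, size_t buflen, unsigned int subid)
{
	assert(buf != NULL && buflen > 0);
	return snprintf(buf, buflen, "pg_conflict_%u", subid);
}
```

Because the name is derived from the subscription OID rather than its name,
renaming the subscription leaves the table name unchanged.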
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index f976c0e5c7e..4bf39f359d7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2398,9 +2398,11 @@ truncate_check_rel(Oid relid, Form_pg_class reltuple)
* pg_largeobject and pg_largeobject_metadata to be truncated as part of
* pg_upgrade, because we need to change its relfilenode to match the old
* cluster, and allowing a TRUNCATE command to be executed is the easiest
- * way of doing that.
+ * way of doing that. We also allow TRUNCATE on the conflict log tables,
+ * to permit users to manually prune these logs to manage disk space.
*/
- if (!allowSystemTableMods && IsSystemClass(relid, reltuple)
+ if (!allowSystemTableMods && IsSystemClass(relid, reltuple) &&
+ !IsConflictClass(reltuple)
&& (!IsBinaryUpgrade ||
(relid != LargeObjectRelationId &&
relid != LargeObjectMetadataRelationId)))
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index bfd3ebc601e..971b75a596f 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1170,6 +1170,21 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation,
RelationGetRelationName(resultRel))));
break;
}
+
+ /*
+ * Conflict log tables are managed by the system to record logical
+ * replication conflicts. We allow DELETE to permit users to manually
+ * prune or truncate these logs, but manual data insertion or modification
+ * (INSERT, UPDATE, MERGE) is prohibited to maintain the integrity of the
+ * system-generated logs.
+ */
+ if (IsConflictNamespace(RelationGetNamespace(resultRel)) &&
+ operation != CMD_DELETE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("cannot modify or insert data for conflict log table \"%s\"",
+ RelationGetRelationName(resultRel)),
+ errdetail("Conflict log tables are system-managed and only support cleanup via DELETE or TRUNCATE.")));
}
/*
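The DML restriction added to CheckValidResultRel() above reduces to a single
predicate on the target relation's namespace and the command type. The sketch
below is a standalone illustration only: SketchCmd is a hypothetical stand-in
for CmdType, sketch_conflict_log_dml_blocked() does not exist in the patch, and
the namespace OID is copied from the pg_namespace.dat hunk of this patch.

```c
#include <assert.h>
#include <stdbool.h>

/* OID assigned to pg_conflict in this patch's pg_namespace.dat change. */
#define PG_CONFLICT_NAMESPACE 1382

/* Hypothetical stand-in for the CmdType values that reach this check. */
typedef enum SketchCmd
{
	SKETCH_CMD_INSERT,
	SKETCH_CMD_UPDATE,
	SKETCH_CMD_DELETE,
	SKETCH_CMD_MERGE
} SketchCmd;

/*
 * Returns true when the operation must be rejected: the target lives in
 * pg_conflict and the command is anything other than DELETE.
 */
bool
sketch_conflict_log_dml_blocked(unsigned int relnamespace, SketchCmd op)
{
	/* Only data-modifying commands are expected here. */
	assert(op >= SKETCH_CMD_INSERT && op <= SKETCH_CMD_MERGE);

	return relnamespace == PG_CONFLICT_NAMESPACE && op != SKETCH_CMD_DELETE;
}
```

TRUNCATE does not pass through this check. The patch permits it separately by
exempting conflict log tables in truncate_check_rel().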
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 3584c4e1428..21cfe390430 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false,
- false, false, false, false};
+ false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6900,6 +6900,20 @@ describeSubscriptions(const char *pattern, bool verbose)
appendPQExpBuffer(&buf,
", subskiplsn AS \"%s\"\n",
gettext_noop("Skip LSN"));
+
+ /* Conflict log destination is supported in v19 and higher */
+ if (pset.sversion >= 190000)
+ {
+ appendPQExpBuffer(&buf,
+ ", subconflictlogdest AS \"%s\"\n",
+ gettext_noop("Conflict log destination"));
+
+ appendPQExpBuffer(&buf,
+ ", (CASE WHEN subconflictlogdest IN ('table', 'all') "
+ " THEN 'pg_conflict_' || oid "
+ " ELSE '-' END) AS \"%s\"\n",
+ gettext_noop("Conflict log table"));
+ }
}
/* Only display subscriptions in current database. */
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 8b91bc00062..12eee8a0d43 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
- COMPLETE_WITH("binary", "disable_on_error", "failover",
- "max_retention_duration", "origin",
+ COMPLETE_WITH("binary", "conflict_log_destination", "disable_on_error",
+ "failover", "max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
"synchronous_commit", "two_phase");
@@ -3860,8 +3860,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("WITH (");
/* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
- COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
- "disable_on_error", "enabled", "failover",
+ COMPLETE_WITH("binary", "conflict_log_destination", "connect", "copy_data",
+ "create_slot", "disable_on_error", "enabled", "failover",
"max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h
index a9d6e8ea986..8193229f2e2 100644
--- a/src/include/catalog/catalog.h
+++ b/src/include/catalog/catalog.h
@@ -25,6 +25,7 @@ extern bool IsInplaceUpdateRelation(Relation relation);
extern bool IsSystemClass(Oid relid, Form_pg_class reltuple);
extern bool IsToastClass(Form_pg_class reltuple);
+extern bool IsConflictClass(Form_pg_class reltuple);
extern bool IsCatalogRelationOid(Oid relid);
extern bool IsCatalogTextUniqueIndexOid(Oid relid);
@@ -32,6 +33,7 @@ extern bool IsInplaceUpdateOid(Oid relid);
extern bool IsCatalogNamespace(Oid namespaceId);
extern bool IsToastNamespace(Oid namespaceId);
+extern bool IsConflictNamespace(Oid namespaceId);
extern bool IsReservedName(const char *name);
diff --git a/src/include/catalog/pg_namespace.dat b/src/include/catalog/pg_namespace.dat
index 3075e142c73..b45cb9383a8 100644
--- a/src/include/catalog/pg_namespace.dat
+++ b/src/include/catalog/pg_namespace.dat
@@ -18,6 +18,9 @@
{ oid => '99', oid_symbol => 'PG_TOAST_NAMESPACE',
descr => 'reserved schema for TOAST tables',
nspname => 'pg_toast', nspacl => '_null_' },
+{ oid => '1382', oid_symbol => 'PG_CONFLICT_NAMESPACE',
+ descr => 'reserved schema for subscription-specific conflict log tables',
+ nspname => 'pg_conflict', nspacl => '_null_' },
# update dumpNamespace() if changing this descr
{ oid => '2200', oid_symbol => 'PG_PUBLIC_NAMESPACE',
descr => 'standard public schema',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index f3571d2bfcf..76a4638b389 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -90,6 +90,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* exceeded max_retention_duration, when
* defined */
+ Oid subconflictlogrelid; /* Relid of the conflict log table. */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +104,14 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
/* List of publications subscribed to */
text subpublications[1] BKI_FORCE_NOT_NULL;
+ /*
+ * Strategy for logging replication conflicts:
+ * 'log' - server log only,
+ * 'table' - conflict log table only,
+ * 'all' - both log and table.
+ */
+ text subconflictlogdest;
+
/* Only publish data originating from the specified origin */
text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
#endif
@@ -152,12 +161,14 @@ typedef struct Subscription
* and the retention duration has not
* exceeded max_retention_duration, when
* defined */
+ Oid conflictlogrelid; /* conflict log table Oid */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
char *origin; /* Only publish data originating from the
* specified origin */
+ char *conflictlogdest; /* Conflict log destination */
} Subscription;
#ifdef EXPOSE_TO_CLIENT_CODE
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 63504232a14..a895127f8fe 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
#include "catalog/objectaddress.h"
#include "parser/parse_node.h"
+#include "replication/conflict.h"
extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
bool isTopLevel);
@@ -36,4 +37,6 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
bool retention_active,
bool max_retention_set);
+extern ConflictLogDest GetLogDestination(const char *dest);
+
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index d538274637f..4e4f59bb453 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -10,6 +10,7 @@
#define CONFLICT_H
#include "access/xlogdefs.h"
+#include "catalog/pg_type.h"
#include "nodes/pg_list.h"
#include "utils/timestamp.h"
@@ -79,6 +80,60 @@ typedef struct ConflictTupleInfo
* conflicting local row occurred */
} ConflictTupleInfo;
+/*
+ * Conflict log destination types.
+ *
+ * These values are defined as bitmask flags to allow for multiple simultaneous
+ * logging destinations (e.g., logging to both system logs and a table).
+ * Internally, we use these for bitwise comparisons (IsSet), but the string
+ * representation is stored in pg_subscription.subconflictlogdest.
+ */
+typedef enum ConflictLogDest
+{
+ /* Log conflicts to the server logs */
+ CONFLICT_LOG_DEST_LOG = 1 << 0, /* 0x01 */
+
+ /* Log conflicts to an internally managed conflict log table */
+ CONFLICT_LOG_DEST_TABLE = 1 << 1, /* 0x02 */
+
+ /* Convenience bitmask for all supported destinations */
+ CONFLICT_LOG_DEST_ALL = (CONFLICT_LOG_DEST_LOG | CONFLICT_LOG_DEST_TABLE)
+} ConflictLogDest;
+
+/*
+ * Array mapping for converting internal enum to string.
+ */
+static const char *const ConflictLogDestNames[] = {
+ [CONFLICT_LOG_DEST_LOG] = "log",
+ [CONFLICT_LOG_DEST_TABLE] = "table",
+ [CONFLICT_LOG_DEST_ALL] = "all"
+};
+
+/* Structure to hold metadata for one column of the conflict log table */
+typedef struct ConflictLogColumnDef
+{
+ const char *attname; /* Column name */
+ Oid atttypid; /* Data type OID */
+} ConflictLogColumnDef;
+
+/* The single source of truth for the conflict log table schema */
+static const ConflictLogColumnDef ConflictLogSchema[] =
+{
+ { .attname = "relid", .atttypid = OIDOID },
+ { .attname = "schemaname", .atttypid = TEXTOID },
+ { .attname = "relname", .atttypid = TEXTOID },
+ { .attname = "conflict_type", .atttypid = TEXTOID },
+ { .attname = "remote_xid", .atttypid = XIDOID },
+ { .attname = "remote_commit_lsn", .atttypid = LSNOID },
+ { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID },
+ { .attname = "remote_origin", .atttypid = TEXTOID },
+ { .attname = "replica_identity", .atttypid = JSONOID },
+ { .attname = "remote_tuple", .atttypid = JSONOID },
+ { .attname = "local_conflicts", .atttypid = JSONARRAYOID }
+};
+
+#define MAX_CONFLICT_ATTR_NUM lengthof(ConflictLogSchema)
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
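Reviewer's illustration (not part of the patch): the conflict.h hunk above defines the destinations as bitmask flags and stores the string form in pg_subscription.subconflictlogdest. The standalone sketch below shows how such a mapping could be used in both directions. The enum and name array are copied from the hunk; the helper functions parse_conflict_log_dest, dest_includes and conflict_log_dest_name are hypothetical names invented here, and this is not the patch's GetLogDestination(), whose body is not shown in this part of the diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Copied from the conflict.h hunk: destinations are bit flags. */
typedef enum ConflictLogDest
{
    CONFLICT_LOG_DEST_LOG = 1 << 0,     /* 0x01 */
    CONFLICT_LOG_DEST_TABLE = 1 << 1,   /* 0x02 */
    CONFLICT_LOG_DEST_ALL = (CONFLICT_LOG_DEST_LOG | CONFLICT_LOG_DEST_TABLE)
} ConflictLogDest;

/*
 * Copied from the hunk: designated initializers index the array by enum
 * value (1, 2, 3), so slot 0 is implicitly NULL.
 */
static const char *const ConflictLogDestNames[] = {
    [CONFLICT_LOG_DEST_LOG] = "log",
    [CONFLICT_LOG_DEST_TABLE] = "table",
    [CONFLICT_LOG_DEST_ALL] = "all"
};

#define DEST_NAMES_LEN \
    (sizeof(ConflictLogDestNames) / sizeof(ConflictLogDestNames[0]))

/*
 * Hypothetical helper: convert the stored string to the enum.
 * Returns false for an unrecognized name and leaves *dest untouched.
 */
static bool
parse_conflict_log_dest(const char *name, ConflictLogDest *dest)
{
    for (size_t i = 1; i < DEST_NAMES_LEN; i++)
    {
        if (ConflictLogDestNames[i] != NULL &&
            strcmp(name, ConflictLogDestNames[i]) == 0)
        {
            *dest = (ConflictLogDest) i;
            return true;
        }
    }
    return false;
}

/* Hypothetical helper: true if every bit of "flag" is set in "dest". */
static bool
dest_includes(ConflictLogDest dest, ConflictLogDest flag)
{
    return (dest & flag) == flag;
}

/* Hypothetical helper: enum to string, valid only for the three values. */
static const char *
conflict_log_dest_name(ConflictLogDest dest)
{
    assert(dest >= CONFLICT_LOG_DEST_LOG && dest <= CONFLICT_LOG_DEST_ALL);
    return ConflictLogDestNames[dest];
}
```

With this layout, a caller that parsed 'all' can test dest_includes(dest, CONFLICT_LOG_DEST_TABLE) to decide whether to insert into the conflict log table, and the same test is false for 'log'. Note that the array-by-enum-value mapping only stays dense while the flag values are small; adding a third flag (1 << 2) would leave gaps that a lookup must skip, which is why the loop above checks for NULL.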
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b3eccd8afe3..b94bcc3cc23 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | log | -
(1 row)
-- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log | -
(1 row)
BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------+--------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | log | -
(1 row)
-- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
-- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
-- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -450,19 +450,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
-- ok
ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination | Conflict log table
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------+--------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log | -
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -517,6 +517,274 @@ COMMIT;
-- ok, owning it is enough for this stuff
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG DESTINATION TESTS
+--
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+-- fail - unrecognized parameter value
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid');
+ERROR: unrecognized conflict_log_destination value: "invalid"
+HINT: Valid values are "log", "table", and "all".
+-- verify subconflictlogdest is 'log' and relid is 0 (InvalidOid) for default case
+CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_log_default';
+ subname | subconflictlogdest | subconflictlogrelid
+------------------------------+--------------------+---------------------
+ regress_conflict_log_default | log | 0
+(1 row)
+
+-- verify empty string defaults to 'log'
+CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = '');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_empty_str';
+ subname | subconflictlogdest | subconflictlogrelid
+----------------------------+--------------------+---------------------
+ regress_conflict_empty_str | log | 0
+(1 row)
+
+-- this should generate an internal conflict log table named pg_conflict_$subid$
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+-- check metadata in pg_subscription: destination should be 'table' and relid valid
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ subname | subconflictlogdest | has_relid
+------------------------+--------------------+-----------
+ regress_conflict_test1 | table | t
+(1 row)
+
+-- verify the physical table exists, its OID matches subconflictlogrelid,
+-- and it is located in the 'pg_conflict' namespace
+SELECT n.nspname, (c.oid = s.subconflictlogrelid) AS "oid_matches"
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+JOIN pg_namespace n ON c.relnamespace = n.oid
+WHERE s.subname = 'regress_conflict_test1';
+ nspname | oid_matches
+-------------+-------------
+ pg_conflict | t
+(1 row)
+
+-- check if the conflict log table has the correct schema
+SELECT a.attnum, a.attname
+FROM pg_attribute a
+JOIN pg_class c ON a.attrelid = c.oid
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0
+ ORDER BY a.attnum;
+ attnum | attname
+--------+-------------------
+ 1 | relid
+ 2 | schemaname
+ 3 | relname
+ 4 | conflict_type
+ 5 | remote_xid
+ 6 | remote_commit_lsn
+ 7 | remote_commit_ts
+ 8 | remote_origin
+ 9 | replica_identity
+ 10 | remote_tuple
+ 11 | local_conflicts
+(11 rows)
+
+--
+-- ALTER SUBSCRIPTION - conflict_log_destination state transitions
+--
+-- These tests verify the transition logic between different logging
+-- destinations, ensuring conflict log tables are created or dropped as
+-- expected
+--
+-- transition from 'log' to 'all'
+-- a new internal conflict log table should be created
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all');
+-- verify metadata after ALTER (destination should be 'all')
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subname | subconflictlogdest | has_relid
+------------------------+--------------------+-----------
+ regress_conflict_test2 | all | t
+(1 row)
+
+-- transition from 'all' to 'table'
+-- should NOT drop the table, only change destination string
+SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table');
+SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subconflictlogdest | relid_unchanged
+--------------------+-----------------
+ table | t
+(1 row)
+
+-- transition from 'table' to 'log'
+-- should drop the table and clear relid
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log');
+SELECT subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subconflictlogdest | subconflictlogrelid
+--------------------+---------------------
+ log | 0
+(1 row)
+
+-- verify the physical table is gone
+SELECT count(*)
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test2';
+ count
+-------
+ 0
+(1 row)
+
+--
+-- PUBLICATION: Verify conflict log tables are not publishable
+--
+-- pg_relation_is_publishable should return false for internal conflict log
+-- tables to prevent them from being accidentally included in publications
+--
+SELECT n.nspname, pg_relation_is_publishable(c.oid)
+FROM pg_class c
+JOIN pg_namespace n ON c.relnamespace = n.oid
+JOIN pg_subscription s ON s.subconflictlogrelid = c.oid
+WHERE s.subname = 'regress_conflict_test1';
+ nspname | pg_relation_is_publishable
+-------------+----------------------------
+ pg_conflict | f
+(1 row)
+
+--
+-- Table Protection and Lifecycle Management
+--
+-- These tests verify that:
+-- Manual DROP TABLE is disallowed
+-- DROP SUBSCRIPTION automatically reaps the table
+--
+-- re-enable table logging for verification
+ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table');
+-- We use a DO block with dynamic SQL because the internal conflict log table
+-- name contains the subscription OID, which is non-deterministic. This
+-- approach allows us to attempt the DROP and capture the expected error
+-- without hard-coding a specific OID in the expected output
+-- fail - drop table not allowed due to internal dependency
+DO $$
+BEGIN
+ EXECUTE 'DROP TABLE ' || (SELECT 'pg_conflict.pg_conflict_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1');
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege';
+END $$;
+NOTICE: captured expected error: insufficient_privilege
+-- CLEANUP: DROP SUBSCRIPTION reaps the table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+-- Verify the table OID for reap check
+SELECT 'pg_conflict_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the conflict log table was reaped via dependency
+SELECT to_regclass(:'internal_tablename');
+ to_regclass
+-------------
+
+(1 row)
+
+--
+-- Additional Namespace and Table Protection Tests
+--
+-- Setup: Ensure we have a subscription with a conflict log table
+CREATE SUBSCRIPTION regress_conflict_protection_test CONNECTION 'dbname=regress_doesnotexist'
+ PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+-- Trying to ALTER the internal conflict log table
+-- This should fail because the table is system-managed
+-- As mentioned in previous test cases, we use a DO block to hide dynamic OIDs
+DO $$
+DECLARE
+ tab_name text;
+BEGIN
+ SELECT 'pg_conflict.' || relname INTO tab_name
+ FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+ WHERE s.subname = 'regress_conflict_protection_test';
+
+ RAISE NOTICE 'Attempting ALTER TABLE on internal conflict log table';
+ EXECUTE 'ALTER TABLE ' || tab_name || ' ADD COLUMN extra_info text';
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege during ALTER';
+END $$;
+NOTICE: Attempting ALTER TABLE on internal conflict log table
+NOTICE: captured expected error: insufficient_privilege during ALTER
+-- Test Manual INSERT on conflict log table
+-- This should fail because the table is system-managed
+-- Hiding the OID in the error message by catching the exception
+DO $$
+DECLARE
+ tab_name text;
+BEGIN
+ SELECT 'pg_conflict.' || relname INTO tab_name
+ FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+ WHERE s.subname = 'regress_conflict_protection_test';
+
+ EXECUTE 'INSERT INTO ' || tab_name || ' (relname) VALUES (''mytest'')';
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege during INSERT';
+END $$;
+NOTICE: captured expected error: insufficient_privilege during INSERT
+-- Test Manual UPDATE on conflict log table
+-- This should fail because the table is system-managed
+-- Hiding the OID in the error message by catching the exception
+DO $$
+DECLARE
+ tab_name text;
+BEGIN
+ SELECT 'pg_conflict.' || relname INTO tab_name
+ FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+ WHERE s.subname = 'regress_conflict_protection_test';
+
+ EXECUTE 'UPDATE ' || tab_name || ' SET relname = ''mytest'' ';
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege during UPDATE';
+END $$;
+NOTICE: captured expected error: insufficient_privilege during UPDATE
+-- Trying to perform TRUNCATE/DELETE on the internal conflict log table
+-- This should be allowed so that user can perform cleanup
+SELECT 'pg_conflict.' || relname AS conflict_tab
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_protection_test' \gset
+TRUNCATE :conflict_tab;
+DELETE FROM :conflict_tab;
+-- Trying to create a new table manually in the pg_conflict namespace
+-- This should fail as the namespace is reserved for conflict logs tables
+CREATE TABLE pg_conflict.manual_table (id int);
+ERROR: permission denied to create "pg_conflict.manual_table"
+DETAIL: System catalog modifications are currently disallowed.
+-- Moving an existing table into the pg_conflict namespace
+-- Users should not be able to move their own tables within this namespace
+CREATE TABLE public.test_move (id int);
+ALTER TABLE public.test_move SET SCHEMA pg_conflict;
+ERROR: cannot move objects into or out of CONFLICT schema
+DROP TABLE public.test_move;
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_log_default DISABLE;
+ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_log_default;
+ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE;
+ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_empty_str;
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+ALTER SUBSCRIPTION regress_conflict_protection_test DISABLE;
+ALTER SUBSCRIPTION regress_conflict_protection_test SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_protection_test;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index ef0c298d2df..d2934478392 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -365,6 +365,223 @@ COMMIT;
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG DESTINATION TESTS
+--
+
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
+-- fail - unrecognized parameter value
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid');
+
+-- verify subconflictlogdest is 'log' and relid is 0 (InvalidOid) for default case
+CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_log_default';
+
+-- verify empty string defaults to 'log'
+CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = '');
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_empty_str';
+
+-- this should generate an internal conflict log table named pg_conflict_$subid$
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+
+-- check metadata in pg_subscription: destination should be 'table' and relid valid
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- verify the physical table exists, its OID matches subconflictlogrelid,
+-- and it is located in the 'pg_conflict' namespace
+SELECT n.nspname, (c.oid = s.subconflictlogrelid) AS "oid_matches"
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+JOIN pg_namespace n ON c.relnamespace = n.oid
+WHERE s.subname = 'regress_conflict_test1';
+
+-- check if the conflict log table has the correct schema
+SELECT a.attnum, a.attname
+FROM pg_attribute a
+JOIN pg_class c ON a.attrelid = c.oid
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0
+ ORDER BY a.attnum;
+
+--
+-- ALTER SUBSCRIPTION - conflict_log_destination state transitions
+--
+-- These tests verify the transition logic between different logging
+-- destinations, ensuring conflict log tables are created or dropped as
+-- expected
+--
+-- transition from 'log' to 'all'
+-- a new internal conflict log table should be created
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log');
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all');
+
+-- verify metadata after ALTER (destination should be 'all')
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- transition from 'all' to 'table'
+-- should NOT drop the table, only change destination string
+SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table');
+SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- transition from 'table' to 'log'
+-- should drop the table and clear relid
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log');
+SELECT subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- verify the physical table is gone
+SELECT count(*)
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test2';
+
+--
+-- PUBLICATION: Verify conflict log tables are not publishable
+--
+-- pg_relation_is_publishable should return false for internal conflict log
+-- tables to prevent them from being accidentally included in publications
+--
+SELECT n.nspname, pg_relation_is_publishable(c.oid)
+FROM pg_class c
+JOIN pg_namespace n ON c.relnamespace = n.oid
+JOIN pg_subscription s ON s.subconflictlogrelid = c.oid
+WHERE s.subname = 'regress_conflict_test1';
+
+--
+-- Table Protection and Lifecycle Management
+--
+-- These tests verify that:
+-- Manual DROP TABLE is disallowed
+-- DROP SUBSCRIPTION automatically reaps the table
+--
+-- re-enable table logging for verification
+ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table');
+
+-- We use a DO block with dynamic SQL because the internal conflict log table
+-- name contains the subscription OID, which is non-deterministic. This
+-- approach allows us to attempt the DROP and capture the expected error
+-- without hard-coding a specific OID in the expected output
+
+-- fail - drop table not allowed due to internal dependency
+DO $$
+BEGIN
+ EXECUTE 'DROP TABLE ' || (SELECT 'pg_conflict.pg_conflict_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1');
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege';
+END $$;
+
+-- CLEANUP: DROP SUBSCRIPTION reaps the table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+
+-- Save the internal table name for the reap check
+SELECT 'pg_conflict_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset
+
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the conflict log table was reaped via dependency
+SELECT to_regclass(:'internal_tablename');
+
+--
+-- Additional Namespace and Table Protection Tests
+--
+
+-- Setup: Ensure we have a subscription with a conflict log table
+CREATE SUBSCRIPTION regress_conflict_protection_test CONNECTION 'dbname=regress_doesnotexist'
+ PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+
+-- Trying to ALTER the internal conflict log table
+-- This should fail because the table is system-managed
+-- As mentioned in previous test cases, we use a DO block to hide dynamic OIDs
+DO $$
+DECLARE
+ tab_name text;
+BEGIN
+ SELECT 'pg_conflict.' || relname INTO tab_name
+ FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+ WHERE s.subname = 'regress_conflict_protection_test';
+
+ RAISE NOTICE 'Attempting ALTER TABLE on internal conflict log table';
+ EXECUTE 'ALTER TABLE ' || tab_name || ' ADD COLUMN extra_info text';
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege during ALTER';
+END $$;
+
+-- Test Manual INSERT on conflict log table
+-- This should fail because the table is system-managed
+-- Hiding the OID in the error message by catching the exception
+DO $$
+DECLARE
+ tab_name text;
+BEGIN
+ SELECT 'pg_conflict.' || relname INTO tab_name
+ FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+ WHERE s.subname = 'regress_conflict_protection_test';
+
+ EXECUTE 'INSERT INTO ' || tab_name || ' (relname) VALUES (''mytest'')';
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege during INSERT';
+END $$;
+
+-- Test Manual UPDATE on conflict log table
+-- This should fail because the table is system-managed
+-- Hiding the OID in the error message by catching the exception
+DO $$
+DECLARE
+ tab_name text;
+BEGIN
+ SELECT 'pg_conflict.' || relname INTO tab_name
+ FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+ WHERE s.subname = 'regress_conflict_protection_test';
+
+ EXECUTE 'UPDATE ' || tab_name || ' SET relname = ''mytest'' ';
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege during UPDATE';
+END $$;
+
+-- Trying to perform TRUNCATE/DELETE on the internal conflict log table
+-- This should be allowed so that users can perform cleanup
+SELECT 'pg_conflict.' || relname AS conflict_tab
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_protection_test' \gset
+TRUNCATE :conflict_tab;
+DELETE FROM :conflict_tab;
+
+-- Trying to create a new table manually in the pg_conflict namespace
+-- This should fail as the namespace is reserved for conflict log tables
+CREATE TABLE pg_conflict.manual_table (id int);
+
+-- Moving an existing table into the pg_conflict namespace
+-- Users should not be able to move their own tables into this namespace
+CREATE TABLE public.test_move (id int);
+ALTER TABLE public.test_move SET SCHEMA pg_conflict;
+DROP TABLE public.test_move;
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_log_default DISABLE;
+ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_log_default;
+
+ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE;
+ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_empty_str;
+
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+
+ALTER SUBSCRIPTION regress_conflict_protection_test DISABLE;
+ALTER SUBSCRIPTION regress_conflict_protection_test SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_protection_test;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3f3a888fd0e..e11873ab931 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -502,6 +502,8 @@ ConditionVariableMinimallyPadded
ConditionalStack
ConfigData
ConfigVariable
+ConflictLogColumnDef
+ConflictLogDest
ConflictTupleInfo
ConflictType
ConnCacheEntry
--
2.49.0
[application/octet-stream] v23-0002-Implement-the-conflict-insertion-infrastructure-.patch (28.6K, 3-v23-0002-Implement-the-conflict-insertion-infrastructure-.patch)
From 1d5b648a36087044f377e99eb091ac181b1baa75 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Mon, 12 Jan 2026 20:21:03 +0530
Subject: [PATCH v23 2/3] Implement the conflict insertion infrastructure for
the conflict log table

This patch introduces the core logic to populate the conflict log table whenever
a logical replication conflict is detected. It captures the remote transaction
details along with the corresponding local state at the time of the conflict.

Handling Multi-row Conflicts: A single remote tuple may conflict with multiple
local tuples (e.g., in the case of multiple_unique_conflicts). To handle this,
the infrastructure creates a single row in the conflict log table for each
remote tuple. The details of all conflicting local rows are aggregated into a
single JSON array in the local_conflicts column.

The JSON array uses the following structured format:

[ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1",
"key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ]

Example of querying the structured conflict data:

SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;

remote_xid | relname | remote_origin | local_xid | local_tuple
------------+----------+---------------+-----------+---------------------
760 | test | pg_16406 | 771 | {"a":1,"b":10}
765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2}
---
src/backend/replication/logical/conflict.c | 561 +++++++++++++++++++--
src/backend/replication/logical/launcher.c | 1 +
src/backend/replication/logical/worker.c | 31 +-
src/include/replication/conflict.h | 3 +
src/include/replication/worker_internal.h | 7 +
src/test/subscription/t/035_conflicts.pl | 47 +-
6 files changed, 605 insertions(+), 45 deletions(-)
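
The destination handling that this patch adds to ReportApplyConflict can be summarised with a small standalone sketch. It is not code from the patch: the Demo* names and the numeric flag values are assumptions made for illustration (the real ConflictLogDest definition comes from patch 0001 and is not shown in this message), and is_error stands in for the elevel >= ERROR check. It only models which actions are taken for each destination.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical flag values for illustration only; the real ConflictLogDest
 * definition is not part of this message.
 */
typedef enum DemoConflictLogDest
{
	DEMO_DEST_LOG = 1 << 0,		/* report full details in the server log */
	DEMO_DEST_TABLE = 1 << 1,	/* insert a row into the conflict log table */
	DEMO_DEST_ALL = DEMO_DEST_LOG | DEMO_DEST_TABLE
} DemoConflictLogDest;

/* The actions taken for one reported conflict. */
typedef struct DemoActions
{
	bool		insert_now;			/* row inserted in the current transaction */
	bool		insert_deferred;	/* row inserted later, in a new transaction */
	bool		brief_report;		/* message that only points at the table */
	bool		detailed_report;	/* message carrying the tuple details */
} DemoActions;

/* Mirror the branch structure of ReportApplyConflict as described above. */
static DemoActions
demo_actions_for(DemoConflictLogDest dest, bool is_error)
{
	bool		to_table = (dest & DEMO_DEST_TABLE) != 0;
	bool		to_log = (dest & DEMO_DEST_LOG) != 0;
	DemoActions a;

	/* Below ERROR the row is inserted at once; otherwise it is deferred. */
	a.insert_now = to_table && !is_error;
	a.insert_deferred = to_table && is_error;

	/* Table-only destinations get a short message without tuple data. */
	a.brief_report = to_table && !to_log;
	a.detailed_report = to_log;

	return a;
}

/* Small self-check covering the three destinations. */
void
demo_self_check(void)
{
	assert(demo_actions_for(DEMO_DEST_LOG, false).detailed_report);
	assert(!demo_actions_for(DEMO_DEST_LOG, false).insert_now);
	assert(demo_actions_for(DEMO_DEST_TABLE, false).brief_report);
	assert(demo_actions_for(DEMO_DEST_ALL, true).insert_deferred);
	assert(!demo_actions_for(DEMO_DEST_ALL, true).brief_report);
}
```

In this model, a destination of 'all' with an ERROR-level conflict yields a detailed server-log report plus a deferred insert, which corresponds to the pending-tuple handling added to start_apply in worker.c.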
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 93222ee3b88..99926036825 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,21 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/tableam.h"
+#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/json.h"
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
@@ -34,6 +42,18 @@ static const char *const ConflictTypeNames[] = {
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
+/* Schema for the elements within the 'local_conflicts' JSON array */
+static const ConflictLogColumnDef LocalConflictSchema[] =
+{
+ { .attname = "xid", .atttypid = XIDOID },
+ { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID },
+ { .attname = "origin", .atttypid = TEXTOID },
+ { .attname = "key", .atttypid = JSONOID },
+ { .attname = "tuple", .atttypid = JSONOID }
+};
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema)
+
static int errcode_apply_conflict(ConflictType type);
static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
@@ -50,8 +70,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull);
static char *build_index_value_desc(EState *estate, Relation localrel,
TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+ Relation localrel,
+ Oid replica_index,
+ TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+ ConflictType conflict_type,
+ List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot);
/*
* Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -105,30 +144,90 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *remoteslot, List *conflicttuples)
{
- Relation localrel = relinfo->ri_RelationDesc;
- StringInfoData err_detail;
+ Relation localrel = relinfo->ri_RelationDesc;
+ ConflictLogDest dest;
+ Relation conflictlogrel;
+ bool log_dest_clt;
+ bool log_dest_logfile;
- initStringInfo(&err_detail);
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
- /* Form errdetail message by combining conflicting tuples information. */
- foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- conflicttuple->slot, remoteslot,
- conflicttuple->indexoid,
- conflicttuple->xmin,
- conflicttuple->origin,
- conflicttuple->ts,
- &err_detail);
+ /*
+ * Get the conflict log destination. Also, (if there is one) return the
+ * CLT relation already opened and ready for insertion.
+ */
+ conflictlogrel = GetConflictLogDestAndTable(&dest);
- pgstat_report_subscription_conflict(MySubscription->oid, type);
+ log_dest_clt = ((dest & CONFLICT_LOG_DEST_TABLE) != 0);
+ log_dest_logfile = ((dest & CONFLICT_LOG_DEST_LOG) != 0);
+
+ /* Insert to table if requested. */
+ if (log_dest_clt)
+ {
+ Assert(conflictlogrel != NULL);
+
+ /*
+ * Prepare the conflict log tuple. If the error level is below ERROR,
+ * insert it immediately. Otherwise, defer the insertion to a new
+ * transaction after the current one aborts, ensuring the insertion of
+ * the log tuple is not rolled back.
+ */
+ prepare_conflict_log_tuple(estate,
+ relinfo->ri_RelationDesc,
+ conflictlogrel,
+ type,
+ searchslot,
+ conflicttuples,
+ remoteslot);
+ if (elevel < ERROR)
+ InsertConflictLogTuple(conflictlogrel);
+
+ if (!log_dest_logfile)
+ {
+ /*
+ * Conflict details are not being written to the server log. Report
+ * the error message but omit the raw tuple data, since it is already
+ * captured in the conflict log table.
+ */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail("Conflict details are logged to the conflict log table: %s",
+ RelationGetRelationName(conflictlogrel)));
+ }
+
+ table_close(conflictlogrel, RowExclusiveLock);
+ }
- ereport(elevel,
- errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
- get_namespace_name(RelationGetNamespace(localrel)),
- RelationGetRelationName(localrel),
- ConflictTypeNames[type]),
- errdetail_internal("%s", err_detail.data));
+ /* Log into the server log if requested. */
+ if (log_dest_logfile)
+ {
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+
+ /* Form errdetail message by combining conflicting tuples information. */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ conflicttuple->slot, remoteslot,
+ conflicttuple->indexoid,
+ conflicttuple->xmin,
+ conflicttuple->origin,
+ conflicttuple->ts,
+ &err_detail);
+
+ /* Standard reporting with full internal details. */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+ }
}
/*
@@ -162,6 +261,64 @@ InitConflictIndexes(ResultRelInfo *relInfo)
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
}
+/*
+ * GetConflictLogDestAndTable
+ *
+ * Fetches conflict logging metadata from the cached MySubscription pointer.
+ * Sets the destination enum in *log_dest and, if applicable, opens and
+ * returns the relation handle for the conflict log table.
+ */
+Relation
+GetConflictLogDestAndTable(ConflictLogDest *log_dest)
+{
+ Oid conflictlogrelid;
+ Relation conflictlogrel = NULL;
+
+ /*
+ * Convert the text log destination to the internal enum. MySubscription
+ * already contains the data from pg_subscription.
+ */
+ *log_dest = GetLogDestination(MySubscription->conflictlogdest);
+
+ /* Quick exit if a conflict log table was not requested. */
+ if ((*log_dest & CONFLICT_LOG_DEST_TABLE) == 0)
+ return NULL;
+
+ conflictlogrelid = MySubscription->conflictlogrelid;
+
+ Assert(OidIsValid(conflictlogrelid));
+
+ conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+ if (conflictlogrel == NULL)
+ elog(ERROR, "could not open conflict log table (OID %u)",
+ conflictlogrelid);
+
+ return conflictlogrel;
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert conflict log tuple into the conflict log table. It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple
+ * inserted into the conflict log table.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+ int options = HEAP_INSERT_NO_LOGICAL;
+
+ /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+ Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+ heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+ GetCurrentCommandId(true), options, NULL);
+
+ /* Free conflict log tuple. */
+ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
/*
* Add SQLSTATE error code to the current conflict report.
*/
@@ -472,6 +629,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
return tuple_value.data;
}
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ * This is the reusable core logic.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull)
+{
+ TupleTableSlot *tableslot = slot;
+
+ /*
+ * If the slot is a virtual slot, copy it into a heap tuple slot as
+ * FormIndexDatum only works with heap tuple slots.
+ */
+ if (TTS_IS_VIRTUAL(slot))
+ {
+ /* Slot is created within the EState's tuple table */
+ tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+ tableslot = ExecCopySlot(tableslot, slot);
+ }
+
+ /*
+ * Initialize ecxt_scantuple for potential use in FormIndexDatum
+ */
+ GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+ /* Form the index datums */
+ FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+ isnull);
+}
+
/*
* Helper functions to construct a string describing the contents of an index
* entry. See BuildIndexValueDescription for details.
@@ -487,41 +678,325 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
Relation indexDesc;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
- TupleTableSlot *tableslot = slot;
- if (!tableslot)
+ if (!slot)
return NULL;
Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
indexDesc = index_open(indexoid, NoLock);
- /*
- * If the slot is a virtual slot, copy it into a heap tuple slot as
- * FormIndexDatum only works with heap tuple slots.
- */
- if (TTS_IS_VIRTUAL(slot))
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+
+ index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+ index_close(indexDesc, NoLock);
+
+ return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to JSON.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+ HeapTuple tuple;
+ Datum datum;
+ Datum json;
+
+ Assert(slot != NULL);
+
+ tuple = ExecCopySlotHeapTuple(slot);
+ datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+ json = DirectFunctionCall1(row_to_json, datum);
+ heap_freetuple(tuple);
+
+ return json;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Fetch replica identity key from the tuple table slot and convert into a
+ * JSON datum.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+ Oid indexid, TupleTableSlot *slot)
+{
+ Relation indexDesc;
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ HeapTuple tuple;
+ TupleDesc tupdesc;
+ Datum datum;
+
+ Assert(slot != NULL);
+
+ Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+ indexDesc = index_open(indexid, NoLock);
+
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+ tupdesc = RelationGetDescr(indexDesc);
+
+ /* Bless the tupdesc so it can be looked up by row_to_json. */
+ BlessTupleDesc(tupdesc);
+
+ /* Form the replica identity tuple. */
+ tuple = heap_form_tuple(tupdesc, values, isnull);
+ datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+ index_close(indexDesc, NoLock);
+ heap_freetuple(tuple);
+
+ /* Convert to a JSON datum. */
+ return DirectFunctionCall1(row_to_json, datum);
+}
+
+/*
+ * build_conflict_tupledesc
+ *
+ * Build and bless a tuple descriptor for the elements of the
+ * 'local_conflicts' JSON array, based on the predefined LocalConflictSchema.
+ */
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+ TupleDesc tupdesc;
+
+ tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+ for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++)
+ TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1),
+ LocalConflictSchema[i].attname,
+ LocalConflictSchema[i].atttypid,
+ -1, 0);
+
+ BlessTupleDesc(tupdesc);
+
+ return tupdesc;
+}
+
+/*
+ * Builds the local conflicts JSON array column from the list of
+ * ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "key": {...}, "tuple": {...} }, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+ ConflictType conflict_type,
+ List *conflicttuples)
+{
+ ListCell *lc;
+ List *json_datums = NIL;
+ Datum *json_datum_array;
+ bool *json_null_array;
+ Datum json_array_datum;
+ int num_conflicts;
+ int i;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
+ TupleDesc tupdesc;
+
+ /* Build local conflicts tuple descriptor. */
+ tupdesc = build_conflict_tupledesc();
+
+ /* Process local conflict tuple list and prepare an array of JSON. */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
{
- tableslot = table_slot_create(localrel, &estate->es_tupleTable);
- tableslot = ExecCopySlot(tableslot, slot);
+ Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+ bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+ char *origin_name = NULL;
+ HeapTuple tuple;
+ Datum json_datum;
+ int attno;
+
+ attno = 0;
+ values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+ if (conflicttuple->ts)
+ values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+ else
+ nulls[attno++] = true;
+
+ if (conflicttuple->origin != InvalidRepOriginId)
+ replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+ /* Store NULL if the tuple has no origin name. */
+ if (origin_name != NULL)
+ values[attno++] = CStringGetTextDatum(origin_name);
+ else
+ nulls[attno++] = true;
+
+ /*
+ * Add the conflicting key values in the case of a unique constraint
+ * violation.
+ */
+ if (conflict_type == CT_INSERT_EXISTS ||
+ conflict_type == CT_UPDATE_EXISTS ||
+ conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+ {
+ Oid indexoid = conflicttuple->indexoid;
+
+ Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+ true));
+ values[attno++] =
+ tuple_table_slot_to_indextup_json(estate, rel,
+ indexoid,
+ conflicttuple->slot);
+ }
+ else
+ nulls[attno++] = true;
+
+ /* Convert conflicting tuple to JSON datum. */
+ if (conflicttuple->slot)
+ values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+ else
+ nulls[attno] = true;
+
+ Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+
+ json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+ /*
+ * Build the higher level JSON datum in format described in function
+ * header.
+ */
+ json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+ /* Done with the temporary tuple. */
+ heap_freetuple(tuple);
+
+ /* Append the element to the list of array elements. */
+ json_datums = lappend(json_datums, (void *) json_datum);
}
- /*
- * Initialize ecxt_scantuple for potential use in FormIndexDatum when
- * index expressions are present.
- */
- GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+ num_conflicts = list_length(json_datums);
- /*
- * The values/nulls arrays passed to BuildIndexValueDescription should be
- * the results of FormIndexDatum, which are the "raw" input to the index
- * AM.
- */
- FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+ json_datum_array = palloc_array(Datum, num_conflicts);
+ json_null_array = palloc0_array(bool, num_conflicts);
- index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+ i = 0;
+ foreach(lc, json_datums)
+ {
+ json_datum_array[i] = (Datum) lfirst(lc);
+ i++;
+ }
- index_close(indexDesc, NoLock);
+ /* Construct the JSON array Datum. */
+ get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+ json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+ num_conflicts,
+ JSONOID,
+ typlen,
+ typbyval,
+ typalign));
+ pfree(json_datum_array);
+ pfree(json_null_array);
+
+ return json_array_datum;
+}
- return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot)
+{
+ Datum values[MAX_CONFLICT_ATTR_NUM] = {0};
+ bool nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+ int attno;
+ char *remote_origin = NULL;
+ MemoryContext oldctx;
+
+ Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+ /* Populate the values and nulls arrays. */
+ attno = 0;
+ values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+ values[attno++] =
+ CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+ values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+ values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+ if (TransactionIdIsValid(remote_xid))
+ values[attno++] = TransactionIdGetDatum(remote_xid);
+ else
+ nulls[attno++] = true;
+
+ values[attno++] = LSNGetDatum(remote_final_lsn);
+
+ if (remote_commit_ts > 0)
+ values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+ else
+ nulls[attno++] = true;
+
+ if (replorigin_session_origin != InvalidRepOriginId)
+ replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+ if (remote_origin != NULL)
+ values[attno++] = CStringGetTextDatum(remote_origin);
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(searchslot))
+ {
+ Oid replica_index = GetRelationIdentityOrPK(rel);
+
+ /*
+ * If the table has a valid replica identity index, build the JSON
+ * datum from its key values. Otherwise (REPLICA IDENTITY FULL),
+ * construct it from the complete tuple.
+ */
+ if (OidIsValid(replica_index))
+ values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+ replica_index,
+ searchslot);
+ else
+ values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+ }
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(remoteslot))
+ values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+ else
+ nulls[attno++] = true;
+
+ values[attno] = build_local_conflicts_json_array(estate, rel,
+ conflict_type,
+ conflicttuples);
+
+ Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ MyLogicalRepWorker->conflict_log_tuple =
+ heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+ MemoryContextSwitchTo(oldctx);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3ed86480be2..2dda5a44218 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
+ worker->conflict_log_tuple = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ad281e7069b..d4be1122603 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId remote_xid = InvalidTransactionId;
+TimestampTz remote_commit_ts = 0;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
+ remote_commit_ts = begin_data.committime;
+ remote_xid = begin_data.xid;
maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ remote_xid = stream_xid;
+ remote_final_lsn = InvalidXLogRecPtr;
+ remote_commit_ts = 0;
+
if (!TransactionIdIsValid(stream_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,27 @@ start_apply(XLogRecPtr origin_startpos)
pgstat_report_subscription_error(MySubscription->oid,
MyLogicalRepWorker->type);
+ /*
+ * Insert any pending conflict log tuple under a new transaction.
+ */
+ if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+ {
+ Relation conflictlogrel;
+ ConflictLogDest dest;
+
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Open conflict log table and insert the tuple. */
+ conflictlogrel = GetConflictLogDestAndTable(&dest);
+ Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0);
+ InsertConflictLogTuple(conflictlogrel);
+ table_close(conflictlogrel, RowExclusiveLock);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ }
+
PG_RE_THROW();
}
}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 4e4f59bb453..aff1db222a4 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -144,4 +144,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
#endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c1285fdd1bc..5bedfc5450f 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
*/
TransactionId oldest_nonremovable_xid;
+ /* A conflict log tuple that is prepared but not yet inserted. */
+ HeapTuple conflict_log_tuple;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern PGDLLIMPORT List *table_states_not_ready;
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId remote_xid;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
Oid subid, Oid relid,
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index ddc75e23fb0..a0f3befc06f 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -50,7 +50,7 @@ $node_subscriber->safe_psql(
'postgres',
"CREATE SUBSCRIPTION sub_tab
CONNECTION '$publisher_connstr application_name=$appname'
- PUBLICATION pub_tab;");
+ PUBLICATION pub_tab WITH (conflict_log_destination=all)");
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
@@ -86,10 +86,35 @@ $node_subscriber->wait_for_log(
.*Key \(c\)=\(4\); existing local row \(4, 4, 4\); remote row \(2, 3, 4\)./,
$log_offset);
+# Verify the contents of the Conflict Log Table (CLT)
+# This section ensures that the CLT contains the expected
+# type and specific key data.
+my $subid = $node_subscriber->safe_psql('postgres',
+ "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
+my $clt = "pg_conflict.pg_conflict_$subid";
+
+# Wait for the conflict to be logged in the CLT
+my $log_check = $node_subscriber->poll_query_until(
+ 'postgres',
+ "SELECT count(*) > 0 FROM $clt;"
+) or die "Timed out while waiting for the conflict to be logged in the CLT";
+
+my $conflict_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';");
+is($conflict_check, 1, 'Verified multiple_unique_conflicts logged into conflict log table');
+
+my $json_query = "SELECT local_conflicts FROM $clt;";
+my $raw_json = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that the value 2 for column "a" appears in the JSON. The quotes are
+# backslash-escaped because local_conflicts is output as a json[] array literal.
+like($raw_json, qr/\\"a\\":2/, 'Verified that key 2 exists in the local_conflicts');
+
pass('multiple_unique_conflicts detected during insert');
# Truncate table to get rid of the error
$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+$node_subscriber->safe_psql('postgres', "DELETE FROM $clt");
##################################################
# Test multiple_unique_conflicts due to UPDATE
@@ -118,6 +143,26 @@ $node_subscriber->wait_for_log(
.*Key \(c\)=\(8\); existing local row \(8, 8, 8\); remote row \(6, 7, 8\)./,
$log_offset);
+# Verify the contents of the Conflict Log Table (CLT)
+# This section ensures that the CLT contains the expected
+# type and specific key data.
+
+# Wait for the conflict to be logged in the CLT
+$log_check = $node_subscriber->poll_query_until(
+ 'postgres',
+ "SELECT count(*) > 0 FROM $clt;"
+) or die "Timed out while waiting for the conflict to be logged in the CLT";
+
+$conflict_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';");
+is($conflict_check, 1, 'Verified multiple_unique_conflicts logged into conflict log table');
+
+$raw_json = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that the value 6 for column "a" appears in the JSON. The quotes are
+# backslash-escaped because local_conflicts is output as a json[] array literal.
+like($raw_json, qr/\\"a\\":6/, 'Verified that key 6 exists in the local_conflicts');
+
pass('multiple_unique_conflicts detected during update');
# Truncate table to get rid of the error
--
2.49.0
[application/octet-stream] v23-0003-Doccumentation-patch.patch (11.0K, 4-v23-0003-Doccumentation-patch.patch)
From bdd78671e510b3721023a0027bb69ca3ad6e87a4 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sun, 21 Dec 2025 18:51:57 +0530
Subject: [PATCH v23 3/3] Documentation patch
---
doc/src/sgml/logical-replication.sgml | 116 +++++++++++++++++++++-
doc/src/sgml/ref/alter_subscription.sgml | 14 ++-
doc/src/sgml/ref/create_subscription.sgml | 44 ++++++++
3 files changed, 171 insertions(+), 3 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 68d6efe5114..ba9342936ee 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -289,6 +289,19 @@
option of <command>CREATE SUBSCRIPTION</command> for details.
</para>
+ <para>
+ Conflicts that occur during replication are, by default, logged as plain text
+ in the server log, which can make automated monitoring and analysis difficult.
+ The <command>CREATE SUBSCRIPTION</command> command provides the
+ <link linkend="sql-createsubscription-params-with-conflict-log-destination">
+ <literal>conflict_log_destination</literal></link> option to record detailed
+ conflict information in a structured, queryable format. When this parameter
+ is set to <literal>table</literal> or <literal>all</literal>, the system
+ automatically manages a dedicated conflict log table, which is created and
+ dropped along with the subscription. Because conflicts are then stored as rows,
+ they can be examined with ordinary SQL queries for monitoring and post-mortem analysis.
+ </para>
+
<sect2 id="logical-replication-subscription-slot">
<title>Logical Replication Slot Management</title>
@@ -2118,7 +2131,98 @@ Publications:
</para>
<para>
- The log format for logical replication conflicts is as follows:
+ When the <link linkend="sql-createsubscription-params-with-conflict-log-destination"><literal>conflict_log_destination</literal></link>
+ parameter is set to <literal>table</literal> or <literal>all</literal>, the system
+ automatically creates a new table with a predefined schema to log conflict
+ details. This table is created in the dedicated
+ <literal>pg_conflict</literal> namespace. The name of the conflict log table
+ is <literal>pg_conflict_<replaceable>subid</replaceable></literal>, where <replaceable>subid</replaceable> is the OID of the subscription. The schema of this
+ table is detailed in
+ <xref linkend="logical-replication-conflict-log-schema"/>.
+ </para>
+
+ <table id="logical-replication-conflict-log-schema">
+ <title>Conflict Log Table Schema</title>
+ <tgroup cols="3">
+ <thead>
+ <row>
+ <entry>Column</entry>
+ <entry>Type</entry>
+ <entry>Description</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry><literal>relid</literal></entry>
+ <entry><type>oid</type></entry>
+ <entry>The OID of the local table where the conflict occurred.</entry>
+ </row>
+ <row>
+ <entry><literal>schemaname</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The schema name of the conflicting table.</entry>
+ </row>
+ <row>
+ <entry><literal>relname</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The name of the conflicting table.</entry>
+ </row>
+ <row>
+ <entry><literal>conflict_type</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The type of conflict that occurred (e.g., <literal>insert_exists</literal>).</entry>
+ </row>
+ <row>
+ <entry><literal>remote_xid</literal></entry>
+ <entry><type>xid</type></entry>
+ <entry>The remote transaction ID that caused the conflict.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_commit_lsn</literal></entry>
+ <entry><type>pg_lsn</type></entry>
+ <entry>The final LSN of the remote transaction.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_commit_ts</literal></entry>
+ <entry><type>timestamptz</type></entry>
+ <entry>The remote commit timestamp of the remote transaction.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_origin</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The origin of the remote transaction.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_tuple</literal></entry>
+ <entry><type>json</type></entry>
+ <entry>The JSON representation of the incoming remote row that caused the conflict.</entry>
+ </row>
+ <row>
+ <entry><literal>local_conflicts</literal></entry>
+ <entry><type>json[]</type></entry>
+ <entry>
+ An array of JSON objects, one for each local row that conflicts with the remote row.
+ Each object includes the local transaction ID (<literal>xid</literal>), commit
+ timestamp (<literal>commit_ts</literal>), origin (<literal>origin</literal>),
+ conflicting key data (<literal>key</literal>), and the full local row
+ image (<literal>tuple</literal>).
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <para>
+ The conflicting row data, including the incoming remote row and the associated
+ local conflict details, is stored as JSON (in the <literal>remote_tuple</literal>
+ and <literal>local_conflicts</literal> columns) for flexible querying and analysis.
+ </para>
+
+ <para>
+ If <link linkend="sql-createsubscription-params-with-conflict-log-destination"><literal>conflict_log_destination</literal></link>
+ is left at the default setting or explicitly configured as <literal>log</literal>
+ or <literal>all</literal>, logical replication conflicts are logged in the
+ following format:
<synopsis>
LOG: conflict detected on relation "<replaceable>schemaname</replaceable>.<replaceable>tablename</replaceable>": conflict=<replaceable>conflict_type</replaceable>
DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
@@ -2412,6 +2516,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
key or replica identity defined for it.
</para>
</listitem>
+
+ <listitem>
+ <para>
+ The internal table automatically created when
+ <link linkend="sql-createsubscription-params-with-conflict-log-destination"><literal>conflict_log_destination</literal></link>
+ is set to <literal>table</literal> or <literal>all</literal> is excluded from
+ logical replication. It will not be published, even if a publication on the
+ subscriber is defined using <literal>FOR ALL TABLES</literal>.
+ </para>
+ </listitem>
</itemizedlist>
</sect1>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 27c06439f4f..2de2c3c52fb 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -280,8 +280,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
<link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>,
- <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>, and
- <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>.
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>,
+ <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>, and
+ <link linkend="sql-createsubscription-params-with-conflict-log-destination"><literal>conflict_log_destination</literal></link>.
Only a superuser can set <literal>password_required = false</literal>.
</para>
@@ -339,6 +340,15 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<quote><literal>pg_conflict_detection</literal></quote>, created to retain
dead tuples for conflict detection, will be dropped.
</para>
+
+ <para>
+ When the <link linkend="sql-createsubscription-params-with-conflict-log-destination"><literal>conflict_log_destination</literal></link>
+ parameter is set to <literal>table</literal> or <literal>all</literal>, the system
+ automatically creates the internal conflict log table if it does not already
+ exist. Conversely, if the destination is changed to
+ <literal>log</literal>, logging to the table stops and the internal
+ table is automatically dropped.
+ </para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b7dd361294b..561d8f11995 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -247,6 +247,50 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
+ <varlistentry id="sql-createsubscription-params-with-conflict-log-destination">
+ <term><literal>conflict_log_destination</literal> (<type>enum</type>)</term>
+ <listitem>
+ <para>
+ Specifies the destination for recording logical replication conflicts.
+ </para>
+ <para>
+ The available destinations are:
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>log</literal>: Conflict details are recorded in the server log.
+ This is the default behavior.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>table</literal>: The system automatically creates a structured table
+ named <literal>pg_conflict_<replaceable>subid</replaceable></literal> in the
+ <literal>pg_conflict</literal> schema. This allows for easy querying and
+ analysis of conflicts.
+ </para>
+ <caution>
+ <para>
+ The internal conflict log table exists only as long as the subscription exists and
+ <literal>conflict_log_destination</literal> is set to <literal>table</literal> or <literal>all</literal>. If
+ the subscription is dropped, or if the destination is changed to
+ <literal>log</literal>, the table and all its recorded conflict data are
+ <emphasis>permanently deleted</emphasis>. To perform a post-mortem analysis
+ after removing a subscription, users must manually back up the conflict log
+ table before the deletion occurs.
+ </para>
+ </caution>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>all</literal>: Conflict details are recorded in both the server log and the conflict log table.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-createsubscription-params-with-copy-data">
<term><literal>copy_data</literal> (<type>boolean</type>)</term>
<listitem>
--
2.49.0
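
For anyone trying the patch set, here is a minimal sketch of how the conflict log table described in the documentation patch might be queried. The subscription name sub_tab comes from the TAP test above. The OID 16394 and the conflict_archive table name are made-up placeholders. The sketch assumes the JSON objects in local_conflicts carry the xid, origin, and key fields listed in the schema table. It has not been run against the patched server.

```sql
-- Find the subscription OID that forms the suffix of the table name.
SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';

-- Assuming the OID returned above is 16394 (placeholder), expand the
-- json[] column so that each local conflict becomes its own row.
SELECT c.conflict_type,
       c.schemaname,
       c.relname,
       c.remote_xid,
       lc ->> 'xid'    AS local_xid,
       lc ->> 'origin' AS local_origin,
       lc -> 'key'     AS conflicting_key
FROM pg_conflict.pg_conflict_16394 AS c
     CROSS JOIN LATERAL unnest(c.local_conflicts) AS lc;

-- Per the caution in the docs, the table is dropped when the destination
-- is changed to log, so copy the rows first (conflict_archive is a
-- hypothetical name; whether CREATE TABLE AS is permitted on this table
-- is an assumption).
CREATE TABLE conflict_archive AS
    SELECT * FROM pg_conflict.pg_conflict_16394;
ALTER SUBSCRIPTION sub_tab SET (conflict_log_destination = log);
```

The CROSS JOIN drops any logged conflict whose local_conflicts array is empty; a LEFT JOIN LATERAL ... ON true would keep those rows with NULLs in the local columns.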