public inbox for pgsql-hackers@postgresql.org
help / color / mirror / Atom feedFrom: vignesh C <vignesh21@gmail.com>
To: shveta malik <shveta.malik@gmail.com>
Cc: Dilip Kumar <dilipbalaut@gmail.com>
Cc: Amit Kapila <amit.kapila16@gmail.com>
Cc: Masahiko Sawada <sawada.mshk@gmail.com>
Cc: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Cc: PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>
Subject: Re: Proposal: Conflict log history table for Logical Replication
Date: Mon, 5 Jan 2026 20:30:15 +0530
Message-ID: <CALDaNm2YOOdJ25X1sJ+DYz37K6Qi4g0ZNFHb_pQMF9UqancnEA@mail.gmail.com> (raw)
In-Reply-To: <CAJpy0uD6fTEUYJx3+yDbvB=VW7c5AaGoeSd7iwHdYYO=kYGn3g@mail.gmail.com>
References: <CAFiTN-u5D5o_AGNbHRZHaOqAMWkxLf+hSk_r9X3gv6HbLOB5+g@mail.gmail.com>
<CALj2ACViThGQDYi-yeqUeHqG2Pozn2AiyvtDtjE6zhhbM0KsEA@mail.gmail.com>
<CAA4eK1+44b3vd_OWfiaVNtjf5Njb5cek09pmKRmttBByeg0NoA@mail.gmail.com>
<CAFiTN-v3L0WacCDx5dkOSonaZQbJfstXL4HrCPD1ahRdUsRnSg@mail.gmail.com>
<CALj2ACW63uuxh0fSoxEAF8OMWhz1dJKSkp268WJDzf5BUqCf5g@mail.gmail.com>
<CAFiTN-s9WWLOhW1TO27NtJwGf0bh2+MWyp3NEkZFeN_S5_p_rA@mail.gmail.com>
<CAA4eK1LxnsEx5sMbQkK5MHAgXKPROMQXQ0n=fKMwz+UsfKQaMQ@mail.gmail.com>
<CAD21AoDj+c4LXf2y4ESR-gVyv9d8V0G4R8R9pn-PcmT5zPzYcg@mail.gmail.com>
<CAA4eK1KokmAwNOL6bS-ip_E3F96PiQTjC4j-M+5vD1T6uUyi3Q@mail.gmail.com>
<CAFiTN-vFKE8E_N6h+peX9DP92mxCeFdm5A9Esn4DkLmNcZ-dOA@mail.gmail.com>
<CAFiTN-shLYf-fOTQ_dBf3Xfx05gxs_8d93MHZXyyz6w2Bg5geQ@mail.gmail.com>
<CAFiTN-tEgkKQHUikn6iBFCYf7XOObR7ncUq=OVh7WEk=6P4ymw@mail.gmail.com>
<CAFiTN-tQiakd8m+-d6WN6RpJXSv_JcropZ2oGzme4d1JudQhYg@mail.gmail.com>
<CAJpy0uDKbYWt+YPADj=4fHEvrGEWgnG1n_YsiGT_EZiZf0VSAw@mail.gmail.com>
<CAFiTN-t82BiXen+HfdR9jZyOpuSO92xonnUK=khXsiZWBfOxMA@mail.gmail.com>
<CAJpy0uAu2paxGAEffD=vaBTW9Jqbtxxawb8K8FgiASfeKPnGog@mail.gmail.com>
<CAJpy0uC0ZWgHOivJ102A1fMkppwK3RuSMafRPKyjwkmJrjhVUw@mail.gmail.com>
<CAFiTN-vFV9-zajrwjYHYyFnyQsooOAXW4CpxB5f-iT3APjOtoQ@mail.gmail.com>
<CAJpy0uBeU1dZgaqsSVKc=P=EVUKxRgVuHR8jDXFL-HLibbE-kQ@mail.gmail.com>
<CAA4eK1+FOkOxhzVLAnDymoNjp4i98H-L1+ZsWDgJEv-ndnTzTA@mail.gmail.com>
<CAFiTN-sVK6Bp+BawCJU_WpAXQSTX4OkKmce5EE4YNBgD-XSjZw@mail.gmail.com>
<CAA4eK1LbjV0bctib9wUnBpEkC+2rZFPnGuRtrKuc5AtUAzum+A@mail.gmail.com>
<CAFiTN-vq50N3QP9p3_SH+tJ8Pn=uRDb0X4qEcQZYcGW9AX88rQ@mail.gmail.com>
<CAFiTN-u3+zRGPESP5kUUfa6NxaWh1HL-gd1225KJ0Uvzi1urow@mail.gmail.com>
<CAA4eK1L4iNk6mNTC83PbYrRfUdtivH4U961PkdFfOO7mvc=USg@mail.gmail.com>
<CAFiTN-v+Mh64UfR5zb5rwgyGm6HS80XRSZ_XeaWkg8=+s9o3Kg@mail.gmail.com>
<CAFiTN-s3ZFHteQsiC3H4=AjTWxuwN-w69XQ3xL5X6YOMTua4pA@mail.gmail.com>
<CAJpy0uDe724nY59j-8hMapZ_Fru1Wo-NucF4Ea1B3Jrw=+J+UQ@mail.gmail.com>
<CAFiTN-uR=86L_5tyiA7n73EXCSCuDfQKfL5O=c8n7zZom8_ONQ@mail.gmail.com>
<CAD21AoDfOS-J0M9WbM3D20eGbSPzbfLQ-9XoYkxO4AZ9twqyvg@mail.gmail.com>
<CAFiTN-vMTg2X7vwfHLr5Gvy8ViV63_iaEcpHmM8V5GpA9-u8cg@mail.gmail.com>
<CAA4eK1+b2Ws0e_ZYJsgZAPn7VWndxAK_YM_QMKcfXst3e7F6Jg@mail.gmail.com>
<CAFiTN-v6hFKMPrSyTBsz=AtEETYMbOxrqvhZJsPQqKgQc4WCLw@mail.gmail.com>
<CAA4eK1KV3rYkaxys5fh-PtE9kq5xrFbiaRpOSPoRgQG494ek+g@mail.gmail.com>
<CAFiTN-utvu=QjY1QQ1a_TvkpkpvesMWo9M8wTFYLaOTPdpOJvw@mail.gmail.com>
<CAA4eK1+HoSOEqNwT3twArPNx4_D7hSUoEg2LnYhX8n9iUwhXgQ@mail.gmail.com>
<CAFiTN-tqmsfW0Sk=1RhzuduxqLrf9KEc8VOvBae+4aYxWTJwuA@mail.gmail.com>
<CAA4eK1JmCQ=DHe3HsqpX+P3mGDUd_Z7E7oAxdstK6822W6tuCw@mail.gmail.com>
<CAFiTN-uE4eAUYewuq3c5deAt3TtVork+H6rkUHRv68cOGr5rmQ@mail.gmail.com>
<CAFiTN-sJbhPX+LbA8YuQeYJpfGA2XA+OKXf8jCm04RoJOyzLvw@mail.gmail.com>
<CAJpy0uBPOyWj9itFjHzGXfrUuYS8KGmAvgdcV_9FPjWZ0EZz_w@mail.gmail.com>
<CAFiTN-s=iLE4qM4qmw9yXKqW09R_c_HqaSGeZXJ2EaTVfXss+g@mail.gmail.com>
<CAA4eK1KYo0vZpPSRc_4gVpa06-J39gxjs3tHFyckgkBfYJSfFA@mail.gmail.com>
<CAFiTN-vrKc6OWzrg6yvpwYcj79k=zkrDp3uwiZzjwrWLJAq6tw@mail.gmail.com>
<CAA4eK1LmvrfEgn1NUZZ=E3yMCjQdNZ5=_SBEry73-EmF6jM_PQ@mail.gmail.com>
<CAFiTN-vjfub5b3PqPQzfOw9BSjm8jt28ott+Hoz9CrRxJHzYkg@mail.gmail.com>
<CAFiTN-v=ANapYvRK+SOy2wJb4CSuD6Vb6_bTGuReM9Dv+3tucA@mail.gmail.com>
<CALDaNm1zEYoSdf2Ns-=UJRw95E5sbfpB0oaNUWtRJN27Q1Knhw@mail.gmail.com>
<CALDaNm3USsXVNBsfdpkp60HVgrTV4taWMk1xZYNBa7QUF=V0jg@mail.gmail.com>
<CAFiTN-sNg9ghLNkB2Kn0SwBGOub9acc99XZZU_d5NAcyW-yrEg@mail.gmail.com>
<CAJpy0uAF3EYcYdpTHdKMeXfvaPbNvnWrZUATrSLL1hqjao=33A@mail.gmail.com>
<CAFiTN-uikggCKp2LscTorKY5d3KF9j93DW0xebDcRX86G+ZsSw@mail.gmail.com>
<CAJpy0uDaOoVK8S3_xxTAcTDpfK1AY7tApw7nPOZG_gUz+DMi=Q@mail.gmail.com>
<CAA4eK1+AdeC5B9xrAXSKWGtTh-0d8xdD=fZttmOBm+c8o8thAQ@mail.gmail.com>
<CAFiTN-skBQAeuzuUd+PDK0Gqc8g+4x9ypBMwJhOrmW8ZCFKGSA@mail.gmail.com>
<CAJpy0uCdrsW5T+okq7xTOVxagje7FW3DOeY5B0CGKYa5VqF_tQ@mail.gmail.com>
<CAFiTN-u+_mFj9caYYFO7=_YHFXk5y=vvOm2H2=5hctYktmAVGA@mail.gmail.com>
<CALDaNm1aivk9KgQ5daeF6YZzuE+0wWc2yb7wb6qikNyvfPN0Sg@mail.gmail.com>
<CAJpy0uD6fTEUYJx3+yDbvB=VW7c5AaGoeSd7iwHdYYO=kYGn3g@mail.gmail.com>
On Fri, 2 Jan 2026 at 12:06, shveta malik <shveta.malik@gmail.com> wrote:
>
> On Fri, Dec 26, 2025 at 8:58 PM vignesh C <vignesh21@gmail.com> wrote:
> >
> >
> > Here is a rebased version of the remaining patches.
> >
>
> Thank You Vignesh. Please find a few comments on 004:
>
>
> 1)
> IIUC, SubscriptionConflictrelIndexId is an unique index on sub-oid and
> conf-relid, but we use it only on relid as key. Why didn't we create
> it only on 'conf-relid' alone? Using a composite unique index is
> guaranteed to give unique row only when all keys are used, but for a
> single key, a unique row is not guaranteed. In our case, it will be a
> unique row as conflict-relid is not shared, but still as an overall
> general concept, it may not be.
As we are searching only on subconflictlogrelid, index on
subconflictlogrelid is sufficient. I have modified it.
> 2)
> IsConflictLogTable():
> + if (OidIsValid(subform->subconflictlogrelid))
>
> Do we need this check? Since we’ve already performed an index access
> using subconflictlogrelid as the key, isn’t it guaranteed to always be
> valid?
It is not required, removed it
> 3)
> Please update the commit message to indicate that this patch makes CLT
> publishable if a publication is explicitly created on it, else few
> changes become very confusing due to unclear intent.
Included this in the commit message.
> 4)
> pg_relation_is_publishable():
>
> /* Subscription conflict log tables are not published */
> - result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) &&
> - !IsConflictLogTable(relid);
>
> Comment should be removed too.
Removed
> 5)
> We need to remove below comment:
>
> * Note: Conflict log tables are not publishable. However, we intentionally
> * skip this check here because this function is called for every change and
> * performing this check during every change publication is costly. To ensure
> * unpublishable entries are ignored without incurring performance overhead,
> * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL
> * flag. This allows the decoding layer to bypass these entries automatically.
> */
> bool
> is_publishable_relation(Relation rel)
Removed
> 6)
> get_rel_sync_entry:
> + /* is this relation used for conflict logging? */
> + isconflictlogrel = IsConflictLogTable(relid);
>
> Shall we add a comment indicating the intent of change in this
> function. Something like:
>
> /*
> * Check whether this is a conflict log table. If so, avoid publishing it via
> * FOR ALL TABLES or FOR TABLES IN SCHEMA publications, but still allow it
> * to be published through a publication explicitly created for this table.
> */
Included
The attached v19 patch has the changes for the same.
Regards,
Vignesh
Attachments:
[text/x-patch] v19-0001-Add-configurable-conflict-log-table-for-Logical-.patch (101.0K, 2-v19-0001-Add-configurable-conflict-log-table-for-Logical-.patch)
download | inline diff:
From 6df7e487f9e047083235dc28975a4cfc1b72511b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Wed, 17 Dec 2025 11:53:47 +0530
Subject: [PATCH v19 1/6] Add configurable conflict log table for Logical
Replication
This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.
This patch addresses these limitations by introducing a configurable
conflict_log_destination=('log'/'table'/'all') option in the CREATE SUBSCRIPTION
command.
If the user chooses to enable logging to a table (by selecting 'table' or 'all'),
an internal logging table named conflict_log_table_<subid> is automatically
created within a dedicated, system-managed namespace named pg_conflict. This table
is linked to the subscription via an internal dependency, ensuring it is
automatically dropped when the subscription is removed.
The conflict details, including the original and remote tuples, are stored in JSON
columns, providing a flexible format to accommodate different table schemas.
The log table captures essential attributes such as local and remote transaction IDs,
LSNs, commit timestamps, and conflict type, providing a complete record for post-mortem
analysis.
This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.
The conflict log tables will not be included in a publication, even if the publication
is configured to include ALL TABLES or ALL TABLES IN SCHEMA.
---
src/backend/catalog/catalog.c | 27 +-
src/backend/catalog/heap.c | 3 +-
src/backend/catalog/namespace.c | 6 +
src/backend/catalog/pg_publication.c | 26 +-
src/backend/catalog/pg_subscription.c | 7 +
src/backend/commands/subscriptioncmds.c | 284 ++++++++++++++++-
src/bin/psql/describe.c | 21 +-
src/bin/psql/tab-complete.in.c | 8 +-
src/include/catalog/catalog.h | 2 +
src/include/catalog/pg_namespace.dat | 3 +
src/include/catalog/pg_subscription.h | 11 +
src/include/commands/subscriptioncmds.h | 4 +
src/include/replication/conflict.h | 56 ++++
src/test/regress/expected/subscription.out | 336 +++++++++++++++------
src/test/regress/sql/subscription.sql | 118 ++++++++
src/tools/pgindent/typedefs.list | 2 +
16 files changed, 808 insertions(+), 106 deletions(-)
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 7be49032934..d438dc682ec 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -86,7 +86,8 @@ bool
IsSystemClass(Oid relid, Form_pg_class reltuple)
{
/* IsCatalogRelationOid is a bit faster, so test that first */
- return (IsCatalogRelationOid(relid) || IsToastClass(reltuple));
+ return (IsCatalogRelationOid(relid) || IsToastClass(reltuple)
+ || IsConflictClass(reltuple));
}
/*
@@ -230,6 +231,18 @@ IsToastClass(Form_pg_class reltuple)
return IsToastNamespace(relnamespace);
}
+/*
+ * IsConflictClass - Check if the given pg_class tuple belongs to the conflict
+ * namespace.
+ */
+bool
+IsConflictClass(Form_pg_class reltuple)
+{
+ Oid relnamespace = reltuple->relnamespace;
+
+ return IsConflictNamespace(relnamespace);
+}
+
/*
* IsCatalogNamespace
* True iff namespace is pg_catalog.
@@ -264,6 +277,18 @@ IsToastNamespace(Oid namespaceId)
isTempToastNamespace(namespaceId);
}
+/*
+ * IsConflictNamespace
+ * True iff namespace is pg_conflict.
+ *
+ * Does not perform any catalog accesses.
+ */
+bool
+IsConflictNamespace(Oid namespaceId)
+{
+ return namespaceId == PG_CONFLICT_NAMESPACE;
+}
+
/*
* IsReservedName
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 606434823cf..10dadf378a4 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -314,7 +314,8 @@ heap_create(const char *relname,
*/
if (!allow_system_table_mods &&
((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
- IsToastNamespace(relnamespace)) &&
+ IsToastNamespace(relnamespace) ||
+ IsConflictNamespace(relnamespace)) &&
IsNormalProcessingMode())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index c3b79a2ba48..cc7f0a045a6 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -3539,6 +3539,12 @@ CheckSetNamespace(Oid oldNspOid, Oid nspOid)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move objects into or out of TOAST schema")));
+
+ /* similarly for CONFLICT schema */
+ if (nspOid == PG_CONFLICT_NAMESPACE || oldNspOid == PG_CONFLICT_NAMESPACE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot move objects into or out of CONFLICT schema")));
}
/*
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9a4791c573e..cb383a5ce04 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -31,6 +31,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/publicationcmds.h"
+#include "commands/subscriptioncmds.h"
#include "funcapi.h"
#include "utils/array.h"
#include "utils/builtins.h"
@@ -85,6 +86,15 @@ check_publication_add_relation(Relation targetrel)
errmsg("cannot add relation \"%s\" to publication",
RelationGetRelationName(targetrel)),
errdetail("This operation is not supported for unlogged tables.")));
+
+ /* Can't be conflict log table */
+ if (IsConflictLogTable(RelationGetRelid(targetrel)))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot add relation \"%s.%s\" to publication",
+ get_namespace_name(RelationGetNamespace(targetrel)),
+ RelationGetRelationName(targetrel)),
+ errdetail("This operation is not supported for conflict log tables.")));
}
/*
@@ -145,6 +155,13 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
/*
* Another variant of is_publishable_class(), taking a Relation.
+ *
+ * Note: Conflict log tables are not publishable. However, we intentionally
+ * skip this check here because this function is called for every change and
+ * performing this check during every change publication is costly. To ensure
+ * unpublishable entries are ignored without incurring performance overhead,
+ * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL
+ * flag. This allows the decoding layer to bypass these entries automatically.
*/
bool
is_publishable_relation(Relation rel)
@@ -169,7 +186,10 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
PG_RETURN_NULL();
- result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
+
+ /* Subscription conflict log tables are not published */
+ result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) &&
+ !IsConflictLogTable(relid);
ReleaseSysCache(tuple);
PG_RETURN_BOOL(result);
}
@@ -890,7 +910,9 @@ GetAllPublicationRelations(char relkind, bool pubviaroot)
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
Oid relid = relForm->oid;
+ /* Subscription conflict log tables are not published */
if (is_publishable_class(relid, relForm) &&
+ !IsConflictLogTable(relid) &&
!(relForm->relispartition && pubviaroot))
result = lappend_oid(result, relid);
}
@@ -1018,7 +1040,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Oid relid = relForm->oid;
char relkind;
- if (!is_publishable_class(relid, relForm))
+ if (!is_publishable_class(relid, relForm) || IsConflictLogTable(relid))
continue;
relkind = get_rel_relkind(relid);
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 2b103245290..285a598497d 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -106,6 +106,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->retaindeadtuples = subform->subretaindeadtuples;
sub->maxretention = subform->submaxretention;
sub->retentionactive = subform->subretentionactive;
+ sub->conflictlogrelid = subform->subconflictlogrelid;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
@@ -141,6 +142,12 @@ GetSubscription(Oid subid, bool missing_ok)
Anum_pg_subscription_suborigin);
sub->origin = TextDatumGetCString(datum);
+ /* Get conflict log destination */
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconflictlogdest);
+ sub->conflictlogdest = TextDatumGetCString(datum);
+
/* Is the subscription owner a superuser? */
sub->ownersuperuser = superuser_arg(sub->owner);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d6674f20fc2..4b2d98c9ed2 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -15,25 +15,31 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
+#include "catalog/heap.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
+#include "catalog/pg_am_d.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
+#include "catalog/pg_namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "commands/comment.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
@@ -51,6 +57,7 @@
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/regproc.h"
#include "utils/syscache.h"
/*
@@ -75,6 +82,7 @@
#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
#define SUBOPT_LSN 0x00010000
#define SUBOPT_ORIGIN 0x00020000
+#define SUBOPT_CONFLICT_LOG_DEST 0x00040000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -103,6 +111,7 @@ typedef struct SubOpts
bool retaindeadtuples;
int32 maxretention;
char *origin;
+ ConflictLogDest logdest;
XLogRecPtr lsn;
} SubOpts;
@@ -135,7 +144,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
static void CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel);
-
+static Oid create_conflict_log_table(Oid subid, char *subname);
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -191,6 +200,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->maxretention = 0;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+ if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST))
+ opts->logdest = CONFLICT_LOG_DEST_LOG;
/* Parse options */
foreach(lc, stmt_options)
@@ -402,6 +413,20 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_LSN;
opts->lsn = lsn;
}
+ else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST) &&
+ strcmp(defel->defname, "conflict_log_destination") == 0)
+ {
+ char *val;
+ ConflictLogDest dest;
+
+ if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST))
+ errorConflictingDefElem(defel, pstate);
+
+ val = defGetString(defel);
+ dest = GetLogDestination(val);
+ opts->logdest = dest;
+ opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -612,7 +637,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
- SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+ SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+ SUBOPT_CONFLICT_LOG_DEST);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -747,6 +773,31 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
+ /* Always set the destination, default will be 'log'. */
+ values[Anum_pg_subscription_subconflictlogdest - 1] =
+ CStringGetTextDatum(ConflictLogDestNames[opts.logdest]);
+
+ /*
+ * If logging to a table is required, physically create the logging
+ * relation and store its OID in the catalog.
+ */
+ if (opts.logdest == CONFLICT_LOG_DEST_TABLE ||
+ opts.logdest == CONFLICT_LOG_DEST_ALL)
+ {
+ Oid logrelid;
+
+ /* Store the Oid returned from creation. */
+ logrelid = create_conflict_log_table(subid, stmt->subname);
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(logrelid);
+ }
+ else
+ {
+ /* Destination is 'log'; no table is needed. */
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(InvalidOid);
+ }
+
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
@@ -1410,7 +1461,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_RETAIN_DEAD_TUPLES |
SUBOPT_MAX_RETENTION_DURATION |
- SUBOPT_ORIGIN);
+ SUBOPT_ORIGIN |
+ SUBOPT_CONFLICT_LOG_DEST);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1665,6 +1717,63 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
origin = opts.origin;
}
+ if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST))
+ {
+ ConflictLogDest old_dest =
+ GetLogDestination(sub->conflictlogdest);
+
+ if (opts.logdest != old_dest)
+ {
+ bool want_table =
+ IsSet(opts.logdest, CONFLICT_LOG_DEST_TABLE);
+ bool has_oldtable =
+ IsSet(old_dest, CONFLICT_LOG_DEST_TABLE);
+
+ values[Anum_pg_subscription_subconflictlogdest - 1] =
+ CStringGetTextDatum(ConflictLogDestNames[opts.logdest]);
+ replaces[Anum_pg_subscription_subconflictlogdest - 1] = true;
+
+ if (want_table && !has_oldtable)
+ {
+ Oid relid;
+
+ relid = create_conflict_log_table(subid, sub->name);
+
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(relid);
+ replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
+ true;
+ }
+ else if (!want_table && has_oldtable)
+ {
+ ObjectAddress object;
+
+ /*
+ * Conflict log tables are recorded as internal
+ * dependencies of the subscription. Drop the
+ * table if it is not required anymore to avoid
+ * stale or orphaned relations.
+ *
+ * XXX: At present, only conflict log tables are
+ * managed this way. In future if we introduce
+ * additional internal dependencies, we may need
+ * a targeted deletion to avoid deletion of any
+ * other objects.
+ */
+ ObjectAddressSet(object, SubscriptionRelationId,
+ subid);
+ performDeletion(&object, DROP_CASCADE,
+ PERFORM_DELETION_INTERNAL |
+ PERFORM_DELETION_SKIP_ORIGINAL);
+
+ values[Anum_pg_subscription_subconflictlogrelid - 1] =
+ ObjectIdGetDatum(InvalidOid);
+ replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
+ true;
+ }
+ }
+ }
+
update_tuple = true;
break;
}
@@ -2027,6 +2136,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
Form_pg_subscription form;
List *rstates;
bool must_use_password;
+ ObjectAddress object;
/*
* The launcher may concurrently start a new worker for this subscription.
@@ -2184,6 +2294,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+ /*
+ * Conflict log tables are recorded as internal dependencies of the
+ * subscription. We must drop the dependent objects before the
+ * subscription itself is removed. By using
+ * PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the conflict log
+ * table is reaped while the subscription remains for the final deletion
+ * step.
+ */
+ ObjectAddressSet(object, SubscriptionRelationId, subid);
+ performDeletion(&object, DROP_CASCADE,
+ PERFORM_DELETION_INTERNAL |
+ PERFORM_DELETION_SKIP_ORIGINAL);
+
/* Remove any associated relation synchronization states. */
RemoveSubscriptionRel(subid, InvalidOid);
@@ -3190,3 +3313,158 @@ defGetStreamingMode(DefElem *def)
def->defname)));
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
}
+
+/*
+ * Builds the TupleDesc for the conflict log table.
+ */
+static TupleDesc
+create_conflict_log_table_tupdesc(void)
+{
+ TupleDesc tupdesc;
+
+ tupdesc = CreateTemplateTupleDesc(MAX_CONFLICT_ATTR_NUM);
+
+ for (int i = 0; i < MAX_CONFLICT_ATTR_NUM; i++)
+ {
+ Oid type_oid = ConflictLogSchema[i].atttypid;
+
+ /*
+ * Special handling for the JSON array type for proper
+ * TupleDescInitEntry call.
+ */
+ if (type_oid == JSONARRAYOID)
+ type_oid = get_array_type(JSONOID);
+
+ TupleDescInitEntry(tupdesc, i + 1,
+ ConflictLogSchema[i].attname,
+ type_oid,
+ -1, 0);
+ }
+
+ return BlessTupleDesc(tupdesc);
+}
+
+/*
+ * Create a structured conflict log table for a subscription.
+ *
+ * The table is created within the dedicated 'pg_conflict' namespace, which
+ * is system-managed. The table name is generated automatically using the
+ * subscription's OID (e.g., "pg_conflict_<subid>") to ensure uniqueness
+ * within the cluster and to avoid collisions during subscription renames.
+ */
+static Oid
+create_conflict_log_table(Oid subid, char *subname)
+{
+ TupleDesc tupdesc;
+ Oid relid;
+ ObjectAddress myself;
+ ObjectAddress subaddr;
+ char relname[NAMEDATALEN];
+
+ snprintf(relname, NAMEDATALEN, "pg_conflict_%u", subid);
+
+ /* There can not be an existing table with the same name. */
+ Assert(!OidIsValid(get_relname_relid(relname, PG_CONFLICT_NAMESPACE)));
+
+ /* Build the tuple descriptor for the new table. */
+ tupdesc = create_conflict_log_table_tupdesc();
+
+ /* Create conflict log table. */
+ relid = heap_create_with_catalog(relname,
+ PG_CONFLICT_NAMESPACE,
+ 0,
+ InvalidOid,
+ InvalidOid,
+ InvalidOid,
+ GetUserId(),
+ HEAP_TABLE_AM_OID,
+ tupdesc,
+ NIL,
+ RELKIND_RELATION,
+ RELPERSISTENCE_PERMANENT,
+ false,
+ false,
+ ONCOMMIT_NOOP,
+ (Datum) 0,
+ false,
+ true,
+ false,
+ InvalidOid,
+ NULL);
+
+ /*
+ * Establish an internal dependency between the conflict log table and
+ * the subscription. By using DEPENDENCY_INTERNAL, we ensure the table
+ * is automatically reaped when the subscription is dropped. This also
+ * prevents the table from being dropped independently unless the
+ * subscription itself is removed.
+ */
+ ObjectAddressSet(myself, RelationRelationId, relid);
+ ObjectAddressSet(subaddr, SubscriptionRelationId, subid);
+ recordDependencyOn(&myself, &subaddr, DEPENDENCY_INTERNAL);
+
+ /* Release tuple descriptor memory. */
+ FreeTupleDesc(tupdesc);
+
+ return relid;
+}
+
+/*
+ * GetLogDestination
+ *
+ * Convert string to enum by comparing against standardized labels.
+ */
+ConflictLogDest
+GetLogDestination(const char *dest)
+{
+ /* Empty string or NULL defaults to LOG. */
+ if (dest == NULL || dest[0] == '\0' || pg_strcasecmp(dest, "log") == 0)
+ return CONFLICT_LOG_DEST_LOG;
+
+ if (pg_strcasecmp(dest,
+ ConflictLogDestNames[CONFLICT_LOG_DEST_TABLE]) == 0)
+ return CONFLICT_LOG_DEST_TABLE;
+
+ if (pg_strcasecmp(dest, ConflictLogDestNames[CONFLICT_LOG_DEST_ALL]) == 0)
+ return CONFLICT_LOG_DEST_ALL;
+
+ /* Unrecognized string. */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized conflict_log_destination value: \"%s\"", dest),
+ errhint("Valid values are \"log\", \"table\", and \"all\".")));
+}
+
+/*
+ * Check if the specified relation is used as a conflict log table by any
+ * subscription.
+ */
+bool
+IsConflictLogTable(Oid relid)
+{
+ Relation rel;
+ TableScanDesc scan;
+ HeapTuple tup;
+ bool is_clt = false;
+
+ rel = table_open(SubscriptionRelationId, AccessShareLock);
+ scan = table_beginscan_catalog(rel, 0, NULL);
+
+ while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+ {
+ Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+ /* Direct Oid comparison from catalog */
+ if (OidIsValid(subform->subconflictlogrelid) &&
+ subform->subconflictlogrelid == relid)
+ {
+ is_clt = true;
+ break;
+ }
+ }
+
+ table_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ return is_clt;
+}
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 3584c4e1428..20f08e548ba 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false,
- false, false, false, false};
+ false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6900,15 +6900,22 @@ describeSubscriptions(const char *pattern, bool verbose)
appendPQExpBuffer(&buf,
", subskiplsn AS \"%s\"\n",
gettext_noop("Skip LSN"));
+
+ /* Conflict log destination is supported in v19 and higher */
+ if (pset.sversion >= 190000)
+ {
+ appendPQExpBuffer(&buf,
+ ", subconflictlogdest AS \"%s\"\n",
+ gettext_noop("Conflict log destination"));
+ }
}
/* Only display subscriptions in current database. */
- appendPQExpBufferStr(&buf,
- "FROM pg_catalog.pg_subscription\n"
- "WHERE subdbid = (SELECT oid\n"
- " FROM pg_catalog.pg_database\n"
- " WHERE datname = pg_catalog.current_database())");
-
+ appendPQExpBuffer(&buf,
+ "FROM pg_catalog.pg_subscription "
+ "WHERE subdbid = (SELECT oid\n"
+ " FROM pg_catalog.pg_database\n"
+ " WHERE datname = pg_catalog.current_database())");
if (!validateSQLNamePattern(&buf, pattern, true, false,
NULL, "subname", NULL,
NULL,
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index d81f2fcdbe6..e5eb434c90c 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("(", "PUBLICATION");
/* ALTER SUBSCRIPTION <name> SET ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
- COMPLETE_WITH("binary", "disable_on_error", "failover",
- "max_retention_duration", "origin",
+ COMPLETE_WITH("binary", "conflict_log_destination", "disable_on_error",
+ "failover", "max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
"synchronous_commit", "two_phase");
@@ -3851,8 +3851,8 @@ match_previous_words(int pattern_id,
COMPLETE_WITH("WITH (");
/* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
- COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
- "disable_on_error", "enabled", "failover",
+ COMPLETE_WITH("binary", "conflict_log_destination", "connect", "copy_data",
+ "create_slot", "disable_on_error", "enabled", "failover",
"max_retention_duration", "origin",
"password_required", "retain_dead_tuples",
"run_as_owner", "slot_name", "streaming",
diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h
index a9d6e8ea986..8193229f2e2 100644
--- a/src/include/catalog/catalog.h
+++ b/src/include/catalog/catalog.h
@@ -25,6 +25,7 @@ extern bool IsInplaceUpdateRelation(Relation relation);
extern bool IsSystemClass(Oid relid, Form_pg_class reltuple);
extern bool IsToastClass(Form_pg_class reltuple);
+extern bool IsConflictClass(Form_pg_class reltuple);
extern bool IsCatalogRelationOid(Oid relid);
extern bool IsCatalogTextUniqueIndexOid(Oid relid);
@@ -32,6 +33,7 @@ extern bool IsInplaceUpdateOid(Oid relid);
extern bool IsCatalogNamespace(Oid namespaceId);
extern bool IsToastNamespace(Oid namespaceId);
+extern bool IsConflictNamespace(Oid namespaceId);
extern bool IsReservedName(const char *name);
diff --git a/src/include/catalog/pg_namespace.dat b/src/include/catalog/pg_namespace.dat
index 3075e142c73..c6e10150b21 100644
--- a/src/include/catalog/pg_namespace.dat
+++ b/src/include/catalog/pg_namespace.dat
@@ -18,6 +18,9 @@
{ oid => '99', oid_symbol => 'PG_TOAST_NAMESPACE',
descr => 'reserved schema for TOAST tables',
nspname => 'pg_toast', nspacl => '_null_' },
+{ oid => '1382', oid_symbol => 'PG_CONFLICT_NAMESPACE',
+ descr => 'reserved schema for subscription-specific conflict tables',
+ nspname => 'pg_conflict', nspacl => '_null_' },
# update dumpNamespace() if changing this descr
{ oid => '2200', oid_symbol => 'PG_PUBLIC_NAMESPACE',
descr => 'standard public schema',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index f3571d2bfcf..4aa29ea15d4 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -90,6 +90,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* exceeded max_retention_duration, when
* defined */
+ Oid subconflictlogrelid; /* Relid of the conflict log table. */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +104,14 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
/* List of publications subscribed to */
text subpublications[1] BKI_FORCE_NOT_NULL;
+ /*
+ * Strategy for logging replication conflicts:
+ * 'log' - server log only,
+ * 'table' - internal table only,
+ * 'all' - both log and table.
+ */
+ text subconflictlogdest;
+
/* Only publish data originating from the specified origin */
text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
#endif
@@ -152,12 +161,14 @@ typedef struct Subscription
* and the retention duration has not
* exceeded max_retention_duration, when
* defined */
+ Oid conflictlogrelid; /* conflict log table Oid */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
char *origin; /* Only publish data originating from the
* specified origin */
+ char *conflictlogdest; /* Conflict log destination */
} Subscription;
#ifdef EXPOSE_TO_CLIENT_CODE
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 63504232a14..bc4a92af356 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
#include "catalog/objectaddress.h"
#include "parser/parse_node.h"
+#include "replication/conflict.h"
extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
bool isTopLevel);
@@ -36,4 +37,7 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
bool retention_active,
bool max_retention_set);
+extern ConflictLogDest GetLogDestination(const char *dest);
+extern bool IsConflictLogTable(Oid relid);
+
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index d538274637f..af6deaa4297 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -10,6 +10,7 @@
#define CONFLICT_H
#include "access/xlogdefs.h"
+#include "catalog/pg_type.h"
#include "nodes/pg_list.h"
#include "utils/timestamp.h"
@@ -79,6 +80,61 @@ typedef struct ConflictTupleInfo
* conflicting local row occurred */
} ConflictTupleInfo;
+/*
+ * Conflict log destination types.
+ *
+ * These values are defined as bitmask flags to allow for multiple simultaneous
+ * logging destinations (e.g., logging to both system logs and a table).
+ * Internally, we use these for bitwise comparisons (IsSet), but the string
+ * representation is stored in pg_subscription.subconflictlogdest.
+ */
+typedef enum ConflictLogDest
+{
+ /* Log conflicts to the server logs */
+ CONFLICT_LOG_DEST_LOG = 1 << 0, /* 0x01 */
+
+ /* Log conflicts to an internally managed table */
+ CONFLICT_LOG_DEST_TABLE = 1 << 1, /* 0x02 */
+
+ /* Convenience flag for all supported destinations */
+ CONFLICT_LOG_DEST_ALL = (CONFLICT_LOG_DEST_LOG | CONFLICT_LOG_DEST_TABLE)
+} ConflictLogDest;
+
+/*
+ * Array mapping for converting internal enum to string.
+ */
+static const char *const ConflictLogDestNames[] = {
+ [CONFLICT_LOG_DEST_LOG] = "log",
+ [CONFLICT_LOG_DEST_TABLE] = "table",
+ [CONFLICT_LOG_DEST_ALL] = "all"
+};
+
+/* Structure to hold metadata for one column of the conflict log table */
+typedef struct ConflictLogColumnDef
+{
+ const char *attname; /* Column name */
+ Oid atttypid; /* Data type OID */
+} ConflictLogColumnDef;
+
+/* The single source of truth for the conflict log table schema */
+static const ConflictLogColumnDef ConflictLogSchema[] =
+{
+ { .attname = "relid", .atttypid = OIDOID },
+ { .attname = "schemaname", .atttypid = TEXTOID },
+ { .attname = "relname", .atttypid = TEXTOID },
+ { .attname = "conflict_type", .atttypid = TEXTOID },
+ { .attname = "remote_xid", .atttypid = XIDOID },
+ { .attname = "remote_commit_lsn",.atttypid = LSNOID },
+ { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID },
+ { .attname = "remote_origin", .atttypid = TEXTOID },
+ { .attname = "replica_identity", .atttypid = JSONOID },
+ { .attname = "remote_tuple", .atttypid = JSONOID },
+ { .attname = "local_conflicts", .atttypid = JSONARRAYOID }
+};
+
+/* Define the count using the array size */
+#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0]))
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b3eccd8afe3..d5f8abe9325 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | log
(1 row)
-- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log
(1 row)
BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | log
(1 row)
-- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
-- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
-- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -450,19 +450,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
-- ok
ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -517,7 +517,167 @@ COMMIT;
-- ok, owning it is enough for this stuff
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG DESTINATION TESTS
+--
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+-- fail - unrecognized parameter value
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid');
+ERROR: unrecognized conflict_log_destination value: "invalid"
+HINT: Valid values are "log", "table", and "all".
+-- verify subconflictlogdest is 'log' and relid is 0 (InvalidOid) for default case
+CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_log_default';
+ subname | subconflictlogdest | subconflictlogrelid
+------------------------------+--------------------+---------------------
+ regress_conflict_log_default | log | 0
+(1 row)
+
+-- verify empty string defaults to 'log'
+CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = '');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_empty_str';
+ subname | subconflictlogdest | subconflictlogrelid
+----------------------------+--------------------+---------------------
+ regress_conflict_empty_str | log | 0
+(1 row)
+
+-- this should generate an internal table named pg_conflict_$subid$
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+-- check metadata in pg_subscription: destination should be 'table' and relid valid
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ subname | subconflictlogdest | has_relid
+------------------------+--------------------+-----------
+ regress_conflict_test1 | table | t
+(1 row)
+
+-- verify the physical table exists and its OID matches subconflictlogrelid
+SELECT count(*)
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test1' AND c.oid = s.subconflictlogrelid;
+ count
+-------
+ 1
+(1 row)
+
+-- check if the internal table has the correct schema (11 columns)
+SELECT count(*)
+FROM pg_attribute a
+JOIN pg_class c ON a.attrelid = c.oid
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0;
+ count
+-------
+ 11
+(1 row)
+
+--
+-- ALTER SUBSCRIPTION - conflict_log_destination state transitions
+--
+-- These tests verify the transition logic between different logging
+-- destinations, ensuring internal tables are created or dropped as expected.
+--
+-- transition from 'log' to 'all'
+-- a new internal conflict log table should be created
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log');
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all');
+-- verify metadata after ALTER (destination should be 'all')
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subname | subconflictlogdest | has_relid
+------------------------+--------------------+-----------
+ regress_conflict_test2 | all | t
+(1 row)
+
+-- transition from 'all' to 'table'
+-- should NOT drop the table, only change destination string
+SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table');
+SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subconflictlogdest | relid_unchanged
+--------------------+-----------------
+ table | t
+(1 row)
+
+-- transition from 'table' to 'log'
+-- should drop the table and clear relid
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log');
+SELECT subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subconflictlogdest | subconflictlogrelid
+--------------------+---------------------
+ log | 0
+(1 row)
+
+-- verify the physical table is gone
+SELECT count(*)
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test2';
+ count
+-------
+ 0
+(1 row)
+
+-- ensure drop table not allowed and DROP SUBSCRIPTION reaps the table
+-- re-enable table logging for verification
+ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table');
+-- fail - drop table not allowed due to internal dependency
+-- use DO block to hide OID in error message
+DO $$
+BEGIN
+ EXECUTE 'DROP TABLE ' || (SELECT 'pg_conflict.pg_conflict_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1');
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege';
+END $$;
+NOTICE: captured expected error: insufficient_privilege
+-- PUBLICATION: Verify internal tables are not publishable
+-- pg_relation_is_publishable should return false for internal conflict log tables
+SELECT pg_relation_is_publishable(subconflictlogrelid)
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ pg_relation_is_publishable
+----------------------------
+ f
+(1 row)
+
+-- CLEANUP: Proper drop reaps the table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+-- Verify the table OID for reap check
+SELECT 'pg_conflict_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the internal table was reaped via dependency
+SELECT to_regclass(:'internal_tablename');
+ to_regclass
+-------------
+
+(1 row)
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_log_default DISABLE;
+ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_log_default;
+ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE;
+ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_empty_str;
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
RESET SESSION AUTHORIZATION;
+DROP SCHEMA clt;
+ERROR: schema "clt" does not exist
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index ef0c298d2df..6c7f358ffd2 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -365,7 +365,125 @@ COMMIT;
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG DESTINATION TESTS
+--
+
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
+-- fail - unrecognized parameter value
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid');
+
+-- verify subconflictlogdest is 'log' and relid is 0 (InvalidOid) for default case
+CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_log_default';
+
+-- verify empty string defaults to 'log'
+CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = '');
+SELECT subname, subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_empty_str';
+
+-- this should generate an internal table named pg_conflict_$subid$
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+
+-- check metadata in pg_subscription: destination should be 'table' and relid valid
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- verify the physical table exists and its OID matches subconflictlogrelid
+SELECT count(*)
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test1' AND c.oid = s.subconflictlogrelid;
+
+-- check if the internal table has the correct schema (11 columns)
+SELECT count(*)
+FROM pg_attribute a
+JOIN pg_class c ON a.attrelid = c.oid
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0;
+
+--
+-- ALTER SUBSCRIPTION - conflict_log_destination state transitions
+--
+-- These tests verify the transition logic between different logging
+-- destinations, ensuring internal tables are created or dropped as expected.
+--
+
+-- transition from 'log' to 'all'
+-- a new internal conflict log table should be created
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log');
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all');
+
+-- verify metadata after ALTER (destination should be 'all')
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- transition from 'all' to 'table'
+-- should NOT drop the table, only change destination string
+SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table');
+SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- transition from 'table' to 'log'
+-- should drop the table and clear relid
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log');
+SELECT subconflictlogdest, subconflictlogrelid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- verify the physical table is gone
+SELECT count(*)
+FROM pg_class c
+JOIN pg_subscription s ON c.relname = 'pg_conflict_' || s.oid
+WHERE s.subname = 'regress_conflict_test2';
+
+-- ensure drop table not allowed and DROP SUBSCRIPTION reaps the table
+-- re-enable table logging for verification
+ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table');
+
+-- fail - drop table not allowed due to internal dependency
+-- use DO block to hide OID in error message
+DO $$
+BEGIN
+ EXECUTE 'DROP TABLE ' || (SELECT 'pg_conflict.pg_conflict_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1');
+EXCEPTION WHEN insufficient_privilege THEN
+ RAISE NOTICE 'captured expected error: insufficient_privilege';
+END $$;
+
+-- PUBLICATION: Verify internal tables are not publishable
+-- pg_relation_is_publishable should return false for internal conflict log tables
+SELECT pg_relation_is_publishable(subconflictlogrelid)
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- CLEANUP: Proper drop reaps the table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+
+-- Verify the table OID for reap check
+SELECT 'pg_conflict_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset
+
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the internal table was reaped via dependency
+SELECT to_regclass(:'internal_tablename');
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_log_default DISABLE;
+ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_log_default;
+
+ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE;
+ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_empty_str;
+
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+
RESET SESSION AUTHORIZATION;
+DROP SCHEMA clt;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
DROP ROLE regress_subscription_user3;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b9e671fcda8..7e2410bf54e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -500,6 +500,8 @@ ConditionVariableMinimallyPadded
ConditionalStack
ConfigData
ConfigVariable
+ConflictLogColumnDef
+ConflictLogDest
ConflictTupleInfo
ConflictType
ConnCacheEntry
--
2.43.0
[text/x-patch] v19-0002-Implement-the-conflict-insertion-infrastructure-.patch (33.0K, 3-v19-0002-Implement-the-conflict-insertion-infrastructure-.patch)
download | inline diff:
From ba128ff1de0f99f6f2ed113d821e20bb6e762acf Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sat, 20 Dec 2025 15:20:09 +0530
Subject: [PATCH v19 2/6] Implement the conflict insertion infrastructure for
the conflict log table
This patch introduces the core logic to populate the conflict log table whenever
a logical replication conflict is detected. It captures the remote transaction
details along with the corresponding local state at the time of the conflict.
Handling Multi-row Conflicts: A single remote tuple may conflict with multiple
local tuples (e.g., in the case of multiple_unique_conflicts). To handle this,
the infrastructure creates a single row in the conflict log table for each
remote tuple. The details of all conflicting local rows are aggregated into a
single JSON array in the local_conflicts column.
The JSON array uses the following structured format:
[ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1",
"key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ]
Example of querying the structured conflict data:
SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;
remote_xid | relname | remote_origin | local_xid | local_tuple
------------+----------+---------------+-----------+---------------------
760 | test | pg_16406 | 771 | {"a":1,"b":10}
765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2}
---
src/backend/replication/logical/conflict.c | 548 +++++++++++++++++--
src/backend/replication/logical/launcher.c | 1 +
src/backend/replication/logical/worker.c | 31 +-
src/include/replication/conflict.h | 4 +-
src/include/replication/worker_internal.h | 7 +
src/test/subscription/t/037_conflict_dest.pl | 181 ++++++
6 files changed, 728 insertions(+), 44 deletions(-)
create mode 100644 src/test/subscription/t/037_conflict_dest.pl
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 93222ee3b88..e23ff0b70cf 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,20 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/tableam.h"
+#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/jsonb.h"
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
@@ -34,6 +41,19 @@ static const char *const ConflictTypeNames[] = {
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
+/* Schema for the elements within the 'local_conflicts' JSON array */
+static const ConflictLogColumnDef LocalConflictSchema[] =
+{
+ { .attname = "xid", .atttypid = XIDOID },
+ { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID },
+ { .attname = "origin", .atttypid = TEXTOID },
+ { .attname = "key", .atttypid = JSONOID },
+ { .attname = "tuple", .atttypid = JSONOID }
+};
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS \
+ (sizeof(LocalConflictSchema) / sizeof(LocalConflictSchema[0]))
+
static int errcode_apply_conflict(ConflictType type);
static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
@@ -50,8 +70,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull);
static char *build_index_value_desc(EState *estate, Relation localrel,
TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+ Relation localrel,
+ Oid replica_index,
+ TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+ ConflictType conflict_type,
+ List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot);
/*
* Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -105,30 +144,83 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *remoteslot, List *conflicttuples)
{
- Relation localrel = relinfo->ri_RelationDesc;
- StringInfoData err_detail;
+ Relation localrel = relinfo->ri_RelationDesc;
+ ConflictLogDest dest;
+ Relation conflictlogrel;
- initStringInfo(&err_detail);
+ /*
+ * Get both the conflict log destination and the opened conflict log
+ * relation for insertion.
+ */
+ conflictlogrel = GetConflictLogTableInfo(&dest);
- /* Form errdetail message by combining conflicting tuples information. */
- foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
- errdetail_apply_conflict(estate, relinfo, type, searchslot,
- conflicttuple->slot, remoteslot,
- conflicttuple->indexoid,
- conflicttuple->xmin,
- conflicttuple->origin,
- conflicttuple->ts,
- &err_detail);
+ /* Insert to table if destination is 'table' or 'all' */
+ if ((dest & CONFLICT_LOG_DEST_TABLE) != 0)
+ {
+ Assert(conflictlogrel != NULL);
+
+ /*
+ * Prepare the conflict log tuple. If the error level is below ERROR,
+ * insert it immediately. Otherwise, defer the insertion to a new
+ * transaction after the current one aborts, ensuring the insertion of
+ * the log tuple is not rolled back.
+ */
+ prepare_conflict_log_tuple(estate,
+ relinfo->ri_RelationDesc,
+ conflictlogrel,
+ type,
+ searchslot,
+ conflicttuples,
+ remoteslot);
+ if (elevel < ERROR)
+ InsertConflictLogTuple(conflictlogrel);
+
+ table_close(conflictlogrel, RowExclusiveLock);
+ }
pgstat_report_subscription_conflict(MySubscription->oid, type);
- ereport(elevel,
- errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
- get_namespace_name(RelationGetNamespace(localrel)),
- RelationGetRelationName(localrel),
- ConflictTypeNames[type]),
- errdetail_internal("%s", err_detail.data));
+ /* Decide what detail to show in server logs. */
+ if ((dest & CONFLICT_LOG_DEST_LOG) != 0)
+ {
+ StringInfoData err_detail;
+
+ initStringInfo(&err_detail);
+
+ /* Form errdetail message by combining conflicting tuples information. */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+ errdetail_apply_conflict(estate, relinfo, type, searchslot,
+ conflicttuple->slot, remoteslot,
+ conflicttuple->indexoid,
+ conflicttuple->xmin,
+ conflicttuple->origin,
+ conflicttuple->ts,
+ &err_detail);
+
+ /* Standard reporting with full internal details. */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+ }
+ else
+ {
+ /*
+ * 'table' only: Report the error msg but omit raw tuple data from
+ * server logs since it's already captured in the internal table.
+ */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail("Conflict details logged to internal table with OID %u.",
+ MySubscription->conflictlogrelid));
+ }
}
/*
@@ -162,6 +254,67 @@ InitConflictIndexes(ResultRelInfo *relInfo)
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
}
+/*
+ * GetConflictLogTableInfo
+ *
+ * Fetches conflict logging metadata from the cached MySubscription pointer.
+ * Sets the destination enum in *log_dest and, if applicable, opens and
+ * returns the relation handle for the internal log table.
+ */
+Relation
+GetConflictLogTableInfo(ConflictLogDest *log_dest)
+{
+ Oid conflictlogrelid;
+ Relation conflictlogrel = NULL;
+
+ /*
+ * Convert the text log destination to the internal enum. MySubscription
+ * already contains the data from pg_subscription.
+ */
+ *log_dest = GetLogDestination(MySubscription->conflictlogdest);
+ conflictlogrelid = MySubscription->conflictlogrelid;
+
+ /* If destination is 'log' only, no table to open. */
+ if (*log_dest == CONFLICT_LOG_DEST_LOG)
+ return NULL;
+
+ Assert(OidIsValid(conflictlogrelid));
+
+ conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+
+ /* Conflict log table is dropped or not accessible. */
+ if (conflictlogrel == NULL)
+ ereport(WARNING,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("conflict log table with OID %u does not exist",
+ conflictlogrelid)));
+
+ return conflictlogrel;
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert conflict log tuple into the conflict log table. It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple
+ * inserted into the conflict log table.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+ int options = HEAP_INSERT_NO_LOGICAL;
+
+ /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+ Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+ heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+ GetCurrentCommandId(true), options, NULL);
+
+ /* Free conflict log tuple. */
+ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
/*
* Add SQLSTATE error code to the current conflict report.
*/
@@ -472,6 +625,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
return tuple_value.data;
}
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ * This is the reusable core logic.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull)
+{
+ TupleTableSlot *tableslot = slot;
+
+ /*
+ * If the slot is a virtual slot, copy it into a heap tuple slot as
+ * FormIndexDatum only works with heap tuple slots.
+ */
+ if (TTS_IS_VIRTUAL(slot))
+ {
+ /* Slot is created within the EState's tuple table */
+ tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+ tableslot = ExecCopySlot(tableslot, slot);
+ }
+
+ /*
+ * Initialize ecxt_scantuple for potential use in FormIndexDatum
+ */
+ GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+ /* Form the index datums */
+ FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+ isnull);
+}
+
/*
* Helper functions to construct a string describing the contents of an index
* entry. See BuildIndexValueDescription for details.
@@ -487,41 +674,318 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
Relation indexDesc;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
- TupleTableSlot *tableslot = slot;
- if (!tableslot)
+ if (!slot)
return NULL;
Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
indexDesc = index_open(indexoid, NoLock);
- /*
- * If the slot is a virtual slot, copy it into a heap tuple slot as
- * FormIndexDatum only works with heap tuple slots.
- */
- if (TTS_IS_VIRTUAL(slot))
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+
+ index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+ index_close(indexDesc, NoLock);
+
+ return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to Jsonb.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+ HeapTuple tuple;
+ Datum datum;
+ Datum json;
+
+ Assert(slot != NULL);
+
+ tuple = ExecCopySlotHeapTuple(slot);
+ datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+ json = DirectFunctionCall1(row_to_json, datum);
+ heap_freetuple(tuple);
+
+ return json;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Fetch replica identity key from the tuple table slot and convert into a
+ * jsonb datum.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+ Oid indexid, TupleTableSlot *slot)
+{
+ Relation indexDesc;
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ HeapTuple tuple;
+ TupleDesc tupdesc;
+ Datum datum;
+
+ Assert(slot != NULL);
+
+ Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+ indexDesc = index_open(indexid, NoLock);
+
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+ tupdesc = RelationGetDescr(indexDesc);
+
+ /* Bless the tupdesc so it can be looked up by row_to_json. */
+ BlessTupleDesc(tupdesc);
+
+ /* Form the replica identity tuple. */
+ tuple = heap_form_tuple(tupdesc, values, isnull);
+ datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+ index_close(indexDesc, NoLock);
+
+ /* Convert to a JSONB datum. */
+ return DirectFunctionCall1(row_to_json, datum);
+}
+
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+ TupleDesc tupdesc;
+
+ tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+ for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++)
+ TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1),
+ LocalConflictSchema[i].attname,
+ LocalConflictSchema[i].atttypid,
+ -1, 0);
+
+ BlessTupleDesc(tupdesc);
+
+ return tupdesc;
+}
+
+/*
+ * Builds the local conflicts JSONB array column from the list of
+ * ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+ ConflictType conflict_type,
+ List *conflicttuples)
+{
+ ListCell *lc;
+ List *json_datums = NIL; /* List to hold the row_to_json results (type json) */
+ Datum *json_datum_array;
+ bool *json_null_array;
+ Datum json_array_datum;
+ int num_conflicts;
+ int i;
+ int16 typlen;
+ bool typbyval;
+ char typalign;
+ TupleDesc tupdesc;
+
+ /* Build local conflicts tuple descriptor. */
+ tupdesc = build_conflict_tupledesc();
+
+ /* Process local conflict tuple list and prepare an array of JSON. */
+ foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
{
- tableslot = table_slot_create(localrel, &estate->es_tupleTable);
- tableslot = ExecCopySlot(tableslot, slot);
+ Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+ bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+ char *origin_name = NULL;
+ HeapTuple tuple;
+ Datum json_datum;
+ int attno;
+
+ attno = 0;
+ values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+ if (conflicttuple->ts)
+ values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+ else
+ nulls[attno++] = true;
+
+ if (conflicttuple->origin != InvalidRepOriginId)
+ replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+ /* Store empty string if origin name for the tuple is NULL. */
+ if (origin_name != NULL)
+ values[attno++] = CStringGetTextDatum(origin_name);
+ else
+ nulls[attno++] = true;
+
+ /*
+ * Add the conflicting key values in the case of a unique constraint
+ * violation.
+ */
+ if (conflict_type == CT_INSERT_EXISTS ||
+ conflict_type == CT_UPDATE_EXISTS ||
+ conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+ {
+ Oid indexoid = conflicttuple->indexoid;
+
+ Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+ CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+ true));
+ values[attno++] =
+ tuple_table_slot_to_indextup_json(estate, rel,
+ indexoid,
+ conflicttuple->slot);
+ }
+ else
+ nulls[attno++] = true;
+
+ /* Convert conflicting tuple to JSON datum. */
+ if (conflicttuple->slot)
+ values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+ else
+ nulls[attno] = true;
+
+ Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+
+ json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+ /*
+ * Build the higher level JSON datum in format described in function
+ * header.
+ */
+ json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+ /* Done with the temporary tuple. */
+ heap_freetuple(tuple);
+
+ /* Add to the array element. */
+ json_datums = lappend(json_datums, (void *) json_datum);
}
- /*
- * Initialize ecxt_scantuple for potential use in FormIndexDatum when
- * index expressions are present.
- */
- GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+ num_conflicts = list_length(json_datums);
- /*
- * The values/nulls arrays passed to BuildIndexValueDescription should be
- * the results of FormIndexDatum, which are the "raw" input to the index
- * AM.
- */
- FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+ json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum));
+ json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool));
- index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+ i = 0;
+ foreach(lc, json_datums)
+ {
+ json_datum_array[i] = (Datum) lfirst(lc);
+ i++;
+ }
- index_close(indexDesc, NoLock);
+ /* Construct the json[] array Datum. */
+ get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+ json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+ num_conflicts,
+ JSONOID,
+ typlen,
+ typbyval,
+ typalign));
+ pfree(json_datum_array);
+ pfree(json_null_array);
+
+ return json_array_datum;
+}
- return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot)
+{
+ Datum values[MAX_CONFLICT_ATTR_NUM] = {0};
+ bool nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+ int attno;
+ char *remote_origin = NULL;
+ MemoryContext oldctx;
+
+ Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+ /* Populate the values and nulls arrays. */
+ attno = 0;
+ values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+ values[attno++] =
+ CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+ values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+ values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+ if (TransactionIdIsValid(remote_xid))
+ values[attno++] = TransactionIdGetDatum(remote_xid);
+ else
+ nulls[attno++] = true;
+
+ values[attno++] = LSNGetDatum(remote_final_lsn);
+
+ if (remote_commit_ts > 0)
+ values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+ else
+ nulls[attno++] = true;
+
+ if (replorigin_session_origin != InvalidRepOriginId)
+ replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+ if (remote_origin != NULL)
+ values[attno++] = CStringGetTextDatum(remote_origin);
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(searchslot))
+ {
+ Oid replica_index = GetRelationIdentityOrPK(rel);
+
+ /*
+ * If the table has a valid replica identity index, build the index
+ * json datum from key value. Otherwise, construct it from the complete
+ * tuple in REPLICA IDENTITY FULL cases.
+ */
+ if (OidIsValid(replica_index))
+ values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+ replica_index,
+ searchslot);
+ else
+ values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+ }
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(remoteslot))
+ values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+ else
+ nulls[attno++] = true;
+
+ values[attno] = build_local_conflicts_json_array(estate, rel,
+ conflict_type,
+ conflicttuples);
+
+ Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ MyLogicalRepWorker->conflict_log_tuple =
+ heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+ MemoryContextSwitchTo(oldctx);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3ed86480be2..2dda5a44218 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
+ worker->conflict_log_tuple = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ad281e7069b..5ac826c279f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId remote_xid = InvalidTransactionId;
+TimestampTz remote_commit_ts = 0;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
+ remote_commit_ts = begin_data.committime;
+ remote_xid = begin_data.xid;
maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ remote_xid = stream_xid;
+ remote_final_lsn = InvalidXLogRecPtr;
+ remote_commit_ts = 0;
+
if (!TransactionIdIsValid(stream_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,27 @@ start_apply(XLogRecPtr origin_startpos)
pgstat_report_subscription_error(MySubscription->oid,
MyLogicalRepWorker->type);
+ /*
+ * Insert any pending conflict log tuple under a new transaction.
+ */
+ if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+ {
+ Relation conflictlogrel;
+ ConflictLogDest dest;
+
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Open conflict log table and insert the tuple. */
+ conflictlogrel = GetConflictLogTableInfo(&dest);
+ Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0);
+ InsertConflictLogTuple(conflictlogrel);
+ table_close(conflictlogrel, RowExclusiveLock);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ }
+
PG_RE_THROW();
}
}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index af6deaa4297..b60c0b03e26 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -132,7 +132,6 @@ static const ConflictLogColumnDef ConflictLogSchema[] =
{ .attname = "local_conflicts", .atttypid = JSONARRAYOID }
};
-/* Define the count using the array size */
#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0]))
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
@@ -145,4 +144,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
#endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c1285fdd1bc..5bedfc5450f 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
*/
TransactionId oldest_nonremovable_xid;
+ /* A conflict log tuple that is prepared but not yet inserted. */
+ HeapTuple conflict_log_tuple;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern PGDLLIMPORT List *table_states_not_ready;
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId remote_xid;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
Oid subid, Oid relid,
diff --git a/src/test/subscription/t/037_conflict_dest.pl b/src/test/subscription/t/037_conflict_dest.pl
new file mode 100644
index 00000000000..d33993530a4
--- /dev/null
+++ b/src/test/subscription/t/037_conflict_dest.pl
@@ -0,0 +1,181 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflicts in logical replication
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"
+);
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int UNIQUE, c int UNIQUE);");
+
+$node_subscriber->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int, c int, unique(a,b)) PARTITION BY RANGE (a);
+ CREATE TABLE conf_tab_2_p1 PARTITION OF conf_tab_2 FOR VALUES FROM (MINVALUE) TO (100);
+]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab, conf_tab_2");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab WITH (conflict_log_destination=table)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+###############################################################################
+# Test conflict insertion into the internal conflict log table
+###############################################################################
+
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (10, 10, 10);");
+
+# Get the internally generated table name
+my $subid = $node_subscriber->safe_psql('postgres',
+ "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
+my $conflict_table = "pg_conflict.pg_conflict_$subid";
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (10, 20, 30);");
+
+# Wait for the conflict to be logged
+my $log_check = $node_subscriber->poll_query_until(
+ 'postgres',
+ "SELECT count(*) > 0 FROM $conflict_table;"
+);
+
+is($log_check, 1, 'Conflict was successfully logged to the internal table');
+
+my $json_query = qq[
+ SELECT string_agg((unnested.j::json)->'key'->>'a', ',')
+ FROM (
+ SELECT unnest(local_conflicts) AS j
+ FROM $conflict_table
+ ) AS unnested;
+];
+
+my $all_keys = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that '10' is present in the resulting string
+like($all_keys, qr/10/, 'Verified that key 10 exists in the local_conflicts log');
+
+pass('Conflict type and data successfully validated in internal table');
+
+# Final cleanup for subsequent bidirectional tests in the script
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: update_missing
+###############################################################################
+
+# Sync a row, then delete it locally on subscriber
+$node_publisher->safe_psql('postgres', "INSERT INTO conf_tab VALUES (50, 50, 50);");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE a = 50;");
+
+# Trigger conflict by updating that row on publisher
+$node_publisher->safe_psql('postgres', "UPDATE conf_tab SET b = 500 WHERE a = 50;");
+
+# Wait for the apply worker to detect the missing row and log it
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'update_missing';"
+) or die "Timed out waiting for update_missing conflict";
+
+my $upd_miss_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM $conflict_table WHERE conflict_type = 'update_missing';");
+is($upd_miss_check, 1, 'Verified update_missing conflict logged to internal table');
+
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: insert_exists (via secondary unique index)
+###############################################################################
+
+# 1. Subscriber has a row with b=100
+$node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab VALUES (100, 100, 100);");
+
+# 2. Publisher inserts a NEW PK (101) but a DUPLICATE 'b' (100)
+$node_publisher->safe_psql('postgres', "INSERT INTO conf_tab VALUES (101, 100, 101);");
+
+# 3. Verify it appears as 'insert_exists' in your log table
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'insert_exists' AND local_conflicts::text LIKE '%100%';"
+) or die "Timed out waiting for secondary index insert_exists conflict";
+
+pass('Logged insert_exists triggered by secondary unique index violation');
+
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# CASE 3: Switching Destination to 'log' (Server Log Verification)
+###############################################################################
+
+# Switch destination
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub_tab SET (conflict_log_destination = 'all');");
+
+$node_subscriber->safe_psql('postgres', "DELETE FROM $conflict_table;");
+# Trigger a conflict for server log (insert_exists)
+$node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab VALUES (600, 600, 600);");
+$node_publisher->safe_psql('postgres', "INSERT INTO conf_tab VALUES (600, 700, 700);");
+
+# Wait for table log
+$node_subscriber->poll_query_until('postgres', "SELECT count(*) > 0 FROM $conflict_table;")
+ or die "Timed out waiting for insert_exists conflict";
+
+# Check subscriber server log
+my $log_found = $node_subscriber->wait_for_log(
+ qr/conflict detected on relation "public.conf_tab": conflict=insert_exists/
+);
+ok($log_found, 'Conflict correctly directed to server stderr log');
+
+# Verify table count DID NOT increase for this conflict
+my $table_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM $conflict_table WHERE local_conflicts::text LIKE '%600%';");
+is($table_check, 1, 'Table log was bypassed when destination set to log');
+
+done_testing();
--
2.43.0
[text/x-patch] v19-0004-Add-shared-index-for-conflict-log-table-lookup-a.patch (14.1K, 4-v19-0004-Add-shared-index-for-conflict-log-table-lookup-a.patch)
download | inline diff:
From 50dc48b442e2c7683fb0e2948cec1e9b30f08c81 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 5 Jan 2026 15:46:10 +0530
Subject: [PATCH v19 4/6] Add shared index for conflict log table lookup and
allow explicit publication throuhg FOR TABLE publication
Introduce a dedicated shared unique index on
pg_subscription.subconflictlogrelid to make conflict log table
detection efficient, and index-backed. Previously, IsConflictLogTable()
relied on a full catalog scan of pg_subscription, which was inefficient.
This change adds pg_subscription_conflictrel_index and marks it as a
shared index, matching the shared pg_subscription table, and rewrites
conflict log table detection to use an indexed systable scan.
Additionally conflict tables can be replicated when the table is
explicitly specified through a FOR TABLE publication.
---
src/backend/catalog/catalog.c | 1 +
src/backend/catalog/pg_publication.c | 20 +--------------
src/backend/catalog/pg_subscription.c | 23 +++++++++++++++++
src/backend/commands/subscriptioncmds.c | 28 +++++++++------------
src/backend/replication/logical/conflict.c | 4 +--
src/backend/replication/pgoutput/pgoutput.c | 20 ++++++++++++---
src/bin/psql/describe.c | 4 ++-
src/include/catalog/pg_proc.dat | 7 ++++++
src/include/catalog/pg_subscription.h | 1 +
src/test/regress/expected/subscription.out | 7 +++---
src/test/regress/sql/subscription.sql | 5 ++--
11 files changed, 73 insertions(+), 47 deletions(-)
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index d438dc682ec..148a6ccf998 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -361,6 +361,7 @@ IsSharedRelation(Oid relationId)
relationId == SharedSecLabelObjectIndexId ||
relationId == SubscriptionNameIndexId ||
relationId == SubscriptionObjectIndexId ||
+ relationId == SubscriptionConflictrelIndexId ||
relationId == TablespaceNameIndexId ||
relationId == TablespaceOidIndexId)
return true;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index cb383a5ce04..fcd48166edf 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -86,15 +86,6 @@ check_publication_add_relation(Relation targetrel)
errmsg("cannot add relation \"%s\" to publication",
RelationGetRelationName(targetrel)),
errdetail("This operation is not supported for unlogged tables.")));
-
- /* Can't be conflict log table */
- if (IsConflictLogTable(RelationGetRelid(targetrel)))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot add relation \"%s.%s\" to publication",
- get_namespace_name(RelationGetNamespace(targetrel)),
- RelationGetRelationName(targetrel)),
- errdetail("This operation is not supported for conflict log tables.")));
}
/*
@@ -155,13 +146,6 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
/*
* Another variant of is_publishable_class(), taking a Relation.
- *
- * Note: Conflict log tables are not publishable. However, we intentionally
- * skip this check here because this function is called for every change and
- * performing this check during every change publication is costly. To ensure
- * unpublishable entries are ignored without incurring performance overhead,
- * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL
- * flag. This allows the decoding layer to bypass these entries automatically.
*/
bool
is_publishable_relation(Relation rel)
@@ -187,9 +171,7 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
if (!HeapTupleIsValid(tuple))
PG_RETURN_NULL();
- /* Subscription conflict log tables are not published */
- result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) &&
- !IsConflictLogTable(relid);
+ result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
ReleaseSysCache(tuple);
PG_RETURN_BOOL(result);
}
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 285a598497d..1a93824504c 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -22,6 +22,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
+#include "commands/subscriptioncmds.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "utils/array.h"
@@ -156,6 +157,28 @@ GetSubscription(Oid subid, bool missing_ok)
return sub;
}
+/*
+ * pg_relation_is_conflict_log_table
+ *
+ * Returns true if the given relation OID is used as a conflict log table
+ * by any subscription, else returns false.
+ */
+Datum
+pg_relation_is_conflict_log_table(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ HeapTuple tuple;
+ bool result;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (!HeapTupleIsValid(tuple))
+ PG_RETURN_NULL();
+
+ result = IsConflictLogTable(relid);
+ ReleaseSysCache(tuple);
+ PG_RETURN_BOOL(result);
+}
+
/*
* Return number of subscriptions defined in given database.
* Used by dropdb() to check if database can indeed be dropped.
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4b2d98c9ed2..49c504960db 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -53,6 +53,7 @@
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -3443,27 +3444,22 @@ bool
IsConflictLogTable(Oid relid)
{
Relation rel;
- TableScanDesc scan;
- HeapTuple tup;
- bool is_clt = false;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ bool is_clt;
rel = table_open(SubscriptionRelationId, AccessShareLock);
- scan = table_beginscan_catalog(rel, 0, NULL);
+ ScanKeyInit(&scankey,
+ Anum_pg_subscription_subconflictlogrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
- while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
- Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
+ scan = systable_beginscan(rel, SubscriptionConflictrelIndexId,
+ true, NULL, 1, &scankey);
- /* Direct Oid comparison from catalog */
- if (OidIsValid(subform->subconflictlogrelid) &&
- subform->subconflictlogrelid == relid)
- {
- is_clt = true;
- break;
- }
- }
+ is_clt = HeapTupleIsValid(systable_getnext(scan));
- table_endscan(scan);
+ systable_endscan(scan);
table_close(rel, AccessShareLock);
return is_clt;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index e23ff0b70cf..6fce652dbcb 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -302,13 +302,11 @@ GetConflictLogTableInfo(ConflictLogDest *log_dest)
void
InsertConflictLogTuple(Relation conflictlogrel)
{
- int options = HEAP_INSERT_NO_LOGICAL;
-
/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
- GetCurrentCommandId(true), options, NULL);
+ GetCurrentCommandId(true), 0, NULL);
/* Free conflict log tuple. */
heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 458418a249d..7ab8c942652 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -2098,6 +2098,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
List *rel_publications = NIL;
+ bool isconflictlogrel;
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -2176,6 +2177,14 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->estate = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /*
+ * Check whether this table is a conflict log table. If so, avoid
+ * publishing it via FOR ALL TABLES or FOR TABLES IN SCHEMA
+ * publications. However, they may still be published if explicitly
+ * added to a FOR TABLE publication for this table.
+ */
+ isconflictlogrel = IsConflictLogTable(relid);
+
/*
* Build publication cache. We can't use one provided by relcache as
* relcache considers all publications that the given relation is in,
@@ -2199,7 +2208,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* If this is a FOR ALL TABLES publication, pick the partition
* root and set the ancestor level accordingly.
*/
- if (pub->alltables)
+ if (pub->alltables && !isconflictlogrel)
{
publish = true;
if (pub->pubviaroot && am_partition)
@@ -2225,8 +2234,12 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
Oid ancestor;
int level;
- List *ancestors = get_partition_ancestors(relid);
+ List *ancestors;
+
+ /* Conflict log table cannot be a partition */
+ Assert(isconflictlogrel == false);
+ ancestors = get_partition_ancestors(relid);
ancestor = GetTopMostAncestorInPublication(pub->oid,
ancestors,
&level);
@@ -2243,7 +2256,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
}
if (list_member_oid(pubids, pub->oid) ||
- list_member_oid(schemaPubids, pub->oid) ||
+ (list_member_oid(schemaPubids, pub->oid) &&
+ !isconflictlogrel) ||
ancestor_published)
publish = true;
}
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 20f08e548ba..230a68892ae 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -3063,6 +3063,7 @@ describeOneTableDetails(const char *schemaname,
" JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n"
" JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n"
"WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n"
+ "AND NOT pg_catalog.pg_relation_is_conflict_log_table('%s'::oid)\n"
"UNION\n"
"SELECT pubname\n"
" , pg_get_expr(pr.prqual, c.oid)\n"
@@ -3082,8 +3083,9 @@ describeOneTableDetails(const char *schemaname,
" , NULL\n"
"FROM pg_catalog.pg_publication p\n"
"WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
+ "AND NOT pg_catalog.pg_relation_is_conflict_log_table('%s'::oid)\n"
"ORDER BY 1;",
- oid, oid, oid, oid);
+ oid, oid, oid, oid, oid, oid);
}
else
{
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7f481687afe..d99f8500ac5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12328,6 +12328,13 @@
prorettype => 'bool', proargtypes => 'regclass',
prosrc => 'pg_relation_is_publishable' },
+# subscriptions
+{ oid => '6123',
+ descr => 'returns whether a relation is a subscription conflict log table',
+ proname => 'pg_relation_is_conflict_log_table', provolatile => 's',
+ prorettype => 'bool', proargtypes => 'regclass',
+ prosrc => 'pg_relation_is_conflict_log_table' },
+
# rls
{ oid => '3298',
descr => 'row security for current context active on table by table oid',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 4aa29ea15d4..3d220b2db8a 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -123,6 +123,7 @@ DECLARE_TOAST_WITH_MACRO(pg_subscription, 4183, 4184, PgSubscriptionToastTable,
DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_oid_index, 6114, SubscriptionObjectIndexId, pg_subscription, btree(oid oid_ops));
DECLARE_UNIQUE_INDEX(pg_subscription_subname_index, 6115, SubscriptionNameIndexId, pg_subscription, btree(subdbid oid_ops, subname name_ops));
+DECLARE_INDEX(pg_subscription_conflictrel_index, 6122, SubscriptionConflictrelIndexId, pg_subscription, btree(subconflictlogrelid oid_ops));
MAKE_SYSCACHE(SUBSCRIPTIONOID, pg_subscription_oid_index, 4);
MAKE_SYSCACHE(SUBSCRIPTIONNAME, pg_subscription_subname_index, 4);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index d5f8abe9325..84abbaa5a4a 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -643,13 +643,14 @@ EXCEPTION WHEN insufficient_privilege THEN
RAISE NOTICE 'captured expected error: insufficient_privilege';
END $$;
NOTICE: captured expected error: insufficient_privilege
--- PUBLICATION: Verify internal tables are not publishable
--- pg_relation_is_publishable should return false for internal conflict log tables
+-- PUBLICATION: Verify internal tables are publishable
+-- pg_relation_is_publishable should return true for internal conflict log
+-- tables, as it can be published using TABLE publication.
SELECT pg_relation_is_publishable(subconflictlogrelid)
FROM pg_subscription WHERE subname = 'regress_conflict_test1';
pg_relation_is_publishable
----------------------------
- f
+ t
(1 row)
-- CLEANUP: Proper drop reaps the table
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 6c7f358ffd2..83befa8722c 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -452,8 +452,9 @@ EXCEPTION WHEN insufficient_privilege THEN
RAISE NOTICE 'captured expected error: insufficient_privilege';
END $$;
--- PUBLICATION: Verify internal tables are not publishable
--- pg_relation_is_publishable should return false for internal conflict log tables
+-- PUBLICATION: Verify internal tables are publishable
+-- pg_relation_is_publishable should return true for internal conflict log
+-- tables, as it can be published using TABLE publication.
SELECT pg_relation_is_publishable(subconflictlogrelid)
FROM pg_subscription WHERE subname = 'regress_conflict_test1';
--
2.43.0
[text/x-patch] v19-0003-Doccumentation-patch.patch (11.3K, 5-v19-0003-Doccumentation-patch.patch)
download | inline diff:
From b1f2be1ef91231ba6ee4429ca73fbd666dcbcfc6 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sun, 21 Dec 2025 18:51:57 +0530
Subject: [PATCH v19 3/6] Doccumentation patch
---
doc/src/sgml/logical-replication.sgml | 124 +++++++++++++++++++++-
doc/src/sgml/ref/alter_subscription.sgml | 14 ++-
doc/src/sgml/ref/create_subscription.sgml | 36 +++++++
3 files changed, 168 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 58ce75d8b63..a2c66b164a0 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -289,6 +289,18 @@
option of <command>CREATE SUBSCRIPTION</command> for details.
</para>
+ <para>
+ Conflicts that occur during replication are, by default, logged as plain text
+ in the server log, which can make automated monitoring and analysis difficult.
+ The <command>CREATE SUBSCRIPTION</command> command provides the
+ <link linkend="sql-createsubscription-params-with-conflict-log-destination">
+ <literal>conflict_log_destination</literal></link> option to record detailed
+ conflict information in a structured, queryable format. When this parameter
+ is set to <literal>table</literal> or <literal>all</literal>, the system
+ automatically manages a dedicated conflict storage table, which is created
+ and dropped along with the subscription. This significantly improves post-mortem
+ analysis and operational visibility of the replication setup.
+
<sect2 id="logical-replication-subscription-slot">
<title>Logical Replication Slot Management</title>
@@ -2006,9 +2018,15 @@ Publications:
</para>
<para>
- Additional logging is triggered, and the conflict statistics are collected (displayed in the
- <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
- in the following <firstterm>conflict</firstterm> cases:
+ Additional logging is triggered, and the conflict statistics are collected
+ (displayed in the <link linkend="monitoring-pg-stat-subscription-stats">
+ <structname>pg_stat_subscription_stats</structname></link> view)
+ in the following <firstterm>conflict</firstterm> cases. If the subscription
+ was created with the <literal>conflict_log_destination</literal> set to
+ <literal>table</literal> or <literal>all</literal>, detailed conflict
+ information is inserted into an internally managed table named
+ <literal>pg_conflict.pg_conflict_<replaceable>subscription_oid</replaceable>
+ </literal>, providing a structured record of all conflicts.
<variablelist>
<varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
<term><literal>insert_exists</literal></term>
@@ -2118,7 +2136,96 @@ Publications:
</para>
<para>
- The log format for logical replication conflicts is as follows:
+ When the <literal>conflict_log_destination</literal> is set to
+ <literal>table</literal> or <literal>all</literal>, the system automatically
+ creates a new table with a predefined schema to log conflict details. This
+ table is created in the dedicated <literal>pg_conflict</literal> namespace.
+ The schema of this table is detailed in
+ <xref linkend="logical-replication-conflict-log-schema"/>.
+ </para>
+
+ <table id="logical-replication-conflict-log-schema">
+ <title>Conflict Log Table Schema</title>
+ <tgroup cols="3">
+ <thead>
+ <row>
+ <entry>Column</entry>
+ <entry>Type</entry>
+ <entry>Description</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry><literal>relid</literal></entry>
+ <entry><type>oid</type></entry>
+ <entry>The OID of the local table where the conflict occurred.</entry>
+ </row>
+ <row>
+ <entry><literal>schemaname</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The schema name of the conflicting table.</entry>
+ </row>
+ <row>
+ <entry><literal>relname</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The name of the conflicting table.</entry>
+ </row>
+ <row>
+ <entry><literal>conflict_type</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The type of conflict that occurred (e.g., <literal>insert_exists</literal>).</entry>
+ </row>
+ <row>
+ <entry><literal>remote_xid</literal></entry>
+ <entry><type>xid</type></entry>
+ <entry>The remote transaction ID that caused the conflict.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_commit_lsn</literal></entry>
+ <entry><type>pg_lsn</type></entry>
+ <entry>The final LSN of the remote transaction.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_commit_ts</literal></entry>
+ <entry><type>timestamptz</type></entry>
+ <entry>The remote commit timestamp of the remote transaction.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_origin</literal></entry>
+ <entry><type>text</type></entry>
+ <entry>The origin of the remote transaction.</entry>
+ </row>
+ <row>
+ <entry><literal>remote_tuple</literal></entry>
+ <entry><type>json</type></entry>
+ <entry>The JSON representation of the incoming remote row that caused the conflict.</entry>
+ </row>
+ <row>
+ <entry><literal>local_conflicts</literal></entry>
+ <entry><type>json[]</type></entry>
+ <entry>
+ An array of JSON objects representing the local state for each conflict attempt.
+ Each object includes the local transaction ID (<literal>xid</literal>), commit
+ timestamp (<literal>commit_ts</literal>), origin (<literal>origin</literal>),
+ conflicting key data (<literal>key</literal>), and the full local row
+ image (<literal>tuple</literal>).
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <para>
+ The conflicting row data, including the original local tuple and
+ the remote tuple, is stored in <type>JSON</type> columns (<literal>local_tuple</literal>
+ and <literal>remote_tuple</literal>) for flexible querying and analysis.
+ </para>
+
+ <para>
+ If <literal>conflict_log_destination</literal> is left at the default
+ setting or explicitly configured as <literal>log</literal> or
+ <literal>all</literal>, logical replication conflicts are logged in the
+ following format:
<synopsis>
LOG: conflict detected on relation "<replaceable>schemaname</replaceable>.<replaceable>tablename</replaceable>": conflict=<replaceable>conflict_type</replaceable>
DETAIL: <replaceable class="parameter">detailed_explanation</replaceable>.
@@ -2412,6 +2519,15 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
key or replica identity defined for it.
</para>
</listitem>
+
+ <listitem>
+ <para>
+ The internal table automatically created when <literal>conflict_log_destination</literal>
+ is set to <literal>table</literal> or <literal>all</literal> is excluded from
+ logical replication. It will not be published, even if a publication on the
+ subscriber is defined using <literal>FOR ALL TABLES</literal>.
+ </para>
+ </listitem>
</itemizedlist>
</sect1>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 27c06439f4f..90331f590e0 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -280,8 +280,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
<link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>,
- <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>, and
- <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>.
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>,
+ <link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link> and,
+ <link linkend="sql-createsubscription-params-with-conflict-log-destination"><literal>conflict_log_destination</literal></link>.
Only a superuser can set <literal>password_required = false</literal>.
</para>
@@ -339,6 +340,15 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<quote><literal>pg_conflict_detection</literal></quote>, created to retain
dead tuples for conflict detection, will be dropped.
</para>
+
+ <para>
+ When the <literal>conflict_log_destination</literal> parameter is set to
+ <literal>table</literal> or <literal>all</literal>, the system
+ automatically creates the internal logging table if it does not already
+ exist. Conversely, if the destination is changed to
+ <literal>log</literal>, logging to the table stops and the internal
+ table is automatically dropped.
+ </para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b7dd361294b..f50bdb52f35 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -274,6 +274,42 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
+ <varlistentry id="sql-createsubscription-params-with-conflict-log-destination">
+ <term><literal>conflict_log_destination</literal> (<type>enum</type>)</term>
+ <listitem>
+ <para>
+ Specifies the destination for recording logical replication conflicts.
+ The supported values are <literal>log</literal>, <literal>table</literal>,
+ and <literal>all</literal>. The default is <literal>log</literal>.
+ </para>
+ <para>
+ The available destinations are:
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>log</literal>: Conflict details are recorded in the server log.
+ This is the default behavior.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>table</literal>: The system automatically creates a structured table
+ named <literal>pg_conflict.conflict_log_table_<subid></literal>.
+ This allows for easy querying and analysis of conflicts. This table is
+ automatically dropped when the subscription is removed.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>all</literal>: Records the conflict information to both the server log
+ and the dedicated conflict table.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="sql-createsubscription-params-with-streaming">
<term><literal>streaming</literal> (<type>enum</type>)</term>
<listitem>
--
2.43.0
[text/x-patch] v19-0005-Preserve-conflict-log-destination-and-subscripti.patch (24.3K, 6-v19-0005-Preserve-conflict-log-destination-and-subscripti.patch)
download | inline diff:
From 15736f6690aea5114af5614bd4e871779546a457 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 5 Jan 2026 15:57:04 +0530
Subject: [PATCH v19 5/6] Preserve conflict log destination and subscription
OID for subscriptions
Support pg_dump to dump and restore the conflict_log_destination setting for
subscriptions.
During a normal CREATE SUBSCRIPTION, a conflict log table is created
automatically when required. However, during binary upgrade, the conflict
log table will already exist and must be reused rather than recreated, and
the subscription must retain its original OID to correctly re-establish
catalog relationships.
To ensure correct behavior, pg_dump now emits an ALTER SUBSCRIPTION command
after subscription creation to restore the conflict_log_destination setting.
---
src/backend/catalog/heap.c | 4 +-
src/backend/commands/subscriptioncmds.c | 144 +++++++++++++-----
src/backend/utils/adt/pg_upgrade_support.c | 10 ++
src/bin/pg_dump/pg_dump.c | 102 ++++++++++++-
src/bin/pg_dump/pg_dump.h | 2 +
src/bin/pg_dump/pg_dump_sort.c | 31 ++++
src/bin/pg_dump/t/002_pg_dump.pl | 5 +-
src/bin/pg_upgrade/pg_upgrade.c | 4 +
src/bin/pg_upgrade/t/004_subscription.pl | 14 +-
src/include/catalog/binary_upgrade.h | 1 +
src/include/catalog/pg_proc.dat | 4 +
.../expected/spgist_name_ops.out | 6 +-
12 files changed, 278 insertions(+), 49 deletions(-)
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 10dadf378a4..3eca0b020f0 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -311,11 +311,13 @@ heap_create(const char *relname,
* But allow creating indexes on relations in pg_catalog even if
* allow_system_table_mods = off, upper layers already guarantee it's on a
* user defined relation, not a system one.
+ *
+ * Allow creation of conflict table in binary-upgrade mode.
*/
if (!allow_system_table_mods &&
((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
IsToastNamespace(relnamespace) ||
- IsConflictNamespace(relnamespace)) &&
+ (!IsBinaryUpgrade && IsConflictNamespace(relnamespace))) &&
IsNormalProcessingMode())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 49c504960db..6c865dacfd7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -88,6 +88,11 @@
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
+/*
+ * This will be set by the pg_upgrade_support function --
+ * binary_upgrade_set_next_pg_subscription_oid().
+ */
+Oid binary_upgrade_next_pg_subscription_oid = InvalidOid;
/*
* Structure to hold a bitmap representing the user-provided CREATE/ALTER
* SUBSCRIPTION command options and the parsed/default values of each of them.
@@ -735,8 +740,21 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
- subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
- Anum_pg_subscription_oid);
+ /* Use binary-upgrade override for pg_subscription.oid? */
+ if (IsBinaryUpgrade)
+ {
+ if (!OidIsValid(binary_upgrade_next_pg_subscription_oid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("pg_subscription OID value not set when in binary upgrade mode")));
+
+ subid = binary_upgrade_next_pg_subscription_oid;
+ binary_upgrade_next_pg_subscription_oid = InvalidOid;
+ }
+ else
+ subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
+ Anum_pg_subscription_oid);
+
values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
@@ -1378,6 +1396,84 @@ CheckAlterSubOption(Subscription *sub, const char *option,
}
}
+/*
+ * AlterSubscriptionConflictLogDestination
+ *
+ * Update the conflict log table associated with a subscription when its
+ * conflict log destination is changed.
+ *
+ * If the new destination requires a conflict log table and none was previously
+ * required, this function validates an existing conflict log table identified
+ * by the subscription specific naming convention or creates a new one.
+ *
+ * If the new destination no longer requires a conflict log table, the existing
+ * conflict log table associated with the subscription is removed via internal
+ * dependency cleanup to prevent orphaned relations.
+ *
+ * The function enforces that any conflict log table used is a permanent
+ * relation in a permanent schema, matches the expected structure, and is not
+ * already associated with another subscription.
+ *
+ * On success, *conflicttablerelid is set to the OID of the conflict log table
+ * that was created or validated, or to InvalidOid if no table is required.
+ *
+ * Returns true if the subscription's conflict log table reference must be
+ * updated as a result of the destination change; false otherwise.
+ */
+static bool
+AlterSubscriptionConflictLogDestination(Subscription *sub,
+ ConflictLogDest logdest,
+ Oid *conflicttablerelid)
+{
+ ConflictLogDest old_dest = GetLogDestination(sub->conflictlogdest);
+ bool want_table;
+ bool has_oldtable;
+ bool update_relid = false;
+ Oid relid = InvalidOid;
+
+ want_table = IsSet(logdest, CONFLICT_LOG_DEST_TABLE);
+ has_oldtable = IsSet(old_dest, CONFLICT_LOG_DEST_TABLE);
+
+ if (want_table && !has_oldtable)
+ {
+ char relname[NAMEDATALEN];
+
+ snprintf(relname, NAMEDATALEN, "pg_conflict_%u", sub->oid);
+
+ /*
+ * In upgrade scenarios, the conflict log table already exists. Update
+ * the catalog to record the association.
+ */
+ relid = get_relname_relid(relname, PG_CONFLICT_NAMESPACE);
+ if (!OidIsValid(relid))
+ relid = create_conflict_log_table(sub->oid, sub->name);
+
+ update_relid = true;
+ }
+ else if (!want_table && has_oldtable)
+ {
+ ObjectAddress object;
+
+ /*
+ * Conflict log tables are recorded as internal dependencies of the
+ * subscription. Drop the table if it is not required anymore to
+ * avoid stale or orphaned relations.
+ *
+ * XXX: At present, only conflict log tables are managed this way. In
+ * future if we introduce additional internal dependencies, we may
+ * need a targeted deletion to avoid deletion of any other objects.
+ */
+ ObjectAddressSet(object, SubscriptionRelationId, sub->oid);
+ performDeletion(&object, DROP_CASCADE,
+ PERFORM_DELETION_INTERNAL |
+ PERFORM_DELETION_SKIP_ORIGINAL);
+ update_relid = true;
+ }
+
+ *conflicttablerelid = relid;
+ return update_relid;
+}
+
/*
* Alter the existing subscription.
*/
@@ -1725,52 +1821,20 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (opts.logdest != old_dest)
{
- bool want_table =
- IsSet(opts.logdest, CONFLICT_LOG_DEST_TABLE);
- bool has_oldtable =
- IsSet(old_dest, CONFLICT_LOG_DEST_TABLE);
+ bool update_relid;
+ Oid relid = InvalidOid;
values[Anum_pg_subscription_subconflictlogdest - 1] =
CStringGetTextDatum(ConflictLogDestNames[opts.logdest]);
replaces[Anum_pg_subscription_subconflictlogdest - 1] = true;
- if (want_table && !has_oldtable)
+ update_relid = AlterSubscriptionConflictLogDestination(sub, opts.logdest, &relid);
+ if (update_relid)
{
- Oid relid;
-
- relid = create_conflict_log_table(subid, sub->name);
-
- values[Anum_pg_subscription_subconflictlogrelid - 1] =
- ObjectIdGetDatum(relid);
- replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
- true;
- }
- else if (!want_table && has_oldtable)
- {
- ObjectAddress object;
-
- /*
- * Conflict log tables are recorded as internal
- * dependencies of the subscription. Drop the
- * table if it is not required anymore to avoid
- * stale or orphaned relations.
- *
- * XXX: At present, only conflict log tables are
- * managed this way. In future if we introduce
- * additional internal dependencies, we may need
- * a targeted deletion to avoid deletion of any
- * other objects.
- */
- ObjectAddressSet(object, SubscriptionRelationId,
- subid);
- performDeletion(&object, DROP_CASCADE,
- PERFORM_DELETION_INTERNAL |
- PERFORM_DELETION_SKIP_ORIGINAL);
-
values[Anum_pg_subscription_subconflictlogrelid - 1] =
- ObjectIdGetDatum(InvalidOid);
+ ObjectIdGetDatum(relid);
replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
- true;
+ true;
}
}
}
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 8953a17753e..638130b7305 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -181,6 +181,16 @@ binary_upgrade_set_next_pg_authid_oid(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+Datum
+binary_upgrade_set_next_pg_subscription_oid(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+
+ CHECK_IS_BINARY_UPGRADE;
+ binary_upgrade_next_pg_subscription_oid = subid;
+ PG_RETURN_VOID();
+}
+
Datum
binary_upgrade_create_empty_extension(PG_FUNCTION_ARGS)
{
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7df56d8b1b0..44cc9f507a4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -1997,6 +1997,8 @@ checkExtensionMembership(DumpableObject *dobj, Archive *fout)
static void
selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
{
+ DumpOptions *dopt = fout->dopt;
+
/*
* DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
* and (for --clean) a DROP SCHEMA statement. (In the absence of
@@ -2026,6 +2028,32 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
*/
nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
}
+ else if (strcmp(nsinfo->dobj.name, "pg_conflict") == 0)
+ {
+ if (dopt->binary_upgrade)
+ {
+ /*
+ * The pg_conflict schema is a strange beast that sits in a sort of
+ * no-mans-land between being a system object and a user object.
+ * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is
+ * just a comment.
+ */
+ nsinfo->create = false;
+ nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
+ nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
+ nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL;
+
+ /*
+ * Also, make like it has a comment even if it doesn't; this is so
+ * that we'll emit a command to drop the comment, if appropriate.
+ * (Without this, we'd not call dumpCommentExtended for it.)
+ */
+ nsinfo->dobj.components |= DUMP_COMPONENT_COMMENT;
+ }
+ else
+ nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
+ DUMP_COMPONENT_NONE;
+ }
else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
strcmp(nsinfo->dobj.name, "information_schema") == 0)
{
@@ -2083,9 +2111,31 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
static void
selectDumpableTable(TableInfo *tbinfo, Archive *fout)
{
+ DumpOptions *dopt = fout->dopt;
+
if (checkExtensionMembership(&tbinfo->dobj, fout))
return; /* extension membership overrides all else */
+ if (strcmp(tbinfo->dobj.namespace->dobj.name, "pg_conflict") == 0)
+ {
+ if (dopt->binary_upgrade)
+ {
+ /*
+ * Dump pg_conflict tables only during binary upgrade.
+ * The schema is assumed to already exist.
+ */
+ tbinfo->dobj.dump = DUMP_COMPONENT_DEFINITION;
+
+ /*
+ * Suppress the "ALTER TABLE ... OWNER TO ..." command for this
+ * table. This prevents pg_dump from outputting the owner change.
+ */
+ tbinfo->rolname = NULL;
+ }
+ else
+ tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
+ }
+
/*
* If specific tables are being dumped, dump just those tables; else, dump
* according to the parent namespace's dump flag.
@@ -5130,6 +5180,8 @@ getSubscriptions(Archive *fout)
int i_subfailover;
int i_subretaindeadtuples;
int i_submaxretention;
+ int i_subconflictlogrelid;
+ int i_sublogdestination;
int i,
ntups;
@@ -5216,10 +5268,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
- " s.submaxretention\n");
+ " s.submaxretention,\n");
else
appendPQExpBuffer(query,
- " 0 AS submaxretention\n");
+ " 0 AS submaxretention,\n");
+
+ if (fout->remoteVersion >= 190000)
+ appendPQExpBufferStr(query,
+ " s.subconflictlogrelid, s.subconflictlogdest\n");
+ else
+ appendPQExpBufferStr(query,
+ " NULL AS subconflictlogrelid, NULL AS subconflictlogdest\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
@@ -5261,6 +5320,8 @@ getSubscriptions(Archive *fout)
i_subpublications = PQfnumber(res, "subpublications");
i_suborigin = PQfnumber(res, "suborigin");
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+ i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid");
+ i_sublogdestination = PQfnumber(res, "subconflictlogdest");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -5309,6 +5370,30 @@ getSubscriptions(Archive *fout)
else
subinfo[i].suboriginremotelsn =
pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+ if (PQgetisnull(res, i, i_subconflictlogrelid))
+ subinfo[i].subconflictlogrelid = InvalidOid;
+ else
+ {
+ TableInfo *tableInfo;
+
+ subinfo[i].subconflictlogrelid =
+ atooid(PQgetvalue(res, i, i_subconflictlogrelid));
+
+ if (subinfo[i].subconflictlogrelid)
+ {
+ tableInfo = findTableByOid(subinfo[i].subconflictlogrelid);
+ if (!tableInfo)
+ pg_fatal("could not find conflict log table with OID %u",
+ subinfo[i].subconflictlogrelid);
+
+ addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId);
+ }
+ }
+ if (PQgetisnull(res, i, i_sublogdestination))
+ subinfo[i].subconflictlogdest = NULL;
+ else
+ subinfo[i].subconflictlogdest =
+ pg_strdup(PQgetvalue(res, i, i_sublogdestination));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5502,6 +5587,14 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
qsubname);
+ if (dopt->binary_upgrade)
+ {
+ appendPQExpBufferStr(query, "\n-- For binary upgrade, must preserve pg_subscription.oid\n");
+ appendPQExpBuffer(query,
+ "SELECT pg_catalog.binary_upgrade_set_next_pg_subscription_oid('%u'::pg_catalog.oid);\n\n",
+ subinfo->dobj.catId.oid);
+ }
+
appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
qsubname);
appendStringLiteralAH(query, subinfo->subconninfo, fout);
@@ -5564,6 +5657,11 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
+ appendPQExpBuffer(query,
+ "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = %s);\n",
+ qsubname,
+ subinfo->subconflictlogdest);
+
/*
* In binary-upgrade mode, we allow the replication to continue after the
* upgrade.
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 4c4b14e5fc7..6485166f2c6 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -719,12 +719,14 @@ typedef struct _SubscriptionInfo
bool subfailover;
bool subretaindeadtuples;
int submaxretention;
+ Oid subconflictlogrelid;
char *subconninfo;
char *subslotname;
char *subsynccommit;
char *subpublications;
char *suborigin;
char *suboriginremotelsn;
+ char *subconflictlogdest;
} SubscriptionInfo;
/*
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 24bed6681de..a1d7765eb75 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj,
addObjectDependency(attrdefobj, tableobj->dumpId);
}
+/*
+ * Because we make subscriptions depend on their conflict log tables, while
+ * there is an automatic dependency in the other direction, we need to break
+ * the loop. Remove the automatic dependency, allowing the table to be created
+ * first.
+ */
+static void
+repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj)
+{
+ /* Remove table's dependency on subscription */
+ removeObjectDependency(tableobj, subobj->dumpId);
+}
+
/*
* CHECK, NOT NULL constraints on domains work just like those on tables ...
*/
@@ -1361,6 +1374,24 @@ repairDependencyLoop(DumpableObject **loop,
return;
}
+ /*
+ * Subscription and its Conflict Log Table
+ */
+ if (nLoop == 2 &&
+ loop[0]->objType == DO_TABLE &&
+ loop[1]->objType == DO_SUBSCRIPTION)
+ {
+ repairSubscriptionTableLoop(loop[1], loop[0]);
+ return;
+ }
+ if (nLoop == 2 &&
+ loop[0]->objType == DO_SUBSCRIPTION &&
+ loop[1]->objType == DO_TABLE)
+ {
+ repairSubscriptionTableLoop(loop[0], loop[1]);
+ return;
+ }
+
/* index on partitioned table and corresponding index on partition */
if (nLoop == 2 &&
loop[0]->objType == DO_INDEX &&
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 28812d28aa9..0df84cd5897 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3204,9 +3204,10 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any, streaming = on);',
+ WITH (connect = false, origin = any, streaming = on, conflict_log_destination= table);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n
+ \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = table);\E
/xm,
like => { %full_runs, section_post_data => 1, },
unlike => {
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 2127d297bfe..135ef658c2c 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -35,6 +35,10 @@
*
* We control all assignments of pg_database.oid because we want the directory
* names to match between the old and new cluster.
+ *
+ * We control assignment of pg_subscription.oid because we want the oid to
+ * match between the old and new cluster to make use of subscription's
+ * conflict log table which is named using the subscription oid.
*/
diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl
index 3a8c8b88976..00c4f9a9fc1 100644
--- a/src/bin/pg_upgrade/t/004_subscription.pl
+++ b/src/bin/pg_upgrade/t/004_subscription.pl
@@ -290,7 +290,7 @@ $publisher->safe_psql(
$old_sub->safe_psql(
'postgres', qq[
CREATE TABLE tab_upgraded2(id int);
- CREATE SUBSCRIPTION regress_sub5 CONNECTION '$connstr' PUBLICATION regress_pub5;
+ CREATE SUBSCRIPTION regress_sub5 CONNECTION '$connstr' PUBLICATION regress_pub5 with (conflict_log_destination = 'table');
]);
# The table tab_upgraded2 will be in the init state as the subscriber's
@@ -312,7 +312,10 @@ my $tab_upgraded1_oid = $old_sub->safe_psql('postgres',
"SELECT oid FROM pg_class WHERE relname = 'tab_upgraded1'");
my $tab_upgraded2_oid = $old_sub->safe_psql('postgres',
"SELECT oid FROM pg_class WHERE relname = 'tab_upgraded2'");
-
+my $sub5_oid = $old_sub->safe_psql('postgres',
+ "SELECT oid FROM pg_subscription where subname = 'regress_sub5'");
+my $sub_clt_relid = $old_sub->safe_psql('postgres',
+ "SELECT subconflictlogrelid FROM pg_subscription WHERE subname = 'regress_sub5'");
$old_sub->stop;
# Change configuration so that initial table sync does not get started
@@ -393,6 +396,13 @@ $result = $new_sub->safe_psql('postgres',
"SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'");
is($result, qq(t), "conflict detection slot exists");
+# The subscription oid and the subscription conflict log table relid should be preserved
+$result = $new_sub->safe_psql('postgres', "SELECT oid FROM pg_subscription WHERE subname = 'regress_sub5'");
+is($result, qq($sub5_oid), "subscription oid should have been preserved");
+
+$result = $new_sub->safe_psql('postgres', "SELECT subconflictlogrelid FROM pg_subscription WHERE subname = 'regress_sub5'");
+is($result, qq($sub_clt_relid), "subscription conflict log table relid should have been preserved");
+
# Resume the initial sync and wait until all tables of subscription
# 'regress_sub5' are synchronized
$new_sub->append_conf('postgresql.conf',
diff --git a/src/include/catalog/binary_upgrade.h b/src/include/catalog/binary_upgrade.h
index 7bf7ae44385..b15b18e7dc9 100644
--- a/src/include/catalog/binary_upgrade.h
+++ b/src/include/catalog/binary_upgrade.h
@@ -32,6 +32,7 @@ extern PGDLLIMPORT RelFileNumber binary_upgrade_next_toast_pg_class_relfilenumbe
extern PGDLLIMPORT Oid binary_upgrade_next_pg_enum_oid;
extern PGDLLIMPORT Oid binary_upgrade_next_pg_authid_oid;
+extern PGDLLIMPORT Oid binary_upgrade_next_pg_subscription_oid;
extern PGDLLIMPORT bool binary_upgrade_record_init_privs;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d99f8500ac5..8789d03261c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11840,6 +11840,10 @@
proname => 'binary_upgrade_create_conflict_detection_slot', proisstrict => 'f',
provolatile => 'v', proparallel => 'u', prorettype => 'void',
proargtypes => '', prosrc => 'binary_upgrade_create_conflict_detection_slot' },
+{ oid => '8407', descr => 'for use by pg_upgrade',
+ proname => 'binary_upgrade_set_next_pg_subscription_oid', provolatile => 'v',
+ proparallel => 'r', prorettype => 'void', proargtypes => 'oid',
+ prosrc => 'binary_upgrade_set_next_pg_subscription_oid' },
# conversion functions
{ oid => '4302',
diff --git a/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out b/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out
index 1ee65ede243..39d43368c42 100644
--- a/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out
+++ b/src/test/modules/spgist_name_ops/expected/spgist_name_ops.out
@@ -59,11 +59,12 @@ select * from t
binary_upgrade_set_next_multirange_pg_type_oid | 1 | binary_upgrade_set_next_multirange_pg_type_oid
binary_upgrade_set_next_pg_authid_oid | | binary_upgrade_set_next_pg_authid_oid
binary_upgrade_set_next_pg_enum_oid | | binary_upgrade_set_next_pg_enum_oid
+ binary_upgrade_set_next_pg_subscription_oid | | binary_upgrade_set_next_pg_subscription_oid
binary_upgrade_set_next_pg_tablespace_oid | | binary_upgrade_set_next_pg_tablespace_oid
binary_upgrade_set_next_pg_type_oid | | binary_upgrade_set_next_pg_type_oid
binary_upgrade_set_next_toast_pg_class_oid | 1 | binary_upgrade_set_next_toast_pg_class_oid
binary_upgrade_set_next_toast_relfilenode | | binary_upgrade_set_next_toast_relfilenode
-(13 rows)
+(14 rows)
-- Verify clean failure when INCLUDE'd columns result in overlength tuple
-- The error message details are platform-dependent, so show only SQLSTATE
@@ -108,11 +109,12 @@ select * from t
binary_upgrade_set_next_multirange_pg_type_oid | 1 | binary_upgrade_set_next_multirange_pg_type_oid
binary_upgrade_set_next_pg_authid_oid | | binary_upgrade_set_next_pg_authid_oid
binary_upgrade_set_next_pg_enum_oid | | binary_upgrade_set_next_pg_enum_oid
+ binary_upgrade_set_next_pg_subscription_oid | | binary_upgrade_set_next_pg_subscription_oid
binary_upgrade_set_next_pg_tablespace_oid | | binary_upgrade_set_next_pg_tablespace_oid
binary_upgrade_set_next_pg_type_oid | | binary_upgrade_set_next_pg_type_oid
binary_upgrade_set_next_toast_pg_class_oid | 1 | binary_upgrade_set_next_toast_pg_class_oid
binary_upgrade_set_next_toast_relfilenode | | binary_upgrade_set_next_toast_relfilenode
-(13 rows)
+(14 rows)
\set VERBOSITY sqlstate
insert into t values(repeat('xyzzy', 12), 42, repeat('xyzzy', 4000));
--
2.43.0
[text/x-patch] v19-0006-Allow-combined-conflict_log_destination-settings.patch (58.0K, 7-v19-0006-Allow-combined-conflict_log_destination-settings.patch)
download | inline diff:
From bf7ec689bd2be18a831d09284113c82220086ed3 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 5 Jan 2026 17:34:30 +0530
Subject: [PATCH v19 6/6] Allow combined conflict_log_destination settings
Extend conflict_log_destination handling to support combined destination
specifications. Previously, only log, table, or all were accepted. This change
allows combinations of them like log, table and all, log, table etc
---
src/backend/catalog/pg_subscription.c | 2 +-
src/backend/commands/subscriptioncmds.c | 92 +++++++++++++++-------
src/backend/replication/logical/conflict.c | 4 +-
src/bin/pg_dump/pg_dump.c | 44 ++++++++---
src/bin/pg_dump/t/002_pg_dump.pl | 4 +-
src/include/catalog/pg_subscription.h | 4 +-
src/include/commands/subscriptioncmds.h | 5 +-
src/include/replication/conflict.h | 9 ---
src/test/regress/expected/subscription.out | 72 +++++++++--------
src/test/regress/sql/subscription.sql | 11 ++-
10 files changed, 157 insertions(+), 90 deletions(-)
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1a93824504c..a7028d05506 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -147,7 +147,7 @@ GetSubscription(Oid subid, bool missing_ok)
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
tup,
Anum_pg_subscription_subconflictlogdest);
- sub->conflictlogdest = TextDatumGetCString(datum);
+ sub->conflictlogdest = textarray_to_stringlist(DatumGetArrayTypeP(datum));
/* Is the subscription owner a superuser? */
sub->ownersuperuser = superuser_arg(sub->owner);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6c865dacfd7..8db919405b6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -60,6 +60,7 @@
#include "utils/pg_lsn.h"
#include "utils/regproc.h"
#include "utils/syscache.h"
+#include "utils/varlena.h"
/*
* Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
@@ -85,9 +86,6 @@
#define SUBOPT_ORIGIN 0x00020000
#define SUBOPT_CONFLICT_LOG_DEST 0x00040000
-/* check if the 'val' has 'bits' set */
-#define IsSet(val, bits) (((val) & (bits)) == (bits))
-
/*
* This will be set by the pg_upgrade_support function --
* binary_upgrade_set_next_pg_subscription_oid().
@@ -422,15 +420,21 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST) &&
strcmp(defel->defname, "conflict_log_destination") == 0)
{
- char *val;
- ConflictLogDest dest;
+ char *val;
+ List *dest;
if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST))
errorConflictingDefElem(defel, pstate);
val = defGetString(defel);
- dest = GetLogDestination(val);
- opts->logdest = dest;
+ if (!SplitIdentifierString(val, ',', &dest))
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid list syntax in parameter \"%s\"",
+ "conflict_log_destination"));
+
+ opts->logdest = GetLogDestination(dest, false);
+
opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST;
}
else
@@ -610,6 +614,30 @@ publicationListToArray(List *publist)
return PointerGetDatum(arr);
}
+/*
+ * Build a text[] array representing the conflict_log_destination flags.
+ */
+static Datum
+ConflictLogDestFlagsToArray(ConflictLogDest logdest)
+{
+ Datum datums[3];
+ int ndatums = 0;
+
+ if (IsSet(logdest, CONFLICT_LOG_DEST_ALL))
+ datums[ndatums++] = CStringGetTextDatum("all");
+ else
+ {
+ if (IsSet(logdest, CONFLICT_LOG_DEST_LOG))
+ datums[ndatums++] = CStringGetTextDatum("log");
+
+ if (IsSet(logdest, CONFLICT_LOG_DEST_TABLE))
+ datums[ndatums++] = CStringGetTextDatum("table");
+ }
+
+ return PointerGetDatum(
+ construct_array_builtin(datums, ndatums, TEXTOID));
+}
+
/*
* Create new subscription.
*/
@@ -794,14 +822,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
/* Always set the destination, default will be 'log'. */
values[Anum_pg_subscription_subconflictlogdest - 1] =
- CStringGetTextDatum(ConflictLogDestNames[opts.logdest]);
+ ConflictLogDestFlagsToArray(opts.logdest);
/*
* If logging to a table is required, physically create the logging
* relation and store its OID in the catalog.
*/
- if (opts.logdest == CONFLICT_LOG_DEST_TABLE ||
- opts.logdest == CONFLICT_LOG_DEST_ALL)
+ if (IsSet(opts.logdest, CONFLICT_LOG_DEST_TABLE))
{
Oid logrelid;
@@ -1425,7 +1452,7 @@ AlterSubscriptionConflictLogDestination(Subscription *sub,
ConflictLogDest logdest,
Oid *conflicttablerelid)
{
- ConflictLogDest old_dest = GetLogDestination(sub->conflictlogdest);
+ ConflictLogDest old_dest = GetLogDestination(sub->conflictlogdest, true);
bool want_table;
bool has_oldtable;
bool update_relid = false;
@@ -1817,7 +1844,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST))
{
ConflictLogDest old_dest =
- GetLogDestination(sub->conflictlogdest);
+ GetLogDestination(sub->conflictlogdest, true);
if (opts.logdest != old_dest)
{
@@ -1825,7 +1852,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
Oid relid = InvalidOid;
values[Anum_pg_subscription_subconflictlogdest - 1] =
- CStringGetTextDatum(ConflictLogDestNames[opts.logdest]);
+ ConflictLogDestFlagsToArray(opts.logdest);
replaces[Anum_pg_subscription_subconflictlogdest - 1] = true;
update_relid = AlterSubscriptionConflictLogDestination(sub, opts.logdest, &relid);
@@ -3477,27 +3504,38 @@ create_conflict_log_table(Oid subid, char *subname)
/*
* GetLogDestination
*
- * Convert string to enum by comparing against standardized labels.
+ * Convert log destination List of strings to enums.
*/
ConflictLogDest
-GetLogDestination(const char *dest)
+GetLogDestination(List *destlist, bool strnodelist)
{
- /* Empty string or NULL defaults to LOG. */
- if (dest == NULL || dest[0] == '\0' || pg_strcasecmp(dest, "log") == 0)
+ ConflictLogDest logdest = 0;
+ ListCell *cell;
+
+ if (destlist == NULL)
return CONFLICT_LOG_DEST_LOG;
- if (pg_strcasecmp(dest,
- ConflictLogDestNames[CONFLICT_LOG_DEST_TABLE]) == 0)
- return CONFLICT_LOG_DEST_TABLE;
+ foreach(cell, destlist)
+ {
+ char *name;
- if (pg_strcasecmp(dest, ConflictLogDestNames[CONFLICT_LOG_DEST_ALL]) == 0)
- return CONFLICT_LOG_DEST_ALL;
+ name = (strnodelist) ? strVal(lfirst(cell)) : (char *) lfirst(cell);
- /* Unrecognized string. */
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("unrecognized conflict_log_destination value: \"%s\"", dest),
- errhint("Valid values are \"log\", \"table\", and \"all\".")));
+ if (pg_strcasecmp(name, "log") == 0)
+ logdest |= CONFLICT_LOG_DEST_LOG;
+ else if (pg_strcasecmp(name, "table") == 0)
+ logdest |= CONFLICT_LOG_DEST_TABLE;
+ else if (pg_strcasecmp(name, "all") == 0)
+ logdest |= CONFLICT_LOG_DEST_ALL;
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized value for subscription parameter \"%s\": \"%s\"",
+ "conflict_log_destination", name),
+ errhint("Valid values are \"log\", \"table\", and \"all\"."));
+ }
+
+ return logdest;
}
/*
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 6fce652dbcb..b9365c19975 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -181,7 +181,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
pgstat_report_subscription_conflict(MySubscription->oid, type);
/* Decide what detail to show in server logs. */
- if ((dest & CONFLICT_LOG_DEST_LOG) != 0)
+ if (IsSet(dest, CONFLICT_LOG_DEST_LOG) || IsSet(dest, CONFLICT_LOG_DEST_ALL))
{
StringInfoData err_detail;
@@ -271,7 +271,7 @@ GetConflictLogTableInfo(ConflictLogDest *log_dest)
* Convert the text log destination to the internal enum. MySubscription
* already contains the data from pg_subscription.
*/
- *log_dest = GetLogDestination(MySubscription->conflictlogdest);
+ *log_dest = GetLogDestination(MySubscription->conflictlogdest, true);
conflictlogrelid = MySubscription->conflictlogrelid;
/* If destination is 'log' only, no table to open. */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 44cc9f507a4..fc85a4beee4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5569,10 +5569,10 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
DumpOptions *dopt = fout->dopt;
PQExpBuffer delq;
PQExpBuffer query;
- PQExpBuffer publications;
+ PQExpBuffer namebuf;
char *qsubname;
- char **pubnames = NULL;
- int npubnames = 0;
+ char **names = NULL;
+ int nnames = 0;
int i;
/* Do nothing if not dumping schema */
@@ -5600,19 +5600,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendStringLiteralAH(query, subinfo->subconninfo, fout);
/* Build list of quoted publications and append them to query. */
- if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
+ if (!parsePGArray(subinfo->subpublications, &names, &nnames))
pg_fatal("could not parse %s array", "subpublications");
- publications = createPQExpBuffer();
- for (i = 0; i < npubnames; i++)
+ namebuf = createPQExpBuffer();
+ for (i = 0; i < nnames; i++)
{
if (i > 0)
- appendPQExpBufferStr(publications, ", ");
+ appendPQExpBufferStr(namebuf, ", ");
- appendPQExpBufferStr(publications, fmtId(pubnames[i]));
+ appendPQExpBufferStr(namebuf, fmtId(names[i]));
}
- appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
+ appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", namebuf->data);
+ resetPQExpBuffer(namebuf);
+ free(names);
+
if (subinfo->subslotname)
appendStringLiteralAH(query, subinfo->subslotname, fout);
else
@@ -5657,10 +5660,25 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
+ /*
+ * Build list of quoted conflict log destinations and append them to
+ * query.
+ */
+ if (!parsePGArray(subinfo->subconflictlogdest, &names, &nnames))
+ pg_fatal("could not parse %s array", "conflict_log_destination");
+
+ for (i = 0; i < nnames; i++)
+ {
+ if (i > 0)
+ appendPQExpBufferStr(namebuf, ", ");
+
+ appendPQExpBuffer(namebuf, "%s", names[i]);
+ }
+
appendPQExpBuffer(query,
- "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = %s);\n",
+ "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = '%s');\n",
qsubname,
- subinfo->subconflictlogdest);
+ namebuf->data);
/*
* In binary-upgrade mode, we allow the replication to continue after the
@@ -5718,8 +5736,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
NULL, subinfo->rolname,
subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
- destroyPQExpBuffer(publications);
- free(pubnames);
+ destroyPQExpBuffer(namebuf);
+ free(names);
destroyPQExpBuffer(delq);
destroyPQExpBuffer(query);
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 0df84cd5897..e06ac55db47 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3204,10 +3204,10 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any, streaming = on, conflict_log_destination= table);',
+ WITH (connect = false, origin = any, streaming = on, conflict_log_destination= \'log,table\');',
regexp => qr/^
\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n
- \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = table);\E
+ \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = 'all');\E
/xm,
like => { %full_runs, section_post_data => 1, },
unlike => {
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3d220b2db8a..92a742437c2 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -110,7 +110,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* 'table' - internal table only,
* 'all' - both log and table.
*/
- text subconflictlogdest;
+ text subconflictlogdest[1] BKI_FORCE_NULL;
/* Only publish data originating from the specified origin */
text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
@@ -169,7 +169,7 @@ typedef struct Subscription
List *publications; /* List of publication names to subscribe to */
char *origin; /* Only publish data originating from the
* specified origin */
- char *conflictlogdest; /* Conflict log destination */
+ List *conflictlogdest; /* Conflict log destination */
} Subscription;
#ifdef EXPOSE_TO_CLIENT_CODE
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index bc4a92af356..b977deef04e 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -19,6 +19,9 @@
#include "parser/parse_node.h"
#include "replication/conflict.h"
+/* check if the 'val' has 'bits' set */
+#define IsSet(val, bits) (((val) & (bits)) == (bits))
+
extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
bool isTopLevel);
extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel);
@@ -37,7 +40,7 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
bool retention_active,
bool max_retention_set);
-extern ConflictLogDest GetLogDestination(const char *dest);
+extern ConflictLogDest GetLogDestination(List *destlist, bool strnodelist);
extern bool IsConflictLogTable(Oid relid);
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index b60c0b03e26..3b5092d6584 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -100,15 +100,6 @@ typedef enum ConflictLogDest
CONFLICT_LOG_DEST_ALL = (CONFLICT_LOG_DEST_LOG | CONFLICT_LOG_DEST_TABLE)
} ConflictLogDest;
-/*
- * Array mapping for converting internal enum to string.
- */
-static const char *const ConflictLogDestNames[] = {
- [CONFLICT_LOG_DEST_LOG] = "log",
- [CONFLICT_LOG_DEST_TABLE] = "table",
- [CONFLICT_LOG_DEST_ALL] = "all"
-};
-
/* Structure to hold metadata for one column of the conflict log table */
typedef struct ConflictLogColumnDef
{
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 84abbaa5a4a..8d9d7dfc131 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | log
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | {log}
(1 row)
-- ok - with lsn = NONE
@@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | {log}
(1 row)
BEGIN;
@@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | log
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | {log}
(1 row)
-- rename back to keep the rest simple
@@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -282,7 +282,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
@@ -290,7 +290,7 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
@@ -299,7 +299,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
-- fail - publication already exists
@@ -317,7 +317,7 @@ ERROR: publication "testpub1" is already in subscription "regress_testsub"
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
-- fail - publication used more than once
@@ -335,7 +335,7 @@ ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (ref
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
-- we can alter streaming when two_phase enabled
@@ -383,7 +383,7 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -396,7 +396,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -436,7 +436,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -453,7 +453,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
-- ok
@@ -462,7 +462,7 @@ ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log destination
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | log
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | {log}
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -523,7 +523,7 @@ DROP SUBSCRIPTION regress_testsub;
SET SESSION AUTHORIZATION 'regress_subscription_user';
-- fail - unrecognized parameter value
CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid');
-ERROR: unrecognized conflict_log_destination value: "invalid"
+ERROR: unrecognized value for subscription parameter "conflict_log_destination": "invalid"
HINT: Valid values are "log", "table", and "all".
-- verify subconflictlogdest is 'log' and relid is 0 (InvalidOid) for default case
CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
@@ -533,7 +533,7 @@ SELECT subname, subconflictlogdest, subconflictlogrelid
FROM pg_subscription WHERE subname = 'regress_conflict_log_default';
subname | subconflictlogdest | subconflictlogrelid
------------------------------+--------------------+---------------------
- regress_conflict_log_default | log | 0
+ regress_conflict_log_default | {log} | 0
(1 row)
-- verify empty string defaults to 'log'
@@ -544,11 +544,11 @@ SELECT subname, subconflictlogdest, subconflictlogrelid
FROM pg_subscription WHERE subname = 'regress_conflict_empty_str';
subname | subconflictlogdest | subconflictlogrelid
----------------------------+--------------------+---------------------
- regress_conflict_empty_str | log | 0
+ regress_conflict_empty_str | {log} | 0
(1 row)
-- this should generate an internal table named pg_conflict_$subid$
-CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log, table');
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
-- check metadata in pg_subscription: destination should be 'table' and relid valid
@@ -556,7 +556,7 @@ SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
FROM pg_subscription WHERE subname = 'regress_conflict_test1';
subname | subconflictlogdest | has_relid
------------------------+--------------------+-----------
- regress_conflict_test1 | table | t
+ regress_conflict_test1 | {all} | t
(1 row)
-- verify the physical table exists and its OID matches subconflictlogrelid
@@ -586,18 +586,28 @@ WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0;
-- These tests verify the transition logic between different logging
-- destinations, ensuring internal tables are created or dropped as expected.
--
--- transition from 'log' to 'all'
+-- transition from 'log' to 'log, table'
-- a new internal conflict log table should be created
CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log');
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log, table');
+-- verify metadata after ALTER (destination should be 'log, table')
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+ subname | subconflictlogdest | has_relid
+------------------------+--------------------+-----------
+ regress_conflict_test2 | {all} | t
+(1 row)
+
+-- transition from 'log, table' to 'all'
ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all');
-- verify metadata after ALTER (destination should be 'all')
SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
FROM pg_subscription WHERE subname = 'regress_conflict_test2';
subname | subconflictlogdest | has_relid
------------------------+--------------------+-----------
- regress_conflict_test2 | all | t
+ regress_conflict_test2 | {all} | t
(1 row)
-- transition from 'all' to 'table'
@@ -608,7 +618,7 @@ SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged
FROM pg_subscription WHERE subname = 'regress_conflict_test2';
subconflictlogdest | relid_unchanged
--------------------+-----------------
- table | t
+ {table} | t
(1 row)
-- transition from 'table' to 'log'
@@ -618,7 +628,7 @@ SELECT subconflictlogdest, subconflictlogrelid
FROM pg_subscription WHERE subname = 'regress_conflict_test2';
subconflictlogdest | subconflictlogrelid
--------------------+---------------------
- log | 0
+ {log} | 0
(1 row)
-- verify the physical table is gone
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 83befa8722c..4ee96d4fcf2 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -385,7 +385,7 @@ SELECT subname, subconflictlogdest, subconflictlogrelid
FROM pg_subscription WHERE subname = 'regress_conflict_empty_str';
-- this should generate an internal table named pg_conflict_$subid$
-CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table');
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log, table');
-- check metadata in pg_subscription: destination should be 'table' and relid valid
SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
@@ -411,9 +411,16 @@ WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0;
-- destinations, ensuring internal tables are created or dropped as expected.
--
--- transition from 'log' to 'all'
+-- transition from 'log' to 'log, table'
-- a new internal conflict log table should be created
CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log');
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log, table');
+
+-- verify metadata after ALTER (destination should be 'log, table')
+SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- transition from 'log, table' to 'all'
ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all');
-- verify metadata after ALTER (destination should be 'all')
--
2.43.0
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: pgsql-hackers@postgresql.org
Cc: vignesh21@gmail.com, shveta.malik@gmail.com, dilipbalaut@gmail.com, amit.kapila16@gmail.com, sawada.mshk@gmail.com, bharath.rupireddyforpostgres@gmail.com, pgsql-hackers@lists.postgresql.org
Subject: Re: Proposal: Conflict log history table for Logical Replication
In-Reply-To: <CALDaNm2YOOdJ25X1sJ+DYz37K6Qi4g0ZNFHb_pQMF9UqancnEA@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox