v13-0002-pg_dump-dump-conflict-log-table-configuration-fo.patch
text/x-patch
Filename: v13-0002-pg_dump-dump-conflict-log-table-configuration-fo.patch
Type: text/x-patch
Part: 1
From 0676b9c1b425ea069a6c38816636034fc090662f Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Tue, 16 Dec 2025 09:16:26 +0530
Subject: [PATCH v13 2/2] pg_dump: dump conflict log table configuration for
subscriptions
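Allow pg_dump to preserve the conflict_log_table setting of logical
replication subscriptions.  The dump emits an ALTER SUBSCRIPTION ... SET
(conflict_log_table = ...) command after CREATE SUBSCRIPTION, and the
subscription is made to depend on its conflict log table so that the table
is restored first; the resulting dependency loop is broken in pg_dump_sort.c.
To make that command work at restore time, ALTER SUBSCRIPTION now accepts an
already-existing table as the conflict log table, provided it is a permanent
table with the expected definition that is not already registered for a
different subscription.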
---
src/backend/commands/subscriptioncmds.c | 245 ++++++++++++++-------
src/bin/pg_dump/pg_dump.c | 49 ++++-
src/bin/pg_dump/pg_dump.h | 1 +
src/bin/pg_dump/pg_dump_sort.c | 32 +++
src/bin/pg_dump/t/002_pg_dump.pl | 5 +-
src/test/regress/expected/subscription.out | 15 +-
src/test/regress/sql/subscription.sql | 8 +
7 files changed, 265 insertions(+), 90 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b044ed70a2a..569c1a5a76b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1242,6 +1242,148 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
table_close(rel, NoLock);
}
+/*
+ * AlterSubscriptionConflictLogTable
+ *
+ * Set, change, or remove the conflict log table of subscription 'subid'.
+ *
+ * 'conflictlogtable' is the (optionally schema-qualified) table name, or
+ * NULL to remove the association; an empty string is rejected.  If the
+ * named relation already exists it must be a permanent table in a
+ * non-temporary schema, must not be registered as the conflict log table
+ * of a different subscription, and must pass ValidateConflictLogTable();
+ * otherwise it is created with create_conflict_log_table().
+ *
+ * If the subscription already uses the specified conflict log table and the
+ * table still exists, no change is made and a NOTICE is emitted.
+ *
+ * In every other case the subscription's internal dependencies (i.e. any
+ * previously associated conflict log table) are dropped first.
+ *
+ * Returns the unqualified table name, or NULL when the association has been
+ * removed.  *relnamespaceid is set to the table's schema OID, or InvalidOid
+ * when NULL is returned.
+ */
+static char *
+AlterSubscriptionConflictLogTable(Oid subid, char *conflictlogtable,
+								  Oid *relnamespaceid)
+{
+	Oid			nspid = InvalidOid;
+	Oid			old_nspid = InvalidOid;
+	char	   *old_relname = NULL;
+	char	   *relname = NULL;
+	List	   *names = NIL;
+	char	   *nspname = NULL; /* stays NULL when the table is being removed */
+	ObjectAddress object;
+
+	if (conflictlogtable != NULL)
+	{
+		/* Explicitly check for empty string before any processing. */
+		if (conflictlogtable[0] == '\0')
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("conflict log table name cannot be empty"),
+					 errhint("Provide a valid table name or omit the parameter.")));
+
+		names = stringToQualifiedNameList(conflictlogtable, NULL);
+		nspid = QualifiedNameGetCreationNamespace(names, &relname);
+		nspname = get_namespace_name(nspid);
+	}
+
+	/* Fetch the existing conflict table information. */
+	old_relname = get_subscription_conflict_log_table(subid, &old_nspid);
+
+	/*
+	 * If the subscription already uses this conflict log table and it exists,
+	 * just issue a notice.
+	 */
+	if (old_relname != NULL && relname != NULL
+		&& (strcmp(old_relname, relname) == 0) &&
+		old_nspid == nspid &&
+		OidIsValid(get_relname_relid(old_relname, old_nspid)))
+	{
+		ereport(NOTICE,
+				errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription",
+					   nspname, relname));
+	}
+	else
+	{
+		/*
+		 * Conflict log tables are recorded as internal dependencies of the
+		 * subscription. Before associating a new table, drop the existing
+		 * table to avoid stale or orphaned relations.
+		 *
+		 * XXX: At present, only conflict log tables are managed this way. In
+		 * future if we introduce additional internal dependencies, we may
+		 * need a targeted deletion to avoid deletion of any other objects.
+		 */
+		ObjectAddressSet(object, SubscriptionRelationId, subid);
+		performDeletion(&object, DROP_CASCADE,
+						PERFORM_DELETION_INTERNAL |
+						PERFORM_DELETION_SKIP_ORIGINAL);
+
+		/* Need to create a new table if a new name was provided. */
+		if (relname != NULL)
+		{
+			Oid			conflictlogrelid = get_relname_relid(relname, nspid);
+
+			if (OidIsValid(conflictlogrelid))
+			{
+				Relation	conflictlogrel;
+
+				/*
+				 * Conflict log tables must be permanent relations. Disallow
+				 * in temporary namespaces to ensure the same.
+				 */
+				if (isTempNamespace(nspid))
+					ereport(ERROR,
+							errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							errmsg("cannot use conflict log table \"%s.%s\" of a temporary namespace",
+								   nspname, relname),
+							errhint("Specify table from a permanent schema."));
+
+				conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+
+				if (conflictlogrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT)
+					ereport(ERROR,
+							errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							errmsg("conflict log table \"%s.%s\" must be a permanent table",
+								   nspname, relname),
+							errhint("Specify a permanent table as the conflict log table."));
+
+				if (IsConflictLogTable(conflictlogrelid))
+					ereport(ERROR,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("conflict log table \"%s.%s\" cannot be used",
+								   nspname, relname),
+							errdetail("The specified table is already registered for a different subscription."),
+							errhint("Specify a different conflict log table."));
+
+				if (!ValidateConflictLogTable(conflictlogrel))
+					ereport(ERROR,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("conflict log table \"%s.%s\" has an incompatible definition",
+								   nspname, relname),
+							errdetail("The table does not match the required conflict log table structure."),
+							errhint("Create the conflict log table with the expected definition or specify a different table."));
+
+				/* NOTE(review): no dependency is recorded here; confirm. */
+				table_close(conflictlogrel, NoLock);
+			}
+			else
+				create_conflict_log_table(nspid, relname, subid);
+		}
+	}
+
+	if (nspname != NULL)
+		pfree(nspname);
+	if (old_relname != NULL)
+		pfree(old_relname);
+
+	*relnamespaceid = nspid;
+	return relname;
+}
+
/*
* Marks all sequences with INIT state.
*/
@@ -1727,92 +1869,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE))
{
- Oid nspid = InvalidOid;
- Oid old_nspid = InvalidOid;
- char *old_relname = NULL;
- char *relname = NULL;
- List *names = NIL;
-
- if (opts.conflictlogtable != NULL)
- {
- /* Explicitly check for empty string before any processing. */
- if (opts.conflictlogtable[0] == '\0')
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("conflict log table name cannot be empty"),
- errhint("Provide a valid table name or omit the parameter.")));
-
- names = stringToQualifiedNameList(opts.conflictlogtable,
- NULL);
- nspid = QualifiedNameGetCreationNamespace(names, &relname);
- }
-
- /* Fetch the existing conflict table information. */
- old_relname =
- get_subscription_conflict_log_table(subid, &old_nspid);
-
- /*
- * If the subscription already uses this conflict log table
- * and it exists, just issue a notice.
- */
- if (old_relname != NULL && relname != NULL
- && (strcmp(old_relname, relname) == 0) &&
- old_nspid == nspid &&
- OidIsValid(get_relname_relid(old_relname, old_nspid)))
- {
- char *nspname = get_namespace_name(nspid);
-
- ereport(NOTICE,
- (errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription",
- nspname, relname)));
- pfree(nspname);
- }
+ char *relname;
+ Oid nspid;
+ char *conftable = opts.conflictlogtable;
+
+ relname = AlterSubscriptionConflictLogTable(subid,
+ conftable,
+ &nspid);
+ values[Anum_pg_subscription_subconflictlognspid - 1] =
+ ObjectIdGetDatum(nspid);
+
+ if (relname != NULL)
+ values[Anum_pg_subscription_subconflictlogtable - 1] =
+ CStringGetTextDatum(relname);
else
- {
- ObjectAddress object;
-
- /*
- * Conflict log tables are recorded as internal
- * dependencies of the subscription. Before
- * associating a new table, drop the existing table to
- * avoid stale or orphaned relations.
- *
- * XXX: At present, only conflict log tables are
- * managed this way. In future if we introduce
- * additional internal dependencies, we may need
- * a targeted deletion to avoid deletion of any
- * other objects.
- */
- ObjectAddressSet(object, SubscriptionRelationId, subid);
- performDeletion(&object, DROP_CASCADE,
- PERFORM_DELETION_INTERNAL |
- PERFORM_DELETION_SKIP_ORIGINAL);
-
- /*
- * Need to create a new table if a new name was
- * provided.
- */
- if (relname != NULL)
- create_conflict_log_table(nspid, relname, subid);
-
- values[Anum_pg_subscription_subconflictlognspid - 1] =
- ObjectIdGetDatum(nspid);
-
- if (relname != NULL)
- values[Anum_pg_subscription_subconflictlogtable - 1] =
- CStringGetTextDatum(relname);
- else
- nulls[Anum_pg_subscription_subconflictlogtable - 1] =
- true;
-
- replaces[Anum_pg_subscription_subconflictlognspid - 1] =
- true;
- replaces[Anum_pg_subscription_subconflictlogtable - 1] =
- true;
- }
+ nulls[Anum_pg_subscription_subconflictlogtable - 1] =
+ true;
- if (old_relname != NULL)
- pfree(old_relname);
+ replaces[Anum_pg_subscription_subconflictlognspid - 1] =
+ true;
+ replaces[Anum_pg_subscription_subconflictlogtable - 1] =
+ true;
}
update_tuple = true;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 27f6be3f0f8..f85263b8e12 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5130,6 +5130,7 @@ getSubscriptions(Archive *fout)
int i_subfailover;
int i_subretaindeadtuples;
int i_submaxretention;
+ int i_subconflictlogrelid;
int i,
ntups;
@@ -5216,10 +5217,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
- " s.submaxretention\n");
+ " s.submaxretention,\n");
else
appendPQExpBuffer(query,
- " 0 AS submaxretention\n");
+ " 0 AS submaxretention,\n");
+
+ if (fout->remoteVersion >= 190000)
+ appendPQExpBufferStr(query,
+ " c.oid AS subconflictlogrelid\n");
+ else
+ appendPQExpBufferStr(query,
+ " 0::oid AS subconflictlogrelid\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
@@ -5229,6 +5237,12 @@ getSubscriptions(Archive *fout)
"LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
" ON o.external_id = 'pg_' || s.oid::text \n");
+	if (fout->remoteVersion >= 190000)
+		appendPQExpBufferStr(query,
+							 "LEFT JOIN pg_catalog.pg_class c \n"
+							 "    ON c.relname = s.subconflictlogtable \n"
+							 "    AND c.relnamespace = s.subconflictlognspid\n");
+
appendPQExpBufferStr(query,
"WHERE s.subdbid = (SELECT oid FROM pg_database\n"
" WHERE datname = current_database())");
@@ -5255,6 +5269,7 @@ getSubscriptions(Archive *fout)
i_subfailover = PQfnumber(res, "subfailover");
i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
i_submaxretention = PQfnumber(res, "submaxretention");
+ i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -5292,6 +5307,19 @@ getSubscriptions(Archive *fout)
(strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
subinfo[i].submaxretention =
atoi(PQgetvalue(res, i, i_submaxretention));
+ subinfo[i].subconflictlogrelid =
+ atooid(PQgetvalue(res, i, i_subconflictlogrelid));
+
+ if (subinfo[i].subconflictlogrelid != InvalidOid)
+ {
+ TableInfo *tableInfo = findTableByOid(subinfo[i].subconflictlogrelid);
+
+ if (!tableInfo)
+ pg_fatal("could not find conflict log table with OID %u",
+ subinfo[i].subconflictlogrelid);
+
+ addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId);
+ }
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subslotname))
@@ -5564,6 +5592,23 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
+ if (subinfo->subconflictlogrelid != InvalidOid)
+ {
+ PQExpBuffer conflictlogbuf = createPQExpBuffer();
+ TableInfo *tbinfo = findTableByOid(subinfo->subconflictlogrelid);
+
+ appendStringLiteralAH(conflictlogbuf,
+ fmtQualifiedDumpable(tbinfo),
+ fout);
+
+ appendPQExpBuffer(query,
+ "\n\nALTER SUBSCRIPTION %s SET (conflict_log_table = %s);\n",
+ qsubname,
+ conflictlogbuf->data);
+
+ destroyPQExpBuffer(conflictlogbuf);
+ }
+
/*
* In binary-upgrade mode, we allow the replication to continue after the
* upgrade.
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 72a00e1bc20..20ffae491eb 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -719,6 +719,7 @@ typedef struct _SubscriptionInfo
bool subfailover;
bool subretaindeadtuples;
int submaxretention;
+ Oid subconflictlogrelid;
char *subconninfo;
char *subslotname;
char *subsynccommit;
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index e2a4df4cf4b..f0bda51b993 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj,
addObjectDependency(attrdefobj, tableobj->dumpId);
}
+/*
+ * A subscription is made to depend on its conflict log table, while the
+ * table has an automatic dependency on the subscription, so the two form a
+ * loop.  Break it by removing the table's dependency on the subscription,
+ * allowing the table to be created before the subscription that names it.
+ */
+static void
+repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj)
+{
+ /* Drop only the table -> subscription edge; the reverse edge is kept */
+ removeObjectDependency(tableobj, subobj->dumpId);
+}
+
/*
* CHECK, NOT NULL constraints on domains work just like those on tables ...
*/
@@ -1361,6 +1374,25 @@ repairDependencyLoop(DumpableObject **loop,
return;
}
+ /*
+ * Subscription and its Conflict Log Table
+ */
+ if (nLoop == 2 &&
+ loop[0]->objType == DO_TABLE &&
+ loop[1]->objType == DO_SUBSCRIPTION)
+ {
+ repairSubscriptionTableLoop(loop[1], loop[0]);
+ return;
+ }
+
+ if (nLoop == 2 &&
+ loop[0]->objType == DO_SUBSCRIPTION &&
+ loop[1]->objType == DO_TABLE)
+ {
+ repairSubscriptionTableLoop(loop[0], loop[1]);
+ return;
+ }
+
/* index on partitioned table and corresponding index on partition */
if (nLoop == 2 &&
loop[0]->objType == DO_INDEX &&
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index e33aa95f6ff..ef11db6b8ee 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3204,9 +3204,10 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any, streaming = on);',
+ WITH (connect = false, origin = any, streaming = on, conflict_log_table = \'conflict\');',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n
+ \QALTER SUBSCRIPTION sub3 SET (conflict_log_table = 'public.conflict');\E
/xm,
like => { %full_runs, section_post_data => 1, },
unlike => {
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index f96687e107c..665a8fcc0aa 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -630,8 +630,19 @@ SELECT * FROM pg_publication_tables WHERE pubname = 'pub';
DROP PUBLICATION pub;
-- fail - set conflict_log_table to one already used by a different subscription
ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
-ERROR: cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists
-HINT: Use a different name for the conflict log table or drop the existing table.
+ERROR: conflict log table "public.regress_conflict_log1" cannot be used
+DETAIL: The specified table is already registered for a different subscription.
+HINT: Specify a different conflict log table.
+-- fail - conflict log table must be a permanent relation (UNLOGGED not allowed)
+CREATE UNLOGGED TABLE public.regress_conflict_log_unlogged (id int);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log_unlogged');
+ERROR: conflict log table "public.regress_conflict_log_unlogged" must be a permanent table
+HINT: Specify a permanent table as the conflict log table.
+DROP TABLE public.regress_conflict_log_unlogged;
+-- fail - conflict log table must not be in a temporary schema
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'pg_temp.regress_conflict_log1');
+ERROR: cannot create conflict log table "regress_conflict_log1" in a temporary namespace
+HINT: Use a permanent schema.
-- ok - dropping subscription also drops the log table
ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 6b6f1503145..5dd31b5ed12 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -428,6 +428,14 @@ DROP PUBLICATION pub;
-- fail - set conflict_log_table to one already used by a different subscription
ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+-- fail - conflict log table must be a permanent relation (UNLOGGED not allowed)
+CREATE UNLOGGED TABLE public.regress_conflict_log_unlogged (id int);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log_unlogged');
+DROP TABLE public.regress_conflict_log_unlogged;
+
+-- fail - conflict log table must not be in a temporary schema
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'pg_temp.regress_conflict_log1');
+
-- ok - dropping subscription also drops the log table
ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
--
2.43.0