v15-0005-Preserve-conflict-log-destination-for-subscripti.patch
text/x-patch
Filename: v15-0005-Preserve-conflict-log-destination-for-subscripti.patch
Type: text/x-patch
Part: 3
From 0cccf05489d43cdf89cfd61a0561adb4e19163ec Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Tue, 23 Dec 2025 11:12:47 +0530
Subject: [PATCH v15 5/5] Preserve conflict log destination for subscriptions
Support pg_dump to dump and restore the conflict_log_destination setting for
subscriptions.
During a normal CREATE SUBSCRIPTION, a conflict log table is created
automatically when required. However, during dump/restore or binary upgrade,
the conflict log table may already exist and must be reused rather than
recreated.
To ensure correct behavior, pg_dump now emits an ALTER SUBSCRIPTION command
after subscription creation to restore the conflict_log_destination setting.
While dumping, pg_dump temporarily sets the search_path to the schema in which
the conflict log table was created, ensuring that the conflict log table is
resolved with the appropriate schema.
---
src/backend/commands/subscriptioncmds.c | 174 +++++++++++++++++-------
src/bin/pg_dump/pg_dump.c | 59 +++++++-
src/bin/pg_dump/pg_dump.h | 2 +
src/bin/pg_dump/pg_dump_sort.c | 31 +++++
src/bin/pg_dump/t/002_pg_dump.pl | 7 +-
5 files changed, 220 insertions(+), 53 deletions(-)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b3694375191..1fbe0d474cf 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1395,6 +1395,124 @@ CheckAlterSubOption(Subscription *sub, const char *option,
}
}
+/*
+ * AlterSubscriptionConflictLogDestination
+ *
+ * Update the conflict log table associated with a subscription when its
+ * conflict log destination is changed.
+ *
+ * If the new destination requires a conflict log table and none was previously
+ * required, this function validates an existing conflict log table identified
+ * by the subscription specific naming convention or creates a new one.
+ *
+ * If the new destination no longer requires a conflict log table, the existing
+ * conflict log table associated with the subscription is removed via internal
+ * dependency cleanup to prevent orphaned relations.
+ *
+ * The function enforces that any conflict log table used is a permanent
+ * relation in a permanent schema, matches the expected structure, and is not
+ * already associated with another subscription.
+ *
+ * On success, *conflicttablerelid is set to the OID of the conflict log table
+ * that was created or validated, or to InvalidOid if no table is required.
+ *
+ * Returns true if the subscription's conflict log table reference must be
+ * updated as a result of the destination change; false otherwise.
+ */
+static bool
+AlterSubscriptionConflictLogDestination(Subscription *sub,
+ ConflictLogDest logdest,
+ Oid *conflicttablerelid)
+{
+ ConflictLogDest old_dest = GetLogDestination(sub->logdestination);
+ bool want_table;
+ bool has_oldtable;
+ bool update_relid = false;
+ Oid relid = InvalidOid;
+
+ want_table = (logdest == CONFLICT_LOG_DEST_TABLE ||
+ logdest == CONFLICT_LOG_DEST_ALL);
+ has_oldtable = (old_dest == CONFLICT_LOG_DEST_TABLE ||
+ old_dest == CONFLICT_LOG_DEST_ALL);
+
+ if (want_table && !has_oldtable)
+ {
+ char relname[NAMEDATALEN];
+ Oid nspid;
+
+ GetConflictLogTableName(relname, sub->oid);
+ nspid = RangeVarGetCreationNamespace(makeRangeVar(NULL, relname, -1));
+ relid = get_relname_relid(relname, nspid);
+ if (OidIsValid(relid))
+ {
+ Relation conflictlogrel;
+ char *nspname = get_namespace_name(nspid);
+
+ /*
+ * Conflict log tables must be permanent relations. Disallow in
+ * temporary namespaces to ensure the same.
+ */
+ if (isTempNamespace(nspid))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot use conflict log table \"%s.%s\" of a temporary namespace",
+ nspname, relname),
+ errhint("Specify table from a permanent schema."));
+
+ conflictlogrel = table_open(relid, RowExclusiveLock);
+ if (conflictlogrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("conflict log table \"%s.%s\" must be a permanent table",
+ nspname, relname),
+ errhint("Specify a permanent table as the conflict log table."));
+
+ if (IsConflictLogTable(relid))
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("conflict log table \"%s.%s\" cannot be used",
+ nspname, relname),
+ errdetail("The specified table is already registered for a different subscription."),
+ errhint("Specify a different conflict log table."));
+ if (!ValidateConflictLogTable(conflictlogrel))
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("conflict log table \"%s.%s\" has an incompatible definition",
+ nspname, relname),
+ errdetail("The table does not match the required conflict log table structure."),
+ errhint("Create the conflict log table with the expected definition or specify a different table."));
+
+ table_close(conflictlogrel, NoLock);
+ }
+ else
+ relid = create_conflict_log_table(sub->oid, sub->name, nspid, relname);
+
+ update_relid = true;
+ }
+ else if (!want_table && has_oldtable)
+ {
+ ObjectAddress object;
+
+ /*
+ * Conflict log tables are recorded as internal dependencies of the
+ * subscription. Drop the table if it is not required anymore to
+ * avoid stale or orphaned relations.
+ *
+ * XXX: At present, only conflict log tables are managed this way. In
+ * future if we introduce additional internal dependencies, we may
+ * need a targeted deletion to avoid deletion of any other objects.
+ */
+ ObjectAddressSet(object, SubscriptionRelationId, sub->oid);
+ performDeletion(&object, DROP_CASCADE,
+ PERFORM_DELETION_INTERNAL |
+ PERFORM_DELETION_SKIP_ORIGINAL);
+ update_relid = true;
+ }
+
+ *conflicttablerelid = relid;
+ return update_relid;
+}
+
/*
* Alter the existing subscription.
*/
@@ -1738,65 +1856,23 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DESTINATION))
{
ConflictLogDest old_dest =
- GetLogDestination(sub->logdestination);
+ GetLogDestination(sub->logdestination);
if (opts.logdest != old_dest)
{
- bool want_table =
- (opts.logdest == CONFLICT_LOG_DEST_TABLE ||
- opts.logdest == CONFLICT_LOG_DEST_ALL);
- bool has_oldtable =
- (old_dest == CONFLICT_LOG_DEST_TABLE ||
- old_dest == CONFLICT_LOG_DEST_ALL);
+ bool update_relid;
+ Oid relid = InvalidOid;
values[Anum_pg_subscription_sublogdestination - 1] =
CStringGetTextDatum(ConflictLogDestLabels[opts.logdest]);
replaces[Anum_pg_subscription_sublogdestination - 1] = true;
-
- if (want_table && !has_oldtable)
+ update_relid = AlterSubscriptionConflictLogDestination(sub, opts.logdest, &relid);
+ if (update_relid)
{
- char relname[NAMEDATALEN];
- Oid nspid;
- Oid relid;
-
- GetConflictLogTableName(relname, subid);
- nspid = RangeVarGetCreationNamespace(makeRangeVar(
- NULL, relname, -1));
-
- relid = create_conflict_log_table(subid, sub->name,
- nspid, relname);
-
- values[Anum_pg_subscription_subconflictlogrelid - 1] =
- ObjectIdGetDatum(relid);
- replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
- true;
- }
- else if (!want_table && has_oldtable)
- {
- ObjectAddress object;
-
- /*
- * Conflict log tables are recorded as internal
- * dependencies of the subscription. Drop the
- * table if it is not required anymore to avoid
- * stale or orphaned relations.
- *
- * XXX: At present, only conflict log tables are
- * managed this way. In future if we introduce
- * additional internal dependencies, we may need
- * a targeted deletion to avoid deletion of any
- * other objects.
- */
- ObjectAddressSet(object, SubscriptionRelationId,
- subid);
- performDeletion(&object, DROP_CASCADE,
- PERFORM_DELETION_INTERNAL |
- PERFORM_DELETION_SKIP_ORIGINAL);
-
values[Anum_pg_subscription_subconflictlogrelid - 1] =
- ObjectIdGetDatum(InvalidOid);
+ ObjectIdGetDatum(relid);
replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
- true;
+ true;
}
}
}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 27f6be3f0f8..d2477cfb5a1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5130,6 +5130,8 @@ getSubscriptions(Archive *fout)
int i_subfailover;
int i_subretaindeadtuples;
int i_submaxretention;
+ int i_subconflictlogrelid;
+ int i_sublogdestination;
int i,
ntups;
@@ -5216,10 +5218,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
- " s.submaxretention\n");
+ " s.submaxretention,\n");
else
appendPQExpBuffer(query,
- " 0 AS submaxretention\n");
+ " 0 AS submaxretention,\n");
+
+ if (fout->remoteVersion >= 190000)
+ appendPQExpBufferStr(query,
+ " s.subconflictlogrelid, s.sublogdestination\n");
+ else
+ appendPQExpBufferStr(query,
+ " NULL AS subconflictlogrelid, NULL AS sublogdestination\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
@@ -5261,6 +5270,8 @@ getSubscriptions(Archive *fout)
i_subpublications = PQfnumber(res, "subpublications");
i_suborigin = PQfnumber(res, "suborigin");
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+ i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid");
+ i_sublogdestination = PQfnumber(res, "sublogdestination");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@@ -5309,6 +5320,33 @@ getSubscriptions(Archive *fout)
else
subinfo[i].suboriginremotelsn =
pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+ if (PQgetisnull(res, i, i_subconflictlogrelid))
+ subinfo[i].subconflictlogrelid = InvalidOid;
+ else
+ {
+ TableInfo *tableInfo;
+
+ subinfo[i].subconflictlogrelid =
+ atooid(PQgetvalue(res, i, i_subconflictlogrelid));
+
+ if (subinfo[i].subconflictlogrelid)
+ {
+ tableInfo = findTableByOid(subinfo[i].subconflictlogrelid);
+ if (!tableInfo)
+ pg_fatal("could not find conflict log table with OID %u",
+ subinfo[i].subconflictlogrelid);
+
+ addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId);
+
+ if (!dopt->binary_upgrade)
+ tableInfo->dobj.dump = DUMP_COMPONENT_NONE;
+ }
+ }
+ if (PQgetisnull(res, i, i_sublogdestination))
+ subinfo[i].sublogdestination = NULL;
+ else
+ subinfo[i].sublogdestination =
+ pg_strdup(PQgetvalue(res, i, i_sublogdestination));
/* Decide whether we want to dump it */
selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5564,6 +5602,23 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ");\n");
+ if (subinfo->subconflictlogrelid)
+ {
+ TableInfo *tableInfo = findTableByOid(subinfo->subconflictlogrelid);
+
+ appendPQExpBuffer(query, "\n\nSELECT pg_catalog.set_config('search_path', '%s', false);\n",
+ tableInfo->dobj.namespace->dobj.name);
+ }
+
+ appendPQExpBuffer(query,
+ "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = %s);\n",
+ qsubname,
+ subinfo->sublogdestination);
+
+
+ if (subinfo->subconflictlogrelid)
+ appendPQExpBufferStr(query, "\n\nSELECT pg_catalog.set_config('search_path', '', false);\n");
+
/*
* In binary-upgrade mode, we allow the replication to continue after the
* upgrade.
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 72a00e1bc20..bd52c92140d 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -719,12 +719,14 @@ typedef struct _SubscriptionInfo
bool subfailover;
bool subretaindeadtuples;
int submaxretention;
+ Oid subconflictlogrelid;
char *subconninfo;
char *subslotname;
char *subsynccommit;
char *subpublications;
char *suborigin;
char *suboriginremotelsn;
+ char *sublogdestination;
} SubscriptionInfo;
/*
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index e2a4df4cf4b..2f170cae70f 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj,
addObjectDependency(attrdefobj, tableobj->dumpId);
}
+/*
+ * Because we make subscriptions depend on their conflict log tables, while
+ * there is an automatic dependency in the other direction, we need to break
+ * the loop. Remove the automatic dependency, allowing the table to be created
+ * first.
+ */
+static void
+repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj)
+{
+ /* Remove table's dependency on subscription */
+ removeObjectDependency(tableobj, subobj->dumpId);
+}
+
/*
* CHECK, NOT NULL constraints on domains work just like those on tables ...
*/
@@ -1361,6 +1374,24 @@ repairDependencyLoop(DumpableObject **loop,
return;
}
+ /*
+ * Subscription and its Conflict Log Table
+ */
+ if (nLoop == 2 &&
+ loop[0]->objType == DO_TABLE &&
+ loop[1]->objType == DO_SUBSCRIPTION)
+ {
+ repairSubscriptionTableLoop(loop[1], loop[0]);
+ return;
+ }
+ if (nLoop == 2 &&
+ loop[0]->objType == DO_SUBSCRIPTION &&
+ loop[1]->objType == DO_TABLE)
+ {
+ repairSubscriptionTableLoop(loop[0], loop[1]);
+ return;
+ }
+
/* index on partitioned table and corresponding index on partition */
if (nLoop == 2 &&
loop[0]->objType == DO_INDEX &&
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index e33aa95f6ff..8ec7b0069dd 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3204,9 +3204,12 @@ my %tests = (
create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
- WITH (connect = false, origin = any, streaming = on);',
+ WITH (connect = false, origin = any, streaming = on, conflict_log_destination= table);',
regexp => qr/^
- \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
+ \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n
+ \QSELECT pg_catalog.set_config('search_path', 'public', false);\E\n\n\n
+ \QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = table);\E\n\n\n
+ \QSELECT pg_catalog.set_config('search_path', '', false);\E
/xm,
like => { %full_runs, section_post_data => 1, },
unlike => {
--
2.43.0