v15-0005-Preserve-conflict-log-destination-for-subscripti.patch

text/x-patch

Filename: v15-0005-Preserve-conflict-log-destination-for-subscripti.patch
Type: text/x-patch
Part: 3
Message: Re: Proposal: Conflict log history table for Logical Replication
From 0cccf05489d43cdf89cfd61a0561adb4e19163ec Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Tue, 23 Dec 2025 11:12:47 +0530
Subject: [PATCH v15 5/5] Preserve conflict log destination for subscriptions

Support pg_dump to dump and restore the conflict_log_destination setting for
subscriptions.

During a normal CREATE SUBSCRIPTION, a conflict log table is created
automatically when required. However, during dump/restore or binary upgrade,
the conflict log table may already exist and must be reused rather than
recreated.

To ensure correct behavior, pg_dump now emits an ALTER SUBSCRIPTION command
after subscription creation to restore the conflict_log_destination setting.
In the dump output, that command is preceded by a set_config() call that sets
search_path to the schema in which the conflict log table was created, so that
at restore time the conflict log table is resolved in the appropriate schema,
and is followed by another set_config() call that resets search_path to empty.
---
 src/backend/commands/subscriptioncmds.c | 174 +++++++++++++++++-------
 src/bin/pg_dump/pg_dump.c               |  59 +++++++-
 src/bin/pg_dump/pg_dump.h               |   2 +
 src/bin/pg_dump/pg_dump_sort.c          |  31 +++++
 src/bin/pg_dump/t/002_pg_dump.pl        |   7 +-
 5 files changed, 220 insertions(+), 53 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b3694375191..1fbe0d474cf 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1395,6 +1395,124 @@ CheckAlterSubOption(Subscription *sub, const char *option,
 	}
 }
 
+/*
+ * AlterSubscriptionConflictLogDestination
+ *
+ * Update the conflict log table associated with a subscription when its
+ * conflict log destination is changed.
+ *
+ * If the new destination requires a conflict log table and the old one did
+ * not, look up a table with the subscription-specific name in the namespace
+ * chosen by RangeVarGetCreationNamespace: reuse it if valid, else create one.
+ *
+ * If the new destination no longer requires a conflict log table, the existing
+ * conflict log table associated with the subscription is removed via internal
+ * dependency cleanup to prevent orphaned relations.
+ *
+ * A reused table must be a permanent relation in a non-temporary schema, match
+ * the expected structure, and not already be registered as a conflict log
+ * table; otherwise an error is raised.
+ *
+ * Returns true if the subscription's conflict log table reference must be
+ * updated; *conflicttablerelid is then the OID of the created or validated
+ * table, or InvalidOid if the table was dropped.  Returns false when the need
+ * for a table is unchanged (e.g. CONFLICT_LOG_DEST_TABLE <-> _ALL); in that
+ * case *conflicttablerelid is set to InvalidOid and must be ignored.
+ */
+static bool
+AlterSubscriptionConflictLogDestination(Subscription *sub,
+										ConflictLogDest logdest,
+										Oid *conflicttablerelid)
+{
+	ConflictLogDest old_dest = GetLogDestination(sub->logdestination);
+	bool		want_table;
+	bool		has_oldtable;
+	bool		update_relid = false;
+	Oid			relid = InvalidOid;
+
+	want_table = (logdest == CONFLICT_LOG_DEST_TABLE ||
+				  logdest == CONFLICT_LOG_DEST_ALL);
+	has_oldtable = (old_dest == CONFLICT_LOG_DEST_TABLE ||
+					old_dest == CONFLICT_LOG_DEST_ALL);
+
+	if (want_table && !has_oldtable)
+	{
+		char		relname[NAMEDATALEN];
+		Oid			nspid;
+
+		GetConflictLogTableName(relname, sub->oid);
+		nspid = RangeVarGetCreationNamespace(makeRangeVar(NULL, relname, -1));
+		relid = get_relname_relid(relname, nspid);
+		if (OidIsValid(relid))
+		{
+			Relation	conflictlogrel;
+			char	   *nspname = get_namespace_name(nspid);
+
+			/*
+			 * Conflict log tables must be permanent relations. Disallow in
+			 * temporary namespaces to ensure the same.
+			 */
+			if (isTempNamespace(nspid))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("cannot use conflict log table \"%s.%s\" of a temporary namespace",
+							   nspname, relname),
+						errhint("Specify table from a permanent schema."));
+
+			conflictlogrel = table_open(relid, RowExclusiveLock);
+			if (conflictlogrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT)
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						errmsg("conflict log table \"%s.%s\" must be a permanent table",
+							   nspname, relname),
+						errhint("Specify a permanent table as the conflict log table."));
+
+			if (IsConflictLogTable(relid))
+				ereport(ERROR,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("conflict log table \"%s.%s\" cannot be used",
+							   nspname, relname),
+						errdetail("The specified table is already registered for a different subscription."),
+						errhint("Specify a different conflict log table."));
+			if (!ValidateConflictLogTable(conflictlogrel))
+				ereport(ERROR,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("conflict log table \"%s.%s\" has an incompatible definition",
+							   nspname, relname),
+						errdetail("The table does not match the required conflict log table structure."),
+						errhint("Create the conflict log table with the expected definition or specify a different table."));
+
+			table_close(conflictlogrel, NoLock);
+		}
+		else
+			relid = create_conflict_log_table(sub->oid, sub->name, nspid, relname);
+
+		update_relid = true;
+	}
+	else if (!want_table && has_oldtable)
+	{
+		ObjectAddress object;
+
+		/*
+		 * Conflict log tables are recorded as internal dependencies of the
+		 * subscription.  Drop the table if it is not required anymore to
+		 * avoid stale or orphaned relations.
+		 *
+		 * XXX: At present, only conflict log tables are managed this way. In
+		 * future if we introduce additional internal dependencies, we may
+		 * need a targeted deletion to avoid deletion of any other objects.
+		 */
+		ObjectAddressSet(object, SubscriptionRelationId, sub->oid);
+		performDeletion(&object, DROP_CASCADE,
+						PERFORM_DELETION_INTERNAL |
+						PERFORM_DELETION_SKIP_ORIGINAL);
+		update_relid = true;
+	}
+
+	*conflicttablerelid = relid;
+	return update_relid;
+}
+
 /*
  * Alter the existing subscription.
  */
@@ -1738,65 +1856,23 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DESTINATION))
 				{
 					ConflictLogDest old_dest =
-							GetLogDestination(sub->logdestination);
+						GetLogDestination(sub->logdestination);
 
 					if (opts.logdest != old_dest)
 					{
-						bool want_table =
-								(opts.logdest == CONFLICT_LOG_DEST_TABLE ||
-								 opts.logdest == CONFLICT_LOG_DEST_ALL);
-						bool has_oldtable =
-								(old_dest == CONFLICT_LOG_DEST_TABLE ||
-								 old_dest == CONFLICT_LOG_DEST_ALL);
+						bool		update_relid;
+						Oid			relid = InvalidOid;
 
 						values[Anum_pg_subscription_sublogdestination - 1] =
 							CStringGetTextDatum(ConflictLogDestLabels[opts.logdest]);
 						replaces[Anum_pg_subscription_sublogdestination - 1] = true;
-
-						if (want_table && !has_oldtable)
+						update_relid = AlterSubscriptionConflictLogDestination(sub, opts.logdest, &relid);
+						if (update_relid)
 						{
-							char	relname[NAMEDATALEN];
-							Oid		nspid;
-							Oid		relid;
-
-							GetConflictLogTableName(relname, subid);
-							nspid = RangeVarGetCreationNamespace(makeRangeVar(
-														NULL, relname, -1));
-
-							relid = create_conflict_log_table(subid, sub->name,
-															  nspid, relname);
-
-							values[Anum_pg_subscription_subconflictlogrelid - 1] =
-														ObjectIdGetDatum(relid);
-							replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
-														true;
-						}
-						else if (!want_table && has_oldtable)
-						{
-							ObjectAddress object;
-
-							/*
-							 * Conflict log tables are recorded as internal
-							 * dependencies of the subscription.  Drop the
-							 * table if it is not required anymore to avoid
-							 * stale or orphaned relations.
-							 *
-							 * XXX: At present, only conflict log tables are
-							 * managed this way.  In future if we introduce
-							 * additional internal dependencies, we may need
-							 * a targeted deletion to avoid deletion of any
-							 * other objects.
-							 */
-							ObjectAddressSet(object, SubscriptionRelationId,
-											 subid);
-							performDeletion(&object, DROP_CASCADE,
-											PERFORM_DELETION_INTERNAL |
-											PERFORM_DELETION_SKIP_ORIGINAL);
-
 							values[Anum_pg_subscription_subconflictlogrelid - 1] =
-												ObjectIdGetDatum(InvalidOid);
+								ObjectIdGetDatum(relid);
 							replaces[Anum_pg_subscription_subconflictlogrelid - 1] =
-												true;
+								true;
 						}
 					}
 				}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 27f6be3f0f8..d2477cfb5a1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5130,6 +5130,8 @@ getSubscriptions(Archive *fout)
 	int			i_subfailover;
 	int			i_subretaindeadtuples;
 	int			i_submaxretention;
+	int			i_subconflictlogrelid;
+	int			i_sublogdestination;
 	int			i,
 				ntups;
 
@@ -5216,10 +5218,17 @@ getSubscriptions(Archive *fout)
 
 	if (fout->remoteVersion >= 190000)
 		appendPQExpBufferStr(query,
-							 " s.submaxretention\n");
+							 " s.submaxretention,\n");
 	else
 		appendPQExpBuffer(query,
-						  " 0 AS submaxretention\n");
+						  " 0 AS submaxretention,\n");
+
+	if (fout->remoteVersion >= 190000)
+		appendPQExpBufferStr(query,
+							 " s.subconflictlogrelid, s.sublogdestination\n");
+	else
+		appendPQExpBufferStr(query,
+							 " NULL AS subconflictlogrelid, NULL AS sublogdestination\n");
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n");
@@ -5261,6 +5270,8 @@ getSubscriptions(Archive *fout)
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+	i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid");
+	i_sublogdestination = PQfnumber(res, "sublogdestination");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -5309,6 +5320,33 @@ getSubscriptions(Archive *fout)
 		else
 			subinfo[i].suboriginremotelsn =
 				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+		if (PQgetisnull(res, i, i_subconflictlogrelid))
+			subinfo[i].subconflictlogrelid = InvalidOid;
+		else
+		{
+			TableInfo  *tableInfo;
+
+			subinfo[i].subconflictlogrelid =
+				atooid(PQgetvalue(res, i, i_subconflictlogrelid));
+
+			if (subinfo[i].subconflictlogrelid)
+			{
+				tableInfo = findTableByOid(subinfo[i].subconflictlogrelid);
+				if (!tableInfo)
+					pg_fatal("could not find conflict log table with OID %u",
+							 subinfo[i].subconflictlogrelid);
+
+				addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId);
+
+				if (!dopt->binary_upgrade)
+					tableInfo->dobj.dump = DUMP_COMPONENT_NONE;
+			}
+		}
+		if (PQgetisnull(res, i, i_sublogdestination))
+			subinfo[i].sublogdestination = NULL;
+		else
+			subinfo[i].sublogdestination =
+				pg_strdup(PQgetvalue(res, i, i_sublogdestination));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5564,6 +5602,49 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 
 	appendPQExpBufferStr(query, ");\n");
 
+	/*
+	 * Restore conflict_log_destination with a follow-up ALTER SUBSCRIPTION.
+	 * sublogdestination is NULL when the source server has no such column
+	 * (see getSubscriptions); there is then nothing to restore, and NULL
+	 * must not be passed to a %s conversion.
+	 */
+	if (subinfo->sublogdestination)
+	{
+		bool		has_table = OidIsValid(subinfo->subconflictlogrelid);
+
+		if (has_table)
+		{
+			TableInfo  *tableInfo = findTableByOid(subinfo->subconflictlogrelid);
+
+			if (!tableInfo)
+				pg_fatal("could not find conflict log table with OID %u",
+						 subinfo->subconflictlogrelid);
+
+			/*
+			 * Point search_path at the conflict log table's schema so that
+			 * ALTER SUBSCRIPTION resolves the table there.  The schema name
+			 * is quoted as an identifier and then as a string literal, so
+			 * names that need quoting cannot break the emitted command.
+			 */
+			appendPQExpBufferStr(query,
+								 "\n\nSELECT pg_catalog.set_config('search_path', ");
+			appendStringLiteralAH(query,
+								  fmtId(tableInfo->dobj.namespace->dobj.name),
+								  fout);
+			appendPQExpBufferStr(query, ", false);\n");
+		}
+
+		appendPQExpBuffer(query,
+						  "\n\nALTER SUBSCRIPTION %s SET (conflict_log_destination = %s);\n",
+						  qsubname,
+						  subinfo->sublogdestination);
+
+		/* Reset search_path to empty once the table has been resolved */
+		if (has_table)
+			appendPQExpBufferStr(query,
+								 "\n\nSELECT pg_catalog.set_config('search_path', '', false);\n");
+	}
+
 	/*
 	 * In binary-upgrade mode, we allow the replication to continue after the
 	 * upgrade.
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 72a00e1bc20..bd52c92140d 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -719,12 +719,14 @@ typedef struct _SubscriptionInfo
 	bool		subfailover;
 	bool		subretaindeadtuples;
 	int			submaxretention;
+	Oid			subconflictlogrelid;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *suborigin;
 	char	   *suboriginremotelsn;
+	char	   *sublogdestination;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index e2a4df4cf4b..2f170cae70f 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj,
 	addObjectDependency(attrdefobj, tableobj->dumpId);
 }
 
+/*
+ * A subscription (subobj) is made to depend on its conflict log table
+ * (tableobj), while the table carries an automatic dependency back on the
+ * subscription, forming a two-object loop.  Break it by removing the table's
+ * dependency on the subscription, allowing the table to be created first.
+ */
+static void
+repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj)
+{
+	/* Drop only the table -> subscription edge; the reverse edge is kept */
+	removeObjectDependency(tableobj, subobj->dumpId);
+}
+
 /*
  * CHECK, NOT NULL constraints on domains work just like those on tables ...
  */
@@ -1361,6 +1374,24 @@ repairDependencyLoop(DumpableObject **loop,
 		return;
 	}
 
+	/*
+	 * Subscription and its Conflict Log Table
+	 */
+	if (nLoop == 2 &&
+		loop[0]->objType == DO_TABLE &&
+		loop[1]->objType == DO_SUBSCRIPTION)
+	{
+		repairSubscriptionTableLoop(loop[1], loop[0]);
+		return;
+	}
+	if (nLoop == 2 &&
+		loop[0]->objType == DO_SUBSCRIPTION &&
+		loop[1]->objType == DO_TABLE)
+	{
+		repairSubscriptionTableLoop(loop[0], loop[1]);
+		return;
+	}
+
 	/* index on partitioned table and corresponding index on partition */
 	if (nLoop == 2 &&
 		loop[0]->objType == DO_INDEX &&
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index e33aa95f6ff..8ec7b0069dd 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3204,9 +3204,12 @@ my %tests = (
 		create_order => 50,
 		create_sql => 'CREATE SUBSCRIPTION sub3
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
-						 WITH (connect = false, origin = any, streaming = on);',
+						 WITH (connect = false, origin = any, streaming = on, conflict_log_destination= table);',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
+			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n
+			\QSELECT pg_catalog.set_config('search_path', 'public', false);\E\n\n\n
+			\QALTER SUBSCRIPTION sub3 SET (conflict_log_destination = table);\E\n\n\n
+			\QSELECT pg_catalog.set_config('search_path', '', false);\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 		unlike => {
-- 
2.43.0