v13-0002-pg_dump-dump-conflict-log-table-configuration-fo.patch

text/x-patch

Filename: v13-0002-pg_dump-dump-conflict-log-table-configuration-fo.patch
Type: text/x-patch
Part: 1
Message: Re: Proposal: Conflict log history table for Logical Replication
From 0676b9c1b425ea069a6c38816636034fc090662f Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Tue, 16 Dec 2025 09:16:26 +0530
Subject: [PATCH v13 2/2] pg_dump: dump conflict log table configuration for
 subscriptions

Allow pg_dump to preserve the conflict_log_table setting of logical
replication subscriptions.
---
 src/backend/commands/subscriptioncmds.c    | 245 ++++++++++++++-------
 src/bin/pg_dump/pg_dump.c                  |  49 ++++-
 src/bin/pg_dump/pg_dump.h                  |   1 +
 src/bin/pg_dump/pg_dump_sort.c             |  32 +++
 src/bin/pg_dump/t/002_pg_dump.pl           |   5 +-
 src/test/regress/expected/subscription.out |  15 +-
 src/test/regress/sql/subscription.sql      |   8 +
 7 files changed, 265 insertions(+), 90 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b044ed70a2a..569c1a5a76b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1242,6 +1242,148 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		table_close(rel, NoLock);
 }
 
+/*
+ * AlterSubscriptionConflictLogTable
+ *
+ * Set, change, or remove the conflict log table associated with a
+ * subscription.
+ *
+ * If a conflict log table name is provided, this function validates the
+ * specified relation (or creates it if it does not exist) and records it
+ * as an internal dependency of the subscription. The table must be a
+ * permanent relation in a non temporary schema and must match the expected
+ * conflict log table definition.
+ *
+ * If the subscription already uses the specified conflict log table and the
+ * table still exists, no change is made and a NOTICE is emitted.
+ *
+ * Any previously associated conflict log table is removed by dropping the
+ * subscription's internal dependencies before associating a new table.
+ *
+ * Returns:
+ *   NULL when the association has been removed.
+ *   else conflict log table associated with the subscription.
+ */
+static char *
+AlterSubscriptionConflictLogTable(Oid subid, char *conflictlogtable,
+								  Oid *relnamespaceid)
+{
+	Oid			nspid = InvalidOid;
+	Oid			old_nspid = InvalidOid;
+	char	   *old_relname = NULL;
+	char	   *relname = NULL;
+	List	   *names = NIL;
+	char	   *nspname;
+	ObjectAddress object;
+
+	if (conflictlogtable != NULL)
+	{
+		/* Explicitly check for empty string before any processing. */
+		if (conflictlogtable[0] == '\0')
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("conflict log table name cannot be empty"),
+					 errhint("Provide a valid table name or omit the parameter.")));
+
+		names = stringToQualifiedNameList(conflictlogtable, NULL);
+		nspid = QualifiedNameGetCreationNamespace(names, &relname);
+		nspname = get_namespace_name(nspid);
+	}
+
+	/* Fetch the existing conflict table information. */
+	old_relname = get_subscription_conflict_log_table(subid, &old_nspid);
+
+	/*
+	 * If the subscription already uses this conflict log table and it exists,
+	 * just issue a notice.
+	 */
+	if (old_relname != NULL && relname != NULL
+		&& (strcmp(old_relname, relname) == 0) &&
+		old_nspid == nspid &&
+		OidIsValid(get_relname_relid(old_relname, old_nspid)))
+	{
+		ereport(NOTICE,
+				errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription",
+					   nspname, relname));
+	}
+	else
+	{
+		/*
+		 * Conflict log tables are recorded as internal dependencies of the
+		 * subscription. Before associating a new table, drop the existing
+		 * table to avoid stale or orphaned relations.
+		 *
+		 * XXX: At present, only conflict log tables are managed this way. In
+		 * future if we introduce additional internal dependencies, we may
+		 * need a targeted deletion to avoid deletion of any other objects.
+		 */
+		ObjectAddressSet(object, SubscriptionRelationId, subid);
+		performDeletion(&object, DROP_CASCADE,
+						PERFORM_DELETION_INTERNAL |
+						PERFORM_DELETION_SKIP_ORIGINAL);
+
+		/* Need to create a new table if a new name was provided. */
+		if (relname != NULL)
+		{
+			Oid			conflictlogrelid = get_relname_relid(relname, nspid);
+
+			if (OidIsValid(conflictlogrelid))
+			{
+				Relation	conflictlogrel;
+
+				/*
+				 * Conflict log tables must be permanent relations. Disallow
+				 * in temporary namespaces to ensure the same.
+				 */
+				if (isTempNamespace(nspid))
+					ereport(ERROR,
+							errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							errmsg("cannot use conflict log table \"%s.%s\" of a temporary namespace",
+								   nspname, relname),
+							errhint("Specify table from a permanent schema."));
+
+				conflictlogrel = table_open(conflictlogrelid,
+											RowExclusiveLock);
+
+				if (conflictlogrel->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT)
+					ereport(ERROR,
+							errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							errmsg("conflict log table \"%s.%s\" must be a permanent table",
+								   nspname, relname),
+							errhint("Specify a permanent table as the conflict log table."));
+
+				if (IsConflictLogTable(conflictlogrelid))
+					ereport(ERROR,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("conflict log table \"%s.%s\" cannot be used",
+								   nspname, relname),
+							errdetail("The specified table is already registered for a different subscription."),
+							errhint("Specify a different conflict log table."));
+
+				if (!ValidateConflictLogTable(conflictlogrel))
+					ereport(ERROR,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("conflict log table \"%s.%s\" has an incompatible definition",
+								   nspname, relname),
+							errdetail("The table does not match the required conflict log table structure."),
+							errhint("Create the conflict log table with the expected definition or specify a different table."));
+
+				table_close(conflictlogrel, NoLock);
+
+			}
+			else
+				create_conflict_log_table(nspid, relname, subid);
+		}
+	}
+
+	pfree(nspname);
+	if (old_relname != NULL)
+		pfree(old_relname);
+
+	*relnamespaceid = nspid;
+	return relname;
+}
+
 /*
  * Marks all sequences with INIT state.
  */
@@ -1727,92 +1869,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE))
 				{
-					Oid		nspid = InvalidOid;
-					Oid     old_nspid = InvalidOid;
-					char   *old_relname = NULL;
-					char   *relname = NULL;
-					List   *names = NIL;
-
-					if (opts.conflictlogtable != NULL)
-					{
-						/* Explicitly check for empty string before any processing. */
-						if (opts.conflictlogtable[0] == '\0')
-							ereport(ERROR,
-									(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-									errmsg("conflict log table name cannot be empty"),
-									errhint("Provide a valid table name or omit the parameter.")));
-
-						names = stringToQualifiedNameList(opts.conflictlogtable,
-														  NULL);
-						nspid = QualifiedNameGetCreationNamespace(names, &relname);
-					}
-
-					/* Fetch the existing conflict table information. */
-					old_relname =
-						get_subscription_conflict_log_table(subid, &old_nspid);
-
-					/*
-					 * If the subscription already uses this conflict log table
-					 * and it exists, just issue a notice.
-					 */
-					if (old_relname != NULL && relname != NULL
-						&& (strcmp(old_relname, relname) == 0) &&
-						old_nspid == nspid &&
-						OidIsValid(get_relname_relid(old_relname, old_nspid)))
-					{
-						char *nspname = get_namespace_name(nspid);
-
-						ereport(NOTICE,
-								(errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription",
-										nspname, relname)));
-						pfree(nspname);
-					}
+					char	   *relname;
+					Oid			nspid;
+					char	   *conftable = opts.conflictlogtable;
+
+					relname = AlterSubscriptionConflictLogTable(subid,
+																conftable,
+																&nspid);
+					values[Anum_pg_subscription_subconflictlognspid - 1] =
+						ObjectIdGetDatum(nspid);
+
+					if (relname != NULL)
+						values[Anum_pg_subscription_subconflictlogtable - 1] =
+							CStringGetTextDatum(relname);
 					else
-					{
-						ObjectAddress	object;
-
-						/*
-						 * Conflict log tables are recorded as internal
-						 * dependencies of the subscription.  Before
-						 * associating a new table, drop the existing table to
-						 * avoid stale or orphaned relations.
-						 *
-						 * XXX: At present, only conflict log tables are
-						 * managed this way.  In future if we introduce
-						 * additional internal dependencies, we may need
-						 * a targeted deletion to avoid deletion of any
-						 * other objects.
-						 */
-						ObjectAddressSet(object, SubscriptionRelationId, subid);
-						performDeletion(&object, DROP_CASCADE,
-										PERFORM_DELETION_INTERNAL |
-										PERFORM_DELETION_SKIP_ORIGINAL);
-
-						/*
-						 * Need to create a new table if a new name was
-						 * provided.
-						 */
-						if (relname != NULL)
-							create_conflict_log_table(nspid, relname, subid);
-
-						values[Anum_pg_subscription_subconflictlognspid - 1] =
-									ObjectIdGetDatum(nspid);
-
-						if (relname != NULL)
-							values[Anum_pg_subscription_subconflictlogtable - 1] =
-									CStringGetTextDatum(relname);
-						else
-							nulls[Anum_pg_subscription_subconflictlogtable - 1] =
-									true;
-
-						replaces[Anum_pg_subscription_subconflictlognspid - 1] =
-									true;
-						replaces[Anum_pg_subscription_subconflictlogtable - 1] =
-									true;
-					}
+						nulls[Anum_pg_subscription_subconflictlogtable - 1] =
+							true;
 
-					if (old_relname != NULL)
-						pfree(old_relname);
+					replaces[Anum_pg_subscription_subconflictlognspid - 1] =
+						true;
+					replaces[Anum_pg_subscription_subconflictlogtable - 1] =
+						true;
 				}
 
 				update_tuple = true;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 27f6be3f0f8..f85263b8e12 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5130,6 +5130,7 @@ getSubscriptions(Archive *fout)
 	int			i_subfailover;
 	int			i_subretaindeadtuples;
 	int			i_submaxretention;
+	int			i_subconflictlogrelid;
 	int			i,
 				ntups;
 
@@ -5216,10 +5217,17 @@ getSubscriptions(Archive *fout)
 
 	if (fout->remoteVersion >= 190000)
 		appendPQExpBufferStr(query,
-							 " s.submaxretention\n");
+							 " s.submaxretention,\n");
 	else
 		appendPQExpBuffer(query,
-						  " 0 AS submaxretention\n");
+						  " 0 AS submaxretention,\n");
+
+	if (fout->remoteVersion >= 190000)
+		appendPQExpBufferStr(query,
+							 " c.oid AS subconflictlogrelid\n");
+	else
+		appendPQExpBufferStr(query,
+							 " 0::oid AS subconflictlogrelid\n");
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n");
@@ -5229,6 +5237,12 @@ getSubscriptions(Archive *fout)
 							 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
 							 "    ON o.external_id = 'pg_' || s.oid::text \n");
 
+	if (fout->remoteVersion >= 190000)
+		appendPQExpBufferStr(query,
+							 "LEFT JOIN pg_class c ON c.relname = s.subconflictlogtable\n"
+							 "LEFT JOIN pg_namespace n \n"
+							 "    ON n.oid = c.relnamespace AND n.oid = s.subconflictlognspid\n");
+
 	appendPQExpBufferStr(query,
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
 						 "                   WHERE datname = current_database())");
@@ -5255,6 +5269,7 @@ getSubscriptions(Archive *fout)
 	i_subfailover = PQfnumber(res, "subfailover");
 	i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
 	i_submaxretention = PQfnumber(res, "submaxretention");
+	i_subconflictlogrelid = PQfnumber(res, "subconflictlogrelid");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -5292,6 +5307,19 @@ getSubscriptions(Archive *fout)
 			(strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
 		subinfo[i].submaxretention =
 			atoi(PQgetvalue(res, i, i_submaxretention));
+		subinfo[i].subconflictlogrelid =
+			atooid(PQgetvalue(res, i, i_subconflictlogrelid));
+
+		if (subinfo[i].subconflictlogrelid != InvalidOid)
+		{
+			TableInfo  *tableInfo = findTableByOid(subinfo[i].subconflictlogrelid);
+
+			if (!tableInfo)
+				pg_fatal("could not find conflict log table with OID %u",
+						 subinfo[i].subconflictlogrelid);
+
+			addObjectDependency(&subinfo[i].dobj, tableInfo->dobj.dumpId);
+		}
 		subinfo[i].subconninfo =
 			pg_strdup(PQgetvalue(res, i, i_subconninfo));
 		if (PQgetisnull(res, i, i_subslotname))
@@ -5564,6 +5592,23 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 
 	appendPQExpBufferStr(query, ");\n");
 
+	if (subinfo->subconflictlogrelid != InvalidOid)
+	{
+		PQExpBuffer conflictlogbuf = createPQExpBuffer();
+		TableInfo  *tbinfo = findTableByOid(subinfo->subconflictlogrelid);
+
+		appendStringLiteralAH(conflictlogbuf,
+							  fmtQualifiedDumpable(tbinfo),
+							  fout);
+
+		appendPQExpBuffer(query,
+						  "\n\nALTER SUBSCRIPTION %s SET (conflict_log_table = %s);\n",
+						  qsubname,
+						  conflictlogbuf->data);
+
+		destroyPQExpBuffer(conflictlogbuf);
+	}
+
 	/*
 	 * In binary-upgrade mode, we allow the replication to continue after the
 	 * upgrade.
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 72a00e1bc20..20ffae491eb 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -719,6 +719,7 @@ typedef struct _SubscriptionInfo
 	bool		subfailover;
 	bool		subretaindeadtuples;
 	int			submaxretention;
+	Oid			subconflictlogrelid;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index e2a4df4cf4b..f0bda51b993 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1131,6 +1131,19 @@ repairTableAttrDefMultiLoop(DumpableObject *tableobj,
 	addObjectDependency(attrdefobj, tableobj->dumpId);
 }
 
+/*
+ * Because we make subscriptions depend on their conflict log tables, while
+ * there is an automatic dependency in the other direction, we need to break
+ * the loop. Remove the automatic dependency, allowing the table to be created
+ * first.
+ */
+static void
+repairSubscriptionTableLoop(DumpableObject *subobj, DumpableObject *tableobj)
+{
+	/* Remove table's dependency on subscription */
+	removeObjectDependency(tableobj, subobj->dumpId);
+}
+
 /*
  * CHECK, NOT NULL constraints on domains work just like those on tables ...
  */
@@ -1361,6 +1374,25 @@ repairDependencyLoop(DumpableObject **loop,
 		return;
 	}
 
+	/*
+	 * Subscription and its Conflict Log Table
+	 */
+	if (nLoop == 2 &&
+		loop[0]->objType == DO_TABLE &&
+		loop[1]->objType == DO_SUBSCRIPTION)
+	{
+		repairSubscriptionTableLoop(loop[1], loop[0]);
+		return;
+	}
+
+	if (nLoop == 2 &&
+		loop[0]->objType == DO_SUBSCRIPTION &&
+		loop[1]->objType == DO_TABLE)
+	{
+		repairSubscriptionTableLoop(loop[0], loop[1]);
+		return;
+	}
+
 	/* index on partitioned table and corresponding index on partition */
 	if (nLoop == 2 &&
 		loop[0]->objType == DO_INDEX &&
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index e33aa95f6ff..ef11db6b8ee 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -3204,9 +3204,10 @@ my %tests = (
 		create_order => 50,
 		create_sql => 'CREATE SUBSCRIPTION sub3
 						 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
-						 WITH (connect = false, origin = any, streaming = on);',
+						 WITH (connect = false, origin = any, streaming = on, conflict_log_table = \'conflict\');',
 		regexp => qr/^
-			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
+			\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E\n\n\n
+			\QALTER SUBSCRIPTION sub3 SET (conflict_log_table = 'public.conflict');\E
 			/xm,
 		like => { %full_runs, section_post_data => 1, },
 		unlike => {
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index f96687e107c..665a8fcc0aa 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -630,8 +630,19 @@ SELECT * FROM pg_publication_tables WHERE pubname = 'pub';
 DROP PUBLICATION pub;
 -- fail - set conflict_log_table to one already used by a different subscription
 ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
-ERROR:  cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists
-HINT:  Use a different name for the conflict log table or drop the existing table.
+ERROR:  conflict log table "public.regress_conflict_log1" cannot be used
+DETAIL:  The specified table is already registered for a different subscription.
+HINT:  Specify a different conflict log table.
+-- fail - conflict log table must be a permanent relation (UNLOGGED not allowed)
+CREATE UNLOGGED TABLE public.regress_conflict_log_unlogged (id int);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log_unlogged');
+ERROR:  conflict log table "public.regress_conflict_log_unlogged" must be a permanent table
+HINT:  Specify a permanent table as the conflict log table.
+DROP TABLE public.regress_conflict_log_unlogged;
+-- fail - conflict log table must not be in a temporary schema
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'pg_temp.regress_conflict_log1');
+ERROR:  cannot create conflict log table "regress_conflict_log1" in a temporary namespace
+HINT:  Use a permanent schema.
 -- ok - dropping subscription also drops the log table
 ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
 ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 6b6f1503145..5dd31b5ed12 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -428,6 +428,14 @@ DROP PUBLICATION pub;
 -- fail - set conflict_log_table to one already used by a different subscription
 ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
 
+-- fail - conflict log table must be a permanent relation (UNLOGGED not allowed)
+CREATE UNLOGGED TABLE public.regress_conflict_log_unlogged (id int);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log_unlogged');
+DROP TABLE public.regress_conflict_log_unlogged;
+
+-- fail - conflict log table must not be in a temporary schema
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'pg_temp.regress_conflict_log1');
+
 -- ok - dropping subscription also drops the log table
 ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
 ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
-- 
2.43.0