v39-0006-Review-comment-fixes-for-Implement-the-conflict-.patch

application/octet-stream

Filename: v39-0006-Review-comment-fixes-for-Implement-the-conflict-.patch
Type: application/octet-stream
Part: 5
Message: Re: Proposal: Conflict log history table for Logical Replication
From 749ad0833f7dbc3483da811490bee20150efc22e Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 20 May 2026 10:13:28 +0530
Subject: [PATCH v39 06/10] Review comment fixes for Implement the conflict
 insertion infrastructure for the conflict log table

Review comment fixes for Implement the conflict
insertion infrastructure for the conflict log table
---
 src/backend/replication/logical/conflict.c | 160 +++++++++++++--------
 src/backend/replication/logical/worker.c   |  32 +----
 src/include/replication/conflict.h         |   1 +
 src/test/subscription/t/030_origin.pl      |   4 +-
 src/test/subscription/t/035_conflicts.pl   |   4 +-
 5 files changed, 116 insertions(+), 85 deletions(-)

diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 2dc10d80bf2..adf49bda7a7 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -31,10 +31,8 @@
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
-#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/pg_lsn.h"
-#include "utils/json.h"
 
 /*
  * String representations for the supported conflict logging destinations.
@@ -48,7 +46,6 @@ const char *const ConflictLogDestNames[] = {
 StaticAssertDecl(lengthof(ConflictLogDestNames) == 3,
 				 "ConflictLogDestNames length mismatch");
 
-
 /* Structure to hold metadata for one column of the conflict log table */
 typedef struct ConflictLogColumnDef
 {
@@ -80,17 +77,6 @@ static const ConflictLogColumnDef ConflictLogSchema[] = {
 
 #define NUM_CONFLICT_ATTRS lengthof(ConflictLogSchema)
 
-static const char *const ConflictTypeNames[] = {
-	[CT_INSERT_EXISTS] = "insert_exists",
-	[CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
-	[CT_UPDATE_EXISTS] = "update_exists",
-	[CT_UPDATE_MISSING] = "update_missing",
-	[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
-	[CT_UPDATE_DELETED] = "update_deleted",
-	[CT_DELETE_MISSING] = "delete_missing",
-	[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
-};
-
 /* Schema for the elements within the 'local_conflicts' JSON array */
 static const ConflictLogColumnDef LocalConflictSchema[] =
 {
@@ -101,7 +87,18 @@ static const ConflictLogColumnDef LocalConflictSchema[] =
 	{ .attname = "tuple",     .atttypid = JSONOID }
 };
 
-#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema)
+#define NUM_LOCAL_CONFLICT_ATTRS lengthof(LocalConflictSchema)
+
+static const char *const ConflictTypeNames[] = {
+	[CT_INSERT_EXISTS] = "insert_exists",
+	[CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
+	[CT_UPDATE_EXISTS] = "update_exists",
+	[CT_UPDATE_MISSING] = "update_missing",
+	[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
+	[CT_UPDATE_DELETED] = "update_deleted",
+	[CT_DELETE_MISSING] = "delete_missing",
+	[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
+};
 
 static int	errcode_apply_conflict(ConflictType type);
 static void errdetail_apply_conflict(EState *estate,
@@ -340,7 +337,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	Relation		localrel = relinfo->ri_RelationDesc;
 	ConflictLogDest	dest;
 	Relation		conflictlogrel;
-	bool			log_dest_clt = false;
+	bool			log_dest_table;
 	bool 			log_dest_logfile;
 
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
@@ -351,13 +348,11 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 	 */
 	conflictlogrel = GetConflictLogDestAndTable(&dest);
 
-	if (dest == CONFLICT_LOG_DEST_TABLE || dest == CONFLICT_LOG_DEST_ALL)
-		log_dest_clt = true;
-	if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL)
-		log_dest_logfile = true;
+	log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest);
+	log_dest_logfile = CONFLICTS_LOGGED_TO_FILE(dest);
 
 	/* Insert to table if requested. */
-	if (log_dest_clt)
+	if (log_dest_table)
 	{
 		Assert(conflictlogrel != NULL);
 
@@ -386,9 +381,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 			 */
 			ereport(elevel,
 					errcode_apply_conflict(type),
-					errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
-						get_namespace_name(RelationGetNamespace(localrel)),
-						RelationGetRelationName(localrel),
+					errmsg("conflict detected on relation \"%s\": conflict=%s",
+						RelationGetQualifiedRelationName(localrel),
 						ConflictTypeNames[type]),
 					errdetail("Conflict details are logged to the conflict log table: %s",
 							  RelationGetRelationName(conflictlogrel)));
@@ -417,14 +411,54 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 		/* Standard reporting with full internal details. */
 		ereport(elevel,
 				errcode_apply_conflict(type),
-				errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
-					   get_namespace_name(RelationGetNamespace(localrel)),
-					   RelationGetRelationName(localrel),
+				errmsg("conflict detected on relation \"%s\": conflict=%s",
+					   RelationGetQualifiedRelationName(localrel),
 					   ConflictTypeNames[type]),
 				errdetail_internal("%s", err_detail.data));
 	}
 }
 
+/*
+ * ProcessPendingConflictLogTuple
+ *      Insert any deferred conflict log tuple in a separate transaction.
+ *
+ * For conflicts raised at ERROR level, the conflict log tuple cannot be
+ * inserted immediately because the surrounding transaction will abort.
+ * To ensure that conflict information is not lost, such tuples are prepared
+ * during error processing (see prepare_conflict_log_tuple()) but their
+ * insertion is deferred.
+ *
+ * This function is responsible for completing that deferred insertion after
+ * the failing transaction has been aborted and the system has returned to an
+ * idle state.  It executes the insertion in a new, independent transaction,
+ * ensuring that the conflict log entry is durable and not rolled back
+ * together with the failed apply transaction.
+ */
+void
+ProcessPendingConflictLogTuple(void)
+{
+	Relation	conflictlogrel;
+	ConflictLogDest dest;
+
+	/* Nothing to do */
+	if (MyLogicalRepWorker->conflict_log_tuple == NULL)
+		return;
+
+	StartTransactionCommand();
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	/* Open conflict log table and insert the tuple */
+	conflictlogrel = GetConflictLogDestAndTable(&dest);
+	Assert(conflictlogrel);
+
+	InsertConflictLogTuple(conflictlogrel);
+
+	table_close(conflictlogrel, RowExclusiveLock);
+
+	PopActiveSnapshot();
+	CommitTransactionCommand();
+}
+
 /*
  * Find all unique indexes to check for a conflict and store them into
  * ResultRelInfo.
@@ -475,7 +509,7 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
 	*log_dest = GetLogDestination(MySubscription->conflictlogdest);
 
 	/* Quick exit if a conflict log table was not requested. */
-	if (*log_dest == CONFLICT_LOG_DEST_LOG)
+	if (!CONFLICTS_LOGGED_TO_TABLE(*log_dest))
 		return NULL;
 
 	conflictlogrelid = MySubscription->conflictlogrelid;
@@ -495,13 +529,11 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
 void
 InsertConflictLogTuple(Relation conflictlogrel)
 {
-	int			options = HEAP_INSERT_NO_LOGICAL;
-
 	/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
 	Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
 
 	heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
-				GetCurrentCommandId(true), options, NULL);
+				GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
 
 	/* Free conflict log tuple. */
 	heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
@@ -1056,7 +1088,7 @@ tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
 
 	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
 								 isnull);
-	tupdesc = RelationGetDescr(indexDesc);
+	tupdesc = CreateTupleDescCopy(RelationGetDescr(indexDesc));
 
 	/* Bless the tupdesc so it can be looked up by row_to_json. */
 	BlessTupleDesc(tupdesc);
@@ -1065,8 +1097,9 @@ tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
 	tuple = heap_form_tuple(tupdesc, values, isnull);
 	datum = heap_copy_tuple_as_datum(tuple, tupdesc);
 
-	index_close(indexDesc, NoLock);
 	heap_freetuple(tuple);
+	FreeTupleDesc(tupdesc);
+	index_close(indexDesc, NoLock);
 
 	/* Convert to a JSON datum. */
 	return DirectFunctionCall1(row_to_json, datum);
@@ -1075,26 +1108,41 @@ tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
 /*
  * build_conflict_tupledesc
  *
- * Build and bless a tuple descriptor for the internal conflict log table
- * based on the predefined LocalConflictSchema.
+ * Build and bless a tuple descriptor for the conflict log table based on the
+ * predefined LocalConflictSchema.
  */
 static TupleDesc
 build_conflict_tupledesc(void)
 {
-	TupleDesc   tupdesc;
+	static TupleDesc cached_tupdesc = NULL;
 
-	tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+	if (cached_tupdesc == NULL)
+	{
+		MemoryContext oldcxt;
 
-	for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++)
-		TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1),
-						   LocalConflictSchema[i].attname,
-						   LocalConflictSchema[i].atttypid,
-						   -1, 0);
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
-	TupleDescFinalize(tupdesc);
-	BlessTupleDesc(tupdesc);
+		cached_tupdesc = CreateTemplateTupleDesc(NUM_LOCAL_CONFLICT_ATTRS);
 
-	return tupdesc;
+		for (int i = 0; i < NUM_LOCAL_CONFLICT_ATTRS; i++)
+			TupleDescInitEntry(cached_tupdesc,
+							   (AttrNumber) (i + 1),
+							   LocalConflictSchema[i].attname,
+							   LocalConflictSchema[i].atttypid,
+							   -1, 0);
+
+		TupleDescFinalize(cached_tupdesc);
+
+		/*
+		 * Bless once so it can be used as a RECORD type (e.g. for
+		 * row_to_json or other record-based operations).
+		 */
+		BlessTupleDesc(cached_tupdesc);
+
+		MemoryContextSwitchTo(oldcxt);
+	}
+
+	return cached_tupdesc;
 }
 
 /*
@@ -1126,8 +1174,8 @@ build_local_conflicts_json_array(EState *estate, Relation rel,
 	/* Process local conflict tuple list and prepare an array of JSON. */
 	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
 	{
-		Datum		values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
-		bool		nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		Datum		values[NUM_LOCAL_CONFLICT_ATTRS] = {0};
+		bool		nulls[NUM_LOCAL_CONFLICT_ATTRS] = {0};
 		char	   *origin_name = NULL;
 		HeapTuple	tuple;
 		Datum		json_datum;
@@ -1177,7 +1225,7 @@ build_local_conflicts_json_array(EState *estate, Relation rel,
 		else
 			nulls[attno] = true;
 
-		Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+		Assert(attno + 1 == NUM_LOCAL_CONFLICT_ATTRS);
 
 		tuple = heap_form_tuple(tupdesc, values, nulls);
 
@@ -1236,8 +1284,8 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
 						   List *conflicttuples,
 						   TupleTableSlot *remoteslot)
 {
-	Datum		values[MAX_CONFLICT_ATTR_NUM] = {0};
-	bool		nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+	Datum		values[NUM_CONFLICT_ATTRS] = {0};
+	bool		nulls[NUM_CONFLICT_ATTRS] = {0};
 	int			attno;
 	char	   *remote_origin = NULL;
 	MemoryContext	oldctx;
@@ -1275,6 +1323,11 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
 	else
 		nulls[attno++] = true;
 
+	if (!TupIsNull(remoteslot))
+		values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+	else
+		nulls[attno++] = true;
+
 	if (!TupIsNull(searchslot))
 	{
 		Oid		replica_index = GetRelationIdentityOrPK(rel);
@@ -1294,16 +1347,11 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
 	else
 		nulls[attno++] = true;
 
-	if (!TupIsNull(remoteslot))
-		values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
-	else
-		nulls[attno++] = true;
-
 	values[attno] = build_local_conflicts_json_array(estate, rel,
 													 conflict_type,
 													 conflicttuples);
 
-	Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+	Assert(attno + 1 == NUM_CONFLICT_ATTRS);
 
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 	MyLogicalRepWorker->conflict_log_tuple =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 469451c736a..70ae38a7bd1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1766,15 +1766,15 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
-	remote_xid = stream_xid;
-	remote_final_lsn = InvalidXLogRecPtr;
-	remote_commit_ts = 0;
-
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
 
 	/* Try to allocate a worker for the streaming transaction. */
@@ -5674,27 +5674,7 @@ start_apply(XLogRecPtr origin_startpos)
 			 */
 			AbortOutOfAnyTransaction();
 			pgstat_report_subscription_error(MySubscription->oid);
-
-			/*
-			 * Insert any pending conflict log tuple under a new transaction.
-			 */
-			if (MyLogicalRepWorker->conflict_log_tuple != NULL)
-			{
-				Relation	conflictlogrel;
-				ConflictLogDest	dest;
-
-				StartTransactionCommand();
-				PushActiveSnapshot(GetTransactionSnapshot());
-
-				/* Open conflict log table and insert the tuple. */
-				conflictlogrel = GetConflictLogDestAndTable(&dest);
-				Assert(dest != CONFLICT_LOG_DEST_LOG);
-				InsertConflictLogTuple(conflictlogrel);
-				table_close(conflictlogrel, RowExclusiveLock);
-
-				PopActiveSnapshot();
-				CommitTransactionCommand();
-			}
+			ProcessPendingConflictLogTuple();
 
 			PG_RE_THROW();
 		}
@@ -6069,6 +6049,8 @@ DisableSubscriptionAndExit(void)
 	 */
 	pgstat_report_subscription_error(MyLogicalRepWorker->subid);
 
+	ProcessPendingConflictLogTuple();
+
 	/* Disable the subscription */
 	StartTransactionCommand();
 
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 6dcb7970bb7..8829f6c6378 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -115,6 +115,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *searchslot,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
+extern void ProcessPendingConflictLogTuple(void);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
 extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest);
 extern void InsertConflictLogTuple(Relation conflictlogrel);
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index 6bc6b7874c2..5f4d00bdd33 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -166,7 +166,7 @@ is($result, qq(32), 'The node_A data replicated to node_B');
 $node_C->safe_psql('postgres', "UPDATE $tab SET a = 33 WHERE a = 32;");
 
 $node_B->wait_for_log(
-	qr/conflict detected on relation "public.$tab_unquoted": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./
+	qr/conflict detected on relation "public.$tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./
 );
 
 $node_B->safe_psql('postgres', "DELETE FROM $tab;");
@@ -182,7 +182,7 @@ is($result, qq(33), 'The node_A data replicated to node_B');
 $node_C->safe_psql('postgres', "DELETE FROM $tab WHERE a = 33;");
 
 $node_B->wait_for_log(
-	qr/conflict detected on relation "public.$tab_unquoted": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/
+	qr/conflict detected on relation "public.$tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/
 );
 
 # The remaining tests no longer test conflict detection.
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 05c2179b9a8..4f3880e5b83 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -85,11 +85,11 @@ $node_subscriber->wait_for_log(
 	$log_offset);
 
 # Verify the contents of the Conflict Log Table (CLT)
-# This section ensures that the clt contains the expected
+# This section ensures that the CLT contains the expected
 # type and specific key data.
 my $subid = $node_subscriber->safe_psql('postgres',
 	"SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
-my $clt = "pg_conflict.pg_conflict_log_$subid";
+my $clt = "pg_conflict.pg_conflict_log_for_subid_$subid";
 
 # Wait for the conflict to be logged in the CLT
 my $log_check = $node_subscriber->poll_query_until(
-- 
2.53.0