v39-0005-Implement-the-conflict-insertion-infrastructure-.patch

application/octet-stream

Filename: v39-0005-Implement-the-conflict-insertion-infrastructure-.patch
Type: application/octet-stream
Part: 2
Message: Re: Proposal: Conflict log history table for Logical Replication
From 36f89fc093879e3c977c414e52e81f7e9b04a47c Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Mon, 18 May 2026 10:51:02 +0000
Subject: [PATCH v39 05/10] Implement the conflict insertion infrastructure for
 the conflict log table

This patch introduces the core logic to populate the conflict log table whenever
a logical replication conflict is detected. It captures the remote transaction
details along with the corresponding local state at the time of the conflict.

Handling Multi-row Conflicts: A single remote tuple may conflict with multiple
local tuples (e.g., in the case of multiple_unique_conflicts). To handle this,
the infrastructure creates a single row in the conflict log table for each
remote tuple. The details of all conflicting local rows are aggregated into a
single JSON array in the local_conflicts column.

The JSON array uses the following structured format:
[ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1",
"key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ]

Example of querying the structured conflict data:

SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
       local_conflicts[1] ->> 'tuple' AS local_tuple
FROM pg_conflict.pg_conflict_log_16396;

 remote_xid | relname  | remote_origin | local_xid |     local_tuple
------------+----------+---------------+-----------+---------------------
        760 | test     | pg_16406      | 771       | {"a":1,"b":10}
        765 | conf_tab | pg_16406      | 775       | {"a":2,"b":2,"c":2}
---
 src/backend/replication/logical/conflict.c | 554 +++++++++++++++++++--
 src/backend/replication/logical/launcher.c |   1 +
 src/backend/replication/logical/worker.c   |  31 +-
 src/include/replication/conflict.h         |   2 +
 src/include/replication/worker_internal.h  |   7 +
 src/test/subscription/t/035_conflicts.pl   |  47 +-
 6 files changed, 597 insertions(+), 45 deletions(-)

diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index ad5c0b16abc..2dc10d80bf2 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -16,6 +16,7 @@
 
 #include "access/commit_ts.h"
 #include "access/genam.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
 #include "catalog/dependency.h"
 #include "catalog/heap.h"
@@ -23,11 +24,17 @@
 #include "catalog/pg_namespace.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/json.h"
 
 /*
  * String representations for the supported conflict logging destinations.
@@ -84,6 +91,18 @@ static const char *const ConflictTypeNames[] = {
 	[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
 
+/* Schema for the elements within the 'local_conflicts' JSON array */
+static const ConflictLogColumnDef LocalConflictSchema[] =
+{
+	{ .attname = "xid",       .atttypid = XIDOID },
+	{ .attname = "commit_ts", .atttypid = TIMESTAMPTZOID },
+	{ .attname = "origin",    .atttypid = TEXTOID },
+	{ .attname = "key",       .atttypid = JSONOID },
+	{ .attname = "tuple",     .atttypid = JSONOID }
+};
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema)
+
 static int	errcode_apply_conflict(ConflictType type);
 static void errdetail_apply_conflict(EState *estate,
 									 ResultRelInfo *relinfo,
@@ -100,8 +119,27 @@ static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
 						   TupleTableSlot *remoteslot, char **remote_desc,
 						   TupleTableSlot *searchslot, char **search_desc,
 						   Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+										 TupleTableSlot *slot,
+										 Relation indexDesc, Datum *values,
+										 bool *isnull);
 static char *build_index_value_desc(EState *estate, Relation localrel,
 									TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+											   Relation localrel,
+											   Oid indexid,
+											   TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+											  ConflictType conflict_type,
+											  List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+									   Relation conflictlogrel,
+									   ConflictType conflict_type,
+									   TupleTableSlot *searchslot,
+									   List *conflicttuples,
+									   TupleTableSlot *remoteslot);
 
 /*
  * Builds the TupleDesc for the conflict log table.
@@ -299,30 +337,92 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 					ConflictType type, TupleTableSlot *searchslot,
 					TupleTableSlot *remoteslot, List *conflicttuples)
 {
-	Relation	localrel = relinfo->ri_RelationDesc;
-	StringInfoData err_detail;
+	Relation		localrel = relinfo->ri_RelationDesc;
+	ConflictLogDest	dest;
+	Relation		conflictlogrel;
+	bool			log_dest_clt = false;
+	bool			log_dest_logfile = false;
 
-	initStringInfo(&err_detail);
+	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
-	/* Form errdetail message by combining conflicting tuples information. */
-	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
-		errdetail_apply_conflict(estate, relinfo, type, searchslot,
-								 conflicttuple->slot, remoteslot,
-								 conflicttuple->indexoid,
-								 conflicttuple->xmin,
-								 conflicttuple->origin,
-								 conflicttuple->ts,
-								 &err_detail);
+	/*
+	 * Get the conflict log destination. Also, (if there is one) return the
+	 * CLT relation already opened and ready for insertion.
+	 */
+	conflictlogrel = GetConflictLogDestAndTable(&dest);
 
-	pgstat_report_subscription_conflict(MySubscription->oid, type);
+	if (dest == CONFLICT_LOG_DEST_TABLE || dest == CONFLICT_LOG_DEST_ALL)
+		log_dest_clt = true;
+	if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL)
+		log_dest_logfile = true;
 
-	ereport(elevel,
-			errcode_apply_conflict(type),
-			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
-				   get_namespace_name(RelationGetNamespace(localrel)),
-				   RelationGetRelationName(localrel),
-				   ConflictTypeNames[type]),
-			errdetail_internal("%s", err_detail.data));
+	/* Insert to table if requested. */
+	if (log_dest_clt)
+	{
+		Assert(conflictlogrel != NULL);
+
+		/*
+		 * Prepare the conflict log tuple. If the error level is below ERROR,
+		 * insert it immediately. Otherwise, defer the insertion to a new
+		 * transaction after the current one aborts, ensuring the insertion of
+		 * the log tuple is not rolled back.
+		 */
+		prepare_conflict_log_tuple(estate,
+								   localrel,
+								   conflictlogrel,
+								   type,
+								   searchslot,
+								   conflicttuples,
+								   remoteslot);
+		if (elevel < ERROR)
+			InsertConflictLogTuple(conflictlogrel);
+
+		if (!log_dest_logfile)
+		{
+			/*
+			 * Not logging conflict details to the server log; Report the error
+			 * msg but omit raw tuple data from server logs since it's already
+			 * captured in the conflict log table.
+			 */
+			ereport(elevel,
+					errcode_apply_conflict(type),
+					errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+						get_namespace_name(RelationGetNamespace(localrel)),
+						RelationGetRelationName(localrel),
+						ConflictTypeNames[type]),
+					errdetail("Conflict details are logged to the conflict log table: %s",
+							  RelationGetRelationName(conflictlogrel)));
+		}
+
+		table_close(conflictlogrel, RowExclusiveLock);
+	}
+
+	/* Log into the server log if requested. */
+	if (log_dest_logfile)
+	{
+		StringInfoData	err_detail;
+
+		initStringInfo(&err_detail);
+
+		/* Form errdetail message by combining conflicting tuples information. */
+		foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+			errdetail_apply_conflict(estate, relinfo, type, searchslot,
+									conflicttuple->slot, remoteslot,
+									conflicttuple->indexoid,
+									conflicttuple->xmin,
+									conflicttuple->origin,
+									conflicttuple->ts,
+									&err_detail);
+
+		/* Standard reporting with full internal details. */
+		ereport(elevel,
+				errcode_apply_conflict(type),
+				errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+					   get_namespace_name(RelationGetNamespace(localrel)),
+					   RelationGetRelationName(localrel),
+					   ConflictTypeNames[type]),
+				errdetail_internal("%s", err_detail.data));
+	}
 }
 
 /*
@@ -356,6 +456,58 @@ InitConflictIndexes(ResultRelInfo *relInfo)
 	relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
 }
 
+/*
+ * GetConflictLogDestAndTable
+ *
+ * Sets *log_dest from MySubscription->conflictlogdest.  Unless the destination
+ * is the server log only, also opens the conflict log table with
+ * RowExclusiveLock and returns it; the caller must table_close() it.
+ */
+Relation
+GetConflictLogDestAndTable(ConflictLogDest *log_dest)
+{
+	Oid			conflictlogrelid;
+
+	/*
+	 * Convert the text log destination to the internal enum.  MySubscription
+	 * already contains the data from pg_subscription.
+	 */
+	*log_dest = GetLogDestination(MySubscription->conflictlogdest);
+
+	/* Quick exit (returning NULL) if a conflict log table was not requested. */
+	if (*log_dest == CONFLICT_LOG_DEST_LOG)
+		return NULL;
+
+	conflictlogrelid = MySubscription->conflictlogrelid;
+
+	Assert(OidIsValid(conflictlogrelid));
+
+	return table_open(conflictlogrelid, RowExclusiveLock);
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert the pending MyLogicalRepWorker->conflict_log_tuple into the conflict
+ * log table, then free it and reset the pointer to NULL.  It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of that tuple.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+	int			options = HEAP_INSERT_NO_LOGICAL;
+
+	/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+	Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+	heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+				GetCurrentCommandId(true), options, NULL);
+
+	/* Free conflict log tuple and clear the pending-tuple pointer. */
+	heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+	MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
 /*
  * Add SQLSTATE error code to the current conflict report.
  */
@@ -789,6 +941,40 @@ get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type,
 	}
 }
 
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.  Shared by
+ * build_index_value_desc and tuple_table_slot_to_indextup_json.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+							 TupleTableSlot *slot,
+							 Relation indexDesc, Datum *values,
+							 bool *isnull)
+{
+	TupleTableSlot *tableslot = slot;
+
+	/*
+	 * If the slot is a virtual slot, copy it into a heap tuple slot as
+	 * FormIndexDatum only works with heap tuple slots.
+	 */
+	if (TTS_IS_VIRTUAL(slot))
+	{
+		/* Slot is created within the EState's tuple table */
+		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+		tableslot = ExecCopySlot(tableslot, slot);
+	}
+
+	/*
+	 * Set ecxt_scantuple; FormIndexDatum uses it when index expressions exist.
+	 */
+	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+	/* Form the index datums */
+	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+				   isnull);
+}
+
 /*
  * Helper functions to construct a string describing the contents of an index
  * entry. See BuildIndexValueDescription for details.
@@ -804,41 +990,323 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
 	Relation	indexDesc;
 	Datum		values[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
-	TupleTableSlot *tableslot = slot;
 
-	if (!tableslot)
+	if (!slot)
 		return NULL;
 
 	Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
 	indexDesc = index_open(indexoid, NoLock);
 
-	/*
-	 * If the slot is a virtual slot, copy it into a heap tuple slot as
-	 * FormIndexDatum only works with heap tuple slots.
-	 */
-	if (TTS_IS_VIRTUAL(slot))
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+
+	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+	index_close(indexDesc, NoLock);
+
+	return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to JSON.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple;
+	Datum		datum;
+	Datum		json;
+
+	Assert(slot != NULL);
+
+	tuple = ExecCopySlotHeapTuple(slot);
+	datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Build the key of the index 'indexid' from the tuple table slot and convert
+ * it into a JSON datum.  The caller must already hold a lock on the index.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+								  Oid indexid, TupleTableSlot *slot)
+{
+	Relation	indexDesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	HeapTuple	tuple;
+	TupleDesc	tupdesc;
+	Datum		datum;
+
+	Assert(slot != NULL);
+
+	Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+	indexDesc = index_open(indexid, NoLock);
+
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+	tupdesc = RelationGetDescr(indexDesc);
+
+	/* Bless the tupdesc so it can be looked up by row_to_json. */
+	BlessTupleDesc(tupdesc);
+
+	/* Form a tuple holding the index key values. */
+	tuple = heap_form_tuple(tupdesc, values, isnull);
+	datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+	index_close(indexDesc, NoLock);
+	heap_freetuple(tuple);
+
+	/* Convert to a JSON datum. */
+	return DirectFunctionCall1(row_to_json, datum);
+}
+
+/*
+ * build_conflict_tupledesc
+ *
+ * Build and bless a tuple descriptor for one element of the 'local_conflicts'
+ * JSON array, based on the predefined LocalConflictSchema.
+ */
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+	TupleDesc   tupdesc;
+
+	tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+	for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++)
+		TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1),
+						   LocalConflictSchema[i].attname,
+						   LocalConflictSchema[i].atttypid,
+						   -1, 0);
+
+	TupleDescFinalize(tupdesc);
+	BlessTupleDesc(tupdesc);
+
+	return tupdesc;
+}
+
+/*
+ * Builds the local conflicts JSON array column from the list of
+ * ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ {"xid": ..., "commit_ts": ..., "origin": ..., "key": {...}, "tuple": {...}}, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+								 ConflictType conflict_type,
+								 List *conflicttuples)
+{
+	ListCell   *lc;
+	List	   *json_datums = NIL;
+	Datum	   *json_datum_array;
+	Datum		json_array_datum;
+	int			num_conflicts;
+	int			i;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	TupleDesc	tupdesc;
+
+	/* Build local conflicts tuple descriptor. */
+	tupdesc = build_conflict_tupledesc();
+
+	/* Process local conflict tuple list and prepare an array of JSON. */
+	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
 	{
-		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
-		tableslot = ExecCopySlot(tableslot, slot);
+		Datum		values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		bool		nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		char	   *origin_name = NULL;
+		HeapTuple	tuple;
+		Datum		json_datum;
+		int			attno;
+
+		attno = 0;
+		values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+		if (conflicttuple->ts)
+			values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+		else
+			nulls[attno++] = true;
+
+		if (conflicttuple->origin != InvalidReplOriginId)
+			replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+		/* Store NULL if no origin name is available for the tuple. */
+		if (origin_name != NULL)
+			values[attno++] = CStringGetTextDatum(origin_name);
+		else
+			nulls[attno++] = true;
+
+		/*
+		 * Add the conflicting key values in the case of a unique constraint
+		 * violation.
+		 */
+		if (conflict_type == CT_INSERT_EXISTS ||
+			conflict_type == CT_UPDATE_EXISTS ||
+			conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+		{
+			Oid	indexoid = conflicttuple->indexoid;
+
+			Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+				   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+											  true));
+			values[attno++] =
+					tuple_table_slot_to_indextup_json(estate, rel,
+													  indexoid,
+													  conflicttuple->slot);
+		}
+		else
+			nulls[attno++] = true;
+
+		/* Convert conflicting tuple to JSON datum. */
+		if (conflicttuple->slot)
+			values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+		else
+			nulls[attno] = true;
+
+		Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+
+		json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+		/*
+		 * Build the higher level JSON datum in format described in function
+		 * header.
+		 */
+		json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+		/* Done with the temporary tuple. */
+		heap_freetuple(tuple);
+
+		/* Remember the element; a json Datum is a pointer, so store it as one. */
+		json_datums = lappend(json_datums, DatumGetPointer(json_datum));
 	}
 
-	/*
-	 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
-	 * index expressions are present.
-	 */
-	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+	num_conflicts = list_length(json_datums);
 
-	/*
-	 * The values/nulls arrays passed to BuildIndexValueDescription should be
-	 * the results of FormIndexDatum, which are the "raw" input to the index
-	 * AM.
-	 */
-	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+	json_datum_array = palloc_array(Datum, num_conflicts);
 
-	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+	i = 0;
+	foreach(lc, json_datums)
+	{
+		json_datum_array[i] = PointerGetDatum(lfirst(lc));
+		i++;
+	}
 
-	index_close(indexDesc, NoLock);
+	/* Construct the JSON array Datum. */
+	get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+	json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+													   num_conflicts,
+													   JSONOID,
+													   typlen,
+													   typbyval,
+													   typalign));
+	pfree(json_datum_array);
+
+	return json_array_datum;
+}
 
-	return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+						   Relation conflictlogrel,
+						   ConflictType conflict_type,
+						   TupleTableSlot *searchslot,
+						   List *conflicttuples,
+						   TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM] = {0};
+	bool		nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+	int			attno;
+	char	   *remote_origin = NULL;
+	MemoryContext	oldctx;
+
+	Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+	/* Populate the values and nulls arrays. */
+	attno = 0;
+	values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+	values[attno++] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+	values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+	values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+	if (TransactionIdIsValid(remote_xid))
+		values[attno++] = TransactionIdGetDatum(remote_xid);
+	else
+		nulls[attno++] = true;
+
+	values[attno++] = LSNGetDatum(remote_final_lsn);
+
+	if (remote_commit_ts > 0)
+		values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+	else
+		nulls[attno++] = true;
+
+	if (replorigin_xact_state.origin != InvalidReplOriginId)
+		replorigin_by_oid(replorigin_xact_state.origin, true, &remote_origin);
+
+	if (remote_origin != NULL)
+		values[attno++] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(searchslot))
+	{
+		Oid		replica_index = GetRelationIdentityOrPK(rel);
+
+		/*
+		 * If the table has a valid replica identity index, build the index
+		 * JSON datum from key value. Otherwise, construct it from the complete
+		 * tuple in REPLICA IDENTITY FULL cases.
+		 */
+		if (OidIsValid(replica_index))
+			values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+																replica_index,
+																searchslot);
+		else
+			values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+	}
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(remoteslot))
+		values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+	else
+		nulls[attno++] = true;
+
+	values[attno] = build_local_conflicts_json_array(estate, rel,
+													 conflict_type,
+													 conflicttuples);
+
+	Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+	MyLogicalRepWorker->conflict_log_tuple =
+		heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 50051dea8c7..f3ee0e9991d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -487,6 +487,7 @@ retry:
 	worker->oldest_nonremovable_xid = retain_dead_tuples
 		? MyReplicationSlot->data.xmin
 		: InvalidTransactionId;
+	worker->conflict_log_tuple = NULL;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a3f2406ed83..469451c736a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -487,7 +487,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1236,6 +1238,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1762,6 +1766,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5667,6 +5675,27 @@ start_apply(XLogRecPtr origin_startpos)
 			AbortOutOfAnyTransaction();
 			pgstat_report_subscription_error(MySubscription->oid);
 
+			/*
+			 * Insert any pending conflict log tuple under a new transaction.
+			 */
+			if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+			{
+				Relation	conflictlogrel;
+				ConflictLogDest	dest;
+
+				StartTransactionCommand();
+				PushActiveSnapshot(GetTransactionSnapshot());
+
+				/* Open conflict log table and insert the tuple. */
+				conflictlogrel = GetConflictLogDestAndTable(&dest);
+				Assert(dest != CONFLICT_LOG_DEST_LOG);
+				InsertConflictLogTuple(conflictlogrel);
+				table_close(conflictlogrel, RowExclusiveLock);
+
+				PopActiveSnapshot();
+				CommitTransactionCommand();
+			}
+
 			PG_RE_THROW();
 		}
 	}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index c92216e2dd5..6dcb7970bb7 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -116,4 +116,6 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 745b7d9e969..6a447da6510 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -100,6 +100,9 @@ typedef struct LogicalRepWorker
 	 */
 	TransactionId oldest_nonremovable_xid;
 
+	/* A conflict log tuple that is prepared but not yet inserted. */
+	HeapTuple	conflict_log_tuple;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
@@ -255,6 +258,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+extern PGDLLIMPORT XLogRecPtr remote_final_lsn;
+extern PGDLLIMPORT TimestampTz remote_commit_ts;
+extern PGDLLIMPORT TransactionId remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index f23fe6af2a5..05c2179b9a8 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -50,7 +50,7 @@ $node_subscriber->safe_psql(
 	'postgres',
 	"CREATE SUBSCRIPTION sub_tab
 	 CONNECTION '$publisher_connstr application_name=$appname'
-	 PUBLICATION pub_tab;");
+	 PUBLICATION pub_tab WITH (conflict_log_destination=all)");
 
 # Wait for initial table sync to finish
 $node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
@@ -84,10 +84,35 @@ $node_subscriber->wait_for_log(
 .*Key already exists in unique index \"conf_tab_c_key\", modified in transaction .*: key \(c\)=\(4\), local row \(4, 4, 4\)./,
 	$log_offset);
 
+# Verify the contents of the Conflict Log Table (CLT)
+# This section ensures that the clt contains the expected
+# type and specific key data.
+my $subid = $node_subscriber->safe_psql('postgres',
+	"SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
+my $clt = "pg_conflict.pg_conflict_log_$subid";
+
+# Wait for the conflict to be logged in the CLT
+my $log_check = $node_subscriber->poll_query_until(
+    'postgres',
+    "SELECT count(*) > 0 FROM $clt;"
+) or die "Timed out while waiting for the conflict to be logged in the CLT";
+
+my $conflict_check = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) >= 1 FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';");
+is($conflict_check, 't', 'Verified multiple_unique_conflicts logged into conflict log table');
+
+my $json_query = "SELECT local_conflicts FROM $clt;";
+my $raw_json = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that '2' is present inside the JSON structure using a regex
+# This matches the key/value pattern for "a": 2
+like($raw_json, qr/\\"a\\":2/, 'Verified that key 2 exists in the local_conflicts');
+
 pass('multiple_unique_conflicts detected during insert');
 
 # Truncate table to get rid of the error
 $node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+$node_subscriber->safe_psql('postgres', "DELETE FROM $clt");
 
 ##################################################
 # Test multiple_unique_conflicts due to UPDATE
@@ -114,6 +139,26 @@ $node_subscriber->wait_for_log(
 .*Key already exists in unique index \"conf_tab_c_key\", modified in transaction .*: key \(c\)=\(8\), local row \(8, 8, 8\)./,
 	$log_offset);
 
+# Verify the contents of the Conflict Log Table (CLT)
+# This section ensures that the CLT contains the expected
+# type and specific key data.
+
+# Wait for the conflict to be logged in the CLT
+$log_check = $node_subscriber->poll_query_until(
+    'postgres',
+    "SELECT count(*) > 0 FROM $clt;"
+) or die "Timed out while waiting for the conflict to be logged in the CLT";
+
+$conflict_check = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) >= 1 FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';");
+is($conflict_check, 't', 'Verified multiple_unique_conflicts logged into conflict log table');
+
+$raw_json = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that '6' is present inside the JSON structure using a regex
+# This matches the key/value pattern for "a": 6
+like($raw_json, qr/\\"a\\":6/, 'Verified that key 6 exists in the local_conflicts');
+
 pass('multiple_unique_conflicts detected during update');
 
 # Truncate table to get rid of the error
-- 
2.53.0