v14-0002-Implement-the-conflict-insertion-infrastructure-.patch

text/x-patch

Filename: v14-0002-Implement-the-conflict-insertion-infrastructure-.patch
Type: text/x-patch
Part: 1
Message: Re: Proposal: Conflict log history table for Logical Replication
From 4aa8a0520b706b247f850dcc9a05fdefb01826e5 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sat, 20 Dec 2025 15:20:09 +0530
Subject: [PATCH v14 2/3] Implement the conflict insertion infrastructure into
 the conflict log table

Note: A single remote tuple may conflict with multiple local tuples when conflict type
is CT_MULTIPLE_UNIQUE_CONFLICTS, so for handling this case we create a single row in
conflict log table with respect to each remote conflict tuple even if it conflicts with
multiple local tuples and we store the multiple conflict tuples as a single JSON array
element in format as
[ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ]
We can extract the elements of the local tuples from the conflict log table row
as given in below example.

SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
       local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;

 remote_xid | relname  | remote_origin | local_xid |     local_tuple
------------+----------+---------------+-----------+---------------------
        760 | test     | pg_16406      | 771       | {"a":1,"b":10}
        765 | conf_tab | pg_16406      | 775       | {"a":2,"b":2,"c":2}
---
 src/backend/replication/logical/conflict.c | 619 +++++++++++++++++++--
 src/backend/replication/logical/launcher.c |   1 +
 src/backend/replication/logical/worker.c   |  38 +-
 src/include/replication/conflict.h         |   3 +
 src/include/replication/worker_internal.h  |   7 +
 5 files changed, 635 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..5f753cd8042 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,22 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
+#include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/jsonb.h"
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS 5
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
@@ -50,8 +59,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 									   TupleTableSlot *localslot,
 									   TupleTableSlot *remoteslot,
 									   Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+										 TupleTableSlot *slot,
+										 Relation indexDesc, Datum *values,
+										 bool *isnull);
 static char *build_index_value_desc(EState *estate, Relation localrel,
 									TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+											   Relation localrel,
+											   Oid replica_index,
+											   TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+											  ConflictType conflict_type,
+											  List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+									   Relation conflictlogrel,
+									   ConflictType conflict_type,
+									   TupleTableSlot *searchslot,
+									   List *conflicttuples,
+									   TupleTableSlot *remoteslot);
 
 /*
  * Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -105,11 +133,19 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 					ConflictType type, TupleTableSlot *searchslot,
 					TupleTableSlot *remoteslot, List *conflicttuples)
 {
-	Relation	localrel = relinfo->ri_RelationDesc;
-	StringInfoData err_detail;
+	Relation		localrel = relinfo->ri_RelationDesc;
+	StringInfoData	err_detail;
+	ConflictLogDest	dest;
+	Relation		conflictlogrel;
 
 	initStringInfo(&err_detail);
 
+	/*
+	 * Get both the conflict log destination and the opened conflict log
+	 * relation for insertion.
+	 */
+	conflictlogrel = GetConflictLogTableInfo(&dest);
+
 	/* Form errdetail message by combining conflicting tuples information. */
 	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
 		errdetail_apply_conflict(estate, relinfo, type, searchslot,
@@ -120,15 +156,69 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 								 conflicttuple->ts,
 								 &err_detail);
 
+	/* Insert to table if destination is 'table' or 'all' */
+	if (conflictlogrel)
+	{
+		Assert(dest == CONFLICT_LOG_DEST_TABLE ||
+			   dest == CONFLICT_LOG_DEST_ALL);
+
+		if (ValidateConflictLogTable(conflictlogrel))
+		{
+			/*
+			 * Prepare the conflict log tuple. If the error level is below
+			 * ERROR, insert it immediately. Otherwise, defer the insertion to
+			 * a new transaction after the current one aborts, ensuring the
+			 * insertion of the log tuple is not rolled back.
+			 */
+			prepare_conflict_log_tuple(estate,
+									   relinfo->ri_RelationDesc,
+									   conflictlogrel,
+									   type,
+									   searchslot,
+									   conflicttuples,
+									   remoteslot);
+			if (elevel < ERROR)
+				InsertConflictLogTuple(conflictlogrel);
+		}
+		else
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("conflict log table \"%s.%s\" structure changed, skipping insertion",
+							get_namespace_name(RelationGetNamespace(conflictlogrel)),
+							RelationGetRelationName(conflictlogrel)));
+
+		table_close(conflictlogrel, RowExclusiveLock);
+	}
+
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
-	ereport(elevel,
-			errcode_apply_conflict(type),
-			errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
-				   get_namespace_name(RelationGetNamespace(localrel)),
-				   RelationGetRelationName(localrel),
-				   ConflictTypeNames[type]),
-			errdetail_internal("%s", err_detail.data));
+	/* Decide what detail to show in server logs. */
+	if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL)
+	{
+		/* Standard reporting with full internal details. */
+		ereport(elevel,
+				errcode_apply_conflict(type),
+				errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+					   get_namespace_name(RelationGetNamespace(localrel)),
+					   RelationGetRelationName(localrel),
+					   ConflictTypeNames[type]),
+				errdetail_internal("%s", err_detail.data));
+	}
+	else
+	{
+		/*
+		 * 'table' only: Report the error msg but omit raw tuple data from
+		 * server logs since it's already captured in the internal table.
+		 */
+		ereport(elevel,
+				errcode_apply_conflict(type),
+				errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+					   get_namespace_name(RelationGetNamespace(localrel)),
+					   RelationGetRelationName(localrel),
+					   ConflictTypeNames[type]),
+				errdetail("Conflict details logged to internal table with OID %u.",
+						  MySubscription->conflictrelid));
+	}
 }
 
 /*
@@ -162,6 +252,142 @@ InitConflictIndexes(ResultRelInfo *relInfo)
 	relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
 }
 
+/*
+ * GetConflictLogTableInfo
+ *
+ * Fetches conflict logging metadata from the cached MySubscription pointer.
+ * Sets the destination enum in *log_dest and, if applicable, opens and
+ * returns the relation handle for the internal log table.
+ */
+Relation
+GetConflictLogTableInfo(ConflictLogDest *log_dest)
+{
+	Oid			conflictlogrelid;
+	Relation	conflictlogrel = NULL;
+
+	/*
+	 * Convert the text log destination to the internal enum.  MySubscription
+	 * already contains the data from pg_subscription.
+	 */
+	*log_dest = GetLogDestination(MySubscription->logdestination);
+	conflictlogrelid = MySubscription->conflictrelid;
+
+	/* If destination is 'log' only, no table to open. */
+	if (*log_dest == CONFLICT_LOG_DEST_LOG)
+		return NULL;
+
+	Assert(OidIsValid(conflictlogrelid));
+
+	conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+
+	/* Conflict log table is dropped or not accessible. */
+	if (conflictlogrel == NULL)
+		ereport(WARNING,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("conflict log table with OID %u does not exist",
+						conflictlogrelid)));
+
+	return conflictlogrel;
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert conflict log tuple into the conflict log table. It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple
+ * inserted into the conflict log table.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+	int			options = HEAP_INSERT_NO_LOGICAL;
+
+	/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+	Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+	heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+				GetCurrentCommandId(true), options, NULL);
+
+	/* Free conflict log tuple. */
+	heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+	MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
+/*
+ * ValidateConflictLogTable - Validate conflict log table
+ *
+ * Validate whether the conflict log table is still suitable for considering as
+ * conflict log table.
+ */
+bool
+ValidateConflictLogTable(Relation rel)
+{
+	Relation    pg_attribute;
+	HeapTuple   atup;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	Form_pg_attribute attForm;
+	int         attcnt = 0;
+	bool        tbl_ok = true;
+
+	/*
+	 * Check whether the table definition including its column names, data
+	 * types, and column ordering meets the requirements for conflict log
+	 * table.
+	 */
+	pg_attribute = table_open(AttributeRelationId, AccessShareLock);
+	ScanKeyInit(&scankey,
+				Anum_pg_attribute_attrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(RelationGetRelid(rel)));
+
+	scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true,
+							  SnapshotSelf, 1, &scankey);
+
+	/* We only need to check up to MAX_CONFLICT_ATTR_NUM attributes */
+	while (HeapTupleIsValid(atup = systable_getnext(scan)))
+	{
+		const ConflictLogColumnDef *expected;
+		int		schema_idx;
+
+		attForm = (Form_pg_attribute) GETSTRUCT(atup);
+
+		/* Skip system columns and dropped columns */
+		if (attForm->attnum < 1 || attForm->attisdropped)
+			continue;
+
+		attcnt++;
+
+		/* attnum 1 corresponds to index 0 in ConflictLogSchema */
+		schema_idx = attForm->attnum - 1;
+
+		/* Check against the central schema definition */
+		if (schema_idx >= MAX_CONFLICT_ATTR_NUM)
+		{
+			/* Found an extra column beyond the required set */
+			tbl_ok = false;
+			break;
+		}
+
+		expected = &ConflictLogSchema[schema_idx];
+
+		if (attForm->atttypid != expected->atttypid ||
+			strcmp(NameStr(attForm->attname), expected->attname) != 0)
+		{
+			tbl_ok = false;
+			break;
+		}
+	}
+
+	systable_endscan(scan);
+	table_close(pg_attribute, AccessShareLock);
+
+	if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok)
+		return false;
+
+	return true;
+}
+
 /*
  * Add SQLSTATE error code to the current conflict report.
  */
@@ -472,6 +698,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 	return tuple_value.data;
 }
 
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ * This is the reusable core logic.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+							 TupleTableSlot *slot,
+							 Relation indexDesc, Datum *values,
+							 bool *isnull)
+{
+	TupleTableSlot *tableslot = slot;
+
+	/*
+	 * If the slot is a virtual slot, copy it into a heap tuple slot as
+	 * FormIndexDatum only works with heap tuple slots.
+	 */
+	if (TTS_IS_VIRTUAL(slot))
+	{
+		/* Slot is created within the EState's tuple table */
+		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+		tableslot = ExecCopySlot(tableslot, slot);
+	}
+
+	/*
+	 * Initialize ecxt_scantuple for potential use in FormIndexDatum
+	 */
+	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+	/* Form the index datums */
+	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+				   isnull);
+}
+
 /*
  * Helper functions to construct a string describing the contents of an index
  * entry. See BuildIndexValueDescription for details.
@@ -487,41 +747,336 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
 	Relation	indexDesc;
 	Datum		values[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
-	TupleTableSlot *tableslot = slot;
 
-	if (!tableslot)
+	if (!slot)
 		return NULL;
 
 	Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
 	indexDesc = index_open(indexoid, NoLock);
 
-	/*
-	 * If the slot is a virtual slot, copy it into a heap tuple slot as
-	 * FormIndexDatum only works with heap tuple slots.
-	 */
-	if (TTS_IS_VIRTUAL(slot))
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+
+	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+	index_close(indexDesc, NoLock);
+
+	return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to Jsonb.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple;
+	Datum		datum;
+	Datum		json;
+
+	Assert(slot != NULL);
+
+	tuple = ExecCopySlotHeapTuple(slot);
+	datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Fetch replica identity key from the tuple table slot and convert into a
+ * jsonb datum.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+								  Oid indexid, TupleTableSlot *slot)
+{
+	Relation	indexDesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	HeapTuple	tuple;
+	TupleDesc	tupdesc;
+	Datum		datum;
+
+	Assert(slot != NULL);
+
+	Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+	indexDesc = index_open(indexid, NoLock);
+
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+	tupdesc = RelationGetDescr(indexDesc);
+
+	/* Bless the tupdesc so it can be looked up by row_to_json. */
+	BlessTupleDesc(tupdesc);
+
+	/* Form the replica identity tuple. */
+	tuple = heap_form_tuple(tupdesc, values, isnull);
+	datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+	index_close(indexDesc, NoLock);
+
+	/* Convert to a JSONB datum. */
+	return DirectFunctionCall1(row_to_json, datum);
+}
+
+/*
+ * Initialize the tuple descriptor for local conflict info.
+ */
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+	TupleDesc	tupdesc;
+	int			attno = 1;
+
+	tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "xid",
+						XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "commit_ts",
+						TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "origin",
+						TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "key",
+						JSONOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno, "tuple",
+						JSONOID, -1, 0);
+
+	BlessTupleDesc(tupdesc);
+
+	Assert(attno == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+	return tupdesc;
+}
+
+/*
+ * Builds the local conflicts JSONB array column from the list of
+ * ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+								 ConflictType conflict_type,
+								 List *conflicttuples)
+{
+	ListCell   *lc;
+	List	   *json_datums = NIL; /* List to hold the row_to_json results (type json) */
+	Datum	   *json_datum_array;
+	bool	   *json_null_array;
+	Datum		json_array_datum;
+	int			num_conflicts;
+	int			i;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	TupleDesc	tupdesc;
+
+	/* Build local conflicts tuple descriptor. */
+	tupdesc = build_conflict_tupledesc();
+
+	/* Process local conflict tuple list and prepare an array of JSON. */
+	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
 	{
-		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
-		tableslot = ExecCopySlot(tableslot, slot);
+		Datum		values[MAX_LOCAL_CONFLICT_INFO_ATTRS];
+		bool		nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS];
+		char	   *origin_name = NULL;
+		HeapTuple	tuple;
+		Datum		json_datum;
+		int			attno;
+
+		memset(values, 0, sizeof(Datum) * MAX_LOCAL_CONFLICT_INFO_ATTRS);
+		memset(nulls, 0, sizeof(bool) * MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		attno = 0;
+		values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+		if (conflicttuple->ts)
+			values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+		else
+			nulls[attno++] = true;
+
+		if (conflicttuple->origin != InvalidRepOriginId)
+			replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+		/* Store empty string if origin name for the tuple is NULL. */
+		if (origin_name != NULL)
+			values[attno++] = CStringGetTextDatum(origin_name);
+		else
+			nulls[attno++] = true;
+
+		/*
+		 * Add the conflicting key values in the case of a unique constraint
+		 * violation.
+		 */
+		if (conflict_type == CT_INSERT_EXISTS ||
+			conflict_type == CT_UPDATE_EXISTS ||
+			conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+		{
+			Oid	indexoid = conflicttuple->indexoid;
+
+			Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+				   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+											  true));
+			values[attno++] =
+					tuple_table_slot_to_indextup_json(estate, rel,
+													  indexoid,
+													  conflicttuple->slot);
+		}
+		else
+			nulls[attno++] = true;
+
+		/* Convert conflicting tuple to JSON datum. */
+		if (conflicttuple->slot)
+			values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+		else
+			nulls[attno] = true;
+
+		Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+
+		json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+		/*
+		 * Build the higher level JSON datum in format described in function
+		 * header.
+		 */
+		json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+		/* Done with the temporary tuple. */
+		heap_freetuple(tuple);
+
+		/* Add to the array element. */
+		json_datums = lappend(json_datums, (void *) json_datum);
 	}
 
-	/*
-	 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
-	 * index expressions are present.
-	 */
-	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+	num_conflicts = list_length(json_datums);
 
-	/*
-	 * The values/nulls arrays passed to BuildIndexValueDescription should be
-	 * the results of FormIndexDatum, which are the "raw" input to the index
-	 * AM.
-	 */
-	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+	json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum));
+	json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool));
 
-	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+	i = 0;
+	foreach(lc, json_datums)
+	{
+		json_datum_array[i] = (Datum) lfirst(lc);
+		i++;
+	}
 
-	index_close(indexDesc, NoLock);
+	/* Construct the json[] array Datum. */
+	get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+	json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+													   num_conflicts,
+													   JSONOID,
+													   typlen,
+													   typbyval,
+													   typalign));
+	pfree(json_datum_array);
+	pfree(json_null_array);
+
+	return json_array_datum;
+}
 
-	return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+						   Relation conflictlogrel,
+						   ConflictType conflict_type,
+						   TupleTableSlot *searchslot,
+						   List *conflicttuples,
+						   TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM];
+	bool		nulls[MAX_CONFLICT_ATTR_NUM];
+	int			attno;
+	char	   *remote_origin = NULL;
+	MemoryContext	oldctx;
+
+	Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+	/* Initialize values and nulls arrays. */
+	memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+	memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM);
+
+	/* Populate the values and nulls arrays. */
+	attno = 0;
+	values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+	values[attno++] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+	values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+	values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+	if (TransactionIdIsValid(remote_xid))
+		values[attno++] = TransactionIdGetDatum(remote_xid);
+	else
+		nulls[attno++] = true;
+
+	values[attno++] = LSNGetDatum(remote_final_lsn);
+
+	if (remote_commit_ts > 0)
+		values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+	else
+		nulls[attno++] = true;
+
+	if (replorigin_session_origin != InvalidRepOriginId)
+		replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+	if (remote_origin != NULL)
+		values[attno++] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(searchslot))
+	{
+		Oid		replica_index = GetRelationIdentityOrPK(rel);
+
+		/*
+		 * If the table has a valid replica identity index, build the index
+		 * json datum from key value. Otherwise, construct it from the complete
+		 * tuple in REPLICA IDENTITY FULL cases.
+		 */
+		if (OidIsValid(replica_index))
+			values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+																replica_index,
+																searchslot);
+		else
+			values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+	}
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(remoteslot))
+		values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+	else
+		nulls[attno++] = true;
+
+	values[attno] = build_local_conflicts_json_array(estate, rel,
+													 conflict_type,
+													 conflicttuples);
+
+	Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+	MyLogicalRepWorker->conflict_log_tuple =
+		heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..bc7e1d9ebde 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
 	worker->oldest_nonremovable_xid = retain_dead_tuples
 		? MyReplicationSlot->data.xmin
 		: InvalidTransactionId;
+	worker->conflict_log_tuple = NULL;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc64476a9ef..ccd1b2c6e81 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,34 @@ start_apply(XLogRecPtr origin_startpos)
 			pgstat_report_subscription_error(MySubscription->oid,
 											 MyLogicalRepWorker->type);
 
+			/*
+			 * Insert any pending conflict log tuple under a new transaction.
+			 */
+			if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+			{
+				Relation	conflictlogrel;
+				ConflictLogDest	dest;
+
+				StartTransactionCommand();
+				PushActiveSnapshot(GetTransactionSnapshot());
+
+				/* Open conflict log table and insert the tuple. */
+				conflictlogrel = GetConflictLogTableInfo(&dest);
+				if (ValidateConflictLogTable(conflictlogrel))
+					InsertConflictLogTuple(conflictlogrel);
+				else
+					ereport(WARNING,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion",
+								   get_namespace_name(RelationGetNamespace(conflictlogrel)),
+								   RelationGetRelationName(conflictlogrel)));
+				MyLogicalRepWorker->conflict_log_tuple = NULL;
+				table_close(conflictlogrel, RowExclusiveLock);
+
+				PopActiveSnapshot();
+				CommitTransactionCommand();
+			}
+
 			PG_RE_THROW();
 		}
 	}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 70f8744b381..226fed9e383 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -139,4 +139,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..711c04c7297 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
 	 */
 	TransactionId oldest_nonremovable_xid;
 
+	/* A conflict log tuple that is prepared but not yet inserted. */
+	HeapTuple	conflict_log_tuple;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId	remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
-- 
2.43.0