v17-0002-Implement-the-conflict-insertion-infrastructure-.patch
text/x-patch
Filename: v17-0002-Implement-the-conflict-insertion-infrastructure-.patch
Type: text/x-patch
Part: 4
From 82c2eadc602ae4f01dfa09dd943bad2ab600cdcd Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Fri, 26 Dec 2025 20:08:11 +0530
Subject: [PATCH v17 2/6] Implement the conflict insertion infrastructure for
the conflict log table
This patch introduces the core logic to populate the conflict log table whenever
a logical replication conflict is detected. It captures the remote transaction
details along with the corresponding local state at the time of the conflict.
Handling Multi-row Conflicts: A single remote tuple may conflict with multiple
local tuples (e.g., in the case of multiple_unique_conflicts). To handle this,
the infrastructure creates a single row in the conflict log table for each
remote tuple. The details of all conflicting local rows are aggregated into a
single JSON array in the local_conflicts column.
The JSON array uses the following structured format:
[ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1",
"key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ]
Example of querying the structured conflict data:
SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;
remote_xid | relname | remote_origin | local_xid | local_tuple
------------+----------+---------------+-----------+---------------------
760 | test | pg_16406 | 771 | {"a":1,"b":10}
765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2}
---
src/backend/replication/logical/conflict.c | 603 ++++++++++++++++++-
src/backend/replication/logical/launcher.c | 1 +
src/backend/replication/logical/worker.c | 32 +-
src/include/replication/conflict.h | 4 +-
src/include/replication/worker_internal.h | 7 +
src/test/subscription/t/037_conflict_dest.pl | 190 ++++++
6 files changed, 805 insertions(+), 32 deletions(-)
create mode 100644 src/test/subscription/t/037_conflict_dest.pl
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..98080fd2db0 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,20 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/tableam.h"
+#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/jsonb.h"
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
@@ -34,6 +41,19 @@ static const char *const ConflictTypeNames[] = {
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
+/*
+ * Schema for the elements within the 'local_conflicts' JSON array.
+ *
+ * Each conflicting local tuple is first formed as a row with these
+ * attributes, in this order, and that row is then converted to a JSON
+ * object, so the attribute names below become the JSON keys.
+ */
+static const ConflictLogColumnDef LocalConflictSchema[] =
+{
+ {.attname = "xid",.atttypid = XIDOID},
+ {.attname = "commit_ts",.atttypid = TIMESTAMPTZOID},
+ {.attname = "origin",.atttypid = TEXTOID},
+ {.attname = "key",.atttypid = JSONOID},
+ {.attname = "tuple",.atttypid = JSONOID}
+};
+
+/* Number of attributes in LocalConflictSchema. */
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS \
+ (sizeof(LocalConflictSchema) / sizeof(LocalConflictSchema[0]))
+
static int errcode_apply_conflict(ConflictType type);
static void errdetail_apply_conflict(EState *estate,
ResultRelInfo *relinfo,
@@ -50,8 +70,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull);
static char *build_index_value_desc(EState *estate, Relation localrel,
TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+ Relation localrel,
+ Oid replica_index,
+ TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+ ConflictType conflict_type,
+ List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot);
/*
* Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -107,9 +146,17 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
{
Relation localrel = relinfo->ri_RelationDesc;
StringInfoData err_detail;
+ ConflictLogDest dest;
+ Relation conflictlogrel;
initStringInfo(&err_detail);
+ /*
+ * Get both the conflict log destination and the opened conflict log
+ * relation for insertion.
+ */
+ conflictlogrel = GetConflictLogTableInfo(&dest);
+
/* Form errdetail message by combining conflicting tuples information. */
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
errdetail_apply_conflict(estate, relinfo, type, searchslot,
@@ -120,15 +167,62 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
conflicttuple->ts,
&err_detail);
+ /* Insert to table if destination is 'table' or 'all' */
+ if (conflictlogrel)
+ {
+ Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0);
+
+ if (ValidateConflictLogTable(conflictlogrel))
+ {
+ /*
+ * Prepare the conflict log tuple. If the error level is below
+ * ERROR, insert it immediately. Otherwise, defer the insertion to
+ * a new transaction after the current one aborts, ensuring the
+ * insertion of the log tuple is not rolled back.
+ */
+ prepare_conflict_log_tuple(estate,
+ relinfo->ri_RelationDesc,
+ conflictlogrel,
+ type,
+ searchslot,
+ conflicttuples,
+ remoteslot);
+ if (elevel < ERROR)
+ InsertConflictLogTuple(conflictlogrel);
+ }
+
+ table_close(conflictlogrel, RowExclusiveLock);
+ }
+
pgstat_report_subscription_conflict(MySubscription->oid, type);
- ereport(elevel,
- errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
- get_namespace_name(RelationGetNamespace(localrel)),
- RelationGetRelationName(localrel),
- ConflictTypeNames[type]),
- errdetail_internal("%s", err_detail.data));
+ /* Decide what detail to show in server logs. */
+ if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL)
+ {
+ /* Standard reporting with full internal details. */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail_internal("%s", err_detail.data));
+ }
+ else
+ {
+ /*
+ * 'table' only: Report the error msg but omit raw tuple data from
+ * server logs since it's already captured in the internal table.
+ */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+ get_namespace_name(RelationGetNamespace(localrel)),
+ RelationGetRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail("Conflict details logged to internal table with OID %u.",
+ MySubscription->conflictlogrelid));
+ }
}
/*
@@ -162,6 +256,143 @@ InitConflictIndexes(ResultRelInfo *relInfo)
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
}
+/*
+ * GetConflictLogTableInfo
+ *
+ * Fetches conflict logging metadata from the cached MySubscription pointer.
+ * Sets the destination enum in *log_dest and, unless the destination is
+ * 'log' only, opens the subscription's conflict log table with
+ * RowExclusiveLock and returns it.
+ *
+ * Returns NULL only when the destination is 'log'. In every other case a
+ * valid, opened relation is returned and the caller is responsible for
+ * closing it.
+ *
+ * Note that table_open() never returns NULL: if the relation cannot be
+ * opened it raises an ERROR. A "table was dropped" check on its result would
+ * therefore be unreachable, so none is made here. If a soft failure is ever
+ * wanted, this must use try_table_open() instead and every caller must be
+ * prepared for a NULL result even when the destination includes the table.
+ */
+Relation
+GetConflictLogTableInfo(ConflictLogDest *log_dest)
+{
+	Oid			conflictlogrelid;
+
+	/*
+	 * Convert the text log destination to the internal enum. MySubscription
+	 * already contains the data from pg_subscription.
+	 */
+	*log_dest = GetLogDestination(MySubscription->conflictlogdest);
+	conflictlogrelid = MySubscription->conflictlogrelid;
+
+	/* If destination is 'log' only, no table to open. */
+	if (*log_dest == CONFLICT_LOG_DEST_LOG)
+		return NULL;
+
+	Assert(OidIsValid(conflictlogrelid));
+
+	/* Raises an ERROR if the relation does not exist. */
+	return table_open(conflictlogrelid, RowExclusiveLock);
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert the pending conflict log tuple, i.e. the one stored in
+ * MyLogicalRepWorker->conflict_log_tuple, into the given conflict log
+ * table. It uses HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding
+ * of the tuple inserted into the conflict log table.
+ *
+ * On return the tuple has been freed and the worker's pointer reset to NULL.
+ * The relation is not closed here; that is left to the caller.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+ int options = HEAP_INSERT_NO_LOGICAL;
+
+ /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+ Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+ /*
+ * NOTE(review): heap_insert() is called directly, so no index entries are
+ * created here; presumably the conflict log table has no indexes -- confirm.
+ */
+ heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+ GetCurrentCommandId(true), options, NULL);
+
+ /* Free conflict log tuple. */
+ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
+/*
+ * ValidateConflictLogTable - Validate conflict log table schema
+ *
+ * Checks whether the table definition including its column names, data
+ * types, and column ordering meet the requirements for conflict log
+ * table, as described by ConflictLogSchema.
+ *
+ * Returns true if the table matches. Otherwise emits a WARNING and returns
+ * false, in which case the caller is expected to skip the insertion.
+ */
+bool
+ValidateConflictLogTable(Relation rel)
+{
+ Relation pg_attribute;
+ HeapTuple atup;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ int attcnt = 0;
+ bool tbl_ok = true;
+
+ /* Scan all pg_attribute rows that belong to the given relation. */
+ pg_attribute = table_open(AttributeRelationId, AccessShareLock);
+ ScanKeyInit(&scankey,
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+
+ scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true,
+ SnapshotSelf, 1, &scankey);
+
+ /*
+ * Compare each live user column with the expected definition; the loop
+ * stops at the first mismatch or at the first column beyond
+ * MAX_CONFLICT_ATTR_NUM.
+ */
+ while (HeapTupleIsValid(atup = systable_getnext(scan)))
+ {
+ const ConflictLogColumnDef *expected;
+ int schema_idx;
+ Form_pg_attribute attForm = (Form_pg_attribute) GETSTRUCT(atup);
+
+ /* Skip system columns and dropped columns */
+ if (attForm->attnum < 1 || attForm->attisdropped)
+ continue;
+
+ attcnt++;
+
+ /*
+ * attnum 1 corresponds to index 0 in ConflictLogSchema. The position
+ * is taken from attnum rather than from the running count, so a
+ * dropped column is not compensated for: the columns after it keep
+ * their higher attnum.
+ */
+ schema_idx = attForm->attnum - 1;
+
+ /* Check against the central schema definition */
+ if (schema_idx >= MAX_CONFLICT_ATTR_NUM)
+ {
+ /* Found an extra column beyond the required set */
+ tbl_ok = false;
+ break;
+ }
+
+ expected = &ConflictLogSchema[schema_idx];
+
+ /* Both the type and the name must match exactly. */
+ if (attForm->atttypid != expected->atttypid ||
+ strcmp(NameStr(attForm->attname), expected->attname) != 0)
+ {
+ tbl_ok = false;
+ break;
+ }
+ }
+
+ systable_endscan(scan);
+ table_close(pg_attribute, AccessShareLock);
+
+ /* The count check catches tables with too few columns. */
+ if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok)
+ {
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("conflict log table \"%s.%s\" structure changed, skipping insertion",
+ get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel)));
+ return false;
+ }
+
+ return true;
+}
+
/*
* Add SQLSTATE error code to the current conflict report.
*/
@@ -472,6 +703,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
return tuple_value.data;
}
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ *
+ * The results are written to 'values' and 'isnull'; the callers in this
+ * file pass arrays of INDEX_MAX_KEYS entries. Any slot created here is added
+ * to the EState's tuple table and is not released by this function.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull)
+{
+ TupleTableSlot *tableslot = slot;
+
+ /*
+ * If the slot is a virtual slot, copy it into a heap tuple slot as
+ * FormIndexDatum only works with heap tuple slots.
+ */
+ if (TTS_IS_VIRTUAL(slot))
+ {
+ /* Slot is created within the EState's tuple table */
+ tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+ tableslot = ExecCopySlot(tableslot, slot);
+ }
+
+ /*
+ * Initialize ecxt_scantuple for potential use in FormIndexDatum when
+ * index expressions are present.
+ */
+ GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+ /* Form the index datums */
+ FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+ isnull);
+}
+
/*
* Helper functions to construct a string describing the contents of an index
* entry. See BuildIndexValueDescription for details.
@@ -487,41 +752,319 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
Relation indexDesc;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
- TupleTableSlot *tableslot = slot;
- if (!tableslot)
+ if (!slot)
return NULL;
Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
indexDesc = index_open(indexoid, NoLock);
- /*
- * If the slot is a virtual slot, copy it into a heap tuple slot as
- * FormIndexDatum only works with heap tuple slots.
- */
- if (TTS_IS_VIRTUAL(slot))
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+
+ index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+ index_close(indexDesc, NoLock);
+
+ return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Convert the tuple held in a TupleTableSlot into a JSON object datum (type
+ * json, as produced by row_to_json).
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+	HeapTuple	copied;
+	Datum		result;
+
+	Assert(slot != NULL);
+
+	/* Materialize the slot contents as a standalone heap tuple. */
+	copied = ExecCopySlotHeapTuple(slot);
+
+	/* row_to_json() takes a composite datum, so wrap the tuple as one. */
+	result = DirectFunctionCall1(row_to_json,
+								 heap_copy_tuple_as_datum(copied,
+														  slot->tts_tupleDescriptor));
+
+	/* The temporary tuple is no longer needed. */
+	heap_freetuple(copied);
+
+	return result;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Extract the key columns of the index 'indexid' from the tuple in 'slot'
+ * and return them as a JSON object datum (type json, as produced by
+ * row_to_json). The index may be the replica identity index or any other
+ * index the caller is interested in.
+ *
+ * The caller must already hold a lock on the index; it is opened here with
+ * NoLock.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+								  Oid indexid, TupleTableSlot *slot)
+{
+	Relation	indexDesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	HeapTuple	tuple;
+	TupleDesc	tupdesc;
+	Datum		datum;
+
+	Assert(slot != NULL);
+
+	Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+	indexDesc = index_open(indexid, NoLock);
+
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+	tupdesc = RelationGetDescr(indexDesc);
+
+	/* Bless the tupdesc so it can be looked up by row_to_json. */
+	BlessTupleDesc(tupdesc);
+
+	/* Form the index key tuple and wrap it as a composite datum. */
+	tuple = heap_form_tuple(tupdesc, values, isnull);
+	datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+	/*
+	 * heap_copy_tuple_as_datum() returns its own copy, so the temporary
+	 * tuple can be released right away, as tuple_table_slot_to_json_datum()
+	 * does.
+	 */
+	heap_freetuple(tuple);
+
+	index_close(indexDesc, NoLock);
+
+	/* Convert to a JSON datum. */
+	return DirectFunctionCall1(row_to_json, datum);
+}
+
+/*
+ * build_conflict_tupledesc
+ *
+ * Build and bless a tuple descriptor with one attribute per entry of
+ * LocalConflictSchema, in the same order.
+ */
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+	TupleDesc	desc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+	int			idx = 0;
+
+	while (idx < MAX_LOCAL_CONFLICT_INFO_ATTRS)
+	{
+		const ConflictLogColumnDef *col = &LocalConflictSchema[idx];
+
+		/* Attribute numbers are 1-based, array positions are 0-based. */
+		idx++;
+		TupleDescInitEntry(desc, (AttrNumber) idx,
+						   col->attname, col->atttypid,
+						   -1, 0);
+	}
+
+	BlessTupleDesc(desc);
+
+	return desc;
+}
+
+/*
+ * Builds the 'local_conflicts' column value: a json[] array with one JSON
+ * object per entry in the list of ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "key": {...},
+ *     "tuple": {...} }, ... ]
+ *
+ * "key" is only filled in for unique-constraint conflicts (insert_exists,
+ * update_exists and multiple_unique_conflicts). "commit_ts", "origin" and
+ * "tuple" are null when the corresponding information is not available.
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+								 ConflictType conflict_type,
+								 List *conflicttuples)
+{
+	Datum	   *json_datum_array;
+	Datum		json_array_datum;
+	int			num_conflicts = list_length(conflicttuples);
+	int			i = 0;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	TupleDesc	tupdesc;
+
+	/* Build local conflicts tuple descriptor. */
+	tupdesc = build_conflict_tupledesc();
+
+	/* One array element per conflicting local tuple. */
+	json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum));
+
+	/* Convert each local conflict tuple into a JSON object. */
+	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
{
- tableslot = table_slot_create(localrel, &estate->es_tupleTable);
- tableslot = ExecCopySlot(tableslot, slot);
+		Datum		values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		bool		nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+		char	   *origin_name = NULL;
+		HeapTuple	tuple;
+		Datum		json_datum;
+		int			attno;
+
+		/* The attribute order below must match LocalConflictSchema. */
+		attno = 0;
+		values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+		if (conflicttuple->ts)
+			values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+		else
+			nulls[attno++] = true;
+
+		if (conflicttuple->origin != InvalidRepOriginId)
+			replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+		/* Store NULL if the tuple has no origin name. */
+		if (origin_name != NULL)
+			values[attno++] = CStringGetTextDatum(origin_name);
+		else
+			nulls[attno++] = true;
+
+		/*
+		 * Add the conflicting key values in the case of a unique constraint
+		 * violation.
+		 */
+		if (conflict_type == CT_INSERT_EXISTS ||
+			conflict_type == CT_UPDATE_EXISTS ||
+			conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+		{
+			Oid			indexoid = conflicttuple->indexoid;
+
+			Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+				   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+											  true));
+			values[attno++] =
+				tuple_table_slot_to_indextup_json(estate, rel,
+												  indexoid,
+												  conflicttuple->slot);
+		}
+		else
+			nulls[attno++] = true;
+
+		/* Convert conflicting tuple to JSON datum. */
+		if (conflicttuple->slot)
+			values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+		else
+			nulls[attno] = true;
+
+		Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+
+		json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+		/*
+		 * Build the higher level JSON datum in format described in function
+		 * header.
+		 */
+		json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+		/* Done with the temporary tuple. */
+		heap_freetuple(tuple);
+
+		/* Store the element directly; no intermediate list is needed. */
+		json_datum_array[i++] = json_datum;
}
- /*
- * Initialize ecxt_scantuple for potential use in FormIndexDatum when
- * index expressions are present.
- */
- GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
- /*
- * The values/nulls arrays passed to BuildIndexValueDescription should be
- * the results of FormIndexDatum, which are the "raw" input to the index
- * AM.
- */
- FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
- index_value = BuildIndexValueDescription(indexDesc, values, isnull);
- index_close(indexDesc, NoLock);
+	Assert(i == num_conflicts);
+
+	/*
+	 * Construct the json[] array Datum. construct_array() builds an array
+	 * without NULL elements, so no separate null-flag array is required.
+	 */
+	get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+	json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+													   num_conflicts,
+													   JSONOID,
+													   typlen,
+													   typbyval,
+													   typalign));
+	pfree(json_datum_array);
+
+	return json_array_datum;
+}
- return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ *
+ * 'rel' is the relation on which the conflict was detected and
+ * 'conflictlogrel' is the conflict log table whose tuple descriptor is used
+ * to form the tuple. 'searchslot' and 'remoteslot' may be empty, in which
+ * case the corresponding columns are set to NULL. No tuple may already be
+ * pending when this is called.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot)
+{
+ Datum values[MAX_CONFLICT_ATTR_NUM] = {0};
+ bool nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+ int attno;
+ char *remote_origin = NULL;
+ MemoryContext oldctx;
+
+ Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+ /*
+ * Populate the values and nulls arrays. The columns are filled in
+ * sequence, so the order of the assignments below has to follow the
+ * column order of the conflict log table.
+ */
+ attno = 0;
+
+ /* Identity of the local relation: OID, schema name and relation name. */
+ values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+ values[attno++] =
+ CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+ values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+ values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+ /* Remote transaction details: xid, final LSN, commit time and origin. */
+ if (TransactionIdIsValid(remote_xid))
+ values[attno++] = TransactionIdGetDatum(remote_xid);
+ else
+ nulls[attno++] = true;
+
+ values[attno++] = LSNGetDatum(remote_final_lsn);
+
+ if (remote_commit_ts > 0)
+ values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+ else
+ nulls[attno++] = true;
+
+ if (replorigin_session_origin != InvalidRepOriginId)
+ replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+ if (remote_origin != NULL)
+ values[attno++] = CStringGetTextDatum(remote_origin);
+ else
+ nulls[attno++] = true;
+
+ /* Key used to search for the local tuple, if there was one. */
+ if (!TupIsNull(searchslot))
+ {
+ Oid replica_index = GetRelationIdentityOrPK(rel);
+
+ /*
+ * If the table has a valid replica identity index, build the index
+ * json datum from key value. Otherwise, construct it from the
+ * complete tuple in REPLICA IDENTITY FULL cases.
+ */
+ if (OidIsValid(replica_index))
+ values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+ replica_index,
+ searchslot);
+ else
+ values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+ }
+ else
+ nulls[attno++] = true;
+
+ /* The complete remote tuple, if there was one. */
+ if (!TupIsNull(remoteslot))
+ values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+ else
+ nulls[attno++] = true;
+
+ /* Last column: details of all conflicting local tuples. */
+ values[attno] = build_local_conflicts_json_array(estate, rel,
+ conflict_type,
+ conflicttuples);
+
+ Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+ /*
+ * Form the tuple in ApplyContext rather than in the current memory
+ * context; presumably so that it is still valid when the insertion is
+ * deferred until after the current transaction has aborted.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ MyLogicalRepWorker->conflict_log_tuple =
+ heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+ MemoryContextSwitchTo(oldctx);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..bc7e1d9ebde 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
+ worker->conflict_log_tuple = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 718408bb599..cf1008c93dc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId remote_xid = InvalidTransactionId;
+TimestampTz remote_commit_ts = 0;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
+ remote_commit_ts = begin_data.committime;
+ remote_xid = begin_data.xid;
maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ remote_xid = stream_xid;
+ remote_final_lsn = InvalidXLogRecPtr;
+ remote_commit_ts = 0;
+
if (!TransactionIdIsValid(stream_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,28 @@ start_apply(XLogRecPtr origin_startpos)
pgstat_report_subscription_error(MySubscription->oid,
MyLogicalRepWorker->type);
+ /*
+ * Insert any pending conflict log tuple under a new transaction.
+ */
+ if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+ {
+ Relation conflictlogrel;
+ ConflictLogDest dest;
+
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Open conflict log table and insert the tuple. */
+ conflictlogrel = GetConflictLogTableInfo(&dest);
+ if (ValidateConflictLogTable(conflictlogrel))
+ InsertConflictLogTuple(conflictlogrel);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+ table_close(conflictlogrel, RowExclusiveLock);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ }
+
PG_RE_THROW();
}
}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index e722df2d015..694e0ba26ee 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -132,7 +132,6 @@ static const ConflictLogColumnDef ConflictLogSchema[] =
{.attname = "local_conflicts",.atttypid = JSONARRAYOID}
};
-/* Define the count using the array size */
#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0]))
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
@@ -145,4 +144,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
#endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..8ff556cc558 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
*/
TransactionId oldest_nonremovable_xid;
+ /* A conflict log tuple that is prepared but not yet inserted. */
+ HeapTuple conflict_log_tuple;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern PGDLLIMPORT List *table_states_not_ready;
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId remote_xid;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
Oid subid, Oid relid,
diff --git a/src/test/subscription/t/037_conflict_dest.pl b/src/test/subscription/t/037_conflict_dest.pl
new file mode 100644
index 00000000000..ceccd74b34a
--- /dev/null
+++ b/src/test/subscription/t/037_conflict_dest.pl
@@ -0,0 +1,190 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test logging of logical replication conflicts to the conflict log table
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create the tables on the publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"
+);
+
+# Create the tables on the subscriber. conf_tab has the same definition as
+# on the publisher; conf_tab_2 is a partitioned table with different unique
+# constraints.
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE conf_tab (a int PRIMARY key, b int UNIQUE, c int UNIQUE);");
+
+$node_subscriber->safe_psql(
+ 'postgres', qq[
+ CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int, c int, unique(a,b)) PARTITION BY RANGE (a);
+ CREATE TABLE conf_tab_2_p1 PARTITION OF conf_tab_2 FOR VALUES FROM (MINVALUE) TO (100);
+]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_tab FOR TABLE conf_tab, conf_tab_2");
+
+# Create the subscription, sending conflict details to the conflict log
+# table only
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION sub_tab
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION pub_tab WITH (conflict_log_destination=table)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+###############################################################################
+# Test conflict insertion into the internal conflict log table
+###############################################################################
+
+# Create a local row that the publisher's insert below collides with on the
+# primary key
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (10, 10, 10);");
+
+# Get the internally generated table name
+my $subid = $node_subscriber->safe_psql('postgres',
+ "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
+my $conflict_table = "pg_conflict.conflict_log_table_$subid";
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (10, 20, 30);");
+
+# Wait for the conflict to be logged
+my $log_check = $node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table;");
+
+is($log_check, 1, 'Conflict was successfully logged to the internal table');
+
+# Collect the 'a' key value of every logged local conflict
+my $json_query = qq[
+ SELECT string_agg((unnested.j::json)->'key'->>'a', ',')
+ FROM (
+ SELECT unnest(local_conflicts) AS j
+ FROM $conflict_table
+ ) AS unnested;
+];
+
+my $all_keys = $node_subscriber->safe_psql('postgres', $json_query);
+
+# Verify that '10' is present in the resulting string
+like($all_keys, qr/10/,
+ 'Verified that key 10 exists in the local_conflicts log');
+
+# Verify the recorded conflict type rather than passing unconditionally
+my $type_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'insert_exists';"
+);
+is($type_check, 't',
+ 'Conflict type insert_exists recorded in internal table');
+
+# Remove the local rows, including the conflicting one, before the next test
+# case
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: update_missing
+###############################################################################
+
+# Replicate a row to the subscriber, then delete it locally on the subscriber
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (50, 50, 50);");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres', "DELETE FROM conf_tab WHERE a = 50;");
+
+# Trigger conflict by updating that row on publisher
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab SET b = 500 WHERE a = 50;");
+
+# Wait for the apply worker to detect the missing row and log it
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'update_missing';"
+) or die "Timed out waiting for update_missing conflict";
+
+# Exactly one update_missing row is expected in the conflict log table
+my $upd_miss_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM $conflict_table WHERE conflict_type = 'update_missing';"
+);
+is($upd_miss_check, 1,
+ 'Verified update_missing conflict logged to internal table');
+
+# Remove the local rows before the next test case
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: insert_exists (via secondary unique index)
+###############################################################################
+
+# 1. Subscriber has a row with b=100
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (100, 100, 100);");
+
+# 2. Publisher inserts a NEW PK (101) but a DUPLICATE 'b' (100)
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (101, 100, 101);");
+
+# 3. Verify it appears as 'insert_exists' in the conflict log table; the
+# test dies here on timeout, so reaching pass() below means the row was found
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table WHERE conflict_type = 'insert_exists' AND local_conflicts::text LIKE '%100%';"
+) or die "Timed out waiting for secondary index insert_exists conflict";
+
+pass('Logged insert_exists triggered by secondary unique index violation');
+
+# Remove the local rows, including the conflicting one, before the next test
+# case
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+###############################################################################
+# Test Case: destination 'all' (conflict goes to both server log and table)
+###############################################################################
+
+# Switch destination
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub_tab SET (conflict_log_destination = 'all');");
+
+# Start from an empty conflict log table
+$node_subscriber->safe_psql('postgres', "DELETE FROM $conflict_table;");
+# Trigger a conflict (insert_exists on the primary key)
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (600, 600, 600);");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab VALUES (600, 700, 700);");
+
+# Wait for table log
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) > 0 FROM $conflict_table;")
+ or die "Timed out waiting for insert_exists conflict";
+
+# Check subscriber server log
+my $log_found = $node_subscriber->wait_for_log(
+ qr/conflict detected on relation "public.conf_tab": conflict=insert_exists/
+);
+ok($log_found, 'Conflict reported in server log when destination is all');
+
+# Verify the same conflict was also recorded in the conflict log table.
+# The local row is never removed, so the failing transaction may presumably
+# be retried and logged again before this query runs; accept one or more rows
+# rather than exactly one.
+my $table_check = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM $conflict_table WHERE local_conflicts::text LIKE '%600%';"
+);
+cmp_ok($table_check, '>=', 1,
+ 'Conflict also logged to table when destination is all');
+
+done_testing();
--
2.43.0