v14-0002-Implement-the-conflict-insertion-infrastructure-.patch
text/x-patch
Filename: v14-0002-Implement-the-conflict-insertion-infrastructure-.patch
Type: text/x-patch
Part: 1
From 4aa8a0520b706b247f850dcc9a05fdefb01826e5 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Sat, 20 Dec 2025 15:20:09 +0530
Subject: [PATCH v14 2/3] Implement the conflict insertion infrastructure into
the conflict log table
Note: A single remote tuple may conflict with multiple local tuples when the
conflict type is CT_MULTIPLE_UNIQUE_CONFLICTS. To handle this case, we insert
exactly one row into the conflict log table per remote conflicting tuple, even
if it conflicts with multiple local tuples, and store all of the conflicting
local tuples in a single JSON array column, in the following format:
[ { "xid": "1001", "commit_ts": "...", "origin": "...", "key": {...}, "tuple": {...} }, ... ]
The fields of an individual local tuple can be extracted from a conflict log
table row as shown in the example below.
SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;
remote_xid | relname | remote_origin | local_xid | local_tuple
------------+----------+---------------+-----------+---------------------
760 | test | pg_16406 | 771 | {"a":1,"b":10}
765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2}
---
src/backend/replication/logical/conflict.c | 619 +++++++++++++++++++--
src/backend/replication/logical/launcher.c | 1 +
src/backend/replication/logical/worker.c | 38 +-
src/include/replication/conflict.h | 3 +
src/include/replication/worker_internal.h | 7 +
5 files changed, 635 insertions(+), 33 deletions(-)
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..5f753cd8042 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,22 @@
#include "postgres.h"
#include "access/commit_ts.h"
+#include "access/heapam.h"
#include "access/tableam.h"
+#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
+#include "funcapi.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/jsonb.h"
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS 5
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
@@ -50,8 +59,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+ TupleTableSlot *slot,
+ Relation indexDesc, Datum *values,
+ bool *isnull);
static char *build_index_value_desc(EState *estate, Relation localrel,
TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+ Relation localrel,
+ Oid replica_index,
+ TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+ ConflictType conflict_type,
+ List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot);
/*
* Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -105,11 +133,19 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *remoteslot, List *conflicttuples)
{
- Relation localrel = relinfo->ri_RelationDesc;
- StringInfoData err_detail;
+ Relation localrel = relinfo->ri_RelationDesc;
+ StringInfoData err_detail;
+ ConflictLogDest dest;
+ Relation conflictlogrel;
initStringInfo(&err_detail);
+ /*
+ * Get both the conflict log destination and the opened conflict log
+ * relation for insertion.
+ */
+ conflictlogrel = GetConflictLogTableInfo(&dest);
+
/* Form errdetail message by combining conflicting tuples information. */
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
errdetail_apply_conflict(estate, relinfo, type, searchslot,
@@ -120,15 +156,69 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
conflicttuple->ts,
&err_detail);
+    /* Insert to table if destination is 'table' or 'all' */
+    if (conflictlogrel)
+    {
+        Assert(dest == CONFLICT_LOG_DEST_TABLE ||
+               dest == CONFLICT_LOG_DEST_ALL);
+
+        if (ValidateConflictLogTable(conflictlogrel))
+        {
+            /*
+             * Prepare the conflict log tuple. If the error level is below
+             * ERROR, insert it immediately. Otherwise, defer the insertion to
+             * a new transaction after the current one aborts, ensuring the
+             * insertion of the log tuple is not rolled back.
+             */
+            prepare_conflict_log_tuple(estate,
+                                       localrel,
+                                       conflictlogrel,
+                                       type,
+                                       searchslot,
+                                       conflicttuples,
+                                       remoteslot);
+            if (elevel < ERROR)
+                InsertConflictLogTuple(conflictlogrel);
+        }
+        else
+        {
+            ereport(WARNING,
+                    errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                    errmsg("conflict log table \"%s.%s\" structure changed, skipping insertion",
+                           get_namespace_name(RelationGetNamespace(conflictlogrel)),
+                           RelationGetRelationName(conflictlogrel)));
+
+            /*
+             * Nothing is stored in the table for this conflict, so fall back
+             * to reporting the full details in the server log. Otherwise a
+             * 'table'-only destination would claim below that the details
+             * were logged to the table while they were in fact dropped.
+             */
+            dest = CONFLICT_LOG_DEST_LOG;
+        }
+
+        table_close(conflictlogrel, RowExclusiveLock);
+    }
+
pgstat_report_subscription_conflict(MySubscription->oid, type);
- ereport(elevel,
- errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
- get_namespace_name(RelationGetNamespace(localrel)),
- RelationGetRelationName(localrel),
- ConflictTypeNames[type]),
- errdetail_internal("%s", err_detail.data));
+    /* Decide what detail to show in server logs. */
+    if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL)
+    {
+        /* Standard reporting with full internal details. */
+        ereport(elevel,
+                errcode_apply_conflict(type),
+                errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+                       get_namespace_name(RelationGetNamespace(localrel)),
+                       RelationGetRelationName(localrel),
+                       ConflictTypeNames[type]),
+                errdetail_internal("%s", err_detail.data));
+    }
+    else
+    {
+        /*
+         * 'table' only, and a conflict log tuple was prepared or inserted
+         * above: report the error msg but omit raw tuple data from server
+         * logs since it's captured in the internal table.
+         */
+        ereport(elevel,
+                errcode_apply_conflict(type),
+                errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
+                       get_namespace_name(RelationGetNamespace(localrel)),
+                       RelationGetRelationName(localrel),
+                       ConflictTypeNames[type]),
+                errdetail("Conflict details logged to internal table with OID %u.",
+                          MySubscription->conflictrelid));
+    }
}
/*
@@ -162,6 +252,142 @@ InitConflictIndexes(ResultRelInfo *relInfo)
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
}
+/*
+ * GetConflictLogTableInfo
+ *
+ * Report where conflicts of the current subscription are to be logged and,
+ * when a table is involved, open that table.
+ *
+ * *log_dest is always set, derived from MySubscription->logdestination.
+ *
+ * Returns NULL without opening anything when the destination is
+ * CONFLICT_LOG_DEST_LOG. Otherwise returns the conflict log table
+ * (MySubscription->conflictrelid) opened with RowExclusiveLock; the caller is
+ * responsible for closing it with table_close().
+ *
+ * table_open() never returns NULL: it raises an ERROR when the relation
+ * cannot be opened, so the result needs no NULL check here.
+ *
+ * NOTE(review): if a missing conflict log table is meant to be tolerated with
+ * a WARNING instead of an ERROR, this must use try_table_open() and every
+ * caller must then cope with a NULL result even for 'table'/'all'
+ * destinations -- confirm the intended behavior.
+ */
+Relation
+GetConflictLogTableInfo(ConflictLogDest *log_dest)
+{
+    /*
+     * Convert the text log destination to the internal enum. MySubscription
+     * already contains the data from pg_subscription.
+     */
+    *log_dest = GetLogDestination(MySubscription->logdestination);
+
+    /* If destination is 'log' only, no table to open. */
+    if (*log_dest == CONFLICT_LOG_DEST_LOG)
+        return NULL;
+
+    Assert(OidIsValid(MySubscription->conflictrelid));
+
+    return table_open(MySubscription->conflictrelid, RowExclusiveLock);
+}
+
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Write the pending conflict log tuple, i.e. the one stored in
+ * MyLogicalRepWorker->conflict_log_tuple, into conflictlogrel and release it.
+ *
+ * The insertion passes HEAP_INSERT_NO_LOGICAL so that the row added to the
+ * conflict log table is explicitly excluded from logical decoding.
+ *
+ * On return the pending tuple has been freed and the worker's pointer reset
+ * to NULL.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+    HeapTuple   pending = MyLogicalRepWorker->conflict_log_tuple;
+
+    /* The caller must have prepared a tuple beforehand. */
+    Assert(pending != NULL);
+
+    heap_insert(conflictlogrel, pending, GetCurrentCommandId(true),
+                HEAP_INSERT_NO_LOGICAL, NULL);
+
+    /* The tuple is consumed; drop it and clear the pending marker. */
+    heap_freetuple(pending);
+    MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
+
+/*
+ * ValidateConflictLogTable - Validate conflict log table
+ *
+ * Check that rel still has exactly the column layout described by
+ * ConflictLogSchema: every live user column must sit at the expected
+ * attribute number with the expected name and type, and there must be
+ * MAX_CONFLICT_ATTR_NUM of them.
+ *
+ * Returns true when the layout matches, false otherwise.
+ */
+bool
+ValidateConflictLogTable(Relation rel)
+{
+    Relation    attrel;
+    SysScanDesc attscan;
+    ScanKeyData relkey;
+    HeapTuple   tup;
+    int         nlive = 0;
+    bool        mismatch = false;
+
+    /* Scan all pg_attribute rows belonging to the table. */
+    attrel = table_open(AttributeRelationId, AccessShareLock);
+    ScanKeyInit(&relkey,
+                Anum_pg_attribute_attrelid,
+                BTEqualStrategyNumber, F_OIDEQ,
+                ObjectIdGetDatum(RelationGetRelid(rel)));
+    attscan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
+                                 SnapshotSelf, 1, &relkey);
+
+    /* Stop fetching as soon as the first discrepancy has been found. */
+    while (!mismatch &&
+           HeapTupleIsValid(tup = systable_getnext(attscan)))
+    {
+        Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(tup);
+        int         idx;
+
+        /* System columns and dropped columns do not count. */
+        if (att->attnum < 1 || att->attisdropped)
+            continue;
+
+        nlive++;
+
+        /* attnum 1 corresponds to index 0 in ConflictLogSchema */
+        idx = att->attnum - 1;
+
+        if (idx >= MAX_CONFLICT_ATTR_NUM)
+        {
+            /* A column beyond the required set. */
+            mismatch = true;
+        }
+        else if (att->atttypid != ConflictLogSchema[idx].atttypid ||
+                 strcmp(NameStr(att->attname),
+                        ConflictLogSchema[idx].attname) != 0)
+        {
+            /* Wrong type or wrong name at this position. */
+            mismatch = true;
+        }
+    }
+
+    systable_endscan(attscan);
+    table_close(attrel, AccessShareLock);
+
+    return !mismatch && nlive == MAX_CONFLICT_ATTR_NUM;
+}
+
+
/*
* Add SQLSTATE error code to the current conflict report.
*/
@@ -472,6 +698,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
return tuple_value.data;
}
+/*
+ * build_index_datums_from_slot
+ *
+ * Compute the "raw" index key Datums and null flags for the tuple held in
+ * slot, using the already opened index indexDesc. The results are written
+ * to the caller-supplied values and isnull arrays.
+ *
+ * Shared by the callers that need index key values of a conflicting tuple.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+                             TupleTableSlot *slot,
+                             Relation indexDesc, Datum *values,
+                             bool *isnull)
+{
+    TupleTableSlot *srcslot;
+    IndexInfo  *indexinfo;
+
+    if (!TTS_IS_VIRTUAL(slot))
+        srcslot = slot;
+    else
+    {
+        /*
+         * FormIndexDatum only works with heap tuple slots, so copy a virtual
+         * slot into a table slot first. The new slot is registered in the
+         * EState's tuple table.
+         */
+        srcslot = table_slot_create(localrel, &estate->es_tupleTable);
+        srcslot = ExecCopySlot(srcslot, slot);
+    }
+
+    /*
+     * Make the tuple visible through ecxt_scantuple, for potential use in
+     * FormIndexDatum when index expressions are present.
+     */
+    GetPerTupleExprContext(estate)->ecxt_scantuple = srcslot;
+
+    /* Form the index datums */
+    indexinfo = BuildIndexInfo(indexDesc);
+    FormIndexDatum(indexinfo, srcslot, estate, values, isnull);
+}
+
+
/*
* Helper functions to construct a string describing the contents of an index
* entry. See BuildIndexValueDescription for details.
@@ -487,41 +747,336 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
Relation indexDesc;
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
- TupleTableSlot *tableslot = slot;
- if (!tableslot)
+ if (!slot)
return NULL;
Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
indexDesc = index_open(indexoid, NoLock);
- /*
- * If the slot is a virtual slot, copy it into a heap tuple slot as
- * FormIndexDatum only works with heap tuple slots.
- */
- if (TTS_IS_VIRTUAL(slot))
+ build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+ isnull);
+
+ index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+ index_close(indexDesc, NoLock);
+
+ return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Render the tuple stored in slot as a json Datum by passing it, as a
+ * composite value described by the slot's tuple descriptor, to row_to_json.
+ *
+ * slot must not be NULL.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+    HeapTuple   rowcopy;
+    Datum       result;
+
+    Assert(slot != NULL);
+
+    /* Work on a private copy of the slot's tuple. */
+    rowcopy = ExecCopySlotHeapTuple(slot);
+
+    result = DirectFunctionCall1(row_to_json,
+                                 heap_copy_tuple_as_datum(rowcopy,
+                                                          slot->tts_tupleDescriptor));
+
+    heap_freetuple(rowcopy);
+
+    return result;
+}
+
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Extract the key columns of index indexid from the tuple stored in slot and
+ * return them as a json Datum (produced by row_to_json).
+ *
+ * localrel is the table the tuple belongs to. The caller must already hold a
+ * lock on the index (asserted below), so it is opened here with NoLock.
+ * slot must not be NULL.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+                                  Oid indexid, TupleTableSlot *slot)
+{
+    Relation    indexDesc;
+    Datum       values[INDEX_MAX_KEYS];
+    bool        isnull[INDEX_MAX_KEYS];
+    HeapTuple   tuple;
+    TupleDesc   tupdesc;
+    Datum       datum;
+
+    Assert(slot != NULL);
+
+    Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+    indexDesc = index_open(indexid, NoLock);
+
+    build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+                                 isnull);
+    tupdesc = RelationGetDescr(indexDesc);
+
+    /*
+     * Bless the tupdesc so it can be looked up by row_to_json.
+     *
+     * NOTE(review): this blesses the index relation's own descriptor rather
+     * than a private copy -- confirm that is acceptable.
+     */
+    BlessTupleDesc(tupdesc);
+
+    /*
+     * Form the index key tuple and turn it into a composite Datum.
+     * heap_copy_tuple_as_datum returns its own copy, so the temporary tuple
+     * can be released right away, as tuple_table_slot_to_json_datum does.
+     */
+    tuple = heap_form_tuple(tupdesc, values, isnull);
+    datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+    heap_freetuple(tuple);
+
+    index_close(indexDesc, NoLock);
+
+    /* Convert to a json datum. */
+    return DirectFunctionCall1(row_to_json, datum);
+}
+
+
+/*
+ * build_conflict_tupledesc
+ *
+ * Create and bless the row type used to describe one conflicting local tuple.
+ * It has MAX_LOCAL_CONFLICT_INFO_ATTRS columns, in this order:
+ * xid, commit_ts, origin, key, tuple.
+ */
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+    static const struct
+    {
+        const char *name;
+        Oid         typid;
+    }           cols[] = {
+        {"xid", XIDOID},
+        {"commit_ts", TIMESTAMPTZOID},
+        {"origin", TEXTOID},
+        {"key", JSONOID},
+        {"tuple", JSONOID},
+    };
+    TupleDesc   tupdesc;
+
+    Assert((int) lengthof(cols) == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+    tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+    /* Attribute numbers are 1-based. */
+    for (int i = 0; i < (int) lengthof(cols); i++)
+        TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1), cols[i].name,
+                           cols[i].typid, -1, 0);
+
+    BlessTupleDesc(tupdesc);
+
+    return tupdesc;
+}
+
+
+/*
+ * build_local_conflicts_json_array
+ *
+ * Build the json[] Datum describing the local tuples a remote change
+ * conflicted with. One array element is produced per ConflictTupleInfo in
+ * conflicttuples; each element is the row_to_json rendering of a row with the
+ * columns defined by build_conflict_tupledesc.
+ *
+ * The "key" column is filled only for the unique-violation conflict types
+ * (CT_INSERT_EXISTS, CT_UPDATE_EXISTS, CT_MULTIPLE_UNIQUE_CONFLICTS); the
+ * "commit_ts", "origin" and "tuple" columns are NULL when the corresponding
+ * information is not available.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "key": {...}, "tuple": {...} }, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+                                 ConflictType conflict_type,
+                                 List *conflicttuples)
+{
+    Datum      *json_datum_array;
+    Datum       json_array_datum;
+    int         num_conflicts = 0;
+    int16       typlen;
+    bool        typbyval;
+    char        typalign;
+    TupleDesc   tupdesc;
+
+    /* Build local conflicts tuple descriptor. */
+    tupdesc = build_conflict_tupledesc();
+
+    /* One array element per conflicting local tuple. */
+    json_datum_array = (Datum *) palloc(list_length(conflicttuples) *
+                                        sizeof(Datum));
+
+    /* Process local conflict tuple list and prepare an array of JSON. */
+    foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
{
- tableslot = table_slot_create(localrel, &estate->es_tupleTable);
- tableslot = ExecCopySlot(tableslot, slot);
+        Datum       values[MAX_LOCAL_CONFLICT_INFO_ATTRS];
+        bool        nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS];
+        char       *origin_name = NULL;
+        HeapTuple   tuple;
+        Datum       json_datum;
+        int         attno;
+
+        memset(values, 0, sizeof(Datum) * MAX_LOCAL_CONFLICT_INFO_ATTRS);
+        memset(nulls, 0, sizeof(bool) * MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+        attno = 0;
+        values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+        if (conflicttuple->ts)
+            values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+        else
+            nulls[attno++] = true;
+
+        if (conflicttuple->origin != InvalidRepOriginId)
+            replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+        /* Leave the origin column NULL if no origin name was found. */
+        if (origin_name != NULL)
+            values[attno++] = CStringGetTextDatum(origin_name);
+        else
+            nulls[attno++] = true;
+
+        /*
+         * Add the conflicting key values in the case of a unique constraint
+         * violation.
+         */
+        if (conflict_type == CT_INSERT_EXISTS ||
+            conflict_type == CT_UPDATE_EXISTS ||
+            conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+        {
+            Oid         indexoid = conflicttuple->indexoid;
+
+            Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+                   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+                                              true));
+            values[attno++] =
+                tuple_table_slot_to_indextup_json(estate, rel,
+                                                  indexoid,
+                                                  conflicttuple->slot);
+        }
+        else
+            nulls[attno++] = true;
+
+        /* Convert conflicting tuple to JSON datum. */
+        if (conflicttuple->slot)
+            values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+        else
+            nulls[attno] = true;
+
+        Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+        tuple = heap_form_tuple(tupdesc, values, nulls);
+
+        json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+        /*
+         * Build the higher level JSON datum in format described in function
+         * header, and store it directly as the next array element.
+         */
+        json_datum_array[num_conflicts++] =
+            DirectFunctionCall1(row_to_json, json_datum);
+
+        /* Done with the temporary tuple. */
+        heap_freetuple(tuple);
}
- /*
- * Initialize ecxt_scantuple for potential use in FormIndexDatum when
- * index expressions are present.
- */
- GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
- /*
- * The values/nulls arrays passed to BuildIndexValueDescription should be
- * the results of FormIndexDatum, which are the "raw" input to the index
- * AM.
- */
- FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
- index_value = BuildIndexValueDescription(indexDesc, values, isnull);
- index_close(indexDesc, NoLock);
+    Assert(num_conflicts == list_length(conflicttuples));
+
+    /* Construct the json[] array Datum. */
+    get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+    json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+                                                       num_conflicts,
+                                                       JSONOID,
+                                                       typlen,
+                                                       typbyval,
+                                                       typalign));
+    pfree(json_datum_array);
+
+    return json_array_datum;
+}
- return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ *
+ * estate         - executor state, passed on to the index key helpers
+ * rel            - local relation on which the conflict was detected
+ * conflictlogrel - opened conflict log table; only its tuple descriptor is
+ *                  used here, nothing is inserted
+ * conflict_type  - kind of conflict, stored by name via ConflictTypeNames
+ * searchslot     - tuple used to search the local row; may be empty/NULL
+ * conflicttuples - list of ConflictTupleInfo for the conflicting local tuples
+ * remoteslot     - incoming remote tuple; may be empty/NULL
+ *
+ * No tuple may already be pending in MyLogicalRepWorker (asserted). The new
+ * tuple is formed in ApplyContext.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+ Relation conflictlogrel,
+ ConflictType conflict_type,
+ TupleTableSlot *searchslot,
+ List *conflicttuples,
+ TupleTableSlot *remoteslot)
+{
+ Datum values[MAX_CONFLICT_ATTR_NUM];
+ bool nulls[MAX_CONFLICT_ATTR_NUM];
+ int attno;
+ char *remote_origin = NULL;
+ MemoryContext oldctx;
+
+ Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+ /* Initialize values and nulls arrays: all zero, i.e. every column non-null. */
+ memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+ memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM);
+
+ /*
+ * Populate the values and nulls arrays. The assignment order below defines
+ * the column order of the tuple: relation OID, schema name, relation name,
+ * conflict type name, remote xid, remote final LSN, remote commit
+ * timestamp, remote origin name, search key (json), remote tuple (json)
+ * and the array of local conflicts. Values that are not available are
+ * stored as NULL, except the LSN which is always stored.
+ */
+ attno = 0;
+ values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+ values[attno++] =
+ CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+ values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+ values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+ if (TransactionIdIsValid(remote_xid))
+ values[attno++] = TransactionIdGetDatum(remote_xid);
+ else
+ nulls[attno++] = true;
+
+ values[attno++] = LSNGetDatum(remote_final_lsn);
+
+ if (remote_commit_ts > 0)
+ values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+ else
+ nulls[attno++] = true;
+
+ if (replorigin_session_origin != InvalidRepOriginId)
+ replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+ if (remote_origin != NULL)
+ values[attno++] = CStringGetTextDatum(remote_origin);
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(searchslot))
+ {
+ Oid replica_index = GetRelationIdentityOrPK(rel);
+
+ /*
+ * If the table has a valid replica identity index, build the index
+ * json datum from key value. Otherwise, construct it from the complete
+ * tuple in REPLICA IDENTITY FULL cases. When there is no search tuple
+ * at all, the column is left NULL.
+ */
+ if (OidIsValid(replica_index))
+ values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+ replica_index,
+ searchslot);
+ else
+ values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+ }
+ else
+ nulls[attno++] = true;
+
+ if (!TupIsNull(remoteslot))
+ values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+ else
+ nulls[attno++] = true;
+
+ values[attno] = build_local_conflicts_json_array(estate, rel,
+ conflict_type,
+ conflicttuples);
+
+ Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ MyLogicalRepWorker->conflict_log_tuple =
+ heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+ MemoryContextSwitchTo(oldctx);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..bc7e1d9ebde 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
worker->oldest_nonremovable_xid = retain_dead_tuples
? MyReplicationSlot->data.xmin
: InvalidTransactionId;
+ worker->conflict_log_tuple = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc64476a9ef..ccd1b2c6e81 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId remote_xid = InvalidTransactionId;
+TimestampTz remote_commit_ts = 0;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
+ remote_commit_ts = begin_data.committime;
+ remote_xid = begin_data.xid;
maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ remote_xid = stream_xid;
+ remote_final_lsn = InvalidXLogRecPtr;
+ remote_commit_ts = 0;
+
if (!TransactionIdIsValid(stream_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,34 @@ start_apply(XLogRecPtr origin_startpos)
pgstat_report_subscription_error(MySubscription->oid,
MyLogicalRepWorker->type);
+            /*
+             * Insert any pending conflict log tuple under a new transaction.
+             * The tuple was prepared by ReportApplyConflict but could not be
+             * inserted in the failed transaction, where it would have been
+             * rolled back.
+             */
+            if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+            {
+                Relation    conflictlogrel;
+                ConflictLogDest dest;
+
+                StartTransactionCommand();
+                PushActiveSnapshot(GetTransactionSnapshot());
+
+                /* Open conflict log table and insert the tuple. */
+                conflictlogrel = GetConflictLogTableInfo(&dest);
+                if (conflictlogrel != NULL)
+                {
+                    if (ValidateConflictLogTable(conflictlogrel))
+                        InsertConflictLogTuple(conflictlogrel);
+                    else
+                        ereport(WARNING,
+                                errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("conflict log table \"%s.%s\" structure changed, skipping insertion",
+                                       get_namespace_name(RelationGetNamespace(conflictlogrel)),
+                                       RelationGetRelationName(conflictlogrel)));
+
+                    table_close(conflictlogrel, RowExclusiveLock);
+                }
+
+                /*
+                 * If the tuple was not consumed by InsertConflictLogTuple,
+                 * free it instead of merely dropping the pointer, so that it
+                 * is not leaked.
+                 */
+                if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+                {
+                    heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+                    MyLogicalRepWorker->conflict_log_tuple = NULL;
+                }
+
+                PopActiveSnapshot();
+                CommitTransactionCommand();
+            }
+
+
PG_RE_THROW();
}
}
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 70f8744b381..226fed9e383 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -139,4 +139,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
List *conflicttuples);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableInfo(ConflictLogDest *log_dest);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
#endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..711c04c7297 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
*/
TransactionId oldest_nonremovable_xid;
+ /* A conflict log tuple that is prepared but not yet inserted. */
+ HeapTuple conflict_log_tuple;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern PGDLLIMPORT List *table_states_not_ready;
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId remote_xid;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
Oid subid, Oid relid,
--
2.43.0