v37-0006-Review-comment-fixes-for-Implement-the-conflict-.patch
application/octet-stream
Filename: v37-0006-Review-comment-fixes-for-Implement-the-conflict-.patch
Type: application/octet-stream
Part: 5
From 7a57abe0edaed7efdbbeefeb34fa974395bbcf16 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Wed, 20 May 2026 10:13:28 +0530
Subject: [PATCH v37 06/10] Review comment fixes for Implement the conflict
insertion infrastructure for the conflict log table
Review comment fixes for Implement the conflict
insertion infrastructure for the conflict log table
---
src/backend/replication/logical/conflict.c | 91 ++++++++++++++--------
src/backend/replication/logical/worker.c | 32 ++------
src/include/replication/conflict.h | 1 +
src/test/subscription/t/030_origin.pl | 4 +-
src/test/subscription/t/035_conflicts.pl | 4 +-
5 files changed, 69 insertions(+), 63 deletions(-)
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 76bd6980d22..d9682ea4651 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -48,7 +48,6 @@ const char *const ConflictLogDestNames[] = {
StaticAssertDecl(lengthof(ConflictLogDestNames) == 3,
"ConflictLogDestNames length mismatch");
-
/* Structure to hold metadata for one column of the conflict log table */
typedef struct ConflictLogColumnDef
{
@@ -56,6 +55,18 @@ typedef struct ConflictLogColumnDef
Oid atttypid; /* Data type OID */
} ConflictLogColumnDef;
+/* Field names and types of each element of the 'local_conflicts' JSON array */
+static const ConflictLogColumnDef LocalConflictSchema[] =
+{
+ { .attname = "xid", .atttypid = XIDOID },
+ { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID },
+ { .attname = "origin", .atttypid = TEXTOID },
+ { .attname = "key", .atttypid = JSONOID },
+ { .attname = "tuple", .atttypid = JSONOID }
+};
+
+#define NUM_LOCAL_CONFLICT_ATTRS lengthof(LocalConflictSchema)
+
/*
* Schema definition for conflict log tables.
*
@@ -91,17 +102,7 @@ static const char *const ConflictTypeNames[] = {
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
-/* Schema for the elements within the 'local_conflicts' JSON array */
-static const ConflictLogColumnDef LocalConflictSchema[] =
-{
- { .attname = "xid", .atttypid = XIDOID },
- { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID },
- { .attname = "origin", .atttypid = TEXTOID },
- { .attname = "key", .atttypid = JSONOID },
- { .attname = "tuple", .atttypid = JSONOID }
-};
-#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema)
static int errcode_apply_conflict(ConflictType type);
static void errdetail_apply_conflict(EState *estate,
@@ -376,7 +377,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
Relation localrel = relinfo->ri_RelationDesc;
ConflictLogDest dest;
Relation conflictlogrel;
- bool log_dest_clt = false;
+ bool log_dest_clt;
bool log_dest_logfile;
pgstat_report_subscription_conflict(MySubscription->oid, type);
@@ -387,10 +388,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
*/
conflictlogrel = GetConflictLogDestAndTable(&dest);
- if (dest == CONFLICT_LOG_DEST_TABLE || dest == CONFLICT_LOG_DEST_ALL)
- log_dest_clt = true;
- if (dest == CONFLICT_LOG_DEST_LOG || dest == CONFLICT_LOG_DEST_ALL)
- log_dest_logfile = true;
+ log_dest_clt = CONFLICTS_LOGGED_TO_TABLE(dest);
+ log_dest_logfile = CONFLICTS_LOGGED_TO_FILE(dest);
/* Insert to table if requested. */
if (log_dest_clt)
@@ -422,9 +421,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
*/
ereport(elevel,
errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
- get_namespace_name(RelationGetNamespace(localrel)),
- RelationGetRelationName(localrel),
+ errmsg("conflict detected on relation \"%s\": conflict=%s",
+ RelationGetQualifiedRelationName(localrel),
ConflictTypeNames[type]),
errdetail("Conflict details are logged to the conflict log table: %s",
RelationGetRelationName(conflictlogrel)));
@@ -453,14 +451,41 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
/* Standard reporting with full internal details. */
ereport(elevel,
errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
- get_namespace_name(RelationGetNamespace(localrel)),
- RelationGetRelationName(localrel),
+ errmsg("conflict detected on relation \"%s\": conflict=%s",
+ RelationGetQualifiedRelationName(localrel),
ConflictTypeNames[type]),
errdetail_internal("%s", err_detail.data));
}
}
+/*
+ * Insert the pending conflict log tuple, if any, in a new transaction.
+ */
+void
+ProcessPendingConflictLogTuple(void)
+{
+ Relation conflictlogrel;
+ ConflictLogDest dest;
+
+ /* Nothing to do if no conflict log tuple is pending. */
+ if (MyLogicalRepWorker->conflict_log_tuple == NULL)
+ return;
+
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ /* Open the conflict log table and insert the pending tuple. */
+ conflictlogrel = GetConflictLogDestAndTable(&dest);
+ Assert(CONFLICTS_LOGGED_TO_TABLE(dest));
+
+ InsertConflictLogTuple(conflictlogrel);
+
+ table_close(conflictlogrel, RowExclusiveLock);
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+}
+
/*
* Find all unique indexes to check for a conflict and store them into
* ResultRelInfo.
@@ -511,7 +536,7 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
*log_dest = GetLogDestination(MySubscription->conflictlogdest);
/* Quick exit if a conflict log table was not requested. */
- if (*log_dest == CONFLICT_LOG_DEST_LOG)
+ if (!CONFLICTS_LOGGED_TO_TABLE(*log_dest))
return NULL;
conflictlogrelid = MySubscription->conflictlogrelid;
@@ -531,13 +556,11 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
void
InsertConflictLogTuple(Relation conflictlogrel)
{
- int options = HEAP_INSERT_NO_LOGICAL;
-
/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
- GetCurrentCommandId(true), options, NULL);
+ GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
/* Free conflict log tuple. */
heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
@@ -1119,9 +1142,9 @@ build_conflict_tupledesc(void)
{
TupleDesc tupdesc;
- tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+ tupdesc = CreateTemplateTupleDesc(NUM_LOCAL_CONFLICT_ATTRS);
- for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++)
+ for (int i = 0; i < NUM_LOCAL_CONFLICT_ATTRS; i++)
TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1),
LocalConflictSchema[i].attname,
LocalConflictSchema[i].atttypid,
@@ -1162,8 +1185,8 @@ build_local_conflicts_json_array(EState *estate, Relation rel,
/* Process local conflict tuple list and prepare an array of JSON. */
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
{
- Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
- bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0};
+ Datum values[NUM_LOCAL_CONFLICT_ATTRS] = {0};
+ bool nulls[NUM_LOCAL_CONFLICT_ATTRS] = {0};
char *origin_name = NULL;
HeapTuple tuple;
Datum json_datum;
@@ -1213,7 +1236,7 @@ build_local_conflicts_json_array(EState *estate, Relation rel,
else
nulls[attno] = true;
- Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+ Assert(attno + 1 == NUM_LOCAL_CONFLICT_ATTRS);
tuple = heap_form_tuple(tupdesc, values, nulls);
@@ -1272,8 +1295,8 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
List *conflicttuples,
TupleTableSlot *remoteslot)
{
- Datum values[MAX_CONFLICT_ATTR_NUM] = {0};
- bool nulls[MAX_CONFLICT_ATTR_NUM] = {0};
+ Datum values[NUM_CONFLICT_ATTRS] = {0};
+ bool nulls[NUM_CONFLICT_ATTRS] = {0};
int attno;
char *remote_origin = NULL;
MemoryContext oldctx;
@@ -1339,7 +1362,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
conflict_type,
conflicttuples);
- Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+ Assert(attno + 1 == NUM_CONFLICT_ATTRS);
oldctx = MemoryContextSwitchTo(ApplyContext);
MyLogicalRepWorker->conflict_log_tuple =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 469451c736a..70ae38a7bd1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1766,15 +1766,15 @@ apply_handle_stream_start(StringInfo s)
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
- remote_xid = stream_xid;
- remote_final_lsn = InvalidXLogRecPtr;
- remote_commit_ts = 0;
-
if (!TransactionIdIsValid(stream_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
+ remote_xid = stream_xid;
+ remote_final_lsn = InvalidXLogRecPtr;
+ remote_commit_ts = 0;
+
set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/* Try to allocate a worker for the streaming transaction. */
@@ -5674,27 +5674,7 @@ start_apply(XLogRecPtr origin_startpos)
*/
AbortOutOfAnyTransaction();
pgstat_report_subscription_error(MySubscription->oid);
-
- /*
- * Insert any pending conflict log tuple under a new transaction.
- */
- if (MyLogicalRepWorker->conflict_log_tuple != NULL)
- {
- Relation conflictlogrel;
- ConflictLogDest dest;
-
- StartTransactionCommand();
- PushActiveSnapshot(GetTransactionSnapshot());
-
- /* Open conflict log table and insert the tuple. */
- conflictlogrel = GetConflictLogDestAndTable(&dest);
- Assert(dest != CONFLICT_LOG_DEST_LOG);
- InsertConflictLogTuple(conflictlogrel);
- table_close(conflictlogrel, RowExclusiveLock);
-
- PopActiveSnapshot();
- CommitTransactionCommand();
- }
+ ProcessPendingConflictLogTuple();
PG_RE_THROW();
}
@@ -6069,6 +6049,8 @@ DisableSubscriptionAndExit(void)
*/
pgstat_report_subscription_error(MyLogicalRepWorker->subid);
+ ProcessPendingConflictLogTuple();
+
/* Disable the subscription */
StartTransactionCommand();
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 4de6d03755d..e64166fdb81 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -117,6 +117,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
TupleTableSlot *searchslot,
TupleTableSlot *remoteslot,
List *conflicttuples);
+extern void ProcessPendingConflictLogTuple(void);
extern void InitConflictIndexes(ResultRelInfo *relInfo);
extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest);
extern void InsertConflictLogTuple(Relation conflictlogrel);
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index 6bc6b7874c2..5f4d00bdd33 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -166,7 +166,7 @@ is($result, qq(32), 'The node_A data replicated to node_B');
$node_C->safe_psql('postgres', "UPDATE $tab SET a = 33 WHERE a = 32;");
$node_B->wait_for_log(
- qr/conflict detected on relation "public.$tab_unquoted": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./
+ qr/conflict detected on relation "public.$tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./
);
$node_B->safe_psql('postgres', "DELETE FROM $tab;");
@@ -182,7 +182,7 @@ is($result, qq(33), 'The node_A data replicated to node_B');
$node_C->safe_psql('postgres', "DELETE FROM $tab WHERE a = 33;");
$node_B->wait_for_log(
- qr/conflict detected on relation "public.$tab_unquoted": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/
+ qr/conflict detected on relation "public.$tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/
);
# The remaining tests no longer test conflict detection.
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 05c2179b9a8..4f3880e5b83 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -85,11 +85,11 @@ $node_subscriber->wait_for_log(
$log_offset);
# Verify the contents of the Conflict Log Table (CLT)
-# This section ensures that the clt contains the expected
+# This section ensures that the CLT contains the expected
# type and specific key data.
my $subid = $node_subscriber->safe_psql('postgres',
"SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';");
-my $clt = "pg_conflict.pg_conflict_log_$subid";
+my $clt = "pg_conflict.pg_conflict_log_for_subid_$subid";
# Wait for the conflict to be logged in the CLT
my $log_check = $node_subscriber->poll_query_until(
--
2.53.0