v1-0001-Approach-1-Zero-commit_ts-origin-on-replication-o.patch

application/x-patch

Filename: v1-0001-Approach-1-Zero-commit_ts-origin-on-replication-o.patch
Type: application/x-patch
Part: 2
Message: Improve conflict detection when replication origins are reused
From 9ad2b5f8a4bbcd133c013229c93039176516bb8b Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Fri, 8 May 2026 14:55:12 +0530
Subject: [PATCH v1] Approach 1 Zero commit_ts origin on replication origin
 drop

When a replication origin is dropped, scan the commit_ts data and set
nodeid = InvalidReplOriginId on every entry that records the freed
roident. This ensures that after a DROP/CREATE SUBSCRIPTION cycle reuses
the roident, the apply worker sees nodeid=0 != current_origin for rows
written by the old subscription and reports an origin_differs
conflict rather than silently skipping the conflict check.
---
 src/backend/access/transam/commit_ts.c   | 62 ++++++++++++++++++++++++
 src/backend/replication/logical/origin.c |  8 +++
 src/include/access/commit_ts.h           |  1 +
 src/test/subscription/t/029_on_error.pl  |  2 +-
 4 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 9e6fd5d4657..6851f21e810 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -271,6 +271,97 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
 		   &entry, SizeOfCommitTimestampEntry);
 }
 
+/*
+ * InvalidateCommitTsOrigin
+ *		Reset nodeid to InvalidReplOriginId on every commit_ts SLRU entry
+ *		that records the given origin.
+ *
+ * Called when a replication origin is dropped, so that rows stamped with the
+ * freed roident appear to have no origin.  A future apply worker that reuses
+ * the same roident will then see nodeid=0 != current_origin and report a
+ * conflict via the ordinary origin-differs check.
+ *
+ * All pages from the one holding oldestCommitTsXid through the one holding
+ * newestCommitTsXid are visited.  Because XIDs wrap around, the newest page
+ * number can be numerically smaller than the oldest one; the scan therefore
+ * wraps from the last possible page back to page zero instead of relying on
+ * a plain "first <= last" comparison.
+ *
+ * Whole pages are scanned, so entries on the boundary pages that lie outside
+ * the [oldest, newest] XID range are examined as well; resetting nodeid on
+ * such an entry is harmless.
+ *
+ * Does nothing when track_commit_timestamp is off or when the commit
+ * timestamp limits are not set.
+ *
+ * NB: only the shared SLRU buffers are modified (pages are marked dirty);
+ * this function writes no WAL record.
+ */
+void
+InvalidateCommitTsOrigin(ReplOriginId origin)
+{
+	TransactionId oldest;
+	TransactionId newest;
+	int64		pageno;
+	int64		lastpage;
+
+	if (!track_commit_timestamp)
+		return;
+
+	/* Fetch both limits under CommitTsLock so we see a consistent pair. */
+	LWLockAcquire(CommitTsLock, LW_SHARED);
+	oldest = TransamVariables->oldestCommitTsXid;
+	newest = TransamVariables->newestCommitTsXid;
+	LWLockRelease(CommitTsLock);
+
+	if (!TransactionIdIsValid(oldest) || !TransactionIdIsValid(newest))
+		return;
+
+	pageno = TransactionIdToCTsPage(oldest);
+	lastpage = TransactionIdToCTsPage(newest);
+
+	for (;;)
+	{
+		LWLock	   *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
+		int			slotno;
+		bool		modified = false;
+		TransactionId xid = (TransactionId)
+			(pageno * COMMIT_TS_XACTS_PER_PAGE);	/* first XID on page */
+
+		LWLockAcquire(lock, LW_EXCLUSIVE);
+		slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
+
+		for (int i = 0; i < COMMIT_TS_XACTS_PER_PAGE; i++)
+		{
+			CommitTimestampEntry entry;
+			char	   *entryptr = CommitTsCtl->shared->page_buffer[slotno]
+				+ SizeOfCommitTimestampEntry * i;
+
+			/* Copy out and back: the slot may not be suitably aligned. */
+			memcpy(&entry, entryptr, SizeOfCommitTimestampEntry);
+			if (entry.nodeid == origin)
+			{
+				entry.nodeid = InvalidReplOriginId;
+				memcpy(entryptr, &entry, SizeOfCommitTimestampEntry);
+				modified = true;
+			}
+		}
+
+		/* Dirty the page only if an entry was actually rewritten. */
+		if (modified)
+			CommitTsCtl->shared->page_dirty[slotno] = true;
+
+		LWLockRelease(lock);
+
+		if (pageno == lastpage)
+			break;
+
+		/* Step to the next page, wrapping at the end of the XID space. */
+		if (++pageno > TransactionIdToCTsPage(MaxTransactionId))
+			pageno = 0;
+	}
+}
+
 /*
  * Interrogate the commit timestamp of a transaction.
  *
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index c9dfb094c2b..30d38cd30d3 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -71,6 +71,7 @@
 #include <sys/stat.h>
 
 #include "access/genam.h"
+#include "access/commit_ts.h"
 #include "access/htup_details.h"
 #include "access/table.h"
 #include "access/xact.h"
@@ -490,6 +491,13 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
 
 	replorigin_state_clear(roident, nowait);
 
+	/*
+	 * Zero any commit_ts entries stamped with this origin so that a future
+	 * subscription reusing the same roident sees nodeid=0 != current_origin
+	 * and reports a conflict via the ordinary origin-differs check.
+	 */
+	InvalidateCommitTsOrigin(roident);
+
 	/*
 	 * Now, we can delete the catalog entry.
 	 */
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
index 825ccda90ed..400de80fac0 100644
--- a/src/include/access/commit_ts.h
+++ b/src/include/access/commit_ts.h
@@ -37,6 +37,7 @@ extern void TruncateCommitTs(TransactionId oldestXact);
 extern void SetCommitTsLimit(TransactionId oldestXact,
 							 TransactionId newestXact);
 extern void AdvanceOldestCommitTsXid(TransactionId oldestXact);
+extern void InvalidateCommitTsOrigin(ReplOriginId origin);
 
 extern int	committssyncfiletag(const FileTag *ftag, char *path);
 
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
index 7d68759b6cd..c7c5c16d9f3 100644
--- a/src/test/subscription/t/029_on_error.pl
+++ b/src/test/subscription/t/029_on_error.pl
@@ -30,7 +30,7 @@ sub test_skip_lsn
 	# ERROR with its CONTEXT when retrieving this information.
 	my $contents = slurp_file($node_subscriber->logfile, $offset);
 	$contents =~
-	  qr/conflict detected on relation "public.tbl".*\n.*DETAIL:.* Could not apply remote change.*\n.*Key already exists in unique index "tbl_pkey", modified by .*origin.* in transaction \d+ at .*: key .*, local row .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
+	  qr/conflict detected on relation "public.tbl".*\n.*DETAIL:.* Could not apply remote change.*\n.*Key already exists in unique index "tbl_pkey", modified (?:locally|by .*) in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
 	  or die "could not get error-LSN";
 	my $lsn = $1;
 
-- 
2.50.1 (Apple Git-155)