v1-0001-Approach-1-Zero-commit_ts-origin-on-replication-o.patch
application/x-patch
Filename: v1-0001-Approach-1-Zero-commit_ts-origin-on-replication-o.patch
Type: application/x-patch
Part: 2
From 9ad2b5f8a4bbcd133c013229c93039176516bb8b Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Fri, 8 May 2026 14:55:12 +0530
Subject: [PATCH v1] Approach 1 Zero commit_ts origin on replication origin
drop
When a replication origin is dropped, scan the commit_ts data and set
nodeid = InvalidReplOriginId on every entry that records the freed
roident. This ensures that after DROP/CREATE SUBSCRIPTION reuse the
roident, the apply worker sees nodeid=0 != current_origin for rows
written by the old subscription and reports an origin_differs
conflict rather than silently skipping it.
---
src/backend/access/transam/commit_ts.c | 62 ++++++++++++++++++++++++
src/backend/replication/logical/origin.c | 8 +++
src/include/access/commit_ts.h | 1 +
src/test/subscription/t/029_on_error.pl | 2 +-
4 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 9e6fd5d4657..6851f21e810 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -271,6 +271,68 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
&entry, SizeOfCommitTimestampEntry);
}
+/*
+ * Zero the nodeid field on every commit_ts SLRU entry that records the given
+ * origin. Called when a replication origin is dropped so that rows stamped
+ * with the freed roident appear to have no origin; a future apply worker
+ * that reuses the same roident will then see nodeid=0 != current_origin and
+ * report a conflict via the ordinary origin-differs check.
+ *
+ * Does nothing when track_commit_timestamp is off.
+ */
+void
+InvalidateCommitTsOrigin(ReplOriginId origin)
+{
+ TransactionId oldest;
+ TransactionId newest;
+ int64 firstpage;
+ int64 lastpage;
+
+ if (!track_commit_timestamp)
+ return;
+
+ oldest = TransamVariables->oldestCommitTsXid;
+ newest = TransamVariables->newestCommitTsXid;
+
+ if (!TransactionIdIsValid(oldest))
+ return;
+
+ firstpage = TransactionIdToCTsPage(oldest);
+ lastpage = TransactionIdToCTsPage(newest);
+
+ for (int64 pageno = firstpage; pageno <= lastpage; pageno++)
+ {
+ LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
+ int slotno;
+ bool modified = false;
+ TransactionId xid = (TransactionId)
+ (pageno * COMMIT_TS_XACTS_PER_PAGE);
+
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
+
+ for (int i = 0; i < COMMIT_TS_XACTS_PER_PAGE; i++)
+ {
+ CommitTimestampEntry entry;
+ char *entryptr = CommitTsCtl->shared->page_buffer[slotno]
+ + SizeOfCommitTimestampEntry * i;
+
+ memcpy(&entry, entryptr, SizeOfCommitTimestampEntry);
+ if (entry.nodeid == origin)
+ {
+ entry.nodeid = InvalidReplOriginId;
+ memcpy(entryptr, &entry, SizeOfCommitTimestampEntry);
+ modified = true;
+ }
+ }
+
+ if (modified)
+ CommitTsCtl->shared->page_dirty[slotno] = true;
+
+ LWLockRelease(lock);
+ }
+}
+
/*
* Interrogate the commit timestamp of a transaction.
*
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index c9dfb094c2b..30d38cd30d3 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -71,6 +71,7 @@
#include <sys/stat.h>
#include "access/genam.h"
+#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
@@ -490,6 +491,13 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
replorigin_state_clear(roident, nowait);
+ /*
+ * Zero any commit_ts entries stamped with this origin so that a future
+ * subscription reusing the same roident sees nodeid=0 != current_origin
+ * and reports a conflict via the ordinary origin-differs check.
+ */
+ InvalidateCommitTsOrigin(roident);
+
/*
* Now, we can delete the catalog entry.
*/
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
index 825ccda90ed..400de80fac0 100644
--- a/src/include/access/commit_ts.h
+++ b/src/include/access/commit_ts.h
@@ -37,6 +37,7 @@ extern void TruncateCommitTs(TransactionId oldestXact);
extern void SetCommitTsLimit(TransactionId oldestXact,
TransactionId newestXact);
extern void AdvanceOldestCommitTsXid(TransactionId oldestXact);
+extern void InvalidateCommitTsOrigin(ReplOriginId origin);
extern int committssyncfiletag(const FileTag *ftag, char *path);
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
index 7d68759b6cd..c7c5c16d9f3 100644
--- a/src/test/subscription/t/029_on_error.pl
+++ b/src/test/subscription/t/029_on_error.pl
@@ -30,7 +30,7 @@ sub test_skip_lsn
# ERROR with its CONTEXT when retrieving this information.
my $contents = slurp_file($node_subscriber->logfile, $offset);
$contents =~
- qr/conflict detected on relation "public.tbl".*\n.*DETAIL:.* Could not apply remote change.*\n.*Key already exists in unique index "tbl_pkey", modified by .*origin.* in transaction \d+ at .*: key .*, local row .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
+ qr/conflict detected on relation "public.tbl".*\n.*DETAIL:.* Could not apply remote change.*\n.*Key already exists in unique index "tbl_pkey", modified (?:locally|by .*) in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
or die "could not get error-LSN";
my $lsn = $1;
--
2.50.1 (Apple Git-155)