v1-0001-Approach-2-Detect-origin-id-reuse-in-origin-diffe.patch

application/x-patch

Filename: v1-0001-Approach-2-Detect-origin-id-reuse-in-origin-diffe.patch
Type: application/x-patch
Part: 0
Message: Improve conflict detection when replication origins are reused
From 5abf86e31601e19036dd0ba6da129b8a2563ce19 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Wed, 13 May 2026 22:24:56 +0530
Subject: [PATCH v1 1/2] Approach 2 Detect origin-id reuse in origin differs
 conflict checks

Add a 'rocreated' timestamptz column to pg_replication_origin, set at
replication origin creation.

In the apply worker's UPDATE/DELETE conflict checks, when the local
row's origin roident matches the current subscription's roident (which
previously meant 'no conflict'), additionally verify that the local
row's commit_ts > ro_created.  If commit_ts <= ro_created, the roident
was reused from a prior DROP/CREATE SUBSCRIPTION and the row was
written by a different subscription; report the conflict.
---
 doc/src/sgml/catalogs.sgml                  |  9 ++++
 src/backend/replication/logical/origin.c    | 58 ++++++++++++++++++++-
 src/backend/replication/logical/worker.c    | 50 ++++++++++++++++--
 src/include/catalog/pg_replication_origin.h |  1 +
 src/include/replication/origin.h            |  1 +
 5 files changed, 113 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 4b474c13917..f4819594bdc 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7360,6 +7360,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        origin.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>rocreated</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+       Time at which this replication origin was created.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index c9dfb094c2b..c90382af6a1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -144,6 +144,14 @@ typedef struct ReplicationState
 	 * Lock protecting remote_lsn and local_lsn.
 	 */
 	LWLock		lock;
+
+	/*
+	 * Wall-clock time when this origin was created (from
+	 * pg_replication_origin.rocreated).  Used to detect roident reuse: if a
+	 * local row's commit_ts <= ro_created, the row was written by a prior
+	 * subscription that held this ID.
+	 */
+	TimestampTz ro_created;
 } ReplicationState;
 
 /*
@@ -359,6 +367,8 @@ replorigin_create(const char *roname)
 
 			values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
 			values[Anum_pg_replication_origin_roname - 1] = roname_d;
+			values[Anum_pg_replication_origin_rocreated - 1] =
+				TimestampTzGetDatum(GetCurrentTimestamp());
 
 			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 			CatalogTupleInsert(rel, tuple);
@@ -839,6 +849,7 @@ StartupReplicationOrigin(void)
 
 		/* copy data to shared memory */
 		replication_states[last_state].roident = disk_state.roident;
+		replication_states[last_state].ro_created = DT_NOBEGIN;
 		replication_states[last_state].remote_lsn = disk_state.remote_lsn;
 		last_state++;
 
@@ -1134,6 +1145,24 @@ ReplicationOriginExitCleanup(int code, Datum arg)
 	replorigin_session_reset_internal();
 }
 
+/*
+ * replorigin_get_creation_time
+ *		Report the cached creation timestamp of this session's origin.
+ *
+ * The result is ro_created from the session's replication state.  It is
+ * DT_NOBEGIN when no session origin is set up, or when the timestamp has
+ * not been loaded into the state yet.  The apply worker compares it with a
+ * local row's commit timestamp to notice a roident that was handed out again.
+ */
+TimestampTz
+replorigin_get_creation_time(void)
+{
+	/* DT_NOBEGIN doubles as the "no session origin" answer for callers. */
+	return (session_replication_state != NULL) ?
+		session_replication_state->ro_created :
+		DT_NOBEGIN;
+}
+
 /*
  * Setup a replication origin in the shared memory struct if it doesn't
  * already exist and cache access to the specific ReplicationSlot so the
@@ -1263,9 +1292,9 @@ replorigin_session_setup(ReplOriginId node, int acquired_by)
 		Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
 		Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
 		session_replication_state->roident = node;
+		session_replication_state->ro_created = DT_NOBEGIN;
 	}
 
-
 	Assert(session_replication_state->roident != InvalidReplOriginId);
 
 	if (acquired_by == 0)
@@ -1289,6 +1318,33 @@ replorigin_session_setup(ReplOriginId node, int acquired_by)
 
 	/* probably this one is pointless */
 	ConditionVariableBroadcast(&session_replication_state->origin_cv);
+
+	/*
+	 * Load the origin's creation time from the catalog if not already cached.
+	 * The guard fires because ro_created is explicitly set to DT_NOBEGIN in
+	 * the free-slot branch above and in StartupReplicationOrigin().  A NULL
+	 * catalog value leaves ro_created at DT_NOBEGIN, which disables the
+	 * roident-reuse check.  XXX: rocreated is declared BKI_FORCE_NOT_NULL, so
+	 * confirm whether NULL can really occur (e.g. for pre-existing origins).
+	 */
+	if (session_replication_state->ro_created == DT_NOBEGIN)
+	{
+		HeapTuple	tuple;
+
+		tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(node));
+		if (HeapTupleIsValid(tuple))
+		{
+			bool		isnull;
+			Datum		d;
+
+			d = SysCacheGetAttr(REPLORIGIDENT, tuple,
+								Anum_pg_replication_origin_rocreated,
+								&isnull);
+			if (!isnull)
+				session_replication_state->ro_created = DatumGetTimestampTz(d);
+			ReleaseSysCache(tuple);
+		}
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd6fc38a41e..e0b6c021dfa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2905,6 +2905,41 @@ apply_handle_update(StringInfo s)
 	end_replication_step();
 }
 
+/*
+ * IsRoidentReused
+ *		Does a tuple whose origin matches ours really belong to an earlier
+ *		owner of the same roident?
+ *
+ * Matching roidents normally mean "no conflict", but the numeric ID can be
+ * handed out again after DROP/CREATE SUBSCRIPTION.  A tuple committed at or
+ * before the creation time of the current origin is attributed to the
+ * previous owner, and true is returned so the caller reports a conflict.
+ *
+ * The comparison is inclusive (<=): a commit in the very microsecond the
+ * origin was created counts as the old owner's, preferring a spurious
+ * conflict over a missed one.
+ *
+ * NOTE(review): presumes localts and the creation time come from the same
+ * clock -- confirm for commit timestamps that originated on a remote node.
+ */
+static inline bool
+IsRoidentReused(ReplOriginId localorigin, TimestampTz localts)
+{
+	TimestampTz created_at;
+
+	/* An invalid origin can never be a recycled one. */
+	if (localorigin == InvalidReplOriginId)
+		return false;
+
+	/*
+	 * Skip the check when the creation time is unknown (DT_NOBEGIN: no
+	 * session origin, not loaded yet, or NULL in the catalog) and when
+	 * localts == 0, i.e. track_commit_timestamp is off.
+	 */
+	created_at = replorigin_get_creation_time();
+	return created_at != DT_NOBEGIN && localts != 0 && localts <= created_at;
+}
+
 /*
  * Workhorse for apply_handle_update()
  * relinfo is for the relation we're actually updating in
@@ -2947,7 +2982,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_xact_state.origin)
+			(conflicttuple.origin != replorigin_xact_state.origin ||
+			 IsRoidentReused(conflicttuple.origin, conflicttuple.ts)))
 		{
 			TupleTableSlot *newslot;
 
@@ -2989,7 +3025,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 									   &conflicttuple.xmin,
 									   &conflicttuple.origin,
 									   &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_xact_state.origin)
+			(conflicttuple.origin != replorigin_xact_state.origin ||
+			 IsRoidentReused(conflicttuple.origin, conflicttuple.ts)))
 			type = CT_UPDATE_DELETED;
 		else
 			type = CT_UPDATE_MISSING;
@@ -3142,7 +3179,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		 */
 		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 									&conflicttuple.origin, &conflicttuple.ts) &&
-			conflicttuple.origin != replorigin_xact_state.origin)
+			(conflicttuple.origin != replorigin_xact_state.origin ||
+			 IsRoidentReused(conflicttuple.origin, conflicttuple.ts)))
 		{
 			conflicttuple.slot = localslot;
 			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3484,7 +3522,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 												   &conflicttuple.xmin,
 												   &conflicttuple.origin,
 												   &conflicttuple.ts) &&
-						conflicttuple.origin != replorigin_xact_state.origin)
+						(conflicttuple.origin != replorigin_xact_state.origin ||
+						 IsRoidentReused(conflicttuple.origin, conflicttuple.ts)))
 						type = CT_UPDATE_DELETED;
 					else
 						type = CT_UPDATE_MISSING;
@@ -3510,7 +3549,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 											&conflicttuple.origin,
 											&conflicttuple.ts) &&
-					conflicttuple.origin != replorigin_xact_state.origin)
+					(conflicttuple.origin != replorigin_xact_state.origin ||
+					 IsRoidentReused(conflicttuple.origin, conflicttuple.ts)))
 				{
 					TupleTableSlot *newslot;
 
diff --git a/src/include/catalog/pg_replication_origin.h b/src/include/catalog/pg_replication_origin.h
index 565d71ad0b3..d6079ca4844 100644
--- a/src/include/catalog/pg_replication_origin.h
+++ b/src/include/catalog/pg_replication_origin.h
@@ -51,6 +51,7 @@ CATALOG(pg_replication_origin,6000,ReplicationOriginRelationId) BKI_SHARED_RELAT
 	text		roname BKI_FORCE_NOT_NULL;
 
 #ifdef CATALOG_VARLEN			/* further variable-length fields */
+	timestamptz rocreated BKI_FORCE_NOT_NULL;
 #endif
 } FormData_pg_replication_origin;
 
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index a69faf6eaaf..22bed238140 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -71,6 +71,7 @@ extern void replorigin_session_advance(XLogRecPtr remote_commit,
 extern void replorigin_session_setup(ReplOriginId node, int acquired_by);
 extern void replorigin_session_reset(void);
 extern XLogRecPtr replorigin_session_get_progress(bool flush);
+extern TimestampTz replorigin_get_creation_time(void);
 
 /* Per-transaction replication origin state manipulation */
 extern void replorigin_xact_clear(bool clear_origin);
-- 
2.50.1 (Apple Git-155)