v1-0001-Approach-2-Detect-origin-id-reuse-in-origin-diffe.patch
application/x-patch
Filename: v1-0001-Approach-2-Detect-origin-id-reuse-in-origin-diffe.patch
Type: application/x-patch
Part: 0
From 5abf86e31601e19036dd0ba6da129b8a2563ce19 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Wed, 13 May 2026 22:24:56 +0530
Subject: [PATCH v1 1/2] Approach 2: Detect origin-id reuse in origin_differs
conflict checks
Add a 'rocreated' timestamptz column to pg_replication_origin, set when
the replication origin is created.
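In the apply worker's UPDATE/DELETE conflict checks, when the local
row's origin roident matches the current subscription's roident (which
previously meant 'no conflict'), additionally verify that the local
row's commit_ts > rocreated. If commit_ts <= rocreated, the roident
is assumed to have been reused after a prior DROP/CREATE SUBSCRIPTION and
the row written by a different subscription; report the conflict. Note
that for rows applied by a subscription commit_ts is the publisher's
commit time, while rocreated comes from the subscriber's clock, so the
check is sensitive to clock skew between the nodes.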
---
doc/src/sgml/catalogs.sgml | 9 ++++
src/backend/replication/logical/origin.c | 58 ++++++++++++++++++++-
src/backend/replication/logical/worker.c | 50 ++++++++++++++++--
src/include/catalog/pg_replication_origin.h | 1 +
src/include/replication/origin.h | 1 +
5 files changed, 113 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 4b474c13917..f4819594bdc 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7360,6 +7360,15 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
origin.
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>rocreated</structfield> <type>timestamptz</type>
+ </para>
+ <para>
+ Time at which this replication origin was created.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index c9dfb094c2b..c90382af6a1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -359,6 +359,9 @@ replorigin_create(const char *roname)
values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
values[Anum_pg_replication_origin_roname - 1] = roname_d;
+ /* creation time; used by the apply worker to detect roident reuse */
+ values[Anum_pg_replication_origin_rocreated - 1] =
+ TimestampTzGetDatum(GetCurrentTimestamp());
tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
CatalogTupleInsert(rel, tuple);
@@ -1134,6 +1137,68 @@ ReplicationOriginExitCleanup(int code, Datum arg)
replorigin_session_reset_internal();
}
+/*
+ * Creation time (pg_replication_origin.rocreated) of the origin set up by the
+ * latest replorigin_session_setup() call in this backend, or DT_NOBEGIN if it
+ * is unknown. Only meaningful while session_replication_state is set.
+ *
+ * This is kept per-backend rather than in the shared ReplicationState array.
+ * A shared slot may be handed to another origin without passing through
+ * replorigin_session_setup()'s free-slot branch (replorigin_advance() appears
+ * to do so), which would leave a per-slot cached value stale; it also avoids
+ * unsynchronized writes to shared memory.
+ */
+static TimestampTz session_origin_created = DT_NOBEGIN;
+
+/*
+ * Look up pg_replication_origin.rocreated for the given origin.
+ *
+ * Returns DT_NOBEGIN if the origin is not found or the column is NULL; the
+ * latter should not happen, since the column is BKI_FORCE_NOT_NULL and is
+ * always set by replorigin_create(). Uses the syscache, so the caller must
+ * be inside a transaction.
+ */
+static TimestampTz
+replorigin_lookup_creation_time(ReplOriginId node)
+{
+ HeapTuple tuple;
+ TimestampTz result = DT_NOBEGIN;
+
+ tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(node));
+ if (HeapTupleIsValid(tuple))
+ {
+ bool isnull;
+ Datum d;
+
+ d = SysCacheGetAttr(REPLORIGIDENT, tuple,
+ Anum_pg_replication_origin_rocreated,
+ &isnull);
+ if (!isnull)
+ result = DatumGetTimestampTz(d);
+ ReleaseSysCache(tuple);
+ }
+
+ return result;
+}
+
+/*
+ * Return the creation timestamp of the current session's replication origin,
+ * or DT_NOBEGIN if no session origin is active or its creation time is
+ * unknown.
+ *
+ * Used by the apply worker to detect roident reuse during conflict checking:
+ * if a local row's commit_ts <= the returned value, the row is assumed to
+ * have been written by a prior subscription that held the same origin ID.
+ */
+TimestampTz
+replorigin_get_creation_time(void)
+{
+ if (session_replication_state == NULL)
+ return DT_NOBEGIN;
+
+ return session_origin_created;
+}
+
/*
* Setup a replication origin in the shared memory struct if it doesn't
* already exist and cache access to the specific ReplicationSlot so the
@@ -1289,6 +1354,15 @@ replorigin_session_setup(ReplOriginId node, int acquired_by)
/* probably this one is pointless */
ConditionVariableBroadcast(&session_replication_state->origin_cv);
+
+ /*
+ * Fetch the origin's creation time for replorigin_get_creation_time(). It
+ * is looked up on every call instead of being cached per shared-memory
+ * slot (see session_origin_created). Reset it first so that an error
+ * during the lookup cannot leave behind a previous session's value.
+ */
+ session_origin_created = DT_NOBEGIN;
+ session_origin_created = replorigin_lookup_creation_time(node);
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dd6fc38a41e..e0b6c021dfa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2905,6 +2905,67 @@ apply_handle_update(StringInfo s)
end_replication_step();
}
+/*
+ * Check whether the tuple's origin roident was reused by a new subscription.
+ * Returns true if a conflict should be reported despite matching roidents.
+ *
+ * The same numeric roident may be recycled after a DROP/CREATE SUBSCRIPTION
+ * cycle. If the local row's commit timestamp is at or before the current
+ * origin's creation time, the row is assumed to have been written by an
+ * earlier subscription, not the current one.
+ *
+ * Uses <= rather than < so that a row committed in the same microsecond as
+ * the origin was created is treated as belonging to the prior subscription;
+ * a spurious conflict is considered safer than a missed one here.
+ *
+ * NOTE(review): this compares a row's commit timestamp with a wall-clock time
+ * taken on this node at origin creation. For rows written by an apply
+ * worker the commit timestamp appears to be the remote (publisher) commit
+ * time, not the local one -- confirm against RecordTransactionCommit(). If
+ * so, clock skew between the nodes can cause both spurious and missed
+ * conflicts, and a test not based on wall-clock time may be more robust.
+ */
+static inline bool
+IsRoidentReused(ReplOriginId localorigin, TimestampTz localts)
+{
+ TimestampTz origin_created;
+
+ if (localorigin == InvalidReplOriginId)
+ return false;
+
+ origin_created = replorigin_get_creation_time();
+
+ /*
+ * DT_NOBEGIN means no session origin is active or its creation time is
+ * unknown; localts == 0 means no commit timestamp is available (e.g.
+ * track_commit_timestamp is off). Either way there is nothing to
+ * compare, so skip the reuse check.
+ */
+ if (origin_created == DT_NOBEGIN || localts == 0)
+ return false;
+
+ return (localts <= origin_created);
+}
+
+/*
+ * Should a local tuple last modified under origin 'localorigin' with commit
+ * timestamp 'localts' be treated as coming from an origin other than the one
+ * currently being applied?
+ *
+ * True when the roidents differ, or when they match but the roident appears
+ * to have been reused (see IsRoidentReused()). This is the single test the
+ * conflict checks below use to decide whether to report a conflict such as
+ * CT_DELETE_ORIGIN_DIFFERS or CT_UPDATE_DELETED.
+ */
+static inline bool
+IsLocalOriginDifferent(ReplOriginId localorigin, TimestampTz localts)
+{
+ if (localorigin != replorigin_xact_state.origin)
+ return true;
+
+ return IsRoidentReused(localorigin, localts);
+}
+
/*
* Workhorse for apply_handle_update()
* relinfo is for the relation we're actually updating in
@@ -2947,7 +3008,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ IsLocalOriginDifferent(conflicttuple.origin, conflicttuple.ts))
{
TupleTableSlot *newslot;
@@ -2989,7 +3050,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
&conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ IsLocalOriginDifferent(conflicttuple.origin, conflicttuple.ts))
type = CT_UPDATE_DELETED;
else
type = CT_UPDATE_MISSING;
@@ -3142,7 +3203,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ IsLocalOriginDifferent(conflicttuple.origin, conflicttuple.ts))
{
conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3484,7 +3545,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
&conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ IsLocalOriginDifferent(conflicttuple.origin, conflicttuple.ts))
type = CT_UPDATE_DELETED;
else
type = CT_UPDATE_MISSING;
@@ -3510,7 +3571,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ IsLocalOriginDifferent(conflicttuple.origin, conflicttuple.ts))
{
TupleTableSlot *newslot;
diff --git a/src/include/catalog/pg_replication_origin.h b/src/include/catalog/pg_replication_origin.h
index 565d71ad0b3..d6079ca4844 100644
--- a/src/include/catalog/pg_replication_origin.h
+++ b/src/include/catalog/pg_replication_origin.h
@@ -51,6 +51,7 @@ CATALOG(pg_replication_origin,6000,ReplicationOriginRelationId) BKI_SHARED_RELAT
text roname BKI_FORCE_NOT_NULL;
#ifdef CATALOG_VARLEN /* further variable-length fields */
+ timestamptz rocreated BKI_FORCE_NOT_NULL; /* creation time, set by replorigin_create() */
#endif
} FormData_pg_replication_origin;
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index a69faf6eaaf..22bed238140 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -71,6 +71,7 @@ extern void replorigin_session_advance(XLogRecPtr remote_commit,
extern void replorigin_session_setup(ReplOriginId node, int acquired_by);
extern void replorigin_session_reset(void);
extern XLogRecPtr replorigin_session_get_progress(bool flush);
+extern TimestampTz replorigin_get_creation_time(void);
/* Per-transaction replication origin state manipulation */
extern void replorigin_xact_clear(bool clear_origin);
--
2.50.1 (Apple Git-155)