v2-0001-Refactor-replication-origin-state-reset-helpers.patch
application/octet-stream
Filename: v2-0001-Refactor-replication-origin-state-reset-helpers.patch
Type: application/octet-stream
Part: 0
From 496cc60401705a4512915db7ebf3358f7004014e Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <lic@highgo.com>
Date: Wed, 24 Dec 2025 09:17:27 +0800
Subject: [PATCH v2] Refactor replication origin state reset helpers
Factor out common logic for clearing per-transaction and per-session
replication origin state into dedicated helper functions.
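This removes duplicated assignments of replorigin_session_origin,
replorigin_session_origin_lsn, and replorigin_session_origin_timestamp
across multiple call sites, and makes the intended scope of each reset
(clear per-transaction state vs. clear per-session state) explicit.
In addition, replorigin_session_setup() now sets replorigin_session_origin
itself, and replorigin_session_reset() now clears the local session state
itself, so callers no longer have to update these variables by hand.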
No functional change intended.
Author: Chao Li <lic@highgo.com>
---
.../replication/logical/applyparallelworker.c | 1 -
src/backend/replication/logical/origin.c | 20 +++++++++++-------
src/backend/replication/logical/tablesync.c | 5 -----
src/backend/replication/logical/worker.c | 15 ++++++-------
src/include/replication/origin.h | 21 +++++++++++++++++++
5 files changed, 39 insertions(+), 23 deletions(-)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index a4aafcf5b6e..b05279e0809 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -962,7 +962,6 @@ ParallelApplyWorkerMain(Datum main_arg)
* origin which was already acquired by its leader process.
*/
replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
CommitTransactionCommand();
/*
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 2380f369578..45d7bc5abc8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1213,6 +1213,9 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
/* probably this one is pointless */
ConditionVariableBroadcast(&session_replication_state->origin_cv);
+
+ /* set local state too */
+ replorigin_session_origin = node;
}
/*
@@ -1233,6 +1236,9 @@ replorigin_session_reset(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
+ /*
+ * Clear session state in shared memory
+ */
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
session_replication_state->acquired_by = 0;
@@ -1242,6 +1248,11 @@ replorigin_session_reset(void)
LWLockRelease(ReplicationOriginLock);
ConditionVariableBroadcast(cv);
+
+ /*
+ * Clear local session state
+ */
+ replorigin_session_clear_state();
}
/*
@@ -1395,8 +1406,6 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
pid = PG_GETARG_INT32(1);
replorigin_session_setup(origin, pid);
- replorigin_session_origin = origin;
-
pfree(name);
PG_RETURN_VOID();
@@ -1412,10 +1421,6 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
replorigin_session_reset();
- replorigin_session_origin = InvalidRepOriginId;
- replorigin_session_origin_lsn = InvalidXLogRecPtr;
- replorigin_session_origin_timestamp = 0;
-
PG_RETURN_VOID();
}
@@ -1482,8 +1487,7 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
{
replorigin_check_prerequisites(true, false);
- replorigin_session_origin_lsn = InvalidXLogRecPtr;
- replorigin_session_origin_timestamp = 0;
+ replorigin_xact_clear_state();
PG_RETURN_VOID();
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2522e372036..6ac467a9e19 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -323,9 +323,6 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
* This is needed to allow the origin to be dropped.
*/
replorigin_session_reset();
- replorigin_session_origin = InvalidRepOriginId;
- replorigin_session_origin_lsn = InvalidXLogRecPtr;
- replorigin_session_origin_timestamp = 0;
/*
* Drop the tablesync's origin tracking if exists.
@@ -1320,7 +1317,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
originid = replorigin_by_name(originname, false);
replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
@@ -1407,7 +1403,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
/*
* If the user did not opt to run as the owner of the subscription
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 718408bb599..651045debee 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -627,7 +627,7 @@ static inline void reset_apply_error_context_info(void);
static TransApplyAction get_transaction_apply_action(TransactionId xid,
ParallelApplyWorkerInfo **winfo);
-static void replorigin_reset(int code, Datum arg);
+static void on_exit_clear_state(int code, Datum arg);
/*
* Form the origin name for the subscription.
@@ -5594,7 +5594,7 @@ start_apply(XLogRecPtr origin_startpos)
* transaction loss as that transaction won't be sent again by the
* server.
*/
- replorigin_reset(0, (Datum) 0);
+ replorigin_session_clear_state();
if (MySubscription->disableonerr)
DisableSubscriptionAndExit();
@@ -5652,7 +5652,6 @@ run_apply_worker(void)
if (!OidIsValid(originid))
originid = replorigin_create(originname);
replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
@@ -5865,18 +5864,16 @@ InitializeLogRepWorker(void)
* replication workers that set up origins and apply remote transactions
* are protected.
*/
- before_shmem_exit(replorigin_reset, (Datum) 0);
+ before_shmem_exit(on_exit_clear_state, (Datum) 0);
}
/*
- * Reset the origin state.
+ * Callback on exit to reset the origin state.
*/
static void
-replorigin_reset(int code, Datum arg)
+on_exit_clear_state(int code, Datum arg)
{
- replorigin_session_origin = InvalidRepOriginId;
- replorigin_session_origin_lsn = InvalidXLogRecPtr;
- replorigin_session_origin_timestamp = 0;
+ replorigin_session_clear_state();
}
/*
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index 2a73f6aa492..288f9ff658f 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -44,6 +44,27 @@ extern PGDLLIMPORT RepOriginId replorigin_session_origin;
extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+/*
+ * Clear per-transaction replication origin state (origin LSN and timestamp).
+ */
+static inline void
+replorigin_xact_clear_state(void)
+{
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+}
+
+/*
+ * Clear per-session origin state: per-transaction state plus the origin ID.
+ */
+static inline void
+replorigin_session_clear_state(void)
+{
+ replorigin_xact_clear_state();
+ replorigin_session_origin = InvalidRepOriginId;
+}
+
+
/* GUCs */
extern PGDLLIMPORT int max_active_replication_origins;
--
2.39.5 (Apple Git-154)