v32-0001-Signal-backends-running-pg_sync_replication_slot.patch
application/octet-stream
Filename: v32-0001-Signal-backends-running-pg_sync_replication_slot.patch
Type: application/octet-stream
Part: 0
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v32-0001
Subject: Signal backends running pg_sync_replication_slots() during promotion.
| File | + | − |
|---|---|---|
| src/backend/replication/logical/slotsync.c | 90 | 51 |
From 9f61c459a646ff9ea1f3c016af6c3f2941159dcf Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 10 Dec 2025 13:14:18 +1100
Subject: [PATCH v32 1/2] Signal backends running pg_sync_replication_slots()
during promotion.
Previously, during promotion, only the slot synchronization worker was
signalled to shut down. That meant a backend
that performs slot synchronization via the pg_sync_replication_slots()
SQL function was not signalled at all because its PID was not
recorded in the slot-sync context.
This patch changes behaviour to:
1. Store the backend PID in SlotSyncCtxStruct so the backend performing
slot synchronization can be signalled.
2. On promotion, send SIGUSR1 to the recorded PID - either
the slot-sync worker or any backend currently syncing slots.
3. A backend invoking pg_sync_replication_slots() now also calls
ProcessSlotSyncInterrupts() to handle the promotion signal as well as any
configuration changes that might require the synchronization to
stop.
This patch also acts as a base for a larger patch that improves
pg_sync_replication_slots() to wait for slots to be persisted before
exiting.
Author: Ajin Cherian <itsajin@gmail.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com
---
src/backend/replication/logical/slotsync.c | 141 +++++++++++++--------
1 file changed, 90 insertions(+), 51 deletions(-)
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 9f92c21237e..1e7b131c0f8 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -71,9 +71,12 @@
/*
* Struct for sharing information to control slot synchronization.
*
- * The slot sync worker's pid is needed by the startup process to shut it
- * down during promotion. The startup process shuts down the slot sync worker
- * and also sets stopSignaled=true to handle the race condition when the
+ * The pid is either the slot sync worker's pid or the pid of the backend
+ * running the SQL function pg_sync_replication_slots(). When the startup
+ * process sets stopSignaled during promotion, it uses this pid to wake up
+ * the currently synchronizing process so that it can stop its
+ * synchronization work immediately upon seeing stopSignaled set.
+ * Setting stopSignaled is also used to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
* case. The SQL function pg_sync_replication_slots() will also error out if
@@ -1195,10 +1198,11 @@ ValidateSlotSyncParams(int elevel)
}
/*
- * Re-read the config file.
+ * Re-read the config file for slot synchronization.
+ *
+ * Exit (slot sync worker) or raise an ERROR (SQL function
+ * pg_sync_replication_slots()) if any of the relevant GUCs have changed.
*
- * Exit if any of the slot sync GUCs have changed. The postmaster will
- * restart it.
*/
static void
slotsync_reread_config(void)
@@ -1209,8 +1213,11 @@ slotsync_reread_config(void)
bool old_hot_standby_feedback = hot_standby_feedback;
bool conninfo_changed;
bool primary_slotname_changed;
+ bool worker = AmLogicalSlotSyncWorkerProcess();
+ bool parameter_changed = false;
- Assert(sync_replication_slots);
+ if (worker)
+ Assert(sync_replication_slots);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
@@ -1222,32 +1229,58 @@ slotsync_reread_config(void)
if (old_sync_replication_slots != sync_replication_slots)
{
- ereport(LOG,
- /* translator: %s is a GUC variable name */
- errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots"));
- proc_exit(0);
+ if (worker)
+ {
+ ereport(LOG,
+ /* translator: %s is a GUC variable name */
+ errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled",
+ "sync_replication_slots"));
+
+ proc_exit(0);
+ }
+
+ parameter_changed = true;
}
if (conninfo_changed ||
primary_slotname_changed ||
(old_hot_standby_feedback != hot_standby_feedback))
{
- ereport(LOG,
- errmsg("replication slot synchronization worker will restart because of a parameter change"));
- /*
- * Reset the last-start time for this worker so that the postmaster
- * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- */
- SlotSyncCtx->last_start_time = 0;
+ if (worker)
+ {
+ ereport(LOG,
+ errmsg("replication slot synchronization worker will restart because of a parameter change"));
- proc_exit(0);
+ /*
+ * Reset the last-start time for this worker so that the
+ * postmaster can restart it without waiting for
+ * SLOTSYNC_RESTART_INTERVAL_SEC.
+ */
+ SlotSyncCtx->last_start_time = 0;
+
+ proc_exit(0);
+ }
+
+ parameter_changed = true;
+ }
+
+ /*
+ * If we reach here with a parameter change, we must be running in the SQL
+ * function pg_sync_replication_slots(); raise an error in that case.
+ */
+ if (parameter_changed)
+ {
+ Assert(!worker);
+ ereport(ERROR,
+ errmsg("replication slot synchronization will stop because of a parameter change"));
}
}
/*
- * Interrupt handler for main loop of slot sync worker.
+ * Interrupt handler for main loop of slot sync worker and
+ * SQL function pg_sync_replication_slots().
*/
static void
ProcessSlotSyncInterrupts(void)
@@ -1256,10 +1289,20 @@ ProcessSlotSyncInterrupts(void)
if (SlotSyncCtx->stopSignaled)
{
- ereport(LOG,
- errmsg("replication slot synchronization worker is shutting down because promotion is triggered"));
+ if (AmLogicalSlotSyncWorkerProcess())
+ {
+ ereport(LOG,
+ errmsg("replication slot synchronization worker is shutting down because promotion is triggered"));
- proc_exit(0);
+ proc_exit(0);
+ }
+ else
+ {
+ /* For SQL function */
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot synchronization will stop because promotion is triggered"));
+ }
}
if (ConfigReloadPending)
@@ -1366,25 +1409,10 @@ wait_for_slot_activity(bool some_slot_updated)
* Otherwise, advertise that a sync is in progress.
*/
static void
-check_and_set_sync_info(pid_t worker_pid)
+check_and_set_sync_info(pid_t sync_process_pid)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* The worker pid must not be already assigned in SlotSyncCtx */
- Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Emit an error if startup process signaled the slot sync machinery to
- * stop. See comments atop SlotSyncCtxStruct.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
- }
-
if (SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1393,13 +1421,16 @@ check_and_set_sync_info(pid_t worker_pid)
errmsg("cannot synchronize replication slots concurrently"));
}
+ /* The pid must not already be assigned in SlotSyncCtx */
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
SlotSyncCtx->syncing = true;
/*
* Advertise the required PID so that the startup process can kill the
- * slot sync worker on promotion.
+ * slot sync process on promotion.
*/
- SlotSyncCtx->pid = worker_pid;
+ SlotSyncCtx->pid = sync_process_pid;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1414,6 +1445,7 @@ reset_syncing_flag(void)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
+ SlotSyncCtx->pid = InvalidPid;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
@@ -1651,16 +1683,18 @@ update_synced_slots_inactive_since(void)
}
/*
- * Shut down the slot sync worker.
+ * Shut down slot synchronization.
*
- * This function sends signal to shutdown slot sync worker, if required. It
- * also waits till the slot sync worker has exited or
+ * This function sets stopSignaled=true and wakes up the slot sync process
+ * (either the worker or a backend running the SQL function
+ * pg_sync_replication_slots()) so that the worker can exit or the SQL
+ * function can finish. It also waits till the slot sync worker has exited or
* pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
{
- pid_t worker_pid;
+ pid_t sync_process_pid;
SpinLockAcquire(&SlotSyncCtx->mutex);
@@ -1677,16 +1711,18 @@ ShutDownSlotSync(void)
return;
}
- worker_pid = SlotSyncCtx->pid;
+ sync_process_pid = SlotSyncCtx->pid;
SpinLockRelease(&SlotSyncCtx->mutex);
/*
- * Signal slotsync worker if it was still running. The worker will stop
- * upon detecting that the stopSignaled flag is set to true.
+ * Signal slotsync worker or the backend process running
+ * pg_sync_replication_slots(), if either one is active.
+ * The process will stop upon detecting that the stopSignaled
+ * flag is set to true.
*/
- if (worker_pid != InvalidPid)
- kill(worker_pid, SIGUSR1);
+ if (sync_process_pid!= InvalidPid)
+ kill(sync_process_pid, SIGUSR1);
/* Wait for slot sync to end */
for (;;)
@@ -1835,7 +1871,10 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
- check_and_set_sync_info(InvalidPid);
+ check_and_set_sync_info(MyProcPid);
+
+ /* Check for interrupts and config changes */
+ ProcessSlotSyncInterrupts();
validate_remote_info(wrconn);
--
2.47.3