From 2764cc594b7b8dc947b6c66193691853de5e83f3 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 9 Dec 2025 21:11:31 +1100 Subject: [PATCH v31 1/2] Signal backends running pg_sync_replication_slots() during promotion. Previously, during promotion, only the slot synchronization worker was interrupted to shut down for promotion. That meant backends that perform slot synchronization via the pg_sync_replication_slots() SQL function were not signalled at all because their PIDs were not recorded in the slot-sync context. This patch changes behaviour to: 1. Store the backend PID in SlotSyncCtxStruct so the backend performing slot synchronization can be signalled. 2. On promotion, send SIGUSR1 to the recorded PID - either the slot-sync worker or any backend currently syncing slots. 3. Backends invoking pg_sync_replication_slots() also call ProcessSlotSyncInterrupts() to handle the promotion signal as well as any configuration changes that might result in stopping of synchronization. This patch also acts as a base for a larger patch that improves pg_sync_replication_slots() to wait for slots to be persisted before exiting. Author: Ajin Cherian Discussion: https://www.postgresql.org/message-id/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com --- src/backend/replication/logical/slotsync.c | 145 +++++++++++++-------- 1 file changed, 93 insertions(+), 52 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 7e3b4c4413e..327bb361d61 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -71,9 +71,12 @@ /* * Struct for sharing information to control slot synchronization. * - * The slot sync worker's pid is needed by the startup process to shut it - * down during promotion.
The startup process shuts down the slot sync worker - * and also sets stopSignaled=true to handle the race condition when the + * The pid is either the slot sync worker's pid or the backend's pid running + * the SQL function pg_sync_replication_slots(). When the startup process sets + * stopSignaled during promotion, it uses this pid to wake up the currently + * synchronizing process so that the process can immediately stop its + * synchronizing work on seeing stopSignaled set. + * Setting stopSignaled is also used to handle the race condition when the * postmaster has not noticed the promotion yet and thus may end up restarting * the slot sync worker. If stopSignaled is set, the worker will exit in such a * case. The SQL function pg_sync_replication_slots() will also error out if @@ -1195,10 +1198,11 @@ ValidateSlotSyncParams(int elevel) } /* - * Re-read the config file. + * Re-read the config file for slot synchronization. + * + * Exit or throw errors if relevant GUCs have changed depending on whether + * called from slotsync worker or from SQL function pg_sync_replication_slots() * - * Exit if any of the slot sync GUCs have changed. The postmaster will - * restart it. 
*/ static void slotsync_reread_config(void) @@ -1209,45 +1213,77 @@ slotsync_reread_config(void) bool old_hot_standby_feedback = hot_standby_feedback; bool conninfo_changed; bool primary_slotname_changed; + bool worker = AmLogicalSlotSyncWorkerProcess(); + bool parameter_changed = false; - Assert(sync_replication_slots); + if (worker) + Assert(sync_replication_slots); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + pfree(old_primary_conninfo); pfree(old_primary_slotname); + /* Check for sync_replication_slots change */ if (old_sync_replication_slots != sync_replication_slots) { - ereport(LOG, - /* translator: %s is a GUC variable name */ - errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots")); - proc_exit(0); + if (worker) + { + ereport(LOG, + /* translator: %s is a GUC variable name */ + errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", + "sync_replication_slots")); + + proc_exit(0); + } + + parameter_changed = true; } + /* Check for parameter changes common to both API and worker */ if (conninfo_changed || primary_slotname_changed || (old_hot_standby_feedback != hot_standby_feedback)) { - ereport(LOG, - errmsg("replication slot synchronization worker will restart because of a parameter change")); - /* - * Reset the last-start time for this worker so that the postmaster - * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. - */ - SlotSyncCtx->last_start_time = 0; + if (worker) + { + ereport(LOG, + errmsg("replication slot synchronization worker will restart because of a parameter change")); - proc_exit(0); + /* + * Reset the last-start time for this worker so that the + * postmaster can restart it without waiting for + * SLOTSYNC_RESTART_INTERVAL_SEC. 
+ */ + SlotSyncCtx->last_start_time = 0; + + proc_exit(0); + } + + parameter_changed = true; + } + + /* + * If we have reached here with a parameter change, we must be running in + * SQL function, emit error in such a case. + */ + if (parameter_changed) + { + Assert(!worker); + ereport(ERROR, + errmsg("replication slot synchronization will stop because of a parameter change")); } } /* - * Interrupt handler for main loop of slot sync worker. + * Interrupt handler for main loop of slot sync worker and + * SQL function pg_sync_replication_slots(). */ static void ProcessSlotSyncInterrupts(void) @@ -1256,10 +1292,20 @@ ProcessSlotSyncInterrupts(void) if (SlotSyncCtx->stopSignaled) { - ereport(LOG, - errmsg("replication slot synchronization worker is shutting down because promotion is triggered")); + if (AmLogicalSlotSyncWorkerProcess()) + { + ereport(LOG, + errmsg("replication slot synchronization worker is shutting down because promotion is triggered")); - proc_exit(0); + proc_exit(0); + } + else + { + /* For SQL function */ + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot synchronization will stop because promotion is triggered")); + } } if (ConfigReloadPending) @@ -1366,25 +1412,10 @@ wait_for_slot_activity(bool some_slot_updated) * Otherwise, advertise that a sync is in progress. */ static void -check_and_set_sync_info(pid_t worker_pid) +check_and_set_sync_info(pid_t sync_process_pid) { SpinLockAcquire(&SlotSyncCtx->mutex); - /* The worker pid must not be already assigned in SlotSyncCtx */ - Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid); - - /* - * Emit an error if startup process signaled the slot sync machinery to - * stop. See comments atop SlotSyncCtxStruct. 
- */ - if (SlotSyncCtx->stopSignaled) - { - SpinLockRelease(&SlotSyncCtx->mutex); - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot synchronize replication slots when standby promotion is ongoing")); - } - if (SlotSyncCtx->syncing) { SpinLockRelease(&SlotSyncCtx->mutex); @@ -1393,13 +1424,16 @@ check_and_set_sync_info(pid_t worker_pid) errmsg("cannot synchronize replication slots concurrently")); } + /* The pid must not be already assigned in SlotSyncCtx */ + Assert(SlotSyncCtx->pid == InvalidPid); + SlotSyncCtx->syncing = true; /* * Advertise the required PID so that the startup process can kill the - * slot sync worker on promotion. + * slot sync process on promotion. */ - SlotSyncCtx->pid = worker_pid; + SlotSyncCtx->pid = sync_process_pid; SpinLockRelease(&SlotSyncCtx->mutex); @@ -1414,6 +1448,7 @@ reset_syncing_flag(void) { SpinLockAcquire(&SlotSyncCtx->mutex); SlotSyncCtx->syncing = false; + SlotSyncCtx->pid = InvalidPid; SpinLockRelease(&SlotSyncCtx->mutex); syncing_slots = false; @@ -1595,7 +1630,7 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) /* * The slot sync worker can't get here because it will only stop when it - * receives a stop request from the startup process, or when there is an + * receives a stop request from the startup process, or when there is an * error. */ Assert(false); @@ -1651,16 +1686,18 @@ update_synced_slots_inactive_since(void) } /* - * Shut down the slot sync worker. + * Shut down slot synchronization. * - * This function sends signal to shutdown slot sync worker, if required. It - * also waits till the slot sync worker has exited or + * This function sets stopSignaled=true and wakes up the slot sync process + * (either worker or backend running SQL function pg_sync_replication_slots()) + * so that worker can exit or SQL function pg_sync_replication_slots() can + * finish.
It also waits till the slot sync worker has exited or * pg_sync_replication_slots() has finished. */ void ShutDownSlotSync(void) { - pid_t worker_pid; + pid_t sync_process_pid; SpinLockAcquire(&SlotSyncCtx->mutex); @@ -1677,16 +1714,17 @@ ShutDownSlotSync(void) return; } - worker_pid = SlotSyncCtx->pid; + sync_process_pid = SlotSyncCtx->pid; SpinLockRelease(&SlotSyncCtx->mutex); /* - * Signal slotsync worker if it was still running. The worker will stop - * upon detecting that the stopSignaled flag is set to true. + * Signal slotsync worker or backend process running pg_sync_replication_slots() + * if running. The process will stop upon detecting that the stopSignaled + * flag is set to true. */ - if (worker_pid != InvalidPid) - kill(worker_pid, SIGUSR1); + if (sync_process_pid!= InvalidPid) + kill(sync_process_pid, SIGUSR1); /* Wait for slot sync to end */ for (;;) @@ -1835,7 +1873,10 @@ SyncReplicationSlots(WalReceiverConn *wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { - check_and_set_sync_info(InvalidPid); + check_and_set_sync_info(MyProcPid); + + /* Check for interrupts and config changes */ + ProcessSlotSyncInterrupts(); validate_remote_info(wrconn); -- 2.47.3