v8-0001-Improve-initial-slot-synchronization-in-pg_sync_r.patch
Format: format-patch
Series: patch v8-0001
Subject: Improve initial slot synchronization in pg_sync_replication_slots()
| File | + | − |
|---|---|---|
| doc/src/sgml/func/func-admin.sgml | 1 | 3 |
| doc/src/sgml/logicaldecoding.sgml | 13 | 27 |
| src/backend/replication/logical/slotsync.c | 188 | 31 |
| src/backend/utils/activity/wait_event_names.txt | 1 | 1 |
From 2af57203f3c5bb6f038413f44a9f116a9622c579 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 20 Aug 2025 14:55:55 +1000
Subject: [PATCH v8] Improve initial slot synchronization in
pg_sync_replication_slots()
During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WALs have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for initial sync and
retaining it even if the sync fails. It keeps retrying until the slot on the
primary has been advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had
no such protection mechanism.
The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.
To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
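The retry behaviour described in the commit message can be sketched outside the backend as follows. This is a minimal, standalone simulation, not the patch's code: `SyncSim`, `attempt_sync()`, `nap()` and `sync_until_ready()` are hypothetical names, the integer positions stand in for LSNs, and `nap()` stands in for the 2-second `WaitLatch()` call. It assumes, purely for illustration, that the primary slot advances by one position per nap.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the state the real loop reads from the backend. */
typedef struct SyncSim
{
	int		primary_pos;	/* current position of the slot on the primary */
	int		required_pos;	/* position the standby needs the slot to reach */
	bool	promoted;		/* stands in for SlotSyncCtx->stopSignaled */
	int		naps;			/* number of waits performed so far */
} SyncSim;

typedef enum SyncResult
{
	SYNC_DONE,
	SYNC_ABORTED_BY_PROMOTION
} SyncResult;

/* One sync attempt: returns true if the slot still cannot be persisted. */
static bool
attempt_sync(const SyncSim *sim)
{
	return sim->primary_pos < sim->required_pos;
}

/* Stand-in for the nap between attempts; the primary advances meanwhile. */
static void
nap(SyncSim *sim)
{
	sim->naps++;
	sim->primary_pos++;
}

/*
 * Retry until the slot is sync-ready, giving up if promotion is detected
 * after a nap. The check order mirrors the loop in SyncReplicationSlots().
 */
SyncResult
sync_until_ready(SyncSim *sim)
{
	assert(sim != NULL);

	for (;;)
	{
		bool	persistence_pending = attempt_sync(sim);

		if (!persistence_pending)
			return SYNC_DONE;

		nap(sim);

		if (sim->promoted)
			return SYNC_ABORTED_BY_PROMOTION;
	}
}
```

In the patch itself the promotion case raises an ERROR rather than returning a status; the return value here only keeps the sketch testable.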
doc/src/sgml/func/func-admin.sgml | 4 +-
doc/src/sgml/logicaldecoding.sgml | 40 ++--
src/backend/replication/logical/slotsync.c | 219 +++++++++++++++---
.../utils/activity/wait_event_names.txt | 2 +-
4 files changed, 203 insertions(+), 62 deletions(-)
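The way fetch_remote_slots() narrows the query to the slots seen in the previous iteration can be illustrated with a small standalone sketch. It does not use the backend's StringInfo API: `build_slot_query()` and `append_str()` are hypothetical helpers writing into a caller-supplied buffer, and the SELECT list is shortened to one column. Like the patch, it does not escape the names, which relies on replication slot names being limited to lower-case letters, digits and underscores.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BASE_QUERY \
	"SELECT slot_name FROM pg_catalog.pg_replication_slots" \
	" WHERE failover and NOT temporary"

/* Append s to buf at offset *used; returns -1 if it does not fit. */
static int
append_str(char *buf, size_t buflen, size_t *used, const char *s)
{
	size_t	len = strlen(s);

	if (*used + len + 1 > buflen)
		return -1;
	memcpy(buf + *used, s, len + 1);	/* copies the terminating NUL too */
	*used += len;
	return 0;
}

/*
 * Build the slot query. With nnames == 0 all failover slots are selected;
 * otherwise an IN (...) clause restricts the result to the given names.
 * Returns 0 on success, -1 if buf is too small.
 */
int
build_slot_query(char *buf, size_t buflen,
				 const char *const *names, size_t nnames)
{
	size_t	used = 0;
	size_t	i;

	assert(buf != NULL);

	if (append_str(buf, buflen, &used, BASE_QUERY) < 0)
		return -1;
	if (nnames == 0)
		return 0;

	if (append_str(buf, buflen, &used, " AND slot_name IN (") < 0)
		return -1;
	for (i = 0; i < nnames; i++)
	{
		if (i > 0 && append_str(buf, buflen, &used, ", ") < 0)
			return -1;
		if (append_str(buf, buflen, &used, "'") < 0 ||
			append_str(buf, buflen, &used, names[i]) < 0 ||
			append_str(buf, buflen, &used, "'") < 0)
			return -1;
	}
	return append_str(buf, buflen, &used, ")");
}
```

If slot names could contain arbitrary characters, the names would need to be quoted as SQL literals before being appended.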
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 6347fe60b0c..cefdcb3887d 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
- Note that this function is primarily intended for testing and
- debugging purposes and should be used with caution. Additionally,
- this function cannot be executed if
+ Note that this function cannot be executed if
<link linkend="guc-sync-replication-slots"><varname>
sync_replication_slots</varname></link> is enabled and the slotsync
worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index a1f2efb2420..fd1d8771ec2 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
<sect2 id="logicaldecoding-replication-slots-synchronization">
<title>Replication Slot Synchronization</title>
<para>
- The logical replication slots on the primary can be synchronized to
- the hot standby by using the <literal>failover</literal> parameter of
+ The logical replication slots on the primary can be enabled for
+ synchronization to the hot standby by using the
+ <literal>failover</literal> parameter of
<link linkend="pg-create-logical-replication-slot">
<function>pg_create_logical_replication_slot</function></link>, or by
using the <link linkend="sql-createsubscription-params-with-failover">
<literal>failover</literal></link> option of
- <command>CREATE SUBSCRIPTION</command> during slot creation.
- Additionally, enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link> on the standby
- is required. By enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link>
- on the standby, the failover slots can be synchronized periodically in
+ <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+ synchronization can be performed either manually by calling
+ <link linkend="pg-sync-replication-slots">
+ <function>pg_sync_replication_slots</function></link>
+ on the standby, or automatically by enabling
+ <link linkend="guc-sync-replication-slots">
+ <varname>sync_replication_slots</varname></link> on the standby.
+ When <link linkend="guc-sync-replication-slots">
+ <varname>sync_replication_slots</varname></link> is enabled
+ on the standby, the failover slots are periodically synchronized by
the slotsync worker. For the synchronization to work, it is mandatory to
have a physical replication slot between the primary and the standby (i.e.,
<link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
receiving the WAL up to the latest flushed position on the primary server.
</para>
- <note>
- <para>
- While enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link> allows for automatic
- periodic synchronization of failover slots, they can also be manually
- synchronized using the <link linkend="pg-sync-replication-slots">
- <function>pg_sync_replication_slots</function></link> function on the standby.
- However, this function is primarily intended for testing and debugging and
- should be used with caution. Unlike automatic synchronization, it does not
- include cyclic retries, making it more prone to synchronization failures,
- particularly during initial sync scenarios where the required WAL files
- or catalog rows for the slot might have already been removed or are at risk
- of being removed on the standby. In contrast, automatic synchronization
- via <varname>sync_replication_slots</varname> provides continuous slot
- updates, enabling seamless failover and supporting high availability.
- Therefore, it is the recommended method for synchronizing slots.
- </para>
- </note>
-
<para>
When slot synchronization is configured as recommended,
and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 37738440113..770b5325f2b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -91,6 +91,9 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ *
+ * The 'slot_persistence_pending' flag is used by pg_sync_replication_slots()
+ * to do retries if the slot did not persist while syncing.
*/
typedef struct SlotSyncCtxStruct
{
@@ -99,6 +102,7 @@ typedef struct SlotSyncCtxStruct
bool syncing;
time_t last_start_time;
slock_t mutex;
+ bool slot_persistence_pending;
} SlotSyncCtxStruct;
static SlotSyncCtxStruct *SlotSyncCtx = NULL;
@@ -113,6 +117,7 @@ bool sync_replication_slots = false;
*/
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */
+#define SLOTSYNC_API_NAPTIME_MS 2000 /* 2s */
static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
@@ -146,6 +151,7 @@ typedef struct RemoteSlot
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
static void slotsync_failure_callback(int code, Datum arg);
static void update_synced_slots_inactive_since(void);
@@ -577,11 +583,15 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/*
* The remote slot didn't catch up to locally reserved position.
*
- * We do not drop the slot because the restart_lsn can be ahead of the
- * current location when recreating the slot in the next cycle. It may
- * take more time to create such a slot. Therefore, we keep this slot
- * and attempt the synchronization in the next cycle.
+ * We do not drop the slot because the restart_lsn can be
+ * ahead of the current location when recreating the slot in
+ * the next cycle. It may take more time to create such a
+ * slot. Therefore, we keep this slot and attempt the
+ * synchronization in the next cycle. Update the
+ * slot_persistence_pending flag, so the API can retry.
*/
+ SlotSyncCtx->slot_persistence_pending = true;
+
return false;
}
@@ -596,6 +606,9 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+ /* update flag, so that we retry */
+ SlotSyncCtx->slot_persistence_pending = true;
+
return false;
}
@@ -796,15 +809,23 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
}
/*
- * Synchronize slots.
+ * Fetch remote slots.
*
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If target_slot_list is NIL, fetches all failover logical slots from the
+ * primary server, otherwise fetches only the ones mentioned in
+ * target_slot_list.
+ *
+ * NOTE: Caller must ensure a transaction is active before calling this
+ * function.
+ *
+ * Parameters:
+ * wrconn - Connection to the primary server
+ * target_slot_list - List of RemoteSlot structures to refresh, or NIL to
+ * fetch all failover slots
*
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
*/
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn, List *target_slot_list)
{
#define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
@@ -813,13 +834,38 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
- bool some_slot_updated = false;
+ StringInfoData query;
+ ListCell *lc;
+ bool first_slot = true;
bool started_tx = false;
- const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
- " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
- " database, invalidation_reason"
- " FROM pg_catalog.pg_replication_slots"
- " WHERE failover and NOT temporary";
+
+ /* Build the query based on whether we're fetching all or refreshing specific slots */
+ initStringInfo(&query);
+ appendStringInfoString(&query,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase,"
+ " two_phase_at, failover,"
+ " database, invalidation_reason"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE failover and NOT temporary");
+
+ if (target_slot_list != NIL)
+ {
+ /* Add IN clause for specific slot names */
+ appendStringInfoString(&query, " AND slot_name IN (");
+
+ foreach(lc, target_slot_list)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+
+ if (!first_slot)
+ appendStringInfoString(&query, ", ");
+
+ appendStringInfo(&query, "'%s'", remote_slot->name);
+ first_slot = false;
+ }
+ appendStringInfoString(&query, ")");
+ }
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
@@ -829,13 +875,13 @@ synchronize_slots(WalReceiverConn *wrconn)
}
/* Execute the query */
- res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+ res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
res->err));
- /* Construct the remote_slot tuple and synchronize each slot locally */
+ /* Process the slot information */
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
{
@@ -906,12 +952,34 @@ synchronize_slots(WalReceiverConn *wrconn)
remote_slot->invalidated == RS_INVAL_NONE)
pfree(remote_slot);
else
- /* Create list of remote slots */
+ /* Add to updated list */
remote_slot_list = lappend(remote_slot_list, remote_slot);
ExecClearTuple(tupslot);
}
+ walrcv_clear_result(res);
+ pfree(query.data);
+
+ if (started_tx)
+ CommitTransactionCommand();
+
+ return remote_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list)
+{
+ bool some_slot_updated = false;
+
/* Drop local slots that no longer need to be synced. */
drop_local_obsolete_slots(remote_slot_list);
@@ -932,14 +1000,6 @@ synchronize_slots(WalReceiverConn *wrconn)
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
- /* We are done, free remote_slot_list elements */
- list_free_deep(remote_slot_list);
-
- walrcv_clear_result(res);
-
- if (started_tx)
- CommitTransactionCommand();
-
return some_slot_updated;
}
@@ -1131,7 +1191,7 @@ slotsync_reread_config(void)
bool conninfo_changed;
bool primary_slotname_changed;
- Assert(sync_replication_slots);
+ Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
@@ -1228,6 +1288,7 @@ slotsync_worker_onexit(int code, Datum arg)
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->pid = InvalidPid;
+ SlotSyncCtx->slot_persistence_pending = false;
/*
* If syncing_slots is true, it indicates that the process errored out
@@ -1276,7 +1337,7 @@ wait_for_slot_activity(bool some_slot_updated)
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
sleep_ms,
- WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+ WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
@@ -1335,6 +1396,7 @@ reset_syncing_flag()
{
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
+ SlotSyncCtx->slot_persistence_pending = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
@@ -1505,10 +1567,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
for (;;)
{
bool some_slot_updated = false;
+ bool started_tx = false;
+ List *remote_slots;
ProcessSlotSyncInterrupts(wrconn);
- some_slot_updated = synchronize_slots(wrconn);
+ /*
+ * The syscache access in fetch_remote_slots() needs a
+ * transaction env.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ remote_slots = fetch_remote_slots(wrconn, NIL);
+ some_slot_updated = synchronize_slots(wrconn, remote_slots);
+ list_free_deep(remote_slots);
+
+ if (started_tx)
+ CommitTransactionCommand();
wait_for_slot_activity(some_slot_updated);
}
@@ -1738,17 +1817,95 @@ slotsync_failure_callback(int code, Datum arg)
/*
* Synchronize the failover enabled replication slots using the specified
* primary server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready", waiting 2 seconds
+ * between attempts. Exits early if promotion is triggered.
*/
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ List *remote_slots;
+ List *prev_slot_list = NIL;
+ bool started_tx = false;
+
check_and_set_sync_info(InvalidPid);
validate_remote_info(wrconn);
- synchronize_slots(wrconn);
+ /*
+ * The syscache access in fetch_remote_slots() needs a
+ * transaction env.
+ */
+ if (!IsTransactionState()) {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+
+ /* Retry until all slots are at least sync ready */
+ for (;;)
+ {
+ int rc;
+
+ /* reset flag before every iteration */
+ SlotSyncCtx->slot_persistence_pending = false;
+
+ /*
+ * Refresh the remote slot data. We keep using the previous slot
+ * list, even if some slots are already sync ready, so that all
+ * slots are updated with the latest status from the primary.
+ * Some of the slots in the previous list could have gone away,
+ * which is why we create a new list here and free the old list
+ * at the end of the loop.
+ */
+ remote_slots = fetch_remote_slots(wrconn, prev_slot_list);
+
+ /* Attempt to synchronize slots */
+ synchronize_slots(wrconn, remote_slots);
+
+ /* Done if all slots are at least sync ready */
+ if (!SlotSyncCtx->slot_persistence_pending)
+ break;
+
+ /* wait for 2 seconds before retrying */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ SLOTSYNC_API_NAPTIME_MS,
+ WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+
+ /*
+ * If we've been promoted, then no point
+ * continuing.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("exiting from slot synchronization as"
+ " promotion is triggered")));
+ break;
+ }
+
+ /* Handle any pending termination request */
+ ProcessSlotSyncInterrupts(wrconn);
+
+ /* Free the previous slot-list if it exists */
+ if (prev_slot_list)
+ list_free_deep(prev_slot_list);
+
+ prev_slot_list = remote_slots;
+ }
+
+ list_free_deep(remote_slots);
+
+ if (started_tx)
+ CommitTransactionCommand();
/* Cleanup the synced temporary slots */
ReplicationSlotCleanup(true);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d2ca0..3497f0fa45e 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker."
REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch up."
SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
--
2.47.3