v18-0001-Improve-initial-slot-synchronization-in-pg_sync_.patch
Format: format-patch
Series: patch v18-0001
Subject: Improve initial slot synchronization in pg_sync_replication_slots()
| File | + | − |
|---|---|---|
| doc/src/sgml/func/func-admin.sgml | 1 | 3 |
| doc/src/sgml/logicaldecoding.sgml | 5 | 7 |
| src/backend/replication/logical/slotsync.c | 315 | 46 |
| src/backend/utils/activity/wait_event_names.txt | 1 | 1 |
| src/test/recovery/t/040_standby_failover_slots_sync.pl | 64 | 29 |
From 052715969b0bc5b42e111be30bcb0870a5f31bdf Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 13 Oct 2025 18:40:20 +1100
Subject: [PATCH v18] Improve initial slot synchronization in
pg_sync_replication_slots()
During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WALs have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for initial sync and
retaining it even in case of failure. It will keep retrying until the slot on the
primary has been advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had
no such protection mechanism.
The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.
To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
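The wait-and-retry behavior described in the commit message can be sketched outside the backend. The block below is a simplified model, not the code in slotsync.c: the SketchSlot type, the sketch_* functions, the integer "LSNs" and the max_attempts cap are all hypothetical, and the real loop waits on a latch rather than advancing the slots itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one failover slot being synchronized. */
typedef struct SketchSlot
{
	const char *name;
	int			remote_lsn;		/* position of the slot on the primary */
	int			local_lsn;		/* position reserved on the standby */
	bool		persisted;		/* true once the slot is sync-ready */
} SketchSlot;

/* One sync attempt; returns true if any slot could not be persisted. */
static bool
sketch_sync_once(SketchSlot *slots, size_t nslots)
{
	bool		pending = false;

	for (size_t i = 0; i < nslots; i++)
	{
		if (slots[i].remote_lsn >= slots[i].local_lsn)
			slots[i].persisted = true;
		else
			pending = true;
	}
	return pending;
}

/*
 * Retry until every slot is sync-ready. Returns the number of attempts
 * used, -1 if promotion was signaled, or 0 if max_attempts ran out (the
 * cap exists only in this sketch).
 */
static int
sketch_sync_until_ready(SketchSlot *slots, size_t nslots,
						const bool *promoted, int max_attempts)
{
	assert(slots != NULL && promoted != NULL && max_attempts > 0);

	for (int attempt = 1; attempt <= max_attempts; attempt++)
	{
		if (*promoted)
			return -1;			/* the patch raises an ERROR here */

		if (!sketch_sync_once(slots, nslots))
			return attempt;		/* nothing pending, we are done */

		/* Stand-in for the wait: pretend the primary advanced each slot. */
		for (size_t i = 0; i < nslots; i++)
			slots[i].remote_lsn++;
	}
	return 0;
}
```

Every attempt in the sketch revisits the same set of slots, which mirrors the patch's choice to reuse the slot names collected on the first pass instead of re-fetching all failover slots each time.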
doc/src/sgml/func/func-admin.sgml | 4 +-
doc/src/sgml/logicaldecoding.sgml | 12 +-
src/backend/replication/logical/slotsync.c | 361 +++++++++++++++---
.../utils/activity/wait_event_names.txt | 2 +-
.../t/040_standby_failover_slots_sync.pl | 93 +++--
5 files changed, 386 insertions(+), 86 deletions(-)
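For reviewers who want to see what query fetch_remote_slots() ends up sending when a slot name list is passed, here is a standalone approximation. It uses a fixed-size buffer instead of StringInfo, a shortened column list, and hypothetical sketch_* helper names. Like the patch, it does no escaping of the names, which is only safe because replication slot names are restricted to lower-case letters, digits and underscores.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Append s to buf at *used; returns false if it would not fit. */
static bool
sketch_append(char *buf, size_t buflen, size_t *used, const char *s)
{
	size_t		len = strlen(s);

	if (*used + len + 1 > buflen)
		return false;
	memcpy(buf + *used, s, len + 1);	/* copies the terminating NUL too */
	*used += len;
	return true;
}

/*
 * Build the slot-fetching query. With nnames == 0 all failover slots are
 * selected; otherwise only the named ones. Returns false on overflow.
 */
static bool
sketch_build_slot_query(char *buf, size_t buflen,
						const char *const *names, size_t nnames)
{
	size_t		used = 0;

	assert(buf != NULL && buflen > 0);
	buf[0] = '\0';

	if (!sketch_append(buf, buflen, &used,
					   "SELECT slot_name"
					   " FROM pg_catalog.pg_replication_slots"
					   " WHERE failover AND NOT temporary"))
		return false;

	if (nnames == 0)
		return true;

	if (!sketch_append(buf, buflen, &used, " AND slot_name IN ("))
		return false;

	for (size_t i = 0; i < nnames; i++)
	{
		/* Separate entries with a comma, then quote the name. */
		if (i > 0 && !sketch_append(buf, buflen, &used, ", "))
			return false;
		if (!sketch_append(buf, buflen, &used, "'") ||
			!sketch_append(buf, buflen, &used, names[i]) ||
			!sketch_append(buf, buflen, &used, "'"))
			return false;
	}

	return sketch_append(buf, buflen, &used, ")");
}
```

Passing an empty list corresponds to the slotsync worker's call with NIL, which fetches every failover slot.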
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 1b465bc8ba7..2896cd9e429 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
- Note that this function is primarily intended for testing and
- debugging purposes and should be used with caution. Additionally,
- this function cannot be executed if
+ Note that this function cannot be executed if
<link linkend="guc-sync-replication-slots"><varname>
sync_replication_slots</varname></link> is enabled and the slotsync
worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index b803a819cf1..b964937d509 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -405,15 +405,13 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
periodic synchronization of failover slots, they can also be manually
synchronized using the <link linkend="pg-sync-replication-slots">
<function>pg_sync_replication_slots</function></link> function on the standby.
- However, this function is primarily intended for testing and debugging and
- should be used with caution. Unlike automatic synchronization, it does not
- include cyclic retries, making it more prone to synchronization failures,
- particularly during initial sync scenarios where the required WAL files
- or catalog rows for the slot might have already been removed or are at risk
- of being removed on the standby. In contrast, automatic synchronization
+ However, unlike automatic synchronization, it does not perform incremental
+ updates. It retries cyclically until all the failover slots that
+ existed on the primary at the start of the function call are
+ synchronized. Any slots created after the function begins will
+ not be synchronized. In contrast, automatic synchronization
via <varname>sync_replication_slots</varname> provides continuous slot
updates, enabling seamless failover and supporting high availability.
- Therefore, it is the recommended method for synchronizing slots.
</para>
</note>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8c061d55bdb..c6bbc12675a 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -39,6 +39,12 @@
* the last cycle. Refer to the comments above wait_for_slot_activity() for
* more details.
*
+ * If the pg_sync_replication_slots() API is used to sync the slots, and if the slots
+ * are not ready to be synced and are marked as RS_TEMPORARY because of any of
+ * the reasons mentioned above, then the API also waits and retries until the
+ * slots are marked as RS_PERSISTENT (which means sync-ready). Refer to the
+ * comments in SyncReplicationSlots() for more details.
+ *
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
* details.
@@ -64,6 +70,7 @@
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
@@ -100,6 +107,16 @@ typedef struct SlotSyncCtxStruct
slock_t mutex;
} SlotSyncCtxStruct;
+/*
+ * Structure holding parameters that need to be freed on error in
+ * pg_sync_replication_slots()
+ */
+typedef struct SlotSyncApiFailureParams
+{
+ WalReceiverConn *wrconn;
+ List *slot_names;
+} SlotSyncApiFailureParams;
+
static SlotSyncCtxStruct *SlotSyncCtx = NULL;
/* GUC variable */
@@ -147,6 +164,7 @@ typedef struct RemoteSlot
static void slotsync_failure_callback(int code, Datum arg);
static void update_synced_slots_inactive_since(void);
+static void slotsync_api_reread_config(void);
/*
* If necessary, update the local synced slot's metadata based on the data
@@ -553,11 +571,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
*
+ * *slot_persistence_pending is set to true if any of the slots fail to
+ * persist. It is utilized by the pg_sync_replication_slots() API.
+ *
* Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
* false.
*/
static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+ bool *slot_persistence_pending)
{
ReplicationSlot *slot = MyReplicationSlot;
bool found_consistent_snapshot = false;
@@ -576,11 +598,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/*
* The remote slot didn't catch up to locally reserved position.
*
- * We do not drop the slot because the restart_lsn can be ahead of the
- * current location when recreating the slot in the next cycle. It may
- * take more time to create such a slot. Therefore, we keep this slot
- * and attempt the synchronization in the next cycle.
+ * We do not drop the slot because the restart_lsn can be
+ * ahead of the current location when recreating the slot in
+ * the next cycle. It may take more time to create such a
+ * slot. Therefore, we keep this slot and attempt the
+ * synchronization in the next cycle.
+ *
+ * We also update the slot_persistence_pending parameter, so
+ * the API can retry.
*/
+ if (slot_persistence_pending)
+ *slot_persistence_pending = true;
+
return false;
}
@@ -595,6 +624,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+ /* Set this, so that API can retry */
+ if (slot_persistence_pending)
+ *slot_persistence_pending = true;
+
return false;
}
@@ -618,10 +651,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
*
+ * *slot_persistence_pending is set to true if any of the slots fail to
+ * persist. It is utilized by the pg_sync_replication_slots() API.
+ *
* Returns TRUE if the local slot is updated.
*/
static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+ bool *slot_persistence_pending)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
@@ -715,7 +752,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
if (slot->data.persistency == RS_TEMPORARY)
{
slot_updated = update_and_persist_local_synced_slot(remote_slot,
- remote_dbid);
+ remote_dbid,
+ slot_persistence_pending);
}
/* Slot ready for sync, so sync it. */
@@ -784,7 +822,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ update_and_persist_local_synced_slot(remote_slot, remote_dbid,
+ slot_persistence_pending);
slot_updated = true;
}
@@ -795,15 +834,23 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
}
/*
- * Synchronize slots.
+ * Fetch remote slots.
*
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If slot_names is NIL, fetches all failover logical slots from the
+ * primary server, otherwise fetches only the ones with names in slot_names.
+ *
+ * Parameters:
+ * wrconn - Connection to the primary server
+ * slot_names - List of slot names (char *) to fetch from primary,
+ * or NIL to fetch all failover logical slots.
+ *
+ * Returns:
+ * List of remote slot information structures. Returns NIL if no slot
+ * is found.
*
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
*/
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
{
#define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
@@ -812,29 +859,47 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
- bool some_slot_updated = false;
- bool started_tx = false;
- const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
- " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
- " database, invalidation_reason"
- " FROM pg_catalog.pg_replication_slots"
- " WHERE failover and NOT temporary";
-
- /* The syscache access in walrcv_exec() needs a transaction env. */
- if (!IsTransactionState())
+ StringInfoData query;
+
+ initStringInfo(&query);
+ appendStringInfoString(&query,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase,"
+ " two_phase_at, failover,"
+ " database, invalidation_reason"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE failover and NOT temporary");
+
+ if (slot_names != NIL)
{
- StartTransactionCommand();
- started_tx = true;
+ ListCell *lc;
+ bool first_slot = true;
+
+ /*
+ * Construct the query to fetch only the specified slots
+ */
+ appendStringInfoString(&query, " AND slot_name IN (");
+
+ foreach(lc, slot_names)
+ {
+ char *slot_name = (char *) lfirst(lc);
+
+ if (!first_slot)
+ appendStringInfoString(&query, ", ");
+
+ appendStringInfo(&query, "'%s'", slot_name);
+ first_slot = false;
+ }
+ appendStringInfoString(&query, ")");
}
/* Execute the query */
- res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+ res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
res->err));
- /* Construct the remote_slot tuple and synchronize each slot locally */
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
{
@@ -885,7 +950,6 @@ synchronize_slots(WalReceiverConn *wrconn)
remote_slot->invalidated = isnull ? RS_INVAL_NONE :
GetSlotInvalidationCause(TextDatumGetCString(d));
- /* Sanity check */
Assert(col == SLOTSYNC_COLUMN_COUNT);
/*
@@ -905,12 +969,38 @@ synchronize_slots(WalReceiverConn *wrconn)
remote_slot->invalidated == RS_INVAL_NONE)
pfree(remote_slot);
else
- /* Create list of remote slots */
remote_slot_list = lappend(remote_slot_list, remote_slot);
ExecClearTuple(tupslot);
}
+ walrcv_clear_result(res);
+ pfree(query.data);
+
+ return remote_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Parameters:
+ * wrconn - Connection to the primary server
+ * remote_slot_list - List of RemoteSlot structures to synchronize.
+ * slot_persistence_pending - boolean used by pg_sync_replication_slots
+ * API to track if any slots could not be
+ * persisted and need to be retried.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
+ bool *slot_persistence_pending)
+{
+ bool some_slot_updated = false;
+
/* Drop local slots that no longer need to be synced. */
drop_local_obsolete_slots(remote_slot_list);
@@ -926,19 +1016,12 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid,
+ slot_persistence_pending);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
- /* We are done, free remote_slot_list elements */
- list_free_deep(remote_slot_list);
-
- walrcv_clear_result(res);
-
- if (started_tx)
- CommitTransactionCommand();
-
return some_slot_updated;
}
@@ -1186,6 +1269,26 @@ ProcessSlotSyncInterrupts(void)
slotsync_reread_config();
}
+/*
+ * Interrupt handler for pg_sync_replication_slots() API.
+ */
+static void
+ProcessSlotSyncAPIInterrupts(void)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ /* If we've been promoted, then no point continuing. */
+ if (SlotSyncCtx->stopSignaled)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot continue replication slots synchronization"
+ " as standby promotion is triggered")));
+
+ /* Reread the config; error out if critical parameters changed */
+ if (ConfigReloadPending)
+ slotsync_api_reread_config();
+}
+
/*
* Connection cleanup function for slotsync worker.
*
@@ -1275,7 +1378,7 @@ wait_for_slot_activity(bool some_slot_updated)
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
sleep_ms,
- WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+ WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
@@ -1505,10 +1608,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
for (;;)
{
bool some_slot_updated = false;
+ bool started_tx = false;
+ List *remote_slots;
ProcessSlotSyncInterrupts();
- some_slot_updated = synchronize_slots(wrconn);
+ /*
+ * The syscache access in fetch_remote_slots() needs a
+ * transaction env.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ remote_slots = fetch_remote_slots(wrconn, NIL);
+ some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL);
+ list_free_deep(remote_slots);
+
+ if (started_tx)
+ CommitTransactionCommand();
wait_for_slot_activity(some_slot_updated);
}
@@ -1705,7 +1825,8 @@ SlotSyncShmemInit(void)
static void
slotsync_failure_callback(int code, Datum arg)
{
- WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+ SlotSyncApiFailureParams *fparams =
+ (SlotSyncApiFailureParams *) DatumGetPointer(arg);
/*
* We need to do slots cleanup here just like WalSndErrorCleanup() does.
@@ -1732,23 +1853,171 @@ slotsync_failure_callback(int code, Datum arg)
if (syncing_slots)
reset_syncing_flag();
- walrcv_disconnect(wrconn);
+ if (fparams->slot_names)
+ list_free_deep(fparams->slot_names);
+
+ walrcv_disconnect(fparams->wrconn);
+}
+
+/*
+ * Helper function to extract slot names from a list of remote slots
+ */
+static List *
+extract_slot_names(List *remote_slots)
+{
+ List *slot_names = NIL;
+ ListCell *lc;
+ MemoryContext oldcontext;
+
+ /* Switch to long-lived TopMemoryContext to store slot names */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ foreach(lc, remote_slots)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+ char *slot_name;
+
+ slot_name = pstrdup(remote_slot->name);
+ slot_names = lappend(slot_names, slot_name);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return slot_names;
+}
+
+/*
+ * Re-read the config file and check for critical parameter changes.
+ *
+ */
+static void
+slotsync_api_reread_config(void)
+{
+ char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
+ char *old_primary_slotname = pstrdup(PrimarySlotName);
+ bool old_hot_standby_feedback = hot_standby_feedback;
+ bool conninfo_changed;
+ bool primary_slotname_changed;
+
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
+ primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+
+ pfree(old_primary_conninfo);
+ pfree(old_primary_slotname);
+
+ /* throw error for certain parameter changes */
+ if (conninfo_changed ||
+ primary_slotname_changed ||
+ (old_hot_standby_feedback != hot_standby_feedback))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIG_FILE_ERROR),
+ errmsg("cannot continue slot synchronization due"
+ " to parameter changes"),
+ errdetail("One or more of primary_conninfo,"
+ " primary_slot_name or hot_standby_feedback"
+ " were modified."),
+ errhint("Retry pg_sync_replication_slots() to use the"
+ " updated configuration.")));
+ }
}
/*
* Synchronize the failover enabled replication slots using the specified
* primary server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready".
+ * Exits early if promotion is triggered or certain critical
+ * configuration parameters have changed.
*/
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
- PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+ SlotSyncApiFailureParams fparams;
+
+ fparams.wrconn = wrconn;
+ fparams.slot_names = NULL;
+
+ PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams));
{
+ List *remote_slots = NIL;
+ List *slot_names = NIL; /* List of slot names to track */
+
check_and_set_sync_info(InvalidPid);
validate_remote_info(wrconn);
- synchronize_slots(wrconn);
+ /* Retry until all the slots are sync-ready */
+ for (;;)
+ {
+ bool started_tx = false;
+ bool slot_persistence_pending = false;
+ bool some_slot_updated = false;
+
+ /* Reset flag before every iteration */
+ slot_persistence_pending = false;
+
+ /* Check for interrupts and config changes */
+ ProcessSlotSyncAPIInterrupts();
+
+ /*
+ * The syscache access in fetch_remote_slots() needs a
+ * transaction env.
+ */
+ if (!IsTransactionState()) {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ /*
+ * Fetch remote slot info for the given slot_names. If slot_names is NIL,
+ * fetch all failover-enabled slots. Note that we reuse slot_names from
+ * the first iteration; re-fetching all failover slots each time could
+ * cause an endless loop. Instead of reprocessing only the pending slots
+ * in each iteration, it's better to process all the slots received in
+ * the first iteration. This ensures that by the time we're done, all
+ * slots reflect the latest values.
+ */
+ remote_slots = fetch_remote_slots(wrconn, slot_names);
+
+ /* Attempt to synchronize slots */
+ some_slot_updated = synchronize_slots(wrconn, remote_slots,
+ &slot_persistence_pending);
+
+ /*
+ * If slot_persistence_pending is true, extract slot names
+ * for future iterations (only needed if we haven't done it yet)
+ */
+ if (slot_names == NIL && slot_persistence_pending)
+ {
+ slot_names = extract_slot_names(remote_slots);
+
+ /* Update the failure structure so that it can be freed on error */
+ fparams.slot_names = slot_names;
+ }
+
+ /* Free the current remote_slots list */
+ list_free_deep(remote_slots);
+
+ /* Commit transaction if we started it */
+ if (started_tx)
+ CommitTransactionCommand();
+
+ /* Done if all slots are persisted, i.e., are sync-ready */
+ if (!slot_persistence_pending)
+ break;
+
+ /* Wait before retrying */
+ wait_for_slot_activity(some_slot_updated);
+
+ }
+
+ if (slot_names)
+ list_free_deep(slot_names);
/* Cleanup the synced temporary slots */
ReplicationSlotCleanup(true);
@@ -1756,5 +2025,5 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
/* We are done with sync, so reset sync flag */
reset_syncing_flag();
}
- PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+ PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams));
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..16b3b04d3c4 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker."
REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch up."
SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c61c51e914..44b43db6a56 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -115,17 +115,21 @@ ok( $stderr =~
"cannot sync slots on a non-standby server");
##################################################
-# Test logical failover slots corresponding to different plugins can be
-# synced to the standby.
+# Set up a standby server (standby1) to test slot synchronization.
#
-# Configure standby1 to replicate and synchronize logical slots configured
-# for failover on the primary
+# Configure standby1 to replicate from the primary and synchronize
+# logical failover slots.
#
-# failover slot lsub1_slot | output_plugin: pgoutput
-# failover slot lsub2_slot | output_plugin: test_decoding
+# failover slot lsub1_slot |
# primary ---> |
# physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
-# | lsub1_slot, lsub2_slot (synced_slot)
+# |
+##################################################
+
+##################################################
+# Test that pg_sync_replication_slots() on the standby waits and retries
+# until the slot becomes sync-ready (when the remote slot catches up to
+# the position reserved locally on the standby).
##################################################
my $primary = $publisher;
@@ -153,47 +157,64 @@ log_min_messages = 'debug2'
$primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'");
$primary->reload;
-# Drop the subscription to prevent further advancement of the restart_lsn for
+# Disable the subscription to prevent further advancement of the restart_lsn for
# the lsub1_slot.
-$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1;");
-
-# To ensure that restart_lsn has moved to a recent WAL position, we re-create
-# the lsub1_slot.
-$primary->psql('postgres',
- q{SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);}
-);
-
-$primary->psql('postgres',
- q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
-);
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE;");
+# Create the physical slot for the standby
$primary->psql('postgres',
q{SELECT pg_create_physical_replication_slot('sb1_slot');});
# Start the standby so that slot syncing can begin
$standby1->start;
+# Run some DDL on the primary so that the slot lags behind the standby
+$primary->safe_psql('postgres', "CREATE TABLE push_wal (a int);");
+$subscriber1->safe_psql('postgres', "CREATE TABLE push_wal (a int);");
+
# Capture the inactive_since of the slot from the primary. Note that the slot
-# will be inactive since the corresponding subscription was dropped.
+# will be inactive since the corresponding subscription was disabled.
my $inactive_since_on_primary =
$primary->validate_slot_inactive_since('lsub1_slot',
$slot_creation_time_on_primary);
-# Wait for the standby to catch up so that the standby is not lagging behind
-# the failover slots.
-$primary->wait_for_replay_catchup($standby1);
+# Attempt to synchronize slots using the API. The first attempt cannot persist
+# the slot because the remote slot lags behind the position reserved on the
+# standby, but the API will wait and retry. Call the API in a background process.
+my $log_offset = -s $standby1->logfile;
-# Synchronize the primary server slots to the standby.
-$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+my $h = $standby1->background_psql('postgres', on_error_stop => 0);
-# Confirm that the logical failover slots are created on the standby and are
+$h->query_until(qr//, "SELECT pg_sync_replication_slots();\n");
+
+# Confirm that the slot could not be synced initially.
+$standby1->wait_for_log(
+ qr/could not synchronize replication slot \"lsub1_slot\"/,
+ $log_offset);
+
+$primary->safe_psql('postgres', "INSERT INTO push_wal values (1);");
+
+# Restart subscription to consume data in slot lsub1_slot
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE;");
+
+# Create xl_running_xacts records on the primary for which the standby is waiting
+$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();");
+
+# Confirm from the log that the slot has been synced after becoming sync-ready.
+$standby1->wait_for_log(
+ qr/newly created replication slot \"lsub1_slot\" is sync-ready now/,
+ $log_offset);
+
+$h->quit;
+
+# Confirm that the logical failover slot is created on the standby and is
# flagged as 'synced'
is( $standby1->safe_psql(
'postgres',
- q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced AND NOT temporary;}
+ q{SELECT count(*) = 1 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot') AND synced AND NOT temporary;}
),
"t",
- 'logical slots have synced as true on standby');
+ 'logical slots are synced after API retry on standby');
# Capture the inactive_since of the synced slot on the standby
my $inactive_since_on_standby =
@@ -208,6 +229,20 @@ is( $standby1->safe_psql(
"t",
'synchronized slot has got its own inactive_since');
+# Drop the tables and subscription
+$primary->safe_psql('postgres', "DROP TABLE push_wal;");
+$subscriber1->safe_psql('postgres', "DROP TABLE push_wal;");
+$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1;");
+
+# Re-create the lsub1_slot with pgoutput plugin and create lsub2_slot with test_decoding
+$primary->psql('postgres',
+ q{SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);}
+);
+
+$primary->psql('postgres',
+ q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
+);
+
##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
@@ -279,7 +314,7 @@ $inactive_since_on_primary =
# the failover slots.
$primary->wait_for_replay_catchup($standby1);
-my $log_offset = -s $standby1->logfile;
+$log_offset = -s $standby1->logfile;
# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
--
2.47.3