v5-0001-Improve-initial-slot-synchronization-in-pg_sync_r.patch
application/octet-stream
Format: format-patch
Series: patch v5-0001
Subject: Improve initial slot synchronization in pg_sync_replication_slots()
| File | + | − |
|---|---|---|
| doc/src/sgml/func/func-admin.sgml | 1 | 3 |
| doc/src/sgml/logicaldecoding.sgml | 13 | 27 |
| src/backend/replication/logical/slotsync.c | 368 | 69 |
| src/backend/utils/activity/wait_event_names.txt | 1 | 1 |
From b11c33159de217d21c188cfa18af0399e1277e0d Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 11 Aug 2025 03:44:55 -0400
Subject: [PATCH v5] Improve initial slot synchronization in
pg_sync_replication_slots()
During initial slot synchronization on a standby, the operation may fail if
the required catalog rows or WAL have been removed or are at risk of removal.
The slotsync worker handles this by creating a temporary slot for the initial
sync and retaining it even if synchronization fails. It keeps retrying until
the slot on the primary has advanced to a position where all the required
data is also available on the standby. pg_sync_replication_slots(), however,
had no such protection.
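The "sync-ready" condition the worker waits for (the remote slot must not need WAL at an older LSN, or catalog rows older than the catalog xmin, than what the standby has reserved) can be sketched as a standalone comparison. This is a simplified sketch, not the code in slotsync.c: `XLogRecPtr` and `TransactionId` are stand-in typedefs, and `xid_precedes()` and `remote_slot_precedes()` are hypothetical helpers. The XID comparison assumes normal (non-reserved) XIDs compared modulo 2^32, as PostgreSQL does for normal transaction IDs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's XLogRecPtr */
typedef uint32_t TransactionId; /* stand-in for PostgreSQL's TransactionId */

#define FIRST_NORMAL_XID 3      /* XIDs 0, 1 and 2 are reserved */

/* Wraparound-aware "a is older than b" for normal XIDs (modulo 2^32). */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    assert(a >= FIRST_NORMAL_XID && b >= FIRST_NORMAL_XID);
    return (int32_t) (a - b) < 0;
}

/*
 * Hypothetical helper: true while the remote slot still needs WAL or
 * catalog rows older than what the standby reserved for its local copy,
 * i.e. the slot is not sync-ready and synchronization must be retried.
 */
static bool
remote_slot_precedes(XLogRecPtr remote_restart_lsn, TransactionId remote_xmin,
                     XLogRecPtr local_restart_lsn, TransactionId local_xmin)
{
    return remote_restart_lsn < local_restart_lsn ||
        xid_precedes(remote_xmin, local_xmin);
}
```

With these definitions, a remote slot at LSN 0x1000 against a local reservation at 0x2000 is not ready, while XID 5 counts as newer than XID 4294967290 because the comparison wraps.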
The SQL function failed immediately if the synchronization requirements were
not met. This could lead to permanent failure, because the standby might keep
removing the data that is still required.
To address this, pg_sync_replication_slots() now waits for the primary slot
to advance to a suitable position before completing synchronization and
before removing the temporary slot. While waiting, it refreshes the remote
slot information and retries synchronization every 2 seconds. If the standby
is promoted during this wait, the function stops waiting and raises an error,
and the temporary slot is removed.
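The wait-and-retry behaviour can be sketched as a loop that checks readiness, logs every fifth attempt (about every 10 seconds at a 2-second nap), naps, and bails out on promotion. This is a single-slot simulation under stated assumptions, not the code in SyncReplicationSlots(): `SimSlot` and `sync_with_retry()` are hypothetical, the nap is simulated by advancing a counter instead of calling WaitLatch(), the primary is assumed to advance its slot by a fixed step per nap, and returning -1 stands in for the ERROR raised on promotion.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define SLOTSYNC_API_NAPTIME_MS 2000    /* fixed nap used by the SQL function */
#define LOG_EVERY_N_ITERATIONS  5       /* ~10 s between progress messages */

/* Hypothetical simulated state; the real code queries the primary instead. */
typedef struct SimSlot
{
    uint64_t remote_restart_lsn;    /* advances as the primary moves on */
    uint64_t lsn_step;              /* simulated advance per nap */
    uint64_t local_restart_lsn;     /* position reserved on the standby */
    int      promote_at_iteration;  /* 0 = never promoted */
} SimSlot;

/*
 * Returns the number of iterations needed for the slot to become
 * sync-ready, or -1 if a simulated promotion interrupts the wait.
 * *waited_ms receives the total simulated nap time.
 */
static int
sync_with_retry(SimSlot *s, long *waited_ms)
{
    int iterations = 0;

    /* Without progress or promotion the simulation would spin forever. */
    assert(s->lsn_step > 0 || s->promote_at_iteration != 0 ||
           s->remote_restart_lsn >= s->local_restart_lsn);

    *waited_ms = 0;
    for (;;)
    {
        iterations++;

        /* Sync-ready once the remote slot no longer precedes the local one. */
        if (s->remote_restart_lsn >= s->local_restart_lsn)
            return iterations;

        if (iterations % LOG_EVERY_N_ITERATIONS == 0)
            printf("waiting for remote slot: %llu < %llu\n",
                   (unsigned long long) s->remote_restart_lsn,
                   (unsigned long long) s->local_restart_lsn);

        /* Stand-in for WaitLatch(): advance the simulated clock and primary. */
        *waited_ms += SLOTSYNC_API_NAPTIME_MS;
        s->remote_restart_lsn += s->lsn_step;

        /* Stand-in for checking SlotSyncCtx->stopSignaled after the nap. */
        if (s->promote_at_iteration != 0 &&
            iterations >= s->promote_at_iteration)
            return -1;
    }
}
```

Like the loop in this patch, the sketch has no upper bound on the number of iterations; it ends only when the slot becomes sync-ready or promotion is signalled.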
---
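The revised wait_for_slot_activity() chooses between two nap policies: the slotsync worker keeps its exponential backoff (200 ms, doubling up to 30 s, reset when a slot is updated), while calls from pg_sync_replication_slots() use a fixed 2 s nap. A standalone sketch of just that decision follows; the helper name `next_naptime_ms()` is hypothetical, and the worker's backoff state is passed explicitly instead of living in the file-level `sleep_ms` static.

```c
#include <assert.h>
#include <stdbool.h>

#define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000    /* 30s */
#define SLOTSYNC_API_NAPTIME_MS 2000            /* 2s */

/*
 * Returns how long to nap before the next sync cycle. *sleep_ms carries
 * the worker's backoff state between calls and is left untouched for API
 * callers, which always use the fixed interval.
 */
static int
next_naptime_ms(long *sleep_ms, bool some_slot_updated, bool called_from_api)
{
    if (called_from_api)
        return SLOTSYNC_API_NAPTIME_MS;

    assert(*sleep_ms >= MIN_SLOTSYNC_WORKER_NAPTIME_MS &&
           *sleep_ms <= MAX_SLOTSYNC_WORKER_NAPTIME_MS);

    if (!some_slot_updated)
    {
        /* Nothing changed: double the nap, capped at the maximum. */
        *sleep_ms *= 2;
        if (*sleep_ms > MAX_SLOTSYNC_WORKER_NAPTIME_MS)
            *sleep_ms = MAX_SLOTSYNC_WORKER_NAPTIME_MS;
    }
    else
    {
        /* Some slot was updated: go back to polling quickly. */
        *sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
    }
    return (int) *sleep_ms;
}
```

Keeping the API interval fixed means the progress message logged every fifth iteration appears at a predictable cadence, independent of the worker's backoff state.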
doc/src/sgml/func/func-admin.sgml | 4 +-
doc/src/sgml/logicaldecoding.sgml | 40 +--
src/backend/replication/logical/slotsync.c | 437 ++++++++++++++++++++----
src/backend/utils/activity/wait_event_names.txt | 2 +-
4 files changed, 383 insertions(+), 100 deletions(-)
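refresh_remote_slots() narrows its query to the already-known slots by appending each name to a `slot_name IN (...)` list. The following is a standalone sketch of building that filter with standard SQL literal quoting (embedded single quotes doubled), which inside the backend is what quote_literal_cstr() provides for such names. The helper name `build_slot_name_filter()` is hypothetical, and the fixed-size buffer stands in for a StringInfo. Since replication slot names may only contain lower-case letters, digits, and underscores, the quoting is defensive rather than strictly required.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: writes "slot_name IN ('a', 'b', ...)" into buf,
 * doubling any single quote inside a name as a standard SQL string
 * literal requires. Returns false if buf is too small.
 */
static bool
build_slot_name_filter(char *buf, size_t bufsize,
                       const char *const *names, int nnames)
{
    const char prefix[] = "slot_name IN (";
    size_t     len = sizeof(prefix) - 1;

    /* An empty "IN ()" is not valid SQL; callers must skip empty lists. */
    assert(nnames > 0);

    if (bufsize < len + 1)
        return false;
    memcpy(buf, prefix, len);

/* Append one character, always leaving room for the terminating NUL. */
#define PUTC(c) \
    do { if (len + 1 >= bufsize) return false; buf[len++] = (c); } while (0)

    for (int i = 0; i < nnames; i++)
    {
        if (i > 0)
        {
            PUTC(',');
            PUTC(' ');
        }
        PUTC('\'');
        for (const char *p = names[i]; *p; p++)
        {
            if (*p == '\'')
                PUTC('\'');     /* escape a quote by doubling it */
            PUTC(*p);
        }
        PUTC('\'');
    }
    PUTC(')');
#undef PUTC

    buf[len] = '\0';
    return true;
}
```

For the names `sub1` and `o'k`, this produces `slot_name IN ('sub1', 'o''k')`.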
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..3608610 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
- Note that this function is primarily intended for testing and
- debugging purposes and should be used with caution. Additionally,
- this function cannot be executed if
+ Note that this function cannot be executed if
<link linkend="guc-sync-replication-slots"><varname>
sync_replication_slots</varname></link> is enabled and the slotsync
worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 77c720c..6e4251a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
<sect2 id="logicaldecoding-replication-slots-synchronization">
<title>Replication Slot Synchronization</title>
<para>
- The logical replication slots on the primary can be synchronized to
- the hot standby by using the <literal>failover</literal> parameter of
+ The logical replication slots on the primary can be enabled for
+ synchronization to the hot standby by using the
+ <literal>failover</literal> parameter of
<link linkend="pg-create-logical-replication-slot">
<function>pg_create_logical_replication_slot</function></link>, or by
using the <link linkend="sql-createsubscription-params-with-failover">
<literal>failover</literal></link> option of
- <command>CREATE SUBSCRIPTION</command> during slot creation.
- Additionally, enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link> on the standby
- is required. By enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link>
- on the standby, the failover slots can be synchronized periodically in
+ <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+ synchronization can be performed either manually by calling
+ <link linkend="pg-sync-replication-slots">
+ <function>pg_sync_replication_slots</function></link>
+ on the standby, or automatically by enabling
+ <link linkend="guc-sync-replication-slots">
+ <varname>sync_replication_slots</varname></link> on the standby.
+ When <link linkend="guc-sync-replication-slots">
+ <varname>sync_replication_slots</varname></link> is enabled
+ on the standby, the failover slots are periodically synchronized by
the slotsync worker. For the synchronization to work, it is mandatory to
have a physical replication slot between the primary and the standby (i.e.,
<link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
receiving the WAL up to the latest flushed position on the primary server.
</para>
- <note>
- <para>
- While enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link> allows for automatic
- periodic synchronization of failover slots, they can also be manually
- synchronized using the <link linkend="pg-sync-replication-slots">
- <function>pg_sync_replication_slots</function></link> function on the standby.
- However, this function is primarily intended for testing and debugging and
- should be used with caution. Unlike automatic synchronization, it does not
- include cyclic retries, making it more prone to synchronization failures,
- particularly during initial sync scenarios where the required WAL files
- or catalog rows for the slot might have already been removed or are at risk
- of being removed on the standby. In contrast, automatic synchronization
- via <varname>sync_replication_slots</varname> provides continuous slot
- updates, enabling seamless failover and supporting high availability.
- Therefore, it is the recommended method for synchronizing slots.
- </para>
- </note>
-
<para>
When slot synchronization is configured as recommended,
and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 3773844..f9eec0b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -113,6 +113,7 @@ bool sync_replication_slots = false;
*/
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */
+#define SLOTSYNC_API_NAPTIME_MS 2000 /* 2s */
static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
@@ -146,6 +147,7 @@ typedef struct RemoteSlot
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
static void slotsync_failure_callback(int code, Datum arg);
static void update_synced_slots_inactive_since(void);
@@ -166,7 +168,8 @@ static void update_synced_slots_inactive_since(void);
static bool
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
bool *found_consistent_snapshot,
- bool *remote_slot_precedes)
+ bool *remote_slot_precedes,
+ int sync_iterations)
{
ReplicationSlot *slot = MyReplicationSlot;
bool updated_xmin_or_lsn = false;
@@ -209,15 +212,21 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
* to understand why the slot is not sync-ready. In the case of a
* persistent slot, it would be a more common case and won't directly
* impact the users, so we used DEBUG1 level to log the message.
+ *
+ * If called from pg_sync_replication_slots(), log the message only for
+ * the first iteration.
*/
- ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
- errmsg("could not synchronize replication slot \"%s\"",
+ if (AmLogicalSlotSyncWorkerProcess() || sync_iterations == 1)
+ {
+ ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
+ errmsg("replication slot \"%s\" is not sync-ready; will keep retrying",
remote_slot->name),
- errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
- LSN_FORMAT_ARGS(remote_slot->restart_lsn),
- remote_slot->catalog_xmin,
- LSN_FORMAT_ARGS(slot->data.restart_lsn),
- slot->data.catalog_xmin));
+ errdetail("Attempting synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+ remote_slot->catalog_xmin,
+ LSN_FORMAT_ARGS(slot->data.restart_lsn),
+ slot->data.catalog_xmin));
+ }
if (remote_slot_precedes)
*remote_slot_precedes = true;
@@ -558,7 +567,9 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* false.
*/
static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
+ RemoteSlot *remote_slot, Oid remote_dbid, bool *sync_start_pending,
+ int sync_iterations)
{
ReplicationSlot *slot = MyReplicationSlot;
bool found_consistent_snapshot = false;
@@ -566,7 +577,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
(void) update_local_synced_slot(remote_slot, remote_dbid,
&found_consistent_snapshot,
- &remote_slot_precedes);
+ &remote_slot_precedes,
+ sync_iterations);
/*
* Check if the primary server has caught up. Refer to the comment atop
@@ -575,13 +587,40 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
if (remote_slot_precedes)
{
/*
- * The remote slot didn't catch up to locally reserved position.
+ * The remote slot didn't catch up to locally reserved
+ * position.
*
- * We do not drop the slot because the restart_lsn can be ahead of the
- * current location when recreating the slot in the next cycle. It may
- * take more time to create such a slot. Therefore, we keep this slot
- * and attempt the synchronization in the next cycle.
+ * We do not drop the slot because the restart_lsn can be
+ * ahead of the current location when recreating the slot in
+ * the next cycle. It may take more time to create such a
+ * slot. Therefore, we keep this slot and attempt the
+ * synchronization in the next cycle.
+ *
+ * If called from pg_sync_replication_slots(), set a flag
+ * indicating that the slot is not yet sync ready, so that it
+ * can be retried. Log a message once every 5 iterations,
+ * which is roughly every 10 seconds given the 2-second nap.
*/
+ if (!AmLogicalSlotSyncWorkerProcess())
+ {
+ if (sync_start_pending)
+ *sync_start_pending = true;
+
+ if (sync_iterations % 5 == 0)
+ {
+ /* Log a message every ten seconds */
+ ereport(LOG,
+ errmsg("waiting for remote slot \"%s\" LSN (%X/%08X)"
+ " and catalog xmin (%u) to pass local slot LSN"
+ " (%X/%08X) and catalog xmin (%u)",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+ remote_slot->catalog_xmin,
+ LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+ MyReplicationSlot->data.catalog_xmin));
+ }
+ }
+
return false;
}
@@ -622,7 +661,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* Returns TRUE if the local slot is updated.
*/
static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+ Oid remote_dbid, bool *sync_start_pending, int sync_iterations)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
@@ -715,8 +755,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
- slot_updated = update_and_persist_local_synced_slot(remote_slot,
- remote_dbid);
+ slot_updated = update_and_persist_local_synced_slot(wrconn,
+ remote_slot,
+ remote_dbid,
+ sync_start_pending,
+ sync_iterations);
}
/* Slot ready for sync, so sync it. */
@@ -738,7 +781,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
- NULL, NULL);
+ NULL, NULL, sync_iterations);
}
}
/* Otherwise create the slot first. */
@@ -785,7 +828,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid,
+ sync_start_pending,
+ sync_iterations);
slot_updated = true;
}
@@ -796,15 +841,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
}
/*
- * Synchronize slots.
+ * Fetch remote slots.
*
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * Gets the failover logical slots info from the primary server and creates
+ * a list of remote slots that need to be synchronized locally.
*
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ * NOTE: Caller must ensure a transaction is active before calling this function.
+ *
+ * Returns a list of RemoteSlot structures, or NIL if no slots need syncing.
*/
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn)
{
#define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
@@ -813,21 +860,12 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
- bool some_slot_updated = false;
- bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
" database, invalidation_reason"
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- /* The syscache access in walrcv_exec() needs a transaction env. */
- if (!IsTransactionState())
- {
- StartTransactionCommand();
- started_tx = true;
- }
-
/* Execute the query */
res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
if (res->status != WALRCV_OK_TUPLES)
@@ -835,7 +873,7 @@ synchronize_slots(WalReceiverConn *wrconn)
errmsg("could not fetch failover logical slots info from the primary server: %s",
res->err));
- /* Construct the remote_slot tuple and synchronize each slot locally */
+ /* Construct the remote_slot tuple and build list of slots to sync */
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
{
@@ -912,6 +950,180 @@ synchronize_slots(WalReceiverConn *wrconn)
ExecClearTuple(tupslot);
}
+ walrcv_clear_result(res);
+
+ return remote_slot_list;
+}
+
+/*
+ * Update remote slots list with current values.
+ *
+ * Takes a list of RemoteSlot structures and queries the primary server to
+ * get updated values for those specific slots. This is useful for refreshing
+ * slot information without fetching all failover slots again.
+ *
+ * NOTE: Caller must ensure a transaction is active before calling this
+ * function.
+ *
+ * Parameters: wrconn is the connection to the primary server, and
+ * remote_slot_list is the list of RemoteSlot structures to update.
+ *
+ * Returns the updated list, or the original list if the query fails. Slots
+ * that no longer exist on the primary will be removed from the list.
+ */
+static List *
+refresh_remote_slots(WalReceiverConn *wrconn, List *remote_slot_list)
+{
+#define UPDATE_SLOTSYNC_COLUMN_COUNT 10
+ Oid slotRow[UPDATE_SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+ LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
+ WalRcvExecResult *res;
+ TupleTableSlot *tupslot;
+ List *updated_slot_list = NIL;
+ StringInfoData query;
+ ListCell *lc;
+ bool first_slot = true;
+
+ /* If the input list is empty, return it as-is */
+ if (remote_slot_list == NIL)
+ return remote_slot_list;
+
+ /* Build query with slot names from the input list */
+ initStringInfo(&query);
+ appendStringInfoString(&query,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
+ " database, invalidation_reason"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE failover and NOT temporary AND slot_name IN (");
+
+ /* Add slot names to the IN clause */
+ foreach(lc, remote_slot_list)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+
+ if (!first_slot)
+ appendStringInfoString(&query, ", ");
+
+ appendStringInfoString(&query, quote_literal_cstr(remote_slot->name));
+ first_slot = false;
+ }
+ appendStringInfoString(&query, ")");
+
+ /* Execute the query */
+ res = walrcv_exec(wrconn, query.data, UPDATE_SLOTSYNC_COLUMN_COUNT, slotRow);
+ if (res->status != WALRCV_OK_TUPLES)
+ {
+ ereport(WARNING,
+ errmsg("could not fetch updated failover logical slots info"
+ " from the primary server: %s",
+ res->err));
+ pfree(query.data);
+ return remote_slot_list;
+ }
+
+ /* Process the updated slot information */
+ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+ {
+ bool isnull;
+ RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+ Datum d;
+ int col = 0;
+
+ remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ /*
+ * Handle possible null values for LSN and Xmin if slot is
+ * invalidated on the primary server.
+ */
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
+ DatumGetLSN(d);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
+ DatumGetTransactionId(d);
+
+ remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+ remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
+ ++col, &isnull));
+ Assert(!isnull);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->invalidated = isnull ? RS_INVAL_NONE :
+ GetSlotInvalidationCause(TextDatumGetCString(d));
+
+ /* Sanity check */
+ Assert(col == UPDATE_SLOTSYNC_COLUMN_COUNT);
+
+ /*
+ * Apply the same ephemeral slot filtering as in
+ * fetch_remote_slots. Skip slots that are in RS_EPHEMERAL
+ * state (invalid LSNs/xmin but not explicitly invalidated).
+ */
+ if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
+ XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
+ !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
+ remote_slot->invalidated == RS_INVAL_NONE)
+ pfree(remote_slot);
+ else
+ /* Add to updated list */
+ updated_slot_list = lappend(updated_slot_list, remote_slot);
+
+ ExecClearTuple(tupslot);
+ }
+
+ walrcv_clear_result(res);
+ pfree(query.data);
+
+ /*
+ * Free the original RemoteSlot structs and the list itself. The
+ * strings they point to are released with the memory context.
+ */
+ foreach(lc, remote_slot_list)
+ {
+ RemoteSlot *old_slot = (RemoteSlot *) lfirst(lc);
+ pfree(old_slot);
+ }
+ list_free(remote_slot_list);
+
+ return updated_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
+ List **pending_sync_start_slots, int sync_iterations)
+{
+ bool some_slot_updated = false;
+
/* Drop local slots that no longer need to be synced. */
drop_local_obsolete_slots(remote_slot_list);
@@ -919,6 +1131,7 @@ synchronize_slots(WalReceiverConn *wrconn)
foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
{
Oid remote_dbid = get_database_oid(remote_slot->database, false);
+ bool sync_start_pending = false;
/*
* Use shared lock to prevent a conflict with
@@ -927,19 +1140,16 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+ remote_dbid, &sync_start_pending, sync_iterations);
+
+ /* Only append to list if caller wants it and sync is pending */
+ if (pending_sync_start_slots != NULL && sync_start_pending)
+ *pending_sync_start_slots = lappend(*pending_sync_start_slots, remote_slot);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
- /* We are done, free remote_slot_list elements */
- list_free_deep(remote_slot_list);
-
- walrcv_clear_result(res);
-
- if (started_tx)
- CommitTransactionCommand();
-
return some_slot_updated;
}
@@ -1131,7 +1341,7 @@ slotsync_reread_config(void)
bool conninfo_changed;
bool primary_slotname_changed;
- Assert(sync_replication_slots);
+ Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
@@ -1252,31 +1462,38 @@ slotsync_worker_onexit(int code, Datum arg)
* sync-cycles is reset to the minimum (200ms).
*/
static void
-wait_for_slot_activity(bool some_slot_updated)
+wait_for_slot_activity(bool some_slot_updated, bool called_from_api)
{
- int rc;
+ int rc;
+ int wait_time;
- if (!some_slot_updated)
- {
- /*
- * No slots were updated, so double the sleep time, but not beyond the
- * maximum allowable value.
- */
- sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
- }
- else
- {
+ if (called_from_api) {
/*
- * Some slots were updated since the last sleep, so reset the sleep
- * time.
+ * When called from pg_sync_replication_slots, use a fixed 2
+ * second wait time.
*/
- sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
+ wait_time = SLOTSYNC_API_NAPTIME_MS;
+ } else {
+ if (!some_slot_updated) {
+ /*
+ * No slots were updated, so double the sleep time,
+ * but not beyond the maximum allowable value.
+ */
+ sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
+ } else {
+ /*
+ * Some slots were updated since the last sleep, so
+ * reset the sleep time.
+ */
+ sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
+ }
+ wait_time = sleep_ms;
}
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- sleep_ms,
- WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+ wait_time,
+ WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
@@ -1505,12 +1722,28 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
for (;;)
{
bool some_slot_updated = false;
+ List *remote_slots;
+ bool started_tx = false;
ProcessSlotSyncInterrupts(wrconn);
- some_slot_updated = synchronize_slots(wrconn);
+ /*
+ * The syscache access in fetch_remote_slots() needs a
+ * transaction env.
+ */
+ if (!IsTransactionState()) {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ remote_slots = fetch_remote_slots(wrconn);
+ some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL, 0);
+ list_free_deep(remote_slots);
+
+ if (started_tx)
+ CommitTransactionCommand();
- wait_for_slot_activity(some_slot_updated);
+ wait_for_slot_activity(some_slot_updated, false);
}
/*
@@ -1736,19 +1969,85 @@ slotsync_failure_callback(int code, Datum arg)
}
/*
- * Synchronize the failover enabled replication slots using the specified
- * primary server connection.
+ * Synchronize failover enabled replication slots using the specified primary
+ * server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready". Each retry follows a
+ * 2-second wait. Exits early if promotion is triggered.
*/
void
-SyncReplicationSlots(WalReceiverConn *wrconn)
+SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ List *remote_slots;
+ bool started_tx = false;
+ int sync_iterations = 0;
+
check_and_set_sync_info(InvalidPid);
validate_remote_info(wrconn);
- synchronize_slots(wrconn);
+ /*
+ * The syscache access in fetch_remote_slots() needs a
+ * transaction env.
+ */
+ if (!IsTransactionState()) {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ remote_slots = fetch_remote_slots(wrconn);
+
+ /* Retry until all slots are at least sync ready */
+ for (;;)
+ {
+ bool some_slot_updated = false;
+ List *pending_sync_start_slots = NIL;
+
+ sync_iterations++;
+
+ /* Refresh remote slot data */
+ remote_slots = refresh_remote_slots(wrconn, remote_slots);
+
+ /* Attempt to synchronize slots */
+ some_slot_updated = synchronize_slots(wrconn, remote_slots,
+ &pending_sync_start_slots, sync_iterations);
+
+ /* Done if all slots are at least sync ready */
+ if (pending_sync_start_slots == NIL)
+ break;
+ else
+ {
+ list_free(pending_sync_start_slots);
+ pending_sync_start_slots = NIL;
+
+ /* wait for 2 seconds before retrying */
+ wait_for_slot_activity(some_slot_updated, true);
+
+ /*
+ * If promotion has been triggered, there is no point
+ * in continuing.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("exiting from slot synchronization because"
+ " promotion is triggered")));
+ break;
+ }
+
+ /* Handle any pending termination request */
+ ProcessSlotSyncInterrupts(wrconn);
+ }
+ }
+
+ list_free_deep(remote_slots);
+
+ if (started_tx)
+ CommitTransactionCommand();
/* Cleanup the synced temporary slots */
ReplicationSlotCleanup(true);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d..3497f0f 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker."
REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch up."
SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
--
1.8.3.1