v3-0001-Improve-initial-slot-synchronization-in-pg_sync_r.patch
Format: format-patch
Series: patch v3-0001
Subject: Improve initial slot synchronization in pg_sync_replication_slots()
| File | Lines added | Lines removed |
|---|---|---|
| doc/src/sgml/func.sgml | 1 | 3 |
| doc/src/sgml/logicaldecoding.sgml | 13 | 27 |
| src/backend/replication/logical/slotsync.c | 210 | 10 |
| src/backend/utils/activity/wait_event_names.txt | 1 | 0 |
From 661fcb4ced929bed2dd4d90e126c0a1cb39114c0 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 31 Jul 2025 05:33:42 -0400
Subject: [PATCH v3] Improve initial slot synchronization in
pg_sync_replication_slots()
During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WAL have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for the initial sync
and retaining it even in case of failure. It keeps retrying until the slot on
the primary has advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had no such
protection mechanism.

The SQL API would fail immediately if the synchronization requirements were
not met. This could lead to permanent failure, as the standby might continue
removing the still-required data.

To address this, pg_sync_replication_slots() now waits for the primary slot
to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot reaches a suitable
position, synchronization is retried. Additionally, if a promotion occurs on
the standby during this wait, the wait is aborted with an error and the
temporary slot is removed.
---
doc/src/sgml/func.sgml | 4 +-
doc/src/sgml/logicaldecoding.sgml | 40 ++---
src/backend/replication/logical/slotsync.c | 220 ++++++++++++++++++++++--
src/backend/utils/activity/wait_event_names.txt | 1 +
4 files changed, 225 insertions(+), 40 deletions(-)
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 74a16af..4092677 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -30034,9 +30034,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
- Note that this function is primarily intended for testing and
- debugging purposes and should be used with caution. Additionally,
- this function cannot be executed if
+ Note that this function cannot be executed if
<link linkend="guc-sync-replication-slots"><varname>
sync_replication_slots</varname></link> is enabled and the slotsync
worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 593f784..edad0e9 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
<sect2 id="logicaldecoding-replication-slots-synchronization">
<title>Replication Slot Synchronization</title>
<para>
- The logical replication slots on the primary can be synchronized to
- the hot standby by using the <literal>failover</literal> parameter of
+ The logical replication slots on the primary can be enabled for
+ synchronization to the hot standby by using the
+ <literal>failover</literal> parameter of
<link linkend="pg-create-logical-replication-slot">
<function>pg_create_logical_replication_slot</function></link>, or by
using the <link linkend="sql-createsubscription-params-with-failover">
<literal>failover</literal></link> option of
- <command>CREATE SUBSCRIPTION</command> during slot creation.
- Additionally, enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link> on the standby
- is required. By enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link>
- on the standby, the failover slots can be synchronized periodically in
+ <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+ synchronization can be performed either manually by calling
+ <link linkend="pg-sync-replication-slots">
+ <function>pg_sync_replication_slots</function></link>
+ on the standby, or automatically by enabling
+ <link linkend="guc-sync-replication-slots">
+ <varname>sync_replication_slots</varname></link> on the standby.
+ When <link linkend="guc-sync-replication-slots">
+ <varname>sync_replication_slots</varname></link> is enabled
+ on the standby, the failover slots are periodically synchronized by
the slotsync worker. For the synchronization to work, it is mandatory to
have a physical replication slot between the primary and the standby (i.e.,
<link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
receiving the WAL up to the latest flushed position on the primary server.
</para>
- <note>
- <para>
- While enabling <link linkend="guc-sync-replication-slots">
- <varname>sync_replication_slots</varname></link> allows for automatic
- periodic synchronization of failover slots, they can also be manually
- synchronized using the <link linkend="pg-sync-replication-slots">
- <function>pg_sync_replication_slots</function></link> function on the standby.
- However, this function is primarily intended for testing and debugging and
- should be used with caution. Unlike automatic synchronization, it does not
- include cyclic retries, making it more prone to synchronization failures,
- particularly during initial sync scenarios where the required WAL files
- or catalog rows for the slot may have already been removed or are at risk
- of being removed on the standby. In contrast, automatic synchronization
- via <varname>sync_replication_slots</varname> provides continuous slot
- updates, enabling seamless failover and supporting high availability.
- Therefore, it is the recommended method for synchronizing slots.
- </para>
- </note>
-
<para>
When slot synchronization is configured as recommended,
and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2f0c08b..3308772 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -146,6 +146,7 @@ typedef struct RemoteSlot
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
static void slotsync_failure_callback(int code, Datum arg);
static void update_synced_slots_inactive_since(void);
@@ -550,6 +551,185 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
}
/*
+ * Wait for the remote slot to pass the locally reserved position.
+ *
+ * Return true once remote_slot has caught up, false if the slot is missing
+ * or invalidated on the primary; raise an ERROR if promotion is signaled.
+ */
+static bool
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
+{
+#define SLOT_QUERY_COLUMN_COUNT 4
+
+ StringInfoData cmd;
+ int wait_iterations = 0;
+
+ Assert(!AmLogicalSlotSyncWorkerProcess());
+
+ ereport(LOG,
+ errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+ " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+ remote_slot->catalog_xmin,
+ LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+ MyReplicationSlot->data.catalog_xmin));
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT invalidation_reason IS NOT NULL, restart_lsn,"
+ " confirmed_flush_lsn, catalog_xmin"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE slot_name = %s",
+ quote_literal_cstr(remote_slot->name));
+
+ for (;;)
+ {
+ bool new_invalidated;
+ XLogRecPtr new_restart_lsn;
+ XLogRecPtr new_confirmed_lsn;
+ TransactionId new_catalog_xmin;
+ WalRcvExecResult *res;
+ TupleTableSlot *tupslot;
+ Datum d;
+ int rc;
+ int col = 0;
+ bool isnull;
+ Oid slotRow[SLOT_QUERY_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID};
+
+ /* Handle any pending termination request */
+ ProcessSlotSyncInterrupts(wrconn);
+
+ res = walrcv_exec(wrconn, cmd.data, SLOT_QUERY_COLUMN_COUNT, slotRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errmsg("could not fetch slot \"%s\" info from the"
+ " primary server: %s",
+ remote_slot->name, res->err));
+
+ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+ {
+ ereport(WARNING,
+ errmsg("aborting initial sync for slot \"%s\"",
+ remote_slot->name),
+ errdetail("This slot was not found on the primary server."));
+
+ pfree(cmd.data);
+ walrcv_clear_result(res);
+
+ return false;
+ }
+
+ /*
+ * It is possible that the slot was invalidated on the primary, if so
+ * handle accordingly.
+ */
+ new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull));
+ Assert(!isnull);
+
+ if (new_invalidated)
+ {
+ /*
+ * The slot won't be persisted by the caller; it will be cleaned
+ * up at the end of synchronization.
+ */
+ ereport(WARNING,
+ errmsg("aborting initial sync for slot \"%s\"",
+ remote_slot->name),
+ errdetail("This slot was invalidated on the primary server."));
+
+ pfree(cmd.data);
+ ExecClearTuple(tupslot);
+ walrcv_clear_result(res);
+
+ return false;
+ }
+
+ /* Any slot with NULL in these fields should not have made it this far */
+ d = slot_getattr(tupslot, ++col, &isnull);
+ Assert(!isnull);
+ new_restart_lsn = DatumGetLSN(d);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ Assert(!isnull);
+ new_confirmed_lsn = DatumGetLSN(d);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ Assert(!isnull);
+ new_catalog_xmin = DatumGetTransactionId(d);
+
+ ExecDropSingleTupleTableSlot(tupslot);
+ walrcv_clear_result(res);
+
+ if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn &&
+ TransactionIdFollowsOrEquals(new_catalog_xmin,
+ MyReplicationSlot->data.catalog_xmin))
+ {
+ /* Update new values in remote_slot */
+ remote_slot->restart_lsn = new_restart_lsn;
+ remote_slot->confirmed_lsn = new_confirmed_lsn;
+ remote_slot->catalog_xmin = new_catalog_xmin;
+
+ ereport(LOG,
+ errmsg("finished waiting for remote slot \"%s\" as its LSN (%X/%X)"
+ " and catalog xmin (%u) have now passed local slot LSN"
+ " (%X/%X) and catalog xmin (%u)",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(new_restart_lsn),
+ new_catalog_xmin,
+ LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+ MyReplicationSlot->data.catalog_xmin));
+
+ pfree(cmd.data);
+
+ return true;
+ }
+
+ /*
+ * If standby promotion has been signaled, there is no point in continuing.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when"
+ " standby promotion is ongoing")));
+ pfree(cmd.data);
+
+ return false;
+ }
+
+ /*
+ * XXX: Is a 2-second wait between retries appropriate, or should it be
+ * longer or shorter?
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 2000L,
+ WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+
+ /* Log a message every fifth iteration, i.e. roughly every ten seconds */
+ wait_iterations++;
+ if (wait_iterations % 5 == 0)
+ {
+ ereport(LOG,
+ errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+ " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+ remote_slot->catalog_xmin,
+ LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+ MyReplicationSlot->data.catalog_xmin));
+ }
+ }
+}
+
+/*
* If the remote restart_lsn and catalog_xmin have caught up with the
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
@@ -558,7 +738,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* false.
*/
static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
+ RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
bool found_consistent_snapshot = false;
@@ -577,12 +758,28 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/*
* The remote slot didn't catch up to locally reserved position.
*
- * We do not drop the slot because the restart_lsn can be ahead of the
- * current location when recreating the slot in the next cycle. It may
- * take more time to create such a slot. Therefore, we keep this slot
- * and attempt the synchronization in the next cycle.
+ * If we're in the slotsync worker, we retain the slot and retry in the
+ * next cycle. The restart_lsn might advance by then, allowing the slot
+ * to be created successfully later.
*/
- return false;
+ if (AmLogicalSlotSyncWorkerProcess())
+ return false;
+
+ /*
+ * For SQL API synchronization, we wait for the remote slot to catch up
+ * here, since we can't assume the SQL API will be called again soon.
+ * We will retry the sync once the slot catches up.
+ *
+ * Note: if a promotion is triggered on the standby while waiting,
+ * wait_for_primary_slot_catchup() raises an ERROR, which stops the
+ * sync; the temporary slot is then dropped as part of error cleanup.
+ */
+ if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
+ return false;
+ else
+ update_local_synced_slot(remote_slot, remote_dbid,
+ &found_consistent_snapshot,
+ &remote_slot_precedes);
}
/*
@@ -622,7 +819,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* Returns TRUE if the local slot is updated.
*/
static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+ Oid remote_dbid)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
@@ -715,7 +913,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
- slot_updated = update_and_persist_local_synced_slot(remote_slot,
+ slot_updated = update_and_persist_local_synced_slot(wrconn,
+ remote_slot,
remote_dbid);
}
@@ -785,7 +984,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid);
slot_updated = true;
}
@@ -927,7 +1126,8 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+ remote_dbid);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d..9fa36ab 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -64,6 +64,7 @@ LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication paralle
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker."
REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the remote slot on the primary to catch up during slot synchronization."
SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
--
1.8.3.1