v2-0001-Improve-initial-slot-synchronization-in-pg_sync_r.patch
Format: format-patch
Series: patch v2-0001
Subject: Improve initial slot synchronization in pg_sync_replication_slots()
| File | + | − |
|---|---|---|
| doc/src/sgml/func.sgml | 1 | 3 |
| doc/src/sgml/logicaldecoding.sgml | 4 | 1 |
| src/backend/replication/logical/slotsync.c | 60 | 53 |
From c8fb374d71ef9e19edfb3515051428904ba8fd86 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 16 Jul 2025 05:19:14 -0400
Subject: [PATCH v2] Improve initial slot synchronization in
pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
the required catalog rows or WAL have been removed or are at risk of removal.
The slotsync worker handles this by creating a temporary slot for the initial
sync and retaining it even if synchronization fails. It keeps retrying until
the slot on the primary has advanced to a position where all the required
data are also available on the standby. However, pg_sync_replication_slots()
had no such protection mechanism.

The SQL API would fail immediately if the synchronization requirements were
not met. This could lead to a permanent failure, as the standby might
continue removing the still-required data.

To address this, pg_sync_replication_slots() now waits for the primary slot
to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot has advanced,
synchronization is retried. Additionally, if a promotion occurs on the
standby during this wait, the function exits gracefully and the temporary
slot is removed.
---
doc/src/sgml/func.sgml | 4 +-
doc/src/sgml/logicaldecoding.sgml | 5 +-
src/backend/replication/logical/slotsync.c | 113 +++++++++++++++--------------
3 files changed, 65 insertions(+), 57 deletions(-)
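The wait-and-retry behavior described in the commit message can be illustrated with a simplified, self-contained model. This is a sketch under stated assumptions, not the patch's code: `SlotPos`, `wait_for_catchup()`, the fetch/promotion callbacks and the `demo_*` stubs are hypothetical stand-ins for the remote query against pg_replication_slots and for PromoteIsTriggered(), the 2-second latch wait is elided, and the XID comparison only mirrors TransactionIdFollowsOrEquals() for normal XIDs.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-ins for PostgreSQL's XLogRecPtr and TransactionId. */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

/* Hypothetical snapshot of a slot's restart_lsn and catalog_xmin. */
typedef struct SlotPos
{
	XLogRecPtr	restart_lsn;
	TransactionId catalog_xmin;
} SlotPos;

/*
 * Circular "a >= b" for normal XIDs, modelled on TransactionIdFollowsOrEquals().
 * The real function also special-cases permanent XIDs; this sketch does not.
 */
static bool
xid_follows_or_equals(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) >= 0;
}

/* The catch-up test used by wait_for_primary_slot_catchup() in the patch. */
static bool
remote_caught_up(SlotPos remote, SlotPos local)
{
	return remote.restart_lsn >= local.restart_lsn &&
		xid_follows_or_equals(remote.catalog_xmin, local.catalog_xmin);
}

/* Hypothetical hooks standing in for the remote query and PromoteIsTriggered(). */
typedef SlotPos (*FetchRemoteFn) (int iteration);
typedef bool (*PromotedFn) (int iteration);

/*
 * Poll until the remote slot catches up (true) or promotion is seen (false).
 * Each iteration stands for one 2-second latch wait, so logging on every
 * 5th iteration yields roughly one message per 10 seconds.
 */
static bool
wait_for_catchup(SlotPos local, FetchRemoteFn fetch, PromotedFn promoted,
				 int *log_count)
{
	assert(log_count != NULL);

	for (int wait_iterations = 0;;)
	{
		SlotPos		remote = fetch(wait_iterations);

		if (remote_caught_up(remote, local))
			return true;
		if (promoted(wait_iterations))
			return false;		/* caller then drops the temporary slot */

		/* The real code sleeps on its latch for up to 2000 ms here. */
		wait_iterations++;
		if (wait_iterations % 5 == 0)
		{
			(*log_count)++;
			printf("still waiting: remote %" PRIu64 "/%" PRIu32
				   " vs local %" PRIu64 "/%" PRIu32 "\n",
				   remote.restart_lsn, remote.catalog_xmin,
				   local.restart_lsn, local.catalog_xmin);
		}
	}
}

/* Demo remote slot that advances 10 bytes of LSN and 1 XID per poll. */
static SlotPos
demo_fetch(int i)
{
	SlotPos		p = {100 + 10 * (XLogRecPtr) i, 500 + (TransactionId) i};

	return p;
}

static bool
demo_never_promoted(int i)
{
	(void) i;
	return false;
}

static bool
demo_promoted_at_3(int i)
{
	return i >= 3;
}
```

With a 2-second wait per iteration, logging on every fifth iteration gives the roughly ten-second cadence mentioned in the patch's comment; since a latch wakeup can end a wait early, the interval is approximate.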
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index a6d7976..5e5e6a9 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29980,9 +29980,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
- Note that this function is primarily intended for testing and
- debugging purposes and should be used with caution. Additionaly,
- this function cannot be executed if
+ Note that this function cannot be executed if
<link linkend="guc-sync-replication-slots"><varname>
sync_replication_slots</varname></link> is enabled and the slotsync
worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index a8c18f9..2e4d2fa 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -370,7 +370,10 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
<function>pg_create_logical_replication_slot</function></link>, or by
using the <link linkend="sql-createsubscription-params-with-failover">
<literal>failover</literal></link> option of
- <command>CREATE SUBSCRIPTION</command> during slot creation.
+ <command>CREATE SUBSCRIPTION</command> during slot creation, and then
+ calling <link linkend="pg-sync-replication-slots">
+ <function>pg_sync_replication_slots</function></link>
+ on the standby.
Additionally, enabling <link linkend="guc-sync-replication-slots">
<varname>sync_replication_slots</varname></link> on the standby
is required. By enabling <link linkend="guc-sync-replication-slots">
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index c2a8e81..ca71727 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -559,9 +559,10 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
static bool
wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
{
-#define SLOT_QUERY_COLUMN_COUNT 4
+#define SLOT_QUERY_COLUMN_COUNT 3
StringInfoData cmd;
+ int wait_iterations = 0;
Assert(!AmLogicalSlotSyncWorkerProcess());
@@ -576,7 +577,7 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
initStringInfo(&cmd);
appendStringInfo(&cmd,
- "SELECT invalidation_reason IS NOT NULL, restart_lsn,"
+ "SELECT restart_lsn,"
" confirmed_flush_lsn, catalog_xmin"
" FROM pg_catalog.pg_replication_slots"
" WHERE slot_name = %s",
@@ -584,7 +585,6 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
for (;;)
{
- bool new_invalidated;
XLogRecPtr new_restart_lsn;
XLogRecPtr new_confirmed_lsn;
TransactionId new_catalog_xmin;
@@ -594,7 +594,7 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
int rc;
int col = 0;
bool isnull;
- Oid slotRow[SLOT_QUERY_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID};
+ Oid slotRow[SLOT_QUERY_COLUMN_COUNT] = {LSNOID, LSNOID, XIDOID};
/* Handle any termination request if any */
ProcessSlotSyncInterrupts(wrconn);
@@ -621,52 +621,23 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
return false;
}
- /*
- * It is possible to get null value for restart_lsn if the slot is
- * invalidated on the primary server, so handle accordingly.
- */
- new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull));
- Assert(!isnull);
-
+ /* Any slot with NULL in these fields should not have made it this far */
d = slot_getattr(tupslot, ++col, &isnull);
- new_restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
-
- if (new_invalidated || XLogRecPtrIsInvalid(new_restart_lsn))
- {
- /*
- * The slot won't be persisted by the caller; it will be cleaned up
- * at the end of synchronization.
- */
- ereport(WARNING,
- errmsg("aborting initial sync for slot \"%s\"",
- remote_slot->name),
- errdetail("This slot was invalidated on the primary server."));
-
- pfree(cmd.data);
- ExecClearTuple(tupslot);
- walrcv_clear_result(res);
-
- return false;
- }
+ Assert(!isnull);
+ new_restart_lsn = DatumGetLSN(d);
- /*
- * It is possible to get null values for confirmed_lsn and
- * catalog_xmin if on the primary server the slot is just created with
- * a valid restart_lsn and slot-sync worker has fetched the slot
- * before the primary server could set valid confirmed_lsn and
- * catalog_xmin.
- */
d = slot_getattr(tupslot, ++col, &isnull);
- new_confirmed_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+ Assert(!isnull);
+ new_confirmed_lsn = DatumGetLSN(d);
d = slot_getattr(tupslot, ++col, &isnull);
- new_catalog_xmin = isnull ? InvalidTransactionId : DatumGetTransactionId(d);
+ Assert(!isnull);
+ new_catalog_xmin = DatumGetTransactionId(d);
ExecClearTuple(tupslot);
walrcv_clear_result(res);
if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn &&
- !XLogRecPtrIsInvalid(new_confirmed_lsn) &&
TransactionIdFollowsOrEquals(new_catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
@@ -691,6 +662,22 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
}
/*
+ * If we are synchronizing via the SQL API and a promotion has been
+ * triggered, there is no point in continuing to wait.
+ */
+ if (!AmLogicalSlotSyncWorkerProcess() && PromoteIsTriggered())
+ {
+ ereport(WARNING,
+ errmsg("aborting sync for slot \"%s\"",
+ remote_slot->name),
+ errdetail("Promotion occurred before this slot was fully"
+ " synchronized."));
+ pfree(cmd.data);
+
+ return false;
+ }
+
+ /*
* XXX: Is waiting for 2 seconds before retrying enough or more or
* less?
*/
@@ -701,6 +688,20 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
+
+ /* Log a message roughly every ten seconds (every 5th 2s wait) */
+ wait_iterations++;
+ if (wait_iterations % 5 == 0)
+ {
+ ereport(LOG,
+ errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+ " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(new_restart_lsn),
+ new_catalog_xmin,
+ LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+ MyReplicationSlot->data.catalog_xmin));
+ }
}
}
@@ -733,22 +734,28 @@ update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
/*
* The remote slot didn't catch up to locally reserved position.
*
- * For the slotsync worker, we do not drop the slot because the
- * restart_lsn can be ahead of the current location when recreating the
- * slot in the next cycle. It may take more time to create such a slot.
- * Therefore, we keep this slot and attempt the synchronization in the
- * next cycle.
- *
+ * If we're in the slotsync worker, we retain the slot and retry in the
+ * next cycle. The restart_lsn might advance by then, allowing the slot
+ * to be created successfully later.
+ */
+ if (AmLogicalSlotSyncWorkerProcess())
+ return false;
+
+ /*
* For SQL API synchronization, we wait for the remote slot to catch up
- * rather than leaving temporary slots. This is because we could not
- * predict when (or if) the SQL function might be executed again, and
- * the creating session might persist after promotion. Without
- * automatic cleanup, this could lead to temporary slots being retained
- * for a longer time.
+ * here, since we can't assume the SQL API will be called again soon.
+ * We will retry the sync once the slot catches up.
+ *
+ * Note: This will return false if a promotion is triggered on the
+ * standby while waiting, in which case we stop syncing and drop the
+ * temporary slot.
*/
- if (AmLogicalSlotSyncWorkerProcess() ||
- !wait_for_primary_slot_catchup(wrconn, remote_slot))
+ if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
return false;
+ else
+ update_local_synced_slot(remote_slot, remote_dbid,
+ &found_consistent_snapshot,
+ &remote_slot_precedes);
}
/*
--
1.8.3.1
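The branch that the patch adds to update_and_persist_local_synced_slot() reduces to a small decision table. The following is a hypothetical model (the enum and function names are illustrative, not from the patch) in which `caught_up` stands for the return value of wait_for_primary_slot_catchup(); the sketch does not call that function.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels for what happens to the temporary local slot. */
typedef enum SyncOutcome
{
	KEEP_TEMP_SLOT_RETRY_NEXT_CYCLE,	/* slotsync worker path */
	DROP_TEMP_SLOT,				/* SQL API, promotion during the wait */
	RESYNC_AND_CONTINUE			/* SQL API, remote slot caught up */
} SyncOutcome;

/*
 * Models the branch taken when the remote slot has not yet reached the
 * locally reserved position.  For the slotsync worker, caught_up is
 * irrelevant because the worker returns without waiting.
 */
static SyncOutcome
lagging_remote_outcome(bool am_slotsync_worker, bool caught_up)
{
	if (am_slotsync_worker)
		return KEEP_TEMP_SLOT_RETRY_NEXT_CYCLE;
	if (!caught_up)
		return DROP_TEMP_SLOT;
	return RESYNC_AND_CONTINUE;
}
```

Modelled this way, the worker never waits, while the SQL API either re-runs update_local_synced_slot() and continues, or gives up when a promotion is triggered during the wait.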