From c8fb374d71ef9e19edfb3515051428904ba8fd86 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 16 Jul 2025 05:19:14 -0400 Subject: [PATCH v2] Improve initial slot synchronization in pg_sync_replication_slots() During initial slot synchronization on a standby, the operation may fail if required catalog rows or WALs have been removed or are at risk of removal. The slotsync worker handles this by creating a temporary slot for initial sync and retaining it even in case of failure. It will keep retrying until the slot on the primary has been advanced to a position where all the required data are also available on the standby. However, pg_sync_replication_slots() had no such protection mechanism. The SQL API would fail immediately if synchronization requirements weren't met. This could lead to permanent failure as the standby might continue removing the still-required data. To address this, we now make pg_sync_replication_slots() wait for the primary slot to advance to a suitable position before completing synchronization and before removing the temporary slot. Once the slot advances to a suitable position, we retry synchronization. Additionally, if a promotion occurs on the standby during this wait, the process exits gracefully and the temporary slot is removed. --- doc/src/sgml/func.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 5 +- src/backend/replication/logical/slotsync.c | 113 +++++++++++++++-------------- 3 files changed, 65 insertions(+), 57 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index a6d7976..5e5e6a9 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -29980,9 +29980,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. - Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. 
Additionaly, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index a8c18f9..2e4d2fa 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -370,7 +370,10 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_create_logical_replication_slot, or by using the failover option of - CREATE SUBSCRIPTION during slot creation. + CREATE SUBSCRIPTION during slot creation, and then + calling + pg_sync_replication_slots + on the standby. Additionally, enabling sync_replication_slots on the standby is required. By enabling diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index c2a8e81..ca71727 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -559,9 +559,10 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) static bool wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) { -#define SLOT_QUERY_COLUMN_COUNT 4 +#define SLOT_QUERY_COLUMN_COUNT 3 StringInfoData cmd; + int wait_iterations = 0; Assert(!AmLogicalSlotSyncWorkerProcess()); @@ -576,7 +577,7 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) initStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT invalidation_reason IS NOT NULL, restart_lsn," + "SELECT restart_lsn," " confirmed_flush_lsn, catalog_xmin" " FROM pg_catalog.pg_replication_slots" " WHERE slot_name = %s", @@ -584,7 +585,6 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) for (;;) { - bool new_invalidated; XLogRecPtr new_restart_lsn; XLogRecPtr new_confirmed_lsn; TransactionId new_catalog_xmin; @@ -594,7 +594,7 @@ wait_for_primary_slot_catchup(WalReceiverConn 
*wrconn, RemoteSlot *remote_slot) int rc; int col = 0; bool isnull; - Oid slotRow[SLOT_QUERY_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID}; + Oid slotRow[SLOT_QUERY_COLUMN_COUNT] = {LSNOID, LSNOID, XIDOID}; /* Handle any termination request if any */ ProcessSlotSyncInterrupts(wrconn); @@ -621,52 +621,23 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) return false; } - /* - * It is possible to get null value for restart_lsn if the slot is - * invalidated on the primary server, so handle accordingly. - */ - new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull)); - Assert(!isnull); - + /* Any slot with NULL in these fields should not have made it this far */ d = slot_getattr(tupslot, ++col, &isnull); - new_restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); - - if (new_invalidated || XLogRecPtrIsInvalid(new_restart_lsn)) - { - /* - * The slot won't be persisted by the caller; it will be cleaned up - * at the end of synchronization. - */ - ereport(WARNING, - errmsg("aborting initial sync for slot \"%s\"", - remote_slot->name), - errdetail("This slot was invalidated on the primary server.")); - - pfree(cmd.data); - ExecClearTuple(tupslot); - walrcv_clear_result(res); - - return false; - } + Assert(!isnull); + new_restart_lsn = DatumGetLSN(d); - /* - * It is possible to get null values for confirmed_lsn and - * catalog_xmin if on the primary server the slot is just created with - * a valid restart_lsn and slot-sync worker has fetched the slot - * before the primary server could set valid confirmed_lsn and - * catalog_xmin. - */ d = slot_getattr(tupslot, ++col, &isnull); - new_confirmed_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + Assert(!isnull); + new_confirmed_lsn = DatumGetLSN(d); d = slot_getattr(tupslot, ++col, &isnull); - new_catalog_xmin = isnull ? 
InvalidTransactionId : DatumGetTransactionId(d); + Assert(!isnull); + new_catalog_xmin = DatumGetTransactionId(d); ExecClearTuple(tupslot); walrcv_clear_result(res); if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && - !XLogRecPtrIsInvalid(new_confirmed_lsn) && TransactionIdFollowsOrEquals(new_catalog_xmin, MyReplicationSlot->data.catalog_xmin)) { @@ -691,6 +662,22 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) } /* + * If in SQL API synchronization, and we've been promoted, then no point + * continuing. + */ + if (!AmLogicalSlotSyncWorkerProcess() && PromoteIsTriggered()) + { + ereport(WARNING, + errmsg("aborting sync for slot \"%s\"", + remote_slot->name), + errdetail("Promotion occurred before this slot was fully" + " synchronized.")); + pfree(cmd.data); + + return false; + } + + /* * XXX: Is waiting for 2 seconds before retrying enough or more or * less? */ @@ -701,6 +688,20 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) if (rc & WL_LATCH_SET) ResetLatch(MyLatch); + + /* log a message every ten seconds */ + wait_iterations++; + if (wait_iterations % 5 == 0) + { + ereport(LOG, + errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + } } } @@ -733,22 +734,28 @@ update_and_persist_local_synced_slot(WalReceiverConn *wrconn, /* * The remote slot didn't catch up to locally reserved position. * - * For the slotsync worker, we do not drop the slot because the - * restart_lsn can be ahead of the current location when recreating the - * slot in the next cycle. It may take more time to create such a slot. - * Therefore, we keep this slot and attempt the synchronization in the - * next cycle. 
- * + * If we're in the slotsync worker, we retain the slot and retry in the + * next cycle. The restart_lsn might advance by then, allowing the slot + * to be created successfully later. + */ + if (AmLogicalSlotSyncWorkerProcess()) + return false; + + /* * For SQL API synchronization, we wait for the remote slot to catch up - * rather than leaving temporary slots. This is because we could not - * predict when (or if) the SQL function might be executed again, and - * the creating session might persist after promotion. Without - * automatic cleanup, this could lead to temporary slots being retained - * for a longer time. + * here, since we can't assume the SQL API will be called again soon. + * We will retry the sync once the slot catches up. + * + * Note: This will return false if a promotion is triggered on the + * standby while waiting, in which case we stop syncing and drop the + * temporary slot. */ - if (AmLogicalSlotSyncWorkerProcess() || - !wait_for_primary_slot_catchup(wrconn, remote_slot)) + if (!wait_for_primary_slot_catchup(wrconn, remote_slot)) return false; + else + update_local_synced_slot(remote_slot, remote_dbid, + &found_consistent_snapshot, + &remote_slot_precedes); } /* -- 1.8.3.1