From 8bbf1b200fb9f418286f837e8f6eb41c99f2890c Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 11 Nov 2025 13:01:07 +0900 Subject: [PATCH] HEAD: invalidate newly synchronized slot --- src/backend/access/transam/xlog.c | 3 + src/backend/replication/logical/slotsync.c | 5 + src/test/recovery/meson.build | 1 + .../recovery/t/110_slot_sync_invalidation.pl | 110 ++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 src/test/recovery/t/110_slot_sync_invalidation.pl diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 22d0a2e8c3a..49c22506e57 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7838,6 +7838,9 @@ CreateRestartPoint(int flags) receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; + + INJECTION_POINT("restartpoint-before-old-wal-removal", NULL); + KeepLogSeg(endptr, &_logSegNo); if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b4afd87dc9..64afca24960 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -64,6 +64,7 @@ #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -497,10 +498,14 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) while (true) { + INJECTION_POINT("slotsync-reserve-wal", NULL); + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = restart_lsn; SpinLockRelease(&slot->mutex); + INJECTION_POINT("slotsync-reserve-wal-before-compute-required-lsn", NULL); + /* Prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 523a5cd5b52..022af7b4c14 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -58,6 +58,7 @@ tests += { 't/047_checkpoint_physical_slot.pl', 't/048_vacuum_horizon_floor.pl', 't/049_wait_for_lsn.pl', + 't/110_slot_sync_invalidation.pl' ], }, } diff --git a/src/test/recovery/t/110_slot_sync_invalidation.pl b/src/test/recovery/t/110_slot_sync_invalidation.pl new file mode 100644 index 00000000000..3dfabb6c2da --- /dev/null +++ b/src/test/recovery/t/110_slot_sync_invalidation.pl @@ -0,0 +1,110 @@ + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); +$primary->append_conf( + 'postgresql.conf', qq( +autovacuum = off +checkpoint_timeout = 1h +)); +$primary->start; + +# Create a standby from the backup +my $backup_name = 'backup'; +$primary->backup($backup_name); + +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr = $primary->connstr; +$standby->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +# Install injection_point +$primary->safe_psql('postgres', q{CREATE EXTENSION injection_points;}); + +# Create a physical replication slot to synchronize logical slots +$primary->safe_psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); + +# Start the standby so that slot syncing can begin +$standby->start; + +$primary->wait_for_replay_catchup($standby); + +# Create a logical replication slot to be synchronized +$primary->safe_psql('postgres', + q{SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);}); + +$primary->advance_wal(6); + +$primary->wait_for_replay_catchup($standby); + + +# Attach an injection point an try to synchronize the slot. It would stuck. +$standby->safe_psql('postgres', + q{SELECT injection_points_attach('slotsync-reserve-wal', 'wait')}); +$standby->safe_psql('postgres', + q{SELECT injection_points_attach('slotsync-reserve-wal-before-compute-required-lsn', 'wait')}); + +my $background1 = $standby->background_psql('postgres'); +$background1->query_until( + qr/slotsync-reserve-wal/, + q(\echo slotsync-reserve-wal + select pg_sync_replication_slots(); + \q +)); + +$standby->wait_for_event('client backend', 'slotsync-reserve-wal'); + +my $offset = -s $standby->logfile; + +$primary->safe_psql('postgres', q{CHECKPOINT}); + +# Attach an injection point while running the RESTARTPOINT +$standby->safe_psql('postgres', + q{SELECT injection_points_attach('restartpoint-before-old-wal-removal', 'wait')}); +my $background2 = $standby->background_psql('postgres'); +$background2->query_until( + qr/restartpoint-before-old-wal-removal/, + q(\echo restartpoint-before-old-wal-removal + CHECKPOINT; + \q +)); + +$standby->wait_for_event('checkpointer', 'restartpoint-before-old-wal-removal'); + +# Wake up the backend and stop again at another injection point +$standby->safe_psql('postgres', + q{SELECT injection_points_wakeup('slotsync-reserve-wal')}); + +$standby->wait_for_event('client backend', 'slotsync-reserve-wal-before-compute-required-lsn'); + +# Wake up the checkpointer to allow removing WALs. +$standby->safe_psql('postgres', + q{SELECT injection_points_wakeup('restartpoint-before-old-wal-removal')}); + +my $result = $standby->safe_psql('postgres', + q(SELECT slot_name FROM pg_replication_slots;)); + +is($result, 'lsub1_slot', 'logical replication slot can be synchronized'); +done_testing(); -- 2.47.3