v1-0001-pg_recvlogical-send-final-feedback-on-SIGINT-SIGT.patch

application/octet-stream

Filename: v1-0001-pg_recvlogical-send-final-feedback-on-SIGINT-SIGT.patch
Type: application/octet-stream
Part: 0
Message: pg_recvlogical: send final feedback on SIGINT/SIGTERM exit
From ad98280be52f37acb23d8cd14fc26dd04e1cd3e9 Mon Sep 17 00:00:00 2001
From: Fujii Masao <fujii@postgresql.org>
Date: Thu, 28 May 2026 23:19:42 +0900
Subject: [PATCH v1] pg_recvlogical: send final feedback on SIGINT/SIGTERM
 shutdown

Previously, when pg_recvlogical exited due to SIGINT or SIGTERM,
it could terminate without sending final feedback for the last decoded
changes it had already written locally. So, if pg_recvlogical was
restarted afterwards, the server-side logical replication slot could
still point behind those changes, causing them to be sent again.

Make pg_recvlogical send final feedback once more during SIGINT/SIGTERM
shutdown, before sending CopyDone. This gives the server one more chance
to advance the slot far enough to avoid resending already-written data,
so users are less likely to see duplicate decoded output after stopping
and restarting pg_recvlogical.

This remains a best-effort improvement rather than a guarantee. Depending
on when the signal arrives, pg_recvlogical can already have written
decoded output that the server cannot yet safely treat as confirmed, so a
later restart can still receive duplicate data.
---
 src/bin/pg_basebackup/pg_recvlogical.c        | 23 ++++++
 src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 74 +++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 2fdf64bcadb..11abdbc274e 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -1071,6 +1071,29 @@ static void
 prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
 				   XLogRecPtr lsn)
 {
+	/*
+	 * If pg_recvlogical is terminated by a signal, we can reach here without
+	 * sending final feedback. In that case, send feedback once more before
+	 * sending CopyDone so the replication slot can advance far enough to
+	 * reduce the chance of resending duplicate data when pg_recvlogical is
+	 * restarted.
+	 *
+	 * This is still only a best-effort attempt. Depending on when the signal
+	 * arrives, the receiver may have written decoded output that the server
+	 * cannot yet safely treat as confirmed, so a later restart can still see
+	 * duplicate data.
+	 *
+	 * For other termination cases, such as STREAM_STOP_KEEPALIVE and
+	 * STREAM_STOP_END_OF_WAL, feedback has already been sent before reaching
+	 * here, so there is no need to call flushAndSendFeedback() again.
+	 */
+	if (reason == STREAM_STOP_SIGNAL)
+	{
+		TimestampTz now = feGetCurrentTimestamp();
+
+		(void) flushAndSendFeedback(conn, &now);
+	}
+
 	(void) PQputCopyEnd(conn, NULL);
 	(void) PQflush(conn);
 
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
index 945a242bdad..5e3e36cc4f3 100644
--- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -282,6 +282,80 @@ SKIP:
 		'pg_recvlogical output file respects group permissions (0640)');
 }
 
+SKIP:
+{
+	skip "signals not supported on Windows", 4
+	  if ($Config{osname} eq 'MSWin32' || $Config{osname} eq 'cygwin');
+
+	my $signal_outfile = $node->basedir . '/signal_shutdown.out';
+
+	$node->command_ok(
+		[
+			'pg_recvlogical',
+			'--slot' => 'signal_shutdown_test',
+			'--dbname' => $node->connstr('postgres'),
+			'--create-slot',
+		],
+		'slot created for signal shutdown test');
+
+	@pg_recvlogical_cmd = (
+		'pg_recvlogical',
+		'--slot' => 'signal_shutdown_test',
+		'--dbname' => $node->connstr('postgres'),
+		'--start',
+		'--file' => $signal_outfile,
+		'--fsync-interval' => '100',
+		'--status-interval' => '100');
+
+	$recv = IPC::Run::start(
+		[@pg_recvlogical_cmd],
+		'>' => \$stdout,
+		'2>' => \$stderr);
+
+	$node->safe_psql('postgres', 'INSERT INTO test_table VALUES (42)');
+
+	# Wait for not only INSERT but also COMMIT because the inserted
+	# change might not yet be safely confirmable by final feedback until
+	# the transaction has committed.
+	wait_for_file($signal_outfile,
+		qr/test_table: INSERT: x\[integer\]:42\b.*?\bCOMMIT\b/s);
+
+	$recv->signal('TERM');
+	$recv->finish();
+
+	$nextlsn =
+	  $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
+	chomp($nextlsn);
+
+	$node->command_ok(
+		[
+			'pg_recvlogical',
+			'--slot' => 'signal_shutdown_test',
+			'--dbname' => $node->connstr('postgres'),
+			'--start',
+			'--endpos' => $nextlsn,
+			'--no-loop',
+			'--file' => $signal_outfile,
+		],
+		'pg_recvlogical exits after signal without replaying flushed data');
+
+	my $signal_data = slurp_file($signal_outfile);
+	my $signal_count =
+	  (() = $signal_data =~ /test_table: INSERT: x\[integer\]:42\b/g);
+	is($signal_count, 1,
+		'pg_recvlogical does not duplicate decoded changes after signal shutdown'
+	);
+
+	$node->command_ok(
+		[
+			'pg_recvlogical',
+			'--slot' => 'signal_shutdown_test',
+			'--dbname' => $node->connstr('postgres'),
+			'--drop-slot'
+		],
+		'signal_shutdown_test slot dropped');
+}
+
 $node->command_ok(
 	[
 		'pg_recvlogical',
-- 
2.53.0