v2-logical-rollback-spike.patch
application/octet-stream
Filename: v2-logical-rollback-spike.patch
Type: application/octet-stream
Part: 0
From 62b52d0011f08d472f3d60b67110453b07f1017d Mon Sep 17 00:00:00 2001
From: Nikolay Samokhvalov <nik@postgres.ai>
Date: Thu, 16 Apr 2026 19:25:55 -0700
Subject: [PATCH] Do not count logical decoding cleanup aborts in xact_rollback
ReorderBufferProcessTXN() aborts the current transaction after each
decoded commit to release locks and clean up catalog access. In a
logical walsender that is a top-level abort, so every decoded commit
bumps pg_stat_database.xact_rollback; the counts surface as a spike on
walsender exit (e.g. when a subscription is disabled).
Add AbortCurrentTransactionWithoutXactStats(), a wrapper that suppresses
only the DB-level xact_rollback increment for that abort and leaves
per-relation and subxact stat handling intact. Use it from the
top-level cleanup paths in ReorderBufferProcessTXN() (gated on
!using_subtxn) and SnapBuildClearExportedSnapshot().
A TAP test asserts a publisher xact_rollback delta of 0 across five
decoded transactions (delta is 5 without the fix).
Reported-by: Rafael Thofehrn Castro
Discussion: https://postgr.es/m/CAM527d_EbU5Li4a5FdKQjYsdF-4Lqr_i3jXmZOm7Wbb%3DQ2KzTw%40mail.gmail.com
---
src/backend/access/transam/twophase.c | 2 +-
src/backend/access/transam/xact.c | 39 ++++++++++-
.../replication/logical/reorderbuffer.c | 16 +++--
src/backend/replication/logical/snapbuild.c | 9 ++-
src/backend/utils/activity/pgstat_xact.c | 11 ++-
src/include/access/xact.h | 1 +
src/include/pgstat.h | 2 +-
src/test/subscription/meson.build | 1 +
.../t/039_publisher_xact_rollback.pl | 70 +++++++++++++++++++
9 files changed, 139 insertions(+), 12 deletions(-)
create mode 100644 src/test/subscription/t/039_publisher_xact_rollback.pl
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 1035e8b3fc7..3aa31e7f805 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1676,7 +1676,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
LWLockRelease(TwoPhaseStateLock);
/* Count the prepared xact as committed or aborted */
- AtEOXact_PgStat(isCommit, false);
+ AtEOXact_PgStat(isCommit, false, true);
/*
* And now we can clean up any files we may have left.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 48bc90c9673..35d5e174011 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -316,6 +316,15 @@ typedef struct XactCallbackItem
static XactCallbackItem *Xact_callbacks = NULL;
+/*
+ * When true, AbortTransaction() skips the pg_stat_database.xact_rollback
+ * bump for the transaction being aborted; commits are always counted.
+ * Must only be set via AbortCurrentTransactionWithoutXactStats();
+ * assertions in StartTransaction() and in the wrapper itself guard against
+ * the flag leaking across transactions.
+ */
+static bool xactSkipXactStats = false;
+
/*
* List of add-on start- and end-of-subxact callbacks
*/
@@ -2118,6 +2127,7 @@ StartTransaction(void)
/* check the current transaction state */
Assert(s->state == TRANS_DEFAULT);
+ Assert(!xactSkipXactStats);
/*
* Set the current transaction state information appropriately during
@@ -2514,7 +2524,7 @@ CommitTransaction(void)
AtEOXact_Files(true);
AtEOXact_ComboCid();
AtEOXact_HashTables(true);
- AtEOXact_PgStat(true, is_parallel_worker);
+ AtEOXact_PgStat(true, is_parallel_worker, true);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
AtEOXact_LogicalRepWorkers(true);
@@ -3039,7 +3049,7 @@ AbortTransaction(void)
AtEOXact_Files(false);
AtEOXact_ComboCid();
AtEOXact_HashTables(false);
- AtEOXact_PgStat(false, is_parallel_worker);
+ AtEOXact_PgStat(false, is_parallel_worker, !xactSkipXactStats);
AtEOXact_ApplyLauncher(false);
AtEOXact_LogicalRepWorkers(false);
AtEOXact_LogicalCtl();
@@ -3509,6 +3519,31 @@ AbortCurrentTransaction(void)
}
}
+/*
+ * AbortCurrentTransactionWithoutXactStats
+ *
+ * Like AbortCurrentTransaction(), but a top-level abort done here is not
+ * counted in pg_stat_database.xact_rollback. Meant for internal cleanup
+ * aborts that are not user-visible rollbacks. Calls must not nest (asserted);
+ * PG_FINALLY resets the skip flag even on error, so it cannot leak.
+ */
+void
+AbortCurrentTransactionWithoutXactStats(void)
+{
+ Assert(!xactSkipXactStats);
+
+ xactSkipXactStats = true;
+ PG_TRY();
+ {
+ AbortCurrentTransaction();
+ }
+ PG_FINALLY();
+ {
+ xactSkipXactStats = false;
+ }
+ PG_END_TRY();
+}
+
/*
* AbortCurrentTransactionInternal - a function doing an iteration of work
* regarding handling the current transaction abort. In the case of
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f22..f953676bfe1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2674,9 +2674,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* Aborting the current (sub-)transaction as a whole has the right
* semantics. We want all locks acquired in here to be released, not
* reassigned to the parent and we do not want any database access
- * have persistent effects.
+ * have persistent effects. In the !using_subtxn case this is a
+ * top-level abort; keep it out of pg_stat_database.xact_rollback.
*/
- AbortCurrentTransaction();
+ if (using_subtxn)
+ AbortCurrentTransaction();
+ else
+ AbortCurrentTransactionWithoutXactStats();
/* make sure there's no cache pollution */
if (rbtxn_distr_inval_overflowed(txn))
@@ -2737,9 +2741,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
/*
* Force cache invalidation to happen outside of a valid transaction
- * to prevent catalog access as we just caught an error.
+ * to prevent catalog access as we just caught an error. As above,
+ * keep the top-level abort out of pg_stat_database.xact_rollback.
*/
- AbortCurrentTransaction();
+ if (using_subtxn)
+ AbortCurrentTransaction();
+ else
+ AbortCurrentTransactionWithoutXactStats();
/* make sure there's no cache pollution */
if (rbtxn_distr_inval_overflowed(txn))
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c8309b96ed4..ba9af88c505 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -631,8 +631,13 @@ SnapBuildClearExportedSnapshot(void)
*/
tmpResOwner = SavedResourceOwnerDuringExport;
- /* make sure nothing could have ever happened */
- AbortCurrentTransaction();
+ /*
+ * Make sure nothing could have ever happened. Keep this cleanup abort
+ * out of pg_stat_database.xact_rollback; we must be at top level, since
+ * only the top-level abort path goes through AtEOXact_PgStat().
+ */
+ Assert(!IsSubTransaction());
+ AbortCurrentTransactionWithoutXactStats();
CurrentResourceOwner = tmpResOwner;
}
diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c
index 5e2d69e6297..ea9f703c088 100644
--- a/src/backend/utils/activity/pgstat_xact.c
+++ b/src/backend/utils/activity/pgstat_xact.c
@@ -37,11 +37,18 @@ static PgStat_SubXactStatus *pgStatXactStack = NULL;
* Called from access/transam/xact.c at top-level transaction commit/abort.
*/
void
-AtEOXact_PgStat(bool isCommit, bool parallel)
+AtEOXact_PgStat(bool isCommit, bool parallel, bool count_xact_stats)
{
PgStat_SubXactStatus *xact_state;
- AtEOXact_PgStat_Database(isCommit, parallel);
+ /*
+ * Only the database-level xact_commit/xact_rollback counter is gated
+ * here. Per-relation and subxact stat handling below must still run
+ * unconditionally so any stats accumulated during the transaction are
+ * not lost.
+ */
+ if (count_xact_stats)
+ AtEOXact_PgStat_Database(isCommit, parallel);
/* handle transactional stats information */
xact_state = pgStatXactStack;
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index a8cbdf247c8..a5ec9a027d4 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -467,6 +467,7 @@ extern void SaveTransactionCharacteristics(SavedTransactionCharacteristics *s);
extern void RestoreTransactionCharacteristics(const SavedTransactionCharacteristics *s);
extern void CommitTransactionCommand(void);
extern void AbortCurrentTransaction(void);
+extern void AbortCurrentTransactionWithoutXactStats(void);
extern void BeginTransactionBlock(void);
extern bool EndTransactionBlock(bool chain);
extern bool PrepareTransactionBlock(const char *gid);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index dfa2e837638..8509a590bbf 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -814,7 +814,7 @@ extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
* Functions in pgstat_xact.c
*/
-extern void AtEOXact_PgStat(bool isCommit, bool parallel);
+extern void AtEOXact_PgStat(bool isCommit, bool parallel, bool count_xact_stats);
extern void AtEOSubXact_PgStat(bool isCommit, int nestDepth);
extern void AtPrepare_PgStat(void);
extern void PostPrepare_PgStat(void);
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index e71e95c6297..268fa8c3e9c 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -48,6 +48,7 @@ tests += {
't/036_sequences.pl',
't/037_except.pl',
't/038_walsnd_shutdown_timeout.pl',
+ 't/039_publisher_xact_rollback.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/039_publisher_xact_rollback.pl b/src/test/subscription/t/039_publisher_xact_rollback.pl
new file mode 100644
index 00000000000..1fa72a19712
--- /dev/null
+++ b/src/test/subscription/t/039_publisher_xact_rollback.pl
@@ -0,0 +1,70 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Check that pg_stat_database.xact_rollback on a logical-replication
+# publisher is not inflated by the walsender's internal catalog-cleanup
+# aborts. ReorderBufferProcessTXN() ends each decoded transaction with an
+# abort; in the walsender that is a top-level abort, which used to bump the
+# counter and surface in shared stats on walsender exit.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres',
+ 'CREATE TABLE t (id int PRIMARY KEY)');
+$node_subscriber->safe_psql('postgres',
+ 'CREATE TABLE t (id int PRIMARY KEY)');
+
+$node_publisher->safe_psql('postgres', 'CREATE PUBLICATION p FOR TABLE t');
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION s CONNECTION '$publisher_connstr' PUBLICATION p");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 's');
+
+# Use a baseline delta rather than pg_stat_reset() so rollbacks counted
+# before this point (e.g. during setup) do not affect the result.
+my $base = $node_publisher->safe_psql('postgres',
+ "SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'");
+chomp $base;
+
+# Five autocommit INSERTs: each becomes one decoded committed txn on the
+# walsender. Without the fix, that's five spurious rollbacks after DISABLE.
+my $n = 5;
+$node_publisher->safe_psql('postgres',
+ join('', map { "INSERT INTO t VALUES ($_);\n" } 1 .. $n));
+
+$node_publisher->wait_for_catchup('s');
+
+# Disabling the subscription terminates the walsender; its shutdown hook
+# flushes pgstat counters to shared stats.
+$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION s DISABLE');
+
+# Wait for this subscription's walsender to exit (matched by application_name).
+$node_publisher->poll_query_until(
+ 'postgres', q{
+ SELECT count(*) = 0 FROM pg_stat_activity
+ WHERE backend_type = 'walsender' AND application_name = 's'
+})
+ or die 's walsender did not exit';
+
+my $final = $node_publisher->safe_psql('postgres',
+ "SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'");
+chomp $final;
+
+cmp_ok(
+ $final - $base, '==', 0,
+ 'walsender does not inflate publisher xact_rollback for decoded transactions'
+);
+
+done_testing();
--
2.50.1 (Apple Git-155)