v6-0001-Track-transactions-committed-in-BUILDING_SNAPSHOT.patch
application/octet-stream
Filename: v6-0001-Track-transactions-committed-in-BUILDING_SNAPSHOT.patch
Type: application/octet-stream
Part: 0
From 12dd3434ef13609b324bbbbe68a3f0e2a48934a2 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Fri, 21 Nov 2025 15:19:22 +0800
Subject: [PATCH v6 1/2] Track transactions committed in BUILDING_SNAPSHOT.
Previously, the historic snapshot didn't track transactions that
committed while the snapshot builder was in the BUILDING_SNAPSHOT state.
This could result in a transaction taking an incorrect snapshot and
logical decoding being interrupted, so we need to track these transactions.
We also need to handle WAL records that indicate a catalog change while in
BUILDING_SNAPSHOT, because the historic snapshot only tracks
catalog-modifying transactions.
---
src/backend/replication/logical/decode.c | 33 ++++++++++++++++++++----
1 file changed, 28 insertions(+), 5 deletions(-)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..de1bed30781 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -206,12 +206,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
/*
- * If the snapshot isn't yet fully built, we cannot decode anything, so
- * bail out.
+ * If the snapshot hasn't started building yet, the transaction won't be
+ * decoded or tracked by the snapshot, so bail out.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT)
return;
+ /*
+ * Note that if the snapshot isn't yet fully built, the xlog is only used
+ * to build the snapshot and won't be decoded.
+ */
switch (info)
{
case XLOG_XACT_COMMIT:
@@ -282,18 +286,24 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
TransactionId xid;
xl_xact_invals *invals;
+ bool has_snapshot;
xid = XLogRecGetXid(r);
invals = (xl_xact_invals *) XLogRecGetData(r);
+ has_snapshot =
+ SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT;
/*
* Execute the invalidations for xid-less transactions,
* otherwise, accumulate them so that they can be processed at
* the commit time.
+ *
+ * Note that we only need to do this when we are not fast-forwarding
+ * and there is a snapshot.
*/
if (TransactionIdIsValid(xid))
{
- if (!ctx->fast_forward)
+ if (!ctx->fast_forward && has_snapshot)
ReorderBufferAddInvalidations(reorder, xid,
buf->origptr,
invals->nmsgs,
@@ -301,7 +311,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
buf->origptr);
}
- else if (!ctx->fast_forward)
+ else if (!ctx->fast_forward && has_snapshot)
ReorderBufferImmediateInvalidation(ctx->reorder,
invals->nmsgs,
invals->msgs);
@@ -419,7 +429,19 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* SnapBuildProcessRunningXacts().
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ {
+ /*
+ * If we are building snapshot and the xlog means a catalog
+ * change, we need to mark it in the reorder buffer.
+ *
+ * Now only XLOG_HEAP2_NEW_CID means a catalog change.
+ */
+ if (SnapBuildCurrentState(builder) >= SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdIsValid(xid) && info == XLOG_HEAP2_NEW_CID)
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+
return;
+ }
switch (info)
{
@@ -1306,6 +1328,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
Oid txn_dbid, RepOriginId origin_id)
{
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT ||
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
FilterByOrigin(ctx, origin_id))
return true;
--
2.34.1