diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..62a6d3097a1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -206,10 +206,10 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
 
 	/*
-	 * If the snapshot isn't yet fully built, we cannot decode anything, so
-	 * bail out.
+	 * If the snapshot hasn't started building yet, the transaction won't be
+	 * decoded or tracked by the snapshot, so bail out.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT)
 		return;
 
 	switch (info)
@@ -286,6 +286,9 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				xid = XLogRecGetXid(r);
 				invals = (xl_xact_invals *) XLogRecGetData(r);
 
+				if (SNAPBUILD_XID_IGNORED(builder, xid))
+					break;
+
 				/*
 				 * Execute the invalidations for xid-less transactions,
 				 * otherwise, accumulate them so that they can be processed at
@@ -418,7 +421,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * determining the candidate catalog_xmin for the replication slot. See
 	 * SnapBuildProcessRunningXacts().
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	if (SNAPBUILD_XID_IGNORED(builder, xid))
 		return;
 
 	switch (info)
@@ -860,6 +863,9 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		abort_time = parsed->origin_timestamp;
 	}
 
+	if (SNAPBUILD_XID_IGNORED(ctx->snapshot_builder, xid))
+		return;
+
 	/*
 	 * Check whether we need to process this transaction. See
 	 * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 6e18baa33cb..a5bfa4ecdb5 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -306,6 +306,18 @@ SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
 	return ptr < builder->start_decoding_at;
 }
 
+/*
+ * Return the builder's next_phase_at transaction ID.
+ *
+ * While the builder is in SNAPBUILD_BUILDING_SNAPSHOT, transactions whose XID
+ * precedes this value are ignored; see SNAPBUILD_XID_IGNORED().
+ */
+TransactionId
+SnapBuildNextPhaseAt(SnapBuild *builder)
+{
+	return builder->next_phase_at;
+}
+
 /*
  * Increase refcount of a snapshot.
  *
@@ -952,9 +964,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
 	 * will they be part of a snapshot. So we don't need to record anything.
 	 */
-	if (builder->state == SNAPBUILD_START ||
-		(builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
-		 TransactionIdPrecedes(xid, builder->next_phase_at)))
+	if (SNAPBUILD_XID_IGNORED(builder, xid))
 	{
 		/* ensure that only commits after this are getting replayed */
 		if (builder->start_decoding_at <= lsn)
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 44031dcf6e3..509063cf35e 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,6 +61,21 @@ struct ReorderBuffer;
 struct xl_heap_new_cid;
 struct xl_running_xacts;
 
+/*
+ * SNAPBUILD_XID_IGNORED
+ *		True when the builder is still in SNAPBUILD_START, or is in
+ *		SNAPBUILD_BUILDING_SNAPSHOT and xid precedes the XID returned by
+ *		SnapBuildNextPhaseAt().  As noted in SnapBuildCommitTxn(), such
+ *		transactions are neither decoded nor made part of a snapshot.
+ *
+ * This is a macro, so "builder" may be evaluated more than once; do not pass
+ * an expression with side effects.
+ */
+#define SNAPBUILD_XID_IGNORED(builder, xid) \
+	(SnapBuildCurrentState((builder)) == SNAPBUILD_START || \
+	 (SnapBuildCurrentState((builder)) == SNAPBUILD_BUILDING_SNAPSHOT && \
+	  TransactionIdPrecedes((xid), SnapBuildNextPhaseAt((builder)))))
+
 extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
@@ -84,6 +99,8 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr);
 extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder);
 extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
 
+extern TransactionId SnapBuildNextPhaseAt(SnapBuild *builder);
+
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
 							   TransactionId *subxacts, uint32 xinfo);