HEAD_fix_assertion_failure.diff
application/octet-stream
Filename: HEAD_fix_assertion_failure.diff
Type: application/octet-stream
Part: 5
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: unified
| File | + | − |
|---|---|---|
| contrib/test_decoding/expected/catalog_change_snapshot.out | 45 | 0 |
| contrib/test_decoding/specs/catalog_change_snapshot.spec | 10 | 0 |
| src/backend/replication/logical/reorderbuffer.c | 33 | 2 |
| src/backend/replication/logical/snapbuild.c | 7 | 0 |
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index 8722787e6e..d39c59e6e0 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -141,3 +141,48 @@ COMMIT
stop
(1 row)
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s2_init s1_checkpoint s1_get_changes s0_insert2 s0_commit s2_get_changes s2_stop
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s2_init: SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); <waiting ...>
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO user_cat VALUES (1);
+step s0_commit: COMMIT;
+step s2_init: <... completed>
+?column?
+--------
+init
+(1 row)
+
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----------------------------------------------
+BEGIN
+table public.user_cat: INSERT: val1[integer]:1
+COMMIT
+(3 rows)
+
+step s2_stop: SELECT 'stop' FROM pg_drop_replication_slot('another_slot');
+?column?
+--------
+stop
+(1 row)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 673bccf4b0..c3b011572b 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -37,6 +37,9 @@ setup { SET synchronous_commit=on; }
step "s2_begin" { BEGIN; }
step "s2_truncate" { TRUNCATE tbl2; }
step "s2_commit" { COMMIT; }
+step "s2_init" { SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s2_stop" { SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); }
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
@@ -73,3 +76,10 @@ permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_chec
# transaction to do timetravel since one of its subtransactions has been marked as
# containing catalog changes.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# The last decoding restarts from the NEW_CID record in the subtransaction.
+# While processing it, the same ReorderBufferChange entry would be associated
+# with both the top transaction and the subtransaction, as the first entry.
+# This breaks the assumption in AssertTXNLsnOrder() that the first_lsn of each
+# entry must be strictly higher than that of the previous one.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s2_init" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s2_get_changes" "s2_stop"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 001f901ee6..63031fdecb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -224,6 +224,9 @@ static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
ReorderBufferTXN *subtxn);
static void AssertTXNLsnOrder(ReorderBuffer *rb);
+#ifdef USE_ASSERT_CHECKING
+static bool ReorderBufferTXNIsEmpty(ReorderBufferTXN *txn);
+#endif
/* ---------------------------------------
* support functions for lsn-order iterating over the ->changes of a
@@ -907,6 +910,7 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
dlist_iter iter;
XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
+ bool prev_txn_is_empty;
/*
* Skip the verification if we don't reach the LSN at which we start
@@ -933,14 +937,24 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
if (cur_txn->end_lsn != InvalidXLogRecPtr)
Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
- /* Current initial LSN must be strictly higher than previous */
+ /*
+ * Current initial LSN must be strictly higher than previous, except
+ * when the previous one is empty. In that case, the initial LSNs of
+ * both entries may be the same.
+ */
if (prev_first_lsn != InvalidXLogRecPtr)
- Assert(prev_first_lsn < cur_txn->first_lsn);
+ {
+ if (prev_first_lsn == cur_txn->first_lsn)
+ Assert(prev_txn_is_empty);
+ else
+ Assert(prev_first_lsn < cur_txn->first_lsn);
+ }
/* known-as-subtxn txns must not be listed */
Assert(!rbtxn_is_known_subxact(cur_txn));
prev_first_lsn = cur_txn->first_lsn;
+ prev_txn_is_empty = ReorderBufferTXNIsEmpty(cur_txn);
}
dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
@@ -965,6 +979,23 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
#endif
}
+#ifdef USE_ASSERT_CHECKING
+/*
+ * ReorderBufferTXNIsEmpty
+ *
+ * Check whether the transaction is empty.
+ */
+static bool
+ReorderBufferTXNIsEmpty(ReorderBufferTXN *txn)
+{
+ return txn->nentries == 0 &&
+ txn->nentries_mem == 0 &&
+ txn->ntuplecids == 0 &&
+ txn->nsubtxns == 0 &&
+ txn->ninvalidations == 0;
+}
+#endif
+
/*
* AssertChangeLsnOrder
*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ac24b51860..c70d518b83 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -831,6 +831,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
xlrec->cmin, xlrec->cmax,
xlrec->combocid);
+ /*
+ * The transaction being decoded is a subtransaction if the recorded
+ * top-level transaction ID differs from the specified one.
+ */
+ if (!TransactionIdEquals(xlrec->top_xid, xid))
+ ReorderBufferAssignChild(builder->reorder, xlrec->top_xid, xid, lsn);
+
/* figure out new command id */
if (xlrec->cmin != InvalidCommandId &&
xlrec->cmax != InvalidCommandId)