diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index 1d75cf5af0..a3ade3a279 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -1,4 +1,4 @@ -Parsed test spec with 2 sessions +Parsed test spec with 3 sessions starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); @@ -87,3 +87,91 @@ COMMIT stop (1 row) + +starting permutation: s0_create_temp s0_init s0_begin s0_txid_current s1_checkpoint s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_create_part1 s1_get_changes s0_commit s1_get_changes +step s0_create_temp: CREATE TEMP TABLE temp_t (id int, data TEXT); +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_txid_current: SELECT 'xid' FROM txid_current(); +?column? 
+-------- +xid +(1 row) + +step s1_checkpoint: CHECKPOINT; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert_temp: INSERT INTO temp_t VALUES(1); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert_temp: INSERT INTO temp_t VALUES(1); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert_temp: INSERT INTO temp_t VALUES(1); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + + +starting permutation: s0_init s0_begin s0_savepoint s0_insert s2_init s1_checkpoint s1_get_changes s0_insert2 s0_commit s2_get_changes s2_stop +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s2_init: SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO user_cat VALUES (1); +step s0_commit: COMMIT; +step s2_init: <... completed> +?column? 
+-------- +init +(1 row) + +step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---------------------------------------------- +BEGIN +table public.user_cat: INSERT: val1[integer]:1 +COMMIT +(3 rows) + +step s2_stop: SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); +?column? +-------- +stop +(1 row) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index 2ad1edeaa8..7f0374409c 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -4,24 +4,31 @@ setup { DROP TABLE IF EXISTS tbl1; CREATE TABLE tbl1 (val1 integer, val2 integer); + CREATE TABLE tbl1_part (val1 integer) PARTITION BY RANGE (val1); CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); } teardown { DROP TABLE tbl1; + DROP TABLE tbl1_part; DROP TABLE user_cat; SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); } session "s0" setup { SET synchronous_commit=on; } +step "s0_create_temp" { CREATE TEMP TABLE temp_t (id int, data TEXT); } step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } step "s0_begin" { BEGIN; } +step "s0_txid_current" { SELECT 'xid' FROM txid_current(); } step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } step "s0_insert2" { INSERT INTO user_cat VALUES (1); } +step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); } +step "s0_insert_temp" { INSERT INTO temp_t VALUES(1); } step "s0_commit" { COMMIT; } session "s1" @@ -29,6 +36,11 @@ setup { SET synchronous_commit=on; } step "s1_checkpoint" { CHECKPOINT; } step "s1_get_changes" { 
SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +session "s2" +step "s2_init" { SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); } +step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s2_stop" { SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); } + # For the transaction that TRUNCATEd the table tbl1, the last decoding decodes # only its COMMIT record, because it starts from the RUNNING_XACTS record emitted # during the first checkpoint execution. This transaction must be marked as @@ -53,3 +65,18 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s # transaction to do timetravel since one of its subtransactions has been marked as # containing catalog changes. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# DML operations on temporary table temp_t consume XIDs, but do not generate WAL records. +# The last decoding restarts from the first checkpoint and doesn't get any information about +# the last two subtransactions that performed s0_insert_temp. While processing the commit record +# for the corresponding top-level transaction, which will be marked as containing catalog +# changes even before commit, we ensure that the corresponding subtransaction is added into +# ReorderBuffer as subxact. 
+permutation "s0_create_temp" "s0_init" "s0_begin" "s0_txid_current" "s1_checkpoint" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_create_part1" "s1_get_changes" "s0_commit" "s1_get_changes" + +# The last decoding restarts from the NEW_CID record in the subtransaction. +# While processing it, the same ReorderBufferChange entry would be associated +# with both the top-level transaction and the subtransaction, as the first entry. This breaks +# the assumption in AssertTXNLsnOrder() that the first_lsn of each entry must be +# strictly higher than that of the previous one. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s2_init" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s2_get_changes" "s2_stop" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fb323a80ec..7e6f24c0d5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -206,6 +206,9 @@ static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn); static void AssertTXNLsnOrder(ReorderBuffer *rb); +#ifdef USE_ASSERT_CHECKING +static bool ReorderBufferTXNIsEmpty(ReorderBufferTXN *txn); +#endif /* --------------------------------------- * support functions for lsn-order iterating over the ->changes of a @@ -736,6 +739,7 @@ AssertTXNLsnOrder(ReorderBuffer *rb) dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; + bool prev_txn_is_empty; /* * Skip the verification if we don't reach the LSN at which we start @@ -762,14 +766,24 @@ AssertTXNLsnOrder(ReorderBuffer *rb) if (cur_txn->end_lsn != InvalidXLogRecPtr) Assert(cur_txn->first_lsn <= cur_txn->end_lsn); - /* Current initial LSN must be strictly higher than previous */ + /* + * Current initial LSN must be strictly higher than previous, 
unless + * the previous one is empty. In that case, the initial LSNs of the two + * entries may be equal. + */ if (prev_first_lsn != InvalidXLogRecPtr) - Assert(prev_first_lsn < cur_txn->first_lsn); + { + if (prev_first_lsn == cur_txn->first_lsn) + Assert(prev_txn_is_empty); + else + Assert(prev_first_lsn < cur_txn->first_lsn); + } /* known-as-subtxn txns must not be listed */ Assert(!rbtxn_is_known_subxact(cur_txn)); prev_first_lsn = cur_txn->first_lsn; + prev_txn_is_empty = ReorderBufferTXNIsEmpty(cur_txn); } dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) @@ -794,6 +808,23 @@ AssertTXNLsnOrder(ReorderBuffer *rb) #endif } +#ifdef USE_ASSERT_CHECKING +/* + * ReorderBufferTXNIsEmpty + * + * Check whether the transaction is empty. + */ +static bool +ReorderBufferTXNIsEmpty(ReorderBufferTXN *txn) +{ + return txn->nentries == 0 && + txn->nentries_mem == 0 && + txn->ntuplecids == 0 && + txn->nsubtxns == 0 && + txn->ninvalidations == 0; +} +#endif + /* * ReorderBufferGetOldestTXN * Return oldest transaction in reorderbuffer diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 31caad8855..f32b46cb9a 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -843,6 +843,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, xlrec->cmin, xlrec->cmax, xlrec->combocid); + /* + * The transaction being decoded is a subtransaction if the recorded + * top-level XID is not the same as the specified one. + */ + if (!TransactionIdEquals(xlrec->top_xid, xid)) + ReorderBufferAssignChild(builder->reorder, xlrec->top_xid, xid, lsn); + /* figure out new command id */ if (xlrec->cmin != InvalidCommandId && xlrec->cmax != InvalidCommandId)