diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out index 551dc2204a..3d7d649af5 100644 --- a/contrib/test_decoding/expected/catalog_change_snapshot.out +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -176,3 +176,46 @@ COMMIT stop (1 row) + +starting permutation: s0_create_temp s0_init s0_begin s0_txid_current s1_checkpoint s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_create_part1 s1_get_changes s0_commit s1_get_changes +step s0_create_temp: CREATE TEMP TABLE temp_t (id int, data TEXT); +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_txid_current: SELECT 'xid' FROM txid_current(); +?column? +-------- +xid +(1 row) + +step s1_checkpoint: CHECKPOINT; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert_temp: INSERT INTO temp_t VALUES(1); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert_temp: INSERT INTO temp_t VALUES(1); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s0_savepoint: SAVEPOINT sp1; +step s0_insert_temp: INSERT INTO temp_t VALUES(1); +step s0_savepoint_release: RELEASE SAVEPOINT sp1; +step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec index d8b9df97ed..d8a453479d 100644 --- a/contrib/test_decoding/specs/catalog_change_snapshot.spec +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -19,14 +19,17 @@ teardown session "s0" setup { SET synchronous_commit=on; } +step "s0_create_temp" { CREATE TEMP TABLE temp_t (id int, data TEXT); } step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } step "s0_begin" { BEGIN; } +step "s0_txid_current" { SELECT 'xid' FROM txid_current(); } step "s0_savepoint" { SAVEPOINT sp1; } step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; } step "s0_truncate" { TRUNCATE tbl1; } step "s0_insert" { INSERT INTO tbl1 VALUES (1); } step "s0_insert2" { INSERT INTO user_cat VALUES (1); } step "s0_insert_part" { INSERT INTO tbl1_part VALUES (1); } +step "s0_insert_temp" { INSERT INTO temp_t VALUES(1); } step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); } step "s0_create_part2" { CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20); } step "s0_commit" { COMMIT; } @@ -75,3 +78,11 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_ # that the corresponding substransaction is also marked as containing a catalog # modifying change. permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s1_checkpoint" "s0_create_part2" "s0_commit" "s0_begin" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_insert_part" "s1_get_changes" "s0_commit" "s1_get_changes" + +# DML operations on temporary table temp_t consume XIDs, but do not generate WAL records. +# The last decoding restarts from the first checkpoint and doesn't get any information of +# last two substransactions that performed s0_insert_temp. While processing the commit record +# for the corresponding top-level transaction which will be marked as containing catalog +# change even before commit, we ensure that the corresponding substransaction is added into +# ReorderBuffer as subxact. +permutation "s0_create_temp" "s0_init" "s0_begin" "s0_txid_current" "s1_checkpoint" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_create_part1" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 3a68a393d2..6af585b183 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1067,7 +1067,8 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, * list of top-level txns. Now that we know it's not top-level, * remove it from there. */ - dlist_delete(&subtxn->node); + if (subtxn->node.next != NULL) + dlist_delete(&subtxn->node); } } @@ -1557,7 +1558,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * count too high, but we don't care. Otherwise, we are deleting the TXN * from the LSN-ordered list of toplevel TXNs. */ - dlist_delete(&txn->node); + if (txn->node.next != NULL) + dlist_delete(&txn->node); /* now remove reference from buffer */ hash_search(rb->by_txn, @@ -3322,6 +3324,26 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; } +void +ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, bool is_top) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, is_top); + + txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * Mark top-level transaction as having catalog changes too if one of its + * children has so that the ReorderBufferBuildTupleCidHash can + * conveniently check just top-level transaction and decide whether to + * build the hash table or not. + */ + if (txn->toptxn != NULL) + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; +} + /* * Query whether a transaction is already *known* to contain catalog * changes. This can be wrong until directly before the commit! diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 39819947c1..08bfb7c7eb 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -2141,6 +2141,6 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); for (int i = 0; i < subxcnt; i++) - ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + ReorderBufferXidSetCatalogChangesEx(builder->reorder, subxacts[i], lsn, false); } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4a01f877e5..a672510b85 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -666,6 +666,7 @@ extern void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalida extern void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); +extern void ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn, bool is_top); extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);