REL_14-v4-0001-Make-the-decoded-transaction-as-subxact-while-dec.patch
application/octet-stream
Filename: REL_14-v4-0001-Make-the-decoded-transaction-as-subxact-while-dec.patch
Type: application/octet-stream
Part: 1
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v4-0001
Subject: Make the decoded transaction as subxact while decoding COMMIT record
| File | + | − |
|---|---|---|
| contrib/test_decoding/expected/catalog_change_snapshot.out | 43 | 0 |
| contrib/test_decoding/specs/catalog_change_snapshot.spec | 12 | 0 |
| src/backend/replication/logical/reorderbuffer.c | 22 | 4 |
| src/backend/replication/logical/snapbuild.c | 1 | 1 |
| src/include/replication/reorderbuffer.h | 1 | 0 |
From 241bd175fcc92a78566ddbddfa1e396e1d20883b Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 13 Mar 2024 02:25:30 +0000
Subject: [PATCH v4] Make the decoded transaction as subxact while decoding
COMMIT record
ReorderBufferXidSetCatalogChanges() always creates ReorderBufferTXNs as
top-level transactions, so a transaction and its subtransactions could both be
created as top-level ones while decoding a single COMMIT record.
This commit adds ReorderBufferXidSetCatalogChangesEx(), which lets the caller
choose whether the ReorderBufferTXN is created as a top-level transaction. The
function is used for subtransactions when a COMMIT record marked as
XACT_XINFO_HAS_INVALS is decoded.
Author: Haiyang Li, Hayato Kuroda
Reported-by: Haiyang Li
Reviewed-by: Haiyang Li, Alexander Lakhin, Fei Changhong
Discussion: https://www.postgresql.org/message-id/18369-ad61699bf91c5bc0%40postgresql.org
Backpatch-through: 12
---
.../expected/catalog_change_snapshot.out | 43 +++++++++++++++++++
.../specs/catalog_change_snapshot.spec | 12 ++++++
.../replication/logical/reorderbuffer.c | 26 +++++++++--
src/backend/replication/logical/snapbuild.c | 2 +-
src/include/replication/reorderbuffer.h | 1 +
5 files changed, 79 insertions(+), 5 deletions(-)
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index 551dc2204a..3d7d649af5 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -176,3 +176,46 @@ COMMIT
stop
(1 row)
+
+starting permutation: s0_create_temp s0_init s0_begin s0_txid_current s1_checkpoint s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_create_part1 s1_get_changes s0_commit s1_get_changes
+step s0_create_temp: CREATE TEMP TABLE temp_t (id int, data TEXT);
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_txid_current: SELECT 'xid' FROM txid_current();
+?column?
+--------
+xid
+(1 row)
+
+step s1_checkpoint: CHECKPOINT;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert_temp: INSERT INTO temp_t VALUES(1);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert_temp: INSERT INTO temp_t VALUES(1);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert_temp: INSERT INTO temp_t VALUES(1);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10);
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index d8b9df97ed..227c7ca558 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -21,12 +21,15 @@ session "s0"
setup { SET synchronous_commit=on; }
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
step "s0_begin" { BEGIN; }
+step "s0_txid_current" { SELECT 'xid' FROM txid_current(); }
step "s0_savepoint" { SAVEPOINT sp1; }
step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; }
step "s0_truncate" { TRUNCATE tbl1; }
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
step "s0_insert_part" { INSERT INTO tbl1_part VALUES (1); }
+step "s0_create_temp" { CREATE TEMP TABLE temp_t (id int, data TEXT); }
+step "s0_insert_temp" { INSERT INTO temp_t VALUES(1); }
step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); }
step "s0_create_part2" { CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20); }
step "s0_commit" { COMMIT; }
@@ -75,3 +78,12 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_
# that the corresponding substransaction is also marked as containing a catalog
# modifying change.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s1_checkpoint" "s0_create_part2" "s0_commit" "s0_begin" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_insert_part" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# DML operations on temporary table temp_t consume XIDs, but do not generate
+# WAL records. The last decoding restarts from the first checkpoint and doesn't
+# get any information about the last two subtransactions that ran s0_insert_temp.
+# While processing the commit record for the corresponding top-level
+# transaction, which is marked as containing catalog changes even before the
+# commit, we ensure that the corresponding subtransactions are added to the
+# ReorderBuffer as subxacts.
+permutation "s0_create_temp" "s0_init" "s0_begin" "s0_txid_current" "s1_checkpoint" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_create_part1" "s1_get_changes" "s0_commit" "s1_get_changes"
\ No newline at end of file
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 264c253a87..d541b31cc2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1059,8 +1059,14 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
* We already saw this transaction, but initially added it to the
* list of top-level txns. Now that we know it's not top-level,
* remove it from there.
+ *
+ * Note: It is possible that the transaction was created as a
+ * non-top-level txn but has not yet been marked as a known subxact.
+ * In that case it was never added to the list. This happens when
+ * sub-txns are first recognized while decoding a COMMIT record.
*/
- dlist_delete(&subtxn->node);
+ if (subtxn->node.next != NULL)
+ dlist_delete(&subtxn->node);
}
}
@@ -1543,14 +1549,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Remove TXN from its containing list.
+ * Remove TXN from its containing list if registered.
*
* Note: if txn is known as subxact, we are deleting the TXN from its
* parent's list of known subxacts; this leaves the parent's nsubxacts
* count too high, but we don't care. Otherwise, we are deleting the TXN
* from the LSN-ordered list of toplevel TXNs.
*/
- dlist_delete(&txn->node);
+ if (txn->node.next != NULL)
+ dlist_delete(&txn->node);
/* now remove reference from buffer */
hash_search(rb->by_txn,
@@ -3298,10 +3305,21 @@ ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
void
ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn)
+{
+ ReorderBufferXidSetCatalogChangesEx(rb, xid, lsn, true);
+}
+
+/*
+ * Mark a transaction as containing catalog changes. The is_top argument
+ * controls whether the ReorderBufferTXN is created as a top-level transaction.
+ */
+void
+ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr lsn, bool is_top)
{
ReorderBufferTXN *txn;
- txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, is_top);
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 4f8c00f717..5ce1e034f7 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2134,6 +2134,6 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt
ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
for (int i = 0; i < subxcnt; i++)
- ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn);
+ ReorderBufferXidSetCatalogChangesEx(builder->reorder, subxacts[i], lsn, false);
}
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index ba257d81b5..53c24086d0 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -662,6 +662,7 @@ void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+void ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn, bool is_top);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
--
2.43.0