REL_12-v3-0001-Make-the-decoded-transaction-as-subxact-while-dec.patch

application/octet-stream

Filename: REL_12-v3-0001-Make-the-decoded-transaction-as-subxact-while-dec.patch
Type: application/octet-stream
Part: 3
Message: RE: BUG #18369: logical decoding core on AssertTXNLsnOrder()

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v3-0001
Subject: Make the decoded transaction as subxact while decoding COMMIT record
File+
contrib/test_decoding/expected/catalog_change_snapshot.out 43 1
contrib/test_decoding/specs/catalog_change_snapshot.spec 18 0
src/backend/replication/logical/reorderbuffer.c 22 4
src/backend/replication/logical/snapbuild.c 1 1
src/include/replication/reorderbuffer.h 1 0
From 5cecabcdc9a5079aed4d69e72c9daeebdbea9af0 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 13 Mar 2024 02:25:30 +0000
Subject: [PATCH v3] Make the decoded transaction as subxact while decoding
 COMMIT record

Since ReorderBufferXidSetCatalogChanges() generates ReorderBufferTXNs as
top-level ones, it was possible that two transactions were decoded as top-level
by the same COMMIT record.

This commit adds ReorderBufferXidSetCatalogChangesEx(), which can control
whether to create a decoded transaction as a top. The function is used when the
COMMIT is decoded and the record has invalidation messages.

Author: Haiyang Li, Hayato Kuroda
Reported-by: Haiyang Li
Reviewed-by: Haiyang Li, Alexander Lakhin, Fei Changhong
Discussion: https://www.postgresql.org/message-id/18369-ad61699bf91c5bc0%40postgresql.org
Backpatch-through: 12
---
 .../expected/catalog_change_snapshot.out      | 44 ++++++++++++++++++-
 .../specs/catalog_change_snapshot.spec        | 18 ++++++++
 .../replication/logical/reorderbuffer.c       | 26 +++++++++--
 src/backend/replication/logical/snapbuild.c   |  2 +-
 src/include/replication/reorderbuffer.h       |  1 +
 5 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index 1d75cf5af0..6c3b8fc8b0 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
 step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
@@ -87,3 +87,45 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_savepoint s0_create_part1 s0_savepoint_release s2_init s1_checkpoint s1_get_changes s0_commit s2_get_changes s2_stop
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s2_init: SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); <waiting ...>
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s2_init: <... completed>
+?column?
+--------
+init    
+(1 row)
+
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s2_stop: SELECT 'stop' FROM pg_drop_replication_slot('another_slot');
+?column?
+--------
+stop    
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 2ad1edeaa8..a4642e26db 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -3,7 +3,9 @@
 setup
 {
     DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl1_part;
     CREATE TABLE tbl1 (val1 integer, val2 integer);
+    CREATE TABLE tbl1_part (val1 integer) PARTITION BY RANGE (val1);
     CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true);
 }
 
@@ -19,9 +21,11 @@ setup { SET synchronous_commit=on; }
 step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
 step "s0_begin" { BEGIN; }
 step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; }
 step "s0_truncate" { TRUNCATE tbl1; }
 step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
 step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
+step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); }
 step "s0_commit" { COMMIT; }
 
 session "s1"
@@ -29,6 +33,12 @@ setup { SET synchronous_commit=on; }
 step "s1_checkpoint" { CHECKPOINT; }
 step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_init" { SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s2_stop" { SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); }
+
 # For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
 # only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
 # during the first checkpoint execution.  This transaction must be marked as
@@ -53,3 +63,11 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s
 # transaction to do timetravel since one of its subtransactions has been marked as
 # containing catalog changes.
 permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# The last decoding restarts from the slot creation by session 2 and doesn't
+# decode any WAL records generated by the subtransaction that performed
+# s0_create_part1. While processing the commit record for the corresponding
+# top-level transaction which will be marked as containing catalog change, we
+# ensure that the corresponding substransaction is added into ReorderBuffer as
+# subxact.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s2_init" "s1_checkpoint" "s1_get_changes" "s0_commit" "s2_get_changes" "s2_stop"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d8345a75b6..87c5be9730 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -828,8 +828,14 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
 			 * We already saw this transaction, but initially added it to the
 			 * list of top-level txns.  Now that we know it's not top-level,
 			 * remove it from there.
+			 *
+			 * Note: There is a possibility that the transaction was created
+			 * as not the top-level txn, but the flag was not set. In this
+			 * case, the transaction was not added to the list. This happens if
+			 * sub-txns are first recognized and decoded by a COMMIT record.
 			 */
-			dlist_delete(&subtxn->node);
+			if (subtxn->node.next != NULL)
+				dlist_delete(&subtxn->node);
 		}
 	}
 
@@ -1286,14 +1292,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	}
 
 	/*
-	 * Remove TXN from its containing list.
+	 * Remove TXN from its containing list if registered.
 	 *
 	 * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
 	 * parent's list of known subxacts; this leaves the parent's nsubxacts
 	 * count too high, but we don't care.  Otherwise, we are deleting the TXN
 	 * from the LSN-ordered list of toplevel TXNs.
 	 */
-	dlist_delete(&txn->node);
+	if (txn->node.next != NULL)
+		dlist_delete(&txn->node);
 
 	/* now remove reference from buffer */
 	hash_search(rb->by_txn,
@@ -2203,10 +2210,21 @@ ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
 void
 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 								  XLogRecPtr lsn)
+{
+	ReorderBufferXidSetCatalogChangesEx(rb, xid, lsn, true);
+}
+
+/*
+ * Mark a transaction as containing catalog changes. Moreover, this can control
+ * whether the ReorderBufferTXN is created as top transaction or not.
+ */
+void
+ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *rb, TransactionId xid,
+									XLogRecPtr lsn, bool is_top)
 {
 	ReorderBufferTXN *txn;
 
-	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, is_top);
 
 	txn->has_catalog_changes = true;
 }
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index bdfd1f0228..ee5d99dbbf 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2170,6 +2170,6 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt
 				sizeof(TransactionId), xidComparator) != NULL)
 	{
 		for (int i = 0; i < subxcnt; i++)
-			ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn);
+			ReorderBufferXidSetCatalogChangesEx(builder->reorder, subxacts[i], lsn, false);
 	}
 }
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bc97b08a90..c652bf8fdc 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -429,6 +429,7 @@ void		ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
 											   SharedInvalidationMessage *invalidations);
 void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+void		ReorderBufferXidSetCatalogChangesEx(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn, bool is_top);
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
-- 
2.43.0