REL_14_fix_assertion_failure.diff

application/octet-stream

Filename: REL_14_fix_assertion_failure.diff
Type: application/octet-stream
Part: 2
Message: RE: Re:RE: Re:BUG #18369: logical decoding core on AssertTXNLsnOrder()

Patch

Same data as JSON: `GET /api/v1/attachments/:id/patch` returns the parsed metadata as JSON (format, series position, per-file stats), never the diff bytes. See the API reference.
Format: unified
File+
contrib/test_decoding/expected/catalog_change_snapshot.out 89 1
contrib/test_decoding/specs/catalog_change_snapshot.spec 23 0
src/backend/replication/logical/reorderbuffer.c 33 2
src/backend/replication/logical/snapbuild.c 7 0
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index 551dc2204a..de1c58b672 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
 step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
@@ -176,3 +176,91 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_create_temp s0_init s0_begin s0_txid_current s1_checkpoint s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_savepoint s0_insert_temp s0_savepoint_release s0_create_part1 s1_get_changes s0_commit s1_get_changes
+step s0_create_temp: CREATE TEMP TABLE temp_t (id int, data TEXT);
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_txid_current: SELECT 'xid' FROM txid_current();
+?column?
+--------
+xid     
+(1 row)
+
+step s1_checkpoint: CHECKPOINT;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert_temp: INSERT INTO temp_t VALUES(1);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert_temp: INSERT INTO temp_t VALUES(1);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert_temp: INSERT INTO temp_t VALUES(1);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10);
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s2_init s1_checkpoint s1_get_changes s0_insert2 s0_commit s2_get_changes s2_stop
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s2_init: SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); <waiting ...>
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO user_cat VALUES (1);
+step s0_commit: COMMIT;
+step s2_init: <... completed>
+?column?
+--------
+init    
+(1 row)
+
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                          
+----------------------------------------------
+BEGIN                                         
+table public.user_cat: INSERT: val1[integer]:1
+COMMIT                                        
+(3 rows)
+
+step s2_stop: SELECT 'stop' FROM pg_drop_replication_slot('another_slot');
+?column?
+--------
+stop    
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index d8b9df97ed..d061925243 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -19,14 +19,17 @@ teardown
 
 session "s0"
 setup { SET synchronous_commit=on; }
+step "s0_create_temp" { CREATE TEMP TABLE temp_t (id int, data TEXT); }
 step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
 step "s0_begin" { BEGIN; }
+step "s0_txid_current" { SELECT 'xid' FROM txid_current(); }
 step "s0_savepoint" { SAVEPOINT sp1; }
 step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; }
 step "s0_truncate" { TRUNCATE tbl1; }
 step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
 step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
 step "s0_insert_part" { INSERT INTO tbl1_part VALUES (1); }
+step "s0_insert_temp" { INSERT INTO temp_t VALUES(1); }
 step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); }
 step "s0_create_part2" { CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20); }
 step "s0_commit" { COMMIT; }
@@ -36,6 +39,11 @@ setup { SET synchronous_commit=on; }
 step "s1_checkpoint" { CHECKPOINT; }
 step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
 
+session "s2"
+step "s2_init" { SELECT 'init' FROM pg_create_logical_replication_slot('another_slot', 'test_decoding'); }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('another_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s2_stop" { SELECT 'stop' FROM pg_drop_replication_slot('another_slot'); }
+
 # For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
 # only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
 # during the first checkpoint execution.  This transaction must be marked as
@@ -75,3 +83,18 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_
 # that the corresponding substransaction is also marked as containing a catalog
 # modifying change.
 permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s1_checkpoint" "s0_create_part2" "s0_commit" "s0_begin" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_insert_part" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# DML operations on temporary table temp_t consume XIDs, but do not generate WAL records.
+# The last decoding restarts from the first checkpoint and doesn't get any information about
+# the last two subtransactions that performed s0_insert_temp. While processing the commit record
+# for the corresponding top-level transaction, which will be marked as containing catalog
+# changes even before commit, we ensure that the corresponding subtransaction is added into
+# the ReorderBuffer as a subxact.
+permutation "s0_create_temp" "s0_init" "s0_begin" "s0_txid_current" "s1_checkpoint" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_savepoint" "s0_insert_temp" "s0_savepoint_release" "s0_create_part1" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# The last decoding restarts from the NEW_CID record in the subtransaction.
+# While processing it, the same ReorderBufferChange entry would be associated
+# with both the top-level and the sub transaction, as the first entry. This breaks
+# the assumption in AssertTXNLsnOrder() that the first_lsn of each entry must be
+# strictly higher than that of the previous entry.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s2_init" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s2_get_changes" "s2_stop"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 264c253a87..de8730dad9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -222,6 +222,9 @@ static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 											  ReorderBufferTXN *subtxn);
 
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
+#ifdef USE_ASSERT_CHECKING
+static bool ReorderBufferTXNIsEmpty(ReorderBufferTXN *txn);
+#endif
 
 /* ---------------------------------------
  * support functions for lsn-order iterating over the ->changes of a
@@ -885,6 +888,7 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
 	dlist_iter	iter;
 	XLogRecPtr	prev_first_lsn = InvalidXLogRecPtr;
 	XLogRecPtr	prev_base_snap_lsn = InvalidXLogRecPtr;
+	bool		prev_txn_is_empty;
 
 	/*
 	 * Skip the verification if we don't reach the LSN at which we start
@@ -911,14 +915,24 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
 		if (cur_txn->end_lsn != InvalidXLogRecPtr)
 			Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
 
-		/* Current initial LSN must be strictly higher than previous */
+		/*
+		 * Current initial LSN must be strictly higher than previous, except
+		 * when the previous entry is empty; only in that case may both
+		 * entries have the same initial LSN.
+		 */
 		if (prev_first_lsn != InvalidXLogRecPtr)
-			Assert(prev_first_lsn < cur_txn->first_lsn);
+		{
+			if (prev_first_lsn == cur_txn->first_lsn)
+				Assert(prev_txn_is_empty);
+			else
+				Assert(prev_first_lsn < cur_txn->first_lsn);
+		}
 
 		/* known-as-subtxn txns must not be listed */
 		Assert(!rbtxn_is_known_subxact(cur_txn));
 
 		prev_first_lsn = cur_txn->first_lsn;
+		prev_txn_is_empty = ReorderBufferTXNIsEmpty(cur_txn);
 	}
 
 	dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
@@ -943,6 +957,23 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
 #endif
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * ReorderBufferTXNIsEmpty
+ *
+ * True when nentries, nentries_mem, ntuplecids, nsubtxns, ninvalidations are 0.
+ */
+static bool
+ReorderBufferTXNIsEmpty(ReorderBufferTXN *txn)
+{
+	if (txn->nentries != 0 || txn->nentries_mem != 0)
+		return false;
+	if (txn->ntuplecids != 0 || txn->nsubtxns != 0)
+		return false;
+	return txn->ninvalidations == 0;
+}
+#endif
+
 /*
  * AssertChangeLsnOrder
  *
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 4f8c00f717..6eb4a80181 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -825,6 +825,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 								 xlrec->cmin, xlrec->cmax,
 								 xlrec->combocid);
 
+	/*
+	 * The transaction being decoded is a subtransaction if the recorded
+	 * top-level transaction differs from the given xid.
+	 */
+	if (!TransactionIdEquals(xlrec->top_xid, xid))
+		ReorderBufferAssignChild(builder->reorder, xlrec->top_xid, xid, lsn);
+
 	/* figure out new command id */
 	if (xlrec->cmin != InvalidCommandId &&
 		xlrec->cmax != InvalidCommandId)