v20231203-0004-global-hash-table.patch

text/x-patch

Filename: v20231203-0004-global-hash-table.patch
Type: text/x-patch
Part: 7
Message: Re: logical decoding and replication of sequences, take 2

Patch

Same data as JSON: `GET /api/v1/attachments/:id/patch` returns the parsed metadata as JSON (format, series position, per-file stats), never the diff bytes. API reference →
Format: format-patch
Series: patch v20231203-0004
Subject: global hash table
File (+ / −)
src/backend/replication/logical/reorderbuffer.c 68 109
src/include/replication/reorderbuffer.h 6 0
From e646d7f07f402475df57ac1ce946bad4d6606290 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sat, 2 Dec 2023 10:28:26 +0100
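Subject: [PATCH v20231203 4/4] global hash table

Replace the sequence lookup tables kept in top-level transactions with a
single hash table in ReorderBuffer, mapping each sequence relfilelocator
to the (sub)transaction that created it. ReorderBufferSequenceIsTransactional
now does one hash lookup instead of walking all top-level transactions, and
the global entries are removed in ReorderBufferCleanupTXN rather than by
each individual caller.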
---
 .../replication/logical/reorderbuffer.c       | 177 +++++++-----------
 src/include/replication/reorderbuffer.h       |   6 +
 2 files changed, 74 insertions(+), 109 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index bb16ca4f7b9..f7d9a26cfcb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -415,6 +415,14 @@ ReorderBufferAllocate(void)
 	dlist_init(&buffer->txns_by_base_snapshot_lsn);
 	dclist_init(&buffer->catchange_txns);
 
+	/* Create hash of sequences, mapping relfilelocator to transaction. */
+	hash_ctl.keysize = sizeof(RelFileLocator);
+	hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	buffer->sequences_hash = hash_create("ReorderBufferSequenceHash", 32, &hash_ctl,
+										 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
 	/*
 	 * Ensure there's no stale data from prior uses of this slot, in case some
 	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
@@ -1015,45 +1023,27 @@ ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
 									 RelFileLocator rlocator,
 									 TransactionId *xid)
 {
+	ReorderBufferSequenceEnt *entry;
 	bool		found = false;
-	dlist_iter	iter;
 
 	AssertCheckSequences(rb);
 
+	entry = hash_search(rb->sequences_hash,
+						(void *) &rlocator,
+						HASH_FIND,
+						&found);
+
 	/*
-	 * Walk all top-level transactions (some of which may be subxacts, except
-	 * that we haven't processed the assignments yet), and check if any of
-	 * them created the relfilenode.
+	 * If we found an entry with matching relfilenode, we're done - we
+	 * have to treat the sequence change as transactional, and replay it
+	 * in the same (sub)transaction just like any other change.
+	 *
+	 * Optionally set XID of the (sub)xact that created the relfilenode.
 	 */
-	dlist_foreach(iter, &rb->toplevel_by_lsn)
+	if (found)
 	{
-		ReorderBufferSequenceEnt *entry;
-		ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node,
-												iter.cur);
-
-		/* transaction has no relfilenodes at all */
-		if (!txn->sequences_hash)
-			continue;
-
-		entry = hash_search(txn->sequences_hash,
-							(void *) &rlocator,
-							HASH_FIND,
-							&found);
-
-		/*
-		 * If we found an entry with matching relfilenode, we're done - we
-		 * have to treat the sequence change as transactional, and replay it
-		 * in the same (sub)transaction just like any other change.
-		 *
-		 * Optionally set XID of the (sub)xact that created the relfilenode.
-		 */
-		if (found)
-		{
-			if (xid)
-				*xid = entry->txn->xid;
-
-			break;
-		}
+		if (xid)
+			*xid = entry->txn->xid;
 	}
 
 	return found;
@@ -1068,27 +1058,22 @@ ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
  * the aborted one.
  */
 static void
-ReorderBufferSequenceCleanup(ReorderBufferTXN *txn)
+ReorderBufferSequenceCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	HASH_SEQ_STATUS scan_status;
 	ReorderBufferSequenceEnt *ent;
 
-	/* Bail out if not a subxact, or if there are no entries. */
-	if (!rbtxn_is_known_subxact(txn))
-		return;
-
+	/* Bail out if there are no sequence entries for this xact. */
 	if (!txn->sequences_hash)
 		return;
 
 	/*
-	 * Scan the top-level transaction hash and remove the entries from it. If
-	 * we have entries for subxact, the top-level hash must have been
-	 * initialized.
+	 * Remove every entry of this xact's hash from the global hash table.
 	 */
 	hash_seq_init(&scan_status, txn->sequences_hash);
 	while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
 	{
-		if (hash_search(txn->toptxn->sequences_hash,
+		if (hash_search(rb->sequences_hash,
 						(void *) &ent->rlocator,
 						HASH_REMOVE, NULL) == NULL)
 			elog(ERROR, "hash table corrupted");
@@ -1286,28 +1271,23 @@ ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	/*
-	 * First add it to the top-level transaction, but make sure it links to
+	 * First add it to the global hash table, and make sure it links to
 	 * the correct subtransaction (so that we add later changes to it).
 	 */
-	if (txn->toptxn)
-	{
-		/* make sure the hash table is initialized */
-		ReorderBufferTXNSequencesInit(rb, txn->toptxn);
+	entry = hash_search(rb->sequences_hash,
+						(void *) &rlocator,
+						HASH_ENTER,
+						&found);
 
-		/* search the lookup table */
-		entry = hash_search(txn->toptxn->sequences_hash,
-							(void *) &rlocator,
-							HASH_ENTER,
-							&found);
+	/*
+	 * We've just decoded creation of the relfilenode, so if we found it
+	 * in the hash table, something is wrong.
+	 */
+	Assert(!found);
 
-		/*
-		 * We've just decoded creation of the relfilenode, so if we found it
-		 * in the hash table, something is wrong.
-		 */
-		Assert(!found);
+	entry->txn = txn;
 
-		entry->txn = txn;
-	}
+	/* now add it to the hash table in the transaction itself */
 
 	/* make sure the hash table is initialized */
 	ReorderBufferTXNSequencesInit(rb, txn);
@@ -1417,7 +1397,6 @@ AssertCheckSequences(ReorderBuffer *rb)
 {
 #ifdef USE_ASSERT_CHECKING
 	LogicalDecodingContext *ctx = rb->private_data;
-	dlist_iter	iter;
 
 	/*
 	 * Skip the verification if we don't reach the LSN at which we start
@@ -1433,57 +1412,53 @@ AssertCheckSequences(ReorderBuffer *rb)
 		return;
 
 	/*
-	 * Make sure the relfilenodes from subxacts are properly recorded in the
-	 * top-level transaction hash table.
+	 * Make sure the relfilenodes from all xacts are properly recorded in the
+	 * global hash table of sequences.
 	 */
-	dlist_foreach(iter, &rb->toplevel_by_lsn)
 	{
+		HASH_SEQ_STATUS hash_seq;
+		ReorderBufferTXNByIdEnt *ent;
 		int			nentries = 0,
 					nsubentries = 0;
-		dlist_iter	subiter;
-		ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node,
-												iter.cur);
 
 		/*
-		 * We don't skip top-level transactions without relfilenodes, because
-		 * there might be a subtransaction with some, and we want to detect
+		 * We do this check even if the global hash table is empty, because
+		 * some xact may still have relfilenodes, and we want to detect
 		 * such cases too.
 		 */
-		if (txn->sequences_hash)
-			nentries = hash_get_num_entries(txn->sequences_hash);
+		nentries = hash_get_num_entries(rb->sequences_hash);
 
-		/* walk all subxacts, and check the hash table in each one */
-		dlist_foreach(subiter, &txn->subtxns)
+		/* walk all xacts, and check the hash table in each one */
+		hash_seq_init(&hash_seq, rb->by_txn);
+		while ((ent = hash_seq_search(&hash_seq)) != NULL)
 		{
-			HASH_SEQ_STATUS scan_status;
+			ReorderBufferTXN *txn = ent->txn;
 			ReorderBufferSequenceEnt *entry;
-
-			ReorderBufferTXN *subtxn = dlist_container(ReorderBufferTXN, node,
-													   subiter.cur);
+			HASH_SEQ_STATUS scan_status;
 
 			/*
 			 * If this subxact has no relfilenodes, skip it. We'll do the
-			 * check in the opposite direction (that all top-level
-			 * relfilenodes are in the correct subxact) later.
+			 * check in the opposite direction (that every global entry is
+			 * also in the hash of the xact it points to) later.
 			 */
-			if (!subtxn->sequences_hash)
+			if (!txn->sequences_hash)
 				continue;
 
-			/* add number of relfilenodes in this subxact */
-			nsubentries += hash_get_num_entries(subtxn->sequences_hash);
+			/* account for relfilenodes in this xact */
+			nsubentries += hash_get_num_entries(txn->sequences_hash);
 
 			/*
 			 * Check that all subxact relfilenodes are in the top-level txn
 			 * too, and are pointing to this subtransaction.
 			 */
-			hash_seq_init(&scan_status, subtxn->sequences_hash);
+			hash_seq_init(&scan_status, txn->sequences_hash);
 			while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
 			{
 				bool		found = false;
 				ReorderBufferSequenceEnt *subentry;
 
-				/* search for the same relfilenode in the top-level txn */
-				subentry = hash_search(txn->sequences_hash,
+				/* search for the same relfilenode in the global hash table */
+				subentry = hash_search(rb->sequences_hash,
 									   (void *) &entry->rlocator,
 									   HASH_FIND,
 									   &found);
@@ -1498,7 +1473,7 @@ AssertCheckSequences(ReorderBuffer *rb)
 				 * The entry has to point to the subxact - there's no subxact
 				 * "below" this one to which the relfilenode could belong.
 				 */
-				Assert(subentry->txn == subtxn);
+				Assert(subentry->txn == txn);
 			}
 		}
 
@@ -1508,32 +1483,25 @@ AssertCheckSequences(ReorderBuffer *rb)
 		 * directly, so this needs to be inequality.
 		 */
 		Assert(nentries >= nsubentries);
+	}
 
+	{
 		/*
 		 * Now do the check in the opposite direction - check that every entry
-		 * in the top-level txn (except those pointing to the top-level txn
-		 * itself) point to one of the subxacts, and there's an entry in the
-		 * subxact hash.
+		 * in the global hash table points to one of the running xacts, and
+		 * there's a matching entry in that xact's hash table.
 		 */
-		if (txn->sequences_hash)
+		if (rb->sequences_hash)
 		{
 			HASH_SEQ_STATUS scan_status;
 			ReorderBufferSequenceEnt *entry;
 
-			hash_seq_init(&scan_status, txn->sequences_hash);
+			hash_seq_init(&scan_status, rb->sequences_hash);
 			while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
 			{
 				bool		found = false;
 				ReorderBufferSequenceEnt *subentry;
 
-				/* Skip entries for the top-level txn itself. */
-				if (entry->txn == txn)
-					continue;
-
-				/* Is it a subxact of this txn? */
-				Assert(rbtxn_is_known_subxact(entry->txn));
-				Assert(entry->txn->toptxn == txn);
-
 				/*
 				 * Search for the same relfilenode in the subxact (it should
 				 * be initialized, as we expect it to contain the
@@ -2181,6 +2149,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
 
+	/* remove this xact's sequence relfilenodes from the global hash table */
+	ReorderBufferSequenceCleanup(rb, txn);
+
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
 }
@@ -3519,9 +3490,6 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	ReorderBufferExecuteInvalidations(txn->ninvalidations,
 									  txn->invalidations);
 
-	/* cleanup: remove sequence relfilenodes from the top-level txn */
-	ReorderBufferSequenceCleanup(txn);
-
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
@@ -3571,9 +3539,6 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	/* cosmetic... */
 	txn->final_lsn = lsn;
 
-	/* remove sequence relfilenodes from the top-level txn */
-	ReorderBufferSequenceCleanup(txn);
-
 	/* remove potential on-disk data, and deallocate */
 	ReorderBufferCleanupTXN(rb, txn);
 
@@ -3613,9 +3578,6 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
 			if (rbtxn_is_streamed(txn))
 				rb->stream_abort(rb, txn, InvalidXLogRecPtr);
 
-			/* remove sequence relfilenodes from the top-level txn */
-			ReorderBufferSequenceCleanup(txn);
-
 			/* remove potential on-disk data, and deallocate this tx */
 			ReorderBufferCleanupTXN(rb, txn);
 
@@ -3668,9 +3630,6 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	else
 		Assert(txn->ninvalidations == 0);
 
-	/* remove sequence relfilenodes from a top-level txn */
-	ReorderBufferSequenceCleanup(txn);
-
 	/* remove potential on-disk data, and deallocate */
 	ReorderBufferCleanupTXN(rb, txn);
 }
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 58a99b74060..ad3a56d8f0f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -704,6 +704,12 @@ struct ReorderBuffer
 	 */
 	int64		totalTxns;		/* total number of transactions sent */
 	int64		totalBytes;		/* total amount of data decoded */
+
+	/*
+	 * Sequence relfilenodes created in any transaction (also includes
+	 * altered sequences, which get a new relfilenode).
+	 */
+	HTAB	   *sequences_hash;
 };
 
 
-- 
2.41.0