v20231203-0004-global-hash-table.patch
text/x-patch
Filename: v20231203-0004-global-hash-table.patch
Type: text/x-patch
Part: 7
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v20231203-0004
Subject: global hash table
| File | + | − |
|---|---|---|
| src/backend/replication/logical/reorderbuffer.c | 68 | 109 |
| src/include/replication/reorderbuffer.h | 6 | 0 |
From e646d7f07f402475df57ac1ce946bad4d6606290 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sat, 2 Dec 2023 10:28:26 +0100
Subject: [PATCH v20231203 4/4] global hash table
---
.../replication/logical/reorderbuffer.c | 177 +++++++-----------
src/include/replication/reorderbuffer.h | 6 +
2 files changed, 74 insertions(+), 109 deletions(-)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index bb16ca4f7b9..f7d9a26cfcb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -415,6 +415,14 @@ ReorderBufferAllocate(void)
dlist_init(&buffer->txns_by_base_snapshot_lsn);
dclist_init(&buffer->catchange_txns);
+ /* Create hash of sequences, mapping relfilelocator to transaction. */
+ hash_ctl.keysize = sizeof(RelFileLocator);
+ hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+ hash_ctl.hcxt = buffer->context;
+
+ buffer->sequences_hash = hash_create("ReorderBufferSequenceHash", 32, &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
/*
* Ensure there's no stale data from prior uses of this slot, in case some
* prior exit avoided calling ReorderBufferFree. Failure to do this can
@@ -1015,45 +1023,27 @@ ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
RelFileLocator rlocator,
TransactionId *xid)
{
+ ReorderBufferSequenceEnt *entry;
bool found = false;
- dlist_iter iter;
AssertCheckSequences(rb);
+ entry = hash_search(rb->sequences_hash,
+ (void *) &rlocator,
+ HASH_FIND,
+ &found);
+
/*
- * Walk all top-level transactions (some of which may be subxacts, except
- * that we haven't processed the assignments yet), and check if any of
- * them created the relfilenode.
+ * If we found an entry with matching relfilenode, we're done - we
+ * have to treat the sequence change as transactional, and replay it
+ * in the same (sub)transaction just like any other change.
+ *
+ * Optionally set XID of the (sub)xact that created the relfilenode.
*/
- dlist_foreach(iter, &rb->toplevel_by_lsn)
+ if (found)
{
- ReorderBufferSequenceEnt *entry;
- ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node,
- iter.cur);
-
- /* transaction has no relfilenodes at all */
- if (!txn->sequences_hash)
- continue;
-
- entry = hash_search(txn->sequences_hash,
- (void *) &rlocator,
- HASH_FIND,
- &found);
-
- /*
- * If we found an entry with matching relfilenode, we're done - we
- * have to treat the sequence change as transactional, and replay it
- * in the same (sub)transaction just like any other change.
- *
- * Optionally set XID of the (sub)xact that created the relfilenode.
- */
- if (found)
- {
- if (xid)
- *xid = entry->txn->xid;
-
- break;
- }
+ if (xid)
+ *xid = entry->txn->xid;
}
return found;
@@ -1068,27 +1058,22 @@ ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
* the aborted one.
*/
static void
-ReorderBufferSequenceCleanup(ReorderBufferTXN *txn)
+ReorderBufferSequenceCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
HASH_SEQ_STATUS scan_status;
ReorderBufferSequenceEnt *ent;
- /* Bail out if not a subxact, or if there are no entries. */
- if (!rbtxn_is_known_subxact(txn))
- return;
-
+ /* Bail out if there are no sequence entries for this xact. */
if (!txn->sequences_hash)
return;
/*
- * Scan the top-level transaction hash and remove the entries from it. If
- * we have entries for subxact, the top-level hash must have been
- * initialized.
+ * Scan this transaction's hash and remove its entries from the global hash.
*/
hash_seq_init(&scan_status, txn->sequences_hash);
while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
{
- if (hash_search(txn->toptxn->sequences_hash,
+ if (hash_search(rb->sequences_hash,
(void *) &ent->rlocator,
HASH_REMOVE, NULL) == NULL)
elog(ERROR, "hash table corrupted");
@@ -1286,28 +1271,23 @@ ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
- * First add it to the top-level transaction, but make sure it links to
+ * First add it to the global hash table, and make sure it links to
* the correct subtransaction (so that we add later changes to it).
*/
- if (txn->toptxn)
- {
- /* make sure the hash table is initialized */
- ReorderBufferTXNSequencesInit(rb, txn->toptxn);
+ entry = hash_search(rb->sequences_hash,
+ (void *) &rlocator,
+ HASH_ENTER,
+ &found);
- /* search the lookup table */
- entry = hash_search(txn->toptxn->sequences_hash,
- (void *) &rlocator,
- HASH_ENTER,
- &found);
+ /*
+ * We've just decoded creation of the relfilenode, so if we found it
+ * in the hash table, something is wrong.
+ */
+ Assert(!found);
- /*
- * We've just decoded creation of the relfilenode, so if we found it
- * in the hash table, something is wrong.
- */
- Assert(!found);
+ entry->txn = txn;
- entry->txn = txn;
- }
+ /* now add it to the hash table in the transaction itself */
/* make sure the hash table is initialized */
ReorderBufferTXNSequencesInit(rb, txn);
@@ -1417,7 +1397,6 @@ AssertCheckSequences(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
LogicalDecodingContext *ctx = rb->private_data;
- dlist_iter iter;
/*
* Skip the verification if we don't reach the LSN at which we start
@@ -1433,57 +1412,53 @@ AssertCheckSequences(ReorderBuffer *rb)
return;
/*
- * Make sure the relfilenodes from subxacts are properly recorded in the
- * top-level transaction hash table.
+ * Make sure the relfilenodes from all xacts are properly recorded in the
+ * global hash table of sequences.
*/
- dlist_foreach(iter, &rb->toplevel_by_lsn)
{
+ HASH_SEQ_STATUS hash_seq;
+ ReorderBufferTXNByIdEnt *ent;
int nentries = 0,
nsubentries = 0;
- dlist_iter subiter;
- ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node,
- iter.cur);
/*
- * We don't skip top-level transactions without relfilenodes, because
- * there might be a subtransaction with some, and we want to detect
+ * We do this check even if the global hash table is empty, because
+ * there might be a transaction with relfilenodes, and we want to detect
* such cases too.
*/
- if (txn->sequences_hash)
- nentries = hash_get_num_entries(txn->sequences_hash);
+ nentries = hash_get_num_entries(rb->sequences_hash);
- /* walk all subxacts, and check the hash table in each one */
- dlist_foreach(subiter, &txn->subtxns)
+ /* walk all xacts, and check the hash table in each one */
+ hash_seq_init(&hash_seq, rb->by_txn);
+ while ((ent = hash_seq_search(&hash_seq)) != NULL)
{
- HASH_SEQ_STATUS scan_status;
+ ReorderBufferTXN *txn = ent->txn;
ReorderBufferSequenceEnt *entry;
-
- ReorderBufferTXN *subtxn = dlist_container(ReorderBufferTXN, node,
- subiter.cur);
+ HASH_SEQ_STATUS scan_status;
/*
* If this subxact has no relfilenodes, skip it. We'll do the
- * check in the opposite direction (that all top-level
- * relfilenodes are in the correct subxact) later.
+ * check in the opposite direction (that all relfilenodes are
+ * also in hash for the correct xact) later.
*/
- if (!subtxn->sequences_hash)
+ if (!txn->sequences_hash)
continue;
- /* add number of relfilenodes in this subxact */
- nsubentries += hash_get_num_entries(subtxn->sequences_hash);
+ /* account for relfilenodes in this xact */
+ nsubentries += hash_get_num_entries(txn->sequences_hash);
/*
* Check that all subxact relfilenodes are in the top-level txn
* too, and are pointing to this subtransaction.
*/
- hash_seq_init(&scan_status, subtxn->sequences_hash);
+ hash_seq_init(&scan_status, txn->sequences_hash);
while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
{
bool found = false;
ReorderBufferSequenceEnt *subentry;
- /* search for the same relfilenode in the top-level txn */
- subentry = hash_search(txn->sequences_hash,
+ /* search for the same relfilenode in the global hash table */
+ subentry = hash_search(rb->sequences_hash,
(void *) &entry->rlocator,
HASH_FIND,
&found);
@@ -1498,7 +1473,7 @@ AssertCheckSequences(ReorderBuffer *rb)
* The entry has to point to the subxact - there's no subxact
* "below" this one to which the relfilenode could belong.
*/
- Assert(subentry->txn == subtxn);
+ Assert(subentry->txn == txn);
}
}
@@ -1508,32 +1483,25 @@ AssertCheckSequences(ReorderBuffer *rb)
* directly, so this needs to be inequality.
*/
Assert(nentries >= nsubentries);
+ }
+ {
/*
* Now do the check in the opposite direction - check that every entry
- * in the top-level txn (except those pointing to the top-level txn
- * itself) point to one of the subxacts, and there's an entry in the
- * subxact hash.
+ * in the global hash table points to one of the running xacts, and
+ * there's an entry in the transaction hash table.
*/
- if (txn->sequences_hash)
+ if (rb->sequences_hash)
{
HASH_SEQ_STATUS scan_status;
ReorderBufferSequenceEnt *entry;
- hash_seq_init(&scan_status, txn->sequences_hash);
+ hash_seq_init(&scan_status, rb->sequences_hash);
while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
{
bool found = false;
ReorderBufferSequenceEnt *subentry;
- /* Skip entries for the top-level txn itself. */
- if (entry->txn == txn)
- continue;
-
- /* Is it a subxact of this txn? */
- Assert(rbtxn_is_known_subxact(entry->txn));
- Assert(entry->txn->toptxn == txn);
-
/*
* Search for the same relfilenode in the subxact (it should
* be initialized, as we expect it to contain the
@@ -2181,6 +2149,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
+ /* remove this transaction's sequence relfilenodes from the global hash table */
+ ReorderBufferSequenceCleanup(rb, txn);
+
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
}
@@ -3519,9 +3490,6 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
ReorderBufferExecuteInvalidations(txn->ninvalidations,
txn->invalidations);
- /* cleanup: remove sequence relfilenodes from the top-level txn */
- ReorderBufferSequenceCleanup(txn);
-
ReorderBufferCleanupTXN(rb, txn);
}
@@ -3571,9 +3539,6 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
/* cosmetic... */
txn->final_lsn = lsn;
- /* remove sequence relfilenodes from the top-level txn */
- ReorderBufferSequenceCleanup(txn);
-
/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);
@@ -3613,9 +3578,6 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
if (rbtxn_is_streamed(txn))
rb->stream_abort(rb, txn, InvalidXLogRecPtr);
- /* remove sequence relfilenodes from the top-level txn */
- ReorderBufferSequenceCleanup(txn);
-
/* remove potential on-disk data, and deallocate this tx */
ReorderBufferCleanupTXN(rb, txn);
@@ -3668,9 +3630,6 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
else
Assert(txn->ninvalidations == 0);
- /* remove sequence relfilenodes from a top-level txn */
- ReorderBufferSequenceCleanup(txn);
-
/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 58a99b74060..ad3a56d8f0f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -704,6 +704,12 @@ struct ReorderBuffer
*/
int64 totalTxns; /* total number of transactions sent */
int64 totalBytes; /* total amount of data decoded */
+
+ /*
+ * Sequence relfilenodes created in any transaction (also includes
+ * altered sequences, which are assigned a new relfilenode).
+ */
+ HTAB *sequences_hash;
};
--
2.41.0