v2-0006-Streamify-hash-index-VACUUM-primary-bucket-page-r.patch

application/x-patch

Filename: v2-0006-Streamify-hash-index-VACUUM-primary-bucket-page-r.patch
Type: application/x-patch
Part: 5
Message: Re: Streamify more code paths
From 7b3018a09bfb26f8c6d1a413fd80c631231073f3 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Sun, 28 Dec 2025 18:29:28 +0800
Subject: [PATCH v2 6/6] Streamify hash index VACUUM primary bucket page reads

Refactor hashbulkdelete() to use the read stream API for reading primary
bucket pages. This enables prefetching of upcoming buckets while the
current one is being processed, improving I/O efficiency during hash
index vacuum operations.
---
 src/backend/access/hash/hash.c | 77 +++++++++++++++++++++++++++++++++-
 1 file changed, 76 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index e388252afdc..4000d5d8e99 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -30,6 +30,7 @@
 #include "nodes/execnodes.h"
 #include "optimizer/plancat.h"
 #include "pgstat.h"
+#include "storage/read_stream.h"
 #include "utils/fmgrprotos.h"
 #include "utils/index_selfuncs.h"
 #include "utils/rel.h"
@@ -42,12 +43,23 @@ typedef struct
 	Relation	heapRel;		/* heap relation descriptor */
 } HashBuildState;
 
+/* Private state for the hashbulkdelete read stream callback */
+typedef struct
+{
+	HashMetaPage metap;			/* cached metapage passed to BUCKET_TO_BLKNO */
+	Bucket		next_bucket;	/* next bucket the callback will return */
+	Bucket		max_bucket;		/* last bucket to return (inclusive) */
+}			HashBulkDeleteStreamPrivate;
+
 static void hashbuildCallback(Relation index,
 							  ItemPointer tid,
 							  Datum *values,
 							  bool *isnull,
 							  bool tupleIsAlive,
 							  void *state);
+static BlockNumber hash_bulkdelete_read_stream_cb(ReadStream *stream,
+												  void *callback_private_data,
+												  void *per_buffer_data);
 
 
 /*
@@ -450,6 +462,25 @@ hashendscan(IndexScanDesc scan)
 	scan->opaque = NULL;
 }
 
+/*
+ * Read stream callback for hashbulkdelete: map the next bucket to its
+ * primary page with BUCKET_TO_BLKNO on the cached metapage and advance
+ * next_bucket.  Returns InvalidBlockNumber once next_bucket exceeds
+ * max_bucket; stream and per_buffer_data are unused.
+ */
+static BlockNumber
+hash_bulkdelete_read_stream_cb(ReadStream *stream,
+							   void *callback_private_data,
+							   void *per_buffer_data)
+{
+	HashBulkDeleteStreamPrivate *p = callback_private_data;
+
+	if (p->next_bucket > p->max_bucket)
+		return InvalidBlockNumber;
+
+	return BUCKET_TO_BLKNO(p->metap, p->next_bucket++);
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
  * The set of target tuples is specified via a callback routine that tells
@@ -474,6 +505,8 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	Buffer		metabuf = InvalidBuffer;
 	HashMetaPage metap;
 	HashMetaPage cachedmetap;
+	HashBulkDeleteStreamPrivate stream_private;
+	ReadStream *stream = NULL;
 
 	tuples_removed = 0;
 	num_index_tuples = 0;
@@ -495,6 +528,24 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	cur_maxbucket = orig_maxbucket;
 
 loop_top:
+	/* Set up streaming read for primary bucket pages */
+	stream_private.metap = cachedmetap;
+	stream_private.next_bucket = cur_bucket;
+	stream_private.max_bucket = cur_maxbucket;
+
+	/*
+	 * It is safe to use batch mode because hash_bulkdelete_read_stream_cb
+	 * takes no locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										rel,
+										MAIN_FORKNUM,
+										hash_bulkdelete_read_stream_cb,
+										&stream_private,
+										0);
+
 	while (cur_bucket <= cur_maxbucket)
 	{
 		BlockNumber bucket_blkno;
@@ -514,7 +565,8 @@ loop_top:
 		 * We need to acquire a cleanup lock on the primary bucket page to out
 		 * wait concurrent scans before deleting the dead tuples.
 		 */
-		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferIsValid(buf));
 		LockBufferForCleanup(buf);
 		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
 
@@ -545,6 +597,24 @@ loop_top:
 			{
 				cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
 				Assert(cachedmetap != NULL);
+
+				/*
+				 * Reset stream with updated metadata for remaining buckets.
+				 * The BUCKET_TO_BLKNO mapping depends on hashm_spares[],
+				 * which may have changed.
+				 */
+				read_stream_end(stream);
+				stream_private.metap = cachedmetap;
+				stream_private.next_bucket = cur_bucket + 1;
+				stream_private.max_bucket = cur_maxbucket;
+				stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+													READ_STREAM_USE_BATCHING,
+													info->strategy,
+													rel,
+													MAIN_FORKNUM,
+													hash_bulkdelete_read_stream_cb,
+													&stream_private,
+													0);
 			}
 		}
 
@@ -577,9 +647,14 @@ loop_top:
 		cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
 		Assert(cachedmetap != NULL);
 		cur_maxbucket = cachedmetap->hashm_maxbucket;
+		read_stream_end(stream);
 		goto loop_top;
 	}
 
+	/* Stream should be exhausted since we processed all buckets */
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/* Okay, we're really done.  Update tuple count in metapage. */
 	START_CRIT_SECTION();
 
-- 
2.51.0