v4-0001-Add-batch-table-AM-API-and-heapam-implementation.patch
application/octet-stream
Filename: v4-0001-Add-batch-table-AM-API-and-heapam-implementation.patch
Type: application/octet-stream
Part: 0
Message:
Re: Batching in executor
From 24a3d208db93312788745882a01b526957919966 Mon Sep 17 00:00:00 2001
From: Amit Langote <amitlan@postgresql.org>
Date: Sat, 20 Dec 2025 17:21:56 +0900
Subject: [PATCH v4 1/3] Add batch table AM API and heapam implementation
Introduce new table AM callbacks to fetch multiple tuples per call.
This reduces per-tuple call overhead by letting executor nodes work
in batches.
Define a HeapBatch structure in heapam.h, and add the batch callbacks and
their table_scan_*_batch() wrappers to tableam.h.
Batches are limited to tuples from a single page and at most
EXEC_BATCH_ROWS (currently 64) entries.
Provide initial heapam support with heapgettup_pagemode_batch().
No executor node is switched over yet; a later commit will adapt
SeqScan to use this API. Other nodes may adopt it in the future.
Also add pgstat_count_heap_getnext_batch() to record batched fetches
in pgstat.
Reviewed-by: Daniil Davydov <3danissimo@gmail.com>
Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com
---
src/backend/access/heap/heapam.c | 219 ++++++++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 4 +
src/include/access/heapam.h | 18 ++
src/include/access/tableam.h | 58 ++++++
src/include/pgstat.h | 5 +
5 files changed, 303 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6daf4a87dec..fcc0813f139 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1023,7 +1023,7 @@ heapgettup_pagemode(HeapScanDesc scan,
int nkeys,
ScanKey key)
{
- HeapTuple tuple = &(scan->rs_ctup);
+ HeapTuple tuple = &scan->rs_ctup;
Page page;
uint32 lineindex;
uint32 linesleft;
@@ -1104,6 +1104,132 @@ continue_page:
scan->rs_inited = false;
}
+/*
+ * heapgettup_pagemode_batch
+ *		Collect up to 'maxitems' qualifying tuples from a single page in
+ *		page mode.
+ *
+ * Each entry written to 'tdata' is a tuple header whose t_data points into
+ * the page held in scan->rs_cbuf; tuple bodies are not copied.  If the
+ * current page has nothing left to examine, the scan advances to the next
+ * page and prepares it with heap_prepare_pagescan().  A batch never mixes
+ * tuples from two pages.  Pages that contribute no tuple (no visible tuples,
+ * or none passing the scan keys) are skipped instead of being reported as an
+ * empty batch, because a zero return means end-of-scan to the caller.
+ *
+ * Arguments:
+ *		scan		heap scan descriptor; SO_ALLOW_PAGEMODE must be set
+ *		dir			scan direction; must be forward
+ *		nkeys, key	scan keys checked with HeapKeyTest(); key may be NULL
+ *		tdata		output array with room for at least 'maxitems' entries
+ *		maxitems	batch capacity; must be > 0
+ *
+ * Return value:
+ *		number of tuples written into 'tdata'; 0 only at end-of-scan.
+ *
+ * Side effects:
+ *		- On a nonzero return, rs_cbuf is the buffer of the page the tuples
+ *		  came from.
+ *		- rs_cindex is left at the last rs_vistuples[] entry examined, whether
+ *		  or not it passed the key test, so the next call resumes after it.
+ *		- No pin is taken or dropped here directly; page transitions are
+ *		  delegated to heap_fetch_next_buffer().
+ */
+static int
+heapgettup_pagemode_batch(HeapScanDesc scan,
+						  ScanDirection dir,
+						  int nkeys, ScanKey key,
+						  HeapTupleData *tdata,
+						  int maxitems)
+{
+	Relation	rel = scan->rs_base.rs_rd;
+	Oid			tableOid = RelationGetRelid(rel);
+	TupleDesc	tupdesc = key ? RelationGetDescr(rel) : NULL;
+	int			nout = 0;
+
+	/*
+	 * Current batching limitations (may be relaxed in future):
+	 *
+	 * - Forward scans only: backward scan support would require changes to
+	 *   batch iteration and page advancement logic.
+	 *
+	 * - Pagemode required: batching relies on the pre-built rs_vistuples[]
+	 *   array from heap_prepare_pagescan().  Callers are expected to enable
+	 *   batching only when SO_ALLOW_PAGEMODE is set; there is no dynamic
+	 *   fallback to tuple-at-a-time mode here.
+	 */
+	Assert(ScanDirectionIsForward(dir));
+	Assert(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE);
+	Assert(maxitems > 0);
+
+	/*
+	 * Keep going until some page yields at least one qualifying tuple or the
+	 * scan ends.  Returning 0 for a page whose tuples all fail the key test
+	 * would be mistaken for end-of-scan by the caller.
+	 */
+	do
+	{
+		Page		page;
+		uint32		lineindex = 0;
+		uint32		linesleft = 0;
+
+		/* Is there anything left to examine on the current page? */
+		if (BufferIsValid(scan->rs_cbuf))
+		{
+			lineindex = scan->rs_cindex + dir;
+			if (lineindex < (uint32) scan->rs_ntuples)
+				linesleft = scan->rs_ntuples - lineindex;
+		}
+
+		if (linesleft == 0)
+		{
+			/* Move to next page and prepare its visible tuple list. */
+			heap_fetch_next_buffer(scan, dir);
+
+			if (!BufferIsValid(scan->rs_cbuf))
+			{
+				/* end of scan; keep rs_cbuf invalid like heapgettup_pagemode */
+				scan->rs_cblock = InvalidBlockNumber;
+				scan->rs_prefetch_block = InvalidBlockNumber;
+				scan->rs_inited = false;
+				return 0;
+			}
+
+			Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
+			heap_prepare_pagescan((TableScanDesc) scan);
+
+			/* Start at the first visible tuple; may be none at all. */
+			lineindex = 0;
+			linesleft = scan->rs_ntuples;
+		}
+
+		/* From here on, we only read tuples from this single page. */
+		page = BufferGetPage(scan->rs_cbuf);
+
+		/*
+		 * Walk rs_vistuples[] from 'lineindex', filling headers in tdata[]
+		 * until either the page is exhausted or the batch is full.
+		 */
+		for (; linesleft > 0 && nout < maxitems; linesleft--, lineindex += dir)
+		{
+			OffsetNumber lineoff;
+			ItemId		lpp;
+			HeapTupleData *dst = &tdata[nout];
+
+			/* lineindex is used as an array index, so it must be in range */
+			Assert(lineindex < (uint32) scan->rs_ntuples);
+			lineoff = scan->rs_vistuples[lineindex];
+			lpp = PageGetItemId(page, lineoff);
+			Assert(ItemIdIsNormal(lpp));
+
+			/*
+			 * Record progress for every tuple examined, not only for those
+			 * returned; otherwise rejected tuples would be re-examined and a
+			 * stale index from the previous page could be reused.
+			 */
+			scan->rs_cindex = lineindex;
+
+			dst->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+			dst->t_len = ItemIdGetLength(lpp);
+			dst->t_tableOid = tableOid;
+			ItemPointerSet(&(dst->t_self), scan->rs_cblock, lineoff);
+
+			/* A rejected tuple's slot in tdata[] is simply reused. */
+			if (key != NULL && !HeapKeyTest(dst, tupdesc, nkeys, key))
+				continue;
+
+			nout++;
+		}
+	} while (nout == 0);
+
+	return nout;
+}
/* ----------------------------------------------------------------
* heap access method interface
@@ -1436,6 +1562,97 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s
return true;
}
+/*---------- Batching support -----------*/
+
+/*
+ * heap_begin_batch
+ *		Create the heapam batch payload for a scan.
+ *
+ * Returns a HeapBatch with room for 'maxitems' tuple headers.  The batch
+ * starts out empty and holds no buffer pin.  Both the HeapBatch and its
+ * header array come from palloc(), i.e. from whatever memory context is
+ * current when this is called.
+ */
+void *
+heap_begin_batch(TableScanDesc sscan, int maxitems)
+{
+	HeapBatch  *batch;
+	Oid			tableoid = RelationGetRelid(sscan->rs_rd);
+	int			i;
+
+	Assert(maxitems > 0);
+
+	batch = palloc(sizeof(*batch));
+	batch->buf = InvalidBuffer;
+	batch->nitems = 0;
+	batch->maxitems = maxitems;
+	batch->tupdata = palloc(sizeof(HeapTupleData) * maxitems);
+
+	/*
+	 * The table OID is the same for every tuple of the scan, so fill it in
+	 * once up front.  The remaining header fields are set per batch.
+	 */
+	for (i = 0; i < maxitems; i++)
+		batch->tupdata[i].t_tableOid = tableoid;
+
+	return batch;
+}
+
+/*
+ * heap_end_batch
+ *		Destroy a batch payload created by heap_begin_batch().
+ *
+ * Drops the buffer pin recorded in the batch, if there is one, and frees
+ * the header array and the HeapBatch itself.  'am_batch' must not be used
+ * afterwards.  'sscan' is not consulted here.
+ */
+void
+heap_end_batch(TableScanDesc sscan, void *am_batch)
+{
+	HeapBatch  *batch = (HeapBatch *) am_batch;
+	Buffer		pinned = batch->buf;
+
+	if (BufferIsValid(pinned))
+		ReleaseBuffer(pinned);
+
+	pfree(batch->tupdata);
+	pfree(batch);
+}
+
+/*
+ * heap_getnextbatch
+ *		Fetch the next batch of tuples for a forward, page-mode heap scan.
+ *
+ * The pin held on behalf of the previous batch is released first, so tuple
+ * pointers handed out by an earlier call must no longer be used.  Then up
+ * to hb->maxitems tuples are collected from a single page.  When tuples are
+ * returned, an extra pin is taken on scan->rs_cbuf and remembered in the
+ * batch; it is released by the next call or by heap_end_batch().
+ *
+ * Returns the number of tuples in the batch, or 0 at end of scan, in which
+ * case the batch is left empty and holds no pin.
+ */
+int
+heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	HeapBatch  *batch = (HeapBatch *) am_batch;
+	int			ntuples;
+
+	Assert(ScanDirectionIsForward(dir));
+	Assert(sscan->rs_flags & SO_ALLOW_PAGEMODE);
+	Assert(batch->maxitems > 0);
+
+	/* Forget the previous batch, including its pin. */
+	if (BufferIsValid(batch->buf))
+		ReleaseBuffer(batch->buf);
+	batch->buf = InvalidBuffer;
+	batch->nitems = 0;
+
+	/* One call per batch; the result never spans two pages. */
+	ntuples = heapgettup_pagemode_batch(scan, dir,
+										sscan->rs_nkeys, sscan->rs_key,
+										batch->tupdata, batch->maxitems);
+
+	if (ntuples != 0)
+	{
+		/*
+		 * Take our own pin so the page, and with it every t_data pointer in
+		 * the batch, stays valid for the batch lifetime.
+		 */
+		IncrBufferRefCount(scan->rs_cbuf);
+		batch->buf = scan->rs_cbuf;
+
+		/* Account for all tuples of the batch in one step. */
+		pgstat_count_heap_getnext_batch(sscan->rs_rd, ntuples);
+
+		batch->nitems = ntuples;
+	}
+
+	return ntuples;
+}
+
+/*----- End of batching support -----*/
+
void
heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
ItemPointer maxtid)
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index dd4fe6bf62f..550b788553c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2623,6 +2623,10 @@ static const TableAmRoutine heapam_methods = {
.scan_rescan = heap_rescan,
.scan_getnextslot = heap_getnextslot,
+ .scan_begin_batch = heap_begin_batch,
+ .scan_getnextbatch = heap_getnextbatch,
+ .scan_end_batch = heap_end_batch,
+
.scan_set_tidrange = heap_set_tidrange,
.scan_getnextslot_tidrange = heap_getnextslot_tidrange,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index f7e4ae3843c..f6675043fb3 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -101,6 +101,19 @@ typedef struct HeapScanDescData
} HeapScanDescData;
typedef struct HeapScanDescData *HeapScanDesc;
+/*
+ * HeapBatch -- heapam batch payload, refilled by each heap_getnextbatch().
+ * A filled batch pins one page and exposes up to maxitems HeapTupleData
+ * headers whose t_data point into that page (tuple bodies are not copied).
+ */
+typedef struct HeapBatch
+{
+ HeapTupleData *tupdata; /* array of maxitems headers; bodies stay on page */
+ int nitems; /* tuples produced in last getnextbatch() */
+ int maxitems; /* fixed capacity set at begin_batch() */
+ Buffer buf; /* pinned buffer of this batch, or InvalidBuffer */
+} HeapBatch;
+
typedef struct BitmapHeapScanDescData
{
HeapScanDescData rs_heap_base;
@@ -337,6 +350,11 @@ extern void heap_endscan(TableScanDesc sscan);
extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction);
extern bool heap_getnextslot(TableScanDesc sscan,
ScanDirection direction, TupleTableSlot *slot);
+
+/* Batched scan support; the definitions are in heapam.c. */
+extern void *heap_begin_batch(TableScanDesc sscan, int maxitems);
+extern void heap_end_batch(TableScanDesc sscan, void *am_batch);
+extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir);
+
extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
ItemPointer maxtid);
extern bool heap_getnextslot_tidrange(TableScanDesc sscan,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 2fa790b6bf5..3ec3c3dd008 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -351,6 +351,16 @@ typedef struct TableAmRoutine
ScanDirection direction,
TupleTableSlot *slot);
+ /* ------------------------------------------------------------------------
+ * Batched scan support
+ *
+ * These callbacks return several tuples per call.  The batch payload is
+ * created and owned by the AM and is opaque to callers; see the
+ * table_scan_*_batch() wrappers.
+ * ------------------------------------------------------------------------
+ */
+
+ /* Create a batch payload with capacity for 'maxitems' tuples. */
+ void *(*scan_begin_batch)(TableScanDesc sscan, int maxitems);
+ /*
+ * Fill 'am_batch' with the next tuples of the scan.  Returns the number of
+ * tuples in the batch, 0 at end of scan.
+ */
+ int (*scan_getnextbatch)(TableScanDesc sscan, void *am_batch,
+ ScanDirection dir);
+ /* Release the resources held by 'am_batch'. */
+ void (*scan_end_batch)(TableScanDesc sscan, void *am_batch);
+
/*-----------
* Optional functions to provide scanning for ranges of ItemPointers.
* Implementations must either provide both of these functions, or neither
@@ -1036,6 +1046,54 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS
return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}
+/*
+ * table_scan_begin_batch
+ *		Have the relation's table AM create a batch payload that can hold up
+ *		to 'maxitems' tuples.
+ *
+ * The result is opaque to the caller; it is handed back to
+ * table_scan_getnextbatch() and table_scan_end_batch().
+ */
+static inline void *
+table_scan_begin_batch(TableScanDesc sscan, int maxitems)
+{
+	Assert(sscan->rs_rd->rd_tableam->scan_begin_batch != NULL);
+
+	return sscan->rs_rd->rd_tableam->scan_begin_batch(sscan, maxitems);
+}
+
+/*
+ * table_scan_getnextbatch
+ *		Have the table AM fill 'am_batch' with the next tuples of the scan.
+ *
+ * Returns the number of tuples in the batch; 0 means end of scan.  As the
+ * API currently stands, a batch covers a single page and only forward scans
+ * are supported.
+ */
+static inline int
+table_scan_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir)
+{
+	/* Batched mode cannot scan backwards. */
+	Assert(dir == ForwardScanDirection);
+	Assert(sscan->rs_rd->rd_tableam->scan_getnextbatch != NULL);
+
+	return sscan->rs_rd->rd_tableam->scan_getnextbatch(sscan, am_batch, dir);
+}
+
+/*
+ * table_scan_end_batch
+ *		Have the table AM release whatever it holds for 'am_batch'.
+ *
+ * A NULL 'am_batch' is accepted and ignored.
+ */
+static inline void
+table_scan_end_batch(TableScanDesc sscan, void *am_batch)
+{
+	const TableAmRoutine *routine = sscan->rs_rd->rd_tableam;
+
+	if (am_batch != NULL)
+	{
+		Assert(routine->scan_end_batch != NULL);
+
+		routine->scan_end_batch(sscan, am_batch);
+	}
+}
+
/* ----------------------------------------------------------------------------
* TID Range scanning related functions.
* ----------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6714363144a..85f76dee468 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -697,6 +697,11 @@ extern void pgstat_report_analyze(Relation rel,
if (pgstat_should_count_relation(rel)) \
(rel)->pgstat_info->counts.tuples_returned++; \
} while (0)
+/* Add 'n' to the tuples_returned counter of 'rel', if 'rel' is counted. */
+#define pgstat_count_heap_getnext_batch(rel, n)						\
+	do {															\
+		if (pgstat_should_count_relation(rel))						\
+			(rel)->pgstat_info->counts.tuples_returned += (n);		\
+	} while (0)
#define pgstat_count_heap_fetch(rel) \
do { \
if (pgstat_should_count_relation(rel)) \
--
2.47.3