0001-Experimental-ExecScrollSlot-for-hash-join-prefetch.patch

application/x-patch

Filename: 0001-Experimental-ExecScrollSlot-for-hash-join-prefetch.patch
Type: application/x-patch
Part: 0
Message: Re: Experimenting with hash join prefetch

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch returns the parsed metadata as JSON (format, series position, per-file stats), never the diff bytes. API reference →
Format: format-patch
Series: patch 0001
Subject: Experimental ExecScrollSlot() for hash join prefetch.
File+
src/backend/access/heap/heapam.c 20 1
src/backend/access/heap/heapam_handler.c 5 5
src/backend/executor/execTuples.c 33 3
src/backend/executor/nodeHash.c 37 0
src/backend/executor/nodeHashjoin.c 4 0
src/include/access/heapam.h 1 0
src/include/executor/hashjoin.h 5 0
src/include/executor/nodeHash.h 6 0
src/include/executor/tuptable.h 25 1
From a352a861372aa0738746fce0b5a5dc962bb87d95 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Wed, 10 Apr 2019 15:19:23 +1200
Subject: [PATCH] Experimental ExecScrollSlot() for hash join prefetch.

Incomplete, incorrect, proof-of-concept code only.

Problems with ExecScrollSlot():
* needs a configure test
* not preserving deform state when scrolling back and forth
* should ExecScrollSlot(n) be absolute or relative?
* how could we have a "consuming" mode, so that hash build could use a tight
  inner loop over all the tuples in a page?
* look-ahead size of just 1 for now!
* only works with page-mode BufferHeapTupleTableSlot with no key test
* ...

Problems with hash join prefetch:
* anti-joins are broken (!), see make check, haven't looked into why yet
* ignoring skew hash table
* not prefetching in hash build phase
* need parallel hash join support
* ...

Author: Thomas Munro
Discussion: https://postgr.es/m/flat/CA%2Bq6zcXg5-Rc4k0JY%2B7%3DgEDGWjCVp0X9t7JdnCuaAfeNmtTEZQ%40mail.gmail.com#35d34634efe8315587efd0b0da9775fc
---
 src/backend/access/heap/heapam.c         | 21 +++++++++++++-
 src/backend/access/heap/heapam_handler.c | 10 +++----
 src/backend/executor/execTuples.c        | 36 +++++++++++++++++++++--
 src/backend/executor/nodeHash.c          | 37 ++++++++++++++++++++++++
 src/backend/executor/nodeHashjoin.c      |  4 +++
 src/include/access/heapam.h              |  1 +
 src/include/executor/hashjoin.h          |  5 ++++
 src/include/executor/nodeHash.h          |  6 ++++
 src/include/executor/tuptable.h          | 26 ++++++++++++++++-
 9 files changed, 136 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a05b6a07ad..6e66acb688 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -292,6 +292,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_numblocks = InvalidBlockNumber;
 	scan->rs_inited = false;
 	scan->rs_ctup.t_data = NULL;
+	scan->rs_ntup.t_data = NULL;
 	ItemPointerSetInvalid(&scan->rs_ctup.t_self);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
@@ -493,6 +494,8 @@ heapgettup(HeapScanDesc scan,
 	int			linesleft;
 	ItemId		lpp;

+	scan->rs_ntup.t_data = NULL;
+
 	/*
 	 * calculate next starting lineoff, given scan direction
 	 */
@@ -807,6 +810,8 @@ heapgettup_pagemode(HeapScanDesc scan,
 	int			linesleft;
 	ItemId		lpp;

+	scan->rs_ntup.t_data = NULL;
+
 	/*
 	 * calculate next starting lineindex, given scan direction
 	 */
@@ -983,6 +988,18 @@ heapgettup_pagemode(HeapScanDesc scan,
 			else
 			{
 				scan->rs_cindex = lineindex;
+				/* Also expose the next visible tuple, for ExecScrollSlot(). */
+				if (linesleft > 1)
+				{
+					lineoff = scan->rs_vistuples[lineindex + (backward ? -1 : 1)];
+					lpp = PageGetItemId(dp, lineoff);
+					Assert(ItemIdIsNormal(lpp));
+					tuple = &scan->rs_ntup;
+					tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+					tuple->t_len = ItemIdGetLength(lpp);
+					tuple->t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
+					ItemPointerSet(&(tuple->t_self), page, lineoff);
+				}
 				return;
 			}

@@ -1356,7 +1373,9 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s

 	pgstat_count_heap_getnext(scan->rs_base.rs_rd);

-	ExecStoreBufferHeapTuple(&scan->rs_ctup, slot,
+	ExecStoreBufferHeapTuple(&scan->rs_ctup,
+							 scan->rs_ntup.t_data ? &scan->rs_ntup : NULL,
+							 slot,
 							 scan->rs_cbuf);
 	return true;
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 1d8ca2429f..8c1a3c6564 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -163,7 +163,7 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 		*call_again = !IsMVCCSnapshot(snapshot);

 		slot->tts_tableOid = RelationGetRelid(scan->rel);
-		ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf);
+		ExecStoreBufferHeapTuple(&bslot->base.tupdata, NULL, slot, hscan->xs_cbuf);
 	}
 	else
 	{
@@ -1104,7 +1104,7 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,

 		if (sample_it)
 		{
-			ExecStoreBufferHeapTuple(targtuple, slot, hscan->rs_cbuf);
+			ExecStoreBufferHeapTuple(targtuple, NULL, slot, hscan->rs_cbuf);
 			hscan->rs_cindex++;

 			/* note that we leave the buffer locked here! */
@@ -1578,7 +1578,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 		MemoryContextReset(econtext->ecxt_per_tuple_memory);

 		/* Set up for predicate or expression evaluation */
-		ExecStoreBufferHeapTuple(heapTuple, slot, hscan->rs_cbuf);
+		ExecStoreBufferHeapTuple(heapTuple, NULL, slot, hscan->rs_cbuf);

 		/*
 		 * In a partial index, discard tuples that don't satisfy the
@@ -2220,7 +2220,7 @@ heapam_scan_bitmap_next_tuple(TableScanDesc scan,
 	 * Set up the result slot to point to this tuple.  Note that the slot
 	 * acquires a pin on the buffer.
 	 */
-	ExecStoreBufferHeapTuple(&hscan->rs_ctup,
+	ExecStoreBufferHeapTuple(&hscan->rs_ctup, NULL,
 							 slot,
 							 hscan->rs_cbuf);

@@ -2374,7 +2374,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
 			if (!pagemode)
 				LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);

-			ExecStoreBufferHeapTuple(tuple, slot, hscan->rs_cbuf);
+			ExecStoreBufferHeapTuple(tuple, NULL, slot, hscan->rs_cbuf);

 			/* Count successfully-fetched tuples as heap fetches */
 			pgstat_count_heap_getnext(scan->rs_rd);
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 41fa374b6f..715b7dcb7d 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -76,6 +76,7 @@ slot_deform_heap_tuple(TupleTableSlot *slot, HeapTuple tuple, uint32 *offp,
 					   int natts);
 static inline void tts_buffer_heap_store_tuple(TupleTableSlot *slot,
 											   HeapTuple tuple,
+											   HeapTuple next_tuple,
 											   Buffer buffer,
 											   bool transfer_pin);
 static void tts_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple, bool shouldFree);
@@ -664,6 +665,7 @@ tts_buffer_heap_clear(TupleTableSlot *slot)
 	ItemPointerSetInvalid(&slot->tts_tid);
 	bslot->base.tuple = NULL;
 	bslot->base.off = 0;
+	bslot->ntuples = 0;
 	bslot->buffer = InvalidBuffer;
 }

@@ -737,6 +739,9 @@ tts_buffer_heap_materialize(TupleTableSlot *slot)
 	 */
 	bslot->base.off = 0;
 	slot->tts_nvalid = 0;
+
+	bslot->ntuples = 1;
+	bslot->tuples[0] = bslot->base.tuple;
 }

 static void
@@ -767,7 +772,7 @@ tts_buffer_heap_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
 	{
 		Assert(BufferIsValid(bsrcslot->buffer));

-		tts_buffer_heap_store_tuple(dstslot, bsrcslot->base.tuple,
+		tts_buffer_heap_store_tuple(dstslot, bsrcslot->base.tuple, NULL,
 									bsrcslot->buffer, false);

 		/*
@@ -779,6 +784,24 @@ tts_buffer_heap_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
 		memcpy(&bdstslot->base.tupdata, bdstslot->base.tuple, sizeof(HeapTupleData));
 		bdstslot->base.tuple = &bdstslot->base.tupdata;
 	}
+	bdstslot->tuples[0] = bdstslot->base.tuple;
+	bdstslot->ntuples = 1;
+}
+
+static bool
+tts_buffer_heap_scrollslot(TupleTableSlot *slot, int n)
+{
+	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
+
+	if (n < 0 || n >= bslot->ntuples)
+		return false;
+	/* Switch to tuples[n], discarding any deformed attribute values. */
+	bslot->base.tuple = bslot->tuples[n];
+	bslot->base.off = 0;
+	slot->tts_nvalid = 0;
+	slot->tts_tid = bslot->base.tuple->t_self;
+
+	return true;
 }

 static HeapTuple
@@ -822,6 +845,7 @@ tts_buffer_heap_copy_minimal_tuple(TupleTableSlot *slot)

 static inline void
 tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple,
+							HeapTuple next_tuple,
 							Buffer buffer, bool transfer_pin)
 {
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
@@ -838,6 +862,9 @@ tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple,
 	slot->tts_flags &= ~TTS_FLAG_EMPTY;
 	slot->tts_nvalid = 0;
 	bslot->base.tuple = tuple;
+	bslot->tuples[0] = tuple;
+	bslot->tuples[1] = next_tuple;
+	bslot->ntuples = next_tuple ? 2 : 1;
 	bslot->base.off = 0;
 	slot->tts_tid = tuple->t_self;

@@ -1050,6 +1077,7 @@ const TupleTableSlotOps TTSOpsBufferHeapTuple = {
 	.getsysattr = tts_buffer_heap_getsysattr,
 	.materialize = tts_buffer_heap_materialize,
 	.copyslot = tts_buffer_heap_copyslot,
+	.scrollslot = tts_buffer_heap_scrollslot,
 	.get_heap_tuple = tts_buffer_heap_get_heap_tuple,

 	/* A buffer heap tuple table slot can not "own" a minimal tuple. */
@@ -1339,6 +1367,7 @@ ExecStoreHeapTuple(HeapTuple tuple,
  *		into a specified slot in the tuple table.
  *
  *		tuple:	tuple to store
+ *		next_tuple: next tuple of the scan in the same buffer, or NULL
  *		slot:	TTSOpsBufferHeapTuple type slot to store it in
  *		buffer: disk buffer if tuple is in a disk page, else InvalidBuffer
  *
@@ -1353,6 +1382,7 @@ ExecStoreHeapTuple(HeapTuple tuple,
  */
 TupleTableSlot *
 ExecStoreBufferHeapTuple(HeapTuple tuple,
+						 HeapTuple next_tuple,
 						 TupleTableSlot *slot,
 						 Buffer buffer)
 {
@@ -1366,7 +1396,7 @@ ExecStoreBufferHeapTuple(HeapTuple tuple,

 	if (unlikely(!TTS_IS_BUFFERTUPLE(slot)))
 		elog(ERROR, "trying to store an on-disk heap tuple into wrong type of slot");
-	tts_buffer_heap_store_tuple(slot, tuple, buffer, false);
+	tts_buffer_heap_store_tuple(slot, tuple, next_tuple, buffer, false);

 	slot->tts_tableOid = tuple->t_tableOid;

@@ -1392,7 +1422,7 @@ ExecStorePinnedBufferHeapTuple(HeapTuple tuple,

 	if (unlikely(!TTS_IS_BUFFERTUPLE(slot)))
 		elog(ERROR, "trying to store an on-disk heap tuple into wrong type of slot");
-	tts_buffer_heap_store_tuple(slot, tuple, buffer, true);
+	tts_buffer_heap_store_tuple(slot, tuple, NULL, buffer, true);

 	slot->tts_tableOid = tuple->t_tableOid;

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 64eec91f8b..cdf5f4db1b 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
+	hashtable->have_prefetch = false;

 #ifdef HJDEBUG
 	printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
@@ -1769,6 +1770,66 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
 		heap_free_minimal_tuple(tuple);
 }

+/*
+ * ExecHashPrefetch
+ *		Compute the hash value of the tuple following the one currently in
+ *		"slot", if the slot can scroll to it, and prefetch its hash bucket.
+ *
+ * The result is remembered so that the next ExecHashGetHashValue() call for
+ * that same tuple can return it without re-evaluating the hash keys.  The
+ * slot is left positioned on its original tuple.
+ */
+void
+ExecHashPrefetch(HashJoinTable hashtable,
+				 TupleTableSlot *slot,
+				 ExprContext *econtext,
+				 List *hashkeys,
+				 bool outer_tuple,
+				 bool keep_nulls)
+{
+	uint32		hashvalue;
+	bool		result;
+
+	hashtable->have_prefetch = false;
+
+	/* Only the private (non-parallel) bucket array is handled so far. */
+	if (hashtable->parallel_state != NULL)
+		return;
+
+	/* The hash keys are evaluated on the econtext's tuple, i.e. on slot. */
+	Assert((outer_tuple ? econtext->ecxt_outertuple :
+			econtext->ecxt_innertuple) == slot);
+
+	if (!ExecScrollSlot(slot, 1))
+		return;
+
+	result = ExecHashGetHashValue(hashtable, econtext, hashkeys,
+								  outer_tuple, keep_nulls, &hashvalue);
+
+	/* Remember exactly which tuple the cached hash value belongs to. */
+	hashtable->prefetch_slot = slot;
+	hashtable->prefetch_tid = slot->tts_tid;
+	hashtable->prefetch_hash = hashvalue;
+	hashtable->prefetch_result = result;
+	hashtable->have_prefetch = true;
+
+#if defined(__GNUC__)
+	/* A false result means the tuple will be rejected; don't prefetch. */
+	if (result)
+	{
+		int			bucketno;
+		int			batchno;
+
+		ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+		if (batchno == hashtable->curbatch)
+			__builtin_prefetch(&hashtable->buckets.unshared[bucketno]);
+	}
+#endif
+
+	/* Scroll back to the tuple the caller is about to use. */
+	ExecScrollSlot(slot, 0);
+}
+
 /*
  * ExecHashGetHashValue
  *		Compute the hash value for a tuple
@@ -1796,6 +1857,30 @@ ExecHashGetHashValue(HashJoinTable hashtable,
 	int			i = 0;
 	MemoryContext oldContext;

+	/*
+	 * Do we have a pre-computed result from ExecHashPrefetch()?  Only trust it
+	 * if it belongs to the very tuple we're now being asked about: the child
+	 * node may have skipped the prefetched tuple (e.g. rejected it with a
+	 * qual), or a stale value may have survived an early exit and rescan.
+	 * XXX: a rescan that changes parameters used by the hash keys and then
+	 * returns the prefetched tuple first is not detected here.
+	 */
+	if (hashtable->have_prefetch)
+	{
+		TupleTableSlot *curslot = (outer_tuple ?
+								   econtext->ecxt_outertuple :
+								   econtext->ecxt_innertuple);
+
+		hashtable->have_prefetch = false;
+		if (curslot == hashtable->prefetch_slot &&
+			ItemPointerIsValid(&curslot->tts_tid) &&
+			ItemPointerEquals(&curslot->tts_tid, &hashtable->prefetch_tid))
+		{
+			*hashvalue = hashtable->prefetch_hash;
+			return hashtable->prefetch_result;
+		}
+	}
+
 	/*
 	 * We reset the eval context each time to reclaim any memory leaked in the
 	 * hashkey expressions.
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index aa43296e26..853eff4682 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -855,6 +855,10 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 				/* remember outer relation is not empty for possible rescan */
 				hjstate->hj_OuterNotEmpty = true;

+				ExecHashPrefetch(hashtable, slot, econtext,
+								 hjstate->hj_OuterHashKeys,
+								 true,	/* outer tuple */
+								 HJ_FILL_OUTER(hjstate));
 				return slot;
 			}

diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 77e5e603b0..1b2cd06dc5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -63,6 +63,7 @@ typedef struct HeapScanDescData
 	BufferAccessStrategy rs_strategy;	/* access strategy for reads */

 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
+	HeapTupleData rs_ntup;		/* next tuple on page, if any (pagemode) */

 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..dd0d3ded3d 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -329,6 +329,13 @@ typedef struct HashJoinTableData
 	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
 	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */

+	/* State used for prefetching cache lines for future tuples. */
+	bool		have_prefetch;	/* are the prefetch_* fields valid? */
+	uint32		prefetch_hash;	/* hash value of the prefetched tuple */
+	bool		prefetch_result;	/* ExecHashGetHashValue() result for it */
+	TupleTableSlot *prefetch_slot;	/* slot that held the prefetched tuple */
+	ItemPointerData prefetch_tid;	/* TID of the prefetched tuple */
+
 	/*
 	 * Info about the datatype-specific hash functions for the datatypes being
 	 * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1233766023..8dab0c4e99 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -49,6 +49,12 @@ extern bool ExecHashGetHashValue(HashJoinTable hashtable,
 					 bool outer_tuple,
 					 bool keep_nulls,
 					 uint32 *hashvalue);
+extern void ExecHashPrefetch(HashJoinTable hashtable,
+							 TupleTableSlot *slot,
+							 ExprContext *econtext,
+							 List *hashkeys,
+							 bool outer_tuple,
+							 bool keep_nulls);
 extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 						  uint32 hashvalue,
 						  int *bucketno,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index b0561ebe29..951ade0de8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -179,6 +179,11 @@ struct TupleTableSlotOps
 	 */
 	void (*copyslot) (TupleTableSlot *dstslot, TupleTableSlot *srcslot);

+	/*
+	 * Make buffered tuple n current (0 = the stored tuple); false if absent.
+	 */
+	bool (*scrollslot) (TupleTableSlot *slot, int n);
+
 	/*
 	 * Return a heap tuple "owned" by the slot. It is slot's responsibility to
 	 * free the memory consumed by the heap tuple. If the slot can not "own" a
@@ -258,6 +263,12 @@ typedef struct BufferHeapTupleTableSlot
 {
 	HeapTupleTableSlot base;

+#define BUFFERHEAPTUPLE_SCROLL_SIZE 2
+	/* Tuples ExecScrollSlot() can select; tuples[0] is the stored one. */
+	HeapTuple	tuples[BUFFERHEAPTUPLE_SCROLL_SIZE];
+	int			ntuples;		/* number of valid entries in tuples[] */
+	int			ctuple;			/* not used yet */
+
 	/*
 	 * If buffer is not InvalidBuffer, then the slot is holding a pin on the
 	 * indicated buffer page; drop the pin when we release the slot's
@@ -265,7 +276,7 @@ typedef struct BufferHeapTupleTableSlot
 	 * false in such a case, since presumably tts_tuple is pointing at the
 	 * buffer page.)
 	 */
-	Buffer		buffer;		/* tuple's buffer, or InvalidBuffer */
+	Buffer		buffer;		/* tuples' buffer, or InvalidBuffer */
 } BufferHeapTupleTableSlot;

 typedef struct MinimalTupleTableSlot
@@ -308,6 +319,7 @@ extern TupleTableSlot *ExecStoreHeapTuple(HeapTuple tuple,
 				   bool shouldFree);
 extern void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot);
 extern TupleTableSlot *ExecStoreBufferHeapTuple(HeapTuple tuple,
+						 HeapTuple next_tuple,
 						 TupleTableSlot *slot,
 						 Buffer buffer);
 extern TupleTableSlot *ExecStorePinnedBufferHeapTuple(HeapTuple tuple,
@@ -480,6 +492,18 @@ ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
 	return dstslot;
 }

+/*
+ * Try to make tuple n of the slot's scroll buffer current; false if it can't.
+ */
+static inline bool
+ExecScrollSlot(TupleTableSlot *slot, int n)
+{
+	Assert(!TTS_EMPTY(slot));
+
+	return slot->tts_ops->scrollslot && slot->tts_ops->scrollslot(slot, n);
+}
+
+
 #endif							/* FRONTEND */

 #endif							/* TUPTABLE_H */
-- 
2.21.0