0001-Prefetch-cache-lines-while-building-hash-join-table.patch
application/octet-stream
Filename: 0001-Prefetch-cache-lines-while-building-hash-join-table.patch
Type: application/octet-stream
Part: 0
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0001
Subject: Prefetch cache lines while building hash join table.
| File | + | − |
|---|---|---|
| config/c-compiler.m4 | 17 | 0 |
| configure | 40 | 0 |
| configure.in | 3 | 0 |
| src/backend/executor/nodeHash.c | 106 | 6 |
| src/backend/executor/nodeHashjoin.c | 2 | 0 |
| src/include/c.h | 8 | 0 |
| src/include/executor/hashjoin.h | 15 | 0 |
| src/include/executor/nodeHash.h | 2 | 0 |
| src/include/pg_config.h.in | 3 | 0 |
From e0329c7a8a886e51a4d09e25ab22b70c8afc2079 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 2 Feb 2020 22:24:22 +1300
Subject: [PATCH] Prefetch cache lines while building hash join table.
Since tuple insertion is expected to generate cache misses
most of the time for large hash tables, build a small software
prefetch pipeline, on compilers that provide a builtin for
that.
Experimental code only!
Discussion: https://postgr.es/m/CAEepm%3D2y9HM9QP%2BHhRZdQ3pU6FShSMyu%3DV1uHXhQ5gG-dketHg%40mail.gmail.com
---
config/c-compiler.m4 | 17 +++++
configure | 40 ++++++++++
configure.in | 3 +
src/backend/executor/nodeHash.c | 112 ++++++++++++++++++++++++++--
src/backend/executor/nodeHashjoin.c | 2 +
src/include/c.h | 8 ++
src/include/executor/hashjoin.h | 15 ++++
src/include/executor/nodeHash.h | 2 +
src/include/pg_config.h.in | 3 +
9 files changed, 196 insertions(+), 6 deletions(-)
diff --git a/config/c-compiler.m4 b/config/c-compiler.m4
index 71b645839d..656c259e9b 100644
--- a/config/c-compiler.m4
+++ b/config/c-compiler.m4
@@ -394,6 +394,23 @@ AC_DEFINE_UNQUOTED(AS_TR_CPP([HAVE$1]), 1,
[Define to 1 if your compiler understands $1.])
fi])# PGAC_CHECK_BUILTIN_FUNC
+# PGAC_CHECK_BUILTIN_VOID_FUNC
+# ----------------------------
+# Variant of PGAC_CHECK_BUILTIN_FUNC for void builtins: link-tests a wrapper (parameters given by the second argument) that calls the builtin named by the first argument on x, and defines the matching HAVE symbol on success.
+AC_DEFUN([PGAC_CHECK_BUILTIN_VOID_FUNC],
+[AC_CACHE_CHECK(for $1, pgac_cv$1,
+[AC_LINK_IFELSE([AC_LANG_PROGRAM([
+void
+call$1($2)
+{
+ $1(x);
+}], [])],
+[pgac_cv$1=yes],
+[pgac_cv$1=no])])
+if test x"${pgac_cv$1}" = xyes ; then
+AC_DEFINE_UNQUOTED(AS_TR_CPP([HAVE$1]), 1,
+ [Define to 1 if your compiler understands $1.])
+fi])# PGAC_CHECK_BUILTIN_VOID_FUNC
# PGAC_PROG_VARCC_VARFLAGS_OPT
diff --git a/configure b/configure
index 702adba839..73d9f92bec 100755
--- a/configure
+++ b/configure
@@ -15240,6 +15240,46 @@ _ACEOF
fi
+# Can we use a built-in to prefetch memory?
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for __builtin_prefetch" >&5
+$as_echo_n "checking for __builtin_prefetch... " >&6; }
+if ${pgac_cv__builtin_prefetch+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+void
+call__builtin_prefetch(void *x)
+{
+ __builtin_prefetch(x);
+}
+int
+main ()
+{
+
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ pgac_cv__builtin_prefetch=yes
+else
+ pgac_cv__builtin_prefetch=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $pgac_cv__builtin_prefetch" >&5
+$as_echo "$pgac_cv__builtin_prefetch" >&6; }
+if test x"${pgac_cv__builtin_prefetch}" = xyes ; then
+
+cat >>confdefs.h <<_ACEOF
+#define HAVE__BUILTIN_PREFETCH 1
+_ACEOF
+
+fi
+
ac_fn_c_check_func "$LINENO" "fseeko" "ac_cv_func_fseeko"
if test "x$ac_cv_func_fseeko" = xyes; then :
$as_echo "#define HAVE_FSEEKO 1" >>confdefs.h
diff --git a/configure.in b/configure.in
index 8165f70039..a6173dd362 100644
--- a/configure.in
+++ b/configure.in
@@ -1660,6 +1660,9 @@ PGAC_CHECK_BUILTIN_FUNC([__builtin_clz], [unsigned int x])
PGAC_CHECK_BUILTIN_FUNC([__builtin_ctz], [unsigned int x])
PGAC_CHECK_BUILTIN_FUNC([__builtin_popcount], [unsigned int x])
+# Can we use a built-in to prefetch memory?
+PGAC_CHECK_BUILTIN_VOID_FUNC([__builtin_prefetch], [void *x])
+
AC_REPLACE_FUNCS(fseeko)
case $host_os in
# NetBSD uses a custom fseeko/ftello built on fsetpos/fgetpos
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index b6d5084908..a44a358fee 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -68,6 +68,10 @@ static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table,
static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
HashJoinTuple tuple,
dsa_pointer tuple_shared);
+static inline void ExecParallelHashEnqueueTuple(HashJoinTable hashtable,
+ HashJoinTuple tuple,
+ dsa_pointer tuple_shared,
+ int bucketno);
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch);
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
@@ -79,6 +83,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
size_t size);
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashResetInsertQueue(HashJoinTable hashtable);
/* ----------------------------------------------------------------
@@ -189,6 +194,7 @@ MultiExecPrivateHash(HashState *node)
hashtable->totalTuples += 1;
}
}
+ ExecHashFlushInsertQueue(hashtable);
/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
@@ -289,6 +295,7 @@ MultiExecParallelHash(HashState *node)
ExecParallelHashTableInsert(hashtable, slot, hashvalue);
hashtable->partialTuples++;
}
+ ExecParallelHashFlushInsertQueue(hashtable);
/*
* Make sure that any tuples we wrote to disk are visible to
@@ -512,6 +519,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
hashtable->parallel_state = state->parallel_state;
hashtable->area = state->ps.state->es_query_dsa;
hashtable->batches = NULL;
+ ExecHashResetInsertQueue(hashtable);
#ifdef HJDEBUG
printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
@@ -905,6 +913,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
nbatch = oldnbatch * 2;
Assert(nbatch > 1);
+ ExecHashFlushInsertQueue(hashtable);
+
#ifdef HJDEBUG
printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
hashtable, nbatch, hashtable->spaceUsed);
@@ -1062,6 +1072,8 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+ ExecHashResetInsertQueue(hashtable);
+
/*
* It's unlikely, but we need to be prepared for new participants to show
* up while we're in the middle of this operation so we need to switch on
@@ -1461,6 +1473,7 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
memset(hashtable->buckets.unshared, 0,
hashtable->nbuckets * sizeof(HashJoinTuple));
+ ExecHashResetInsertQueue(hashtable);
/* scan through all tuples in all chunks to rebuild the hash table */
for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
@@ -1501,6 +1514,8 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+ ExecHashResetInsertQueue(hashtable);
+
/*
* It's unlikely, but we need to be prepared for new participants to show
* up while we're in the middle of this operation so we need to switch on
@@ -1579,6 +1594,55 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
}
}
+static inline void
+ExecHashPushTuple(HashJoinTable hashtable, HashJoinTuple tuple, int bucketno)
+{
+ tuple->next.unshared = hashtable->buckets.unshared[bucketno]; /* chain the tuple to the bucket's current head */
+ hashtable->buckets.unshared[bucketno] = tuple; /* the tuple becomes the bucket's new head */
+}
+
+/* Defer a tuple's insertion: prefetch its bucket, then park it in the queue. */
+static inline void
+ExecHashEnqueueTuple(HashJoinTable hashtable, HashJoinTuple tuple, int bucketno)
+{
+	HashJoinTableInserter *q = &hashtable->inserter;
+	int			slot = q->head;
+	HashJoinTuple evicted = q->queue[slot].tuple;
+
+	/* The slot being reused holds the oldest entry; insert it for real now. */
+	if (evicted != NULL)
+		ExecHashPushTuple(hashtable, evicted, q->queue[slot].bucketno);
+
+	/* Request the new bucket's cache line, then take over the slot. */
+	pg_prefetch_mem(&hashtable->buckets.unshared[bucketno]);
+	q->queue[slot].tuple = tuple;
+	q->queue[slot].bucketno = bucketno;
+	q->head = (slot + 1) % HJ_INSERTION_QUEUE_DEPTH;
+}
+
+/* Push every tuple still waiting in the insertion queue into its bucket. */
+void
+ExecHashFlushInsertQueue(HashJoinTable hashtable)
+{
+	HashJoinTableInserter *inserter = &hashtable->inserter;
+
+	for (int slot = 0; slot < HJ_INSERTION_QUEUE_DEPTH; ++slot)
+	{
+		HashJoinTuple pending = inserter->queue[slot].tuple;
+
+		if (pending == NULL)
+			continue;			/* empty slot */
+		ExecHashPushTuple(hashtable, pending, inserter->queue[slot].bucketno);
+		inserter->queue[slot].tuple = NULL;
+	}
+}
+
+static inline void
+ExecHashResetInsertQueue(HashJoinTable hashtable)
+{
+ memset(&hashtable->inserter, 0, sizeof(hashtable->inserter)); /* zeroes every slot and head; tuples still queued are dropped here, not pushed */
+}
+
/*
* ExecHashTableInsert
* insert a tuple into the hash table depending on the hash value
@@ -1631,8 +1695,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
/* Push it onto the front of the bucket's list */
- hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
- hashtable->buckets.unshared[bucketno] = hashTuple;
+ ExecHashEnqueueTuple(hashtable, hashTuple, bucketno);
/*
* Increase the (optimal) number of buckets if we just exceeded the
@@ -1711,8 +1774,7 @@ retry:
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
/* Push it onto the front of the bucket's list */
- ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
- hashTuple, shared);
+ ExecParallelHashEnqueueTuple(hashtable, hashTuple, shared, bucketno);
}
else
{
@@ -1758,14 +1820,14 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
Assert(batchno == hashtable->curbatch);
+
hashTuple = ExecParallelHashTupleAlloc(hashtable,
HJTUPLE_OVERHEAD + tuple->t_len,
&shared);
hashTuple->hashvalue = hashvalue;
memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
- ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
- hashTuple, shared);
+ ExecParallelHashEnqueueTuple(hashtable, hashTuple, shared, bucketno);
if (shouldFree)
heap_free_minimal_tuple(tuple);
@@ -3224,6 +3286,44 @@ ExecParallelHashPushTuple(dsa_pointer_atomic *head,
}
}
+/* Queue a tuple for deferred insertion into its (prefetched) shared bucket. */
+static inline void
+ExecParallelHashEnqueueTuple(HashJoinTable hashtable, HashJoinTuple tuple,
+							 dsa_pointer tuple_shared, int bucketno)
+{
+	HashJoinTableInserter *q = &hashtable->inserter;
+	int			i = q->head;
+
+	/* The slot being reused holds the oldest entry; push it for real now. */
+	if (q->queue[i].tuple)
+		ExecParallelHashPushTuple(&hashtable->buckets.shared[q->queue[i].bucketno],
+								  q->queue[i].tuple, q->queue[i].tuple_shared);
+
+	/* Prefetch through buckets.shared, the array this path later pushes into. */
+	pg_prefetch_mem(&hashtable->buckets.shared[bucketno]);
+	q->queue[i].tuple = tuple;
+	q->queue[i].tuple_shared = tuple_shared;
+	q->queue[i].bucketno = bucketno;
+	q->head = (i + 1) % HJ_INSERTION_QUEUE_DEPTH;
+}
+
+/* Push every tuple still parked in the queue into its shared bucket. */
+void
+ExecParallelHashFlushInsertQueue(HashJoinTable hashtable)
+{
+	for (int slot = 0; slot < HJ_INSERTION_QUEUE_DEPTH; ++slot)
+	{
+		HashJoinTuple pending = hashtable->inserter.queue[slot].tuple;
+		int			bucketno = hashtable->inserter.queue[slot].bucketno;
+
+		if (pending == NULL)
+			continue;			/* empty slot */
+		ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], pending,
+								  hashtable->inserter.queue[slot].tuple_shared);
+		hashtable->inserter.queue[slot].tuple = NULL;
+	}
+}
+
/*
* Prepare to work on a given batch.
*/
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index c901a80923..93a12bd44e 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1056,6 +1056,7 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
*/
ExecHashTableInsert(hashtable, slot, hashvalue);
}
+ ExecHashFlushInsertQueue(hashtable);
/*
* after we build the hash table, the inner batch file is no longer
@@ -1160,6 +1161,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
hashvalue);
}
+ ExecParallelHashFlushInsertQueue(hashtable);
sts_end_parallel_scan(inner_tuples);
BarrierArriveAndWait(batch_barrier,
WAIT_EVENT_HASH_BATCH_LOADING);
diff --git a/src/include/c.h b/src/include/c.h
index 6898229b43..0546b13589 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -278,6 +278,14 @@
#endif
#endif
+/* pg_prefetch_mem(a): hint that address a is about to be accessed; expands to nothing where no prefetch primitive is known. */
+#if defined(HAVE__BUILTIN_PREFETCH)
+#define pg_prefetch_mem(a) __builtin_prefetch(a)
+#elif defined(_MSC_VER)
+#define pg_prefetch_mem(a) _m_prefetch(a) /* NOTE(review): confirm _m_prefetch is the intended MSVC intrinsic */
+#else
+#define pg_prefetch_mem(a)
+#endif
/* ----------------------------------------------------------------
* Section 2: bool, true, false
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 79b634e8ed..8acb1d40f4 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -281,6 +281,19 @@ typedef struct ParallelHashJoinState
#define PHJ_GROW_BUCKETS_REINSERTING 2
#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
+#define HJ_INSERTION_QUEUE_DEPTH 4 /* number of slots in the insertion queue */
+
+typedef struct HashJoinTableInserter
+{
+ struct
+ {
+ HashJoinTupleData *tuple; /* queued tuple; NULL marks an empty slot */
+ dsa_pointer tuple_shared; /* tuple's dsa_pointer; written only by the parallel enqueue path */
+ int bucketno; /* bucket the tuple will be pushed into */
+ } queue[HJ_INSERTION_QUEUE_DEPTH];
+ int head; /* next slot to reuse; advances modulo the queue depth */
+} HashJoinTableInserter;
+
typedef struct HashJoinTableData
{
int nbuckets; /* # buckets in the in-memory hash table */
@@ -348,6 +361,8 @@ typedef struct HashJoinTableData
MemoryContext hashCxt; /* context for whole-hash-join storage */
MemoryContext batchCxt; /* context for this-batch-only storage */
+ HashJoinTableInserter inserter;
+
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1336fde6b4..37825ff5d9 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -75,5 +75,7 @@ extern void ExecHashRetrieveInstrumentation(HashState *node);
extern void ExecShutdownHash(HashState *node);
extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
HashJoinTable hashtable);
+extern void ExecHashFlushInsertQueue(HashJoinTable hashtable);
+extern void ExecParallelHashFlushInsertQueue(HashJoinTable hashtable);
#endif /* NODEHASH_H */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 6f485f73cd..f952cc8a81 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -758,6 +758,9 @@
/* Define to 1 if your compiler understands __builtin_popcount. */
#undef HAVE__BUILTIN_POPCOUNT
+/* Define to 1 if your compiler understands __builtin_prefetch. */
+#undef HAVE__BUILTIN_PREFETCH
+
/* Define to 1 if your compiler understands __builtin_types_compatible_p. */
#undef HAVE__BUILTIN_TYPES_COMPATIBLE_P
--
2.23.0