From e0329c7a8a886e51a4d09e25ab22b70c8afc2079 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sun, 2 Feb 2020 22:24:22 +1300 Subject: [PATCH] Prefetch cache lines while building hash join table. Since tuple insertion is expected to generate cache misses most of the time for large hash tables, build a small software prefetch pipeline, on compilers that provide a builtin for that. Experimental code only! Discussion: https://postgr.es/m/CAEepm%3D2y9HM9QP%2BHhRZdQ3pU6FShSMyu%3DV1uHXhQ5gG-dketHg%40mail.gmail.com --- config/c-compiler.m4 | 17 +++++ configure | 40 ++++++++++ configure.in | 3 + src/backend/executor/nodeHash.c | 112 ++++++++++++++++++++++++++-- src/backend/executor/nodeHashjoin.c | 2 + src/include/c.h | 8 ++ src/include/executor/hashjoin.h | 15 ++++ src/include/executor/nodeHash.h | 2 + src/include/pg_config.h.in | 3 + 9 files changed, 196 insertions(+), 6 deletions(-) diff --git a/config/c-compiler.m4 b/config/c-compiler.m4 index 71b645839d..656c259e9b 100644 --- a/config/c-compiler.m4 +++ b/config/c-compiler.m4 @@ -394,6 +394,23 @@ AC_DEFINE_UNQUOTED(AS_TR_CPP([HAVE$1]), 1, [Define to 1 if your compiler understands $1.]) fi])# PGAC_CHECK_BUILTIN_FUNC +# PGAC_CHECK_BUILTIN_VOID_FUNC +# ---------------------------- +# Variant for void functions; $2 must declare a parameter named x, which is passed to $1. +AC_DEFUN([PGAC_CHECK_BUILTIN_VOID_FUNC], +[AC_CACHE_CHECK(for $1, pgac_cv$1, +[AC_LINK_IFELSE([AC_LANG_PROGRAM([ +void +call$1($2) +{ + $1(x); +}], [])], +[pgac_cv$1=yes], +[pgac_cv$1=no])]) +if test x"${pgac_cv$1}" = xyes ; then +AC_DEFINE_UNQUOTED(AS_TR_CPP([HAVE$1]), 1, + [Define to 1 if your compiler understands $1.]) +fi])# PGAC_CHECK_BUILTIN_VOID_FUNC # PGAC_PROG_VARCC_VARFLAGS_OPT diff --git a/configure b/configure index 702adba839..73d9f92bec 100755 --- a/configure +++ b/configure @@ -15240,6 +15240,46 @@ _ACEOF fi +# Can we use a built-in to prefetch memory? +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for __builtin_prefetch" >&5 +$as_echo_n "checking for __builtin_prefetch... 
" >&6; } +if ${pgac_cv__builtin_prefetch+:} false; then : + $as_echo_n "(cached) " >&6 +else + cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +void +call__builtin_prefetch(void *x) +{ + __builtin_prefetch(x); +} +int +main () +{ + + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + pgac_cv__builtin_prefetch=yes +else + pgac_cv__builtin_prefetch=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $pgac_cv__builtin_prefetch" >&5 +$as_echo "$pgac_cv__builtin_prefetch" >&6; } +if test x"${pgac_cv__builtin_prefetch}" = xyes ; then + +cat >>confdefs.h <<_ACEOF +#define HAVE__BUILTIN_PREFETCH 1 +_ACEOF + +fi + ac_fn_c_check_func "$LINENO" "fseeko" "ac_cv_func_fseeko" if test "x$ac_cv_func_fseeko" = xyes; then : $as_echo "#define HAVE_FSEEKO 1" >>confdefs.h diff --git a/configure.in b/configure.in index 8165f70039..a6173dd362 100644 --- a/configure.in +++ b/configure.in @@ -1660,6 +1660,9 @@ PGAC_CHECK_BUILTIN_FUNC([__builtin_clz], [unsigned int x]) PGAC_CHECK_BUILTIN_FUNC([__builtin_ctz], [unsigned int x]) PGAC_CHECK_BUILTIN_FUNC([__builtin_popcount], [unsigned int x]) +# Can we use a built-in to prefetch memory? 
+PGAC_CHECK_BUILTIN_VOID_FUNC([__builtin_prefetch], [void *x]) + AC_REPLACE_FUNCS(fseeko) case $host_os in # NetBSD uses a custom fseeko/ftello built on fsetpos/fgetpos diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index b6d5084908..a44a358fee 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -68,6 +68,10 @@ static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared); +static inline void ExecParallelHashEnqueueTuple(HashJoinTable hashtable, + HashJoinTuple tuple, + dsa_pointer tuple_shared, + int bucketno); static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); @@ -79,6 +83,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, size_t size); static void ExecParallelHashMergeCounters(HashJoinTable hashtable); static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); +static inline void ExecHashResetInsertQueue(HashJoinTable hashtable); /* ---------------------------------------------------------------- @@ -189,6 +194,7 @@ MultiExecPrivateHash(HashState *node) hashtable->totalTuples += 1; } } + ExecHashFlushInsertQueue(hashtable); /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ if (hashtable->nbuckets != hashtable->nbuckets_optimal) @@ -289,6 +295,7 @@ MultiExecParallelHash(HashState *node) ExecParallelHashTableInsert(hashtable, slot, hashvalue); hashtable->partialTuples++; } + ExecParallelHashFlushInsertQueue(hashtable); /* * Make sure that any tuples we wrote to disk are visible to @@ -512,6 +519,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, hashtable->parallel_state = state->parallel_state; hashtable->area = 
state->ps.state->es_query_dsa; hashtable->batches = NULL; + ExecHashResetInsertQueue(hashtable); #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -905,6 +913,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) nbatch = oldnbatch * 2; Assert(nbatch > 1); + ExecHashFlushInsertQueue(hashtable); + #ifdef HJDEBUG printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n", hashtable, nbatch, hashtable->spaceUsed); @@ -1062,6 +1072,8 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + ExecHashResetInsertQueue(hashtable); + /* * It's unlikely, but we need to be prepared for new participants to show * up while we're in the middle of this operation so we need to switch on @@ -1461,6 +1473,7 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable) memset(hashtable->buckets.unshared, 0, hashtable->nbuckets * sizeof(HashJoinTuple)); + ExecHashResetInsertQueue(hashtable); /* scan through all tuples in all chunks to rebuild the hash table */ for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) @@ -1501,6 +1514,8 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); + ExecHashResetInsertQueue(hashtable); + /* * It's unlikely, but we need to be prepared for new participants to show * up while we're in the middle of this operation so we need to switch on @@ -1579,6 +1594,55 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) } } +static inline void +ExecHashPushTuple(HashJoinTable hashtable, HashJoinTuple tuple, int bucketno) +{ + tuple->next.unshared = hashtable->buckets.unshared[bucketno]; + hashtable->buckets.unshared[bucketno] = tuple; +} + +static inline void +ExecHashEnqueueTuple(HashJoinTable hashtable, HashJoinTuple tuple, int bucketno) +{ + HashJoinTableInserter *inserter = &hashtable->inserter; + int i = inserter->head; + + /* Push 
the oldest item into the hash table. */ + if (inserter->queue[i].tuple) + ExecHashPushTuple(hashtable, + inserter->queue[i].tuple, + inserter->queue[i].bucketno); + + /* Start fetching the cache line, and add it to the queue. */ + pg_prefetch_mem(&hashtable->buckets.unshared[bucketno]); + inserter->queue[i].tuple = tuple; + inserter->queue[i].bucketno = bucketno; + inserter->head = (i + 1) % HJ_INSERTION_QUEUE_DEPTH; +} + +void +ExecHashFlushInsertQueue(HashJoinTable hashtable) +{ + HashJoinTableInserter *inserter = &hashtable->inserter; + + for (int i = 0; i < HJ_INSERTION_QUEUE_DEPTH; ++i) + { + if (inserter->queue[i].tuple) + { + ExecHashPushTuple(hashtable, + inserter->queue[i].tuple, + inserter->queue[i].bucketno); + inserter->queue[i].tuple = NULL; + } + } +} + +static inline void +ExecHashResetInsertQueue(HashJoinTable hashtable) +{ + memset(&hashtable->inserter, 0, sizeof(hashtable->inserter)); +} + /* * ExecHashTableInsert * insert a tuple into the hash table depending on the hash value @@ -1631,8 +1695,7 @@ ExecHashTableInsert(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ - hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; - hashtable->buckets.unshared[bucketno] = hashTuple; + ExecHashEnqueueTuple(hashtable, hashTuple, bucketno); /* * Increase the (optimal) number of buckets if we just exceeded the @@ -1711,8 +1774,7 @@ retry: memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); /* Push it onto the front of the bucket's list */ - ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], - hashTuple, shared); + ExecParallelHashEnqueueTuple(hashtable, hashTuple, shared, bucketno); } else { @@ -1758,14 +1820,14 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); Assert(batchno == hashtable->curbatch); + hashTuple = ExecParallelHashTupleAlloc(hashtable, 
HJTUPLE_OVERHEAD + tuple->t_len, &shared); hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); - ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], - hashTuple, shared); + ExecParallelHashEnqueueTuple(hashtable, hashTuple, shared, bucketno); if (shouldFree) heap_free_minimal_tuple(tuple); @@ -3224,6 +3286,44 @@ ExecParallelHashPushTuple(dsa_pointer_atomic *head, } } +static inline void +ExecParallelHashEnqueueTuple(HashJoinTable hashtable, HashJoinTuple tuple, + dsa_pointer tuple_shared, int bucketno) +{ + HashJoinTableInserter *inserter = &hashtable->inserter; + int i = inserter->head; + + /* Push the oldest item into the hash table. */ + if (inserter->queue[i].tuple) + ExecParallelHashPushTuple(&hashtable->buckets.shared[inserter->queue[i].bucketno], + inserter->queue[i].tuple, + inserter->queue[i].tuple_shared); + + /* Start fetching the shared bucket's cache line, and add it to the queue. */ + pg_prefetch_mem(&hashtable->buckets.shared[bucketno]); + inserter->queue[i].tuple = tuple; + inserter->queue[i].tuple_shared = tuple_shared; + inserter->queue[i].bucketno = bucketno; + inserter->head = (i + 1) % HJ_INSERTION_QUEUE_DEPTH; +} + +void +ExecParallelHashFlushInsertQueue(HashJoinTable hashtable) +{ + HashJoinTableInserter *inserter = &hashtable->inserter; + + for (int i = 0; i < HJ_INSERTION_QUEUE_DEPTH; ++i) + { + if (inserter->queue[i].tuple) + { + ExecParallelHashPushTuple(&hashtable->buckets.shared[inserter->queue[i].bucketno], + inserter->queue[i].tuple, + inserter->queue[i].tuple_shared); + inserter->queue[i].tuple = NULL; + } + } +} + /* * Prepare to work on a given batch.
*/ diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index c901a80923..93a12bd44e 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1056,6 +1056,7 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) */ ExecHashTableInsert(hashtable, slot, hashvalue); } + ExecHashFlushInsertQueue(hashtable); /* * after we build the hash table, the inner batch file is no longer @@ -1160,6 +1161,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ExecParallelHashTableInsertCurrentBatch(hashtable, slot, hashvalue); } + ExecParallelHashFlushInsertQueue(hashtable); sts_end_parallel_scan(inner_tuples); BarrierArriveAndWait(batch_barrier, WAIT_EVENT_HASH_BATCH_LOADING); diff --git a/src/include/c.h b/src/include/c.h index 6898229b43..0546b13589 100644 --- a/src/include/c.h +++ b/src/include/c.h @@ -278,6 +278,14 @@ #endif #endif +/* Prefetch hint for memory we expect to touch soon; no-op if unsupported. */ +#if defined(HAVE__BUILTIN_PREFETCH) +#define pg_prefetch_mem(a) __builtin_prefetch(a) +#elif defined(_MSC_VER) && defined(PreFetchCacheLine) +#define pg_prefetch_mem(a) PreFetchCacheLine(PF_TEMPORAL_LEVEL_1, (a)) +#else +#define pg_prefetch_mem(a) ((void) 0) +#endif /* ---------------------------------------------------------------- * Section 2: bool, true, false diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 79b634e8ed..8acb1d40f4 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -281,6 +281,19 @@ typedef struct ParallelHashJoinState #define PHJ_GROW_BUCKETS_REINSERTING 2 #define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */ +#define HJ_INSERTION_QUEUE_DEPTH 4 /* # tuples queued while their bucket is prefetched */ + +typedef struct HashJoinTableInserter +{ + struct + { + HashJoinTupleData *tuple; + dsa_pointer tuple_shared; + int bucketno; + } queue[HJ_INSERTION_QUEUE_DEPTH]; + int head; /* slot holding the oldest queued tuple */ +} HashJoinTableInserter; + typedef struct HashJoinTableData { int nbuckets; /* # buckets in the in-memory hash table */ @@ -348,6 +361,8 @@ typedef struct HashJoinTableData 
MemoryContext hashCxt; /* context for whole-hash-join storage */ MemoryContext batchCxt; /* context for this-batch-only storage */ + HashJoinTableInserter inserter; /* tuples awaiting insertion into buckets */ + /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 1336fde6b4..37825ff5d9 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -75,5 +75,7 @@ extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable); +extern void ExecHashFlushInsertQueue(HashJoinTable hashtable); +extern void ExecParallelHashFlushInsertQueue(HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 6f485f73cd..f952cc8a81 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -758,6 +758,9 @@ /* Define to 1 if your compiler understands __builtin_popcount. */ #undef HAVE__BUILTIN_POPCOUNT +/* Define to 1 if your compiler understands __builtin_prefetch. */ +#undef HAVE__BUILTIN_PREFETCH + /* Define to 1 if your compiler understands __builtin_types_compatible_p. */ #undef HAVE__BUILTIN_TYPES_COMPATIBLE_P -- 2.23.0