v1-0002-Unify-parallel-worker-handling-for-instrumentatio.patch
application/octet-stream
Filename: v1-0002-Unify-parallel-worker-handling-for-instrumentatio.patch
Type: application/octet-stream
Part: 2
From 57686a159d357f08b2d948f5d00ab43edb19cb10 Mon Sep 17 00:00:00 2001
From: Lukas Fittl <lukas@fittl.com>
Date: Sun, 31 May 2026 10:01:32 -0700
Subject: [PATCH v1 2/4] Unify parallel worker handling for instrumentation
Introduce helpers to estimate the shared memory required for
instrumentation, and for allocating and storing it in shared memory.
Author: Lukas Fittl <lukas@fittl.com>
Reviewed-by:
Discussion:
---
src/backend/access/brin/brin.c | 21 ++++--------------
src/backend/access/gin/gininsert.c | 21 ++++--------------
src/backend/access/nbtree/nbtsort.c | 21 ++++--------------
src/backend/access/transam/parallel.c | 32 +++++++++++++++++++++++++++
src/backend/commands/vacuumparallel.c | 25 +++++----------------
src/backend/executor/execParallel.c | 27 +++++-----------------
src/include/access/parallel.h | 4 ++++
7 files changed, 58 insertions(+), 93 deletions(-)
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index b04028a3858..8bd032c8669 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -2426,16 +2426,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
shm_toc_estimate_keys(&pcxt->estimator, 2);
- /*
- * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION.
- *
- * If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgWalUsage or
- * pgBufferUsage, so do it unconditionally.
- */
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for the per-worker Instrumentation array. */
+ EstimateParallelInstrumentation(pcxt);
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -2506,13 +2498,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
}
- /*
- * Allocate space for each worker's Instrumentation; no need to
- * initialize.
- */
- instr = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
+ /* Allocate space for each worker's Instrumentation. */
+ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION);
/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index c6d144d12f5..97a8f38be5d 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -983,16 +983,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
shm_toc_estimate_keys(&pcxt->estimator, 2);
- /*
- * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION.
- *
- * If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgWalUsage or
- * pgBufferUsage, so do it unconditionally.
- */
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for the per-worker Instrumentation array. */
+ EstimateParallelInstrumentation(pcxt);
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -1058,13 +1050,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
}
- /*
- * Allocate space for each worker's Instrumentation; no need to
- * initialize.
- */
- instr = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
+ /* Allocate space for each worker's Instrumentation. */
+ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION);
/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 0e5fa86cf17..7f0a1b88062 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1458,16 +1458,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
shm_toc_estimate_keys(&pcxt->estimator, 3);
}
- /*
- * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION.
- *
- * If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgWalUsage or
- * pgBufferUsage, so do it unconditionally.
- */
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for the per-worker Instrumentation array. */
+ EstimateParallelInstrumentation(pcxt);
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -1552,13 +1544,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
}
- /*
- * Allocate space for each worker's Instrumentation; no need to
- * initialize.
- */
- instr = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
+ /* Allocate space for each worker's Instrumentation. */
+ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION);
/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 89e9d224eec..17fb7c15aab 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -28,6 +28,7 @@
#include "commands/async.h"
#include "commands/vacuum.h"
#include "executor/execParallel.h"
+#include "executor/instrument.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
@@ -1026,6 +1027,37 @@ DestroyParallelContext(ParallelContext *pcxt)
pfree(pcxt);
}
+/*
+ * Helpers for managing the per-worker Instrumentation array that parallel
+ * leaders allocate in DSM. The worker side fills in its own slot directly via
+ * InstrEndParallelQuery.
+ */
+
+/* Reserve DSM space for the per-worker Instrumentation array. */
+void
+EstimateParallelInstrumentation(ParallelContext *pcxt)
+{
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(Instrumentation), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Allocate the per-worker Instrumentation array in DSM and publish it under
+ * the given key. No need to initialize; each worker fills in its own slot.
+ * Returns the array for the leader's convenience.
+ */
+Instrumentation *
+StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key)
+{
+ Instrumentation *instr;
+
+ instr = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(Instrumentation), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, key, instr);
+ return instr;
+}
+
/*
* Are there any parallel contexts currently active?
*/
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index b2cdab310d6..5ffae66260d 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -307,7 +307,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
PVShared *shared;
TidStore *dead_items;
PVIndStats *indstats;
- Instrumentation *instr;
bool *will_parallel_vacuum;
Size est_indstats_len;
Size est_shared_len;
@@ -359,17 +358,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
- /*
- * Estimate space for Instrumentation --
- * PARALLEL_VACUUM_KEY_INSTRUMENTATION.
- *
- * If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgBufferUsage or
- * pgWalUsage, so do it unconditionally.
- */
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for the per-worker Instrumentation array. */
+ EstimateParallelInstrumentation(pcxt);
/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -465,14 +455,9 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
pvs->shared = shared;
- /*
- * Allocate space for each worker's Instrumentation; no need to
- * initialize.
- */
- instr = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, instr);
- pvs->instr = instr;
+ /* Allocate space for each worker's Instrumentation. */
+ pvs->instr = StoreParallelInstrumentation(pcxt,
+ PARALLEL_VACUUM_KEY_INSTRUMENTATION);
/* Store query string for workers */
if (debug_query_string)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 89e717a1c50..4f202f544b3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -723,16 +723,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
- /*
- * Estimate space for Instrumentation.
- *
- * If EXPLAIN is not in use and there are no extensions loaded that care,
- * we could skip this. But we have no way of knowing whether anyone's
- * looking at pgBufferUsage, so do it unconditionally.
- */
- shm_toc_estimate_chunk(&pcxt->estimator,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for the per-worker Instrumentation array. */
+ EstimateParallelInstrumentation(pcxt);
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
@@ -817,18 +809,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
- /*
- * Allocate space for each worker's Instrumentation; no need to
- * initialize.
- */
- {
- Instrumentation *instr;
-
- instr = shm_toc_allocate(pcxt->toc,
- mul_size(sizeof(Instrumentation), pcxt->nworkers));
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
- pei->instrumentation = instr;
- }
+ /* Allocate space for each worker's Instrumentation. */
+ pei->instrumentation = StoreParallelInstrumentation(pcxt,
+ PARALLEL_KEY_INSTRUMENTATION);
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 60f857675e0..3c85924e85d 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -72,6 +72,10 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
extern void DestroyParallelContext(ParallelContext *pcxt);
extern bool ParallelContextActive(void);
+extern void EstimateParallelInstrumentation(ParallelContext *pcxt);
+extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt,
+ uint64 key);
+
extern void HandleParallelMessageInterrupt(void);
extern void ProcessParallelMessages(void);
extern void AtEOXact_Parallel(bool isCommit);
--
2.47.1