v1-0002-Unify-parallel-worker-handling-for-instrumentatio.patch

application/octet-stream

Filename: v1-0002-Unify-parallel-worker-handling-for-instrumentatio.patch
Type: application/octet-stream
Part: 2
Message: Unify parallel worker handling for index builds and instrumentation
From 57686a159d357f08b2d948f5d00ab43edb19cb10 Mon Sep 17 00:00:00 2001
From: Lukas Fittl <lukas@fittl.com>
Date: Sun, 31 May 2026 10:01:32 -0700
Subject: [PATCH v1 2/4] Unify parallel worker handling for instrumentation

Introduce helpers to estimate the shared memory required for
instrumentation, and for allocating and storing it in shared memory.

Author: Lukas Fittl <lukas@fittl.com>
Reviewed-by:
Discussion:
---
 src/backend/access/brin/brin.c        | 21 ++++--------------
 src/backend/access/gin/gininsert.c    | 21 ++++--------------
 src/backend/access/nbtree/nbtsort.c   | 21 ++++--------------
 src/backend/access/transam/parallel.c | 32 +++++++++++++++++++++++++++
 src/backend/commands/vacuumparallel.c | 25 +++++----------------
 src/backend/executor/execParallel.c   | 27 +++++-----------------
 src/include/access/parallel.h         |  4 ++++
 7 files changed, 58 insertions(+), 93 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index b04028a3858..8bd032c8669 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -2426,16 +2426,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	shm_toc_estimate_keys(&pcxt->estimator, 2);
 
-	/*
-	 * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION.
-	 *
-	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgWalUsage or
-	 * pgBufferUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate space for the per-worker Instrumentation array. */
+	EstimateParallelInstrumentation(pcxt);
 
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	if (debug_query_string)
@@ -2506,13 +2498,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
 	}
 
-	/*
-	 * Allocate space for each worker's Instrumentation; no need to
-	 * initialize.
-	 */
-	instr = shm_toc_allocate(pcxt->toc,
-							 mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
+	/* Allocate space for each worker's Instrumentation. */
+	instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION);
 
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index c6d144d12f5..97a8f38be5d 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -983,16 +983,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 
 	shm_toc_estimate_keys(&pcxt->estimator, 2);
 
-	/*
-	 * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION.
-	 *
-	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgWalUsage or
-	 * pgBufferUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate space for the per-worker Instrumentation array. */
+	EstimateParallelInstrumentation(pcxt);
 
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	if (debug_query_string)
@@ -1058,13 +1050,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
 	}
 
-	/*
-	 * Allocate space for each worker's Instrumentation; no need to
-	 * initialize.
-	 */
-	instr = shm_toc_allocate(pcxt->toc,
-							 mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
+	/* Allocate space for each worker's Instrumentation. */
+	instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION);
 
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 0e5fa86cf17..7f0a1b88062 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1458,16 +1458,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 		shm_toc_estimate_keys(&pcxt->estimator, 3);
 	}
 
-	/*
-	 * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION.
-	 *
-	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgWalUsage or
-	 * pgBufferUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate space for the per-worker Instrumentation array. */
+	EstimateParallelInstrumentation(pcxt);
 
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	if (debug_query_string)
@@ -1552,13 +1544,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
 	}
 
-	/*
-	 * Allocate space for each worker's Instrumentation; no need to
-	 * initialize.
-	 */
-	instr = shm_toc_allocate(pcxt->toc,
-							 mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
+	/* Allocate space for each worker's Instrumentation. */
+	instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION);
 
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 89e9d224eec..17fb7c15aab 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -28,6 +28,7 @@
 #include "commands/async.h"
 #include "commands/vacuum.h"
 #include "executor/execParallel.h"
+#include "executor/instrument.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
@@ -1026,6 +1027,37 @@ DestroyParallelContext(ParallelContext *pcxt)
 	pfree(pcxt);
 }
 
+/*
+ * Helpers for managing the per-worker Instrumentation array that parallel
+ * leaders allocate in DSM.  The worker side fills in its own slot directly via
+ * InstrEndParallelQuery.
+ */
+
+/* Reserve DSM space for the per-worker Instrumentation array. */
+void
+EstimateParallelInstrumentation(ParallelContext *pcxt)
+{
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(Instrumentation), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Allocate the per-worker Instrumentation array in DSM and publish it under
+ * the given key.  No need to initialize; each worker fills in its own slot.
+ * Returns the array for the leader's convenience.
+ */
+Instrumentation *
+StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key)
+{
+	Instrumentation *instr;
+
+	instr = shm_toc_allocate(pcxt->toc,
+							 mul_size(sizeof(Instrumentation), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, key, instr);
+	return instr;
+}
+
 /*
  * Are there any parallel contexts currently active?
  */
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index b2cdab310d6..5ffae66260d 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -307,7 +307,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	PVShared   *shared;
 	TidStore   *dead_items;
 	PVIndStats *indstats;
-	Instrumentation *instr;
 	bool	   *will_parallel_vacuum;
 	Size		est_indstats_len;
 	Size		est_shared_len;
@@ -359,17 +358,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-	/*
-	 * Estimate space for Instrumentation --
-	 * PARALLEL_VACUUM_KEY_INSTRUMENTATION.
-	 *
-	 * If there are no extensions loaded that care, we could skip this.  We
-	 * have no way of knowing whether anyone's looking at pgBufferUsage or
-	 * pgWalUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate space for the per-worker Instrumentation array. */
+	EstimateParallelInstrumentation(pcxt);
 
 	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
 	if (debug_query_string)
@@ -465,14 +455,9 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	pvs->shared = shared;
 
-	/*
-	 * Allocate space for each worker's Instrumentation; no need to
-	 * initialize.
-	 */
-	instr = shm_toc_allocate(pcxt->toc,
-							 mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, instr);
-	pvs->instr = instr;
+	/* Allocate space for each worker's Instrumentation. */
+	pvs->instr = StoreParallelInstrumentation(pcxt,
+											  PARALLEL_VACUUM_KEY_INSTRUMENTATION);
 
 	/* Store query string for workers */
 	if (debug_query_string)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 89e717a1c50..4f202f544b3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -723,16 +723,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
-	/*
-	 * Estimate space for Instrumentation.
-	 *
-	 * If EXPLAIN is not in use and there are no extensions loaded that care,
-	 * we could skip this.  But we have no way of knowing whether anyone's
-	 * looking at pgBufferUsage, so do it unconditionally.
-	 */
-	shm_toc_estimate_chunk(&pcxt->estimator,
-						   mul_size(sizeof(Instrumentation), pcxt->nworkers));
-	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate space for the per-worker Instrumentation array. */
+	EstimateParallelInstrumentation(pcxt);
 
 	/* Estimate space for tuple queues. */
 	shm_toc_estimate_chunk(&pcxt->estimator,
@@ -817,18 +809,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
 	SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
 
-	/*
-	 * Allocate space for each worker's Instrumentation; no need to
-	 * initialize.
-	 */
-	{
-		Instrumentation *instr;
-
-		instr = shm_toc_allocate(pcxt->toc,
-								 mul_size(sizeof(Instrumentation), pcxt->nworkers));
-		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr);
-		pei->instrumentation = instr;
-	}
+	/* Allocate space for each worker's Instrumentation. */
+	pei->instrumentation = StoreParallelInstrumentation(pcxt,
+														PARALLEL_KEY_INSTRUMENTATION);
 
 	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 60f857675e0..3c85924e85d 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -72,6 +72,10 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
 
+extern void EstimateParallelInstrumentation(ParallelContext *pcxt);
+extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt,
+															uint64 key);
+
 extern void HandleParallelMessageInterrupt(void);
 extern void ProcessParallelMessages(void);
 extern void AtEOXact_Parallel(bool isCommit);
-- 
2.47.1