Thread

  1. [RFC PATCH v0 2/7] Aggregate EXPLAIN WAITS from parallel workers

    Ilmar Y <tanswis42@gmail.com> — 2026-05-08T23:22:32Z

    ---
     src/backend/commands/explain.c          |   4 +
     src/backend/executor/execMain.c         |   1 +
     src/backend/executor/execParallel.c     | 129 ++++++++++++++++++++++++
     src/backend/executor/execUtils.c        |   1 +
     src/backend/utils/activity/wait_event.c |  22 +++-
     src/include/executor/execParallel.h     |   2 +
     src/include/nodes/execnodes.h           |   2 +
     src/include/utils/wait_event.h          |   3 +
     src/test/regress/expected/explain.out   |  17 ++++
     src/test/regress/sql/explain.sql        |  12 +++
     10 files changed, 190 insertions(+), 3 deletions(-)
    
    diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
    index 0b7cc5c15c6..9d7372f5415 100644
    --- a/src/backend/commands/explain.c
    +++ b/src/backend/commands/explain.c
    @@ -594,6 +594,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
     			waitEventUsagePtr = &waitEventUsage;
     			pgstat_begin_wait_event_usage(waitEventUsagePtr,
     										  queryDesc->estate->es_query_cxt);
    +			queryDesc->estate->es_wait_event_usage = waitEventUsagePtr;
     		}
     
     		/* run the plan */
    @@ -607,7 +608,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
     		PG_FINALLY();
     		{
     			if (waitEventUsagePtr)
    +			{
     				pgstat_end_wait_event_usage(waitEventUsagePtr);
    +				queryDesc->estate->es_wait_event_usage = NULL;
    +			}
     		}
     		PG_END_TRY();
     
    diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
    index 4b30f768680..86ab124c1c0 100644
    --- a/src/backend/executor/execMain.c
    +++ b/src/backend/executor/execMain.c
    @@ -3066,6 +3066,7 @@ EvalPlanQualStart(EPQState *epqstate, Plan *planTree)
     	/* es_trig_target_relations must NOT be copied */
     	rcestate->es_top_eflags = parentestate->es_top_eflags;
     	rcestate->es_instrument = parentestate->es_instrument;
    +	rcestate->es_wait_event_usage = parentestate->es_wait_event_usage;
     	/* es_auxmodifytables must NOT be copied */
     
     	/*
    diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
    index 81b87d82fab..8213565a708 100644
    --- a/src/backend/executor/execParallel.c
    +++ b/src/backend/executor/execParallel.c
    @@ -51,6 +51,7 @@
     #include "utils/dsa.h"
     #include "utils/lsyscache.h"
     #include "utils/snapmgr.h"
    +#include "utils/wait_event.h"
     
     /*
      * Magic numbers for parallel executor communication.  We use constants
    @@ -67,6 +68,7 @@
     #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
     #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
     #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
    +#define PARALLEL_KEY_WAIT_EVENT_USAGE	UINT64CONST(0xE00000000000000B)
     
     #define PARALLEL_TUPLE_QUEUE_SIZE		65536
     
    @@ -114,6 +116,18 @@ struct SharedExecutorInstrumentation
     	(StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
     	 (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
     
    +typedef struct SharedWaitEventUsageWorker	/* one worker's reported wait event usage */
    +{
    +	int			nentries;	/* # of elements in the array at "entries"; 0 if none reported */
    +	dsa_pointer entries;	/* DSA copy of the worker's WaitEventUsageEntry array, or InvalidDsaPointer */
    +} SharedWaitEventUsageWorker;
    +
    +struct SharedWaitEventUsage	/* stored in the parallel TOC under PARALLEL_KEY_WAIT_EVENT_USAGE */
    +{
    +	int			num_workers;	/* number of slots in worker_usage[] */
    +	SharedWaitEventUsageWorker worker_usage[FLEXIBLE_ARRAY_MEMBER];	/* indexed by ParallelWorkerNumber */
    +};
    +
     /* Context object for ExecParallelEstimate. */
     typedef struct ExecParallelEstimateContext
     {
    @@ -141,6 +155,10 @@ static bool ExecParallelReInitializeDSM(PlanState *planstate,
     										ParallelContext *pcxt);
     static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
     												SharedExecutorInstrumentation *instrumentation);
    +static void ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei);
    +static void ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
    +											 dsa_area *area,
    +											 const WaitEventUsage *usage);
     
     /* Helper function that runs in the parallel worker. */
     static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
    @@ -664,10 +682,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
     	char	   *paramlistinfo_space;
     	BufferUsage *bufusage_space;
     	WalUsage   *walusage_space;
    +	SharedWaitEventUsage *wait_event_usage = NULL;
     	SharedExecutorInstrumentation *instrumentation = NULL;
     	SharedJitInstrumentation *jit_instrumentation = NULL;
     	int			pstmt_len;
     	int			paramlistinfo_len;
    +	int			wait_event_usage_len = 0;
     	int			instrumentation_len = 0;
     	int			jit_instrumentation_len = 0;
     	int			instrument_offset = 0;
    @@ -744,6 +764,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
     						   mul_size(sizeof(WalUsage), pcxt->nworkers));
     	shm_toc_estimate_keys(&pcxt->estimator, 1);
     
    +	/* Estimate space for per-worker wait event usage metadata. */
    +	if (estate->es_wait_event_usage != NULL)
    +	{
    +		wait_event_usage_len =
    +			offsetof(SharedWaitEventUsage, worker_usage) +
    +			mul_size(sizeof(SharedWaitEventUsageWorker), pcxt->nworkers);
    +		shm_toc_estimate_chunk(&pcxt->estimator, wait_event_usage_len);
    +		shm_toc_estimate_keys(&pcxt->estimator, 1);
    +	}
    +
     	/* Estimate space for tuple queues. */
     	shm_toc_estimate_chunk(&pcxt->estimator,
     						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
    @@ -839,6 +869,21 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
     	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
     	pei->wal_usage = walusage_space;
     
    +	/* Allocate metadata for each worker's wait event usage, if requested. */
    +	if (estate->es_wait_event_usage != NULL)
    +	{
    +		wait_event_usage = shm_toc_allocate(pcxt->toc, wait_event_usage_len);
    +		wait_event_usage->num_workers = nworkers;
    +		for (int i = 0; i < nworkers; i++)
    +		{
    +			wait_event_usage->worker_usage[i].nentries = 0;
    +			wait_event_usage->worker_usage[i].entries = InvalidDsaPointer;
    +		}
    +		shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAIT_EVENT_USAGE,
    +					   wait_event_usage);
    +		pei->wait_event_usage = wait_event_usage;
    +	}
    +
     	/* Set up the tuple queues that the workers will write into. */
     	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
     
    @@ -1213,6 +1258,68 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
     	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
     }
     
    +static void	/* called from ExecParallelFinish: merge workers' wait events into the EState accumulator */
    +ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei)
    +{
    +	SharedWaitEventUsage *shared = pei->wait_event_usage;
    +	WaitEventUsage *usage;
    +
    +	if (shared == NULL)
    +		return;	/* no shared wait event area exists for this parallel plan */
    +
    +	usage = pei->planstate->state->es_wait_event_usage;
    +	if (usage == NULL)
    +		return;	/* no accumulator to merge into */
    +
    +	for (int i = 0; i < shared->num_workers; i++)
    +	{
    +		SharedWaitEventUsageWorker *worker = &shared->worker_usage[i];
    +		WaitEventUsageEntry *entries;
    +
    +		if (worker->nentries <= 0 || !DsaPointerIsValid(worker->entries))
    +			continue;	/* this slot holds no report */
    +
    +		entries = dsa_get_address(pei->area, worker->entries);
    +		pgstat_accumulate_wait_event_usage(usage,
    +										   entries,
    +										   worker->nentries);
    +		dsa_free(pei->area, worker->entries);	/* merged above; release the worker's copy */
    +		worker->nentries = 0;	/* reset the slot so it is not merged or freed again */
    +		worker->entries = InvalidDsaPointer;
    +	}
    +}
    +
    +static void	/* worker side: publish this worker's wait event entries in its shared slot */
    +ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
    +								 dsa_area *area,
    +								 const WaitEventUsage *usage)
    +{
    +	SharedWaitEventUsageWorker *worker;
    +	WaitEventUsageEntry *entries;
    +	dsa_pointer entries_dsa;
    +	Size		entries_size;
    +
    +	Assert(shared != NULL);
    +	Assert(area != NULL);
    +	Assert(usage != NULL);
    +	Assert(IsParallelWorker());
    +	Assert(ParallelWorkerNumber < shared->num_workers);
    +
    +	if (usage->nentries <= 0)
    +		return;	/* nothing recorded; leave the slot untouched */
    +
    +	worker = &shared->worker_usage[ParallelWorkerNumber];
    +	entries_size = mul_size(sizeof(WaitEventUsageEntry), usage->nentries);
    +	entries_dsa = dsa_allocate(area, entries_size);
    +	entries = dsa_get_address(area, entries_dsa);
    +	memcpy(entries, usage->entries, entries_size);	/* copy the local entries into the DSA allocation */
    +
    +	if (DsaPointerIsValid(worker->entries))
    +		dsa_free(area, worker->entries);	/* drop a report still present in the slot */
    +	worker->nentries = usage->nentries;
    +	worker->entries = entries_dsa;
    +}
    +
     /*
      * Finish parallel execution.  We wait for parallel workers to finish, and
      * accumulate their buffer/WAL usage.
    @@ -1261,6 +1368,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
     	for (i = 0; i < nworkers; i++)
     		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
     
    +	/* Accumulate wait event usage, if requested. */
    +	ExecParallelRetrieveWaitEventUsage(pei);
    +
     	pei->finished = true;
     }
     
    @@ -1516,10 +1626,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     	FixedParallelExecutorState *fpes;
     	BufferUsage *buffer_usage;
     	WalUsage   *wal_usage;
    +	SharedWaitEventUsage *wait_event_usage;
     	DestReceiver *receiver;
     	QueryDesc  *queryDesc;
     	SharedExecutorInstrumentation *instrumentation;
     	SharedJitInstrumentation *jit_instrumentation;
    +	WaitEventUsage waitEventUsage;
    +	WaitEventUsage *waitEventUsagePtr = NULL;
     	int			instrument_options = 0;
     	void	   *area_space;
     	dsa_area   *area;
    @@ -1535,6 +1648,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     		instrument_options = instrumentation->instrument_options;
     	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
     										 true);
    +	wait_event_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAIT_EVENT_USAGE, true);
     	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
     
     	/* Setting debug_query_string for individual workers */
    @@ -1576,6 +1690,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     	 */
     	InstrStartParallelQuery();
     
    +	if (wait_event_usage != NULL)
    +	{
    +		waitEventUsagePtr = &waitEventUsage;
    +		pgstat_begin_wait_event_usage(waitEventUsagePtr,
    +									  queryDesc->estate->es_query_cxt);
    +	}
    +
     	/*
     	 * Run the plan.  If we specified a tuple bound, be careful not to demand
     	 * more tuples than that.
    @@ -1587,6 +1708,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
     	/* Shut down the executor */
     	ExecutorFinish(queryDesc);
     
    +	if (waitEventUsagePtr != NULL)
    +	{
    +		pgstat_end_wait_event_usage(waitEventUsagePtr);
    +		ExecParallelReportWaitEventUsage(wait_event_usage,
    +										 area,
    +										 waitEventUsagePtr);
    +	}
    +
     	/* Report buffer/WAL usage during parallel execution. */
     	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
     	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
    index 1eb6b9f1f40..80ea777632b 100644
    --- a/src/backend/executor/execUtils.c
    +++ b/src/backend/executor/execUtils.c
    @@ -151,6 +151,7 @@ CreateExecutorState(void)
     
     	estate->es_top_eflags = 0;
     	estate->es_instrument = 0;
    +	estate->es_wait_event_usage = NULL;
     	estate->es_finished = false;
     
     	estate->es_exprcontexts = NIL;
    diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
    index 60d37ccbb73..eb01bc3d88c 100644
    --- a/src/backend/utils/activity/wait_event.c
    +++ b/src/backend/utils/activity/wait_event.c
    @@ -37,7 +37,7 @@ static const char *pgstat_get_wait_ipc(WaitEventIPC w);
     static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
     static const char *pgstat_get_wait_io(WaitEventIO w);
     static void WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
    -							  const instr_time *elapsed);
    +							  uint64 calls, const instr_time *elapsed);
     
     
     static uint32 local_my_wait_event_info;
    @@ -442,15 +442,31 @@ pgstat_count_wait_event_end(void)
     
     	WaitEventUsageAdd(pgstat_wait_event_usage,
     					  pgstat_wait_event_usage_current,
    +					  1,
     					  &elapsed);
     
     	pgstat_wait_event_usage_current = 0;
     	INSTR_TIME_SET_ZERO(pgstat_wait_event_usage_start);
     }
     
    +void	/* add each of the given entries (event, call count, time) to "usage" */
    +pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
    +								   const WaitEventUsageEntry *entries,
    +								   int nentries)
    +{
    +	Assert(usage != NULL);
    +	Assert(nentries == 0 || entries != NULL);	/* entries may be NULL only when nentries is 0 */
    +
    +	for (int i = 0; i < nentries; i++)
    +		WaitEventUsageAdd(usage,
    +						  entries[i].wait_event_info,
    +						  entries[i].calls,	/* the entry's whole call count, not 1 */
    +						  &entries[i].time);
    +}
    +
     static void
     WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
    -				  const instr_time *elapsed)
    +				  uint64 calls, const instr_time *elapsed)
     {
     	WaitEventUsageEntry *entry = NULL;
     
    @@ -494,7 +510,7 @@ WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
     		INSTR_TIME_SET_ZERO(entry->time);
     	}
     
    -	entry->calls++;
    +	entry->calls += calls;
     	INSTR_TIME_ADD(entry->time, *elapsed);
     }
     
    diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
    index 5a2034811d5..71df2c2511c 100644
    --- a/src/include/executor/execParallel.h
    +++ b/src/include/executor/execParallel.h
    @@ -20,6 +20,7 @@
     #include "utils/dsa.h"
     
     typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
    +typedef struct SharedWaitEventUsage SharedWaitEventUsage;
     
     typedef struct ParallelExecutorInfo
     {
    @@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
     	ParallelContext *pcxt;		/* parallel context we're using */
     	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
     	WalUsage   *wal_usage;		/* walusage area in DSM */
    +	SharedWaitEventUsage *wait_event_usage;	/* optional */
     	SharedExecutorInstrumentation *instrumentation; /* optional */
     	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
     	dsa_area   *area;			/* points to DSA area in DSM */
    diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
    index 13359180d25..781c8c79132 100644
    --- a/src/include/nodes/execnodes.h
    +++ b/src/include/nodes/execnodes.h
    @@ -69,6 +69,7 @@ typedef struct Tuplestorestate Tuplestorestate;
     typedef struct TupleTableSlot TupleTableSlot;
     typedef struct TupleTableSlotOps TupleTableSlotOps;
     typedef struct WalUsage WalUsage;
    +typedef struct WaitEventUsage WaitEventUsage;
     typedef struct WorkerNodeInstrumentation WorkerNodeInstrumentation;
     
     
    @@ -754,6 +755,7 @@ typedef struct EState
     
     	int			es_top_eflags;	/* eflags passed to ExecutorStart */
     	int			es_instrument;	/* OR of InstrumentOption flags */
    +	WaitEventUsage *es_wait_event_usage;	/* EXPLAIN WAITS accumulator */
     	bool		es_finished;	/* true when ExecutorFinish is done */
     
     	List	   *es_exprcontexts;	/* List of ExprContexts within EState */
    diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
    index f7fab5736bb..63992137ee7 100644
    --- a/src/include/utils/wait_event.h
    +++ b/src/include/utils/wait_event.h
    @@ -39,6 +39,9 @@ extern void pgstat_reset_wait_event_storage(void);
     extern void pgstat_begin_wait_event_usage(WaitEventUsage *usage,
     										  MemoryContext memcontext);
     extern void pgstat_end_wait_event_usage(WaitEventUsage *usage);
    +extern void pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
    +											   const WaitEventUsageEntry *entries,
    +											   int nentries);
     extern void pgstat_count_wait_event_start(uint32 wait_event_info);
     extern void pgstat_count_wait_event_end(void);
     
    diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
    index 2c7a7e1d4c6..e3847e222be 100644
    --- a/src/test/regress/expected/explain.out
    +++ b/src/test/regress/expected/explain.out
    @@ -114,6 +114,23 @@ select explain_filter_to_json('explain (analyze, waits, costs off, summary off,
      {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"}
     (1 row)
     
    +begin;
    +create function pg_temp.parallel_pg_sleep(float8) returns void
    +  language internal volatile parallel safe as 'pg_sleep';
    +set local debug_parallel_query = on;
    +set local max_parallel_workers_per_gather = 1;
    +select jsonb_path_query_first(
    +  explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json)
    +                         select pg_temp.parallel_pg_sleep(0.01)
    +                         from tenk1 where unique1 = 1') #> '{0,Wait Events}',
    +  '$[*] ? (@."Wait Event" == "PgSleep")'
    +);
    +                              jsonb_path_query_first                              
    +----------------------------------------------------------------------------------
    + {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"}
    +(1 row)
    +
    +rollback;
     explain (waits) select 1;
     ERROR:  EXPLAIN option WAITS requires ANALYZE
     \a
    diff --git a/src/test/regress/sql/explain.sql b/src/test/regress/sql/explain.sql
    index fe025ddeac5..8821250bcef 100644
    --- a/src/test/regress/sql/explain.sql
    +++ b/src/test/regress/sql/explain.sql
    @@ -71,6 +71,18 @@ select explain_filter('explain (buffers, format text) select * from int8_tbl i8'
     -- WAITS option
     select explain_filter('explain (analyze, waits, costs off, summary off, timing off, buffers off) select pg_sleep(0.01)');
     select explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json) select pg_sleep(0.01)') #> '{0,Wait Events,0}';
    +begin;
    +create function pg_temp.parallel_pg_sleep(float8) returns void
    +  language internal volatile parallel safe as 'pg_sleep';
    +set local debug_parallel_query = on;
    +set local max_parallel_workers_per_gather = 1;
    +select jsonb_path_query_first(
    +  explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json)
    +                         select pg_temp.parallel_pg_sleep(0.01)
    +                         from tenk1 where unique1 = 1') #> '{0,Wait Events}',
    +  '$[*] ? (@."Wait Event" == "PgSleep")'
    +);
    +rollback;
     explain (waits) select 1;
     
     \a
    -- 
    2.52.0