v20241110-0015-allow-parallel-execution-queries-with-session-variab.patch
text/x-patch
Filename: v20241110-0015-allow-parallel-execution-queries-with-session-variab.patch
Type: text/x-patch
Part: 5
Message:
Re: proposal: schema variables
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v20241110-0015
Subject: allow parallel execution queries with session variables
| File | + | − |
|---|---|---|
| doc/src/sgml/parallel.sgml | 0 | 6 |
| src/backend/executor/execMain.c | 13 | 1 |
| src/backend/executor/execParallel.c | 145 | 2 |
| src/backend/optimizer/util/clauses.c | 6 | 12 |
| src/test/regress/expected/session_variables.out | 7 | 5 |
From 5c3e6810a32310d9cc11f39163b1e46725ceb2c6 Mon Sep 17 00:00:00 2001
From: "okbob@github.com" <okbob@github.com>
Date: Sat, 20 Jan 2024 07:57:29 +0100
Subject: [PATCH 15/20] allow parallel execution queries with session variables
---
doc/src/sgml/parallel.sgml | 6 -
src/backend/executor/execMain.c | 14 +-
src/backend/executor/execParallel.c | 147 +++++++++++++++++-
src/backend/optimizer/util/clauses.c | 18 +--
.../regress/expected/session_variables.out | 12 +-
5 files changed, 171 insertions(+), 26 deletions(-)
diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 683dede6adc..1ce9abf86f5 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -515,12 +515,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
Plan nodes that reference a correlated <literal>SubPlan</literal>.
</para>
</listitem>
-
- <listitem>
- <para>
- Plan nodes that use a session variable.
- </para>
- </listitem>
</itemizedlist>
<sect2 id="parallel-labeling">
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ba6e38263cc..b926d37af17 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -211,7 +211,19 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
* be changed inside query execution time, and then a reference to
* previously returned value can be corrupted).
*/
- if (queryDesc->plannedstmt->sessionVariables)
+ if (queryDesc->num_session_variables > 0)
+ {
+ /*
+ * In a parallel worker, the session variables used by the query have
+ * already been restored (deserialized) into queryDesc, so we only
+ * need to pass the pointer to that array, and its length, on to the
+ * executor's estate.
+ */
+ Assert(IsParallelWorker());
+ estate->es_session_variables = queryDesc->session_variables;
+ estate->es_num_session_variables = queryDesc->num_session_variables;
+ }
+ else if (queryDesc->plannedstmt->sessionVariables)
{
ListCell *lc;
int nSessionVariables;
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index bfb3419efb7..cb434e2768e 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,8 +12,9 @@
* workers and ensuring that their state generally matches that of the
* leader; see src/backend/access/transam/README.parallel for details.
* However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer/WAL usage info, and
- * the actual plan to be passed down to the worker.
+ * any ParamListInfo associated with the query, buffer/WAL usage info,
+ * session variables buffer, and the actual plan to be passed down to
+ * the worker.
*
* IDENTIFICATION
* src/backend/executor/execParallel.c
@@ -64,6 +65,7 @@
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -138,6 +140,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
+/* Helper functions for passing session variable values to parallel workers. */
+static Size EstimateSessionVariables(EState *estate);
+static void SerializeSessionVariables(EState *estate, char **start_address);
+static SessionVariableValue *RestoreSessionVariables(char **start_address,
+ int *num_session_variables);
+
/*
* Create a serialized representation of the plan to be sent to each worker.
*/
@@ -596,6 +604,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *pstmt_data;
char *pstmt_space;
char *paramlistinfo_space;
+ char *session_variables_space;
BufferUsage *bufusage_space;
WalUsage *walusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
@@ -605,6 +614,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
int instrumentation_len = 0;
int jit_instrumentation_len = 0;
int instrument_offset = 0;
+ int session_variables_len = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
int query_len;
@@ -660,6 +670,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for serialized session variables. */
+ session_variables_len = EstimateSessionVariables(estate);
+ shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/*
* Estimate space for BufferUsage.
*
@@ -761,6 +776,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
+ /* Store serialized session variables. */
+ session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space);
+ SerializeSessionVariables(estate, &session_variables_space);
+
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
@@ -1411,6 +1431,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
SharedJitInstrumentation *jit_instrumentation;
int instrument_options = 0;
void *area_space;
+ char *sessionvariable_space;
dsa_area *area;
ParallelWorkerContext pwcxt;
@@ -1436,6 +1457,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
area = dsa_attach_in_place(area_space, seg);
+ /* Reconstruct session variables. */
+ sessionvariable_space = shm_toc_lookup(toc,
+ PARALLEL_KEY_SESSION_VARIABLES,
+ false);
+ queryDesc->session_variables =
+ RestoreSessionVariables(&sessionvariable_space,
+ &queryDesc->num_session_variables);
+
/* Start up the executor */
queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
ExecutorStart(queryDesc, fpes->eflags);
@@ -1504,3 +1533,117 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
FreeQueryDesc(queryDesc);
receiver->rDestroy(receiver);
}
+
+/*
+ * Compute the number of bytes needed to serialize the session variables
+ * held in estate: room for an int count, then, for every variable, a type
+ * OID plus the space reported by datumEstimateSpace() for its value.
+ */
+static Size
+EstimateSessionVariables(EState *estate)
+{
+	SessionVariableValue *vars = estate->es_session_variables;
+	Size		total = sizeof(int);	/* leading variable count */
+	int			idx;
+
+	/* No variable array: only the count is accounted for. */
+	if (vars == NULL)
+		return total;
+
+	for (idx = 0; idx < estate->es_num_session_variables; idx++)
+	{
+		SessionVariableValue *cur = &vars[idx];
+		int16		len;
+		bool		byval;
+
+		/* room for the type OID */
+		total = add_size(total, sizeof(Oid));
+
+		/* room for the datum/isnull */
+		Assert(OidIsValid(cur->typid));
+		get_typlenbyval(cur->typid, &len, &byval);
+		total = add_size(total,
+						 datumEstimateSpace(cur->value, cur->isnull,
+											byval, len));
+	}
+
+	return total;
+}
+
+/*
+ * Serialize the session variables of estate into caller-provided storage.
+ *
+ * We write the number of variables first, as an int, and then write
+ * details for each variable in turn.  The details for each variable
+ * consist of its type OID, and then the datum as serialized by
+ * datumSerialize().  The caller is responsible for ensuring that there is
+ * enough storage to store the number of bytes that will be written; use
+ * EstimateSessionVariables to find out how many will be needed.
+ * *start_address is updated to point to the byte immediately following those
+ * written.
+ *
+ * RestoreSessionVariables can be used to recreate the session variable
+ * array from the serialized representation.
+ */
+static void
+SerializeSessionVariables(EState *estate, char **start_address)
+{
+	int			nvars;
+	int			i;
+
+	/*
+	 * Write the count: zero when there is no array, matching the estimate.
+	 */
+	nvars = (estate->es_session_variables != NULL) ?
+		estate->es_num_session_variables : 0;
+	memcpy(*start_address, &nvars, sizeof(int));
+	*start_address += sizeof(int);
+
+	/* Write each variable in turn. */
+	for (i = 0; i < nvars; i++)
+	{
+		SessionVariableValue *svarval = &estate->es_session_variables[i];
+		Oid			typeOid = svarval->typid;
+		int16		typLen;
+		bool		typByVal;
+
+		/* Write type OID. */
+		memcpy(*start_address, &typeOid, sizeof(Oid));
+		*start_address += sizeof(Oid);
+
+		/* Write datum/isnull. */
+		Assert(OidIsValid(typeOid));
+		get_typlenbyval(typeOid, &typLen, &typByVal);
+		datumSerialize(svarval->value, svarval->isnull, typByVal, typLen,
+					   start_address);
+	}
+}
+
+/*
+ * Rebuild the array written by SerializeSessionVariables.  The entry count is
+ * returned in *num_session_variables; *start_address ends past the bytes read.
+ */
+static SessionVariableValue *
+RestoreSessionVariables(char **start_address, int *num_session_variables)
+{
+	SessionVariableValue *result;
+	int			count;
+	int			idx;
+
+	/* The serialized form starts with the number of entries. */
+	memcpy(&count, *start_address, sizeof(int));
+	*start_address += sizeof(int);
+	*num_session_variables = count;
+	result = palloc(count * sizeof(SessionVariableValue));
+
+	/* Each entry is a type OID followed by the serialized datum/isnull. */
+	for (idx = 0; idx < count; idx++)
+	{
+		SessionVariableValue *cur = &result[idx];
+		memcpy(&cur->typid, *start_address, sizeof(Oid));
+		*start_address += sizeof(Oid);
+		cur->value = datumRestore(start_address, &cur->isnull);
+	}
+
+	return result;
+}
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index a5452c0c9e4..8cee732561d 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -924,25 +924,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
/*
* We can't pass Params to workers at the moment either, so they are also
- * parallel-restricted, unless they are PARAM_EXTERN Params or are
- * PARAM_EXEC Params listed in safe_param_ids, meaning they could be
- * either generated within workers or can be computed by the leader and
- * then their value can be passed to workers.
+ * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE
+ * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they
+ * could be either generated within workers or can be computed by the
+ * leader and then their value can be passed to workers.
*/
else if (IsA(node, Param))
{
Param *param = (Param *) node;
- if (param->paramkind == PARAM_EXTERN)
+ if (param->paramkind == PARAM_EXTERN ||
+ param->paramkind == PARAM_VARIABLE)
return false;
- /* we don't support passing session variables to workers */
- if (param->paramkind == PARAM_VARIABLE)
- {
- if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
- return true;
- }
-
if (param->paramkind != PARAM_EXEC ||
!list_member_int(context->safe_param_ids, param->paramid))
{
diff --git a/src/test/regress/expected/session_variables.out b/src/test/regress/expected/session_variables.out
index 388c8ee8e72..20bf282b3f2 100644
--- a/src/test/regress/expected/session_variables.out
+++ b/src/test/regress/expected/session_variables.out
@@ -644,12 +644,14 @@ SELECT count(*) FROM svar_test WHERE a%10 = zero;
-- parallel execution is not supported yet
EXPLAIN (COSTS OFF) SELECT count(*) FROM svar_test WHERE a%10 = zero;
- QUERY PLAN
------------------------------------
+ QUERY PLAN
+--------------------------------------------
Aggregate
- -> Seq Scan on svar_test
- Filter: ((a % 10) = zero)
-(3 rows)
+ -> Gather
+ Workers Planned: 2
+ -> Parallel Seq Scan on svar_test
+ Filter: ((a % 10) = zero)
+(5 rows)
LET zero = (SELECT count(*) FROM svar_test);
-- result should be 1000
--
2.47.0