v20250108-2-0017-allow-parallel-execution-queries-with-session-variab.patch
text/x-patch
Filename: v20250108-2-0017-allow-parallel-execution-queries-with-session-variab.patch
Type: text/x-patch
Part: 5
Message:
Re: Re: proposal: schema variables
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v20250108-0017
Subject: allow parallel execution queries with session variables
| File | + | − |
|---|---|---|
| doc/src/sgml/parallel.sgml | 0 | 6 |
| src/backend/commands/session_variable.c | 23 | 0 |
| src/backend/executor/execMain.c | 16 | 4 |
| src/backend/executor/execParallel.c | 145 | 2 |
| src/backend/optimizer/util/clauses.c | 6 | 12 |
| src/backend/tcop/pquery.c | 3 | 0 |
| src/include/commands/session_variable.h | 1 | 0 |
| src/include/executor/execdesc.h | 4 | 0 |
| src/include/nodes/execnodes.h | 1 | 0 |
| src/test/regress/expected/session_variables.out | 7 | 5 |
From 5b43fe3c1c667d1b39ced3e15083705c0726098e Mon Sep 17 00:00:00 2001
From: Laurenz Albe <laurenz.albe@cybertec.at>
Date: Wed, 13 Nov 2024 15:08:17 +0100
Subject: [PATCH 17/22] allow parallel execution queries with session variables
---
doc/src/sgml/parallel.sgml | 6 -
src/backend/commands/session_variable.c | 23 +++
src/backend/executor/execMain.c | 20 ++-
src/backend/executor/execParallel.c | 147 +++++++++++++++++-
src/backend/optimizer/util/clauses.c | 18 +--
src/backend/tcop/pquery.c | 3 +
src/include/commands/session_variable.h | 1 +
src/include/executor/execdesc.h | 4 +
src/include/nodes/execnodes.h | 1 +
.../regress/expected/session_variables.out | 12 +-
10 files changed, 206 insertions(+), 29 deletions(-)
diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 683dede6ad..1ce9abf86f 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -515,12 +515,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
Plan nodes that reference a correlated <literal>SubPlan</literal>.
</para>
</listitem>
-
- <listitem>
- <para>
- Plan nodes that use a session variable.
- </para>
- </listitem>
</itemizedlist>
<sect2 id="parallel-labeling">
diff --git a/src/backend/commands/session_variable.c b/src/backend/commands/session_variable.c
index 4f284cc3a8..210d55b9e4 100644
--- a/src/backend/commands/session_variable.c
+++ b/src/backend/commands/session_variable.c
@@ -999,6 +999,29 @@ GetSessionVariable(Oid varid, bool *isNull)
return copy_session_variable_value(svar, isNull);
}
+/*
+ * Returns a copy of the session variable's value (in the current memory
+ * context) and stores the variable's type OID in *typid. The caller is
+ * responsible for permission checks.
+ */
+Datum
+GetSessionVariableWithTypeid(Oid varid, bool *isNull, Oid *typid)
+{
+ SVariable svar;
+
+ svar = get_session_variable(varid);
+
+ *typid = svar->typid;
+
+ /*
+ * Although "svar" is freshly validated at this point, svar->is_valid can
+ * be false if an invalidation message was processed during the domain check.
+ * But the variable and all its dependencies are locked now, so we don't need
+ * to repeat the validation.
+ */
+ return copy_session_variable_value(svar, isNull);
+}
+
/*
* Assign the result of the evaluated expression to the session variable
*/
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index bb4e7bf06e..61cbb01908 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -210,7 +210,19 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
* be changed inside query execution time, and then a reference to
* previously returned value can be corrupted).
*/
- if (queryDesc->plannedstmt->sessionVariables)
+ if (queryDesc->num_session_variables > 0)
+ {
+ /*
+ * In a parallel worker, the session variables used by the query were
+ * already restored (deserialized) into queryDesc by ParallelQueryMain,
+ * so there is nothing to evaluate here. Just hand the pointer to that
+ * array, and its length, over to the executor's estate.
+ */
+ Assert(IsParallelWorker());
+ estate->es_session_variables = queryDesc->session_variables;
+ estate->es_num_session_variables = queryDesc->num_session_variables;
+ }
+ else if (queryDesc->plannedstmt->sessionVariables)
{
int nSessionVariables;
int i = 0;
@@ -249,9 +261,9 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
get_session_variable_name(varid));
}
- estate->es_session_variables[i].value =
- GetSessionVariable(varid,
- &estate->es_session_variables[i].isnull);
+ estate->es_session_variables[i].value = GetSessionVariableWithTypeid(varid,
+ &estate->es_session_variables[i].isnull,
+ &estate->es_session_variables[i].typid);
i++;
}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ff4d9dd1bb..83e0391a44 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,8 +12,9 @@
* workers and ensuring that their state generally matches that of the
* leader; see src/backend/access/transam/README.parallel for details.
* However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer/WAL usage info, and
- * the actual plan to be passed down to the worker.
+ * any ParamListInfo associated with the query, buffer/WAL usage info,
+ * session variables buffer, and the actual plan to be passed down to
+ * the worker.
*
* IDENTIFICATION
* src/backend/executor/execParallel.c
@@ -64,6 +65,7 @@
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -138,6 +140,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
+/* Helper functions that pass session variable values to parallel workers */
+static Size EstimateSessionVariables(EState *estate);
+static void SerializeSessionVariables(EState *estate, char **start_address);
+static SessionVariableValue *RestoreSessionVariables(char **start_address,
+ int *num_session_variables);
+
/*
* Create a serialized representation of the plan to be sent to each worker.
*/
@@ -596,6 +604,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *pstmt_data;
char *pstmt_space;
char *paramlistinfo_space;
+ char *session_variables_space;
BufferUsage *bufusage_space;
WalUsage *walusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
@@ -605,6 +614,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
int instrumentation_len = 0;
int jit_instrumentation_len = 0;
int instrument_offset = 0;
+ int session_variables_len = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
int query_len;
@@ -660,6 +670,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for serialized session variables. */
+ session_variables_len = EstimateSessionVariables(estate);
+ shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/*
* Estimate space for BufferUsage.
*
@@ -761,6 +776,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
+ /* Store serialized session variables. */
+ session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space);
+ SerializeSessionVariables(estate, &session_variables_space);
+
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
@@ -1411,6 +1431,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
SharedJitInstrumentation *jit_instrumentation;
int instrument_options = 0;
void *area_space;
+ char *sessionvariable_space;
dsa_area *area;
ParallelWorkerContext pwcxt;
@@ -1436,6 +1457,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
area = dsa_attach_in_place(area_space, seg);
+ /* Reconstruct session variables. */
+ sessionvariable_space = shm_toc_lookup(toc,
+ PARALLEL_KEY_SESSION_VARIABLES,
+ false);
+ queryDesc->session_variables =
+ RestoreSessionVariables(&sessionvariable_space,
+ &queryDesc->num_session_variables);
+
/* Start up the executor */
queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
ExecutorStart(queryDesc, fpes->eflags);
@@ -1503,3 +1532,117 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
FreeQueryDesc(queryDesc);
receiver->rDestroy(receiver);
}
+
+/*
+ * Estimate the amount of space required to serialize the session variables
+ * buffer of "estate" (the layout is described at SerializeSessionVariables).
+ */
+static Size
+EstimateSessionVariables(EState *estate)
+{
+ int i;
+ Size sz = sizeof(int); /* space for the number of variables */
+
+ if (estate->es_session_variables == NULL)
+ return sz; /* no buffer: only the count is accounted for */
+
+ for (i = 0; i < estate->es_num_session_variables; i++)
+ {
+ SessionVariableValue *svarval;
+ Oid typeOid;
+ int16 typLen;
+ bool typByVal;
+
+ svarval = &estate->es_session_variables[i];
+
+ typeOid = svarval->typid;
+
+ sz = add_size(sz, sizeof(Oid)); /* space for type OID */
+
+ /* space for datum/isnull, sized from the type's length and by-value flag */
+ Assert(OidIsValid(typeOid));
+ get_typlenbyval(typeOid, &typLen, &typByVal);
+
+ sz = add_size(sz,
+ datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen));
+ }
+
+ return sz;
+}
+
+/*
+ * Serialize a session variables buffer into caller-provided storage.
+ *
+ * We write the number of session variables first, as a 4-byte integer, and
+ * then write details for each variable in turn. The details for each variable
+ * consist of a 4-byte type OID, and then the datum as serialized by
+ * datumSerialize(). The caller is responsible for ensuring that there is
+ * enough storage to store the number of bytes that will be written; use
+ * EstimateSessionVariables to find out how many will be needed.
+ * *start_address is updated to point to the byte immediately following those
+ * written.
+ *
+ * RestoreSessionVariables can be used to recreate a session variables buffer
+ * from the serialized representation.
+ */
+static void
+SerializeSessionVariables(EState *estate, char **start_address)
+{
+ int nparams;
+ int i;
+
+ /* Write number of session variables. */
+ nparams = estate->es_num_session_variables;
+ memcpy(*start_address, &nparams, sizeof(int));
+ *start_address += sizeof(int);
+
+ /* Write each session variable in turn. */
+ for (i = 0; i < nparams; i++)
+ {
+ SessionVariableValue *svarval;
+ Oid typeOid;
+ int16 typLen;
+ bool typByVal;
+
+ svarval = &estate->es_session_variables[i];
+ typeOid = svarval->typid;
+
+ /* Write type OID. */
+ memcpy(*start_address, &typeOid, sizeof(Oid));
+ *start_address += sizeof(Oid);
+
+ Assert(OidIsValid(typeOid));
+ get_typlenbyval(typeOid, &typLen, &typByVal);
+
+ datumSerialize(svarval->value, svarval->isnull, typByVal, typLen,
+ start_address);
+ }
+}
+
+static SessionVariableValue *
+RestoreSessionVariables(char **start_address, int *num_session_variables)
+{
+ SessionVariableValue *session_variables;
+ int i;
+ int nparams;
+
+ memcpy(&nparams, *start_address, sizeof(int)); /* leading variable count */
+ *start_address += sizeof(int);
+
+ *num_session_variables = nparams;
+ session_variables = (SessionVariableValue *)
+ palloc(nparams * sizeof(SessionVariableValue));
+
+ for (i = 0; i < nparams; i++)
+ {
+ SessionVariableValue *svarval = &session_variables[i];
+
+ /* Read type OID, stored ahead of each datum by SerializeSessionVariables. */
+ memcpy(&svarval->typid, *start_address, sizeof(Oid));
+ *start_address += sizeof(Oid);
+
+ /* Read datum/isnull in the format written by datumSerialize(). */
+ svarval->value = datumRestore(start_address, &svarval->isnull);
+ }
+
+ return session_variables; /* palloc'd array with nparams entries */
+}
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 2bc3b1f520..6a764ccd8d 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -923,25 +923,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
/*
* We can't pass Params to workers at the moment either, so they are also
- * parallel-restricted, unless they are PARAM_EXTERN Params or are
- * PARAM_EXEC Params listed in safe_param_ids, meaning they could be
- * either generated within workers or can be computed by the leader and
- * then their value can be passed to workers.
+ * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE
+ * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they
+ * could be either generated within workers or can be computed by the
+ * leader and then their value can be passed to workers.
*/
else if (IsA(node, Param))
{
Param *param = (Param *) node;
- if (param->paramkind == PARAM_EXTERN)
+ if (param->paramkind == PARAM_EXTERN ||
+ param->paramkind == PARAM_VARIABLE)
return false;
- /* we don't support passing session variables to workers */
- if (param->paramkind == PARAM_VARIABLE)
- {
- if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
- return true;
- }
-
if (param->paramkind != PARAM_EXEC ||
!list_member_int(context->safe_param_ids, param->paramid))
{
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 6f22496305..6f9bd7c8b0 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -86,6 +86,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
qd->queryEnv = queryEnv;
qd->instrument_options = instrument_options; /* instrumentation wanted? */
+ qd->num_session_variables = 0;
+ qd->session_variables = NULL;
+
/* null these fields until set by ExecutorStart */
qd->tupDesc = NULL;
qd->estate = NULL;
diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h
index 774b39f2f9..386330c69c 100644
--- a/src/include/commands/session_variable.h
+++ b/src/include/commands/session_variable.h
@@ -29,6 +29,7 @@ extern void AtEOSubXact_SessionVariables(bool isCommit, SubTransactionId mySubid
extern void SetSessionVariable(Oid varid, Datum value, bool isNull);
extern Datum GetSessionVariable(Oid varid, bool *isNull);
+extern Datum GetSessionVariableWithTypeid(Oid varid, bool *isNull, Oid *typid);
extern void ExecuteLetStmt(ParseState *pstate, LetStmt *stmt, ParamListInfo params,
QueryEnvironment *queryEnv, QueryCompletion *qc);
diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h
index 86db3dc8d0..4e0fa3ea40 100644
--- a/src/include/executor/execdesc.h
+++ b/src/include/executor/execdesc.h
@@ -51,6 +51,10 @@ typedef struct QueryDesc
/* This field is set by ExecutePlan */
bool already_executed; /* true if previously executed */
+ /* session variables buffer restored in a parallel worker; else 0/NULL */
+ int num_session_variables;
+ SessionVariableValue *session_variables;
+
/* This is always set NULL by the core system, but plugins can change it */
struct Instrumentation *totaltime; /* total time spent in ExecutorRun */
} QueryDesc;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 8364a109e3..41a441834b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -623,6 +623,7 @@ typedef struct AsyncRequest
*/
typedef struct SessionVariableValue
{
+ Oid typid;
bool isnull;
Datum value;
} SessionVariableValue;
diff --git a/src/test/regress/expected/session_variables.out b/src/test/regress/expected/session_variables.out
index b5ab05840a..f77f3b8066 100644
--- a/src/test/regress/expected/session_variables.out
+++ b/src/test/regress/expected/session_variables.out
@@ -1284,12 +1284,14 @@ SELECT count(*) FROM svar_test WHERE a%10 = zero;
-- parallel execution is not supported yet
EXPLAIN (COSTS OFF) SELECT count(*) FROM svar_test WHERE a%10 = zero;
- QUERY PLAN
------------------------------------
+ QUERY PLAN
+--------------------------------------------
Aggregate
- -> Seq Scan on svar_test
- Filter: ((a % 10) = zero)
-(3 rows)
+ -> Gather
+ Workers Planned: 2
+ -> Parallel Seq Scan on svar_test
+ Filter: ((a % 10) = zero)
+(5 rows)
LET zero = (SELECT count(*) FROM svar_test);
-- result should be 1000
--
2.47.1