v15-0003-get-parallel-safety-functions.patch
text/x-patch
Filename: v15-0003-get-parallel-safety-functions.patch
Type: text/x-patch
Part: 2
Message:
Re: Parallel INSERT SELECT take 2
From 6ee10db34d0f7b95cc0b0f27113c381e97815761 Mon Sep 17 00:00:00 2001
From: test <test>
Date: Fri, 8 May 2026 02:40:31 +0200
Subject: [PATCH v15 3/5] get-parallel-safety-functions
Provide a utility function "pg_get_table_parallel_dml_safety(regclass)" that
returns records of (objid, classid, proparallel) for all
parallel unsafe/restricted table-related objects from which the
table's parallel DML safety is determined. The user can use this
information during development in order to accurately declare a
table's parallel DML safety, or to identify any problematic objects
if a parallel DML fails or behaves unexpectedly.
When the use of an index-related parallel unsafe/restricted function
is detected, both the function oid and the index oid are returned.
Provide a utility function "pg_get_table_max_parallel_dml_hazard(regclass)" that
returns the worst parallel DML safety hazard that can be found in the
given relation. Users can use this function to do a quick check without
caring about specific parallel-related objects.
---
src/backend/optimizer/util/clauses.c | 654 ++++++++++++++++++++++++++-
src/backend/utils/adt/misc.c | 91 ++++
src/backend/utils/cache/typcache.c | 17 +
src/include/catalog/pg_proc.dat | 22 +-
src/include/optimizer/clauses.h | 10 +
src/include/utils/typcache.h | 2 +
6 files changed, 791 insertions(+), 5 deletions(-)
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index b8a7a5e3f1d..6504fe6ad5d 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -19,15 +19,20 @@
#include "postgres.h"
+#include "access/amapi.h"
+#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_language.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
+#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/functions.h"
#include "funcapi.h"
@@ -49,6 +54,7 @@
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "parser/parsetree.h"
+#include "partitioning/partdesc.h"
#include "rewrite/rewriteHandler.h"
#include "rewrite/rewriteManip.h"
#include "tcop/tcopprot.h"
@@ -61,6 +67,7 @@
#include "utils/jsonpath.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/partcache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
@@ -99,6 +106,9 @@ typedef struct
char max_hazard; /* worst proparallel hazard found so far */
char max_interesting; /* worst proparallel hazard of interest */
List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */
+ bool check_all; /* whether collect all the unsafe/restricted objects */
+ List *objects; /* parallel unsafe/restricted objects */
+ PartitionDirectory partition_directory; /* partition descriptors */
} max_parallel_hazard_context;
static bool contain_agg_clause_walker(Node *node, void *context);
@@ -109,6 +119,24 @@ static bool contain_volatile_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
static bool max_parallel_hazard_walker(Node *node,
max_parallel_hazard_context *context);
+static bool target_rel_parallel_hazard_recurse(Relation relation,
+ max_parallel_hazard_context *context,
+ bool is_partition);
+static bool target_rel_trigger_parallel_hazard(Relation rel,
+ max_parallel_hazard_context *context);
+static bool index_expr_parallel_hazard(Relation index_rel,
+ List *ii_Expressions,
+ List *ii_Predicate,
+ max_parallel_hazard_context *context);
+static bool target_rel_index_parallel_hazard(Relation rel,
+ max_parallel_hazard_context *context);
+static bool target_rel_domain_parallel_hazard(Oid typid,
+ max_parallel_hazard_context *context);
+static bool target_rel_partitions_parallel_hazard(Relation rel,
+ max_parallel_hazard_context *context,
+ bool is_partition);
+static bool target_rel_chk_constr_parallel_hazard(Relation rel,
+ max_parallel_hazard_context *context);
static bool contain_nonstrict_functions_walker(Node *node, void *context);
static bool contain_exec_param_walker(Node *node, List *param_ids);
static bool contain_context_dependent_node(Node *clause);
@@ -173,6 +201,7 @@ static Node *substitute_actual_parameters_in_from_mutator(Node *node,
substitute_actual_parameters_in_from_context *context);
static bool pull_paramids_walker(Node *node, Bitmapset **context);
static bool max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context);
+static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel);
/*****************************************************************************
* Aggregate-function clause manipulation
@@ -753,6 +782,9 @@ max_parallel_hazard(Query *parse)
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_UNSAFE;
context.safe_param_ids = NIL;
+ context.check_all = false;
+ context.objects = NIL;
+ context.partition_directory = NULL;
/* try to determine the worst hazard for the parsed query */
found = max_parallel_hazard_walker((Node *) parse, &context);
@@ -816,6 +848,9 @@ is_parallel_safe(PlannerInfo *root, Node *node)
context.max_hazard = PROPARALLEL_SAFE;
context.max_interesting = PROPARALLEL_RESTRICTED;
context.safe_param_ids = NIL;
+ context.check_all = false;
+ context.objects = NIL;
+ context.partition_directory = NULL;
/*
* The params that refer to the same or parent query level are considered
@@ -847,7 +882,7 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context)
break;
case PROPARALLEL_RESTRICTED:
/* increase max_hazard to RESTRICTED */
- Assert(context->max_hazard != PROPARALLEL_UNSAFE);
+ Assert(context->check_all || context->max_hazard != PROPARALLEL_UNSAFE);
context->max_hazard = proparallel;
/* done if we are not expecting any unsafe functions */
if (context->max_interesting == proparallel)
@@ -864,6 +899,82 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context)
return false;
}
+/*
+ * make_safety_object
+ *
+ * Allocate a safety_object with palloc and fill it with the given object
+ * OID, catalog (class) OID and parallel-safety marking.
+ */
+static safety_object *
+make_safety_object(Oid objid, Oid classid, char proparallel)
+{
+	safety_object *result = palloc(sizeof(*result));
+
+	result->proparallel = proparallel;
+	result->classid = classid;
+	result->objid = objid;
+	return result;
+}
+
+/* check_functions_in_node callback: note func_id's parallel-safety marking */
+static bool
+parallel_hazard_checker(Oid func_id, void *context)
+{
+	max_parallel_hazard_context *cont = (max_parallel_hazard_context *) context;
+	char		proparallel = func_parallel(func_id);
+	char		prev_hazard = cont->max_hazard;
+	bool		reached = max_parallel_hazard_test(proparallel, cont);
+
+	/* max_parallel_hazard_test() overwrites UNSAFE with RESTRICTED; undo that */
+	if (prev_hazard == PROPARALLEL_UNSAFE)
+		cont->max_hazard = prev_hazard;
+
+	if (reached && !cont->check_all)
+		return true;			/* worst interesting hazard reached */
+	if (proparallel != PROPARALLEL_SAFE)
+		cont->objects = lappend(cont->objects,
+								make_safety_object(func_id, ProcedureRelationId, proparallel));
+
+	return false;
+}
+
+/*
+ * parallel_hazard_walker
+ *
+ * Expression-tree walker applied to partition key, index, constraint and
+ * column default expressions.  Every function referenced by a node is passed
+ * to parallel_hazard_checker, and the constraints of any domain that the
+ * expression coerces to are examined as well.
+ *
+ * With context->check_all set, the whole tree is visited and all PARALLEL
+ * UNSAFE/RESTRICTED objects are collected in context->objects.
+ *
+ * Otherwise the walk is abandoned (returning true) as soon as the worst
+ * hazard of interest has been reached.
+ */
+static bool
+parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
+{
+	bool		done;
+
+	if (node == NULL)
+		return false;
+
+	/* Functions called directly by this node */
+	done = check_functions_in_node(node, parallel_hazard_checker, context);
+
+	/* Constraints of the domain that a CoerceToDomain node coerces to */
+	if (!done && IsA(node, CoerceToDomain))
+		done = target_rel_domain_parallel_hazard(((CoerceToDomain *) node)->resulttype,
+												 context);
+
+	if (done)
+		return true;
+
+	/* Descend into the node's children */
+	return expression_tree_walker(node, parallel_hazard_walker, context);
+}
+
/* check_functions_in_node callback */
static bool
max_parallel_hazard_checker(Oid func_id, void *context)
@@ -1019,6 +1130,547 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
context);
}
+/*
+ * target_rel_parallel_hazard
+ *
+ * Open the relation relOid under AccessShareLock and run
+ * target_rel_parallel_hazard_recurse on it.  The worst hazard seen is stored
+ * in *max_hazard and the list of recorded safety_objects is returned.  With
+ * check_all false the search may stop early, once a hazard at least as bad
+ * as max_interesting has been found.
+ */
+List *
+target_rel_parallel_hazard(Oid relOid, bool check_all,
+						   char max_interesting, char *max_hazard)
+{
+	Relation	rel;
+	max_parallel_hazard_context hctx;
+
+	hctx.max_hazard = PROPARALLEL_SAFE;
+	hctx.max_interesting = max_interesting;
+	hctx.safe_param_ids = NIL;
+	hctx.check_all = check_all;
+	hctx.objects = NIL;
+	hctx.partition_directory = NULL;
+
+	rel = table_open(relOid, AccessShareLock);
+	(void) target_rel_parallel_hazard_recurse(rel, &hctx, false);
+
+	/* The directory only exists if a partitioned table was visited */
+	if (hctx.partition_directory != NULL)
+		DestroyPartitionDirectory(hctx.partition_directory);
+	table_close(rel, AccessShareLock);
+
+	*max_hazard = hctx.max_hazard;
+	return hctx.objects;
+}
+
+/*
+ * target_rel_parallel_hazard_recurse
+ *
+ * Check rel (foreign/local-buffer relation test), then its partitions,
+ * indexes, triggers, column defaults, domain-typed columns and CHECK
+ * constraints.  is_partition is passed as true for partitions reached
+ * through their parent; column defaults are skipped for those.
+ *
+ * With context->check_all set, hazardous objects are appended to
+ * context->objects; otherwise true is returned once a check returns true.
+ */
+static bool
+target_rel_parallel_hazard_recurse(Relation rel,
+								   max_parallel_hazard_context *context,
+								   bool is_partition)
+{
+	TupleDesc	tupdesc;
+	int			attnum;
+
+	/*
+	 * We can't support table modification in a parallel worker if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access) or
+	 * a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		char		prev_hazard = context->max_hazard;
+		bool		reached = max_parallel_hazard_test(PROPARALLEL_RESTRICTED,
+													   context);
+
+		/* Don't let the RESTRICTED marking overwrite an earlier UNSAFE */
+		if (prev_hazard == PROPARALLEL_UNSAFE)
+			context->max_hazard = prev_hazard;
+		if (reached && !context->check_all)
+			return true;
+
+		context->objects = lappend(context->objects,
+								   make_safety_object(RelationGetRelid(rel),
+													  RelationRelationId,
+													  PROPARALLEL_RESTRICTED));
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (target_rel_partitions_parallel_hazard(rel, context, is_partition))
+		return true;
+
+	/*
+	 * If there are any index expressions or index predicate, check that they
+	 * are parallel-mode safe.
+	 */
+	if (target_rel_index_parallel_hazard(rel, context))
+		return true;
+
+	/*
+	 * If any triggers exist, check that they are parallel-safe.
+	 */
+	if (target_rel_trigger_parallel_hazard(rel, context))
+		return true;
+
+	/*
+	 * Column default expressions are only applicable to INSERT and UPDATE.
+	 * Note that even though column defaults may be specified separately for
+	 * each partition in a partitioned table, a partition's default value is
+	 * not applied when inserting a tuple through a partitioned table.
+	 */
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atthasdef && !is_partition)
+		{
+			Node	   *defaultexpr = build_column_default(rel, attnum + 1);
+
+			if (parallel_hazard_walker(defaultexpr, context))
+				return true;
+		}
+
+		/*
+		 * If the column is of a DOMAIN type, determine whether that
+		 * domain has any CHECK expressions that are not parallel-mode
+		 * safe.
+		 */
+		if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN &&
+			target_rel_domain_parallel_hazard(att->atttypid, context))
+			return true;
+	}
+
+	/*
+	 * CHECK constraints are only applicable to INSERT and UPDATE. If any
+	 * CHECK constraints exist, determine if they are parallel-safe.
+	 */
+	if (target_rel_chk_constr_parallel_hazard(rel, context))
+		return true;
+
+	return false;
+}
+
+/*
+ * target_rel_trigger_parallel_hazard
+ *
+ * Look up the parallel-safety marking of each trigger function of rel.  For
+ * every one that is not parallel safe, the function and then the trigger
+ * are appended to context->objects.
+ *
+ * Returns true, without looking further, when the worst interesting hazard
+ * has been reached and context->check_all is not set; false otherwise.
+ */
+static bool
+target_rel_trigger_parallel_hazard(Relation rel,
+								   max_parallel_hazard_context *context)
+{
+	int			i;
+
+	if (rel->trigdesc == NULL)
+		return false;
+
+	/*
+	 * Care is needed here to avoid using the same relcache TriggerDesc field
+	 * across other cache accesses, because relcache doesn't guarantee that it
+	 * won't move.
+	 */
+	for (i = 0; i < rel->trigdesc->numtriggers; i++)
+	{
+		Oid			tgfoid = rel->trigdesc->triggers[i].tgfoid;
+		Oid			tgoid = rel->trigdesc->triggers[i].tgoid;
+		char		proparallel = func_parallel(tgfoid);
+		char		prev_hazard = context->max_hazard;
+		bool		reached = max_parallel_hazard_test(proparallel, context);
+
+		/* max_parallel_hazard_test() overwrites UNSAFE with RESTRICTED; undo */
+		if (prev_hazard == PROPARALLEL_UNSAFE)
+			context->max_hazard = prev_hazard;
+
+		if (reached && !context->check_all)
+			return true;
+
+		if (proparallel != PROPARALLEL_SAFE)
+		{
+			context->objects = lappend(context->objects,
+									   make_safety_object(tgfoid, ProcedureRelationId, proparallel));
+			context->objects = lappend(context->objects,
+									   make_safety_object(tgoid, TriggerRelationId, proparallel));
+		}
+	}
+
+	return false;
+}
+
+/*
+ * index_expr_parallel_hazard
+ *
+ * Run parallel_hazard_walker over the expression columns and the predicate
+ * of index_rel.  ii_Expressions holds one entry per expression column, in
+ * column order; ii_Predicate is the index predicate (may be NIL).
+ *
+ * Returns true if the walker stopped early, false otherwise.
+ */
+static bool
+index_expr_parallel_hazard(Relation index_rel,
+						   List *ii_Expressions,
+						   List *ii_Predicate,
+						   max_parallel_hazard_context *context)
+{
+	int			i;
+	Form_pg_index indexStruct;
+	ListCell   *index_expr_item;
+
+	indexStruct = index_rel->rd_index;
+	index_expr_item = list_head(ii_Expressions);
+
+	/* Check parallel-safety of index expression */
+	for (i = 0; i < indexStruct->indnatts; i++)
+	{
+		int			keycol = indexStruct->indkey.values[i];
+
+		/* Plain column references carry no expression to examine */
+		if (keycol != 0)
+			continue;
+
+		/*
+		 * Attribute number zero marks an expression column.  Running out of
+		 * expressions means inconsistent catalog data: report it as ERROR.
+		 */
+		if (index_expr_item == NULL)	/* shouldn't happen */
+			elog(ERROR, "too few entries in indexprs list");
+
+		if (parallel_hazard_walker((Node *) lfirst(index_expr_item), context))
+			return true;
+
+		index_expr_item = lnext(ii_Expressions, index_expr_item);
+	}
+
+	/* Check parallel-safety of index predicate */
+	if (parallel_hazard_walker((Node *) ii_Predicate, context))
+		return true;
+
+	return false;
+}
+
+/*
+ * target_rel_index_parallel_hazard
+ *
+ * Examine the expressions and predicate of every index of rel.  Each index
+ * is checked with a fresh objects list and hazard level, so that an index
+ * with hazardous contents can itself be reported, after the objects found
+ * in it, with the worst hazard that it contains.
+ *
+ * Returns true if the search stopped early, false otherwise.
+ */
+static bool
+target_rel_index_parallel_hazard(Relation rel,
+								 max_parallel_hazard_context *context)
+{
+	List	   *index_oid_list = RelationGetIndexList(rel);
+	ListCell   *lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	foreach(lc, index_oid_list)
+	{
+		Oid			index_oid = lfirst_oid(lc);
+		Relation	index_rel;
+		List	   *ii_Expressions;
+		List	   *ii_Predicate;
+		bool		max_hazard_found;
+		List	   *temp_objects = context->objects;
+		char		temp_hazard = context->max_hazard;
+
+		context->objects = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		index_rel = index_open(index_oid, lockmode);
+		ii_Expressions = RelationGetIndexExpressions(index_rel);
+		ii_Predicate = RelationGetIndexPredicate(index_rel);
+		max_hazard_found = index_expr_parallel_hazard(index_rel, ii_Expressions,
+													  ii_Predicate, context);
+		index_close(index_rel, lockmode);
+
+		if (max_hazard_found)
+		{
+			/* Don't leave the OID list behind on the early exit */
+			list_free(index_oid_list);
+			return true;
+		}
+
+		/* Add the index itself to the objects list */
+		if (context->objects != NIL)
+			context->objects = lappend(context->objects,
+									   make_safety_object(index_oid,
+														  IndexRelationId,
+														  context->max_hazard));
+
+		/*
+		 * Fold the saved level back in, keeping the worse of the two.  This
+		 * is done by hand because max_parallel_hazard_test() would replace
+		 * an UNSAFE found above with a saved RESTRICTED.
+		 */
+		if (temp_hazard == PROPARALLEL_UNSAFE ||
+			context->max_hazard == PROPARALLEL_SAFE)
+			context->max_hazard = temp_hazard;
+
+		context->objects = list_concat(context->objects, temp_objects);
+		list_free(temp_objects);
+	}
+
+	list_free(index_oid_list);
+
+	return false;
+}
+
+/*
+ * target_rel_domain_parallel_hazard
+ *
+ * Walk the check expression of each constraint that GetDomainConstraints
+ * returns for the domain type typid.  Each constraint is examined with a
+ * fresh objects list and hazard level, so that the constraint itself can be
+ * reported with the worst hazard found in its own expression.
+ *
+ * Returns true if the walker stopped early, false otherwise.
+ */
+static bool
+target_rel_domain_parallel_hazard(Oid typid,
+								  max_parallel_hazard_context *context)
+{
+	ListCell   *lc;
+	List	   *domain_list = GetDomainConstraints(typid);
+
+	foreach(lc, domain_list)
+	{
+		DomainConstraintState *r = (DomainConstraintState *) lfirst(lc);
+		List	   *temp_objects = context->objects;
+		char		temp_hazard = context->max_hazard;
+
+		context->objects = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		if (parallel_hazard_walker((Node *) r->check_expr, context))
+			return true;
+
+		/* Add the constraint itself to the objects list */
+		if (context->objects != NIL)
+		{
+			Oid			constr_oid = get_domain_constraint_oid(typid, r->name,
+															   false);
+
+			context->objects = lappend(context->objects,
+									   make_safety_object(constr_oid, ConstraintRelationId,
+														  context->max_hazard));
+		}
+
+		/*
+		 * Fold the saved level back in, keeping the worse of the two.  This
+		 * is done by hand because max_parallel_hazard_test() would replace
+		 * an UNSAFE found above with a saved RESTRICTED.
+		 */
+		if (temp_hazard == PROPARALLEL_UNSAFE ||
+			context->max_hazard == PROPARALLEL_SAFE)
+			context->max_hazard = temp_hazard;
+
+		context->objects = list_concat(context->objects, temp_objects);
+		list_free(temp_objects);
+	}
+
+	return false;
+}
+
+/*
+ * target_rel_partitions_parallel_hazard
+ *
+ * Check rel's own partition constraint (unless is_partition), and, if rel is
+ * a partitioned table, the partition key support functions, the partition
+ * key expressions and, recursively, every partition.
+ *
+ * Returns true if the search stopped early, false otherwise.
+ */
+static bool
+target_rel_partitions_parallel_hazard(Relation rel,
+									  max_parallel_hazard_context *context,
+									  bool is_partition)
+{
+	int			i;
+	PartitionDesc pdesc;
+	PartitionKey pkey;
+	ListCell   *partexprs_item;
+	int			partnatts;
+	List	   *partexprs,
+			   *qual;
+
+	/*
+	 * The partition check expression is composed of its parent table's
+	 * partition key expression, we do not need to check it again for a
+	 * partition because we already checked the parallel safety of its parent
+	 * table's partition key expression.
+	 */
+	if (!is_partition)
+	{
+		qual = RelationGetPartitionQual(rel);
+		if (parallel_hazard_walker((Node *) qual, context))
+			return true;
+	}
+
+	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		return false;
+
+	pkey = RelationGetPartitionKey(rel);
+
+	partnatts = get_partition_natts(pkey);
+	partexprs = get_partition_exprs(pkey);
+
+	partexprs_item = list_head(partexprs);
+	for (i = 0; i < partnatts; i++)
+	{
+		Oid			funcOid = pkey->partsupfunc[i].fn_oid;
+
+		if (OidIsValid(funcOid))
+		{
+			char		proparallel = func_parallel(funcOid);
+			char		prev_hazard = context->max_hazard;
+			bool		reached = max_parallel_hazard_test(proparallel, context);
+
+			/* Don't let a RESTRICTED result overwrite an earlier UNSAFE */
+			if (prev_hazard == PROPARALLEL_UNSAFE)
+				context->max_hazard = prev_hazard;
+
+			if (reached && !context->check_all)
+				return true;
+
+			if (proparallel != PROPARALLEL_SAFE)
+				context->objects = lappend(context->objects,
+										   make_safety_object(funcOid, ProcedureRelationId, proparallel));
+		}
+
+		/* Check parallel-safety of any expressions in the partition key */
+		if (get_partition_col_attnum(pkey, i) == 0)
+		{
+			if (partexprs_item == NULL) /* shouldn't happen */
+				elog(ERROR, "too few entries in partexprs list");
+
+			if (parallel_hazard_walker((Node *) lfirst(partexprs_item), context))
+				return true;
+
+			partexprs_item = lnext(partexprs, partexprs_item);
+		}
+	}
+
+	/* Recursively check each partition ... */
+	/* Create the PartitionDirectory infrastructure if we didn't already */
+	if (context->partition_directory == NULL)
+		context->partition_directory =
+			CreatePartitionDirectory(CurrentMemoryContext, false);
+
+	pdesc = PartitionDirectoryLookup(context->partition_directory, rel);
+
+	for (i = 0; i < pdesc->nparts; i++)
+	{
+		Relation	part_rel;
+		bool		max_hazard_found;
+
+		part_rel = table_open(pdesc->oids[i], AccessShareLock);
+		max_hazard_found = target_rel_parallel_hazard_recurse(part_rel, context,
+															  true);
+		table_close(part_rel, AccessShareLock);
+
+		if (max_hazard_found)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * target_rel_chk_constr_parallel_hazard
+ *
+ * Walk the expression of every CHECK constraint in rel's tuple descriptor.
+ * Each constraint is examined with a fresh objects list and hazard level, so
+ * that a constraint with hazardous contents can itself be reported, after
+ * the objects found in it, with the worst hazard that it contains.
+ *
+ * Returns true if the walker stopped early, false otherwise.
+ */
+static bool
+target_rel_chk_constr_parallel_hazard(Relation rel,
+									  max_parallel_hazard_context *context)
+{
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	ConstrCheck *check;
+	int			i;
+
+	if (tupdesc->constr == NULL)
+		return false;
+
+	check = tupdesc->constr->check;
+
+	/* Examine each CHECK constraint in turn */
+	for (i = 0; i < tupdesc->constr->num_check; i++)
+	{
+		Expr	   *check_expr = stringToNode(check[i].ccbin);
+		List	   *temp_objects = context->objects;
+		char		temp_hazard = context->max_hazard;
+
+		context->objects = NIL;
+		context->max_hazard = PROPARALLEL_SAFE;
+
+		if (parallel_hazard_walker((Node *) check_expr, context))
+			return true;
+
+		/* Add the constraint itself to the objects list */
+		if (context->objects != NIL)
+		{
+			Oid			constr_oid;
+
+			/* Use the name of the constraint being examined, not the first */
+			constr_oid = get_relation_constraint_oid(rel->rd_rel->oid,
+													 check[i].ccname,
+													 true);
+
+			context->objects = lappend(context->objects,
+									   make_safety_object(constr_oid,
+														  ConstraintRelationId,
+														  context->max_hazard));
+		}
+
+		/*
+		 * Fold the saved level back in, keeping the worse of the two.  This
+		 * is done by hand because max_parallel_hazard_test() would replace
+		 * an UNSAFE found above with a saved RESTRICTED.
+		 */
+		if (temp_hazard == PROPARALLEL_UNSAFE ||
+			context->max_hazard == PROPARALLEL_SAFE)
+			context->max_hazard = temp_hazard;
+
+		context->objects = list_concat(context->objects, temp_objects);
+		list_free(temp_objects);
+	}
+
+	return false;
+}
+
/*
* is_parallel_allowed_for_modify
*
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index c033e68ba15..1924dffd97e 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -24,6 +24,8 @@
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_proc.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "catalog/system_fk_info.h"
@@ -32,6 +34,7 @@
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/miscnodes.h"
+#include "optimizer/clauses.h"
#include "parser/parse_type.h"
#include "parser/scansup.h"
#include "pgstat.h"
@@ -47,6 +50,7 @@
#include "utils/syscache.h"
#include "utils/timestamp.h"
#include "utils/tuplestore.h"
+#include "utils/varlena.h"
#include "utils/wait_event.h"
@@ -609,6 +613,93 @@ pg_collation_for(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(cstring_to_text(generate_collation_name(collid)));
}
+/*
+ * pg_get_table_max_parallel_dml_hazard
+ *
+ * SQL-callable: report the worst parallel hazard that
+ * target_rel_parallel_hazard finds for the relation given as argument,
+ * as one of PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED or PROPARALLEL_SAFE.
+ * Any list of objects returned by the search is ignored.
+ */
+Datum
+pg_get_table_max_parallel_dml_hazard(PG_FUNCTION_ARGS)
+{
+	Oid			target = PG_GETARG_OID(0);
+	char		worst;
+
+	/* Collecting all objects is not needed to learn the worst level */
+	(void) target_rel_parallel_hazard(target, false, PROPARALLEL_UNSAFE, &worst);
+
+	PG_RETURN_CHAR(worst);
+}
+
+/*
+ * SQL-callable set-returning function: return one (objid, classid,
+ * proparallel) row for each object that target_rel_parallel_hazard collects
+ * for the given relation.  The rows are handed back in a tuplestore.
+ */
+Datum
+pg_get_table_parallel_dml_safety(PG_FUNCTION_ARGS)
+{
+#define PG_GET_PARALLEL_SAFETY_COLS	3
+	List	   *objects;
+	ListCell   *object;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	ReturnSetInfo *rsinfo;
+	char		max_parallel_hazard;
+	Oid			relOid = PG_GETARG_OID(0);
+
+	rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	objects = target_rel_parallel_hazard(relOid, true,
+										 PROPARALLEL_UNSAFE,
+										 &max_parallel_hazard);
+	foreach(object, objects)
+	{
+		Datum		values[PG_GET_PARALLEL_SAFETY_COLS];
+		bool		nulls[PG_GET_PARALLEL_SAFETY_COLS];
+		safety_object *sobject = (safety_object *) lfirst(object);
+
+		memset(nulls, 0, sizeof(nulls));
+		/* Columns are oid, oid, "char": convert each with the matching macro */
+		values[0] = ObjectIdGetDatum(sobject->objid);
+		values[1] = ObjectIdGetDatum(sobject->classid);
+		values[2] = CharGetDatum(sobject->proparallel);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+	return (Datum) 0;
+}
+
/*
* pg_relation_is_updatable - determine which update events the specified
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index cebe7a916fb..4522ee2ea3e 100644
--- a/src/backend/utils/cache/typcache.c
+++ b/src/backend/utils/cache/typcache.c
@@ -2755,6 +2755,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2)
return 0;
}
+/*
+ * GetDomainConstraints --- fetch the constraint list that the type cache
+ * holds for type_id, or NIL if the entry carries no domain data.
+ *
+ * NOTE(review): the list is the type cache's own, not a copy -- confirm
+ * that it cannot be freed while a caller is still walking it.
+ */
+List *
+GetDomainConstraints(Oid type_id)
+{
+	TypeCacheEntry *entry;
+
+	entry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
+
+	return (entry->domainData != NULL) ? entry->domainData->constraints : NIL;
+}
+
/*
* Load (or re-load) the enumData member of the typcache entry.
*/
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fa9ae79082b..c2e07ecb74e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4100,6 +4100,20 @@
provolatile => 's', prorettype => 'regclass', proargtypes => 'regclass',
prosrc => 'pg_get_replica_identity_index' },
+{ oid => '6122',
+ descr => 'parallel unsafe/restricted objects in the target relation',
+ proname => 'pg_get_table_parallel_dml_safety', prorows => '100',
+ proretset => 't', provolatile => 'v', proparallel => 'u',
+ prorettype => 'record', proargtypes => 'regclass',
+ proallargtypes => '{regclass,oid,oid,char}',
+ proargmodes => '{i,o,o,o}',
+ proargnames => '{table_name, objid, classid, proparallel}',
+ prosrc => 'pg_get_table_parallel_dml_safety' },
+
+{ oid => '6123', descr => 'worst parallel-hazard level in the given relation for DML',
+ proname => 'pg_get_table_max_parallel_dml_hazard', prorettype => 'char', proargtypes => 'regclass',
+ prosrc => 'pg_get_table_max_parallel_dml_hazard', provolatile => 'v', proparallel => 'u' },
+
# Deferrable unique constraint trigger
{ oid => '1250', descr => 'deferred UNIQUE constraint check',
proname => 'unique_key_recheck', provolatile => 'v', prorettype => 'trigger',
@@ -4107,11 +4121,11 @@
# Generic referential integrity constraint triggers
{ oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
- proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger',
- proargtypes => '', prosrc => 'RI_FKey_check_ins' },
+ proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r',
+ prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' },
{ oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES',
- proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger',
- proargtypes => '', prosrc => 'RI_FKey_check_upd' },
+ proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r',
+ prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' },
{ oid => '1646', descr => 'referential integrity ON DELETE CASCADE',
proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger',
proargtypes => '', prosrc => 'RI_FKey_cascade_del' },
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index e96fd4bac7e..fd8883ddb6e 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -23,6 +23,13 @@ typedef struct
List **windowFuncs; /* lists of WindowFuncs for each winref */
} WindowFuncLists;
+typedef struct safety_object
+{
+ Oid objid;	/* OID of the reported object */
+ Oid classid;	/* catalog OID supplied for it, e.g. ProcedureRelationId */
+ char proparallel;	/* parallel-safety marking recorded for the object */
+} safety_object;
+
extern bool contain_agg_clause(Node *clause);
extern bool contain_window_function(Node *clause);
@@ -57,5 +64,8 @@ extern Query *inline_function_in_from(PlannerInfo *root,
extern Bitmapset *pull_paramids(Expr *expr);
extern bool is_parallel_allowed_for_modify(Query *parse);
+extern List *target_rel_parallel_hazard(Oid relOid, bool check_all,
+ char max_interesting,
+ char *max_hazard);
#endif /* CLAUSES_H */
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 5a4aa9ec840..3891bbffcfb 100644
--- a/src/include/utils/typcache.h
+++ b/src/include/utils/typcache.h
@@ -201,6 +201,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod);
extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
+extern List *GetDomainConstraints(Oid type_id);
+
extern size_t SharedRecordTypmodRegistryEstimate(void);
extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
--
2.53.0