From 01f66103601cd9dc15d426b1d2933475a7467647 Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 23 Dec 2025 22:21:47 +0900 Subject: [PATCH v36 5/8] Row pattern recognition patch (executor). --- src/backend/executor/nodeWindowAgg.c | 1850 +++++++++++++++++++++++++- src/backend/utils/adt/windowfuncs.c | 34 +- src/include/catalog/pg_proc.dat | 6 + src/include/nodes/execnodes.h | 63 + 4 files changed, 1942 insertions(+), 11 deletions(-) diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index d92d632e248..88a1f2bd46e 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -36,6 +36,7 @@ #include "access/htup_details.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" +#include "catalog/pg_collation_d.h" #include "catalog/pg_proc.h" #include "executor/executor.h" #include "executor/nodeWindowAgg.h" @@ -45,9 +46,11 @@ #include "optimizer/optimizer.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" +#include "regex/regex.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" +#include "utils/fmgroids.h" #include "utils/expandeddatum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -170,6 +173,33 @@ typedef struct WindowStatePerAggData bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; +/* + * Set of StringInfo. Used in RPR. + */ +#define STRSET_FROZEN (1 << 0) /* string is frozen */ +#define STRSET_DISCARDED (1 << 1) /* string is scheduled to be discarded */ +#define STRSET_MATCHED (1 << 2) /* string is confirmed to be matched + * with pattern */ + +typedef struct StringSet +{ + StringInfo *str_set; + Size set_size; /* current array allocation size in number of + * items */ + int set_index; /* current used size */ + int *info; /* an array of information bit per StringInfo. + * see above */ +} StringSet; + +/* + * Structure used by check_rpr_navigation() and rpr_navigation_walker(). + */ +typedef struct NavigationInfo +{ + bool is_prev; /* true if PREV */ + int num_vars; /* number of var nodes */ +} NavigationInfo; + static void initialize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); @@ -206,6 +236,9 @@ static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, TupleTableSlot *slot2); +static int WinGetSlotInFrame(WindowObject winobj, TupleTableSlot *slot, + int relpos, int seektype, bool set_mark, + bool *isnull, bool *isout); static bool window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot); @@ -224,6 +257,55 @@ static uint8 get_notnull_info(WindowObject winobj, int64 pos, int argno); static void put_notnull_info(WindowObject winobj, int64 pos, int argno, bool isnull); +static void attno_map(Node *node); +static bool attno_map_walker(Node *node, void *context); +static int row_is_in_reduced_frame(WindowObject winobj, int64 pos); +static bool rpr_is_defined(WindowAggState *winstate); + +static void create_reduced_frame_map(WindowAggState *winstate); +static int get_reduced_frame_map(WindowAggState *winstate, int64 pos); +static void register_reduced_frame_map(WindowAggState *winstate, int64 pos, + int val); +static void clear_reduced_frame_map(WindowAggState *winstate); +static void update_reduced_frame(WindowObject winobj, int64 pos); + +static int64 evaluate_pattern(WindowObject winobj, int64 current_pos, + char *vname, StringInfo encoded_str, bool *result); + +static bool get_slots(WindowObject winobj, int64 current_pos); + +static int search_str_set(WindowAggState *winstate, + StringSet *input_str_set); +static StringSet *generate_patterns(WindowAggState *winstate, StringSet *input_str_set, + char *pattern, VariablePos *variable_pos, + char tail_pattern_initial); +static int add_pattern(WindowAggState *winstate, StringInfo old, int old_info, + StringSet *new_str_set, + char c, char *pattern, char tail_pattern_initial, + int resultlen); +static int freeze_pattern(WindowAggState *winstate, StringInfo old, int old_info, + StringSet *new_str_set, + char *pattern, int resultlen); +static char pattern_initial(WindowAggState *winstate, char *vname); +static int do_pattern_match(WindowAggState *winstate, char *pattern, char *encoded_str, int len); +static void pattern_regfree(WindowAggState *winstate); + +static StringSet *string_set_init(void); +static void string_set_add(StringSet *string_set, StringInfo str, int info); +static StringInfo string_set_get(StringSet *string_set, int index, int *flag); +static int string_set_get_size(StringSet *string_set); +static void string_set_discard(StringSet *string_set); +static VariablePos *variable_pos_init(void); +static void variable_pos_register(VariablePos *variable_pos, char initial, + int pos); +static bool variable_pos_compare(VariablePos *variable_pos, + char initial1, char initial2); +static int variable_pos_fetch(VariablePos *variable_pos, char initial, + int index); +static VariablePos *variable_pos_build(WindowAggState *winstate); + +static void check_rpr_navigation(Node *node, bool is_prev); +static bool rpr_navigation_walker(Node *node, void *context); /* * Not null info bit array consists of 2-bit items @@ -817,6 +899,7 @@ eval_windowaggregates(WindowAggState *winstate) * transition function, or * - we have an EXCLUSION clause, or * - if the new frame doesn't overlap the old one + * - if RPR is enabled * * Note that we don't strictly need to restart in the last case, but if * we're going to remove all rows from the aggregation anyway, a restart @@ -831,7 +914,8 @@ eval_windowaggregates(WindowAggState *winstate) (winstate->aggregatedbase != winstate->frameheadpos && !OidIsValid(peraggstate->invtransfn_oid)) || (winstate->frameOptions & FRAMEOPTION_EXCLUSION) || - winstate->aggregatedupto <= winstate->frameheadpos) + winstate->aggregatedupto <= winstate->frameheadpos || + rpr_is_defined(winstate)) { peraggstate->restart = true; numaggs_restart++; @@ -905,7 +989,22 @@ eval_windowaggregates(WindowAggState *winstate) * head, so that tuplestore can discard unnecessary rows. */ if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + { + int64 markpos = winstate->frameheadpos; + + if (rpr_is_defined(winstate)) + { + /* + * If RPR is used, it is possible PREV wants to look at the + * previous row. So the mark pos should be frameheadpos - 1 + * unless it is below 0. + */ + markpos -= 1; + if (markpos < 0) + markpos = 0; + } + WinSetMarkPosition(agg_winobj, markpos); + } /* * Now restart the aggregates that require it. @@ -960,6 +1059,14 @@ eval_windowaggregates(WindowAggState *winstate) { winstate->aggregatedupto = winstate->frameheadpos; ExecClearTuple(agg_row_slot); + + /* + * If RPR is defined, we do not use aggregatedupto_nonrestarted. To + * avoid assertion failure below, we reset aggregatedupto_nonrestarted + * to frameheadpos. + */ + if (rpr_is_defined(winstate)) + aggregatedupto_nonrestarted = winstate->frameheadpos; } /* @@ -973,6 +1080,12 @@ eval_windowaggregates(WindowAggState *winstate) { int ret; +#ifdef RPR_DEBUG + elog(DEBUG1, "===== loop in frame starts: aggregatedupto: " INT64_FORMAT " aggregatedbase: " INT64_FORMAT, + winstate->aggregatedupto, + winstate->aggregatedbase); +#endif + /* Fetch next row if we didn't already */ if (TupIsNull(agg_row_slot)) { @@ -989,9 +1102,53 @@ eval_windowaggregates(WindowAggState *winstate) agg_row_slot, false); if (ret < 0) break; + if (ret == 0) goto next_tuple; + if (rpr_is_defined(winstate)) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "reduced_frame_map: %d aggregatedupto: " INT64_FORMAT " aggregatedbase: " INT64_FORMAT, + get_reduced_frame_map(winstate, + winstate->aggregatedupto), + winstate->aggregatedupto, + winstate->aggregatedbase); +#endif + + /* + * If the row status at currentpos is already decided and current + * row status is not decided yet, it means we passed the last + * reduced frame. Time to break the loop. + */ + if (get_reduced_frame_map(winstate, winstate->currentpos) + != RF_NOT_DETERMINED && + get_reduced_frame_map(winstate, winstate->aggregatedupto) + == RF_NOT_DETERMINED) + break; + + /* + * Otherwise we need to calculate the reduced frame. + */ + ret = row_is_in_reduced_frame(winstate->agg_winobj, + winstate->aggregatedupto); + if (ret == -1) /* unmatched row */ + break; + + /* + * Check if current row needs to be skipped due to no match. + */ + if (get_reduced_frame_map(winstate, + winstate->aggregatedupto) == RF_SKIPPED && + winstate->aggregatedupto == winstate->aggregatedbase) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "skip current row for aggregation"); +#endif + break; + } + } + /* Set tuple context for evaluation of aggregate arguments */ winstate->tmpcontext->ecxt_outertuple = agg_row_slot; @@ -1020,6 +1177,7 @@ next_tuple: ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); @@ -1243,6 +1401,7 @@ begin_partition(WindowAggState *winstate) winstate->framehead_valid = false; winstate->frametail_valid = false; winstate->grouptail_valid = false; + create_reduced_frame_map(winstate); winstate->spooled_rows = 0; winstate->currentpos = 0; winstate->frameheadpos = 0; @@ -2237,6 +2396,11 @@ ExecWindowAgg(PlanState *pstate) CHECK_FOR_INTERRUPTS(); +#ifdef RPR_DEBUG + elog(DEBUG1, "ExecWindowAgg called. pos: " INT64_FORMAT, + winstate->currentpos); +#endif + if (winstate->status == WINDOWAGG_DONE) return NULL; @@ -2345,6 +2509,17 @@ ExecWindowAgg(PlanState *pstate) /* don't evaluate the window functions when we're in pass-through mode */ if (winstate->status == WINDOWAGG_RUN) { + /* + * If RPR is defined and skip mode is next row, we need to clear + * existing reduced frame info so that we newly calculate the info + * starting from current row. + */ + if (rpr_is_defined(winstate)) + { + if (winstate->rpSkipTo == ST_NEXT_ROW) + clear_reduced_frame_map(winstate); + } + /* * Evaluate true window functions */ @@ -2511,6 +2686,9 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) TupleDesc scanDesc; ListCell *l; + TargetEntry *te; + Expr *expr; + /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -2609,6 +2787,16 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc, &TTSOpsMinimalTuple); + winstate->prev_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + + winstate->next_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + + winstate->null_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + winstate->null_slot = ExecStoreAllNullTuple(winstate->null_slot); + /* * create frame head and tail slots only if needed (must create slots in * exactly the same cases that update_frameheadpos and update_frametailpos @@ -2795,6 +2983,52 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) winstate->inRangeAsc = node->inRangeAsc; winstate->inRangeNullsFirst = node->inRangeNullsFirst; + /* Set up SKIP TO type */ + winstate->rpSkipTo = node->rpSkipTo; + /* Set up row pattern recognition PATTERN clause */ + winstate->patternVariableList = node->patternVariable; + winstate->patternRegexpList = node->patternRegexp; + + /* Set up row pattern recognition DEFINE clause */ + winstate->defineInitial = node->defineInitial; + winstate->defineVariableList = NIL; + winstate->defineClauseList = NIL; + if (node->defineClause != NIL) + { + /* + * Tweak arg var of PREV/NEXT so that it refers to scan/inner slot. + */ + foreach(l, node->defineClause) + { + char *name; + ExprState *exps; + + te = lfirst(l); + name = te->resname; + expr = te->expr; + +#ifdef RPR_DEBUG + elog(DEBUG1, "defineVariable name: %s", name); +#endif + winstate->defineVariableList = + lappend(winstate->defineVariableList, + makeString(pstrdup(name))); + attno_map((Node *) expr); + exps = ExecInitExpr(expr, (PlanState *) winstate); + winstate->defineClauseList = + lappend(winstate->defineClauseList, exps); + } + } + /* set up regular expression compiled result cache for RPR */ + winstate->patbuf = NULL; + memset(&winstate->preg, 0, sizeof(winstate->preg)); + + /* + * Build variable_pos + */ + if (winstate->defineInitial) + winstate->variable_pos = variable_pos_build(winstate); + winstate->all_first = true; winstate->partition_spooled = false; winstate->more_partitions = false; @@ -2803,6 +3037,111 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) return winstate; } +/* + * Rewrite varno of Var nodes that are the argument of PREV/NET so that they + * see scan tuple (PREV) or inner tuple (NEXT). Also we check the arguments + * of PREV/NEXT include at least 1 column reference. This is required by the + * SQL standard. + */ +static void +attno_map(Node *node) +{ + (void) expression_tree_walker(node, attno_map_walker, NULL); +} + +static bool +attno_map_walker(Node *node, void *context) +{ + FuncExpr *func; + int nargs; + bool is_prev; + + if (node == NULL) + return false; + + if (IsA(node, FuncExpr)) + { + func = (FuncExpr *) node; + + if (func->funcid == F_PREV || func->funcid == F_NEXT) + { + /* + * The SQL standard allows to have two more arguments form of + * PREV/NEXT. But currently we allow only 1 argument form. + */ + nargs = list_length(func->args); + if (list_length(func->args) != 1) + elog(ERROR, "PREV/NEXT must have 1 argument but function %d has %d args", + func->funcid, nargs); + + /* + * Check expr of PREV/NEXT aruguments and replace varno. + */ + is_prev = (func->funcid == F_PREV) ? true : false; + check_rpr_navigation(node, is_prev); + } + } + return expression_tree_walker(node, attno_map_walker, NULL); +} + +/* + * Rewrite varno of Var of RPR navigation operations (PREV/NEXT). + * If is_prev is true, we take care PREV, otherwise NEXT. + */ +static void +check_rpr_navigation(Node *node, bool is_prev) +{ + NavigationInfo context; + + context.is_prev = is_prev; + context.num_vars = 0; + (void) expression_tree_walker(node, rpr_navigation_walker, &context); + if (context.num_vars < 1) + ereport(ERROR, + errmsg("row pattern navigation operation's argument must include at least one column reference")); +} + +static bool +rpr_navigation_walker(Node *node, void *context) +{ + NavigationInfo *nav = (NavigationInfo *) context; + + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_Var: + { + Var *var = (Var *) node; + + nav->num_vars++; + + if (nav->is_prev) + { + /* + * Rewrite varno from OUTER_VAR to regular var no so that + * the var references scan tuple. + */ + var->varno = var->varnosyn; + } + else + var->varno = INNER_VAR; + } + break; + case T_Const: + case T_FuncExpr: + case T_OpExpr: + break; + + default: + ereport(ERROR, + errmsg("row pattern navigation operation's argument includes unsupported expression")); + } + return expression_tree_walker(node, rpr_navigation_walker, context); +} + + /* ----------------- * ExecEndWindowAgg * ----------------- @@ -2834,6 +3173,8 @@ ExecEndWindowAgg(WindowAggState *node) pfree(node->perfunc); pfree(node->peragg); + pattern_regfree(node); + outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -2860,6 +3201,8 @@ ExecReScanWindowAgg(WindowAggState *node) ExecClearTuple(node->agg_row_slot); ExecClearTuple(node->temp_slot_1); ExecClearTuple(node->temp_slot_2); + ExecClearTuple(node->prev_slot); + ExecClearTuple(node->next_slot); if (node->framehead_slot) ExecClearTuple(node->framehead_slot); if (node->frametail_slot) @@ -3220,7 +3563,8 @@ window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot) return false; if (pos < winobj->markpos) - elog(ERROR, "cannot fetch row before WindowObject's mark position"); + elog(ERROR, "cannot fetch row: " INT64_FORMAT " before WindowObject's mark position: " INT64_FORMAT, + pos, winobj->markpos); oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); @@ -3922,8 +4266,6 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, WindowAggState *winstate; ExprContext *econtext; TupleTableSlot *slot; - int64 abs_pos; - int64 mark_pos; Assert(WindowObjectIsValid(winobj)); winstate = winobj->winstate; @@ -3934,6 +4276,48 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, return ignorenulls_getfuncarginframe(winobj, argno, relpos, seektype, set_mark, isnull, isout); + if (WinGetSlotInFrame(winobj, slot, + relpos, seektype, set_mark, + isnull, isout) == 0) + { + econtext->ecxt_outertuple = slot; + return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), + econtext, isnull); + } + + if (isout) + *isout = true; + *isnull = true; + return (Datum) 0; +} + +/* + * WinGetSlotInFrame + * slot: TupleTableSlot to store the result + * relpos: signed rowcount offset from the seek position + * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL + * set_mark: If the row is found/in frame and set_mark is true, the mark is + * moved to the row as a side-effect. + * isnull: output argument, receives isnull status of result + * isout: output argument, set to indicate whether target row position + * is out of frame (can pass NULL if caller doesn't care about this) + * + * Returns 0 if we successfullt got the slot. false if out of frame. + * (also isout is set) + */ +static int +WinGetSlotInFrame(WindowObject winobj, TupleTableSlot *slot, + int relpos, int seektype, bool set_mark, + bool *isnull, bool *isout) +{ + WindowAggState *winstate; + int64 abs_pos; + int64 mark_pos; + int num_reduced_frame; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + switch (seektype) { case WINDOW_SEEK_CURRENT: @@ -4000,11 +4384,25 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, winstate->frameOptions); break; } + num_reduced_frame = row_is_in_reduced_frame(winobj, + winstate->frameheadpos); + if (num_reduced_frame < 0) + goto out_of_frame; + else if (num_reduced_frame > 0) + if (relpos >= num_reduced_frame) + goto out_of_frame; break; case WINDOW_SEEK_TAIL: /* rejecting relpos > 0 is easy and simplifies code below */ if (relpos > 0) goto out_of_frame; + + /* + * RPR cares about frame head pos. Need to call + * update_frameheadpos + */ + update_frameheadpos(winstate); + update_frametailpos(winstate); abs_pos = winstate->frametailpos - 1 + relpos; @@ -4071,6 +4469,14 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, mark_pos = 0; /* keep compiler quiet */ break; } + + num_reduced_frame = row_is_in_reduced_frame(winobj, + winstate->frameheadpos + relpos); + if (num_reduced_frame < 0) + goto out_of_frame; + else if (num_reduced_frame > 0) + abs_pos = winstate->frameheadpos + relpos + + num_reduced_frame - 1; break; default: elog(ERROR, "unrecognized window seek type: %d", seektype); @@ -4089,15 +4495,13 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, *isout = false; if (set_mark) WinSetMarkPosition(winobj, mark_pos); - econtext->ecxt_outertuple = slot; - return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), - econtext, isnull); + return 0; out_of_frame: if (isout) *isout = true; *isnull = true; - return (Datum) 0; + return -1; } /* @@ -4128,3 +4532,1431 @@ WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull) return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), econtext, isnull); } + +/* + * rpr_is_defined + * return true if Row pattern recognition is defined. + */ +static +bool +rpr_is_defined(WindowAggState *winstate) +{ + return winstate->patternVariableList != NIL; +} + +/* + * ----------------- + * row_is_in_reduced_frame + * Determine whether a row is in the current row's reduced window frame + * according to row pattern matching + * + * The row must has been already determined that it is in a full window frame + * and fetched it into slot. + * + * Returns: + * = 0, RPR is not defined. + * >0, if the row is the first in the reduced frame. Return the number of rows + * in the reduced frame. + * -1, if the row is unmatched row + * -2, if the row is in the reduced frame but needed to be skipped because of + * AFTER MATCH SKIP PAST LAST ROW + * ----------------- + */ +static +int +row_is_in_reduced_frame(WindowObject winobj, int64 pos) +{ + WindowAggState *winstate = winobj->winstate; + int state; + int rtn; + + if (!rpr_is_defined(winstate)) + { + /* + * RPR is not defined. Assume that we are always in the the reduced + * window frame. + */ + rtn = 0; +#ifdef RPR_DEBUG + elog(DEBUG1, "row_is_in_reduced_frame returns %d: pos: " INT64_FORMAT, + rtn, pos); +#endif + return rtn; + } + + state = get_reduced_frame_map(winstate, pos); + + if (state == RF_NOT_DETERMINED) + { + update_frameheadpos(winstate); + update_reduced_frame(winobj, pos); + } + + state = get_reduced_frame_map(winstate, pos); + + switch (state) + { + int64 i; + int num_reduced_rows; + + case RF_FRAME_HEAD: + num_reduced_rows = 1; + for (i = pos + 1; + get_reduced_frame_map(winstate, i) == RF_SKIPPED; i++) + num_reduced_rows++; + rtn = num_reduced_rows; + break; + + case RF_SKIPPED: + rtn = -2; + break; + + case RF_UNMATCHED: + rtn = -1; + break; + + default: + elog(ERROR, "Unrecognized state: %d at: " INT64_FORMAT, + state, pos); + break; + } + +#ifdef RPR_DEBUG + elog(DEBUG1, "row_is_in_reduced_frame returns %d: pos: " INT64_FORMAT, + rtn, pos); +#endif + return rtn; +} + +#define REDUCED_FRAME_MAP_INIT_SIZE 1024L + +/* + * create_reduced_frame_map + * Create reduced frame map + */ +static +void +create_reduced_frame_map(WindowAggState *winstate) +{ + winstate->reduced_frame_map = + MemoryContextAlloc(winstate->partcontext, + REDUCED_FRAME_MAP_INIT_SIZE); + winstate->alloc_sz = REDUCED_FRAME_MAP_INIT_SIZE; + clear_reduced_frame_map(winstate); +} + +/* + * clear_reduced_frame_map + * Clear reduced frame map + */ +static +void +clear_reduced_frame_map(WindowAggState *winstate) +{ + Assert(winstate->reduced_frame_map != NULL); + MemSet(winstate->reduced_frame_map, RF_NOT_DETERMINED, + winstate->alloc_sz); +} + +/* + * get_reduced_frame_map + * Get reduced frame map specified by pos + */ +static +int +get_reduced_frame_map(WindowAggState *winstate, int64 pos) +{ + Assert(winstate->reduced_frame_map != NULL); + Assert(pos >= 0); + + /* + * If pos is not in the reduced frame map, it means that any info + * regarding the pos has not been registered yet. So we return + * RF_NOT_DETERMINED. + */ + if (pos >= winstate->alloc_sz) + return RF_NOT_DETERMINED; + + return winstate->reduced_frame_map[pos]; +} + +/* + * register_reduced_frame_map + * Add/replace reduced frame map member at pos. + * If there's no enough space, expand the map. + */ +static +void +register_reduced_frame_map(WindowAggState *winstate, int64 pos, int val) +{ + int64 realloc_sz; + + Assert(winstate->reduced_frame_map != NULL); + + if (pos < 0) + elog(ERROR, "wrong pos: " INT64_FORMAT, pos); + + if (pos > winstate->alloc_sz - 1) + { + realloc_sz = winstate->alloc_sz * 2; + + winstate->reduced_frame_map = + repalloc(winstate->reduced_frame_map, realloc_sz); + + MemSet(winstate->reduced_frame_map + winstate->alloc_sz, + RF_NOT_DETERMINED, realloc_sz - winstate->alloc_sz); + + winstate->alloc_sz = realloc_sz; + } + + winstate->reduced_frame_map[pos] = val; +} + +/* + * update_reduced_frame + * Update reduced frame info. + */ +static +void +update_reduced_frame(WindowObject winobj, int64 pos) +{ + WindowAggState *winstate = winobj->winstate; + ListCell *lc1, + *lc2; + bool expression_result; + int num_matched_rows; + int64 original_pos; + bool anymatch; + StringInfo encoded_str; + StringSet *str_set; + bool greedy = false; + int64 result_pos, + i; + int init_size; + + /* + * Set of pattern variables evaluated to true. Each character corresponds + * to pattern variable. Example: str_set[0] = "AB"; str_set[1] = "AC"; In + * this case at row 0 A and B are true, and A and C are true in row 1. + */ + + /* initialize pattern variables set */ + str_set = string_set_init(); + + /* save original pos */ + original_pos = pos; + + /* + * Calculate initial memory allocation size from the number of pattern + * variables, + */ + init_size = sizeof(char) * list_length(winstate->patternVariableList) + 1; + + /* + * Check if the pattern does not include any greedy quantifier. If it does + * not, we can just apply the pattern to each row. If it succeeds, we are + * done. + */ + foreach(lc1, winstate->patternRegexpList) + { + char *quantifier = strVal(lfirst(lc1)); + + if (*quantifier == '+' || *quantifier == '*' || *quantifier == '?') + { + greedy = true; + break; + } + } + + /* + * Non greedy case + */ + if (!greedy) + { + num_matched_rows = 0; + encoded_str = makeStringInfoExt(init_size); + + foreach(lc1, winstate->patternVariableList) + { + char *vname = strVal(lfirst(lc1)); +#ifdef RPR_DEBUG + elog(DEBUG1, "pos: " INT64_FORMAT " pattern vname: %s", + pos, vname); +#endif + expression_result = false; + + /* evaluate row pattern against current row */ + result_pos = evaluate_pattern(winobj, pos, vname, + encoded_str, &expression_result); + if (!expression_result || result_pos < 0) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "expression result is false or out of frame"); +#endif + register_reduced_frame_map(winstate, original_pos, + RF_UNMATCHED); + destroyStringInfo(encoded_str); + return; + } + /* move to next row */ + pos++; + num_matched_rows++; + } + destroyStringInfo(encoded_str); +#ifdef RPR_DEBUG + elog(DEBUG1, "pattern matched"); +#endif + register_reduced_frame_map(winstate, original_pos, RF_FRAME_HEAD); + + for (i = original_pos + 1; i < original_pos + num_matched_rows; i++) + { + register_reduced_frame_map(winstate, i, RF_SKIPPED); + } + return; + } + + /* + * Greedy quantifiers included. Loop over until none of pattern matches or + * encounters end of frame. + */ + for (;;) + { + result_pos = -1; + + /* + * Loop over each PATTERN variable. + */ + anymatch = false; + + encoded_str = makeStringInfoExt(init_size); + + forboth(lc1, winstate->patternVariableList, lc2, + winstate->patternRegexpList) + { + char *vname = strVal(lfirst(lc1)); +#ifdef RPR_DEBUG + char *quantifier = strVal(lfirst(lc2)); + + elog(DEBUG1, "pos: " INT64_FORMAT " pattern vname: %s quantifier: %s", + pos, vname, quantifier); +#endif + expression_result = false; + + /* evaluate row pattern against current row */ + result_pos = evaluate_pattern(winobj, pos, vname, + encoded_str, &expression_result); + if (expression_result) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "expression result is true"); +#endif + anymatch = true; + } + + /* + * If out of frame, we are done. + */ + if (result_pos < 0) + break; + } + + if (!anymatch) + { + /* none of patterns matched. */ + break; + } + + string_set_add(str_set, encoded_str, 0); + +#ifdef RPR_DEBUG + elog(DEBUG1, "pos: " INT64_FORMAT " encoded_str: %s", + encoded_str->data); +#endif + + /* move to next row */ + pos++; + + if (result_pos < 0) + { + /* out of frame */ + break; + } + } + + if (string_set_get_size(str_set) == 0) + { + /* no match found in the first row */ + register_reduced_frame_map(winstate, original_pos, RF_UNMATCHED); + destroyStringInfo(encoded_str); + return; + } + +#ifdef RPR_DEBUG + elog(DEBUG2, "pos: " INT64_FORMAT " encoded_str: %s", + pos, encoded_str->data); +#endif + + /* look for matching pattern variable sequence */ +#ifdef RPR_DEBUG + elog(DEBUG1, "search_str_set started"); +#endif + num_matched_rows = search_str_set(winstate, str_set); + +#ifdef RPR_DEBUG + elog(DEBUG1, "search_str_set returns: %d", num_matched_rows); +#endif + string_set_discard(str_set); + + /* + * We are at the first row in the reduced frame. Save the number of + * matched rows as the number of rows in the reduced frame. + */ + if (num_matched_rows <= 0) + { + /* no match */ + register_reduced_frame_map(winstate, original_pos, RF_UNMATCHED); + } + else + { + register_reduced_frame_map(winstate, original_pos, RF_FRAME_HEAD); + + for (i = original_pos + 1; i < original_pos + num_matched_rows; i++) + { + register_reduced_frame_map(winstate, i, RF_SKIPPED); + } + } + + return; +} + +/* + * search_str_set + * + * Perform pattern matching using "pattern" against input_str_set. pattern is + * a regular expression string derived from PATTERN clause. Note that the + * regular expression string is prefixed by '^' and followed by initials + * represented in a same way as str_set. str_set is a set of StringInfo. Each + * StringInfo has a string comprising initials of pattern variable strings + * being true in a row. The initials are one of [a-z], parallel to the order + * of variable names in DEFINE clause. Suppose DEFINE has variables START, UP + * and DOWN. If PATTERN has START, UP+ and DOWN, then the initials in PATTERN + * will be 'a', 'b' and 'c'. The "pattern" will be "^ab+c". + * + * variable_pos is an array representing the order of pattern variable string + * initials in PATTERN clause. For example initial 'a' potion is in + * variable_pos[0].pos[0] = 0. Note that if the pattern is "START UP DOWN UP" + * (UP appears twice), then "UP" (initial is 'b') has two position 1 and + * 3. Thus variable_pos for b is variable_pos[1].pos[0] = 1 and + * variable_pos[1].pos[1] = 3. + * + * Returns the longest number of the matching rows (greedy matching) if + * quatifier '+' or '*' is included in "pattern". + */ +static +int +search_str_set(WindowAggState *winstate, StringSet *input_str_set) +{ + char *pattern; /* search regexp pattern */ + VariablePos *variable_pos; + int set_size; /* number of rows in the set */ + int resultlen; + int index; + StringSet *new_str_set; + int new_str_size; + int len; + int info; + char tail_pattern_initial; + + /* + * Set last initial char to tail_pattern_initial if we can apply "tail + * pattern initial optimization". If the last regexp component in pattern + * is with '+' quatifier, set the initial to tail_pattern_initial. For + * example if pattern = "ab+", tail_pattern_initial will be 'b'. + * Otherwise, tail_pattern_initial is '\0'. + */ + pattern = winstate->pattern_str->data; + if (pattern[strlen(pattern) - 1] == '+') + tail_pattern_initial = pattern[strlen(pattern) - 2]; + else + tail_pattern_initial = '\0'; + + /* + * Generate all possible pattern variable name initials as a set of + * StringInfo named "new_str_set". For example, if we have two rows + * having "ab" (row 0) and "ac" (row 1) in the input str_set, new_str_set + * will have set of StringInfo "aa", "ac", "ba" and "bc" in the end. + */ + variable_pos = winstate->variable_pos; + new_str_set = generate_patterns(winstate, input_str_set, pattern, variable_pos, + tail_pattern_initial); + + /* + * Perform pattern matching to find out the longest match. + */ + new_str_size = string_set_get_size(new_str_set); + len = 0; + resultlen = 0; + set_size = string_set_get_size(input_str_set); + + for (index = 0; index < new_str_size; index++) + { + StringInfo s; + + s = string_set_get(new_str_set, index, &info); + if (s == NULL) + continue; /* no data */ + + /* + * If the string is scheduled to be discarded, we just disregard it. + */ + if (info & STRSET_DISCARDED) + continue; + + len = do_pattern_match(winstate, pattern, s->data, s->len); + if (len > resultlen) + { + /* remember the longest match */ + resultlen = len; + + /* + * If the size of result set is equal to the number of rows in the + * set, we are done because it's not possible that the number of + * matching rows exceeds the number of rows in the set. + */ + if (resultlen >= set_size) + break; + } + } + + /* we no longer need new string set */ + string_set_discard(new_str_set); + + return resultlen; +} + +/* + * generate_patterns + * + * Generate all possible pattern variable name initials in 'input_str_set' as + * a set of StringInfo and return it. For example, if we have two rows having + * "ab" (row 0) and "ac" (row 1) in 'input str_set', returned StringSet will + * have set of StringInfo "aa", "ac", "ba" and "bc" in the end. + * 'variable_pos' and 'tail_pattern_initial' are used for pruning + * optimization. + */ +static +StringSet * +generate_patterns(WindowAggState *winstate, StringSet *input_str_set, + char *pattern, VariablePos *variable_pos, + char tail_pattern_initial) +{ + StringSet *old_str_set, + *new_str_set; + int index; + int set_size; + int old_set_size; + int info; + int resultlen; + StringInfo str; + int i; + char *p; + + new_str_set = string_set_init(); + set_size = string_set_get_size(input_str_set); + if (set_size == 0) /* if there's no row in input, return empty + * set */ + return new_str_set; + + resultlen = 0; + + /* + * Generate initial new_string_set for input row 0. + */ + str = string_set_get(input_str_set, 0, &info); + p = str->data; + + /* + * Loop over each new pattern variable char. + */ + while (*p) + { + StringInfo new = makeStringInfo(); + + /* add pattern variable char */ + appendStringInfoChar(new, *p); + /* add new one to string set */ + string_set_add(new_str_set, new, 0); + p++; /* next pattern variable */ + } + + /* + * Generate new_string_set for each input row. + */ + for (index = 1; index < set_size; index++) + { + /* previous new str set now becomes old str set */ + old_str_set = new_str_set; + new_str_set = string_set_init(); /* create new string set */ + /* pick up input string */ + str = string_set_get(input_str_set, index, &info); + old_set_size = string_set_get_size(old_str_set); + + /* + * Loop over each row in the previous result set. + */ + for (i = 0; i < old_set_size; i++) + { + char last_old_char; + int old_str_len; + int old_info; + StringInfo old; + + old = string_set_get(old_str_set, i, &old_info); + p = old->data; + old_str_len = old->len; + if (old_str_len > 0) + last_old_char = p[old_str_len - 1]; + else + last_old_char = '\0'; + + /* Can this old set be discarded? */ + if (old_info & STRSET_DISCARDED) + continue; /* discard the old string */ + + /* Is this old set frozen? */ + else if (old_info & STRSET_FROZEN) + { + /* if shorter match. we can discard it */ + if (old_str_len < resultlen) + continue; /* discard the shorter string */ + + /* move the old set to new_str_set */ + string_set_add(new_str_set, old, old_info); + old_str_set->str_set[i] = NULL; + continue; + } + + /* + * loop over each pattern variable initial char in the input set. + */ + for (p = str->data; *p; p++) + { + /* + * Optimization. Check if the row's pattern variable initial + * character position is greater than or equal to the old + * set's last pattern variable initial character position. For + * example, if the old set's last pattern variable initials + * are "ab", then the new pattern variable initial can be "b" + * or "c" but can not be "a", if the initials in PATTERN is + * something like "a b c" or "a b+ c+" etc. This optimization + * is possible when we only allow "+" quantifier. + */ + if (variable_pos_compare(variable_pos, last_old_char, *p)) + + /* + * Satisfied the condition. Add new pattern char to + * new_str_set if it looks good. + */ + resultlen = add_pattern(winstate, old, old_info, new_str_set, *p, + pattern, tail_pattern_initial, resultlen); + else + + /* + * The old_str did not satisfy the condition and it cannot + * be extended further. "Freeze" it. + */ + resultlen = freeze_pattern(winstate, old, old_info, + new_str_set, pattern, resultlen); + } + } + /* we no longer need old string set */ + string_set_discard(old_str_set); + } + return new_str_set; +} + +/* + * add_pattern + * + * Make a copy of 'old' (along with 'old_info' flag) and add new pattern char + * 'c' to it. Then add it to 'new_str_set'. 'pattern' and + * 'tail_pattern_initial' is checked to determine whether the copy is worth to + * add to new_str_set or not. The match length (possibly longer than + * 'resultlen') is returned. + */ +static +int +add_pattern(WindowAggState *winstate, StringInfo old, int old_info, + StringSet *new_str_set, char c, char *pattern, char tail_pattern_initial, + int resultlen) +{ + StringInfo new; + int info; + int len; + + /* + * New char in the input row satisfies the condition above. + */ + new = makeStringInfoExt(old->len + 1); /* copy source string */ + appendStringInfoString(new, old->data); + + /* add pattern variable char */ + appendStringInfoChar(new, c); + + /* + * Adhoc optimization. If the first letter in the input string is in the + * head and second position and there's no associated quatifier '+', then + * we can dicard the input because there's no chance to expand the string + * further. + * + * For example, pattern "abc" cannot match "aa". + */ + if (pattern[1] == new->data[0] && + pattern[1] == new->data[1] && + pattern[2] != '+' && + pattern[1] != pattern[2]) + { + destroyStringInfo(new); + return resultlen; + } + + info = old_info; + + /* + * Check if we can apply "tail pattern initial optimization". If the last + * regexp component in pattern has '+' quantifier, the component is set to + * the last pattern initial. For example if pattern is "ab+", + * tail_pattern_initial will become 'b'. Otherwise, tail_pattern_initial + * is '\0'. If the tail pattern initial optimization is possible, we do + * not need to apply regular expression match again. Suppose we have the + * previous string ended with "b" and the it was confirmed the regular + * expression match, then char 'b' can be added to the string without + * applying the regular expression match again. + */ + if (c == tail_pattern_initial) /* tail pattern initial optimization + * possible? */ + { + /* + * Is already confirmed to be matched with pattern? + */ + if ((info & STRSET_MATCHED) == 0) + { + /* not confirmed yet */ + len = do_pattern_match(winstate, pattern, new->data, new->len); + if (len > 0) + info = STRSET_MATCHED; /* set already confirmed flag */ + } + else + + /* + * already confirmed. Use the string length as the matching length + */ + len = new->len; + + /* update the longest match length if needed */ + if (len > resultlen) + resultlen = len; + } + + /* add new StringInfo to the string set */ + string_set_add(new_str_set, new, info); + + return resultlen; +} + +/* + * freeze_pattern + * + * "Freeze" 'old' (along with 'old_info' flag) and add it to + * 'new_str_set'. Frozen string is known to not be expanded further. The frozen + * string is check if it satisfies 'pattern'. If it does not, "discarded" + * mark is added. The discarded mark is also added if the match length is + * shorter than the current longest match length. The match length (possibly + * longer than 'resultlen') is returned. + */ +static +int +freeze_pattern(WindowAggState *winstate, StringInfo old, int old_info, + StringSet *new_str_set, + char *pattern, int resultlen) +{ + int len; + StringInfo new; + int new_str_size; + int new_index; + + /* + * We are freezing this pattern string. If the pattern string length is + * shorter than the current longest string length, we don't need to keep + * it. + */ + if (old->len < resultlen) + return resultlen; + + if (old_info & STRSET_MATCHED) + /* we don't need to apply pattern match again */ + len = old->len; + else + { + /* apply pattern match */ + len = do_pattern_match(winstate, pattern, old->data, old->len); + if (len <= 0) + { + /* no match. we can discard it */ + return resultlen; + } + } + if (len < resultlen) + { + /* shorter match. we can discard it */ + return resultlen; + } + + /* + * Match length is the longest so far + */ + resultlen = len; /* remember the longest match */ + + /* freeze the pattern string */ + new = makeStringInfo(); + enlargeStringInfo(new, old->len + 1); + appendStringInfoString(new, old->data); + /* set frozen mark */ + string_set_add(new_str_set, new, STRSET_FROZEN); + + /* + * Search new_str_set to find out frozen entries that have shorter match + * length. Mark them as "discard" so that they are discarded in the next + * round. + */ + new_str_size = + string_set_get_size(new_str_set) - 1; + + /* loop over new_str_set */ + for (new_index = 0; new_index < new_str_size; new_index++) + { + int info; + + new = string_set_get(new_str_set, new_index, &info); + + /* + * If this is frozen and is not longer than the current longest match + * length, we don't need to keep this. + */ + if (info & STRSET_FROZEN && new->len < resultlen) + { + /* + * mark this set to discard in the next round + */ + info |= STRSET_DISCARDED; + new_str_set->info[new_index] = info; + } + } + return resultlen; +} + +/* + * do_pattern_match + * + * Perform pattern match using 'pattern' against 'encoded_str' whose length is + * 'len' bytes (without null terminate). Returns matching number of rows if + * matching is succeeded. Otherwise returns 0. + */ +static +int +do_pattern_match(WindowAggState *winstate, char *pattern, char *encoded_str, int len) +{ + int plen; + int cflags = REG_EXTENDED; + size_t nmatch = 1; + int eflags = 0; + regmatch_t pmatch[1]; + int sts; + pg_wchar *data; + int data_len; + + /* + * Compile regexp if cache does not exist or existing cache is not same as + * "pattern". + */ + if (winstate->patbuf == NULL || strcmp(winstate->patbuf, pattern)) + { + MemoryContext oldContext = MemoryContextSwitchTo + (winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); + + pattern_regfree(winstate); + + /* we need to convert to char to pg_wchar */ + plen = strlen(pattern); + data = (pg_wchar *) palloc((plen + 1) * sizeof(pg_wchar)); + data_len = pg_mb2wchar_with_len(pattern, data, plen); + /* compile re */ + sts = pg_regcomp(&winstate->preg, /* compiled re */ + data, /* target pattern */ + data_len, /* length of pattern */ + cflags, /* compile option */ + C_COLLATION_OID /* collation */ + ); + pfree(data); + + if (sts != REG_OKAY) + { + /* re didn't compile (no need for pg_regfree, if so) */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_REGULAR_EXPRESSION), + errmsg("invalid regular expression: %s", pattern))); + } + + /* save pattern */ + winstate->patbuf = pstrdup(pattern); + MemoryContextSwitchTo(oldContext); + } + + data = (pg_wchar *) palloc((len + 1) * sizeof(pg_wchar)); + data_len = pg_mb2wchar_with_len(encoded_str, data, len); + + /* execute the regular expression match */ + sts = pg_regexec( + &winstate->preg, /* compiled re */ + data, /* target string */ + data_len, /* length of encoded_str */ + 0, /* search start */ + NULL, /* rm details */ + nmatch, /* number of match sub re */ + pmatch, /* match result details */ + eflags); + + pfree(data); + + if (sts != REG_OKAY) + { + if (sts != REG_NOMATCH) + { + char errMsg[100]; + + pg_regerror(sts, &winstate->preg, errMsg, sizeof(errMsg)); + ereport(ERROR, + (errcode(ERRCODE_INVALID_REGULAR_EXPRESSION), + errmsg("regular expression failed: %s", errMsg))); + } + return 0; /* does not match */ + } + + len = pmatch[0].rm_eo; /* return match length */ + return len; +} + +/* + * pattern_regfree + * + * Free the regcomp result for PARTTERN string. + */ +static +void +pattern_regfree(WindowAggState *winstate) +{ + if (winstate->patbuf != NULL) + { + pg_regfree(&winstate->preg); /* free previous re */ + pfree(winstate->patbuf); + winstate->patbuf = NULL; + } +} + +/* + * evaluate_pattern + * + * Evaluate expression associated with PATTERN variable vname. current_pos is + * relative row position in a frame (starting from 0). If vname is evaluated + * to true, initial letters associated with vname is appended to + * encode_str. result is out paramater representing the expression evaluation + * result is true of false. + *--------- + * Return values are: + * >=0: the last match absolute row position + * otherwise out of frame. + *--------- + */ +static +int64 +evaluate_pattern(WindowObject winobj, int64 current_pos, + char *vname, StringInfo encoded_str, bool *result) +{ + WindowAggState *winstate = winobj->winstate; + ExprContext *econtext = winstate->ss.ps.ps_ExprContext; + ListCell *lc1, + *lc2, + *lc3; + ExprState *pat; + Datum eval_result; + bool out_of_frame = false; + bool isnull; + TupleTableSlot *slot; + + forthree(lc1, winstate->defineVariableList, + lc2, winstate->defineClauseList, + lc3, winstate->defineInitial) + { + char initial; /* initial letter associated with vname */ + char *name = strVal(lfirst(lc1)); + + if (strcmp(vname, name)) + continue; + + initial = *(strVal(lfirst(lc3))); + + /* set expression to evaluate */ + pat = lfirst(lc2); + + /* get current, previous and next tuples */ + if (!get_slots(winobj, current_pos)) + { + out_of_frame = true; + } + else + { + /* evaluate the expression */ + eval_result = ExecEvalExpr(pat, econtext, &isnull); + if (isnull) + { + /* expression is NULL */ +#ifdef RPR_DEBUG + elog(DEBUG1, "expression for %s is NULL at row: " INT64_FORMAT, + vname, current_pos); +#endif + *result = false; + } + else + { + if (!DatumGetBool(eval_result)) + { + /* expression is false */ +#ifdef RPR_DEBUG + elog(DEBUG1, "expression for %s is false at row: " INT64_FORMAT, + vname, current_pos); +#endif + *result = false; + } + else + { + /* expression is true */ +#ifdef RPR_DEBUG + elog(DEBUG1, "expression for %s is true at row: " INT64_FORMAT, + vname, current_pos); +#endif + appendStringInfoChar(encoded_str, initial); + *result = true; + } + } + + slot = winstate->temp_slot_1; + if (slot != winstate->null_slot) + ExecClearTuple(slot); + slot = winstate->prev_slot; + if (slot != winstate->null_slot) + ExecClearTuple(slot); + slot = winstate->next_slot; + if (slot != winstate->null_slot) + ExecClearTuple(slot); + + break; + } + + if (out_of_frame) + { + *result = false; + return -1; + } + } + return current_pos; +} + +/* + * get_slots + * + * Get current, previous and next tuples. + * Returns false if current row is out of partition/full frame. + */ +static +bool +get_slots(WindowObject winobj, int64 current_pos) +{ + WindowAggState *winstate = winobj->winstate; + TupleTableSlot *slot; + int ret; + ExprContext *econtext; + + econtext = winstate->ss.ps.ps_ExprContext; + + /* set up current row tuple slot */ + slot = winstate->temp_slot_1; + if (!window_gettupleslot(winobj, current_pos, slot)) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "current row is out of partition at:" INT64_FORMAT, + current_pos); +#endif + return false; + } + ret = row_is_in_frame(winobj, current_pos, slot, false); + if (ret <= 0) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "current row is out of frame at: " INT64_FORMAT, + current_pos); +#endif + ExecClearTuple(slot); + return false; + } + econtext->ecxt_outertuple = slot; + + /* for PREV */ + if (current_pos > 0) + { + slot = winstate->prev_slot; + if (!window_gettupleslot(winobj, current_pos - 1, slot)) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "previous row is out of partition at: " INT64_FORMAT, + current_pos - 1); +#endif + econtext->ecxt_scantuple = winstate->null_slot; + } + else + { + ret = row_is_in_frame(winobj, current_pos - 1, slot, false); + if (ret <= 0) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "previous row is out of frame at: " INT64_FORMAT, + current_pos - 1); +#endif + ExecClearTuple(slot); + econtext->ecxt_scantuple = winstate->null_slot; + } + else + { + econtext->ecxt_scantuple = slot; + } + } + } + else + econtext->ecxt_scantuple = winstate->null_slot; + + /* for NEXT */ + slot = winstate->next_slot; + if (!window_gettupleslot(winobj, current_pos + 1, slot)) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "next row is out of partiton at: " INT64_FORMAT, + current_pos + 1); +#endif + econtext->ecxt_innertuple = winstate->null_slot; + } + else + { + ret = row_is_in_frame(winobj, current_pos + 1, slot, false); + if (ret <= 0) + { +#ifdef RPR_DEBUG + elog(DEBUG1, "next row is out of frame at: " INT64_FORMAT, + current_pos + 1); +#endif + ExecClearTuple(slot); + econtext->ecxt_innertuple = winstate->null_slot; + } + else + econtext->ecxt_innertuple = slot; + } + return true; +} + +/* + * pattern_initial + * + * Return pattern variable initial character + * matching with pattern variable name vname. + * If not found, return 0. + */ +static +char +pattern_initial(WindowAggState *winstate, char *vname) +{ + char initial; + char *name; + ListCell *lc1, + *lc2; + + forboth(lc1, winstate->defineVariableList, + lc2, winstate->defineInitial) + { + name = strVal(lfirst(lc1)); /* DEFINE variable name */ + initial = *(strVal(lfirst(lc2))); /* DEFINE variable initial */ + + + if (!strcmp(name, vname)) + return initial; /* found */ + } + return 0; +} + +/* + * string_set_init + * + * Create dynamic set of StringInfo. + */ +static +StringSet * +string_set_init(void) +{ +/* Initial allocation size of str_set */ +#define STRING_SET_ALLOC_SIZE 1024 + + StringSet *string_set; + Size set_size; + + string_set = palloc0(sizeof(StringSet)); + string_set->set_index = 0; + set_size = STRING_SET_ALLOC_SIZE; + string_set->str_set = palloc(set_size * sizeof(StringInfo)); + string_set->info = palloc0(set_size * sizeof(int)); + string_set->set_size = set_size; + + return string_set; +} + +/* + * string_set_add + * + * Add StringInfo str to StringSet string_set. + */ +static +void +string_set_add(StringSet *string_set, StringInfo str, int info) +{ + Size set_size; + Size old_set_size; + + set_size = string_set->set_size; + if (string_set->set_index >= set_size) + { + old_set_size = set_size; + set_size *= 2; + string_set->str_set = repalloc(string_set->str_set, + set_size * sizeof(StringInfo)); + string_set->info = repalloc0(string_set->info, + old_set_size * sizeof(int), + set_size * sizeof(int)); + string_set->set_size = set_size; + } + + string_set->info[string_set->set_index] = info; + string_set->str_set[string_set->set_index++] = str; + + return; +} + +/* + * string_set_get + * + * Returns StringInfo specified by index. + * If there's no data yet, returns NULL. + */ +static +StringInfo +string_set_get(StringSet *string_set, int index, int *info) +{ + *info = 0; + + /* no data? */ + if (index == 0 && string_set->set_index == 0) + return NULL; + + if (index < 0 || index >= string_set->set_index) + elog(ERROR, "invalid index: %d", index); + + *info = string_set->info[index]; + + return string_set->str_set[index]; +} + +/* + * string_set_get_size + * + * Returns the size of StringSet. + */ +static +int +string_set_get_size(StringSet *string_set) +{ + return string_set->set_index; +} + +/* + * string_set_discard + * Discard StringSet. + * All memory including StringSet itself is freed. + */ +static +void +string_set_discard(StringSet *string_set) +{ + int i; + + for (i = 0; i < string_set->set_index; i++) + { + StringInfo str = string_set->str_set[i]; + + if (str) + destroyStringInfo(str); + } + pfree(string_set->info); + pfree(string_set->str_set); + pfree(string_set); +} + +/* + * variable_pos_init + * + * Create and initialize variable postion structure + */ +static +VariablePos * +variable_pos_init(void) +{ + VariablePos *variable_pos; + + variable_pos = palloc(sizeof(VariablePos) * NUM_ALPHABETS); + MemSet(variable_pos, -1, sizeof(VariablePos) * NUM_ALPHABETS); + return variable_pos; +} + +/* + * variable_pos_register + * + * Register pattern variable whose initial is initial into postion index. + * pos is position of initial. + * If pos is already registered, register it at next empty slot. + */ +static +void +variable_pos_register(VariablePos *variable_pos, char initial, int pos) +{ + int index = initial - 'a'; + int slot; + int i; + + if (pos < 0 || pos > NUM_ALPHABETS) + elog(ERROR, "initial is not valid char: %c", initial); + + for (i = 0; i < NUM_ALPHABETS; i++) + { + slot = variable_pos[index].pos[i]; + if (slot < 0) + { + /* empty slot found */ + variable_pos[index].pos[i] = pos; + return; + } + } + elog(ERROR, "no empty slot for initial: %c", initial); +} + +/* + * variable_pos_compare + * + * Returns true if initial1 can be followed by initial2 + */ +static +bool +variable_pos_compare(VariablePos *variable_pos, char initial1, char initial2) +{ + int index1, + index2; + int pos1, + pos2; + + for (index1 = 0;; index1++) + { + pos1 = variable_pos_fetch(variable_pos, initial1, index1); + if (pos1 < 0) + break; + + for (index2 = 0;; index2++) + { + pos2 = variable_pos_fetch(variable_pos, initial2, index2); + if (pos2 < 0) + break; + if (pos1 <= pos2) + return true; + } + } + return false; +} + +/* + * variable_pos_fetch + * + * Fetch position of pattern variable whose initial is initial, and whose index + * is index. If no postion was registered by initial, index, returns -1. + */ +static +int +variable_pos_fetch(VariablePos *variable_pos, char initial, int index) +{ + int pos = initial - 'a'; + + if (pos < 0 || pos > NUM_ALPHABETS) + elog(ERROR, "initial is not valid char: %c", initial); + + if (index < 0 || index > NUM_ALPHABETS) + elog(ERROR, "index is not valid: %d", index); + + return variable_pos[pos].pos[index]; +} + +/* + * variable_pos_build + * + * Build VariablePos structure and return it. + */ +static +VariablePos * +variable_pos_build(WindowAggState *winstate) +{ + VariablePos *variable_pos; + StringInfo pattern_str; + int initial_index = 0; + ListCell *lc1, + *lc2; + + variable_pos = winstate->variable_pos = variable_pos_init(); + pattern_str = winstate->pattern_str = makeStringInfo(); + appendStringInfoChar(pattern_str, '^'); + + forboth(lc1, winstate->patternVariableList, + lc2, winstate->patternRegexpList) + { + char *vname = strVal(lfirst(lc1)); + char *quantifier = strVal(lfirst(lc2)); + char initial; + + initial = pattern_initial(winstate, vname); + Assert(initial != 0); + appendStringInfoChar(pattern_str, initial); + if (quantifier[0]) + appendStringInfoChar(pattern_str, quantifier[0]); + + /* + * Register the initial at initial_index. If the initial appears more + * than once, all of it's initial_index will be recorded. This could + * happen if a pattern variable appears in the PATTERN clause more + * than once like "UP DOWN UP" "UP UP UP". + */ + variable_pos_register(variable_pos, initial, initial_index); + + initial_index++; + } + + return variable_pos; +} diff --git a/src/backend/utils/adt/windowfuncs.c b/src/backend/utils/adt/windowfuncs.c index 969f02aa59b..de7571589af 100644 --- a/src/backend/utils/adt/windowfuncs.c +++ b/src/backend/utils/adt/windowfuncs.c @@ -37,11 +37,19 @@ typedef struct int64 remainder; /* (total rows) % (bucket num) */ } ntile_context; +/* + * rpr process information. + * Used for AFTER MATCH SKIP PAST LAST ROW + */ +typedef struct SkipContext +{ + int64 pos; /* last row absolute position */ +} SkipContext; + static bool rank_up(WindowObject winobj); static Datum leadlag_common(FunctionCallInfo fcinfo, bool forward, bool withoffset, bool withdefault); - /* * utility routine for *_rank functions. */ @@ -683,7 +691,7 @@ window_last_value(PG_FUNCTION_ARGS) WinCheckAndInitializeNullTreatment(winobj, true, fcinfo); result = WinGetFuncArgInFrame(winobj, 0, - 0, WINDOW_SEEK_TAIL, true, + 0, WINDOW_SEEK_TAIL, false, &isnull, NULL); if (isnull) PG_RETURN_NULL(); @@ -724,3 +732,25 @@ window_nth_value(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } + +/* + * prev + * Dummy function to invoke RPR's navigation operator "PREV". + * This is *not* a window function. + */ +Datum +window_prev(PG_FUNCTION_ARGS) +{ + PG_RETURN_DATUM(PG_GETARG_DATUM(0)); +} + +/* + * next + * Dummy function to invoke RPR's navigation operation "NEXT". + * This is *not* a window function. + */ +Datum +window_next(PG_FUNCTION_ARGS) +{ + PG_RETURN_DATUM(PG_GETARG_DATUM(0)); +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fd9448ec7b9..8fa0df4a8bd 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10837,6 +10837,12 @@ { oid => '3114', descr => 'fetch the Nth row value', proname => 'nth_value', prokind => 'w', prorettype => 'anyelement', proargtypes => 'anyelement int4', prosrc => 'window_nth_value' }, +{ oid => '8126', descr => 'previous value', + proname => 'prev', provolatile => 's', prorettype => 'anyelement', + proargtypes => 'anyelement', prosrc => 'window_prev' }, +{ oid => '8127', descr => 'next value', + proname => 'next', provolatile => 's', prorettype => 'anyelement', + proargtypes => 'anyelement', prosrc => 'window_next' }, # functions for range types { oid => '3832', descr => 'I/O', diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3968429f991..3a798de25b2 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -39,6 +39,7 @@ #include "nodes/plannodes.h" #include "nodes/tidbitmap.h" #include "partitioning/partdefs.h" +#include "regex/regex.h" #include "storage/condition_variable.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" @@ -2627,6 +2628,37 @@ typedef enum WindowAggStatus * tuples during spool */ } WindowAggStatus; +#define RF_NOT_DETERMINED 0 +#define RF_FRAME_HEAD 1 +#define RF_SKIPPED 2 +#define RF_UNMATCHED 3 + +/* + * Allowed PATTERN variables positions. + * Used in RPR. + * + * pos represents the pattern variable defined order in DEFINE caluase. For + * example. "DEFINE START..., UP..., DOWN ..." and "PATTERN START UP DOWN UP" + * will create: + * VariablePos[0].pos[0] = 0; START + * VariablePos[1].pos[0] = 1; UP + * VariablePos[1].pos[1] = 3; UP + * VariablePos[2].pos[0] = 2; DOWN + * + * Note that UP has two pos because UP appears in PATTERN twice. + * + * By using this strucrture, we can know which pattern variable can be followed + * by which pattern variable(s). For example, START can be followed by UP and + * DOWN since START's pos is 0, and UP's pos is 1 or 3, DOWN's pos is 2. + * DOWN can be followed by UP since UP's pos is either 1 or 3. + * + */ +#define NUM_ALPHABETS 26 /* we allow [a-z] variable initials */ +typedef struct VariablePos +{ + int pos[NUM_ALPHABETS]; /* postions in PATTERN */ +} VariablePos; + typedef struct WindowAggState { ScanState ss; /* its first field is NodeTag */ @@ -2686,6 +2718,21 @@ typedef struct WindowAggState int64 groupheadpos; /* current row's peer group head position */ int64 grouptailpos; /* " " " " tail position (group end+1) */ + /* these fields are used in Row pattern recognition: */ + RPSkipTo rpSkipTo; /* Row Pattern Skip To type */ + List *patternVariableList; /* list of row pattern variables names + * (list of String) */ + List *patternRegexpList; /* list of row pattern regular expressions + * ('+' or ''. list of String) */ + List *defineVariableList; /* list of row pattern definition + * variables (list of String) */ + List *defineClauseList; /* expression for row pattern definition + * search conditions ExprState list */ + List *defineInitial; /* list of row pattern definition variable + * initials (list of String) */ + VariablePos *variable_pos; /* list of pattern variable positions */ + StringInfo pattern_str; /* PATTERN initials */ + MemoryContext partcontext; /* context for partition-lifespan data */ MemoryContext aggcontext; /* shared context for aggregate working data */ MemoryContext curaggcontext; /* current aggregate's working data */ @@ -2713,6 +2760,22 @@ typedef struct WindowAggState TupleTableSlot *agg_row_slot; TupleTableSlot *temp_slot_1; TupleTableSlot *temp_slot_2; + + /* temporary slots for RPR */ + TupleTableSlot *prev_slot; /* PREV row navigation operator */ + TupleTableSlot *next_slot; /* NEXT row navigation operator */ + TupleTableSlot *null_slot; /* all NULL slot */ + + /* + * Each byte corresponds to a row positioned at absolute its pos in + * partition. See above definition for RF_*. Used for RPR. + */ + char *reduced_frame_map; + int64 alloc_sz; /* size of the map */ + + /* regular expression compiled result cache. Used for RPR. */ + char *patbuf; /* pattern to be compiled */ + regex_t preg; /* compiled re pattern */ } WindowAggState; /* ---------------- -- 2.43.0