v10-0003-Create-execAppend.c-to-avoid-duplicated-code-on-.patch

text/plain

Filename: v10-0003-Create-execAppend.c-to-avoid-duplicated-code-on-.patch
Type: text/plain
Part: 2
Message: Re: Asynchronous MergeAppend
From 4b08e19de2a52a479a3f3f8c5db6601770e4c3aa Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Tue, 16 Dec 2025 16:32:14 -0300
Subject: [PATCH v10 3/3] Create execAppend.c to avoid duplicated code on
 [Merge]Append

---
 contrib/pg_overexplain/pg_overexplain.c |   4 +-
 contrib/postgres_fdw/postgres_fdw.c     |   8 +-
 src/backend/commands/explain.c          |  26 +-
 src/backend/executor/Makefile           |   1 +
 src/backend/executor/execAmi.c          |   2 +-
 src/backend/executor/execAppend.c       | 410 +++++++++++++++++++
 src/backend/executor/execCurrent.c      |   4 +-
 src/backend/executor/execProcnode.c     |   8 +-
 src/backend/executor/meson.build        |   1 +
 src/backend/executor/nodeAppend.c       | 497 +++++-------------------
 src/backend/executor/nodeMergeAppend.c  | 416 +++-----------------
 src/backend/nodes/nodeFuncs.c           |   8 +-
 src/backend/optimizer/plan/createplan.c |  34 +-
 src/backend/optimizer/plan/setrefs.c    |  44 +--
 src/backend/optimizer/plan/subselect.c  |   4 +-
 src/backend/utils/adt/ruleutils.c       |   8 +-
 src/include/executor/execAppend.h       |  33 ++
 src/include/nodes/execnodes.h           |  80 ++--
 src/include/nodes/plannodes.h           |  45 +--
 19 files changed, 720 insertions(+), 913 deletions(-)
 create mode 100644 src/backend/executor/execAppend.c
 create mode 100644 src/include/executor/execAppend.h

diff --git a/contrib/pg_overexplain/pg_overexplain.c b/contrib/pg_overexplain/pg_overexplain.c
index fcdc17012da..7f18c2ab06c 100644
--- a/contrib/pg_overexplain/pg_overexplain.c
+++ b/contrib/pg_overexplain/pg_overexplain.c
@@ -228,12 +228,12 @@ overexplain_per_node_hook(PlanState *planstate, List *ancestors,
 				break;
 			case T_Append:
 				overexplain_bitmapset("Append RTIs",
-									  ((Append *) plan)->apprelids,
+									  ((Append *) plan)->ap.apprelids,
 									  es);
 				break;
 			case T_MergeAppend:
 				overexplain_bitmapset("Append RTIs",
-									  ((MergeAppend *) plan)->apprelids,
+									  ((MergeAppend *) plan)->ap.apprelids,
 									  es);
 				break;
 			case T_Result:
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index bd551a1db72..b01ad40ad17 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2412,8 +2412,8 @@ find_modifytable_subplan(PlannerInfo *root,
 	{
 		Append	   *appendplan = (Append *) subplan;
 
-		if (subplan_index < list_length(appendplan->appendplans))
-			subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
+		if (subplan_index < list_length(appendplan->ap.subplans))
+			subplan = (Plan *) list_nth(appendplan->ap.subplans, subplan_index);
 	}
 	else if (IsA(subplan, Result) &&
 			 outerPlan(subplan) != NULL &&
@@ -2421,8 +2421,8 @@ find_modifytable_subplan(PlannerInfo *root,
 	{
 		Append	   *appendplan = (Append *) outerPlan(subplan);
 
-		if (subplan_index < list_length(appendplan->appendplans))
-			subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
+		if (subplan_index < list_length(appendplan->ap.subplans))
+			subplan = (Plan *) list_nth(appendplan->ap.subplans, subplan_index);
 	}
 
 	/* Now, have we got a ForeignScan on the desired rel? */
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 5a6390631eb..3eaa1f7459e 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1224,11 +1224,11 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
 			break;
 		case T_Append:
 			*rels_used = bms_add_members(*rels_used,
-										 ((Append *) plan)->apprelids);
+										 ((Append *) plan)->ap.apprelids);
 			break;
 		case T_MergeAppend:
 			*rels_used = bms_add_members(*rels_used,
-										 ((MergeAppend *) plan)->apprelids);
+										 ((MergeAppend *) plan)->ap.apprelids);
 			break;
 		case T_Result:
 			*rels_used = bms_add_members(*rels_used,
@@ -1272,7 +1272,7 @@ plan_is_disabled(Plan *plan)
 		 * includes any run-time pruned children.  Ignoring those could give
 		 * us the incorrect number of disabled nodes.
 		 */
-		foreach(lc, aplan->appendplans)
+		foreach(lc, aplan->ap.subplans)
 		{
 			Plan	   *subplan = lfirst(lc);
 
@@ -1289,7 +1289,7 @@ plan_is_disabled(Plan *plan)
 		 * includes any run-time pruned children.  Ignoring those could give
 		 * us the incorrect number of disabled nodes.
 		 */
-		foreach(lc, maplan->mergeplans)
+		foreach(lc, maplan->ap.subplans)
 		{
 			Plan	   *subplan = lfirst(lc);
 
@@ -2336,13 +2336,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	switch (nodeTag(plan))
 	{
 		case T_Append:
-			ExplainMissingMembers(((AppendState *) planstate)->as_nplans,
-								  list_length(((Append *) plan)->appendplans),
+			ExplainMissingMembers(((AppendState *) planstate)->as.nplans,
+								  list_length(((Append *) plan)->ap.subplans),
 								  es);
 			break;
 		case T_MergeAppend:
-			ExplainMissingMembers(((MergeAppendState *) planstate)->ms_nplans,
-								  list_length(((MergeAppend *) plan)->mergeplans),
+			ExplainMissingMembers(((MergeAppendState *) planstate)->ms.nplans,
+								  list_length(((MergeAppend *) plan)->ap.subplans),
 								  es);
 			break;
 		default:
@@ -2386,13 +2386,13 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	switch (nodeTag(plan))
 	{
 		case T_Append:
-			ExplainMemberNodes(((AppendState *) planstate)->appendplans,
-							   ((AppendState *) planstate)->as_nplans,
+			ExplainMemberNodes(((AppendState *) planstate)->as.plans,
+							   ((AppendState *) planstate)->as.nplans,
 							   ancestors, es);
 			break;
 		case T_MergeAppend:
-			ExplainMemberNodes(((MergeAppendState *) planstate)->mergeplans,
-							   ((MergeAppendState *) planstate)->ms_nplans,
+			ExplainMemberNodes(((MergeAppendState *) planstate)->ms.plans,
+							   ((MergeAppendState *) planstate)->ms.nplans,
 							   ancestors, es);
 			break;
 		case T_BitmapAnd:
@@ -2606,7 +2606,7 @@ static void
 show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 					   ExplainState *es)
 {
-	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
+	MergeAppend *plan = (MergeAppend *) mstate->ms.ps.plan;
 
 	show_sort_group_keys((PlanState *) mstate, "Sort Key",
 						 plan->numCols, 0, plan->sortColIdx,
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 11118d0ce02..66b62fca921 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	execAmi.o \
 	execAsync.o \
+	execAppend.o \
 	execCurrent.o \
 	execExpr.o \
 	execExprInterp.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 1d0e8ad57b4..5c897048ba3 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -537,7 +537,7 @@ ExecSupportsBackwardScan(Plan *node)
 				if (((Append *) node)->nasyncplans > 0)
 					return false;
 
-				foreach(l, ((Append *) node)->appendplans)
+				foreach(l, ((Append *) node)->ap.subplans)
 				{
 					if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
 						return false;
diff --git a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c
new file mode 100644
index 00000000000..1ddf717cf95
--- /dev/null
+++ b/src/backend/executor/execAppend.c
@@ -0,0 +1,410 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAppend.c
+ *	  This code provides support functions for executing MergeAppend and Append
+ *	  nodes.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAppend.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "executor/executor.h"
+#include "executor/execAppend.h"
+#include "executor/execAsync.h"
+#include "executor/execPartition.h"
+#include "storage/latch.h"
+#include "storage/waiteventset.h"
+#include "miscadmin.h"
+
+#define EVENT_BUFFER_SIZE			16
+
+/*  Begin all of the subscans of an Appender node. */
+void
+ExecInitAppender(AppenderState * state,
+				 Appender * node,
+				 EState *estate,
+				 int eflags,
+				 int first_partial_plan,
+				 int *first_valid_partial_plan)
+{
+	PlanState **appendplanstates;
+	const TupleTableSlotOps *appendops;
+	Bitmapset  *validsubplans;
+	Bitmapset  *asyncplans;
+	int			nplans;
+	int			nasyncplans;
+	int			firstvalid;
+	int			i,
+				j;
+
+	/* If run-time partition pruning is enabled, then set that up now */
+	if (node->part_prune_index >= 0)
+	{
+		PartitionPruneState *prunestate;
+
+		/*
+		 * Set up pruning data structure.  This also initializes the set of
+		 * subplans to initialize (validsubplans) by taking into account the
+		 * result of performing initial pruning if any.
+		 */
+		prunestate = ExecInitPartitionExecPruning(&state->ps,
+												  list_length(node->subplans),
+												  node->part_prune_index,
+												  node->apprelids,
+												  &validsubplans);
+		state->prune_state = prunestate;
+		nplans = bms_num_members(validsubplans);
+
+		/*
+		 * When no run-time pruning is required and there's at least one
+		 * subplan, we can fill as_valid_subplans immediately, preventing
+		 * later calls to ExecFindMatchingSubPlans.
+		 */
+		if (!prunestate->do_exec_prune && nplans > 0)
+		{
+			state->valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+			state->valid_subplans_identified = true;
+		}
+	}
+	else
+	{
+		nplans = list_length(node->subplans);
+
+		/*
+		 * When run-time partition pruning is not enabled we can just mark all
+		 * subplans as valid; they must also all be initialized.
+		 */
+		Assert(nplans > 0);
+		state->valid_subplans = validsubplans =
+			bms_add_range(NULL, 0, nplans - 1);
+		state->valid_subplans_identified = true;
+		state->prune_state = NULL;
+	}
+
+	appendplanstates = palloc0_array(PlanState *, nplans);
+
+	/*
+	 * call ExecInitNode on each of the valid plans to be executed and save
+	 * the results into the appendplanstates array.
+	 *
+	 * While at it, find out the first valid partial plan.
+	 */
+	j = 0;
+	asyncplans = NULL;
+	nasyncplans = 0;
+	firstvalid = nplans;
+	i = -1;
+	while ((i = bms_next_member(validsubplans, i)) >= 0)
+	{
+		Plan	   *initNode = (Plan *) list_nth(node->subplans, i);
+
+		/*
+		 * Record async subplans.  When executing EvalPlanQual, we treat them
+		 * as sync ones; don't do this when initializing an EvalPlanQual plan
+		 * tree.
+		 */
+		if (initNode->async_capable && estate->es_epq_active == NULL)
+		{
+			asyncplans = bms_add_member(asyncplans, j);
+			nasyncplans++;
+		}
+
+		/*
+		 * Record the lowest appendplans index which is a valid partial plan.
+		 */
+		if (first_valid_partial_plan && i >= first_partial_plan && j < firstvalid)
+			firstvalid = j;
+
+		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+	}
+
+	if (first_valid_partial_plan)
+		*first_valid_partial_plan = firstvalid;
+
+	state->plans = appendplanstates;
+	state->nplans = nplans;
+
+	/*
+	 * Initialize Append's result tuple type and slot.  If the child plans all
+	 * produce the same fixed slot type, we can use that slot type; otherwise
+	 * make a virtual slot.  (Note that the result slot itself is used only to
+	 * return a null tuple at end of execution; real tuples are returned to
+	 * the caller in the children's own result slots.  What we are doing here
+	 * is allowing the parent plan node to optimize if the Append will return
+	 * only one kind of slot.)
+	 */
+	appendops = ExecGetCommonSlotOps(appendplanstates, j);
+	if (appendops != NULL)
+	{
+		ExecInitResultTupleSlotTL(&state->ps, appendops);
+	}
+	else
+	{
+		ExecInitResultTupleSlotTL(&state->ps, &TTSOpsVirtual);
+		/* show that the output slot type is not fixed */
+		state->ps.resultopsset = true;
+		state->ps.resultopsfixed = false;
+	}
+
+	/* Initialize async state */
+	state->asyncplans = asyncplans;
+	state->nasyncplans = nasyncplans;
+	state->asyncrequests = NULL;
+	state->asyncresults = NULL;
+	state->needrequest = NULL;
+	state->eventset = NULL;
+	state->valid_asyncplans = NULL;
+
+	if (nasyncplans > 0)
+	{
+		state->asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc_object(AsyncRequest);
+			areq->requestor = (PlanState *) state;
+			areq->requestee = appendplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			state->asyncrequests[i] = areq;
+		}
+
+		/*
+		 * AppendState and MergeAppendState have slightly different allocation
+		 * sizes for asyncresults in the original code, but we unify to the
+		 * larger requirement or specific nplans if required.
+		 */
+		state->asyncresults = (TupleTableSlot **)
+			palloc0(nplans * sizeof(TupleTableSlot *));
+	}
+
+	/*
+	 * Miscellaneous initialization
+	 */
+	state->ps.ps_ProjInfo = NULL;
+}
+
+void
+ExecReScanAppender(AppenderState * node)
+{
+	int			i;
+	int			nasyncplans = node->nasyncplans;
+
+	/*
+	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
+	 * we'd better unset the valid subplans so that they are reselected for
+	 * the new parameter values.
+	 */
+	if (node->prune_state &&
+		bms_overlap(node->ps.chgParam,
+					node->prune_state->execparamids))
+	{
+		node->valid_subplans_identified = false;
+		bms_free(node->valid_subplans);
+		node->valid_subplans = NULL;
+		bms_free(node->valid_asyncplans);
+		node->valid_asyncplans = NULL;
+	}
+
+	for (i = 0; i < node->nplans; i++)
+	{
+		PlanState  *subnode = node->plans[i];
+
+		/*
+		 * ExecReScan doesn't know about my subplans, so I have to do
+		 * changed-parameter signaling myself.
+		 */
+		if (node->ps.chgParam != NULL)
+			UpdateChangedParamSet(subnode, node->ps.chgParam);
+
+		/*
+		 * If chgParam of subnode is not null then plan will be re-scanned by
+		 * first ExecProcNode.
+		 */
+		if (subnode->chgParam == NULL)
+			ExecReScan(subnode);
+	}
+
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		bms_free(node->needrequest);
+		node->needrequest = NULL;
+	}
+}
+
+/*  Wait or poll for file descriptor events and fire callbacks. */
+void
+ExecAppenderAsyncEventWait(AppenderState * node, int timeout, uint32 wait_event_info)
+{
+	int			nevents = node->nasyncplans + 2;	/* one for PM death and
+													 * one for latch */
+	int			noccurred;
+	int			i;
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+
+	Assert(node->eventset == NULL);
+
+	node->eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+	AddWaitEventToSet(node->eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/*
+	 * No need for further processing if none of the subplans configured any
+	 * events.
+	 */
+	if (GetNumRegisteredWaitEvents(node->eventset) == 1)
+	{
+		FreeWaitEventSet(node->eventset);
+		node->eventset = NULL;
+		return;
+	}
+
+	/*
+	 * Add the process latch to the set, so that we wake up to process the
+	 * standard interrupts with CHECK_FOR_INTERRUPTS().
+	 *
+	 * NOTE: For historical reasons, it's important that this is added to the
+	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
+	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
+	 * any other events are in the set.  That's a poor design, it's
+	 * questionable for postgres_fdw to be doing that in the first place, but
+	 * we cannot change it now.  The pattern has possibly been copied to other
+	 * extensions too.
+	 */
+	AddWaitEventToSet(node->eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+					  MyLatch, NULL);
+
+	/* Return at most EVENT_BUFFER_SIZE events in one call. */
+	if (nevents > EVENT_BUFFER_SIZE)
+		nevents = EVENT_BUFFER_SIZE;
+
+	/* Wait until at least one event occurs. */
+	noccurred = WaitEventSetWait(node->eventset, timeout, occurred_event,
+								 nevents, wait_event_info);
+
+
+	FreeWaitEventSet(node->eventset);
+	node->eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			if (areq->callback_pending)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				areq->callback_pending = false;
+
+				/* Do the actual work. */
+				ExecAsyncNotify(areq);
+			}
+		}
+
+		/* Handle standard interrupts */
+		if ((w->events & WL_LATCH_SET) != 0)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+}
+
+/*  Begin executing async-capable subplans. */
+void
+ExecAppenderAsyncBegin(AppenderState * node)
+{
+	int			i;
+
+	/* Backward scan is not supported by async-aware Appends. */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->nplans > 0);
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->nasyncplans > 0);
+
+	/* Make a request for each of the valid async subplans. */
+	i = -1;
+	while ((i = bms_next_member(node->valid_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->asyncrequests[i];
+
+		Assert(areq->request_index == i);
+		Assert(!areq->callback_pending);
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+}
+
+/*  Shuts down the subplans of an Appender node. */
+void
+ExecEndAppender(AppenderState * node)
+{
+	PlanState **subplans;
+	int			nplans;
+	int			i;
+
+	/*
+	 * get information from the node
+	 */
+	subplans = node->plans;
+	nplans = node->nplans;
+
+	/*
+	 * shut down each of the subscans
+	 */
+	for (i = 0; i < nplans; i++)
+		ExecEndNode(subplans[i]);
+}
diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c
index 3bfdc0230ff..e8cf2ead8a8 100644
--- a/src/backend/executor/execCurrent.c
+++ b/src/backend/executor/execCurrent.c
@@ -375,9 +375,9 @@ search_plan_tree(PlanState *node, Oid table_oid,
 				AppendState *astate = (AppendState *) node;
 				int			i;
 
-				for (i = 0; i < astate->as_nplans; i++)
+				for (i = 0; i < astate->as.nplans; i++)
 				{
-					ScanState  *elem = search_plan_tree(astate->appendplans[i],
+					ScanState  *elem = search_plan_tree(astate->as.plans[i],
 														table_oid,
 														pending_rescan);
 
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index f5f9cfbeead..3eb1de1cd30 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -910,8 +910,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
 		AppendState *aState = (AppendState *) child_node;
 		int			i;
 
-		for (i = 0; i < aState->as_nplans; i++)
-			ExecSetTupleBound(tuples_needed, aState->appendplans[i]);
+		for (i = 0; i < aState->as.nplans; i++)
+			ExecSetTupleBound(tuples_needed, aState->as.plans[i]);
 	}
 	else if (IsA(child_node, MergeAppendState))
 	{
@@ -923,8 +923,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
 		MergeAppendState *maState = (MergeAppendState *) child_node;
 		int			i;
 
-		for (i = 0; i < maState->ms_nplans; i++)
-			ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+		for (i = 0; i < maState->ms.nplans; i++)
+			ExecSetTupleBound(tuples_needed, maState->ms.plans[i]);
 	}
 	else if (IsA(child_node, ResultState))
 	{
diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build
index 2cea41f8771..b5cb710a59f 100644
--- a/src/backend/executor/meson.build
+++ b/src/backend/executor/meson.build
@@ -3,6 +3,7 @@
 backend_sources += files(
   'execAmi.c',
   'execAsync.c',
+  'execAppend.c',
   'execCurrent.c',
   'execExpr.c',
   'execExprInterp.c',
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index dfbc7b510c4..5c39ee275d2 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,13 +57,13 @@
 
 #include "postgres.h"
 
+#include "executor/execAppend.h"
 #include "executor/execAsync.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/nodeAppend.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "storage/latch.h"
 
 /* Shared state for parallel-aware Append. */
 struct ParallelAppendState
@@ -109,15 +109,6 @@ AppendState *
 ExecInitAppend(Append *node, EState *estate, int eflags)
 {
 	AppendState *appendstate = makeNode(AppendState);
-	PlanState **appendplanstates;
-	const TupleTableSlotOps *appendops;
-	Bitmapset  *validsubplans;
-	Bitmapset  *asyncplans;
-	int			nplans;
-	int			nasyncplans;
-	int			firstvalid;
-	int			i,
-				j;
 
 	/* check for unsupported flags */
 	Assert(!(eflags & EXEC_FLAG_MARK));
@@ -125,167 +116,27 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	/*
 	 * create new AppendState for our append node
 	 */
-	appendstate->ps.plan = (Plan *) node;
-	appendstate->ps.state = estate;
-	appendstate->ps.ExecProcNode = ExecAppend;
+	appendstate->as.ps.plan = (Plan *) node;
+	appendstate->as.ps.state = estate;
+	appendstate->as.ps.ExecProcNode = ExecAppend;
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
 	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
 	appendstate->as_syncdone = false;
 	appendstate->as_begun = false;
 
-	/* If run-time partition pruning is enabled, then set that up now */
-	if (node->part_prune_index >= 0)
-	{
-		PartitionPruneState *prunestate;
-
-		/*
-		 * Set up pruning data structure.  This also initializes the set of
-		 * subplans to initialize (validsubplans) by taking into account the
-		 * result of performing initial pruning if any.
-		 */
-		prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
-												  list_length(node->appendplans),
-												  node->part_prune_index,
-												  node->apprelids,
-												  &validsubplans);
-		appendstate->as_prune_state = prunestate;
-		nplans = bms_num_members(validsubplans);
-
-		/*
-		 * When no run-time pruning is required and there's at least one
-		 * subplan, we can fill as_valid_subplans immediately, preventing
-		 * later calls to ExecFindMatchingSubPlans.
-		 */
-		if (!prunestate->do_exec_prune && nplans > 0)
-		{
-			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
-			appendstate->as_valid_subplans_identified = true;
-		}
-	}
-	else
-	{
-		nplans = list_length(node->appendplans);
-
-		/*
-		 * When run-time partition pruning is not enabled we can just mark all
-		 * subplans as valid; they must also all be initialized.
-		 */
-		Assert(nplans > 0);
-		appendstate->as_valid_subplans = validsubplans =
-			bms_add_range(NULL, 0, nplans - 1);
-		appendstate->as_valid_subplans_identified = true;
-		appendstate->as_prune_state = NULL;
-	}
-
-	appendplanstates = (PlanState **) palloc(nplans *
-											 sizeof(PlanState *));
-
-	/*
-	 * call ExecInitNode on each of the valid plans to be executed and save
-	 * the results into the appendplanstates array.
-	 *
-	 * While at it, find out the first valid partial plan.
-	 */
-	j = 0;
-	asyncplans = NULL;
-	nasyncplans = 0;
-	firstvalid = nplans;
-	i = -1;
-	while ((i = bms_next_member(validsubplans, i)) >= 0)
-	{
-		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);
-
-		/*
-		 * Record async subplans.  When executing EvalPlanQual, we treat them
-		 * as sync ones; don't do this when initializing an EvalPlanQual plan
-		 * tree.
-		 */
-		if (initNode->async_capable && estate->es_epq_active == NULL)
-		{
-			asyncplans = bms_add_member(asyncplans, j);
-			nasyncplans++;
-		}
-
-		/*
-		 * Record the lowest appendplans index which is a valid partial plan.
-		 */
-		if (i >= node->first_partial_plan && j < firstvalid)
-			firstvalid = j;
-
-		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
-	}
-
-	appendstate->as_first_partial_plan = firstvalid;
-	appendstate->appendplans = appendplanstates;
-	appendstate->as_nplans = nplans;
+	/* Initialize common fields */
+	ExecInitAppender(&appendstate->as,
+					 &node->ap,
+					 estate,
+					 eflags,
+					 node->first_partial_plan,
+					 &appendstate->as_first_partial_plan);
 
-	/*
-	 * Initialize Append's result tuple type and slot.  If the child plans all
-	 * produce the same fixed slot type, we can use that slot type; otherwise
-	 * make a virtual slot.  (Note that the result slot itself is used only to
-	 * return a null tuple at end of execution; real tuples are returned to
-	 * the caller in the children's own result slots.  What we are doing here
-	 * is allowing the parent plan node to optimize if the Append will return
-	 * only one kind of slot.)
-	 */
-	appendops = ExecGetCommonSlotOps(appendplanstates, j);
-	if (appendops != NULL)
-	{
-		ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
-	}
-	else
-	{
-		ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
-		/* show that the output slot type is not fixed */
-		appendstate->ps.resultopsset = true;
-		appendstate->ps.resultopsfixed = false;
-	}
+	if (appendstate->as.nasyncplans > 0 && appendstate->as.valid_subplans_identified)
+		classify_matching_subplans(appendstate);
 
-	/* Initialize async state */
-	appendstate->as_asyncplans = asyncplans;
-	appendstate->as_nasyncplans = nasyncplans;
-	appendstate->as_asyncrequests = NULL;
-	appendstate->as_asyncresults = NULL;
-	appendstate->as_nasyncresults = 0;
 	appendstate->as_nasyncremain = 0;
-	appendstate->as_needrequest = NULL;
-	appendstate->as_eventset = NULL;
-	appendstate->as_valid_asyncplans = NULL;
-
-	if (nasyncplans > 0)
-	{
-		appendstate->as_asyncrequests = (AsyncRequest **)
-			palloc0(nplans * sizeof(AsyncRequest *));
-
-		i = -1;
-		while ((i = bms_next_member(asyncplans, i)) >= 0)
-		{
-			AsyncRequest *areq;
-
-			areq = palloc_object(AsyncRequest);
-			areq->requestor = (PlanState *) appendstate;
-			areq->requestee = appendplanstates[i];
-			areq->request_index = i;
-			areq->callback_pending = false;
-			areq->request_complete = false;
-			areq->result = NULL;
-
-			appendstate->as_asyncrequests[i] = areq;
-		}
-
-		appendstate->as_asyncresults = (TupleTableSlot **)
-			palloc0(nasyncplans * sizeof(TupleTableSlot *));
-
-		if (appendstate->as_valid_subplans_identified)
-			classify_matching_subplans(appendstate);
-	}
-
-	/*
-	 * Miscellaneous initialization
-	 */
-
-	appendstate->ps.ps_ProjInfo = NULL;
 
 	/* For parallel query, this will be overridden later. */
 	appendstate->choose_next_subplan = choose_next_subplan_locally;
@@ -315,11 +166,11 @@ ExecAppend(PlanState *pstate)
 		Assert(!node->as_syncdone);
 
 		/* Nothing to do if there are no subplans */
-		if (node->as_nplans == 0)
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		if (node->as.nplans == 0)
+			return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
 
 		/* If there are any async subplans, begin executing them. */
-		if (node->as_nasyncplans > 0)
+		if (node->as.nasyncplans > 0)
 			ExecAppendAsyncBegin(node);
 
 		/*
@@ -327,11 +178,11 @@ ExecAppend(PlanState *pstate)
 		 * proceeding.
 		 */
 		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
 
 		Assert(node->as_syncdone ||
 			   (node->as_whichplan >= 0 &&
-				node->as_whichplan < node->as_nplans));
+				node->as_whichplan < node->as.nplans));
 
 		/* And we're initialized. */
 		node->as_begun = true;
@@ -346,19 +197,19 @@ ExecAppend(PlanState *pstate)
 		/*
 		 * try to get a tuple from an async subplan if any
 		 */
-		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+		if (node->as_syncdone || !bms_is_empty(node->as.needrequest))
 		{
 			if (ExecAppendAsyncGetNext(node, &result))
 				return result;
 			Assert(!node->as_syncdone);
-			Assert(bms_is_empty(node->as_needrequest));
+			Assert(bms_is_empty(node->as.needrequest));
 		}
 
 		/*
 		 * figure out which sync subplan we are currently processing
 		 */
-		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
-		subnode = node->appendplans[node->as_whichplan];
+		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as.nplans);
+		subnode = node->as.plans[node->as_whichplan];
 
 		/*
 		 * get a tuple from the subplan
@@ -385,7 +236,7 @@ ExecAppend(PlanState *pstate)
 
 		/* choose new sync subplan; if no sync/async subplans, we're done */
 		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
 	}
 }
 
@@ -400,81 +251,22 @@ ExecAppend(PlanState *pstate)
 void
 ExecEndAppend(AppendState *node)
 {
-	PlanState **appendplans;
-	int			nplans;
-	int			i;
-
-	/*
-	 * get information from the node
-	 */
-	appendplans = node->appendplans;
-	nplans = node->as_nplans;
-
-	/*
-	 * shut down each of the subscans
-	 */
-	for (i = 0; i < nplans; i++)
-		ExecEndNode(appendplans[i]);
+	ExecEndAppender(&node->as);
 }
 
 void
 ExecReScanAppend(AppendState *node)
 {
-	int			nasyncplans = node->as_nasyncplans;
-	int			i;
-
-	/*
-	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
-	 * we'd better unset the valid subplans so that they are reselected for
-	 * the new parameter values.
-	 */
-	if (node->as_prune_state &&
-		bms_overlap(node->ps.chgParam,
-					node->as_prune_state->execparamids))
-	{
-		node->as_valid_subplans_identified = false;
-		bms_free(node->as_valid_subplans);
-		node->as_valid_subplans = NULL;
-		bms_free(node->as_valid_asyncplans);
-		node->as_valid_asyncplans = NULL;
-	}
-
-	for (i = 0; i < node->as_nplans; i++)
-	{
-		PlanState  *subnode = node->appendplans[i];
 
-		/*
-		 * ExecReScan doesn't know about my subplans, so I have to do
-		 * changed-parameter signaling myself.
-		 */
-		if (node->ps.chgParam != NULL)
-			UpdateChangedParamSet(subnode, node->ps.chgParam);
+	int			nasyncplans = node->as.nasyncplans;
 
-		/*
-		 * If chgParam of subnode is not null then plan will be re-scanned by
-		 * first ExecProcNode or by first ExecAsyncRequest.
-		 */
-		if (subnode->chgParam == NULL)
-			ExecReScan(subnode);
-	}
+	ExecReScanAppender(&node->as);
 
-	/* Reset async state */
+	/* Reset specific append async state */
 	if (nasyncplans > 0)
 	{
-		i = -1;
-		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
-		{
-			AsyncRequest *areq = node->as_asyncrequests[i];
-
-			areq->callback_pending = false;
-			areq->request_complete = false;
-			areq->result = NULL;
-		}
-
 		node->as_nasyncresults = 0;
 		node->as_nasyncremain = 0;
-		bms_free(node->as_needrequest);
-		node->as_needrequest = NULL;
 	}
 
 	/* Let choose_next_subplan_* function handle setting the first subplan */
@@ -501,7 +293,7 @@ ExecAppendEstimate(AppendState *node,
 {
 	node->pstate_len =
 		add_size(offsetof(ParallelAppendState, pa_finished),
-				 sizeof(bool) * node->as_nplans);
+				 sizeof(bool) * node->as.nplans);
 
 	shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
@@ -523,7 +315,7 @@ ExecAppendInitializeDSM(AppendState *node,
 	pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
 	memset(pstate, 0, node->pstate_len);
 	LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
-	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
+	shm_toc_insert(pcxt->toc, node->as.ps.plan->plan_node_id, pstate);
 
 	node->as_pstate = pstate;
 	node->choose_next_subplan = choose_next_subplan_for_leader;
@@ -541,7 +333,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
 	ParallelAppendState *pstate = node->as_pstate;
 
 	pstate->pa_next_plan = 0;
-	memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
+	memset(pstate->pa_finished, 0, sizeof(bool) * node->as.nplans);
 }
 
 /* ----------------------------------------------------------------
@@ -554,7 +346,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
 void
 ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 {
-	node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
+	node->as_pstate = shm_toc_lookup(pwcxt->toc, node->as.ps.plan->plan_node_id, false);
 	node->choose_next_subplan = choose_next_subplan_for_worker;
 }
 
@@ -572,7 +364,7 @@ choose_next_subplan_locally(AppendState *node)
 	int			nextplan;
 
 	/* We should never be called when there are no subplans */
-	Assert(node->as_nplans > 0);
+	Assert(node->as.nplans > 0);
 
 	/* Nothing to do if syncdone */
 	if (node->as_syncdone)
@@ -587,33 +379,33 @@ choose_next_subplan_locally(AppendState *node)
 	 */
 	if (whichplan == INVALID_SUBPLAN_INDEX)
 	{
-		if (node->as_nasyncplans > 0)
+		if (node->as.nasyncplans > 0)
 		{
 			/* We'd have filled as_valid_subplans already */
-			Assert(node->as_valid_subplans_identified);
+			Assert(node->as.valid_subplans_identified);
 		}
-		else if (!node->as_valid_subplans_identified)
+		else if (!node->as.valid_subplans_identified)
 		{
-			node->as_valid_subplans =
-				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
-			node->as_valid_subplans_identified = true;
+			node->as.valid_subplans =
+				ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+			node->as.valid_subplans_identified = true;
 		}
 
 		whichplan = -1;
 	}
 
 	/* Ensure whichplan is within the expected range */
-	Assert(whichplan >= -1 && whichplan <= node->as_nplans);
+	Assert(whichplan >= -1 && whichplan <= node->as.nplans);
 
-	if (ScanDirectionIsForward(node->ps.state->es_direction))
-		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
+	if (ScanDirectionIsForward(node->as.ps.state->es_direction))
+		nextplan = bms_next_member(node->as.valid_subplans, whichplan);
 	else
-		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
+		nextplan = bms_prev_member(node->as.valid_subplans, whichplan);
 
 	if (nextplan < 0)
 	{
 		/* Set as_syncdone if in async mode */
-		if (node->as_nasyncplans > 0)
+		if (node->as.nasyncplans > 0)
 			node->as_syncdone = true;
 		return false;
 	}
@@ -637,10 +429,10 @@ choose_next_subplan_for_leader(AppendState *node)
 	ParallelAppendState *pstate = node->as_pstate;
 
 	/* Backward scan is not supported by parallel-aware plans */
-	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+	Assert(ScanDirectionIsForward(node->as.ps.state->es_direction));
 
 	/* We should never be called when there are no subplans */
-	Assert(node->as_nplans > 0);
+	Assert(node->as.nplans > 0);
 
 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
@@ -652,18 +444,18 @@ choose_next_subplan_for_leader(AppendState *node)
 	else
 	{
 		/* Start with last subplan. */
-		node->as_whichplan = node->as_nplans - 1;
+		node->as_whichplan = node->as.nplans - 1;
 
 		/*
 		 * If we've yet to determine the valid subplans then do so now.  If
 		 * run-time pruning is disabled then the valid subplans will always be
 		 * set to all subplans.
 		 */
-		if (!node->as_valid_subplans_identified)
+		if (!node->as.valid_subplans_identified)
 		{
-			node->as_valid_subplans =
-				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
-			node->as_valid_subplans_identified = true;
+			node->as.valid_subplans =
+				ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+			node->as.valid_subplans_identified = true;
 
 			/*
 			 * Mark each invalid plan as finished to allow the loop below to
@@ -719,10 +511,10 @@ choose_next_subplan_for_worker(AppendState *node)
 	ParallelAppendState *pstate = node->as_pstate;
 
 	/* Backward scan is not supported by parallel-aware plans */
-	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+	Assert(ScanDirectionIsForward(node->as.ps.state->es_direction));
 
 	/* We should never be called when there are no subplans */
-	Assert(node->as_nplans > 0);
+	Assert(node->as.nplans > 0);
 
 	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
@@ -735,11 +527,11 @@ choose_next_subplan_for_worker(AppendState *node)
 	 * run-time pruning is disabled then the valid subplans will always be set
 	 * to all subplans.
 	 */
-	else if (!node->as_valid_subplans_identified)
+	else if (!node->as.valid_subplans_identified)
 	{
-		node->as_valid_subplans =
-			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
-		node->as_valid_subplans_identified = true;
+		node->as.valid_subplans =
+			ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+		node->as.valid_subplans_identified = true;
 
 		mark_invalid_subplans_as_finished(node);
 	}
@@ -759,7 +551,7 @@ choose_next_subplan_for_worker(AppendState *node)
 	{
 		int			nextplan;
 
-		nextplan = bms_next_member(node->as_valid_subplans,
+		nextplan = bms_next_member(node->as.valid_subplans,
 								   pstate->pa_next_plan);
 		if (nextplan >= 0)
 		{
@@ -772,7 +564,7 @@ choose_next_subplan_for_worker(AppendState *node)
 			 * Try looping back to the first valid partial plan, if there is
 			 * one.  If there isn't, arrange to bail out below.
 			 */
-			nextplan = bms_next_member(node->as_valid_subplans,
+			nextplan = bms_next_member(node->as.valid_subplans,
 									   node->as_first_partial_plan - 1);
 			pstate->pa_next_plan =
 				nextplan < 0 ? node->as_whichplan : nextplan;
@@ -797,7 +589,7 @@ choose_next_subplan_for_worker(AppendState *node)
 
 	/* Pick the plan we found, and advance pa_next_plan one more time. */
 	node->as_whichplan = pstate->pa_next_plan;
-	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
+	pstate->pa_next_plan = bms_next_member(node->as.valid_subplans,
 										   pstate->pa_next_plan);
 
 	/*
@@ -806,7 +598,7 @@ choose_next_subplan_for_worker(AppendState *node)
 	 */
 	if (pstate->pa_next_plan < 0)
 	{
-		int			nextplan = bms_next_member(node->as_valid_subplans,
+		int			nextplan = bms_next_member(node->as.valid_subplans,
 											   node->as_first_partial_plan - 1);
 
 		if (nextplan >= 0)
@@ -848,16 +640,16 @@ mark_invalid_subplans_as_finished(AppendState *node)
 	Assert(node->as_pstate);
 
 	/* Shouldn't have been called when run-time pruning is not enabled */
-	Assert(node->as_prune_state);
+	Assert(node->as.prune_state);
 
 	/* Nothing to do if all plans are valid */
-	if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
+	if (bms_num_members(node->as.valid_subplans) == node->as.nplans)
 		return;
 
 	/* Mark all non-valid plans as finished */
-	for (i = 0; i < node->as_nplans; i++)
+	for (i = 0; i < node->as.nplans; i++)
 	{
-		if (!bms_is_member(i, node->as_valid_subplans))
+		if (!bms_is_member(i, node->as.valid_subplans))
 			node->as_pstate->pa_finished[i] = true;
 	}
 }
@@ -876,47 +668,25 @@ mark_invalid_subplans_as_finished(AppendState *node)
 static void
 ExecAppendAsyncBegin(AppendState *node)
 {
-	int			i;
-
-	/* Backward scan is not supported by async-aware Appends. */
-	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
-
-	/* We should never be called when there are no subplans */
-	Assert(node->as_nplans > 0);
-
-	/* We should never be called when there are no async subplans. */
-	Assert(node->as_nasyncplans > 0);
-
 	/* If we've yet to determine the valid subplans then do so now. */
-	if (!node->as_valid_subplans_identified)
+	if (!node->as.valid_subplans_identified)
 	{
-		node->as_valid_subplans =
-			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
-		node->as_valid_subplans_identified = true;
+		node->as.valid_subplans =
+			ExecFindMatchingSubPlans(node->as.prune_state, false, NULL);
+		node->as.valid_subplans_identified = true;
 
 		classify_matching_subplans(node);
 	}
 
 	/* Initialize state variables. */
-	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
-	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
+	node->as_syncdone = bms_is_empty(node->as.valid_subplans);
+	node->as_nasyncremain = bms_num_members(node->as.valid_asyncplans);
 
 	/* Nothing to do if there are no valid async subplans. */
 	if (node->as_nasyncremain == 0)
 		return;
 
-	/* Make a request for each of the valid async subplans. */
-	i = -1;
-	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
-	{
-		AsyncRequest *areq = node->as_asyncrequests[i];
-
-		Assert(areq->request_index == i);
-		Assert(!areq->callback_pending);
-
-		/* Do the actual work. */
-		ExecAsyncRequest(areq);
-	}
+	ExecAppenderAsyncBegin(&node->as);
 }
 
 /* ----------------------------------------------------------------
@@ -961,7 +731,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
 	if (node->as_syncdone)
 	{
 		Assert(node->as_nasyncremain == 0);
-		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		*result = ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
 		return true;
 	}
 
@@ -981,7 +751,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
 	int			i;
 
 	/* Nothing to do if there are no async subplans needing a new request. */
-	if (bms_is_empty(node->as_needrequest))
+	if (bms_is_empty(node->as.needrequest))
 	{
 		Assert(node->as_nasyncresults == 0);
 		return false;
@@ -994,17 +764,17 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
 	if (node->as_nasyncresults > 0)
 	{
 		--node->as_nasyncresults;
-		*result = node->as_asyncresults[node->as_nasyncresults];
+		*result = node->as.asyncresults[node->as_nasyncresults];
 		return true;
 	}
 
 	/* Make a new request for each of the async subplans that need it. */
-	needrequest = node->as_needrequest;
-	node->as_needrequest = NULL;
+	needrequest = node->as.needrequest;
+	node->as.needrequest = NULL;
 	i = -1;
 	while ((i = bms_next_member(needrequest, i)) >= 0)
 	{
-		AsyncRequest *areq = node->as_asyncrequests[i];
+		AsyncRequest *areq = node->as.asyncrequests[i];
 
 		/* Do the actual work. */
 		ExecAsyncRequest(areq);
@@ -1015,7 +785,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
 	if (node->as_nasyncresults > 0)
 	{
 		--node->as_nasyncresults;
-		*result = node->as_asyncresults[node->as_nasyncresults];
+		*result = node->as.asyncresults[node->as_nasyncresults];
 		return true;
 	}
 
@@ -1031,105 +801,12 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
 static void
 ExecAppendAsyncEventWait(AppendState *node)
 {
-	int			nevents = node->as_nasyncplans + 2;
 	long		timeout = node->as_syncdone ? -1 : 0;
-	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
-	int			noccurred;
-	int			i;
 
 	/* We should never be called when there are no valid async subplans. */
 	Assert(node->as_nasyncremain > 0);
 
-	Assert(node->as_eventset == NULL);
-	node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
-	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
-					  NULL, NULL);
-
-	/* Give each waiting subplan a chance to add an event. */
-	i = -1;
-	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
-	{
-		AsyncRequest *areq = node->as_asyncrequests[i];
-
-		if (areq->callback_pending)
-			ExecAsyncConfigureWait(areq);
-	}
-
-	/*
-	 * No need for further processing if none of the subplans configured any
-	 * events.
-	 */
-	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
-	{
-		FreeWaitEventSet(node->as_eventset);
-		node->as_eventset = NULL;
-		return;
-	}
-
-	/*
-	 * Add the process latch to the set, so that we wake up to process the
-	 * standard interrupts with CHECK_FOR_INTERRUPTS().
-	 *
-	 * NOTE: For historical reasons, it's important that this is added to the
-	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
-	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
-	 * any other events are in the set.  That's a poor design, it's
-	 * questionable for postgres_fdw to be doing that in the first place, but
-	 * we cannot change it now.  The pattern has possibly been copied to other
-	 * extensions too.
-	 */
-	AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
-					  MyLatch, NULL);
-
-	/* Return at most EVENT_BUFFER_SIZE events in one call. */
-	if (nevents > EVENT_BUFFER_SIZE)
-		nevents = EVENT_BUFFER_SIZE;
-
-	/*
-	 * If the timeout is -1, wait until at least one event occurs.  If the
-	 * timeout is 0, poll for events, but do not wait at all.
-	 */
-	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
-								 nevents, WAIT_EVENT_APPEND_READY);
-	FreeWaitEventSet(node->as_eventset);
-	node->as_eventset = NULL;
-	if (noccurred == 0)
-		return;
-
-	/* Deliver notifications. */
-	for (i = 0; i < noccurred; i++)
-	{
-		WaitEvent  *w = &occurred_event[i];
-
-		/*
-		 * Each waiting subplan should have registered its wait event with
-		 * user_data pointing back to its AsyncRequest.
-		 */
-		if ((w->events & WL_SOCKET_READABLE) != 0)
-		{
-			AsyncRequest *areq = (AsyncRequest *) w->user_data;
-
-			if (areq->callback_pending)
-			{
-				/*
-				 * Mark it as no longer needing a callback.  We must do this
-				 * before dispatching the callback in case the callback resets
-				 * the flag.
-				 */
-				areq->callback_pending = false;
-
-				/* Do the actual work. */
-				ExecAsyncNotify(areq);
-			}
-		}
-
-		/* Handle standard interrupts */
-		if ((w->events & WL_LATCH_SET) != 0)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-	}
+	ExecAppenderAsyncEventWait(&node->as, timeout, WAIT_EVENT_APPEND_READY);
 }
 
 /* ----------------------------------------------------------------
@@ -1165,14 +842,14 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
 	}
 
 	/* Save result so we can return it. */
-	Assert(node->as_nasyncresults < node->as_nasyncplans);
-	node->as_asyncresults[node->as_nasyncresults++] = slot;
+	Assert(node->as_nasyncresults < node->as.nasyncplans);
+	node->as.asyncresults[node->as_nasyncresults++] = slot;
 
 	/*
 	 * Mark the subplan that returned a result as ready for a new request.  We
 	 * don't launch another one here immediately because it might complete.
 	 */
-	node->as_needrequest = bms_add_member(node->as_needrequest,
+	node->as.needrequest = bms_add_member(node->as.needrequest,
 										  areq->request_index);
 }
 
@@ -1187,10 +864,10 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
 static void
 classify_matching_subplans(AppendState *node)
 {
-	Assert(node->as_valid_subplans_identified);
+	Assert(node->as.valid_subplans_identified);
 
 	/* Nothing to do if there are no valid subplans. */
-	if (bms_is_empty(node->as_valid_subplans))
+	if (bms_is_empty(node->as.valid_subplans))
 	{
 		node->as_syncdone = true;
 		node->as_nasyncremain = 0;
@@ -1199,8 +876,8 @@ classify_matching_subplans(AppendState *node)
 
 	/* No valid async subplans identified. */
 	if (!classify_matching_subplans_common(
-										   &node->as_valid_subplans,
-										   node->as_asyncplans,
-										   &node->as_valid_asyncplans))
+										   &node->as.valid_subplans,
+										   node->as.asyncplans,
+										   &node->as.valid_asyncplans))
 		node->as_nasyncremain = 0;
 }
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index f1c267eb9eb..e1a207aeb85 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -38,6 +38,7 @@
 
 #include "postgres.h"
 
+#include "executor/execAppend.h"
 #include "executor/executor.h"
 #include "executor/execAsync.h"
 #include "executor/execPartition.h"
@@ -76,14 +77,7 @@ MergeAppendState *
 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 {
 	MergeAppendState *mergestate = makeNode(MergeAppendState);
-	PlanState **mergeplanstates;
-	const TupleTableSlotOps *mergeops;
-	Bitmapset  *validsubplans;
-	int			nplans;
-	int			i,
-				j;
-	Bitmapset  *asyncplans;
-	int			nasyncplans;
+	int			i;
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -91,154 +85,27 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	/*
 	 * create new MergeAppendState for our node
 	 */
-	mergestate->ps.plan = (Plan *) node;
-	mergestate->ps.state = estate;
-	mergestate->ps.ExecProcNode = ExecMergeAppend;
-
-	/* If run-time partition pruning is enabled, then set that up now */
-	if (node->part_prune_index >= 0)
-	{
-		PartitionPruneState *prunestate;
-
-		/*
-		 * Set up pruning data structure.  This also initializes the set of
-		 * subplans to initialize (validsubplans) by taking into account the
-		 * result of performing initial pruning if any.
-		 */
-		prunestate = ExecInitPartitionExecPruning(&mergestate->ps,
-												  list_length(node->mergeplans),
-												  node->part_prune_index,
-												  node->apprelids,
-												  &validsubplans);
-		mergestate->ms_prune_state = prunestate;
-		nplans = bms_num_members(validsubplans);
-
-		/*
-		 * When no run-time pruning is required and there's at least one
-		 * subplan, we can fill ms_valid_subplans immediately, preventing
-		 * later calls to ExecFindMatchingSubPlans.
-		 */
-		if (!prunestate->do_exec_prune && nplans > 0)
-		{
-			mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
-			mergestate->ms_valid_subplans_identified = true;
-		}
-	}
-	else
-	{
-		nplans = list_length(node->mergeplans);
-
-		/*
-		 * When run-time partition pruning is not enabled we can just mark all
-		 * subplans as valid; they must also all be initialized.
-		 */
-		Assert(nplans > 0);
-		mergestate->ms_valid_subplans = validsubplans =
-			bms_add_range(NULL, 0, nplans - 1);
-		mergestate->ms_valid_subplans_identified = true;
-		mergestate->ms_prune_state = NULL;
-	}
-
-	mergeplanstates = palloc_array(PlanState *, nplans);
-	mergestate->mergeplans = mergeplanstates;
-	mergestate->ms_nplans = nplans;
-
-	mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans);
-	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+	mergestate->ms.ps.plan = (Plan *) node;
+	mergestate->ms.ps.state = estate;
+	mergestate->ms.ps.ExecProcNode = ExecMergeAppend;
+
+	/* Initialize common fields */
+	ExecInitAppender(&mergestate->ms,
+					 &node->ap,
+					 estate,
+					 eflags,
+					 -1,
+					 NULL);
+
+	if (mergestate->ms.nasyncplans > 0 && mergestate->ms.valid_subplans_identified)
+		classify_matching_subplans(mergestate);
+
+	mergestate->ms_slots = palloc0_array(TupleTableSlot *, mergestate->ms.nplans);
+	mergestate->ms_heap = binaryheap_allocate(mergestate->ms.nplans, heap_compare_slots,
 											  mergestate);
 
-	/*
-	 * call ExecInitNode on each of the valid plans to be executed and save
-	 * the results into the mergeplanstates array.
-	 */
-	j = 0;
-	asyncplans = NULL;
-	nasyncplans = 0;
-
-	i = -1;
-	while ((i = bms_next_member(validsubplans, i)) >= 0)
-	{
-		Plan	   *initNode = (Plan *) list_nth(node->mergeplans, i);
-
-		/*
-		 * Record async subplans.  When executing EvalPlanQual, we treat them
-		 * as sync ones; don't do this when initializing an EvalPlanQual plan
-		 * tree.
-		 */
-		if (initNode->async_capable && estate->es_epq_active == NULL)
-		{
-			asyncplans = bms_add_member(asyncplans, j);
-			nasyncplans++;
-		}
-
-		mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
-	}
-
-	/*
-	 * Initialize MergeAppend's result tuple type and slot.  If the child
-	 * plans all produce the same fixed slot type, we can use that slot type;
-	 * otherwise make a virtual slot.  (Note that the result slot itself is
-	 * used only to return a null tuple at end of execution; real tuples are
-	 * returned to the caller in the children's own result slots.  What we are
-	 * doing here is allowing the parent plan node to optimize if the
-	 * MergeAppend will return only one kind of slot.)
-	 */
-	mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
-	if (mergeops != NULL)
-	{
-		ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
-	}
-	else
-	{
-		ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
-		/* show that the output slot type is not fixed */
-		mergestate->ps.resultopsset = true;
-		mergestate->ps.resultopsfixed = false;
-	}
-
-	/*
-	 * Miscellaneous initialization
-	 */
-	mergestate->ps.ps_ProjInfo = NULL;
-
-	/* Initialize async state */
-	mergestate->ms_asyncplans = asyncplans;
-	mergestate->ms_nasyncplans = nasyncplans;
-	mergestate->ms_asyncrequests = NULL;
-	mergestate->ms_asyncresults = NULL;
 	mergestate->ms_has_asyncresults = NULL;
 	mergestate->ms_asyncremain = NULL;
-	mergestate->ms_needrequest = NULL;
-	mergestate->ms_eventset = NULL;
-	mergestate->ms_valid_asyncplans = NULL;
-
-	if (nasyncplans > 0)
-	{
-		mergestate->ms_asyncrequests = (AsyncRequest **)
-			palloc0(nplans * sizeof(AsyncRequest *));
-
-		i = -1;
-		while ((i = bms_next_member(asyncplans, i)) >= 0)
-		{
-			AsyncRequest *areq;
-
-			areq = palloc(sizeof(AsyncRequest));
-			areq->requestor = (PlanState *) mergestate;
-			areq->requestee = mergeplanstates[i];
-			areq->request_index = i;
-			areq->callback_pending = false;
-			areq->request_complete = false;
-			areq->result = NULL;
-
-			mergestate->ms_asyncrequests[i] = areq;
-		}
-
-		mergestate->ms_asyncresults = (TupleTableSlot **)
-			palloc0(nplans * sizeof(TupleTableSlot *));
-
-		if (mergestate->ms_valid_subplans_identified)
-			classify_matching_subplans(mergestate);
-	}
 
 	/*
 	 * initialize sort-key information
@@ -293,20 +160,20 @@ ExecMergeAppend(PlanState *pstate)
 	if (!node->ms_initialized)
 	{
 		/* Nothing to do if all subplans were pruned */
-		if (node->ms_nplans == 0)
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		if (node->ms.nplans == 0)
+			return ExecClearTuple(node->ms.ps.ps_ResultTupleSlot);
 
 		/* If we've yet to determine the valid subplans then do so now. */
-		if (!node->ms_valid_subplans_identified)
+		if (!node->ms.valid_subplans_identified)
 		{
-			node->ms_valid_subplans =
-				ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
-			node->ms_valid_subplans_identified = true;
+			node->ms.valid_subplans =
+				ExecFindMatchingSubPlans(node->ms.prune_state, false, NULL);
+			node->ms.valid_subplans_identified = true;
 			classify_matching_subplans(node);
 		}
 
 		/* If there are any async subplans, begin executing them. */
-		if (node->ms_nasyncplans > 0)
+		if (node->ms.nasyncplans > 0)
 			ExecMergeAppendAsyncBegin(node);
 
 		/*
@@ -314,16 +181,16 @@ ExecMergeAppend(PlanState *pstate)
 		 * and set up the heap.
 		 */
 		i = -1;
-		while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
+		while ((i = bms_next_member(node->ms.valid_subplans, i)) >= 0)
 		{
-			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+			node->ms_slots[i] = ExecProcNode(node->ms.plans[i]);
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
 
 		/* Look at valid async subplans */
 		i = -1;
-		while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+		while ((i = bms_next_member(node->ms.valid_asyncplans, i)) >= 0)
 		{
 			ExecMergeAppendAsyncGetNext(node, i);
 			if (!TupIsNull(node->ms_slots[i]))
@@ -344,12 +211,12 @@ ExecMergeAppend(PlanState *pstate)
 		 * to not pull tuples until necessary.)
 		 */
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
-		if (bms_is_member(i, node->ms_asyncplans))
+		if (bms_is_member(i, node->ms.asyncplans))
 			ExecMergeAppendAsyncGetNext(node, i);
 		else
 		{
-			Assert(bms_is_member(i, node->ms_valid_subplans));
-			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+			Assert(bms_is_member(i, node->ms.valid_subplans));
+			node->ms_slots[i] = ExecProcNode(node->ms.plans[i]);
 		}
 		if (!TupIsNull(node->ms_slots[i]))
 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
@@ -360,7 +227,7 @@ ExecMergeAppend(PlanState *pstate)
 	if (binaryheap_empty(node->ms_heap))
 	{
 		/* All the subplans are exhausted, and so is the heap */
-		result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		result = ExecClearTuple(node->ms.ps.ps_ResultTupleSlot);
 	}
 	else
 	{
@@ -426,81 +293,21 @@ heap_compare_slots(Datum a, Datum b, void *arg)
 void
 ExecEndMergeAppend(MergeAppendState *node)
 {
-	PlanState **mergeplans;
-	int			nplans;
-	int			i;
-
-	/*
-	 * get information from the node
-	 */
-	mergeplans = node->mergeplans;
-	nplans = node->ms_nplans;
-
-	/*
-	 * shut down each of the subscans
-	 */
-	for (i = 0; i < nplans; i++)
-		ExecEndNode(mergeplans[i]);
+	ExecEndAppender(&node->ms);
 }
 
 void
 ExecReScanMergeAppend(MergeAppendState *node)
 {
-	int			i;
-	int			nasyncplans = node->ms_nasyncplans;
+	int			nasyncplans = node->ms.nasyncplans;
 
-	/*
-	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
-	 * we'd better unset the valid subplans so that they are reselected for
-	 * the new parameter values.
-	 */
-	if (node->ms_prune_state &&
-		bms_overlap(node->ps.chgParam,
-					node->ms_prune_state->execparamids))
-	{
-		node->ms_valid_subplans_identified = false;
-		bms_free(node->ms_valid_subplans);
-		node->ms_valid_subplans = NULL;
-		bms_free(node->ms_valid_asyncplans);
-		node->ms_valid_asyncplans = NULL;
-	}
-
-	for (i = 0; i < node->ms_nplans; i++)
-	{
-		PlanState  *subnode = node->mergeplans[i];
-
-		/*
-		 * ExecReScan doesn't know about my subplans, so I have to do
-		 * changed-parameter signaling myself.
-		 */
-		if (node->ps.chgParam != NULL)
-			UpdateChangedParamSet(subnode, node->ps.chgParam);
-
-		/*
-		 * If chgParam of subnode is not null then plan will be re-scanned by
-		 * first ExecProcNode.
-		 */
-		if (subnode->chgParam == NULL)
-			ExecReScan(subnode);
-	}
+	ExecReScanAppender(&node->ms);
 
-	/* Reset async state */
+	/* Reset specific merge append async state */
 	if (nasyncplans > 0)
 	{
-		i = -1;
-		while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
-		{
-			AsyncRequest *areq = node->ms_asyncrequests[i];
-
-			areq->callback_pending = false;
-			areq->request_complete = false;
-			areq->result = NULL;
-		}
-
 		bms_free(node->ms_asyncremain);
 		node->ms_asyncremain = NULL;
-		bms_free(node->ms_needrequest);
-		node->ms_needrequest = NULL;
 		bms_free(node->ms_has_asyncresults);
 		node->ms_has_asyncresults = NULL;
 	}
@@ -519,10 +326,10 @@ ExecReScanMergeAppend(MergeAppendState *node)
 static void
 classify_matching_subplans(MergeAppendState *node)
 {
-	Assert(node->ms_valid_subplans_identified);
+	Assert(node->ms.valid_subplans_identified);
 
 	/* Nothing to do if there are no valid subplans. */
-	if (bms_is_empty(node->ms_valid_subplans))
+	if (bms_is_empty(node->ms.valid_subplans))
 	{
 		node->ms_asyncremain = NULL;
 		return;
@@ -530,9 +337,9 @@ classify_matching_subplans(MergeAppendState *node)
 
 	/* No valid async subplans identified. */
 	if (!classify_matching_subplans_common(
-										   &node->ms_valid_subplans,
-										   node->ms_asyncplans,
-										   &node->ms_valid_asyncplans))
+										   &node->ms.valid_subplans,
+										   node->ms.asyncplans,
+										   &node->ms.valid_asyncplans))
 		node->ms_asyncremain = NULL;
 }
 
@@ -545,39 +352,17 @@ classify_matching_subplans(MergeAppendState *node)
 static void
 ExecMergeAppendAsyncBegin(MergeAppendState *node)
 {
-	int			i;
-
-	/* Backward scan is not supported by async-aware MergeAppends. */
-	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
-
-	/* We should never be called when there are no subplans */
-	Assert(node->ms_nplans > 0);
-
-	/* We should never be called when there are no async subplans. */
-	Assert(node->ms_nasyncplans > 0);
-
 	/* ExecMergeAppend() identifies valid subplans */
-	Assert(node->ms_valid_subplans_identified);
+	Assert(node->ms.valid_subplans_identified);
 
 	/* Initialize state variables. */
-	node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
+	node->ms_asyncremain = bms_copy(node->ms.valid_asyncplans);
 
 	/* Nothing to do if there are no valid async subplans. */
 	if (bms_is_empty(node->ms_asyncremain))
 		return;
 
-	/* Make a request for each of the valid async subplans. */
-	i = -1;
-	while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
-	{
-		AsyncRequest *areq = node->ms_asyncrequests[i];
-
-		Assert(areq->request_index == i);
-		Assert(!areq->callback_pending);
-
-		/* Do the actual work. */
-		ExecAsyncRequest(areq);
-	}
+	ExecAppenderAsyncBegin(&node->ms);
 }
 
 /* ----------------------------------------------------------------
@@ -638,7 +423,7 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
 	 */
 	if (bms_is_member(mplan, node->ms_has_asyncresults))
 	{
-		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		node->ms_slots[mplan] = node->ms.asyncresults[mplan];
 		return true;
 	}
 
@@ -648,7 +433,7 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
 	 */
 	needrequest = NULL;
 	i = -1;
-	while ((i = bms_next_member(node->ms_needrequest, i)) >= 0)
+	while ((i = bms_next_member(node->ms.needrequest, i)) >= 0)
 	{
 		if (!bms_is_member(i, node->ms_has_asyncresults))
 			needrequest = bms_add_member(needrequest, i);
@@ -661,13 +446,13 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
 		return false;
 
 	/* Clear ms_needrequest flag, as we are going to send requests now */
-	node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
+	node->ms.needrequest = bms_del_members(node->ms.needrequest, needrequest);
 
 	/* Make a new request for each of the async subplans that need it. */
 	i = -1;
 	while ((i = bms_next_member(needrequest, i)) >= 0)
 	{
-		AsyncRequest *areq = node->ms_asyncrequests[i];
+		AsyncRequest *areq = node->ms.asyncrequests[i];
 
 		/*
 		 * We've just checked that subplan doesn't already have some fetched
@@ -683,7 +468,7 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
 	/* Return needed asynchronously-generated results if any. */
 	if (bms_is_member(mplan, node->ms_has_asyncresults))
 	{
-		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		node->ms_slots[mplan] = node->ms.asyncresults[mplan];
 		return true;
 	}
 
@@ -707,7 +492,7 @@ ExecAsyncMergeAppendResponse(AsyncRequest *areq)
 	/* We should handle previous async result prior to getting new one */
 	Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults));
 
-	node->ms_asyncresults[areq->request_index] = NULL;
+	node->ms.asyncresults[areq->request_index] = NULL;
 	/* Nothing to do if the request is pending. */
 	if (!areq->request_complete)
 	{
@@ -730,13 +515,13 @@ ExecAsyncMergeAppendResponse(AsyncRequest *areq)
 	node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults,
 											   areq->request_index);
 	/* Save result so we can return it. */
-	node->ms_asyncresults[areq->request_index] = slot;
+	node->ms.asyncresults[areq->request_index] = slot;
 
 	/*
 	 * Mark the subplan that returned a result as ready for a new request.  We
 	 * don't launch another one here immediately because it might complete.
 	 */
-	node->ms_needrequest = bms_add_member(node->ms_needrequest,
+	node->ms.needrequest = bms_add_member(node->ms.needrequest,
 										  areq->request_index);
 }
 
@@ -749,101 +534,8 @@ ExecAsyncMergeAppendResponse(AsyncRequest *areq)
 static void
 ExecMergeAppendAsyncEventWait(MergeAppendState *node)
 {
-	int			nevents = node->ms_nasyncplans + 2; /* one for PM death and
-													 * one for latch */
-	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
-	int			noccurred;
-	int			i;
-
 	/* We should never be called when there are no valid async subplans. */
 	Assert(bms_num_members(node->ms_asyncremain) > 0);
 
-	node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
-	AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
-					  NULL, NULL);
-
-	/* Give each waiting subplan a chance to add an event. */
-	i = -1;
-	while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
-	{
-		AsyncRequest *areq = node->ms_asyncrequests[i];
-
-		if (areq->callback_pending)
-			ExecAsyncConfigureWait(areq);
-	}
-
-	/*
-	 * No need for further processing if none of the subplans configured any
-	 * events.
-	 */
-	if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
-	{
-		FreeWaitEventSet(node->ms_eventset);
-		node->ms_eventset = NULL;
-		return;
-	}
-
-	/*
-	 * Add the process latch to the set, so that we wake up to process the
-	 * standard interrupts with CHECK_FOR_INTERRUPTS().
-	 *
-	 * NOTE: For historical reasons, it's important that this is added to the
-	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
-	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
-	 * any other events are in the set.  That's a poor design, it's
-	 * questionable for postgres_fdw to be doing that in the first place, but
-	 * we cannot change it now.  The pattern has possibly been copied to other
-	 * extensions too.
-	 */
-	AddWaitEventToSet(node->ms_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
-					  MyLatch, NULL);
-
-	/* Return at most EVENT_BUFFER_SIZE events in one call. */
-	if (nevents > EVENT_BUFFER_SIZE)
-		nevents = EVENT_BUFFER_SIZE;
-
-	/*
-	 * Wait until at least one event occurs.
-	 */
-	noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */ , occurred_event,
-								 nevents, WAIT_EVENT_APPEND_READY);
-	FreeWaitEventSet(node->ms_eventset);
-	node->ms_eventset = NULL;
-	if (noccurred == 0)
-		return;
-
-	/* Deliver notifications. */
-	for (i = 0; i < noccurred; i++)
-	{
-		WaitEvent  *w = &occurred_event[i];
-
-		/*
-		 * Each waiting subplan should have registered its wait event with
-		 * user_data pointing back to its AsyncRequest.
-		 */
-		if ((w->events & WL_SOCKET_READABLE) != 0)
-		{
-			AsyncRequest *areq = (AsyncRequest *) w->user_data;
-
-			if (areq->callback_pending)
-			{
-				/*
-				 * Mark it as no longer needing a callback.  We must do this
-				 * before dispatching the callback in case the callback resets
-				 * the flag.
-				 */
-				areq->callback_pending = false;
-
-				/* Do the actual work. */
-				ExecAsyncNotify(areq);
-			}
-		}
-
-		/* Handle standard interrupts */
-		if ((w->events & WL_LATCH_SET) != 0)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-	}
+	ExecAppenderAsyncEventWait(&node->ms, -1 /* no timeout */ , WAIT_EVENT_APPEND_READY);
 }
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
index 024a2b2fd84..2f4e2ae6d39 100644
--- a/src/backend/nodes/nodeFuncs.c
+++ b/src/backend/nodes/nodeFuncs.c
@@ -4751,14 +4751,14 @@ planstate_tree_walker_impl(PlanState *planstate,
 	switch (nodeTag(plan))
 	{
 		case T_Append:
-			if (planstate_walk_members(((AppendState *) planstate)->appendplans,
-									   ((AppendState *) planstate)->as_nplans,
+			if (planstate_walk_members(((AppendState *) planstate)->as.plans,
+									   ((AppendState *) planstate)->as.nplans,
 									   walker, context))
 				return true;
 			break;
 		case T_MergeAppend:
-			if (planstate_walk_members(((MergeAppendState *) planstate)->mergeplans,
-									   ((MergeAppendState *) planstate)->ms_nplans,
+			if (planstate_walk_members(((MergeAppendState *) planstate)->ms.plans,
+									   ((MergeAppendState *) planstate)->ms.nplans,
 									   walker, context))
 				return true;
 			break;
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 24325d42f0d..bb84040e8f9 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1262,11 +1262,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	 * child plans, to make cross-checking the sort info easier.
 	 */
 	plan = makeNode(Append);
-	plan->plan.targetlist = tlist;
-	plan->plan.qual = NIL;
-	plan->plan.lefttree = NULL;
-	plan->plan.righttree = NULL;
-	plan->apprelids = rel->relids;
+	plan->ap.plan.targetlist = tlist;
+	plan->ap.plan.qual = NIL;
+	plan->ap.plan.lefttree = NULL;
+	plan->ap.plan.righttree = NULL;
+	plan->ap.apprelids = rel->relids;
 
 	if (pathkeys != NIL)
 	{
@@ -1285,7 +1285,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 										  &nodeSortOperators,
 										  &nodeCollations,
 										  &nodeNullsFirst);
-		tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
+		tlist_was_changed = (orig_tlist_length != list_length(plan->ap.plan.targetlist));
 	}
 
 	/* If appropriate, consider async append */
@@ -1395,7 +1395,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	}
 
 	/* Set below if we find quals that we can use to run-time prune */
-	plan->part_prune_index = -1;
+	plan->ap.part_prune_index = -1;
 
 	/*
 	 * If any quals exist, they may be useful to perform further partition
@@ -1420,16 +1420,16 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 		}
 
 		if (prunequal != NIL)
-			plan->part_prune_index = make_partition_pruneinfo(root, rel,
+			plan->ap.part_prune_index = make_partition_pruneinfo(root, rel,
 															  best_path->subpaths,
 															  prunequal);
 	}
 
-	plan->appendplans = subplans;
+	plan->ap.subplans = subplans;
 	plan->nasyncplans = nasyncplans;
 	plan->first_partial_plan = best_path->first_partial_path;
 
-	copy_generic_path_info(&plan->plan, (Path *) best_path);
+	copy_generic_path_info(&plan->ap.plan, (Path *) best_path);
 
 	/*
 	 * If prepare_sort_from_pathkeys added sort columns, but we were told to
@@ -1438,9 +1438,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
 	 */
 	if (tlist_was_changed && (flags & (CP_EXACT_TLIST | CP_SMALL_TLIST)))
 	{
-		tlist = list_copy_head(plan->plan.targetlist, orig_tlist_length);
+		tlist = list_copy_head(plan->ap.plan.targetlist, orig_tlist_length);
 		return inject_projection_plan((Plan *) plan, tlist,
-									  plan->plan.parallel_safe);
+									  plan->ap.plan.parallel_safe);
 	}
 	else
 		return (Plan *) plan;
@@ -1458,7 +1458,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 						 int flags)
 {
 	MergeAppend *node = makeNode(MergeAppend);
-	Plan	   *plan = &node->plan;
+	Plan	   *plan = &node->ap.plan;
 	List	   *tlist = build_path_tlist(root, &best_path->path);
 	int			orig_tlist_length = list_length(tlist);
 	bool		tlist_was_changed;
@@ -1479,7 +1479,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	plan->qual = NIL;
 	plan->lefttree = NULL;
 	plan->righttree = NULL;
-	node->apprelids = rel->relids;
+	node->ap.apprelids = rel->relids;
 
 	consider_async = (enable_async_merge_append &&
 					  !best_path->path.parallel_safe &&
@@ -1593,7 +1593,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	}
 
 	/* Set below if we find quals that we can use to run-time prune */
-	node->part_prune_index = -1;
+	node->ap.part_prune_index = -1;
 
 	/*
 	 * If any quals exist, they may be useful to perform further partition
@@ -1610,12 +1610,12 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 		Assert(best_path->path.param_info == NULL);
 
 		if (prunequal != NIL)
-			node->part_prune_index = make_partition_pruneinfo(root, rel,
+			node->ap.part_prune_index = make_partition_pruneinfo(root, rel,
 															  best_path->subpaths,
 															  prunequal);
 	}
 
-	node->mergeplans = subplans;
+	node->ap.subplans = subplans;
 
 	/*
 	 * If prepare_sort_from_pathkeys added sort columns, but we were told to
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index cd7ea1e6b58..a595f34c87b 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1850,10 +1850,10 @@ set_append_references(PlannerInfo *root,
 	 * check quals.  If it's got exactly one child plan, then it's not doing
 	 * anything useful at all, and we can strip it out.
 	 */
-	Assert(aplan->plan.qual == NIL);
+	Assert(aplan->ap.plan.qual == NIL);
 
 	/* First, we gotta recurse on the children */
-	foreach(l, aplan->appendplans)
+	foreach(l, aplan->ap.subplans)
 	{
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
@@ -1866,11 +1866,11 @@ set_append_references(PlannerInfo *root,
 	 * plan may execute the non-parallel aware child multiple times.  (If you
 	 * change these rules, update create_append_path to match.)
 	 */
-	if (list_length(aplan->appendplans) == 1)
+	if (list_length(aplan->ap.subplans) == 1)
 	{
-		Plan	   *p = (Plan *) linitial(aplan->appendplans);
+		Plan	   *p = (Plan *) linitial(aplan->ap.subplans);
 
-		if (p->parallel_aware == aplan->plan.parallel_aware)
+		if (p->parallel_aware == aplan->ap.plan.parallel_aware)
 			return clean_up_removed_plan_level((Plan *) aplan, p);
 	}
 
@@ -1881,19 +1881,19 @@ set_append_references(PlannerInfo *root,
 	 */
 	set_dummy_tlist_references((Plan *) aplan, rtoffset);
 
-	aplan->apprelids = offset_relid_set(aplan->apprelids, rtoffset);
+	aplan->ap.apprelids = offset_relid_set(aplan->ap.apprelids, rtoffset);
 
 	/*
 	 * Add PartitionPruneInfo, if any, to PlannerGlobal and update the index.
 	 * Also update the RT indexes present in it to add the offset.
 	 */
-	if (aplan->part_prune_index >= 0)
-		aplan->part_prune_index =
-			register_partpruneinfo(root, aplan->part_prune_index, rtoffset);
+	if (aplan->ap.part_prune_index >= 0)
+		aplan->ap.part_prune_index =
+			register_partpruneinfo(root, aplan->ap.part_prune_index, rtoffset);
 
 	/* We don't need to recurse to lefttree or righttree ... */
-	Assert(aplan->plan.lefttree == NULL);
-	Assert(aplan->plan.righttree == NULL);
+	Assert(aplan->ap.plan.lefttree == NULL);
+	Assert(aplan->ap.plan.righttree == NULL);
 
 	return (Plan *) aplan;
 }
@@ -1917,10 +1917,10 @@ set_mergeappend_references(PlannerInfo *root,
 	 * or check quals.  If it's got exactly one child plan, then it's not
 	 * doing anything useful at all, and we can strip it out.
 	 */
-	Assert(mplan->plan.qual == NIL);
+	Assert(mplan->ap.plan.qual == NIL);
 
 	/* First, we gotta recurse on the children */
-	foreach(l, mplan->mergeplans)
+	foreach(l, mplan->ap.subplans)
 	{
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
@@ -1934,11 +1934,11 @@ set_mergeappend_references(PlannerInfo *root,
 	 * multiple times.  (If you change these rules, update
 	 * create_merge_append_path to match.)
 	 */
-	if (list_length(mplan->mergeplans) == 1)
+	if (list_length(mplan->ap.subplans) == 1)
 	{
-		Plan	   *p = (Plan *) linitial(mplan->mergeplans);
+		Plan	   *p = (Plan *) linitial(mplan->ap.subplans);
 
-		if (p->parallel_aware == mplan->plan.parallel_aware)
+		if (p->parallel_aware == mplan->ap.plan.parallel_aware)
 			return clean_up_removed_plan_level((Plan *) mplan, p);
 	}
 
@@ -1949,19 +1949,19 @@ set_mergeappend_references(PlannerInfo *root,
 	 */
 	set_dummy_tlist_references((Plan *) mplan, rtoffset);
 
-	mplan->apprelids = offset_relid_set(mplan->apprelids, rtoffset);
+	mplan->ap.apprelids = offset_relid_set(mplan->ap.apprelids, rtoffset);
 
 	/*
 	 * Add PartitionPruneInfo, if any, to PlannerGlobal and update the index.
 	 * Also update the RT indexes present in it to add the offset.
 	 */
-	if (mplan->part_prune_index >= 0)
-		mplan->part_prune_index =
-			register_partpruneinfo(root, mplan->part_prune_index, rtoffset);
+	if (mplan->ap.part_prune_index >= 0)
+		mplan->ap.part_prune_index =
+			register_partpruneinfo(root, mplan->ap.part_prune_index, rtoffset);
 
 	/* We don't need to recurse to lefttree or righttree ... */
-	Assert(mplan->plan.lefttree == NULL);
-	Assert(mplan->plan.righttree == NULL);
+	Assert(mplan->ap.plan.lefttree == NULL);
+	Assert(mplan->ap.plan.righttree == NULL);
 
 	return (Plan *) mplan;
 }
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index ff63d20f8d5..eb616c977bc 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2759,7 +2759,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 
 		case T_Append:
 			{
-				foreach(l, ((Append *) plan)->appendplans)
+				foreach(l, ((Append *) plan)->ap.subplans)
 				{
 					context.paramids =
 						bms_add_members(context.paramids,
@@ -2774,7 +2774,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 
 		case T_MergeAppend:
 			{
-				foreach(l, ((MergeAppend *) plan)->mergeplans)
+				foreach(l, ((MergeAppend *) plan)->ap.subplans)
 				{
 					context.paramids =
 						bms_add_members(context.paramids,
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 9f85eb86da1..ce57f80e5e3 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -5163,9 +5163,9 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan)
 	 * natural choice.
 	 */
 	if (IsA(plan, Append))
-		dpns->outer_plan = linitial(((Append *) plan)->appendplans);
+		dpns->outer_plan = linitial(((Append *) plan)->ap.subplans);
 	else if (IsA(plan, MergeAppend))
-		dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans);
+		dpns->outer_plan = linitial(((MergeAppend *) plan)->ap.subplans);
 	else
 		dpns->outer_plan = outerPlan(plan);
 
@@ -7955,10 +7955,10 @@ resolve_special_varno(Node *node, deparse_context *context,
 
 		if (IsA(dpns->plan, Append))
 			context->appendparents = bms_union(context->appendparents,
-											   ((Append *) dpns->plan)->apprelids);
+											   ((Append *) dpns->plan)->ap.apprelids);
 		else if (IsA(dpns->plan, MergeAppend))
 			context->appendparents = bms_union(context->appendparents,
-											   ((MergeAppend *) dpns->plan)->apprelids);
+											   ((MergeAppend *) dpns->plan)->ap.apprelids);
 
 		push_child_plan(dpns, dpns->outer_plan, &save_dpns);
 		resolve_special_varno((Node *) tle->expr, context,
diff --git a/src/include/executor/execAppend.h b/src/include/executor/execAppend.h
new file mode 100644
index 00000000000..c1030dc5282
--- /dev/null
+++ b/src/include/executor/execAppend.h
@@ -0,0 +1,33 @@
+/*-------------------------------------------------------------------------
+ * execAppend.h
+ *		Support functions for MergeAppend and Append nodes.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execAppend.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECAPPEND_H
+#define EXECAPPEND_H
+
+#include "nodes/execnodes.h"
+
+void		ExecInitAppender(AppenderState * state,
+							 Appender * node,
+							 EState *estate,
+							 int eflags,
+							 int first_partial_plan,
+							 int *first_valid_partial_plan);
+
+void		ExecEndAppender(AppenderState * node);
+
+void		ExecReScanAppender(AppenderState * node);
+
+void		ExecAppenderAsyncBegin(AppenderState * node);
+
+void		ExecAppenderAsyncEventWait(AppenderState * node, int timeout, uint32 wait_event_info);
+
+#endif							/* EXECAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 5887cbf4f16..69123a31bbd 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1472,6 +1472,27 @@ typedef struct ModifyTableState
 	List	   *mt_mergeJoinConditions;
 } ModifyTableState;
 
+typedef struct AppenderState
+{
+	PlanState	ps;				/* its first field is NodeTag */
+	PlanState **plans;			/* array of PlanStates for my inputs */
+	int			nplans;
+
+	/* Asynchronous execution state */
+	Bitmapset  *asyncplans;		/* asynchronous plans indexes */
+	int			nasyncplans;	/* # of asynchronous plans */
+	AsyncRequest **asyncrequests;	/* array of AsyncRequests */
+	TupleTableSlot **asyncresults;	/* unreturned results of async plans */
+	Bitmapset  *needrequest;	/* asynchronous plans needing a new request */
+	struct WaitEventSet *eventset;	/* WaitEventSet for file descriptor waits */
+
+	/* Partition pruning state */
+	struct PartitionPruneState *prune_state;
+	bool		valid_subplans_identified;
+	Bitmapset  *valid_subplans;
+	Bitmapset  *valid_asyncplans;	/* valid asynchronous plans indexes */
+}			AppenderState;
+
 /* ----------------
  *	 AppendState information
  *
@@ -1493,31 +1514,20 @@ struct PartitionPruneState;
 
 struct AppendState
 {
-	PlanState	ps;				/* its first field is NodeTag */
-	PlanState **appendplans;	/* array of PlanStates for my inputs */
-	int			as_nplans;
+	AppenderState as;
+
 	int			as_whichplan;
 	bool		as_begun;		/* false means need to initialize */
-	Bitmapset  *as_asyncplans;	/* asynchronous plans indexes */
-	int			as_nasyncplans; /* # of asynchronous plans */
-	AsyncRequest **as_asyncrequests;	/* array of AsyncRequests */
-	TupleTableSlot **as_asyncresults;	/* unreturned results of async plans */
-	int			as_nasyncresults;	/* # of valid entries in as_asyncresults */
-	bool		as_syncdone;	/* true if all synchronous plans done in
-								 * asynchronous mode, else false */
+	int			as_nasyncresults;	/* # of valid entries in asyncresults */
+	bool		as_syncdone;	/* all sync plans done in async mode? */
 	int			as_nasyncremain;	/* # of remaining asynchronous plans */
-	Bitmapset  *as_needrequest; /* asynchronous plans needing a new request */
-	struct WaitEventSet *as_eventset;	/* WaitEventSet used to configure file
-										 * descriptor wait events */
-	int			as_first_partial_plan;	/* Index of 'appendplans' containing
-										 * the first partial plan */
-	ParallelAppendState *as_pstate; /* parallel coordination info */
-	Size		pstate_len;		/* size of parallel coordination info */
-	struct PartitionPruneState *as_prune_state;
-	bool		as_valid_subplans_identified;	/* is as_valid_subplans valid? */
-	Bitmapset  *as_valid_subplans;
-	Bitmapset  *as_valid_asyncplans;	/* valid asynchronous plans indexes */
-	bool		(*choose_next_subplan) (AppendState *);
+	int			as_first_partial_plan;
+
+	/* Parallel append specific */
+	ParallelAppendState *as_pstate;
+	Size		pstate_len;
+
+	bool		(*choose_next_subplan) (struct AppendState *);
 };
 
 /* ----------------
@@ -1537,27 +1547,17 @@ struct AppendState
  */
 typedef struct MergeAppendState
 {
-	PlanState	ps;				/* its first field is NodeTag */
-	PlanState **mergeplans;		/* array of PlanStates for my inputs */
-	int			ms_nplans;
+	AppenderState ms;
+
 	int			ms_nkeys;
 	SortSupport ms_sortkeys;	/* array of length ms_nkeys */
 	TupleTableSlot **ms_slots;	/* array of length ms_nplans */
 	struct binaryheap *ms_heap; /* binary heap of slot indices */
 	bool		ms_initialized; /* are subplans started? */
-	Bitmapset  *ms_asyncplans;	/* asynchronous plans indexes */
-	int			ms_nasyncplans; /* # of asynchronous plans */
-	AsyncRequest **ms_asyncrequests;	/* array of AsyncRequests */
-	TupleTableSlot **ms_asyncresults;	/* unreturned results of async plans */
+
+	/* Merge-specific async tracking */
 	Bitmapset  *ms_has_asyncresults;	/* plans which have async results */
 	Bitmapset  *ms_asyncremain; /* remaining asynchronous plans */
-	Bitmapset  *ms_needrequest; /* asynchronous plans needing a new request */
-	struct WaitEventSet *ms_eventset;	/* WaitEventSet used to configure file
-										 * descriptor wait events */
-	struct PartitionPruneState *ms_prune_state;
-	bool		ms_valid_subplans_identified;	/* is ms_valid_subplans valid? */
-	Bitmapset  *ms_valid_subplans;
-	Bitmapset  *ms_valid_asyncplans;	/* valid asynchronous plans indexes */
 } MergeAppendState;
 
 /* Getters for AppendState and MergeAppendState */
@@ -1567,9 +1567,9 @@ GetAppendEventSet(PlanState *ps)
 	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
 
 	if (IsA(ps, AppendState))
-		return ((AppendState *) ps)->as_eventset;
+		return ((AppendState *) ps)->as.eventset;
 	else
-		return ((MergeAppendState *) ps)->ms_eventset;
+		return ((MergeAppendState *) ps)->ms.eventset;
 }
 
 static inline Bitmapset *
@@ -1578,9 +1578,9 @@ GetNeedRequest(PlanState *ps)
 	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
 
 	if (IsA(ps, AppendState))
-		return ((AppendState *) ps)->as_needrequest;
+		return ((AppendState *) ps)->as.needrequest;
 	else
-		return ((MergeAppendState *) ps)->ms_needrequest;
+		return ((MergeAppendState *) ps)->ms.needrequest;
 }
 
 /* Common part of classify_matching_subplans() for Append and MergeAppend */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index c4393a94321..30c20e80b40 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -380,6 +380,20 @@ typedef struct ModifyTable
 
 struct PartitionPruneInfo;		/* forward reference to struct below */
 
+typedef struct Appender
+{
+	Plan		plan;			/* its first field is NodeTag */
+	Bitmapset  *apprelids;		/* RTIs of appendrel(s) formed by this node */
+	List	   *subplans;		/* List of Plans (formerly
+								 * appendplans/mergeplans) */
+
+	/*
+	 * Index into PlannedStmt.partPruneInfos and parallel lists in EState. Set
+	 * to -1 if no run-time pruning is used.
+	 */
+	int			part_prune_index;
+}			Appender;
+
 /* ----------------
  *	 Append node -
  *		Generate the concatenation of the results of sub-plans.
@@ -387,25 +401,16 @@ struct PartitionPruneInfo;		/* forward reference to struct below */
  */
 typedef struct Append
 {
-	Plan		plan;
-	/* RTIs of appendrel(s) formed by this node */
-	Bitmapset  *apprelids;
-	List	   *appendplans;
+	Appender	ap;
+
 	/* # of asynchronous plans */
 	int			nasyncplans;
 
 	/*
-	 * All 'appendplans' preceding this index are non-partial plans. All
-	 * 'appendplans' from this index onwards are partial plans.
+	 * All 'subplans' preceding this index are non-partial plans. All
+	 * 'subplans' from this index onwards are partial plans.
 	 */
 	int			first_partial_plan;
-
-	/*
-	 * Index into PlannedStmt.partPruneInfos and parallel lists in EState:
-	 * es_part_prune_states and es_part_prune_results. Set to -1 if no
-	 * run-time pruning is used.
-	 */
-	int			part_prune_index;
 } Append;
 
 /* ----------------
@@ -415,12 +420,7 @@ typedef struct Append
  */
 typedef struct MergeAppend
 {
-	Plan		plan;
-
-	/* RTIs of appendrel(s) formed by this node */
-	Bitmapset  *apprelids;
-
-	List	   *mergeplans;
+	Appender	ap;
 
 	/* these fields are just like the sort-key info in struct Sort: */
 
@@ -438,13 +438,6 @@ typedef struct MergeAppend
 
 	/* NULLS FIRST/LAST directions */
 	bool	   *nullsFirst pg_node_attr(array_size(numCols));
-
-	/*
-	 * Index into PlannedStmt.partPruneInfos and parallel lists in EState:
-	 * es_part_prune_states and es_part_prune_results. Set to -1 if no
-	 * run-time pruning is used.
-	 */
-	int			part_prune_index;
 } MergeAppend;
 
 /* ----------------
-- 
2.51.2