Thread

  1. [PATCH v4 2/2] MergeAppend should support Async Foreign Scan subplans

    Alexander Pyhalov <a.pyhalov@postgrespro.ru> — 2025-07-26T07:43:57Z

    ---
     .../postgres_fdw/expected/postgres_fdw.out    | 247 +++++++++
     contrib/postgres_fdw/postgres_fdw.c           |  10 +-
     contrib/postgres_fdw/sql/postgres_fdw.sql     |  78 +++
     doc/src/sgml/config.sgml                      |  14 +
     src/backend/executor/execAsync.c              |   4 +
     src/backend/executor/nodeMergeAppend.c        | 481 +++++++++++++++++-
     src/backend/optimizer/path/costsize.c         |   1 +
     src/backend/optimizer/plan/createplan.c       |   9 +
     src/backend/utils/misc/guc_parameters.dat     |   7 +
     src/backend/utils/misc/postgresql.conf.sample |   1 +
     src/include/executor/nodeMergeAppend.h        |   1 +
     src/include/nodes/execnodes.h                 |  56 ++
     src/include/optimizer/cost.h                  |   1 +
     src/test/regress/expected/sysviews.out        |   3 +-
     14 files changed, 907 insertions(+), 6 deletions(-)
    
    diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
    index cd28126049d..d5f25a6de9a 100644
    --- a/contrib/postgres_fdw/expected/postgres_fdw.out
    +++ b/contrib/postgres_fdw/expected/postgres_fdw.out
    @@ -11556,6 +11556,46 @@ SELECT * FROM result_tbl ORDER BY a;
     (2 rows)
     
     DELETE FROM result_tbl;
    +-- Test Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
    +                                                          QUERY PLAN                                                          
    +------------------------------------------------------------------------------------------------------------------------------
    + Merge Append
    +   Sort Key: async_pt.b, async_pt.a
    +   ->  Async Foreign Scan on public.async_p1 async_pt_1
    +         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +   ->  Async Foreign Scan on public.async_p2 async_pt_2
    +         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +(8 rows)
    +
    +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
    +  a   |  b  |  c   
    +------+-----+------
    + 1000 |   0 | 0000
    + 2000 |   0 | 0000
    + 1100 | 100 | 0100
    + 2100 | 100 | 0100
    + 1200 | 200 | 0200
    + 2200 | 200 | 0200
    + 1300 | 300 | 0300
    + 2300 | 300 | 0300
    + 1400 | 400 | 0400
    + 2400 | 400 | 0400
    + 1500 | 500 | 0500
    + 2500 | 500 | 0500
    + 1600 | 600 | 0600
    + 2600 | 600 | 0600
    + 1700 | 700 | 0700
    + 2700 | 700 | 0700
    + 1800 | 800 | 0800
    + 2800 | 800 | 0800
    + 1900 | 900 | 0900
    + 2900 | 900 | 0900
    +(20 rows)
    +
     -- Test error handling, if accessing one of the foreign partitions errors out
     CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
       SERVER loopback OPTIONS (table_name 'non_existent_table');
    @@ -11604,6 +11644,35 @@ COPY async_pt TO stdout; --error
     ERROR:  cannot copy from foreign table "async_p1"
     DETAIL:  Partition "async_p1" is a foreign table in partitioned table "async_pt"
     HINT:  Try the COPY (SELECT ...) TO variant.
    +-- Test Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +                                              QUERY PLAN                                              
    +------------------------------------------------------------------------------------------------------
    + Merge Append
    +   Sort Key: async_pt.b, async_pt.a
    +   ->  Async Foreign Scan on public.async_p1 async_pt_1
    +         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
    +         Filter: (async_pt_1.b === 505)
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +   ->  Async Foreign Scan on public.async_p2 async_pt_2
    +         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
    +         Filter: (async_pt_2.b === 505)
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +   ->  Async Foreign Scan on public.async_p3 async_pt_3
    +         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
    +         Filter: (async_pt_3.b === 505)
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +(14 rows)
    +
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +  a   |  b  |  c   
    +------+-----+------
    + 1505 | 505 | 0505
    + 2505 | 505 | 0505
    + 3505 | 505 | 0505
    +(3 rows)
    +
     DROP FOREIGN TABLE async_p3;
     DROP TABLE base_tbl3;
     -- Check case where the partitioned table has local/remote partitions
    @@ -11639,6 +11708,37 @@ SELECT * FROM result_tbl ORDER BY a;
     (3 rows)
     
     DELETE FROM result_tbl;
    +-- Test Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +                                              QUERY PLAN                                              
    +------------------------------------------------------------------------------------------------------
    + Merge Append
    +   Sort Key: async_pt.b, async_pt.a
    +   ->  Async Foreign Scan on public.async_p1 async_pt_1
    +         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
    +         Filter: (async_pt_1.b === 505)
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +   ->  Async Foreign Scan on public.async_p2 async_pt_2
    +         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
    +         Filter: (async_pt_2.b === 505)
    +         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
    +   ->  Sort
    +         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
    +         Sort Key: async_pt_3.b, async_pt_3.a
    +         ->  Seq Scan on public.async_p3 async_pt_3
    +               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
    +               Filter: (async_pt_3.b === 505)
    +(16 rows)
    +
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +  a   |  b  |  c   
    +------+-----+------
    + 1505 | 505 | 0505
    + 2505 | 505 | 0505
    + 3505 | 505 | 0505
    +(3 rows)
    +
     -- partitionwise joins
     SET enable_partitionwise_join TO true;
     CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
    @@ -12421,6 +12521,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
     DROP FOREIGN TABLE foreign_tbl CASCADE;
     NOTICE:  drop cascades to foreign table foreign_tbl2
     DROP TABLE base_tbl;
    +-- Test async Merge Append
    +CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
    +CREATE TABLE base1 (i int, j int, k text);
    +CREATE TABLE base2 (i int, j int, k text);
    +CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    +SERVER loopback OPTIONS (table_name 'base1');
    +CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    +SERVER loopback OPTIONS (table_name 'base2');
    +CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
    +CREATE TABLE base3 (i int, j int, k text);
    +CREATE TABLE base4 (i int, j int, k text);
    +CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    +SERVER loopback OPTIONS (table_name 'base3');
    +CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    +SERVER loopback OPTIONS (table_name 'base4');
    +INSERT INTO distr1
    +SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
    +INSERT INTO distr2
    +SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
    +ANALYZE distr1_p1;
    +ANALYZE distr1_p2;
    +ANALYZE distr2_p1;
    +ANALYZE distr2_p2;
    +SET enable_partitionwise_join TO ON;
    +-- Test joins with async Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
    +ORDER BY distr2.i LIMIT 10;
    +                                                                                                    QUERY PLAN                                                                                                     
    +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    + Limit
    +   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
    +   ->  Merge Append
    +         Sort Key: distr1.i
    +         ->  Async Foreign Scan
    +               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
    +               Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
    +               Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
    +         ->  Async Foreign Scan
    +               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
    +               Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
    +               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
    +(12 rows)
    +
    +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
    +ORDER BY distr2.i LIMIT 10;
    + i  |  j  |    k    | i  |  j  |    k    
    +----+-----+---------+----+-----+---------
    + 10 | 100 | data_10 | 10 | 100 | data_10
    + 11 | 110 | data_11 | 11 | 110 | data_11
    + 12 | 120 | data_12 | 12 | 120 | data_12
    + 13 | 130 | data_13 | 13 | 130 | data_13
    + 14 | 140 | data_14 | 14 | 140 | data_14
    + 15 | 150 | data_15 | 15 | 150 | data_15
    + 16 | 160 | data_16 | 16 | 160 | data_16
    + 17 | 170 | data_17 | 17 | 170 | data_17
    + 18 | 180 | data_18 | 18 | 180 | data_18
    + 19 | 190 | data_19 | 19 | 190 | data_19
    +(10 rows)
    +
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE  distr1.i > 90
    +ORDER BY distr1.i LIMIT 20;
    +                                                                                                     QUERY PLAN                                                                                                     
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    + Limit
    +   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
    +   ->  Merge Append
    +         Sort Key: distr1.i
    +         ->  Async Foreign Scan
    +               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
    +               Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
    +               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
    +         ->  Async Foreign Scan
    +               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
    +               Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
    +               Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
    +(12 rows)
    +
    +SELECT * FROM distr1 LEFT JOIN distr2 ON  distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
    +ORDER BY distr1.i LIMIT 20;
    +  i  |  j   |    k     |  i  |  j   |    k     
    +-----+------+----------+-----+------+----------
    +  91 |  910 | data_91  |  91 |  910 | data_91
    +  92 |  920 | data_92  |  92 |  920 | data_92
    +  93 |  930 | data_93  |  93 |  930 | data_93
    +  94 |  940 | data_94  |  94 |  940 | data_94
    +  95 |  950 | data_95  |  95 |  950 | data_95
    +  96 |  960 | data_96  |  96 |  960 | data_96
    +  97 |  970 | data_97  |  97 |  970 | data_97
    +  98 |  980 | data_98  |  98 |  980 | data_98
    +  99 |  990 | data_99  |  99 |  990 | data_99
    + 100 | 1000 | data_100 | 100 | 1000 | data_100
    + 101 | 1010 | data_101 |     |      | 
    + 102 | 1020 | data_102 |     |      | 
    + 103 | 1030 | data_103 |     |      | 
    + 104 | 1040 | data_104 |     |      | 
    + 105 | 1050 | data_105 |     |      | 
    + 106 | 1060 | data_106 |     |      | 
    + 107 | 1070 | data_107 |     |      | 
    + 108 | 1080 | data_108 |     |      | 
    + 109 | 1090 | data_109 |     |      | 
    + 110 | 1100 | data_110 |     |      | 
    +(20 rows)
    +
    +-- Test pruning with async Merge Append
    +DELETE FROM distr2;
    +INSERT INTO distr2
    +SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
    +DEALLOCATE ALL;
    +SET plan_cache_mode TO force_generic_plan;
    +PREPARE async_pt_query (int, int) AS
    +  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
    +  ORDER BY i,j
    +  LIMIT 10;
    +EXPLAIN (VERBOSE, COSTS OFF)
    +	EXECUTE async_pt_query(1, 1);
    +                                                                         QUERY PLAN                                                                         
    +------------------------------------------------------------------------------------------------------------------------------------------------------------
    + Limit
    +   Output: distr2.i, distr2.j, distr2.k
    +   ->  Merge Append
    +         Sort Key: distr2.i, distr2.j
    +         Subplans Removed: 1
    +         ->  Async Foreign Scan on public.distr2_p1 distr2_1
    +               Output: distr2_1.i, distr2_1.j, distr2_1.k
    +               Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
    +(8 rows)
    +
    +EXECUTE async_pt_query(1, 1);
    + i |  j  |    k    
    +---+-----+---------
    + 1 |  10 | data_1
    + 1 | 110 | data_11
    + 1 | 210 | data_21
    + 1 | 310 | data_31
    + 1 | 410 | data_41
    + 1 | 510 | data_51
    + 1 | 610 | data_61
    + 1 | 710 | data_71
    + 1 | 810 | data_81
    + 1 | 910 | data_91
    +(10 rows)
    +
    +RESET plan_cache_mode;
    +RESET enable_partitionwise_join;
    +DROP TABLE distr1, distr2, base1, base2, base3, base4;
     ALTER SERVER loopback OPTIONS (DROP async_capable);
     ALTER SERVER loopback2 OPTIONS (DROP async_capable);
     -- ===================================================================
    diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
    index 456b267f70b..a63eb002416 100644
    --- a/contrib/postgres_fdw/postgres_fdw.c
    +++ b/contrib/postgres_fdw/postgres_fdw.c
    @@ -7213,12 +7213,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
     	ForeignScanState *node = (ForeignScanState *) areq->requestee;
     	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
     	AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
    -	AppendState *requestor = (AppendState *) areq->requestor;
    -	WaitEventSet *set = requestor->as_eventset;
    +	PlanState  *requestor = areq->requestor;
    +	WaitEventSet *set;
    +	Bitmapset  *needrequest;
     
     	/* This should not be called unless callback_pending */
     	Assert(areq->callback_pending);
     
    +	set = GetAppendEventSet(requestor);
    +	needrequest = GetNeedRequest(requestor);
    +
     	/*
     	 * If process_pending_request() has been invoked on the given request
     	 * before we get here, we might have some tuples already; in which case
    @@ -7256,7 +7260,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
     		 * below, because we might otherwise end up with no configured events
     		 * other than the postmaster death event.
     		 */
    -		if (!bms_is_empty(requestor->as_needrequest))
    +		if (!bms_is_empty(needrequest))
     			return;
     		if (GetNumRegisteredWaitEvents(set) > 1)
     			return;
    diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
    index 9a8f9e28135..0786ba2c502 100644
    --- a/contrib/postgres_fdw/sql/postgres_fdw.sql
    +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
    @@ -3921,6 +3921,11 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
     SELECT * FROM result_tbl ORDER BY a;
     DELETE FROM result_tbl;
     
    +-- Test Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
    +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
    +
     -- Test error handling, if accessing one of the foreign partitions errors out
     CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
       SERVER loopback OPTIONS (table_name 'non_existent_table');
    @@ -3944,6 +3949,11 @@ DELETE FROM result_tbl;
     -- Test COPY TO when foreign table is partition
     COPY async_pt TO stdout; --error
     
    +-- Test Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +
     DROP FOREIGN TABLE async_p3;
     DROP TABLE base_tbl3;
     
    @@ -3959,6 +3969,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
     SELECT * FROM result_tbl ORDER BY a;
     DELETE FROM result_tbl;
     
    +-- Test Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    +
     -- partitionwise joins
     SET enable_partitionwise_join TO true;
     
    @@ -4197,6 +4212,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
     DROP FOREIGN TABLE foreign_tbl CASCADE;
     DROP TABLE base_tbl;
     
    +-- Test async Merge Append
    +CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
    +CREATE TABLE base1 (i int, j int, k text);
    +CREATE TABLE base2 (i int, j int, k text);
    +CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    +SERVER loopback OPTIONS (table_name 'base1');
    +CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    +SERVER loopback OPTIONS (table_name 'base2');
    +
    +CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
    +CREATE TABLE base3 (i int, j int, k text);
    +CREATE TABLE base4 (i int, j int, k text);
    +CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    +SERVER loopback OPTIONS (table_name 'base3');
    +CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
    +SERVER loopback OPTIONS (table_name 'base4');
    +
    +INSERT INTO distr1
    +SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
    +
    +INSERT INTO distr2
    +SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
    +
    +ANALYZE distr1_p1;
    +ANALYZE distr1_p2;
    +ANALYZE distr2_p1;
    +ANALYZE distr2_p2;
    +
    +SET enable_partitionwise_join TO ON;
    +
    +-- Test joins with async Merge Append
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
    +ORDER BY distr2.i LIMIT 10;
    +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
    +ORDER BY distr2.i LIMIT 10;
    +
    +EXPLAIN (VERBOSE, COSTS OFF)
    +SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE  distr1.i > 90
    +ORDER BY distr1.i LIMIT 20;
    +SELECT * FROM distr1 LEFT JOIN distr2 ON  distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
    +ORDER BY distr1.i LIMIT 20;
    +
    +-- Test pruning with async Merge Append
    +DELETE FROM distr2;
    +INSERT INTO distr2
    +SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
    +
    +DEALLOCATE ALL;
    +SET plan_cache_mode TO force_generic_plan;
    +PREPARE async_pt_query (int, int) AS
    +  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
    +  ORDER BY i,j
    +  LIMIT 10;
    +EXPLAIN (VERBOSE, COSTS OFF)
    +	EXECUTE async_pt_query(1, 1);
    +EXECUTE async_pt_query(1, 1);
    +RESET plan_cache_mode;
    +
    +RESET enable_partitionwise_join;
    +
    +DROP TABLE distr1, distr2, base1, base2, base3, base4;
    +
     ALTER SERVER loopback OPTIONS (DROP async_capable);
     ALTER SERVER loopback2 OPTIONS (DROP async_capable);
     
    diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
    index 0a2a8b49fdb..772c42fcf80 100644
    --- a/doc/src/sgml/config.sgml
    +++ b/doc/src/sgml/config.sgml
    @@ -5444,6 +5444,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
           </listitem>
          </varlistentry>
     
    +     <varlistentry id="guc-enable-async-merge-append" xreflabel="enable_async_merge_append">
    +      <term><varname>enable_async_merge_append</varname> (<type>boolean</type>)
    +      <indexterm>
    +       <primary><varname>enable_async_merge_append</varname> configuration parameter</primary>
    +      </indexterm>
    +      </term>
    +      <listitem>
    +       <para>
    +        Enables or disables the query planner's use of async-aware
    +        merge append plan types. The default is <literal>on</literal>.
    +       </para>
    +      </listitem>
    +     </varlistentry>
    +
          <varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
           <term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
           <indexterm>
    diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
    index 5d3cabe73e3..6dc19ebc374 100644
    --- a/src/backend/executor/execAsync.c
    +++ b/src/backend/executor/execAsync.c
    @@ -17,6 +17,7 @@
     #include "executor/execAsync.h"
     #include "executor/executor.h"
     #include "executor/nodeAppend.h"
    +#include "executor/nodeMergeAppend.h"
     #include "executor/nodeForeignscan.h"
     
     /*
    @@ -121,6 +122,9 @@ ExecAsyncResponse(AsyncRequest *areq)
     		case T_AppendState:
     			ExecAsyncAppendResponse(areq);
     			break;
    +		case T_MergeAppendState:
    +			ExecAsyncMergeAppendResponse(areq);
    +			break;
     		default:
     			/* If the node doesn't support async, caller messed up. */
     			elog(ERROR, "unrecognized node type: %d",
    diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
    index 405e8f94285..6f899a2d5f5 100644
    --- a/src/backend/executor/nodeMergeAppend.c
    +++ b/src/backend/executor/nodeMergeAppend.c
    @@ -39,10 +39,15 @@
     #include "postgres.h"
     
     #include "executor/executor.h"
    +#include "executor/execAsync.h"
     #include "executor/execPartition.h"
     #include "executor/nodeMergeAppend.h"
     #include "lib/binaryheap.h"
     #include "miscadmin.h"
    +#include "storage/latch.h"
    +#include "utils/wait_event.h"
    +
    +#define EVENT_BUFFER_SIZE                     16
     
     /*
      * We have one slot for each item in the heap array.  We use SlotNumber
    @@ -54,6 +59,12 @@ typedef int32 SlotNumber;
     static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
     static int	heap_compare_slots(Datum a, Datum b, void *arg);
     
    +static void classify_matching_subplans(MergeAppendState *node);
    +static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
    +static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
    +static bool ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan);
    +static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
    +
     
     /* ----------------------------------------------------------------
      *		ExecInitMergeAppend
    @@ -71,6 +82,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
     	int			nplans;
     	int			i,
     				j;
    +	Bitmapset  *asyncplans;
    +	int			nasyncplans;
     
     	/* check for unsupported flags */
     	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
    @@ -106,7 +119,10 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
     		 * later calls to ExecFindMatchingSubPlans.
     		 */
     		if (!prunestate->do_exec_prune && nplans > 0)
    +		{
     			mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
    +			mergestate->ms_valid_subplans_identified = true;
    +		}
     	}
     	else
     	{
    @@ -119,6 +135,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
     		Assert(nplans > 0);
     		mergestate->ms_valid_subplans = validsubplans =
     			bms_add_range(NULL, 0, nplans - 1);
    +		mergestate->ms_valid_subplans_identified = true;
     		mergestate->ms_prune_state = NULL;
     	}
     
    @@ -135,11 +152,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
     	 * the results into the mergeplanstates array.
     	 */
     	j = 0;
    +	asyncplans = NULL;
    +	nasyncplans = 0;
    +
     	i = -1;
     	while ((i = bms_next_member(validsubplans, i)) >= 0)
     	{
     		Plan	   *initNode = (Plan *) list_nth(node->mergeplans, i);
     
    +		/*
    +		 * Record async subplans.  When executing EvalPlanQual, we treat them
    +		 * as sync ones; don't do this when initializing an EvalPlanQual plan
    +		 * tree.
    +		 */
    +		if (initNode->async_capable && estate->es_epq_active == NULL)
    +		{
    +			asyncplans = bms_add_member(asyncplans, j);
    +			nasyncplans++;
    +		}
    +
     		mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
     	}
     
    @@ -170,6 +201,45 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
     	 */
     	mergestate->ps.ps_ProjInfo = NULL;
     
    +	/* Initialize async state */
    +	mergestate->ms_asyncplans = asyncplans;
    +	mergestate->ms_nasyncplans = nasyncplans;
    +	mergestate->ms_asyncrequests = NULL;
    +	mergestate->ms_asyncresults = NULL;
    +	mergestate->ms_has_asyncresults = NULL;
    +	mergestate->ms_asyncremain = NULL;
    +	mergestate->ms_needrequest = NULL;
    +	mergestate->ms_eventset = NULL;
    +	mergestate->ms_valid_asyncplans = NULL;
    +
    +	if (nasyncplans > 0)
    +	{
    +		mergestate->ms_asyncrequests = (AsyncRequest **)
    +			palloc0(nplans * sizeof(AsyncRequest *));
    +
    +		i = -1;
    +		while ((i = bms_next_member(asyncplans, i)) >= 0)
    +		{
    +			AsyncRequest *areq;
    +
    +			areq = palloc(sizeof(AsyncRequest));
    +			areq->requestor = (PlanState *) mergestate;
    +			areq->requestee = mergeplanstates[i];
    +			areq->request_index = i;
    +			areq->callback_pending = false;
    +			areq->request_complete = false;
    +			areq->result = NULL;
    +
    +			mergestate->ms_asyncrequests[i] = areq;
    +		}
    +
    +		mergestate->ms_asyncresults = (TupleTableSlot **)
    +			palloc0(nplans * sizeof(TupleTableSlot *));
    +
    +		if (mergestate->ms_valid_subplans_identified)
    +			classify_matching_subplans(mergestate);
    +	}
    +
     	/*
     	 * initialize sort-key information
     	 */
    @@ -231,9 +301,16 @@ ExecMergeAppend(PlanState *pstate)
     		 * run-time pruning is disabled then the valid subplans will always be
     		 * set to all subplans.
     		 */
    -		if (node->ms_valid_subplans == NULL)
    +		if (!node->ms_valid_subplans_identified)
    +		{
     			node->ms_valid_subplans =
     				ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
    +			node->ms_valid_subplans_identified = true;
    +		}
    +
    +		/* If there are any async subplans, begin executing them. */
    +		if (node->ms_nasyncplans > 0)
    +			ExecMergeAppendAsyncBegin(node);
     
     		/*
     		 * First time through: pull the first tuple from each valid subplan,
    @@ -246,6 +323,16 @@ ExecMergeAppend(PlanState *pstate)
     			if (!TupIsNull(node->ms_slots[i]))
     				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
     		}
    +
    +		/* Look at valid async subplans */
    +		i = -1;
    +		while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
    +		{
    +			ExecMergeAppendAsyncGetNext(node, i);
    +			if (!TupIsNull(node->ms_slots[i]))
    +				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
    +		}
    +
     		binaryheap_build(node->ms_heap);
     		node->ms_initialized = true;
     	}
    @@ -260,7 +347,13 @@ ExecMergeAppend(PlanState *pstate)
     		 * to not pull tuples until necessary.)
     		 */
     		i = DatumGetInt32(binaryheap_first(node->ms_heap));
    -		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
    +		if (bms_is_member(i, node->ms_asyncplans))
    +			ExecMergeAppendAsyncGetNext(node, i);
    +		else
    +		{
    +			Assert(bms_is_member(i, node->ms_valid_subplans));
    +			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
    +		}
     		if (!TupIsNull(node->ms_slots[i]))
     			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
     		else
    @@ -276,6 +369,8 @@ ExecMergeAppend(PlanState *pstate)
     	{
     		i = DatumGetInt32(binaryheap_first(node->ms_heap));
     		result = node->ms_slots[i];
    +		/* For async plan record that we can get the next tuple */
    +		node->ms_has_asyncresults = bms_del_member(node->ms_has_asyncresults, i);
     	}
     
     	return result;
    @@ -355,6 +450,7 @@ void
     ExecReScanMergeAppend(MergeAppendState *node)
     {
     	int			i;
    +	int			nasyncplans = node->ms_nasyncplans;
     
     	/*
     	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
    @@ -365,8 +461,11 @@ ExecReScanMergeAppend(MergeAppendState *node)
     		bms_overlap(node->ps.chgParam,
     					node->ms_prune_state->execparamids))
     	{
    +		node->ms_valid_subplans_identified = false;
     		bms_free(node->ms_valid_subplans);
     		node->ms_valid_subplans = NULL;
    +		bms_free(node->ms_valid_asyncplans);
    +		node->ms_valid_asyncplans = NULL;
     	}
     
     	for (i = 0; i < node->ms_nplans; i++)
    @@ -387,6 +486,384 @@ ExecReScanMergeAppend(MergeAppendState *node)
     		if (subnode->chgParam == NULL)
     			ExecReScan(subnode);
     	}
    +
    +	/* Forget in-flight async requests and any results not yet returned */
    +	if (nasyncplans > 0)
    +	{
    +		i = -1;
    +		while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
    +		{
    +			AsyncRequest *areq = node->ms_asyncrequests[i];
    +
    +			areq->callback_pending = false;
    +			areq->request_complete = false;
    +			areq->result = NULL;
    +		}
    +
    +		bms_free(node->ms_asyncremain);
    +		node->ms_asyncremain = NULL;
    +		bms_free(node->ms_needrequest);
    +		node->ms_needrequest = NULL;
    +		bms_free(node->ms_has_asyncresults);
    +		node->ms_has_asyncresults = NULL;
    +	}
     	binaryheap_reset(node->ms_heap);
     	node->ms_initialized = false;
     }
    +
    +/* ----------------------------------------------------------------
    + *              classify_matching_subplans
    + *
    + *              Split the node's ms_valid_subplans into synchronous and
    + *              asynchronous members.  Afterwards ms_valid_subplans holds
    + *              only the sync subplans, while the async ones are moved to
    + *              ms_valid_asyncplans.  When none of the valid subplans is
    + *              async, both sets are left untouched and ms_asyncremain is
    + *              reset to NULL.
    + * ----------------------------------------------------------------
    + */
    +static void
    +classify_matching_subplans(MergeAppendState *node)
    +{
    +	Bitmapset  *async_members;
    +
    +	Assert(node->ms_valid_subplans_identified);
    +	Assert(node->ms_valid_asyncplans == NULL);
    +
    +	/*
    +	 * bms_overlap() is false whenever either input is empty, so this single
    +	 * test covers both "no valid subplans at all" and "no valid subplan is
    +	 * async".
    +	 */
    +	if (!bms_overlap(node->ms_valid_subplans, node->ms_asyncplans))
    +	{
    +		node->ms_asyncremain = NULL;
    +		return;
    +	}
    +
    +	/* Collect the async subplans that are still valid ... */
    +	async_members = bms_intersect(node->ms_asyncplans,
    +								  node->ms_valid_subplans);
    +
    +	/* ... remove them from the set of valid (now sync-only) subplans ... */
    +	node->ms_valid_subplans = bms_del_members(node->ms_valid_subplans,
    +											  async_members);
    +
    +	/* ... and record them as the node's valid async subplans. */
    +	node->ms_valid_asyncplans = async_members;
    +}
    +
    +/* ----------------------------------------------------------------
    + *              ExecMergeAppendAsyncBegin
    + *
    + *              Send a request to each valid async-capable subplan,
    + *              determining the valid subplans first if not done yet.
    + * ----------------------------------------------------------------
    + */
    +static void
    +ExecMergeAppendAsyncBegin(MergeAppendState *node)
    +{
    +	int			plan_idx;
    +
    +	/* Async-aware MergeAppends only support forward scans. */
    +	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
    +
    +	/* We should never get here without any subplans ... */
    +	Assert(node->ms_nplans > 0);
    +
    +	/* ... or without any async-capable subplan. */
    +	Assert(node->ms_nasyncplans > 0);
    +
    +	/* Determine the valid subplans now if that hasn't happened yet. */
    +	if (!node->ms_valid_subplans_identified)
    +	{
    +		node->ms_valid_subplans =
    +			ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
    +		node->ms_valid_subplans_identified = true;
    +		classify_matching_subplans(node);
    +	}
    +
    +	/* Initially no valid async subplan has reported end-of-data. */
    +	node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
    +
    +	/* Nothing to start when no valid subplan is async. */
    +	if (bms_is_empty(node->ms_asyncremain))
    +		return;
    +
    +	/* Fire off one request per valid async subplan. */
    +	for (plan_idx = bms_next_member(node->ms_valid_asyncplans, -1);
    +		 plan_idx >= 0;
    +		 plan_idx = bms_next_member(node->ms_valid_asyncplans, plan_idx))
    +	{
    +		AsyncRequest *areq = node->ms_asyncrequests[plan_idx];
    +
    +		Assert(areq->request_index == plan_idx);
    +		Assert(!areq->callback_pending);
    +
    +		ExecAsyncRequest(areq);
    +	}
    +}
    +
    +/* ----------------------------------------------------------------
    + *              ExecMergeAppendAsyncGetNext
    + *
    + *              Store the next tuple of async subplan 'mplan' in
    + *              node->ms_slots[mplan], waiting for async events as
    + *              needed.  ms_slots[mplan] is left NULL when no tuple can
    + *              be obtained because the subplan has finished.
    + * ----------------------------------------------------------------
    + */
    +static void
    +ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
    +{
    +	/* Assume end-of-data until a tuple actually shows up. */
    +	node->ms_slots[mplan] = NULL;
    +
    +	for (;;)
    +	{
    +		/*
    +		 * Try to get a tuple.  This succeeds at once if a result for mplan
    +		 * is already buffered; otherwise it may launch new requests.
    +		 */
    +		if (ExecMergeAppendAsyncRequest(node, mplan))
    +			return;
    +
    +		/*
    +		 * A subplan leaves ms_asyncremain once it reports end-of-data, so
    +		 * if mplan is no longer there, there is nothing left to wait for.
    +		 */
    +		if (!bms_is_member(mplan, node->ms_asyncremain))
    +			return;
    +
    +		CHECK_FOR_INTERRUPTS();
    +
    +		/*
    +		 * Wait for async events; the callbacks they trigger may buffer new
    +		 * results or mark subplans as finished.
    +		 */
    +		ExecMergeAppendAsyncEventWait(node);
    +	}
    +}
    +
    +/* ----------------------------------------------------------------
    + *              ExecMergeAppendAsyncRequest
    + *
    + *              Try to obtain a tuple for async subplan 'mplan'.
    + *
    + *              If a result for mplan is already buffered, it is stored in
    + *              node->ms_slots[mplan] and true is returned.  Otherwise a
    + *              new request is sent to every subplan that is flagged in
    + *              ms_needrequest and has no unreturned result; true is then
    + *              returned if mplan has a buffered result after those requests
    + *              were issued, false otherwise.
    + * ----------------------------------------------------------------
    + */
    +static bool
    +ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
    +{
    +	Bitmapset  *needrequest;
    +	int			i;
    +
    +	/* If we've already fetched the necessary data, just return it. */
    +	if (bms_is_member(mplan, node->ms_has_asyncresults))
    +	{
    +		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
    +		return true;
    +	}
    +
    +	/*
    +	 * Pick the subplans flagged in ms_needrequest whose previous result has
    +	 * already been returned.  Subplans still holding an unreturned result
    +	 * are skipped: presumably a new request could overwrite the slot that
    +	 * tuple lives in.
    +	 */
    +	needrequest = bms_difference(node->ms_needrequest,
    +								 node->ms_has_asyncresults);
    +
    +	/* If no subplan still needs a request, there is nothing to send. */
    +	if (bms_is_empty(needrequest))
    +	{
    +		bms_free(needrequest);
    +		return false;
    +	}
    +
    +	/* Clear ms_needrequest flags, as we are going to send requests now. */
    +	node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
    +
    +	/* Make a new request for each of the async subplans that need it. */
    +	i = -1;
    +	while ((i = bms_next_member(needrequest, i)) >= 0)
    +	{
    +		AsyncRequest *areq = node->ms_asyncrequests[i];
    +
    +		/* needrequest was built to exclude subplans with buffered data. */
    +		Assert(!bms_is_member(i, node->ms_has_asyncresults));
    +
    +		/* Do the actual work. */
    +		ExecAsyncRequest(areq);
    +	}
    +	bms_free(needrequest);
    +
    +	/* Return needed asynchronously-generated results if any. */
    +	if (bms_is_member(mplan, node->ms_has_asyncresults))
    +	{
    +		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
    +		return true;
    +	}
    +
    +	return false;
    +}
    +
    +/* ----------------------------------------------------------------
    + *              ExecAsyncMergeAppendResponse
    + *
    + *              Process the response to an async request issued to one of
    + *              our subplans.  A completed non-empty result is buffered
    + *              until the merge asks for it; a NULL or empty result marks
    + *              the subplan as finished.
    + * ----------------------------------------------------------------
    + */
    +void
    +ExecAsyncMergeAppendResponse(AsyncRequest *areq)
    +{
    +	MergeAppendState *node = (MergeAppendState *) areq->requestor;
    +	TupleTableSlot *slot = areq->result;
    +	int			idx = areq->request_index;
    +
    +	/* The result should be NULL or a TupleTableSlot ... */
    +	Assert(slot == NULL || IsA(slot, TupleTableSlot));
    +	/* ... and the previous result of this subplan must have been returned. */
    +	Assert(!bms_is_member(idx, node->ms_has_asyncresults));
    +
    +	node->ms_asyncresults[idx] = NULL;
    +
    +	/* Still in flight: such a request must be waiting for a callback. */
    +	if (!areq->request_complete)
    +	{
    +		Assert(areq->callback_pending);
    +		return;
    +	}
    +
    +	/* A NULL or empty slot means this subplan has run dry. */
    +	if (TupIsNull(slot))
    +	{
    +		Assert(!areq->callback_pending);
    +		node->ms_asyncremain = bms_del_member(node->ms_asyncremain, idx);
    +		return;
    +	}
    +
    +	/* Buffer the tuple until the merge asks for it. */
    +	node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, idx);
    +	node->ms_asyncresults[idx] = slot;
    +
    +	/* Flag the subplan as ready for another request (not launched here). */
    +	node->ms_needrequest = bms_add_member(node->ms_needrequest, idx);
    +}
    +
    +/* ----------------------------------------------------------------
    + *		ExecMergeAppendAsyncEventWait
    + *
    + *		Wait for file descriptor events and fire callbacks.
    + * ----------------------------------------------------------------
    + */
    +static void
    +ExecMergeAppendAsyncEventWait(MergeAppendState *node)
    +{
    +	int			nevents = node->ms_nasyncplans + 2; /* one for PM death and
    +													 * one for latch */
    +	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
    +	int			noccurred;
    +	int			i;
    +
    +	/* We should never be called when there are no valid async subplans. */
    +	Assert(!bms_is_empty(node->ms_asyncremain));
    +
    +	/* Every return path below frees the set, so none may be left over. */
    +	Assert(node->ms_eventset == NULL);
    +	node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
    +	AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
    +					  NULL, NULL);
    +
    +	/* Give each waiting subplan a chance to add an event. */
    +	i = -1;
    +	while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
    +	{
    +		AsyncRequest *areq = node->ms_asyncrequests[i];
    +
    +		if (areq->callback_pending)
    +			ExecAsyncConfigureWait(areq);
    +	}
    +
    +	/*
    +	 * No need for further processing if none of the subplans configured any
    +	 * events.
    +	 */
    +	if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
    +	{
    +		FreeWaitEventSet(node->ms_eventset);
    +		node->ms_eventset = NULL;
    +		return;
    +	}
    +
    +	/*
    +	 * Add the process latch to the set, so that we wake up to process the
    +	 * standard interrupts with CHECK_FOR_INTERRUPTS().
    +	 *
    +	 * NOTE: For historical reasons, it's important that this is added to the
    +	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
    +	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
    +	 * any other events are in the set.  That's a poor design, it's
    +	 * questionable for postgres_fdw to be doing that in the first place, but
    +	 * we cannot change it now.  The pattern has possibly been copied to other
    +	 * extensions too.
    +	 */
    +	AddWaitEventToSet(node->ms_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
    +					  MyLatch, NULL);
    +
    +	/* Return at most EVENT_BUFFER_SIZE events in one call. */
    +	if (nevents > EVENT_BUFFER_SIZE)
    +		nevents = EVENT_BUFFER_SIZE;
    +
    +	/* Block (no timeout) until at least one event occurs. */
    +	noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */ , occurred_event,
    +								 nevents, WAIT_EVENT_APPEND_READY);
    +	FreeWaitEventSet(node->ms_eventset);
    +	node->ms_eventset = NULL;
    +	if (noccurred == 0)
    +		return;
    +
    +	/* Deliver notifications. */
    +	for (i = 0; i < noccurred; i++)
    +	{
    +		WaitEvent  *w = &occurred_event[i];
    +
    +		/*
    +		 * Each waiting subplan should have registered its wait event with
    +		 * user_data pointing back to its AsyncRequest.
    +		 */
    +		if ((w->events & WL_SOCKET_READABLE) != 0)
    +		{
    +			AsyncRequest *areq = (AsyncRequest *) w->user_data;
    +
    +			if (areq->callback_pending)
    +			{
    +				/*
    +				 * Mark it as no longer needing a callback.  We must do this
    +				 * before dispatching the callback in case the callback resets
    +				 * the flag.
    +				 */
    +				areq->callback_pending = false;
    +
    +				/* Do the actual work. */
    +				ExecAsyncNotify(areq);
    +			}
    +		}
    +
    +		/* Handle standard interrupts */
    +		if ((w->events & WL_LATCH_SET) != 0)
    +		{
    +			ResetLatch(MyLatch);
    +			CHECK_FOR_INTERRUPTS();
    +		}
    +	}
    +}
    diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
    index 94077e6a006..97e37a0ed64 100644
    --- a/src/backend/optimizer/path/costsize.c
    +++ b/src/backend/optimizer/path/costsize.c
    @@ -163,6 +163,7 @@ bool		enable_parallel_hash = true;
     bool		enable_partition_pruning = true;
     bool		enable_presorted_aggregate = true;
     bool		enable_async_append = true;
    +bool		enable_async_merge_append = true;
     
     typedef struct
     {
    diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
    index 0053befc8db..7de12dba0e2 100644
    --- a/src/backend/optimizer/plan/createplan.c
    +++ b/src/backend/optimizer/plan/createplan.c
    @@ -1464,6 +1464,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
     	List	   *subplans = NIL;
     	ListCell   *subpaths;
     	RelOptInfo *rel = best_path->path.parent;
    +	bool		consider_async = false;
     
     	/*
     	 * We don't have the actual creation of the MergeAppend node split out
    @@ -1478,6 +1479,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
     	plan->righttree = NULL;
     	node->apprelids = rel->relids;
     
    +	consider_async = (enable_async_merge_append &&
    +					  !best_path->path.parallel_safe &&
    +					  list_length(best_path->subpaths) > 1);
    +
     	/*
     	 * Compute sort column info, and adjust MergeAppend's tlist as needed.
     	 * Because we pass adjust_tlist_in_place = true, we may ignore the
    @@ -1578,6 +1583,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
     			subplan = sort_plan;
     		}
     
    +		/* If needed, check to see if subplan can be executed asynchronously */
    +		if (consider_async)
    +			mark_async_capable_plan(subplan, subpath);
    +
     		subplans = lappend(subplans, subplan);
     	}
     
    diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
    index d6fc8333850..bbb2afc2e13 100644
    --- a/src/backend/utils/misc/guc_parameters.dat
    +++ b/src/backend/utils/misc/guc_parameters.dat
    @@ -189,6 +189,13 @@
       boot_val => 'true',
     },
     
    +{ name => 'enable_async_merge_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
    +  short_desc => 'Enables the planner\'s use of async merge append plans.',
    +  flags => 'GUC_EXPLAIN',
    +  variable => 'enable_async_merge_append',
    +  boot_val => 'true',
    +},
    +
     { name => 'enable_self_join_elimination', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
       short_desc => 'Enables removal of unique self-joins.',
       flags => 'GUC_EXPLAIN',
    diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
    index f62b61967ef..daa26d6890b 100644
    --- a/src/backend/utils/misc/postgresql.conf.sample
    +++ b/src/backend/utils/misc/postgresql.conf.sample
    @@ -405,6 +405,7 @@
     # - Planner Method Configuration -
     
     #enable_async_append = on
    +#enable_async_merge_append = on
     #enable_bitmapscan = on
     #enable_gathermerge = on
     #enable_hashagg = on
    diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
    index 4eb05dc30d6..e3fdb26ece6 100644
    --- a/src/include/executor/nodeMergeAppend.h
    +++ b/src/include/executor/nodeMergeAppend.h
    @@ -19,5 +19,6 @@
     extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
     extern void ExecEndMergeAppend(MergeAppendState *node);
     extern void ExecReScanMergeAppend(MergeAppendState *node);
    +extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
     
     #endif							/* NODEMERGEAPPEND_H */
    diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
    index a36653c37f9..3a29f71c919 100644
    --- a/src/include/nodes/execnodes.h
    +++ b/src/include/nodes/execnodes.h
    @@ -1539,10 +1539,66 @@ typedef struct MergeAppendState
     	TupleTableSlot **ms_slots;	/* array of length ms_nplans */
     	struct binaryheap *ms_heap; /* binary heap of slot indices */
     	bool		ms_initialized; /* are subplans started? */
    +	Bitmapset  *ms_asyncplans;	/* asynchronous plans indexes */
    +	int			ms_nasyncplans; /* # of asynchronous plans */
    +	AsyncRequest **ms_asyncrequests;	/* array of AsyncRequests */
    +	TupleTableSlot **ms_asyncresults;	/* unreturned results of async plans */
    +	Bitmapset  *ms_has_asyncresults;	/* plans which have async results */
    +	Bitmapset  *ms_asyncremain; /* remaining asynchronous plans */
    +	Bitmapset  *ms_needrequest; /* asynchronous plans needing a new request */
    +	struct WaitEventSet *ms_eventset;	/* WaitEventSet used to configure file
    +										 * descriptor wait events */
     	struct PartitionPruneState *ms_prune_state;
    +	bool		ms_valid_subplans_identified;	/* is ms_valid_subplans valid? */
     	Bitmapset  *ms_valid_subplans;
    +	Bitmapset  *ms_valid_asyncplans;	/* valid asynchronous plans indexes */
     } MergeAppendState;
     
    +/* Getters for AppendState and MergeAppendState */
    +static inline struct WaitEventSet *
    +GetAppendEventSet(PlanState *ps)
    +{
    +	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
    +
    +	/* Anything that is not an Append is treated as a MergeAppend. */
    +	if (!IsA(ps, AppendState))
    +		return ((MergeAppendState *) ps)->ms_eventset;
    +	return ((AppendState *) ps)->as_eventset;
    +}
    +
    +static inline Bitmapset *
    +GetNeedRequest(PlanState *ps)
    +{
    +	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
    +
    +	/* Subplans flagged as needing a new async request. */
    +	return IsA(ps, AppendState) ?
    +		((AppendState *) ps)->as_needrequest :
    +		((MergeAppendState *) ps)->ms_needrequest;
    +}
    +
    +static inline Bitmapset *
    +GetValidAsyncplans(PlanState *ps)
    +{
    +	bool		is_append = IsA(ps, AppendState);
    +
    +	Assert(is_append || IsA(ps, MergeAppendState));
    +	if (is_append)
    +		return ((AppendState *) ps)->as_valid_asyncplans;
    +	return ((MergeAppendState *) ps)->ms_valid_asyncplans;
    +}
    +
    +static inline AsyncRequest *
    +GetValidAsyncRequest(PlanState *ps, int nreq)
    +{
    +	AsyncRequest **reqs;
    +
    +	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
    +	reqs = IsA(ps, AppendState) ? ((AppendState *) ps)->as_asyncrequests :
    +		((MergeAppendState *) ps)->ms_asyncrequests;
    +	return reqs[nreq];
    +}
    +
     /* ----------------
      *	 RecursiveUnionState information
      *
    diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
    index b523bcda8f3..fee491b77ad 100644
    --- a/src/include/optimizer/cost.h
    +++ b/src/include/optimizer/cost.h
    @@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
     extern PGDLLIMPORT bool enable_partition_pruning;
     extern PGDLLIMPORT bool enable_presorted_aggregate;
     extern PGDLLIMPORT bool enable_async_append;
    +extern PGDLLIMPORT bool enable_async_merge_append;
     extern PGDLLIMPORT int constraint_exclusion;
     
     extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
    diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
    index 3b37fafa65b..ae4fa42a438 100644
    --- a/src/test/regress/expected/sysviews.out
    +++ b/src/test/regress/expected/sysviews.out
    @@ -149,6 +149,7 @@ select name, setting from pg_settings where name like 'enable%';
                   name              | setting 
     --------------------------------+---------
      enable_async_append            | on
    + enable_async_merge_append      | on
      enable_bitmapscan              | on
      enable_distinct_reordering     | on
      enable_eager_aggregate         | on
    @@ -173,7 +174,7 @@ select name, setting from pg_settings where name like 'enable%';
      enable_seqscan                 | on
      enable_sort                    | on
      enable_tidscan                 | on
    -(25 rows)
    +(26 rows)
     
     -- There are always wait event descriptions for various types.  InjectionPoint
     -- may be present or absent, depending on history since last postmaster start.
    -- 
    2.47.3
    
    
    --nfti2epu7uxujyyg--