v1-0002-Use-tuplestore-in-PgFdwScanState-scan-state-to-limit.patch
text/x-diff
Filename: v1-0002-Use-tuplestore-in-PgFdwScanState-scan-state-to-limit.patch
Type: text/x-diff
Part: 1
From 44a4a6b8fc2910ab8bb241585855532da00f0e84 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <a.pyhalov@postgrespro.ru>
Date: Thu, 18 Dec 2025 09:17:42 +0300
Subject: [PATCH 2/2] Use tuplestore in PgFdwScanState scan state to limit
memory usage
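A tuplestore keeps tuples in minimal-tuple form and so doesn't preserve
ctids; we store them separately in an array indexed by tuple position
and restore them when a tuple is read back.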
---
contrib/postgres_fdw/postgres_fdw.c | 74 +++++++++++++++++++++++++----
1 file changed, 66 insertions(+), 8 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 5e178c21b39..389e906a5d5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -158,8 +158,12 @@ typedef struct PgFdwScanState
const char **param_values; /* textual values of query parameters */
/* for storing result tuples */
- HeapTuple *tuples; /* array of currently-retrieved tuples */
- int num_tuples; /* # of tuples in array */
+ Tuplestorestate *result_store; /* currently-retrieved tuples */
+ ItemPointerData *ctids; /* separate store for ctids */
+
+ TupleTableSlot *tupstore_slot; /* slot needed for retrieving tuples from
+ * tuplestore */
+ int num_tuples; /* # of tuples in tuple store */
int next_tuple; /* index of next one to return */
/* batch-level state, for optimizing rewinds and avoiding useless fetch */
@@ -546,6 +550,8 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+static void clean_scan_state_result_store(PgFdwScanState *fsstate);
+
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -1605,6 +1611,7 @@ postgresIterateForeignScan(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ HeapTuple newtup;
/*
* In sync mode, if this is the first call after Begin or ReScan, we need
@@ -1631,12 +1638,31 @@ postgresIterateForeignScan(ForeignScanState *node)
return ExecClearTuple(slot);
}
+ if (!tuplestore_gettupleslot(fsstate->result_store, true, true,
+ fsstate->tupstore_slot))
+ {
+ /*
+ * We've already checked that next_tuple < fsstate->num_tuples, so the
+ * tuplestore must still have a tuple to return.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("unexpected end of store")));
+
+ }
+
+ newtup = ExecFetchSlotHeapTuple(fsstate->tupstore_slot, true, NULL);
+ /* Tuple store doesn't preserve ctid, so restore it separately */
+ newtup->t_self = newtup->t_data->t_ctid = fsstate->ctids[fsstate->next_tuple];
+ ExecClearTuple(fsstate->tupstore_slot);
+
/*
* Return the next tuple.
*/
- ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
+ ExecStoreHeapTuple(newtup,
slot,
false);
+ fsstate->next_tuple++;
return slot;
}
@@ -1699,6 +1725,8 @@ postgresReScanForeignScan(ForeignScanState *node)
{
/* Easy: just rescan what we already have in memory, if anything */
fsstate->next_tuple = 0;
+ if (fsstate->result_store)
+ tuplestore_rescan(fsstate->result_store);
return;
}
@@ -1708,7 +1736,7 @@ postgresReScanForeignScan(ForeignScanState *node)
PQclear(res);
/* Now force a fresh FETCH. */
- fsstate->tuples = NULL;
+ clean_scan_state_result_store(fsstate);
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
@@ -1737,6 +1765,8 @@ postgresEndForeignScan(ForeignScanState *node)
ReleaseConnection(fsstate->conn);
fsstate->conn = NULL;
+ clean_scan_state_result_store(fsstate);
+
/* MemoryContexts will be deleted automatically. */
}
@@ -3781,7 +3811,8 @@ create_cursor(ForeignScanState *node)
/* Mark the cursor as created, and show no tuples have been retrieved */
fsstate->cursor_exists = true;
- fsstate->tuples = NULL;
+ clean_scan_state_result_store(fsstate);
+
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
@@ -3808,7 +3839,8 @@ fetch_more_data(ForeignScanState *node)
* We'll store the tuples in the batch_cxt. First, flush the previous
* batch.
*/
- fsstate->tuples = NULL;
+ clean_scan_state_result_store(fsstate);
+
MemoryContextReset(fsstate->batch_cxt);
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
@@ -3844,21 +3876,28 @@ fetch_more_data(ForeignScanState *node)
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->result_store = tuplestore_begin_heap(true, false, work_mem);
+ fsstate->tupstore_slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
fsstate->num_tuples = numrows;
fsstate->next_tuple = 0;
+ fsstate->ctids = palloc0(numrows * sizeof(ItemPointerData));
for (i = 0; i < numrows; i++)
{
+ HeapTuple newtup;
+
Assert(IsA(node->ss.ps.plan, ForeignScan));
- fsstate->tuples[i] =
+ newtup =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
fsstate->retrieved_attrs,
node,
fsstate->temp_cxt);
+ tuplestore_puttuple(fsstate->result_store, newtup);
+ ItemPointerCopy(&newtup->t_self, &fsstate->ctids[i]);
+ heap_freetuple(newtup);
}
/* Update fetch_ct_2 */
@@ -7636,6 +7675,25 @@ make_tuple_from_result_row(PGresult *res,
return tuple;
}
+/*
+ * Release the result tuplestore and its read slot; forget the ctids array.
+ */
+static void
+clean_scan_state_result_store(PgFdwScanState *fsstate)
+{
+ if (fsstate->result_store) /* nothing to do when no batch is stored */
+ {
+ /*
+ * ctids is only forgotten, not pfree'd; we rely on batch_cxt being reset
+ */
+ tuplestore_end(fsstate->result_store);
+ ExecDropSingleTupleTableSlot(fsstate->tupstore_slot);
+ fsstate->result_store = NULL;
+ fsstate->tupstore_slot = NULL;
+ fsstate->ctids = NULL;
+ }
+}
+
/*
* Callback function which is called when error occurs during column value
* conversion. Print names of column and relation.
--
2.43.0