v4-0001-postgres_fdw-Use-COPY-to-speed-up-batch-inserts.patch
text/plain
Filename: v4-0001-postgres_fdw-Use-COPY-to-speed-up-batch-inserts.patch
Type: text/plain
Part: 0
From 1856fba0c49d5aa7b164debb90b376c72cfa3e02 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v4] postgres_fdw: Use COPY to speed up batch inserts
---
contrib/postgres_fdw/deparse.c | 30 ++++
.../postgres_fdw/expected/postgres_fdw.out | 120 +++++++++++--
contrib/postgres_fdw/postgres_fdw.c | 159 +++++++++++++++++-
contrib/postgres_fdw/postgres_fdw.h | 1 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 52 ++++++
5 files changed, 350 insertions(+), 12 deletions(-)
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..113e6fb7d91 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,36 @@ rebuildInsertSql(StringInfo buf, Relation rel,
appendStringInfoString(buf, orig_query + values_end_len);
}
+/*
+ * buildCopySql
+ *		Construct "COPY rel(col, ...) FROM STDIN" (default TEXT format) for
+ *		the non-generated attributes listed in target_attrs, in that order.
+ *
+ * Remote names are used throughout: deparseRelation applies the schema_name
+ * and table_name options, and each column's column_name option is applied
+ * here the same way deparseColumnRef does for INSERT, so COPY loads the same
+ * remote columns the INSERT path would.  Generated columns are left out of
+ * the column list, leaving their values to the remote side.
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	Oid			relid = RelationGetRelid(rel);
+	bool		first = true;
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoChar(buf, '(');
+
+	foreach_int(attnum, target_attrs)
+	{
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+		char	   *colname = NULL;
+
+		if (attr->attgenerated)
+			continue;
+
+		/* Prefer the column_name FDW option, falling back to the local name */
+		foreach_node(DefElem, def, GetForeignColumnOptions(relid, attnum))
+		{
+			if (strcmp(def->defname, "column_name") == 0)
+			{
+				colname = defGetString(def);
+				break;
+			}
+		}
+		if (colname == NULL)
+			colname = NameStr(attr->attname);
+
+		if (!first)
+			appendStringInfoString(buf, ", ");
+		first = false;
+
+		appendStringInfoString(buf, quote_identifier(colname));
+	}
+
+	appendStringInfoString(buf, ") FROM STDIN");
+}
+
/*
* deparse remote UPDATE statement
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index cd28126049d..dd507ad6186 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -50,6 +50,18 @@ CREATE TABLE "S 1"."T 4" (
c3 text,
CONSTRAINT t4_pkey PRIMARY KEY (c1)
);
+CREATE TABLE "S 1"."T 5"(
+ x int
+);
+CREATE TABLE "S 1"."T 6"(
+ id int not null,
+ note text,
+ value int NOT NULL
+);
+CREATE TABLE "S 1"."T 7"(
+ id int,
+ t text
+);
-- Disable autovacuum for these tables to avoid unexpected effects of that
ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false');
@@ -132,6 +144,21 @@ CREATE FOREIGN TABLE ft7 (
c2 int NOT NULL,
c3 text
) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft8 (
+ x int
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10');
+CREATE FOREIGN TABLE ft9 (
+ id int not null,
+ note text,
+ value int NOT NULL
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10');
+CREATE FOREIGN TABLE ft10 (
+ id int,
+ t text
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10');
-- ===================================================================
-- tests for validator
-- ===================================================================
@@ -205,16 +232,19 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
\det+
- List of foreign tables
- Schema | Table | Server | FDW options | Description
---------+-------+-----------+---------------------------------------+-------------
- public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') |
- public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') |
- public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') |
- public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') |
- public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') |
- public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') |
-(6 rows)
+ List of foreign tables
+ Schema | Table | Server | FDW options | Description
+--------+-------+-----------+--------------------------------------------------------+-------------
+ public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') |
+ public | ft10 | loopback | (schema_name 'S 1', table_name 'T 7', batch_size '10') |
+ public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') |
+ public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') |
+ public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') |
+ public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') |
+ public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') |
+ public | ft8 | loopback | (schema_name 'S 1', table_name 'T 5', batch_size '10') |
+ public | ft9 | loopback | (schema_name 'S 1', table_name 'T 6', batch_size '10') |
+(9 rows)
-- Test that alteration of server options causes reconnection
-- Remote's errors might be non-English, so hide them to ensure stable results
@@ -12664,6 +12694,76 @@ ANALYZE analyze_ftable;
-- cleanup
DROP FOREIGN TABLE analyze_ftable;
DROP TABLE analyze_table;
+-- ===================================================================
+-- test for batch insert using COPY
+-- ===================================================================
+ALTER FOREIGN TABLE ft8 DROP COLUMN x;
+ALTER FOREIGN TABLE ft8 add COLUMN x int;
+INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i;
+SELECT * FROM ft8;
+ x
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+(10 rows)
+
+EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) INSERT INTO ft9 (id, value, note)
+SELECT g,
+ g * 2,
+ 'batch insert test data' || g
+FROM generate_series(1, 20) g;
+ QUERY PLAN
+---------------------------------------------------------------------------------
+ Insert on public.ft9 (actual rows=0.00 loops=1)
+ Remote SQL: COPY "S 1"."T 6"(id, note, value) FROM STDIN
+ Batch Size: 10
+ -> Function Scan on pg_catalog.generate_series g (actual rows=20.00 loops=1)
+ Output: g.g, ('batch insert test data'::text || (g.g)::text), (g.g * 2)
+ Function Call: generate_series(1, 20)
+(6 rows)
+
+SELECT * FROM ft9;
+ id | note | value
+----+--------------------------+-------
+ 1 | batch insert test data1 | 2
+ 2 | batch insert test data2 | 4
+ 3 | batch insert test data3 | 6
+ 4 | batch insert test data4 | 8
+ 5 | batch insert test data5 | 10
+ 6 | batch insert test data6 | 12
+ 7 | batch insert test data7 | 14
+ 8 | batch insert test data8 | 16
+ 9 | batch insert test data9 | 18
+ 10 | batch insert test data10 | 20
+ 11 | batch insert test data11 | 22
+ 12 | batch insert test data12 | 24
+ 13 | batch insert test data13 | 26
+ 14 | batch insert test data14 | 28
+ 15 | batch insert test data15 | 30
+ 16 | batch insert test data16 | 32
+ 17 | batch insert test data17 | 34
+ 18 | batch insert test data18 | 36
+ 19 | batch insert test data19 | 38
+ 20 | batch insert test data20 | 40
+(20 rows)
+
+-- Test buffer limit of copy data on COPYBUFSIZ
+INSERT INTO ft10 (id, t)
+SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s;
+SELECT COUNT(*) FROM ft10;
+ count
+-------
+ 4
+(1 row)
+
-- ===================================================================
-- test for postgres_fdw_get_connections function with check_conn = true
-- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..d8e13e78938 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+/*
+ * Size threshold, in bytes, at which accumulated COPY FROM STDIN row data is
+ * passed to libpq during a COPY-based batch insert.  It is checked after each
+ * complete row is appended, so one wide row can push the buffer past it.
+ */
+#define COPYBUFSIZ 8192
+
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -192,6 +195,7 @@ typedef struct PgFdwModifyState
/* extracted fdw_private data */
char *query; /* text of INSERT/UPDATE/DELETE command */
char *orig_query; /* original text of INSERT command */
+ char *copy_query; /* text of COPY command if it's being used */
List *target_attrs; /* list of target attribute numbers */
int values_end; /* length up to the end of VALUES */
int batch_size; /* value of FDW option "batch_size" */
@@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_o,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+ TupleTableSlot **slots,
+ int *numSlots);
/*
@@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
{
if (es->verbose)
{
- char *sql = strVal(list_nth(fdw_private,
- FdwModifyPrivateUpdateSql));
+ char *sql = NULL;
+
+ /*
+ * We only have ri_FdwState during EXPLAIN(ANALYZE), so check if the
+ * COPY was used during query execution and show it as a Remote SQL.
+ */
+ if (rinfo->ri_FdwState != NULL)
+ {
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) rinfo->ri_FdwState;
+
+ if (fmstate->copy_query != NULL)
+ sql = fmstate->copy_query;
+ }
+
+ if (sql == NULL)
+ sql = strVal(list_nth(fdw_private,
+ FdwModifyPrivateUpdateSql));
ExplainPropertyText("Remote SQL", sql, es);
@@ -4066,6 +4088,50 @@ create_foreign_modify(EState *estate,
return fmstate;
}
+/*
+ * append_copy_text_escaped
+ *		Append "str" to "buf", escaped for COPY TEXT format.
+ *
+ * Backslash, and the control characters COPY treats as column/row
+ * separators or otherwise escapes (\b \f \n \r \t \v), are written as
+ * backslash sequences so the remote server reads back exactly the original
+ * value.  A byte-wise scan is safe because all of these are ASCII bytes,
+ * which never occur inside a multibyte character in a PostgreSQL server
+ * encoding.
+ */
+static void
+append_copy_text_escaped(StringInfo buf, const char *str)
+{
+	for (const char *p = str; *p != '\0'; p++)
+	{
+		switch (*p)
+		{
+			case '\\':
+				appendStringInfoString(buf, "\\\\");
+				break;
+			case '\b':
+				appendStringInfoString(buf, "\\b");
+				break;
+			case '\f':
+				appendStringInfoString(buf, "\\f");
+				break;
+			case '\n':
+				appendStringInfoString(buf, "\\n");
+				break;
+			case '\r':
+				appendStringInfoString(buf, "\\r");
+				break;
+			case '\t':
+				appendStringInfoString(buf, "\\t");
+				break;
+			case '\v':
+				appendStringInfoString(buf, "\\v");
+				break;
+			default:
+				appendStringInfoCharMacro(buf, *p);
+				break;
+		}
+	}
+}
+
+/*
+ * convert_slot_to_copy_text
+ *		Append one row of COPY TEXT data built from "slot" to "buf".
+ *
+ * Values of the non-generated attributes in fmstate->target_attrs are
+ * emitted in list order (the column order buildCopySql uses), separated by
+ * tabs and terminated by a newline.  NULLs are written as \N; other values
+ * are converted with fmstate->p_flinfo and escaped for COPY.  p_flinfo is
+ * assumed to hold one output function per non-generated target attribute,
+ * in the same order, as for the INSERT parameters -- confirm if that setup
+ * changes.
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+						  PgFdwModifyState *fmstate,
+						  TupleTableSlot *slot)
+{
+	TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
+	MemoryContext oldcontext;
+	bool		first = true;
+	int			i = 0;
+
+	/*
+	 * Output function results are transient: keep them in temp_cxt, which
+	 * the caller resets once the batch is done.  Appends to "buf" still go
+	 * to buf's own context.
+	 */
+	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+
+	foreach_int(attnum, fmstate->target_attrs)
+	{
+		CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 1);
+		Datum		datum;
+		bool		isnull;
+
+		/* Generated columns are not in the COPY column list */
+		if (attr->attgenerated)
+			continue;
+
+		if (!first)
+			appendStringInfoCharMacro(buf, '\t');
+		first = false;
+
+		datum = slot_getattr(slot, attnum, &isnull);
+		if (isnull)
+			appendStringInfoString(buf, "\\N");
+		else
+			append_copy_text_escaped(buf,
+									 OutputFunctionCall(&fmstate->p_flinfo[i],
+														datum));
+		i++;
+	}
+
+	appendStringInfoCharMacro(buf, '\n');
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
/*
* execute_foreign_modify
* Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate,
if (fmstate->conn_state->pendingAreq)
process_pending_request(fmstate->conn_state->pendingAreq);
+ /*
+ * Use COPY command for batch insert if the original query don't include a
+ * RETURNING clause
+ */
+ if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning)
+ return execute_foreign_insert_using_copy(fmstate, slots, numSlots);
+
/*
* If the existing query was deparsed and prepared for a different number
* of rows, rebuild it for the proper number.
@@ -7886,3 +7959,85 @@ get_batch_size_option(Relation rel)
return batch_size;
}
+
+/*
+ * execute_foreign_insert_using_copy
+ *		Insert a batch of rows into the foreign table with COPY FROM STDIN.
+ *
+ * The rows in slots[0 .. *numSlots - 1] are converted to COPY TEXT format and
+ * streamed to the remote server in chunks of roughly COPYBUFSIZ bytes, so
+ * local memory use does not grow with the batch size.
+ *
+ * On return *numSlots is the row count the remote server reports for the
+ * COPY; the result is "slots" if that count is positive, else NULL.
+ *
+ * NOTE(review): the COPY statement carries no ON CONFLICT clause and remote
+ * COPY is rejected for targets such as plain views that INSERT accepts, so
+ * the caller should only route plain INSERT batches here -- confirm the
+ * dispatch in execute_foreign_modify accounts for this.
+ */
+static TupleTableSlot **
+execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+								  TupleTableSlot **slots,
+								  int *numSlots)
+{
+	PGresult   *res;
+	StringInfoData copy_data;
+	MemoryContext oldcontext;
+	int			n_rows;
+
+	if (fmstate->copy_query == NULL)
+	{
+		StringInfoData sql;
+
+		/*
+		 * Build the COPY command once and cache it.  Allocate it in the
+		 * context that holds fmstate itself, so it lives exactly as long as
+		 * the state pointing to it (EXPLAIN ANALYZE prints it later) no matter
+		 * which context we are called in.
+		 */
+		oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(fmstate));
+		initStringInfo(&sql);
+		buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+		MemoryContextSwitchTo(oldcontext);
+
+		fmstate->copy_query = sql.data;
+	}
+
+	/* Send the COPY command and wait for the server to enter COPY IN mode */
+	if (!PQsendQuery(fmstate->conn, fmstate->copy_query))
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+	/* Nothing else is needed from the COPY_IN result; don't leak it */
+	PQclear(res);
+
+	/*
+	 * The line buffer lives in temp_cxt, which is reset at the end of the
+	 * batch, so nothing accumulates across batches.
+	 */
+	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+	initStringInfo(&copy_data);
+
+	PG_TRY();
+	{
+		for (int i = 0; i < *numSlots; i++)
+		{
+			convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+			/* Hand data to libpq once the buffer reaches COPYBUFSIZ */
+			if (copy_data.len >= COPYBUFSIZ)
+			{
+				if (PQputCopyData(fmstate->conn, copy_data.data,
+								  copy_data.len) <= 0)
+					pgfdw_report_error(NULL, fmstate->conn,
+									   fmstate->copy_query);
+				resetStringInfo(&copy_data);
+			}
+		}
+
+		/* Send whatever is left over */
+		if (copy_data.len > 0 &&
+			PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+			pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+	}
+	PG_CATCH();
+	{
+		/*
+		 * Abort the remote COPY so the session is not left waiting for more
+		 * COPY data while the error propagates and the remote transaction
+		 * is cleaned up.  Its result does not matter; we rethrow regardless.
+		 */
+		(void) PQputCopyEnd(fmstate->conn,
+							"postgres_fdw: local error during batch insert");
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Finish the COPY and make sure everything has actually been sent */
+	if (PQputCopyEnd(fmstate->conn, NULL) <= 0 || PQflush(fmstate->conn) != 0)
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+	/* Get the result, and check for success */
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+	n_rows = atoi(PQcmdTuples(res));
+	PQclear(res);
+
+	/* Releases the COPY line buffer and converted values */
+	MemoryContextReset(fmstate->temp_cxt);
+
+	*numSlots = n_rows;
+
+	/* Return NULL if nothing was inserted on the remote end */
+	return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
char *orig_query, List *target_attrs,
int values_end_len, int num_params,
int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..79f4f305641 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -54,6 +54,18 @@ CREATE TABLE "S 1"."T 4" (
c3 text,
CONSTRAINT t4_pkey PRIMARY KEY (c1)
);
+CREATE TABLE "S 1"."T 5"(
+ x int
+);
+CREATE TABLE "S 1"."T 6"(
+ id int not null,
+ note text,
+ value int NOT NULL
+);
+CREATE TABLE "S 1"."T 7"(
+ id int,
+ t text
+);
-- Disable autovacuum for these tables to avoid unexpected effects of that
ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
@@ -146,6 +158,24 @@ CREATE FOREIGN TABLE ft7 (
c3 text
) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft8 (
+ x int
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10');
+
+CREATE FOREIGN TABLE ft9 (
+ id int not null,
+ note text,
+ value int NOT NULL
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10');
+
+CREATE FOREIGN TABLE ft10 (
+ id int,
+ t text
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10');
+
-- ===================================================================
-- tests for validator
-- ===================================================================
@@ -4379,6 +4409,28 @@ ANALYZE analyze_ftable;
DROP FOREIGN TABLE analyze_ftable;
DROP TABLE analyze_table;
+-- ===================================================================
+-- test for batch insert using COPY
+-- ===================================================================
+ALTER FOREIGN TABLE ft8 DROP COLUMN x;
+ALTER FOREIGN TABLE ft8 add COLUMN x int;
+
+INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i;
+SELECT * FROM ft8;
+
+EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) INSERT INTO ft9 (id, value, note)
+SELECT g,
+ g * 2,
+ 'batch insert test data' || g
+FROM generate_series(1, 20) g;
+
+SELECT * FROM ft9;
+
+-- Test buffer limit of copy data on COPYBUFSIZ
+INSERT INTO ft10 (id, t)
+SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s;
+SELECT COUNT(*) FROM ft10;
+
-- ===================================================================
-- test for postgres_fdw_get_connections function with check_conn = true
-- ===================================================================
--
2.51.0