v4-0001-postgres_fdw-Use-COPY-to-speed-up-batch-inserts.patch

text/plain

Filename: v4-0001-postgres_fdw-Use-COPY-to-speed-up-batch-inserts.patch
Type: text/plain
Part: 0
Message: Re: postgres_fdw: Use COPY to speed up batch inserts
From 1856fba0c49d5aa7b164debb90b376c72cfa3e02 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Fri, 10 Oct 2025 16:07:08 -0300
Subject: [PATCH v4] postgres_fdw: Use COPY to speed up batch inserts

---
 contrib/postgres_fdw/deparse.c                |  30 ++++
 .../postgres_fdw/expected/postgres_fdw.out    | 120 +++++++++++--
 contrib/postgres_fdw/postgres_fdw.c           | 159 +++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  52 ++++++
 5 files changed, 350 insertions(+), 12 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index f2fb0051843..113e6fb7d91 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,36 @@ rebuildInsertSql(StringInfo buf, Relation rel,
 	appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ * Build a COPY FROM STDIN statement (TEXT format) for the remote table.
+ * Generated columns are skipped; a column_name option overrides the name.
+ */
+void
+buildCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	const char *sep = "";
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoChar(buf, '(');
+	foreach_int(attnum, target_attrs)
+	{
+		Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+		char	   *colname = NameStr(attr->attname);
+
+		if (attr->attgenerated)
+			continue;
+		/* The remote column may be renamed via the column_name FDW option */
+		foreach_node(DefElem, def, GetForeignColumnOptions(RelationGetRelid(rel), attnum))
+			if (strcmp(def->defname, "column_name") == 0)
+				colname = defGetString(def);
+		appendStringInfo(buf, "%s%s", sep, quote_identifier(colname));
+		sep = ", ";
+	}
+	appendStringInfoString(buf, ") FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index cd28126049d..dd507ad6186 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -50,6 +50,18 @@ CREATE TABLE "S 1"."T 4" (
 	c3 text,
 	CONSTRAINT t4_pkey PRIMARY KEY (c1)
 );
+CREATE TABLE "S 1"."T 5"(
+    x int
+);
+CREATE TABLE "S 1"."T 6"(
+    id int not null,
+    note text,
+    value int NOT NULL
+);
+CREATE TABLE "S 1"."T 7"(
+    id int,
+    t text
+);
 -- Disable autovacuum for these tables to avoid unexpected effects of that
 ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
 ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false');
@@ -132,6 +144,21 @@ CREATE FOREIGN TABLE ft7 (
 	c2 int NOT NULL,
 	c3 text
 ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft8 (
+    x int
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10');
+CREATE FOREIGN TABLE ft9 (
+    id int not null,
+    note text,
+    value int NOT NULL
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10');
+CREATE FOREIGN TABLE ft10 (
+    id int,
+    t text
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -205,16 +232,19 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
 \det+
-                              List of foreign tables
- Schema | Table |  Server   |              FDW options              | Description 
---------+-------+-----------+---------------------------------------+-------------
- public | ft1   | loopback  | (schema_name 'S 1', table_name 'T 1') | 
- public | ft2   | loopback  | (schema_name 'S 1', table_name 'T 1') | 
- public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3') | 
- public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4') | 
- public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4') | 
- public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4') | 
-(6 rows)
+                                      List of foreign tables
+ Schema | Table |  Server   |                      FDW options                       | Description 
+--------+-------+-----------+--------------------------------------------------------+-------------
+ public | ft1   | loopback  | (schema_name 'S 1', table_name 'T 1')                  | 
+ public | ft10  | loopback  | (schema_name 'S 1', table_name 'T 7', batch_size '10') | 
+ public | ft2   | loopback  | (schema_name 'S 1', table_name 'T 1')                  | 
+ public | ft4   | loopback  | (schema_name 'S 1', table_name 'T 3')                  | 
+ public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4')                  | 
+ public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4')                  | 
+ public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4')                  | 
+ public | ft8   | loopback  | (schema_name 'S 1', table_name 'T 5', batch_size '10') | 
+ public | ft9   | loopback  | (schema_name 'S 1', table_name 'T 6', batch_size '10') | 
+(9 rows)
 
 -- Test that alteration of server options causes reconnection
 -- Remote's errors might be non-English, so hide them to ensure stable results
@@ -12664,6 +12694,76 @@ ANALYZE analyze_ftable;
 -- cleanup
 DROP FOREIGN TABLE analyze_ftable;
 DROP TABLE analyze_table;
+-- ===================================================================
+-- test for batch insert using COPY
+-- ===================================================================
+ALTER FOREIGN TABLE ft8 DROP COLUMN x;
+ALTER FOREIGN TABLE ft8 add COLUMN x int;
+INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i;
+SELECT * FROM ft8;
+ x  
+----
+  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+(10 rows)
+
+EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) INSERT INTO ft9 (id, value, note)
+SELECT g,
+       g * 2,
+       'batch insert test data' || g
+FROM generate_series(1, 20) g;
+                                   QUERY PLAN                                    
+---------------------------------------------------------------------------------
+ Insert on public.ft9 (actual rows=0.00 loops=1)
+   Remote SQL: COPY "S 1"."T 6"(id, note, value) FROM STDIN
+   Batch Size: 10
+   ->  Function Scan on pg_catalog.generate_series g (actual rows=20.00 loops=1)
+         Output: g.g, ('batch insert test data'::text || (g.g)::text), (g.g * 2)
+         Function Call: generate_series(1, 20)
+(6 rows)
+
+SELECT * FROM ft9;
+ id |           note           | value 
+----+--------------------------+-------
+  1 | batch insert test data1  |     2
+  2 | batch insert test data2  |     4
+  3 | batch insert test data3  |     6
+  4 | batch insert test data4  |     8
+  5 | batch insert test data5  |    10
+  6 | batch insert test data6  |    12
+  7 | batch insert test data7  |    14
+  8 | batch insert test data8  |    16
+  9 | batch insert test data9  |    18
+ 10 | batch insert test data10 |    20
+ 11 | batch insert test data11 |    22
+ 12 | batch insert test data12 |    24
+ 13 | batch insert test data13 |    26
+ 14 | batch insert test data14 |    28
+ 15 | batch insert test data15 |    30
+ 16 | batch insert test data16 |    32
+ 17 | batch insert test data17 |    34
+ 18 | batch insert test data18 |    36
+ 19 | batch insert test data19 |    38
+ 20 | batch insert test data20 |    40
+(20 rows)
+
+-- Test buffer limit of copy data on COPYBUFSIZ
+INSERT INTO ft10 (id, t)
+SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s;
+SELECT COUNT(*) FROM ft10;
+ count 
+-------
+     4
+(1 row)
+
 -- ===================================================================
 -- test for postgres_fdw_get_connections function with check_conn = true
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..d8e13e78938 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Size (in bytes) at which buffered COPY IN data is sent to the remote */
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -192,6 +195,7 @@ typedef struct PgFdwModifyState
 	/* extracted fdw_private data */
 	char	   *query;			/* text of INSERT/UPDATE/DELETE command */
 	char	   *orig_query;		/* original text of INSERT command */
+	char	   *copy_query;		/* text of COPY command if it's being used */
 	List	   *target_attrs;	/* list of target attribute numbers */
 	int			values_end;		/* length up to the end of VALUES */
 	int			batch_size;		/* value of FDW option "batch_size" */
@@ -545,6 +549,9 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_o,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+														  TupleTableSlot **slots,
+														  int *numSlots);
 
 
 /*
@@ -2942,8 +2949,23 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
 {
 	if (es->verbose)
 	{
-		char	   *sql = strVal(list_nth(fdw_private,
-										  FdwModifyPrivateUpdateSql));
+		char	   *sql = NULL;
+
+		/*
+		 * We only have ri_FdwState during EXPLAIN(ANALYZE), so check if the
+		 * COPY was used during query execution and show it as a Remote SQL.
+		 */
+		if (rinfo->ri_FdwState != NULL)
+		{
+			PgFdwModifyState *fmstate = (PgFdwModifyState *) rinfo->ri_FdwState;
+
+			if (fmstate->copy_query != NULL)
+				sql = fmstate->copy_query;
+		}
+
+		if (sql == NULL)
+			sql = strVal(list_nth(fdw_private,
+								  FdwModifyPrivateUpdateSql));
 
 		ExplainPropertyText("Remote SQL", sql, es);
 
@@ -4066,6 +4088,50 @@ create_foreign_modify(EState *estate,
 	return fmstate;
 }
 
+/*
+ * Append one COPY text-format row for "slot" to buf: tab-separated values,
+ * \N for NULL; backslash, tab, newline and CR in values are backslash-escaped.
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+						  PgFdwModifyState *fmstate,
+						  TupleTableSlot *slot)
+{
+	TupleDesc	tupdesc = RelationGetDescr(fmstate->rel);
+	int			nvals = 0;		/* values emitted so far; indexes p_flinfo */
+
+	foreach_int(attnum, fmstate->target_attrs)
+	{
+		Datum		datum;
+		bool		isnull;
+
+		/* Ignore generated columns; they are set to DEFAULT */
+		if (TupleDescCompactAttr(tupdesc, attnum - 1)->attgenerated)
+			continue;
+		if (nvals > 0)
+			appendStringInfoCharMacro(buf, '\t');
+		datum = slot_getattr(slot, attnum, &isnull);
+		if (isnull)
+			appendStringInfoString(buf, "\\N");
+		else
+		{
+			const char *p = OutputFunctionCall(&fmstate->p_flinfo[nvals], datum);
+
+			/* Unescaped, these bytes would split the field or row */
+			for (; *p != '\0'; p++)
+			{
+				char		c = *p == '\t' ? 't' : *p == '\n' ? 'n' : *p == '\r' ? 'r' : *p;
+
+				if (c != *p || c == '\\')
+					appendStringInfoCharMacro(buf, '\\');
+				appendStringInfoCharMacro(buf, c);
+			}
+		}
+		nvals++;
+	}
+	appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *		Perform foreign-table modification as required, and fetch RETURNING
@@ -4097,6 +4163,13 @@ execute_foreign_modify(EState *estate,
 	if (fmstate->conn_state->pendingAreq)
 		process_pending_request(fmstate->conn_state->pendingAreq);
 
+	/*
+	 * Use the COPY command for a batch insert if the original query doesn't
+	 * include a RETURNING clause
+	 */
+	if (operation == CMD_INSERT && *numSlots > 1 && !fmstate->has_returning)
+		return execute_foreign_insert_using_copy(fmstate, slots, numSlots);
+
 	/*
 	 * If the existing query was deparsed and prepared for a different number
 	 * of rows, rebuild it for the proper number.
@@ -7886,3 +7959,85 @@ get_batch_size_option(Relation rel)
 
 	return batch_size;
 }
+
+/*
+ * Execute a batch insert into a foreign table using COPY ... FROM STDIN.
+ *
+ * Rows are built in fmstate->temp_cxt and sent once COPYBUFSIZ is reached.
+ * On return *numSlots is the remote row count; the result is NULL if it is 0.
+ */
+static TupleTableSlot **
+execute_foreign_insert_using_copy(PgFdwModifyState *fmstate,
+								  TupleTableSlot **slots,
+								  int *numSlots)
+{
+	PGresult   *res;
+	StringInfoData copy_data;
+	MemoryContext oldcontext;
+	int			n_rows;
+
+	/* Build the COPY command once and cache it for later batches */
+	if (fmstate->copy_query == NULL)
+	{
+		StringInfoData sql;
+
+		initStringInfo(&sql);
+		buildCopySql(&sql, fmstate->rel, fmstate->target_attrs);
+		fmstate->copy_query = sql.data;
+	}
+
+	/* Send COPY command */
+	if (!PQsendQuery(fmstate->conn, fmstate->copy_query))
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+	/* The remote must now expect COPY data; this result is not needed later */
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+	PQclear(res);
+
+	/*
+	 * Build the rows in the short-lived context so that the buffer and the
+	 * output-function results are released by the reset below.
+	 */
+	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+	initStringInfo(&copy_data);
+	for (int i = 0; i < *numSlots; i++)
+	{
+		convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+		/* Send the buffer once it reaches the limit to bound memory usage */
+		if (copy_data.len >= COPYBUFSIZ)
+		{
+			if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+				pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+			resetStringInfo(&copy_data);
+		}
+	}
+
+	/* Send the remaining COPY data */
+	if (copy_data.len > 0 &&
+		PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) <= 0)
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* End the COPY operation */
+	if (PQputCopyEnd(fmstate->conn, NULL) <= 0 || PQflush(fmstate->conn))
+		pgfdw_report_error(NULL, fmstate->conn, fmstate->copy_query);
+
+	/* Get the result, and check for success */
+	res = pgfdw_get_result(fmstate->conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(res, fmstate->conn, fmstate->copy_query);
+
+	n_rows = atoi(PQcmdTuples(res));
+
+	/* And clean up */
+	PQclear(res);
+	MemoryContextReset(fmstate->temp_cxt);
+
+	*numSlots = n_rows;
+
+	/* Return NULL if nothing was inserted on the remote end */
+	return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..c0198b865f3 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
 							 char *orig_query, List *target_attrs,
 							 int values_end_len, int num_params,
 							 int num_rows);
+extern void buildCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..79f4f305641 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -54,6 +54,18 @@ CREATE TABLE "S 1"."T 4" (
 	c3 text,
 	CONSTRAINT t4_pkey PRIMARY KEY (c1)
 );
+CREATE TABLE "S 1"."T 5"(
+    x int
+);
+CREATE TABLE "S 1"."T 6"(
+    id int not null,
+    note text,
+    value int NOT NULL
+);
+CREATE TABLE "S 1"."T 7"(
+    id int,
+    t text
+);
 
 -- Disable autovacuum for these tables to avoid unexpected effects of that
 ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
@@ -146,6 +158,24 @@ CREATE FOREIGN TABLE ft7 (
 	c3 text
 ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE ft8 (
+    x int
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5', batch_size '10');
+
+CREATE FOREIGN TABLE ft9 (
+    id int not null,
+    note text,
+    value int NOT NULL
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 6', batch_size '10');
+
+CREATE FOREIGN TABLE ft10 (
+    id int,
+    t text
+)
+SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 7', batch_size '10');
+
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -4379,6 +4409,28 @@ ANALYZE analyze_ftable;
 DROP FOREIGN TABLE analyze_ftable;
 DROP TABLE analyze_table;
 
+-- ===================================================================
+-- test for batch insert using COPY
+-- ===================================================================
+ALTER FOREIGN TABLE ft8 DROP COLUMN x;
+ALTER FOREIGN TABLE ft8 add COLUMN x int;
+
+INSERT INTO ft8 SELECT * FROM generate_series(1, 10) i;
+SELECT * FROM ft8;
+
+EXPLAIN(ANALYZE, VERBOSE, COSTS OFF, SUMMARY OFF, BUFFERS OFF, TIMING OFF) INSERT INTO ft9 (id, value, note)
+SELECT g,
+       g * 2,
+       'batch insert test data' || g
+FROM generate_series(1, 20) g;
+
+SELECT * FROM ft9;
+
+-- Test buffer limit of copy data on COPYBUFSIZ
+INSERT INTO ft10 (id, t)
+SELECT s, repeat(md5(s::text), 10000) from generate_series(100, 103) s;
+SELECT COUNT(*) FROM ft10;
+
 -- ===================================================================
 -- test for postgres_fdw_get_connections function with check_conn = true
 -- ===================================================================
-- 
2.51.0