v1-0001-parallel-pgbench-wip.patch

text/plain

Filename: v1-0001-parallel-pgbench-wip.patch
Type: text/plain
Part: 0
Message: parallel data loading for pgbench -i
From 18d91ec9c22d43522dc1cd83c16359c36b3dc58d Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <cadariu.mircea@gmail.com>
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] pgbench: parallel data loading for pgbench -i (WIP)

---
 src/bin/pgbench/pgbench.c | 455 +++++++++++++++++++++++++++++++++++---
 1 file changed, 420 insertions(+), 35 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a425176ecd..ef4e05678a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
 #define ERRCODE_T_R_DEADLOCK_DETECTED  "40P01"
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
 
+#define COPY_BATCH_SIZE	(1024 * 1024)	/* flush threshold, in bytes, for batched COPY data */
+
 /*
  * Hashing constants
  */
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
 	}
 };
 
-
 /* Function prototypes */
 static void setNullValue(PgBenchValue *pv);
 static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
 static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
 
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -856,6 +859,20 @@ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
 static const PsqlScanCallbacks pgbench_callbacks = {
 	NULL,						/* don't need get_variable functionality */
 };
+
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTaskDescription
+{
+	PGconn	   *con;			/* connection used by this worker */
+	const char *table;			/* table to load */
+	int64		start_row;		/* first row number assigned (inclusive) */
+	int64		end_row;		/* last row number assigned (exclusive) */
+	initRowMethod append_row;	/* appends one COPY row to a buffer */
+	int			worker_id;		/* 0 .. num_workers - 1 */
+	int			part_start;		/* first partition to load; 0 if none */
+	int			part_end;		/* last partition to load (inclusive) */
+	int64		part_size;		/* rows per partition */
+} WorkerTaskDescription;
 
 static char
 get_table_relkind(PGconn *con, const char *table)
@@ -1631,6 +1647,326 @@ doConnect(void)
 	return conn;
 }
 
+/*
+ * Truncate specified table(s)
+ * tableName can be a single table or comma-separated list of tables
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+	PQExpBufferData query;
+
+	initPQExpBuffer(&query);
+	printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+	executeStatement(con, query.data);
+	termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+	const char *table_name;		/* table or partition to load */
+	int64		start_row;		/* first row number to generate */
+	int64		end_row;		/* row number to stop at (exclusive) */
+	bool		use_freeze;		/* issue COPY ... (FREEZE ON)? */
+} CopyTarget;
+
+/*
+ * Perform COPY operation for a single table or partition.
+ *
+ * Row numbers in [start_row, end_row) are rendered with the worker's
+ * append_row callback into a local buffer, which is sent to the server each
+ * time it reaches COPY_BATCH_SIZE bytes; batching amortizes the per-call
+ * overhead of PQputCopyData.  Any failure is fatal.
+ */
+static void
+performCopy(PGconn *conn, const WorkerTaskDescription *wtd,
+			const CopyTarget *target)
+{
+	PGresult   *res;
+	char		copy_statement[NAMEDATALEN + 32];
+	int64		row;
+	PQExpBufferData batch_buffer;
+
+	/* Build the COPY command */
+	if (target->use_freeze)
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN (FREEZE ON)", target->table_name);
+	else
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN", target->table_name);
+
+	/* Initiate COPY mode */
+	res = PQexec(conn, copy_statement);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("COPY command failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	/* Pre-allocate buffer to avoid repeated reallocs */
+	initPQExpBuffer(&batch_buffer);
+	enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+	/* Generate rows, sending the buffer each time it reaches the threshold */
+	for (row = target->start_row; row < target->end_row; row++)
+	{
+		wtd->append_row(&batch_buffer, row);
+
+		/* appends to a broken (out-of-memory) buffer are silent no-ops */
+		if (PQExpBufferBroken(&batch_buffer))
+			pg_fatal("out of memory");
+
+		if (batch_buffer.len >= COPY_BATCH_SIZE)
+		{
+			if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+				pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+			resetPQExpBuffer(&batch_buffer);
+		}
+	}
+
+	/* Send any remaining buffered data */
+	if (batch_buffer.len > 0)
+	{
+		if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+			pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+	}
+
+	/* Finalize the COPY operation */
+	if (PQputCopyEnd(conn, NULL) <= 0)
+		pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	termPQExpBuffer(&batch_buffer);
+}
+
+/*
+ * Split rows [0, total_rows) of a non-partitioned table evenly across the
+ * workers; the last worker also takes the remainder.
+ */
+static void
+assignWorkerRows(WorkerTaskDescription *wtd, int num_workers, int64 total_rows)
+{
+	int64		rows_per_worker = total_rows / num_workers;
+
+	wtd->start_row = wtd->worker_id * rows_per_worker;
+	wtd->end_row = (wtd->worker_id == num_workers - 1) ?
+		total_rows :
+		(wtd->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Assign a contiguous run of partitions part_start .. part_end (1-based) to
+ * the worker, giving one extra partition to each of the lowest-numbered
+ * workers when the partitions do not divide evenly, and record the row
+ * range [start_row, end_row) those partitions cover.  wtd->part_size must
+ * already be set.
+ *
+ * Covers only multiple partitions per worker (workers <= partitions) for now.
+ * Each worker loads complete partitions independently and can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTaskDescription *wtd, int num_workers, int64 total_rows,
+					   int num_parts)
+{
+	int			parts_per_worker = num_parts / num_workers;
+	int			extra_parts = num_parts % num_workers;
+
+	wtd->part_start = wtd->worker_id * parts_per_worker + 1 +
+		(wtd->worker_id < extra_parts ? wtd->worker_id : extra_parts);
+	wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+		(wtd->worker_id < extra_parts ? 1 : 0);
+
+	wtd->start_row = (wtd->part_start - 1) * wtd->part_size;
+	wtd->end_row = (wtd->part_end == num_parts) ?
+		total_rows :
+		wtd->part_end * wtd->part_size;
+}
+
+/*
+ * Load the partitions assigned to this worker, one COPY per partition.
+ * Each partition was created in the worker's current transaction, so COPY
+ * FREEZE is allowed.  The worker's last partition ends at wtd->end_row,
+ * which absorbs the rounding of part_size in the final partition.
+ */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+	int			p;
+
+	for (p = wtd->part_start; p <= wtd->part_end; p++)
+	{
+		CopyTarget	target;
+		char		partition_table[NAMEDATALEN];
+
+		snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p);
+
+		target.table_name = partition_table;
+		target.start_row = (p - 1) * wtd->part_size;
+		target.end_row = (p == wtd->part_end) ? wtd->end_row : p * wtd->part_size;
+		target.use_freeze = true;
+
+		performCopy(conn, wtd, &target);
+	}
+}
+
+/*
+ * Load this worker's row range into a non-partitioned table.  COPY FREEZE
+ * is not used: the table was truncated in a transaction that committed
+ * before the workers started, so FREEZE would be rejected.
+ */
+static void
+loadRegularTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+	CopyTarget	target;
+
+	target.table_name = wtd->table;
+	target.start_row = wtd->start_row;
+	target.end_row = wtd->end_row;
+	target.use_freeze = false;
+
+	performCopy(conn, wtd, &target);
+}
+
+/*
+ * Thread entry point for one data-loading worker.
+ *
+ * The worker does its whole share in a single transaction on the connection
+ * assigned to it: for a range-partitioned pgbench_accounts it creates its
+ * partitions and loads them; otherwise it loads its row range.
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+	WorkerTaskDescription *wtd = (WorkerTaskDescription *) arg;
+	PGconn	   *conn = wtd->con;
+
+	executeStatement(conn, "begin");
+
+	if (wtd->part_start > 0)
+	{
+		createPartitions(conn, wtd->part_start, wtd->part_end);
+		loadPartitionedTable(conn, wtd);
+	}
+	else
+		loadRegularTable(conn, wtd);
+
+	executeStatement(conn, "commit");
+
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * Load rows [0, total_rows) into "table" using num_workers threads, each
+ * with its own connection (worker 0 reuses "connection").
+ *
+ * For a range-partitioned pgbench_accounts, each worker creates and loads a
+ * contiguous run of partitions, which are attached to the parent once all
+ * workers have finished.  Otherwise the rows are split evenly across the
+ * workers, all of which load the same table.
+ */
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+						  const char *table, int64 total_rows,
+						  initRowMethod append_row)
+{
+	THREAD_T   *worker_threads;
+	WorkerTaskDescription *worker_data;
+	PGconn	  **connections;
+	bool		is_partitioned;
+	int64		part_size = 0;
+	int			i;
+
+	/* Works only for pgbench_accounts and the range partitioning option */
+	is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE;
+
+	/*
+	 * For partitioned tables, we handle only num_workers <= partitions for
+	 * now.  Check this before opening any extra connections.
+	 */
+	if (is_partitioned && num_workers > partitions)
+		pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)",
+				 num_workers, partitions);
+
+	/* must match the partition bounds computed by attachPartitions() */
+	if (is_partitioned)
+		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+
+	/* Allocate worker data and threads */
+	worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+	worker_data = pg_malloc0(num_workers * sizeof(WorkerTaskDescription));
+	connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+	/* Reuse main connection for worker 0, create new ones for others */
+	connections[0] = connection;
+	for (i = 1; i < num_workers; i++)
+	{
+		connections[i] = doConnect();
+		if (connections[i] == NULL)
+			pg_fatal("could not create connection for data loading worker %d", i);
+	}
+
+	/*
+	 * Truncate the target and commit before any worker starts.  TRUNCATE
+	 * holds an ACCESS EXCLUSIVE lock until commit; leaving it open in worker
+	 * 0's transaction (so that worker could use COPY FREEZE) would make every
+	 * other worker's COPY wait for worker 0 to finish, serializing the load.
+	 */
+	executeStatement(connections[0], "begin");
+	truncateTable(connections[0], table);
+	executeStatement(connections[0], "commit");
+
+	/* Create and start worker threads */
+	for (i = 0; i < num_workers; i++)
+	{
+		worker_data[i].con = connections[i];
+		worker_data[i].table = table;
+		worker_data[i].append_row = append_row;
+		worker_data[i].worker_id = i;
+
+		if (is_partitioned)
+		{
+			worker_data[i].part_size = part_size;
+			assignWorkerPartitions(&worker_data[i], num_workers, total_rows,
+								   partitions);
+		}
+		else
+			assignWorkerRows(&worker_data[i], num_workers, total_rows);
+
+		errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread: %m");
+	}
+
+	for (i = 0; i < num_workers; i++)
+	{
+		errno = THREAD_JOIN(worker_threads[i]);
+		if (errno != 0)
+			pg_fatal("could not join thread: %m");
+	}
+
+	/*
+	 * For partitioned tables, attach all partitions now that data is loaded.
+	 */
+	if (is_partitioned)
+		attachPartitions(connection);
+
+	/* Clean up worker connections (skip index 0, which is the main connection) */
+	for (i = 1; i < num_workers; i++)
+		PQfinish(connections[i]);
+
+	pg_free(connections);
+	pg_free(worker_threads);
+	pg_free(worker_data);
+}
+
 /* qsort comparator for Variable array */
 static int
 compareVariableNames(const void *v1, const void *v2)
@@ -4869,14 +5156,58 @@ initDropTables(PGconn *con)
  * with a known size, so we choose to partition it.
  */
 static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
 {
 	PQExpBufferData query;
 
 	/* we must have to create some partitions */
 	Assert(partitions > 0);
 
-	fprintf(stderr, "creating %d partitions...\n", partitions);
+	/* part_start == -1 means all partitions; part_end is then ignored */
+	if (part_start == -1)
+	{
+		part_start = 1;
+		part_end = partitions;
+		fprintf(stderr, "creating %d partitions...\n", partitions);
+	}
+
+	initPQExpBuffer(&query);
+
+	for (int p = part_start; p <= part_end; p++)
+	{
+		/*
+		 * Create standalone tables (not attached to the parent yet).  This
+		 * avoids AccessExclusiveLock on the parent table, allowing parallel
+		 * creation.  attachPartitions() attaches them to the parent later.
+		 */
+		printfPQExpBuffer(&query,
+						  "create%s table pgbench_accounts_%d\n"
+						  "  (aid int not null,\n"
+						  "   bid int,\n"
+						  "   abalance int,\n"
+						  "   filler character(84))\n"
+						  "  with (fillfactor=%d)",
+						  unlogged_tables ? " unlogged" : "", p,
+						  fillfactor);
+
+		executeStatement(con, query.data);
+	}
+
+	termPQExpBuffer(&query);
+}
+
+/*
+ * Attach the standalone partition tables to the parent.  Parallel loading
+ * calls this after the data is in; the other paths call it before loading.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+	PQExpBufferData query;
+
+	Assert(partitions > 0);
+
+	fprintf(stderr, "attaching %d partitions...\n", partitions);
 
 	initPQExpBuffer(&query);
 
@@ -4884,13 +5215,12 @@ createPartitions(PGconn *con)
 	{
 		if (partition_method == PART_RANGE)
 		{
 			int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
 
 			printfPQExpBuffer(&query,
-							  "create%s table pgbench_accounts_%d\n"
-							  "  partition of pgbench_accounts\n"
-							  "  for values from (",
-							  unlogged_tables ? " unlogged" : "", p);
+							  "alter table pgbench_accounts\n"
+							  "  attach partition pgbench_accounts_%d\n"
+							  "  for values from (", p);
 
 			/*
 			 * For RANGE, we use open-ended partitions at the beginning and
@@ -4913,21 +5243,16 @@ createPartitions(PGconn *con)
 			appendPQExpBufferChar(&query, ')');
 		}
 		else if (partition_method == PART_HASH)
+		{
 			printfPQExpBuffer(&query,
-							  "create%s table pgbench_accounts_%d\n"
-							  "  partition of pgbench_accounts\n"
+							  "alter table pgbench_accounts\n"
+							  "  attach partition pgbench_accounts_%d\n"
 							  "  for values with (modulus %d, remainder %d)",
-							  unlogged_tables ? " unlogged" : "", p,
-							  partitions, p - 1);
+							  p, partitions, p - 1);
+		}
 		else					/* cannot get there */
 			Assert(0);
 
-		/*
-		 * Per ddlinfo in initCreateTables, fillfactor is needed on table
-		 * pgbench_accounts.
-		 */
-		appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
 		executeStatement(con, query.data);
 	}
 
@@ -5025,8 +5350,13 @@ initCreateTables(PGconn *con)
 
 	termPQExpBuffer(&query);
 
-	if (partition_method != PART_NONE)
-		createPartitions(con);
+	/*
+	 * With -j > 1, range partitions are not created here: each parallel
+	 * client-side loading worker creates its own, so server-side data
+	 * generation has to create them itself.
+	 */
+	if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+		createPartitions(con, -1, -1);
 }
 
 /*
@@ -5035,11 +5360,7 @@ initCreateTables(PGconn *con)
 static void
 initTruncateTables(PGconn *con)
 {
-	executeStatement(con, "truncate table "
-					 "pgbench_accounts, "
-					 "pgbench_branches, "
-					 "pgbench_history, "
-					 "pgbench_tellers");
+	truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
 }
 
 static void
@@ -5069,8 +5390,40 @@ initAccount(PQExpBufferData *sql, int64 curr)
 					  curr + 1, curr / naccounts + 1);
 }
 
+/*
+ * Append-based versions to enable batching.
+ * These use appendPQExpBuffer instead of printfPQExpBuffer to allow
+ * multiple rows to be accumulated in a single buffer.
+ */
+static void
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column uses NULL */
+	appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t0\t\\N\n",
+					  curr + 1);
+}
+
+static void
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column uses NULL */
+	appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+					  curr + 1, curr / ntellers + 1);
+}
+
 static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
-				  initRowMethod init_row)
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column defaults to blank padded empty string */
+	appendPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+					  curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
+						initRowMethod init_row)
 {
 	int			n;
@@ -5079,7 +5432,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	int			prev_chars = 0;
 	PGresult   *res;
 	PQExpBufferData sql;
-	char		copy_statement[256];
+	char		copy_statement[NAMEDATALEN + 32];
 	const char *copy_statement_fmt = "copy %s from stdin";
 	int64		total = base * scale;
 
@@ -5188,6 +5541,37 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * Fill "table" with base * scale rows.
+ *
+ * init_row replaces the buffer contents with one row and is used for serial
+ * loading; append_row adds one row to the buffer and is used for parallel
+ * loading, which happens only if use_parallel is set and nthreads > 1.
+ */
+static void
+initPopulateTable(PGconn *con, const char *table, int64 base,
+				  initRowMethod init_row, initRowMethod append_row, bool use_parallel)
+{
+	if (use_parallel && nthreads > 1)
+	{
+		initPopulateTableParallel(con, nthreads, table, base * scale, append_row);
+		return;
+	}
+
+	/*
+	 * When loading serially into partitioned accounts, attach partitions
+	 * before loading data so COPY to the parent table can route rows.
+	 *
+	 * XXX this fails if the partitions are already attached, e.g. when step
+	 * "g" is rerun without "t" against an existing database.
+	 */
+	if (strcmp(table, "pgbench_accounts") == 0 &&
+		partitions > 0 && partition_method != PART_NONE)
+		attachPartitions(con);
+
+	initPopulateTableSerial(con, table, base, init_row);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5200,8 +5574,9 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * With one thread, do all of this in one transaction to enable the
+	 * backend's data-loading optimizations.  With -j > 1, only branches and
+	 * tellers use it; accounts are loaded by workers in their own transactions.
 	 */
 	executeStatement(con, "begin");
 
@@ -5212,11 +5587,22 @@ initGenerateDataClientSide(PGconn *con)
 	 * fill branches, tellers, accounts in that order in case foreign keys
 	 * already exist
 	 */
-	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
-	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+	initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false);
+	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false);
 
-	executeStatement(con, "commit");
+	if (nthreads > 1)
+	{
+		/* the parallel loader manages its own transactions; end ours first */
+		executeStatement(con, "commit");
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount,
+						  appendAccount, true);
+	}
+	else
+	{
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount,
+						  appendAccount, false);
+		executeStatement(con, "commit");
+	}
 }
 
 /*
@@ -5242,6 +5622,18 @@ initGenerateDataServerSide(PGconn *con)
 	/* truncate away any old data */
 	initTruncateTables(con);
 
+	/*
+	 * initCreateTables() does not create range partitions when -j > 1 (the
+	 * parallel client-side loaders create their own), so create them here
+	 * before attaching them.
+	 */
+	if (partitions > 0 && partition_method != PART_NONE)
+	{
+		if (nthreads > 1 && partition_method == PART_RANGE)
+			createPartitions(con, -1, -1);
+		attachPartitions(con);
+	}
+
 	initPQExpBuffer(&sql);
 
 	printfPQExpBuffer(&sql,
@@ -6989,7 +7372,6 @@ main(int argc, char **argv)
 				initialization_option_set = true;
 				break;
 			case 'j':			/* jobs */
-				benchmarking_option_set = true;
 				if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
 									  &nthreads))
 				{
@@ -7221,7 +7603,7 @@ main(int argc, char **argv)
 	 * optimization; throttle_delay is calculated incorrectly below if some
 	 * threads have no clients assigned to them.)
 	 */
-	if (nthreads > nclients)
+	if (nthreads > nclients && !is_init_mode)
 		nthreads = nclients;
 
 	/*
@@ -7266,6 +7648,14 @@ main(int argc, char **argv)
 		if (partitions > 0 && partition_method == PART_NONE)
 			partition_method = PART_RANGE;
 
+		if (partition_method == PART_HASH && nthreads > 1)
+			pg_fatal("parallel data loading (-j) is not supported with hash partitioning");
+
+		/* check this before any initialization step touches the database */
+		if (partitions > 0 && partition_method == PART_RANGE && nthreads > partitions)
+			pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)",
+					 nthreads, partitions);
+
 		if (initialize_steps == NULL)
 			initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
 
-- 
2.39.5 (Apple Git-154)