v1-0001-parallel-pgbench-wip.patch
text/plain
Filename: v1-0001-parallel-pgbench-wip.patch
Type: text/plain
Part: 0
Message:
parallel data loading for pgbench -i
From 18d91ec9c22d43522dc1cd83c16359c36b3dc58d Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <cadariu.mircea@gmail.com>
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] wip
---
src/bin/pgbench/pgbench.c | 455 +++++++++++++++++++++++++++++++++++---
1 file changed, 420 insertions(+), 35 deletions(-)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a425176ecd..ef4e05678a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
#define ERRCODE_T_R_DEADLOCK_DETECTED "40P01"
#define ERRCODE_UNDEFINED_TABLE "42P01"
+/*
+ * Flush threshold, in bytes, for the buffer in which performCopy()
+ * accumulates COPY rows before handing them to PQputCopyData() (1 MB).
+ */
+#define COPY_BATCH_SIZE (1024 * 1024)
+
/*
* Hashing constants
*/
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
}
};
-
/* Function prototypes */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
static void add_socket_to_set(socket_set *sa, int fd, int idx);
static int wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
+/* partition DDL helpers; declared early because the parallel loader uses them */
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
/* callback used to build rows for COPY during data loading */
typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -856,6 +859,19 @@ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
static const PsqlScanCallbacks pgbench_callbacks = {
NULL, /* don't need get_variable functionality */
};
+/*
+ * Worker thread data for parallel table loading.
+ *
+ * initPopulateTableParallel() fills in one of these per worker and passes it
+ * to initWorkerThread().
+ */
+typedef struct WorkerTaskDescription
+{
+ PGconn *con; /* connection used by this worker; worker 0 shares the caller's */
+ const char *table; /* name of the table being loaded */
+ int64 start_row; /* first row number to generate (inclusive); used for non-partitioned loads */
+ int64 end_row; /* row number to stop before (exclusive); used for non-partitioned loads */
+ initRowMethod append_row; /* appends the COPY text of one row to a buffer */
+ int worker_id; /* 0-based worker index */
+ int part_start; /* first partition to load, 1-based; 0 when not partitioned */
+ int part_end; /* last partition to load (inclusive) */
+ int64 part_size; /* rows per partition, rounded up */
+} WorkerTaskDescription;
static char
get_table_relkind(PGconn *con, const char *table)
@@ -1631,6 +1647,277 @@ doConnect(void)
return conn;
}
+/*
+ * Run TRUNCATE TABLE on tableName.  The name is pasted into the statement
+ * verbatim, so it may be a single table or a comma-separated list of them.
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+	PQExpBufferData sql;
+
+	initPQExpBuffer(&sql);
+	appendPQExpBuffer(&sql, "TRUNCATE TABLE %s", tableName);
+	executeStatement(con, sql.data);
+	termPQExpBuffer(&sql);
+}
+
+/*
+ * Parameters needed for COPY operations: which table to load, which rows to
+ * generate for it, and whether to request COPY ... (FREEZE ON).
+ */
+typedef struct CopyTarget
+{
+ const char *table_name; /* table or partition to COPY into */
+ int64 start_row; /* first row number to generate (inclusive) */
+ int64 end_row; /* row number to stop before (exclusive) */
+ bool use_freeze; /* issue the COPY with the FREEZE ON option */
+} CopyTarget;
+
+/*
+ * Perform a COPY ... FROM STDIN into a single table or partition.
+ *
+ * Rows target->start_row .. target->end_row - 1 are formatted with
+ * wtd->append_row into a buffer that is flushed to the server whenever it
+ * reaches COPY_BATCH_SIZE bytes, to reduce per-call overhead.  When
+ * target->use_freeze is set the COPY is issued with (FREEZE ON); the server
+ * accepts that only if the table was created or truncated in the current
+ * transaction.
+ *
+ * Every failure, including running out of memory while building a batch,
+ * is fatal.
+ */
+static void
+performCopy(PGconn *conn, WorkerTaskDescription *wtd, const CopyTarget *target)
+{
+	PGresult   *res;
+	PQExpBufferData copy_statement;
+	PQExpBufferData batch_buffer;
+	int64		row;
+
+	/* Build the COPY command; unlike a fixed-size array this cannot truncate */
+	initPQExpBuffer(&copy_statement);
+	printfPQExpBuffer(&copy_statement, "COPY %s FROM STDIN%s",
+					  target->table_name,
+					  target->use_freeze ? " (FREEZE ON)" : "");
+	if (PQExpBufferBroken(&copy_statement))
+		pg_fatal("out of memory");
+
+	/* Initiate COPY mode */
+	res = PQexec(conn, copy_statement.data);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("COPY command failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+	termPQExpBuffer(&copy_statement);
+
+	/* Pre-allocate the batch buffer to avoid repeated reallocs */
+	initPQExpBuffer(&batch_buffer);
+	enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+	for (row = target->start_row; row < target->end_row; row++)
+	{
+		wtd->append_row(&batch_buffer, row);
+
+		/*
+		 * On allocation failure a PQExpBuffer is marked broken and its length
+		 * drops to zero; without this check the accumulated rows would be
+		 * silently discarded while the COPY still reported success.
+		 */
+		if (PQExpBufferBroken(&batch_buffer))
+			pg_fatal("out of memory");
+
+		/* Send the batch once it reaches the size threshold */
+		if (batch_buffer.len >= COPY_BATCH_SIZE)
+		{
+			if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+				pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+			resetPQExpBuffer(&batch_buffer);
+		}
+	}
+
+	/* Send whatever is still buffered */
+	if (batch_buffer.len > 0 &&
+		PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+		pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+	termPQExpBuffer(&batch_buffer);
+
+	/* Finalize the COPY operation */
+	if (PQputCopyEnd(conn, NULL) <= 0)
+		pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for table \"%s\": %s",
+				 target->table_name, PQerrorMessage(conn));
+	PQclear(res);
+
+	/* libpq marks the end of the command's results with a NULL */
+	while ((res = PQgetResult(conn)) != NULL)
+		PQclear(res);
+}
+
+
+/*
+ * Give worker wtd->worker_id its contiguous share of rows [0, total_rows).
+ * Each worker gets total_rows / num_workers rows; the last worker also takes
+ * the remainder.
+ */
+static void
+assignWorkerRows(WorkerTaskDescription *wtd, int num_workers, int64 total_rows)
+{
+	int64		chunk = total_rows / num_workers;
+	int			id = wtd->worker_id;
+	bool		is_last = (id == num_workers - 1);
+
+	wtd->start_row = chunk * id;
+	wtd->end_row = is_last ? total_rows : chunk * (id + 1);
+}
+
+/*
+ * Assign worker wtd->worker_id a contiguous range of partitions.
+ *
+ * Only the case of at least one partition per worker (num_workers <=
+ * num_parts) is handled for now.  Partitions are spread as evenly as
+ * possible: every worker gets num_parts / num_workers of them and the first
+ * num_parts % num_workers workers get one extra.  Each worker loads complete
+ * partitions independently and can therefore use COPY FREEZE.
+ *
+ * wtd->part_size must already be set.  Because part_size is rounded up,
+ * trailing partitions can extend past (or lie wholly beyond) total_rows, so
+ * the resulting row range is clamped to total_rows.
+ */
+static void
+assignWorkerPartitions(WorkerTaskDescription *wtd, int num_workers, int64 total_rows,
+					   int num_parts)
+{
+	int			parts_per_worker = num_parts / num_workers;
+	int			extra_parts = num_parts % num_workers;
+	int			id = wtd->worker_id;
+	int64		start_row;
+	int64		end_row;
+
+	wtd->part_start = id * parts_per_worker + 1 +
+		(id < extra_parts ? id : extra_parts);
+	wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+		(id < extra_parts ? 1 : 0);
+
+	start_row = (wtd->part_start - 1) * wtd->part_size;
+	end_row = (wtd->part_end == num_parts) ?
+		total_rows :
+		wtd->part_end * wtd->part_size;
+
+	wtd->start_row = (start_row < total_rows) ? start_row : total_rows;
+	wtd->end_row = (end_row < total_rows) ? end_row : total_rows;
+}
+
+
+/*
+ * Load partitions wtd->part_start .. wtd->part_end of pgbench_accounts.
+ *
+ * Partition p receives rows [(p - 1) * part_size, p * part_size), and the
+ * last partition takes whatever remains.  Every range is clamped to
+ * naccounts * scale: part_size is rounded up, so without the clamp trailing
+ * partitions would generate extra rows (with aid and bid values) past the
+ * end of the data set.  Each partition is expected to have been created in
+ * the current transaction (initWorkerThread does so), which is what allows
+ * COPY FREEZE here.
+ */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+	int64		total_rows = naccounts * (int64) scale;
+	int			p;
+
+	for (p = wtd->part_start; p <= wtd->part_end; p++)
+	{
+		CopyTarget	target;
+		int64		part_start_row = (p - 1) * wtd->part_size;
+		int64		part_end_row = (p == partitions) ? total_rows : p * wtd->part_size;
+		char		partition_table[NAMEDATALEN];
+
+		if (part_end_row > total_rows)
+			part_end_row = total_rows;
+
+		/* nothing to load into a partition lying wholly past the data */
+		if (part_start_row >= part_end_row)
+			continue;
+
+		snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p);
+
+		target.table_name = partition_table;
+		target.start_row = part_start_row;
+		target.end_row = part_end_row;
+		target.use_freeze = true;
+
+		performCopy(conn, wtd, &target);
+	}
+}
+
+/*
+ * Load this worker's slice [start_row, end_row) of a non-partitioned table.
+ *
+ * FREEZE is not requested: initPopulateTableParallel commits the TRUNCATE
+ * before the workers start, and COPY FREEZE is only accepted for a table
+ * created or truncated in the current transaction.
+ */
+static void
+loadRegularTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+	CopyTarget	target;
+
+	target.table_name = wtd->table;
+	target.start_row = wtd->start_row;
+	target.end_row = wtd->end_row;
+	target.use_freeze = false;
+
+	performCopy(conn, wtd, &target);
+}
+
+/*
+ * Thread body of one parallel-load worker.  arg is the worker's
+ * WorkerTaskDescription.  The whole share is loaded in a transaction of the
+ * worker's own; a part_start > 0 means the worker owns a range of
+ * partitions, otherwise a slice of rows of a regular table.
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+	WorkerTaskDescription *wtd = (WorkerTaskDescription *) arg;
+	PGconn	   *conn = wtd->con;
+
+	executeStatement(conn, "begin");
+
+	if (wtd->part_start > 0)
+	{
+		/*
+		 * Creating the partitions in this same transaction is what lets
+		 * loadPartitionedTable use COPY FREEZE.
+		 */
+		createPartitions(conn, wtd->part_start, wtd->part_end);
+		loadPartitionedTable(conn, wtd);
+	}
+	else
+		loadRegularTable(conn, wtd);
+
+	executeStatement(conn, "commit");
+
+	/* expands to the platform-appropriate return (NULL or 0 on WIN32) */
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * Load "table" with total_rows rows using num_workers threads, each on its
+ * own connection.  connection is reused as worker 0's connection and is used
+ * again afterwards to attach partitions; append_row formats one COPY row.
+ *
+ * For a range-partitioned pgbench_accounts the partitions are split among
+ * the workers: each creates its partitions as standalone tables, fills them
+ * with COPY FREEZE, and all partitions are attached once every worker is
+ * done.  Otherwise each worker COPYs a contiguous slice of rows.
+ */
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+						  const char *table, int64 total_rows,
+						  initRowMethod append_row)
+{
+	THREAD_T   *worker_threads;
+	WorkerTaskDescription *worker_data;
+	PGconn	  **connections;
+	bool		is_partitioned;
+	int64		part_size = 0;
+	int			i;
+
+	/* Works only for pgbench_accounts and the range partitioning option */
+	is_partitioned = strcmp(table, "pgbench_accounts") == 0 &&
+		partition_method == PART_RANGE;
+
+	/*
+	 * For partitioned tables, we handle only num_workers <= partitions for
+	 * now.  Check before opening any extra connections.
+	 */
+	if (is_partitioned && num_workers > partitions)
+		pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)",
+				 num_workers, partitions);
+
+	if (is_partitioned)
+		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+
+	/* THREAD_T, not pthread_t: it is a HANDLE on WIN32 */
+	worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+	worker_data = pg_malloc0(num_workers * sizeof(WorkerTaskDescription));
+	connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+	/* Reuse main connection for worker 0, create new ones for others */
+	connections[0] = connection;
+	for (i = 1; i < num_workers; i++)
+	{
+		connections[i] = doConnect();
+		if (connections[i] == NULL)
+			pg_fatal("could not create connection for worker %d", i);
+	}
+
+	/*
+	 * Empty the table in a transaction that is committed before any worker
+	 * starts.  TRUNCATE holds an ACCESS EXCLUSIVE lock until its transaction
+	 * ends; keeping it open in worker 0 would block every other worker's COPY
+	 * until worker 0 finished, serializing the load.
+	 */
+	executeStatement(connections[0], "begin");
+	truncateTable(connections[0], table);
+	executeStatement(connections[0], "commit");
+
+	/* Create and start worker threads */
+	for (i = 0; i < num_workers; i++)
+	{
+		WorkerTaskDescription *wtd = &worker_data[i];
+
+		wtd->con = connections[i];
+		wtd->table = table;
+		wtd->append_row = append_row;
+		wtd->worker_id = i;
+
+		if (is_partitioned)
+		{
+			wtd->part_size = part_size;
+			assignWorkerPartitions(wtd, num_workers, total_rows, partitions);
+		}
+		else
+			assignWorkerRows(wtd, num_workers, total_rows);
+
+		errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, wtd);
+		if (errno != 0)
+			pg_fatal("could not create thread: %m");
+	}
+
+	for (i = 0; i < num_workers; i++)
+	{
+		errno = THREAD_JOIN(worker_threads[i]);
+		if (errno != 0)
+			pg_fatal("could not join thread: %m");
+	}
+
+	/* For partitioned tables, attach all partitions now that data is loaded */
+	if (is_partitioned)
+		attachPartitions(connection);
+
+	/* Clean up worker connections (skip index 0, which is the main connection) */
+	for (i = 1; i < num_workers; i++)
+		PQfinish(connections[i]);
+
+	pg_free(connections);
+	pg_free(worker_threads);
+	pg_free(worker_data);
+}
+
/* qsort comparator for Variable array */
static int
compareVariableNames(const void *v1, const void *v2)
@@ -4869,14 +5156,58 @@ initDropTables(PGconn *con)
* with a known size, so we choose to partition it.
*/
static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
{
PQExpBufferData query;
/* we must have to create some partitions */
Assert(partitions > 0);
- fprintf(stderr, "creating %d partitions...\n", partitions);
+	/*
+	 * Create partitions part_start .. part_end (1-based, inclusive) of
+	 * pgbench_accounts as standalone tables; attachPartitions() attaches
+	 * them to the parent later.  part_start == -1 means "all partitions" and
+	 * also reports progress on stderr.
+	 */
+	if (part_start == -1)
+	{
+		part_start = 1;
+		part_end = partitions;
+		fprintf(stderr, "creating %d partitions...\n", partitions);
+	}
+
+	initPQExpBuffer(&query);
+
+	for (int p = part_start; p <= part_end; p++)
+	{
+		/*
+		 * Create standalone tables (not attached to the parent yet).  This
+		 * avoids AccessExclusiveLock on the parent table, allowing parallel
+		 * creation.  Tables will be attached after data loading.
+		 *
+		 * The columns are copied from the parent with LIKE rather than
+		 * spelled out: ATTACH PARTITION requires identical column types, and
+		 * the parent's types depend on the scale (initCreateTables' ddlinfo
+		 * makes aid bigint at large scales), so a hard-coded "aid int" would
+		 * make the later attach fail.  LIKE also copies the NOT NULL
+		 * constraint on aid.
+		 */
+		printfPQExpBuffer(&query,
+						  "create%s table pgbench_accounts_%d\n"
+						  "  (like pgbench_accounts)\n"
+						  "  with (fillfactor=%d)",
+						  unlogged_tables ? " unlogged" : "", p,
+						  fillfactor);
+
+		executeStatement(con, query.data);
+	}
+
+	termPQExpBuffer(&query);
+}
+
+/*
+ * Attach standalone partition tables to the parent table.
+ * Called after all data has been loaded in parallel.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+ PQExpBufferData query;
+
+ Assert(partitions > 0);
+
+ fprintf(stderr, "attaching %d partitions...\n", partitions);
initPQExpBuffer(&query);
@@ -4884,13 +5215,12 @@ createPartitions(PGconn *con)
{
if (partition_method == PART_RANGE)
{
- int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+ int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
printfPQExpBuffer(&query,
- "create%s table pgbench_accounts_%d\n"
- " partition of pgbench_accounts\n"
- " for values from (",
- unlogged_tables ? " unlogged" : "", p);
+ "alter table pgbench_accounts\n"
+ " attach partition pgbench_accounts_%d\n"
+ " for values from (", p);
/*
* For RANGE, we use open-ended partitions at the beginning and
@@ -4913,21 +5243,16 @@ createPartitions(PGconn *con)
appendPQExpBufferChar(&query, ')');
}
else if (partition_method == PART_HASH)
+ {
printfPQExpBuffer(&query,
- "create%s table pgbench_accounts_%d\n"
- " partition of pgbench_accounts\n"
+ "alter table pgbench_accounts\n"
+ " attach partition pgbench_accounts_%d\n"
" for values with (modulus %d, remainder %d)",
- unlogged_tables ? " unlogged" : "", p,
- partitions, p - 1);
+ p, partitions, p - 1);
+ }
else /* cannot get there */
Assert(0);
- /*
- * Per ddlinfo in initCreateTables, fillfactor is needed on table
- * pgbench_accounts.
- */
- appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
executeStatement(con, query.data);
}
@@ -5025,8 +5350,8 @@ initCreateTables(PGconn *con)
termPQExpBuffer(&query);
- if (partition_method != PART_NONE)
- createPartitions(con);
+ if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+ createPartitions(con, -1, -1);
}
/*
@@ -5035,11 +5360,7 @@ initCreateTables(PGconn *con)
static void
initTruncateTables(PGconn *con)
{
- executeStatement(con, "truncate table "
- "pgbench_accounts, "
- "pgbench_branches, "
- "pgbench_history, "
- "pgbench_tellers");
+	/* empty all four pgbench tables with a single TRUNCATE statement */
+	truncateTable(con,
+				  "pgbench_accounts, "
+				  "pgbench_branches, "
+				  "pgbench_history, "
+				  "pgbench_tellers");
}
static void
@@ -5069,8 +5390,40 @@ initAccount(PQExpBufferData *sql, int64 curr)
curr + 1, curr / naccounts + 1);
}
+/*
+ * Row builders for the batched loader.  Unlike the init* callbacks, which
+ * overwrite the buffer with printfPQExpBuffer, these append to it, so that
+ * many rows can be collected in one buffer before it is sent.
+ */
+static void
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+	int64		bid = curr + 1;
+
+	/* one row: curr + 1, a zero, and NULL for the "filler" column */
+	appendPQExpBuffer(sql, INT64_FORMAT "\t0\t\\N\n", bid);
+}
+
+static void
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+	int64		tid = curr + 1;
+	int64		bid = curr / ntellers + 1;
+
+	/* one row: curr + 1, curr / ntellers + 1, a zero, and NULL "filler" */
+	appendPQExpBuffer(sql, INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+					  tid, bid);
+}
+
static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+ /*
+ * Append (rather than overwrite) one COPY row for row number curr:
+ * curr + 1, curr / naccounts + 1, 0, and an empty last field.
+ */
+ /* "filler" column defaults to blank padded empty string */
+ appendPQExpBuffer(sql,
+ INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+ curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
initRowMethod init_row)
{
int n;
@@ -5079,7 +5432,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
int prev_chars = 0;
PGresult *res;
PQExpBufferData sql;
- char copy_statement[256];
+ char copy_statement[NAMEDATALEN + 32];
const char *copy_statement_fmt = "copy %s from stdin";
int64 total = base * scale;
@@ -5188,6 +5541,27 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
termPQExpBuffer(&sql);
}
+/*
+ * Populate "table" with base * scale rows.
+ *
+ * init_row formats a single row (used by the serial loader); append_row
+ * appends one row to a batch buffer (used by the parallel loader).  The
+ * parallel path is taken only when use_parallel is set and nthreads > 1.
+ */
+static void
+initPopulateTable(PGconn *con, const char *table, int64 base,
+				  initRowMethod init_row, initRowMethod append_row, bool use_parallel)
+{
+	if (use_parallel && nthreads > 1)
+	{
+		/* the parallel loader takes the total row count */
+		initPopulateTableParallel(con, nthreads, table, base * scale, append_row);
+		return;
+	}
+
+	/*
+	 * For single-threaded mode with partitioned tables, attach partitions
+	 * before loading data so COPY to the parent table can route rows.
+	 *
+	 * NOTE(review): this assumes the partitions were created detached by
+	 * initCreateTables earlier in this same run; regenerating data into an
+	 * already-initialized database would try to attach them a second time.
+	 */
+	if (strcmp(table, "pgbench_accounts") == 0 &&
+		partitions > 0 && partition_method != PART_NONE)
+		attachPartitions(con);
+
+	/* the serial loader multiplies base by scale itself */
+	initPopulateTableSerial(con, table, base, init_row);
+}
+
/*
* Fill the standard tables with some data generated and sent from the client.
*
@@ -5200,8 +5574,9 @@ initGenerateDataClientSide(PGconn *con)
fprintf(stderr, "generating data (client-side)...\n");
/*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
+ * For single-threaded mode, do everything in one transaction.
+ * For multi-threaded mode, do branches/tellers/history in one transaction,
+ * then accounts in parallel (each thread handles its own transaction).
*/
executeStatement(con, "begin");
@@ -5212,11 +5587,16 @@ initGenerateDataClientSide(PGconn *con)
* fill branches, tellers, accounts in that order in case foreign keys
* already exist
*/
- initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
- initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
- initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+ initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false);
+ initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false);
- executeStatement(con, "commit");
+ if (nthreads > 1)
+ executeStatement(con, "commit");
+
+ initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1);
+
+ if (nthreads == 1)
+ executeStatement(con, "commit");
}
/*
@@ -5242,6 +5622,9 @@ initGenerateDataServerSide(PGconn *con)
/* truncate away any old data */
initTruncateTables(con);
+ if (partitions > 0 && partition_method != PART_NONE)
+ attachPartitions(con);
+
initPQExpBuffer(&sql);
printfPQExpBuffer(&sql,
@@ -6989,7 +7372,6 @@ main(int argc, char **argv)
initialization_option_set = true;
break;
case 'j': /* jobs */
- benchmarking_option_set = true;
if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
&nthreads))
{
@@ -7221,7 +7603,7 @@ main(int argc, char **argv)
* optimization; throttle_delay is calculated incorrectly below if some
* threads have no clients assigned to them.)
*/
- if (nthreads > nclients)
+ if (nthreads > nclients && !is_init_mode)
nthreads = nclients;
/*
@@ -7266,6 +7648,9 @@ main(int argc, char **argv)
if (partitions > 0 && partition_method == PART_NONE)
partition_method = PART_RANGE;
+ if (partition_method == PART_HASH && nthreads > 1)
+ pg_fatal("parallel data loading (-j) is not supported with hash partitioning");
+
if (initialize_steps == NULL)
initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
--
2.39.5 (Apple Git-154)