[PATCH] Add --jobs-per-disk option to allow multiple processes per tablespace
Jaime Casanova <jcasanov@systemguards.com.ec>
From: Jaime Casanova <jcasanov@systemguards.com.ec>
To:
Date: 2021-12-15T17:14:44Z
Lists: pgsql-hackers
This option is independent of the --jobs one. It forks new processes
to copy the different segments of a relfilenode in parallel.
---
src/bin/pg_upgrade/option.c | 8 ++-
src/bin/pg_upgrade/parallel.c | 93 ++++++++++++++++++++++++++++++++
src/bin/pg_upgrade/pg_upgrade.h | 4 ++
src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++---------
4 files changed, 139 insertions(+), 25 deletions(-)
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 66fe16964e..46b1913a42 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[])
{"link", no_argument, NULL, 'k'},
{"retain", no_argument, NULL, 'r'},
{"jobs", required_argument, NULL, 'j'},
+ {"jobs-per-disk", required_argument, NULL, 'J'},
{"socketdir", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{"clone", no_argument, NULL, 1},
@@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[])
if (os_user_effective_id == 0)
pg_fatal("%s: cannot be run as root\n", os_info.progname);
- while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v",
+ while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v",
long_options, &optindex)) != -1)
{
switch (option)
@@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[])
user_opts.jobs = atoi(optarg);
break;
+ case 'J':
+ user_opts.jobs_per_disk = atoi(optarg);
+ break;
+
case 'k':
user_opts.transfer_mode = TRANSFER_MODE_LINK;
break;
@@ -291,6 +296,7 @@ usage(void)
printf(_(" -d, --old-datadir=DATADIR old cluster data directory\n"));
printf(_(" -D, --new-datadir=DATADIR new cluster data directory\n"));
printf(_(" -j, --jobs=NUM number of simultaneous processes or threads to use\n"));
+ printf(_(" -J, --jobs-per-disk=NUM number of simultaneous processes or threads to use per tablespace\n"));
printf(_(" -k, --link link instead of copying files to new cluster\n"));
printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n"));
printf(_(" -o, --old-options=OPTIONS old cluster options to pass to the server\n"));
diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c
index ee7364da3b..82f698a9ab 100644
--- a/src/bin/pg_upgrade/parallel.c
+++ b/src/bin/pg_upgrade/parallel.c
@@ -17,6 +17,9 @@
#include "pg_upgrade.h"
static int parallel_jobs;
+static int current_jobs = 0;
+
+static bool reap_subchild(bool wait_for_child);
#ifdef WIN32
/*
@@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args)
#endif
+
+/*
+ * parallel_process_relfile_segment()
+ *
+ * Transfer one relation segment file through process_relfile_segment().
+ * When both --jobs and --jobs-per-disk are above 1 the transfer runs in a
+ * forked worker, with at most user_opts.jobs_per_disk workers alive at a
+ * time; otherwise, and always on Windows, it runs in this process.
+ * NOTE(review): nothing here waits for the last workers; check the caller.
+ */
+void
+parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+#ifndef WIN32
+    pid_t       child;
+#endif
+
+    if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1)
+        process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+    else
+    {
+        /* harvest any dead children */
+        while (reap_subchild(false) == true)
+            ;
+
+        /* at the per-tablespace limit?  then wait for one worker to exit */
+        if (current_jobs >= user_opts.jobs_per_disk)
+            reap_subchild(true);
+
+#ifndef WIN32
+        /* set this before we start the job */
+        current_jobs++;
+
+        /* Ensure stdio state is quiesced before forking */
+        fflush(NULL);
+
+        child = fork();
+        if (child == 0)
+        {
+            process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+            /* use _exit to skip atexit() functions */
+            _exit(0);
+        }
+        else if (child < 0)
+            pg_fatal("could not create worker process: %s\n", strerror(errno));
+#else
+        /* no worker threads exist for Windows yet; never skip the file */
+        process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+#endif
+    }
+}
+
+
+
/*
* collect status from a completed worker child
*/
@@ -345,3 +402,39 @@ reap_child(bool wait_for_child)
return true;
}
+
+
+
+
+/*
+ * reap_subchild()
+ *
+ * Collect the status of one finished segment worker: block for it when
+ * wait_for_child is true, otherwise just poll.  Returns true if a worker
+ * was reaped, false if none is running or none has exited yet.
+ */
+static bool
+reap_subchild(bool wait_for_child)
+{
+#ifndef WIN32
+    int         work_status;
+    pid_t       child;
+#endif
+
+    if (user_opts.jobs <= 1 || current_jobs == 0)
+        return false;
+
+#ifndef WIN32
+    child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
+    if (child == (pid_t) -1)
+        pg_fatal("waitpid() failed: %s\n", strerror(errno));
+    if (child == 0)
+        return false;           /* no children, or no dead children */
+    if (work_status != 0)
+        pg_fatal("child process exited abnormally: status %d\n", work_status);
+#endif
+
+    /* do this after job has been removed (on Windows nothing is waited for) */
+    current_jobs--;
+    return true;
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 22169f1002..adcb24ffea 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -282,6 +282,7 @@ typedef struct
bool do_sync; /* flush changes to disk */
transferMode transfer_mode; /* copy files or link them? */
int jobs; /* number of processes/threads to use */
+ int jobs_per_disk; /* number of processes/threads to use per tablespace */
char *socketdir; /* directory to use for Unix sockets */
} UserOpts;
@@ -450,4 +451,7 @@ void parallel_exec_prog(const char *log_file, const char *opt_log_file,
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
char *old_pgdata, char *new_pgdata,
char *old_tablespace);
+
+void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
+void parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
bool reap_child(bool wait_for_child);
diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c
index 5dbefbceaf..8a7c49efaa 100644
--- a/src/bin/pg_upgrade/relfilenode.c
+++ b/src/bin/pg_upgrade/relfilenode.c
@@ -17,6 +17,7 @@
static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace);
static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit);
+void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
/*
@@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro
/* Copying files might take some time, so give feedback. */
pg_log(PG_STATUS, "%s", old_file);
- if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+ parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
+ }
+}
+
+/* Transfer one segment file by clone, copy or link per user_opts.transfer_mode; */
+/* a "_vm" fork is rewritten instead when vm_must_add_frozenbit is true. */
+void
+process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
+{
+
+ if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
+ {
+ /* Need to rewrite visibility map format */
+ pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
+ old_file, new_file);
+ rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+ }
+ else
+ switch (user_opts.transfer_mode)
{
- /* Need to rewrite visibility map format */
- pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
- old_file, new_file);
- rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
+ case TRANSFER_MODE_CLONE:
+ pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
+ old_file, new_file);
+ cloneFile(old_file, new_file, map->nspname, map->relname);
+ break;
+ case TRANSFER_MODE_COPY:
+ pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
+ old_file, new_file);
+ copyFile(old_file, new_file, map->nspname, map->relname);
+ break;
+ case TRANSFER_MODE_LINK:
+ pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
+ old_file, new_file);
+ linkFile(old_file, new_file, map->nspname, map->relname);
+ break;
}
- else
- switch (user_opts.transfer_mode)
- {
- case TRANSFER_MODE_CLONE:
- pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
- old_file, new_file);
- cloneFile(old_file, new_file, map->nspname, map->relname);
- break;
- case TRANSFER_MODE_COPY:
- pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
- old_file, new_file);
- copyFile(old_file, new_file, map->nspname, map->relname);
- break;
- case TRANSFER_MODE_LINK:
- pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
- old_file, new_file);
- linkFile(old_file, new_file, map->nspname, map->relname);
- }
- }
}
--
2.20.1
--/LRcthy5hmXY96z2--