Thread

  1. [PATCH] Add --jobs-per-disk option to allow multiple processes per tablespace

    Jaime Casanova <jcasanov@systemguards.com.ec> — 2021-12-15T17:14:44Z

    This option is independent of the --jobs one. It will fork new processes
    to copy the different segments of a relfilenode in parallel.
    ---
     src/bin/pg_upgrade/option.c      |  8 ++-
     src/bin/pg_upgrade/parallel.c    | 93 ++++++++++++++++++++++++++++++++
     src/bin/pg_upgrade/pg_upgrade.h  |  4 ++
     src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++---------
     4 files changed, 139 insertions(+), 25 deletions(-)
    
    diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
    index 66fe16964e..46b1913a42 100644
    --- a/src/bin/pg_upgrade/option.c
    +++ b/src/bin/pg_upgrade/option.c
    @@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[])
     		{"link", no_argument, NULL, 'k'},
     		{"retain", no_argument, NULL, 'r'},
     		{"jobs", required_argument, NULL, 'j'},
    +		{"jobs-per-disks", required_argument, NULL, 'J'},
     		{"socketdir", required_argument, NULL, 's'},
     		{"verbose", no_argument, NULL, 'v'},
     		{"clone", no_argument, NULL, 1},
    @@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[])
     	if (os_user_effective_id == 0)
     		pg_fatal("%s: cannot be run as root\n", os_info.progname);
     
    -	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v",
    +	while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v",
     								 long_options, &optindex)) != -1)
     	{
     		switch (option)
    @@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[])
     				user_opts.jobs = atoi(optarg);
     				break;
     
    +			case 'J':
    +				user_opts.jobs_per_disk = atoi(optarg);
    +				break;
    +
     			case 'k':
     				user_opts.transfer_mode = TRANSFER_MODE_LINK;
     				break;
    @@ -291,6 +296,7 @@ usage(void)
     	printf(_("  -d, --old-datadir=DATADIR     old cluster data directory\n"));
     	printf(_("  -D, --new-datadir=DATADIR     new cluster data directory\n"));
     	printf(_("  -j, --jobs=NUM                number of simultaneous processes or threads to use\n"));
    +	printf(_("  -J, --jobs_per_disk=NUM       number of simultaneous processes or threads to use per tablespace\n"));
     	printf(_("  -k, --link                    link instead of copying files to new cluster\n"));
     	printf(_("  -N, --no-sync                 do not wait for changes to be written safely to disk\n"));
     	printf(_("  -o, --old-options=OPTIONS     old cluster options to pass to the server\n"));
    diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c
    index ee7364da3b..82f698a9ab 100644
    --- a/src/bin/pg_upgrade/parallel.c
    +++ b/src/bin/pg_upgrade/parallel.c
    @@ -17,6 +17,9 @@
     #include "pg_upgrade.h"
     
     static int	parallel_jobs;
    +static int	current_jobs = 0;
    +
    +static bool      reap_subchild(bool wait_for_child);
     
     #ifdef WIN32
     /*
    @@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args)
     #endif
     
     
    +
    +/*
    + * parallel_process_relfile_segment()
    + *
    + * Transfer one relation segment.  Calls process_relfile_segment() directly
    + * when user_opts.jobs or user_opts.jobs_per_disk is <= 1; otherwise forks a
    + * child to do it, first reaping so that current_jobs stays <= jobs_per_disk.
    + */
    +void
    +parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
    +{
    +#ifndef WIN32
    +	pid_t		child;
    +#else
    +	HANDLE		child;
    +	transfer_thread_arg *new_arg;
    +#endif
    +	if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1)
    +		process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
    +	else
    +	{
    +		/* parallel; NOTE(review): no worker is started under WIN32 below */
    +
    +		/* harvest any dead children */
    +		while (reap_subchild(false) == true)
    +			;
    +
    +		/* at the jobs_per_disk limit? then block until one child exits */
    +		if (current_jobs >= user_opts.jobs_per_disk)
    +			reap_subchild(true);
    +
    +		/* set this before we start the job */
    +		current_jobs++;
    +
    +		/* Ensure stdio state is quiesced before forking */
    +		fflush(NULL);
    +
    +#ifndef WIN32
    +		child = fork();
    +		if (child == 0)
    +		{
    +			process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
    +			/* use _exit to skip atexit() functions */
    +			_exit(0);
    +		}
    +		else if (child < 0)
    +			/* fork failed */
    +			pg_fatal("could not create worker process: %s\n", strerror(errno));
    +#endif
    +	}
    +}
    +
    +
    +
     /*
      *	collect status from a completed worker child
      */
    @@ -345,3 +402,39 @@ reap_child(bool wait_for_child)
     
     	return true;
     }
    +
    +
    +
    +
    +/*
    + *	collect one finished subchild; returns true iff current_jobs was decremented
    + */
    +static bool
    +reap_subchild(bool wait_for_child)
    +{
    +#ifndef WIN32
    +	int			work_status;
    +	pid_t		child;
    +#else
    +	int			thread_num;
    +	DWORD		res;
    +#endif
    +
    +	if (user_opts.jobs <= 1 || current_jobs == 0)
    +		return false;
    +
    +#ifndef WIN32
    +	child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
    +	if (child == (pid_t) -1)
    +		pg_fatal("waitpid() failed: %s\n", strerror(errno));
    +	if (child == 0)
    +		return false;			/* WNOHANG and no child has exited yet */
    +	if (work_status != 0)
    +		pg_fatal("child process exited abnormally: status %d\n", work_status);
    +#endif
    +
    +	/* do this after job has been removed */
    +	current_jobs--;
    +
    +	return true;
    +}
    diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
    index 22169f1002..adcb24ffea 100644
    --- a/src/bin/pg_upgrade/pg_upgrade.h
    +++ b/src/bin/pg_upgrade/pg_upgrade.h
    @@ -282,6 +282,7 @@ typedef struct
     	bool		do_sync;		/* flush changes to disk */
     	transferMode transfer_mode; /* copy files or link them? */
     	int			jobs;			/* number of processes/threads to use */
    +	int			jobs_per_disk;			/* number of processes/threads to use */
     	char	   *socketdir;		/* directory to use for Unix sockets */
     } UserOpts;
     
    @@ -450,4 +451,7 @@ void		parallel_exec_prog(const char *log_file, const char *opt_log_file,
     void		parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
     										  char *old_pgdata, char *new_pgdata,
     										  char *old_tablespace);
    +
    +void 		process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
    +void 		parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
     bool		reap_child(bool wait_for_child);
    diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c
    index 5dbefbceaf..8a7c49efaa 100644
    --- a/src/bin/pg_upgrade/relfilenode.c
    +++ b/src/bin/pg_upgrade/relfilenode.c
    @@ -17,6 +17,7 @@
     
     static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace);
     static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit);
    +void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file);
     
     
     /*
    @@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro
     		/* Copying files might take some time, so give feedback. */
     		pg_log(PG_STATUS, "%s", old_file);
     
    -		if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
    +		parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file);
    +	}
    +}
    +
    +
    +
    +void
    +process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file)
    +{
    +
    +	if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0)
    +	{
    +		/* Need to rewrite visibility map format */
    +		pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
    +			   old_file, new_file);
    +		rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
    +	}
    +	else
    +		switch (user_opts.transfer_mode)
     		{
    -			/* Need to rewrite visibility map format */
    -			pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n",
    -				   old_file, new_file);
    -			rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname);
    +			case TRANSFER_MODE_CLONE:
    +				pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
    +					   old_file, new_file);
    +				cloneFile(old_file, new_file, map->nspname, map->relname);
    +				break;
    +			case TRANSFER_MODE_COPY:
    +				pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
    +					   old_file, new_file);
    +				copyFile(old_file, new_file, map->nspname, map->relname);
    +				break;
    +			case TRANSFER_MODE_LINK:
    +				pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
    +					   old_file, new_file);
    +				linkFile(old_file, new_file, map->nspname, map->relname);
    +				break;
     		}
    -		else
    -			switch (user_opts.transfer_mode)
    -			{
    -				case TRANSFER_MODE_CLONE:
    -					pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n",
    -						   old_file, new_file);
    -					cloneFile(old_file, new_file, map->nspname, map->relname);
    -					break;
    -				case TRANSFER_MODE_COPY:
    -					pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n",
    -						   old_file, new_file);
    -					copyFile(old_file, new_file, map->nspname, map->relname);
    -					break;
    -				case TRANSFER_MODE_LINK:
    -					pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n",
    -						   old_file, new_file);
    -					linkFile(old_file, new_file, map->nspname, map->relname);
    -			}
    -	}
     }
    -- 
    2.20.1
    
    
    --/LRcthy5hmXY96z2--