0004-Parallel-Backup-pg_basebackup_v9.patch

application/octet-stream

Filename: 0004-Parallel-Backup-pg_basebackup_v9.patch
Type: application/octet-stream
Part: 1
Message: Re: WIP/PoC for parallel backup

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v9-0004
Subject: Parallel Backup - pg_basebackup
File+
src/bin/pg_basebackup/pg_basebackup.c 1015 65
From 945cd4b33f3b98bddf849fcca3c2a091248f0142 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 27 Jan 2020 18:56:21 +0500
Subject: [PATCH 4/6] Parallel Backup - pg_basebackup

Implements the replication commands added in the backend replication
system and adds support for --jobs=NUM in pg_basebackup to take a full
backup in parallel using multiple connections. The utility will collect
a list of files from the server first and then workers will copy files
(one by one) over the COPY protocol. The WAL files are also copied in a
similar manner.
---
 src/bin/pg_basebackup/pg_basebackup.c | 1080 +++++++++++++++++++++++--
 1 file changed, 1015 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 48bd838803b..7e392889809 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -13,6 +13,7 @@
 
 #include "postgres_fe.h"
 
+#include <pthread.h>
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
@@ -85,12 +86,65 @@ typedef struct UnpackTarState
 	const char *mapped_tblspc_path;
 	pgoff_t		current_len_left;
 	int			current_padding;
+	size_t		current_bytes_read;
 	FILE	   *file;
 } UnpackTarState;
 
 typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
 								   void *callback_data);
 
+typedef struct BackupFile
+{
+	char		path[MAXPGPATH];	/* file path as listed by the server */
+	char		type;			/* entry type from LIST_FILES ('d' directory, 'l' symlink) */
+	int32		size;			/* size column of the LIST_FILES result */
+	time_t		mtime;			/* mtime column of the LIST_FILES result */
+
+	int			tsindex;		/* row of tablespacehdr this file belongs to */
+	struct BackupFile *next;	/* next entry of the singly linked file list */
+} BackupFile;
+
+typedef enum BackupState
+{
+	PB_FETCH_REL_LIST,			/* fetch the list of data files */
+	PB_FETCH_REL_FILES,			/* workers are copying the data files */
+	PB_FETCH_WAL_LIST,			/* fetch the list of WAL files */
+	PB_FETCH_WAL_FILES,			/* workers are copying the WAL files */
+	PB_STOP_BACKUP,				/* fetch pg_control and run STOP_BACKUP */
+	PB_BACKUP_COMPLETE			/* stop the workers and finish */
+} BackupState;
+
+typedef struct BackupInfo
+{
+	int			totalfiles;		/* files queued and not yet taken by a worker */
+	uint64		bytes_skipped;	/* 512 bytes per skipped directory/symlink entry, for progress */
+	char		xlogstart[64];	/* WAL start location of the backup */
+	char		xlogend[64];	/* WAL end location, filled in by stop_backup() */
+	BackupFile *files;			/* head of the linked list of files */
+	BackupFile *curr;			/* next file to hand to a worker; also the tail while the list is built */
+	BackupState backupstate;	/* current step of parallel_backup_run() */
+	bool		workersdone;	/* set by cleanup_workers() to make the workers exit */
+	int			activeworkers;	/* workers not currently waiting on data_ready */
+} BackupInfo;
+
+typedef struct WorkerState
+{
+	pthread_t	worker;			/* thread handle */
+	int			workerid;		/* index into the workers array */
+	BackupInfo *backupinfo;		/* backup state shared by all workers */
+	PGconn	   *conn;			/* this worker's own server connection */
+	uint64		bytesread;		/* sum of receive_file() results, for progress */
+} WorkerState;
+
+BackupInfo *backupinfo = NULL;
+WorkerState *workers = NULL;
+
+/* lock to be used for fetching file from the files list. */
+static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* condition to be used when the files list is filled. */
+static pthread_cond_t data_ready = PTHREAD_COND_INITIALIZER;
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -144,6 +198,9 @@ static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
 
+static int	numWorkers = 1;
+static PGresult *tablespacehdr;
+
 /* Progress counters */
 static uint64 totalsize_kb;
 static uint64 totaldone;
@@ -174,10 +231,12 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
 static void progress_report(int tablespacenum, const char *filename, bool force);
+static void workers_progress_report(uint64 totalBytesRead,
+									const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static int	ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
 										 void *callback_data);
 static void BaseBackup(void);
@@ -188,6 +247,22 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 static const char *get_tablespace_mapping(const char *dir);
 static void tablespace_list_append(const char *arg);
 
+static void *worker_run(void *arg);
+static void create_parallel_workers(BackupInfo *backupInfo);
+static void parallel_backup_run(BackupInfo *backupInfo);
+static void cleanup_workers(void);
+static void stop_backup(void);
+static void get_backup_filelist(PGconn *conn, BackupInfo *backupInfo);
+static void get_wal_filelist(PGconn *conn, BackupInfo *backupInfo,
+							 char *xlogstart, char *xlogend);
+static void free_filelist(BackupInfo *backupInfo);
+static int	worker_get_files(WorkerState *wstate);
+static int	receive_file(PGconn *conn, char *file, int tsIndex);
+static void create_backup_dirs(bool basetablespace, char *tablespace,
+							   char *name);
+static void create_tblspc_symlink(char *filename);
+static void writefile(char *path, char *buf);
+static int	fetch_max_wal_senders(PGconn *conn);
 
 static void
 cleanup_directories_atexit(void)
@@ -239,6 +314,8 @@ cleanup_directories_atexit(void)
 static void
 disconnect_atexit(void)
 {
+	cleanup_workers();
+
 	if (conn != NULL)
 		PQfinish(conn);
 }
@@ -386,6 +463,7 @@ usage(void)
 	printf(_("      --no-slot          prevent creation of temporary replication slot\n"));
 	printf(_("      --no-verify-checksums\n"
 			 "                         do not verify checksums\n"));
+	printf(_("  -j, --jobs=NUM         use this many parallel jobs to backup\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -733,6 +811,94 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
 	}
 }
 
+/*
+ * Print a progress report for a parallel backup. totalBytesRead is in bytes
+ * and is compared against totalsize_kb. With verbose output, filename (if
+ * not NULL) is printed as the current file.
+ *
+ * At most one report is written per second, unless force is true.
+ */
+static void
+workers_progress_report(uint64 totalBytesRead, const char *filename, bool force)
+{
+	int			percent;
+	char		totalBytesRead_str[32];
+	char		totalsize_str[32];
+	pg_time_t	now;
+
+	if (!showprogress)
+		return;
+
+	now = time(NULL);
+	if (now == last_progress_report && !force)
+		return;					/* Max once per second */
+
+	last_progress_report = now;
+	percent = totalsize_kb ? (int) ((totalBytesRead / 1024) * 100 / totalsize_kb) : 0;
+
+	/*
+	 * Avoid overflowing past 100% or the full size. This may make the total
+	 * size number change as we approach the end of the backup (the estimate
+	 * will always be wrong if WAL is included), but that's better than having
+	 * the done column be bigger than the total.
+	 */
+	if (percent > 100)
+		percent = 100;
+	if (totalBytesRead / 1024 > totalsize_kb)
+		totalsize_kb = totalBytesRead / 1024;
+
+	/*
+	 * Separate step to keep platform-dependent format code out of
+	 * translatable strings.  And we only test for INT64_FORMAT availability
+	 * in snprintf, not fprintf.
+	 */
+	snprintf(totalBytesRead_str, sizeof(totalBytesRead_str), INT64_FORMAT,
+			 totalBytesRead / 1024);
+	snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize_kb);
+
+#define VERBOSE_FILENAME_LENGTH 35
+
+	if (verbose)
+	{
+		if (!filename)
+
+			/*
+			 * No filename given, so clear the status line (used for last
+			 * call)
+			 */
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"),
+					(int) strlen(totalsize_str),
+					totalBytesRead_str, totalsize_str,
+					percent,
+					VERBOSE_FILENAME_LENGTH + 5, "");
+		else
+		{
+			bool		truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH);
+
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"),
+					(int) strlen(totalsize_str), totalBytesRead_str, totalsize_str,
+					percent,
+			/* Prefix with "..." if we do leading truncation */
+					truncate ? "..." : "",
+					truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+					truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+			/* Truncate filename at beginning if it's too long */
+					truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename);
+		}
+	}
+	else
+	{
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied"),
+				(int) strlen(totalsize_str),
+				totalBytesRead_str, totalsize_str,
+				percent);
+	}
+
+	if (isatty(fileno(stderr)))
+		fprintf(stderr, "\r");
+	else
+		fprintf(stderr, "\n");
+}
 
 /*
  * Print a progress report based on the global variables. If verbose output
@@ -749,7 +915,7 @@ progress_report(int tablespacenum, const char *filename, bool force)
 	char		totalsize_str[32];
 	pg_time_t	now;
 
-	if (!showprogress)
+	if (!showprogress || numWorkers > 1)
 		return;
 
 	now = time(NULL);
@@ -1439,7 +1605,7 @@ get_tablespace_mapping(const char *dir)
  * specified directory. If it's for another tablespace, it will be restored
  * in the original or mapped directory.
  */
-static void
+static int
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
 	UnpackTarState state;
@@ -1470,13 +1636,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 		exit(1);
 	}
 
-	if (basetablespace && writerecoveryconf)
-		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
 	/*
 	 * No data is synced here, everything is done for all tablespaces at the
 	 * end.
 	 */
+
+	return state.current_bytes_read;
 }
 
 static void
@@ -1499,6 +1664,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			exit(1);
 		}
 		totaldone += 512;
+		state->current_bytes_read += 512;
 
 		state->current_len_left = read_tar_number(&copybuf[124], 12);
 
@@ -1630,6 +1796,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			fclose(state->file);
 			state->file = NULL;
 			totaldone += r;
+			state->current_bytes_read += r;
 			return;
 		}
 
@@ -1639,6 +1806,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			exit(1);
 		}
 		totaldone += r;
+		state->current_bytes_read += r;
 		progress_report(state->tablespacenum, state->filename, false);
 
 		state->current_len_left -= r;
@@ -1706,6 +1874,24 @@ BaseBackup(void)
 		exit(1);
 	}
 
+	if (numWorkers > 1)
+	{
+		int		max_wal_senders = fetch_max_wal_senders(conn);
+
+		/*
+		 * In parallel backup mode, pg_basebackup opens numWorkers + 2
+		 * connections. One of the two additional connections is used by the
+		 * main application while the other one is used if WAL streaming is
+		 * enabled (-X Stream).
+		 */
+		if (numWorkers + 2 > max_wal_senders)
+		{
+			pg_log_error("number of requested workers exceeds max_wal_senders (currently %d)",
+						 max_wal_senders);
+			exit(1);
+		}
+	}
+
 	/*
 	 * Build contents of configuration file if requested
 	 */
@@ -1738,16 +1924,26 @@ BaseBackup(void)
 			fprintf(stderr, "\n");
 	}
 
-	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
-				 escaped_label,
-				 showprogress ? "PROGRESS" : "",
-				 includewal == FETCH_WAL ? "WAL" : "",
-				 fastcheckpoint ? "FAST" : "",
-				 includewal == NO_WAL ? "" : "NOWAIT",
-				 maxrate_clause ? maxrate_clause : "",
-				 format == 't' ? "TABLESPACE_MAP" : "",
-				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+	if (numWorkers <= 1)
+	{
+		basebkp =
+			psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+					 escaped_label,
+					 showprogress ? "PROGRESS" : "",
+					 includewal == FETCH_WAL ? "WAL" : "",
+					 fastcheckpoint ? "FAST" : "",
+					 includewal == NO_WAL ? "" : "NOWAIT",
+					 maxrate_clause ? maxrate_clause : "",
+					 format == 't' ? "TABLESPACE_MAP" : "",
+					 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+	}
+	else
+	{
+		basebkp =
+			psprintf("START_BACKUP LABEL '%s' %s",
+					 escaped_label,
+					 fastcheckpoint ? "FAST" : "");
+	}
 
 	if (PQsendQuery(conn, basebkp) == 0)
 	{
@@ -1794,10 +1990,36 @@ BaseBackup(void)
 		pg_log_info("write-ahead log start point: %s on timeline %u",
 					xlogstart, starttli);
 
+	if (numWorkers > 1)
+	{
+		/*
+		 * Finish up the START_BACKUP command execution and make sure we have
+		 * CommandComplete.
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data for '%s': %s", "START_BACKUP",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		res = PQgetResult(conn);
+
+		basebkp = psprintf("LIST_TABLESPACES %s",
+						   showprogress ? "PROGRESS" : "");
+
+		if (PQsendQuery(conn, basebkp) == 0)
+		{
+			pg_log_error("could not send replication command \"%s\": %s",
+						 "LIST_TABLESPACES", PQerrorMessage(conn));
+			exit(1);
+		}
+	}
+
 	/*
 	 * Get the header
 	 */
-	res = PQgetResult(conn);
+	tablespacehdr = res = PQgetResult(conn);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("could not get backup header: %s",
@@ -1853,65 +2075,98 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/*
-	 * Start receiving chunks
-	 */
-	for (i = 0; i < PQntuples(res); i++)
-	{
-		if (format == 't')
-			ReceiveTarFile(conn, res, i);
-		else
-			ReceiveAndUnpackTarFile(conn, res, i);
-	}							/* Loop over all tablespaces */
-
-	if (showprogress)
+	if (numWorkers <= 1)
 	{
-		progress_report(PQntuples(res), NULL, true);
-		if (isatty(fileno(stderr)))
-			fprintf(stderr, "\n");	/* Need to move to next line */
-	}
+		/*
+		 * Start receiving chunks
+		 */
+		for (i = 0; i < PQntuples(res); i++)
+		{
+			if (format == 't')
+				ReceiveTarFile(conn, res, i);
+			else
+				ReceiveAndUnpackTarFile(conn, res, i);
+		}						/* Loop over all tablespaces */
 
-	PQclear(res);
+		if (showprogress)
+		{
+			progress_report(PQntuples(tablespacehdr), NULL, true);
+			if (isatty(fileno(stderr)))
+				fprintf(stderr, "\n");	/* Need to move to next line */
+		}
 
-	/*
-	 * Get the stop position
-	 */
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
-	{
-		pg_log_error("could not get write-ahead log end position from server: %s",
-					 PQerrorMessage(conn));
-		exit(1);
-	}
-	if (PQntuples(res) != 1)
-	{
-		pg_log_error("no write-ahead log end position returned from server");
-		exit(1);
-	}
-	strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
-	if (verbose && includewal != NO_WAL)
-		pg_log_info("write-ahead log end point: %s", xlogend);
-	PQclear(res);
+		PQclear(res);
 
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-	{
-		const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+		/*
+		 * Get the stop position
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get write-ahead log end position from server: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("no write-ahead log end position returned from server");
+			exit(1);
+		}
+		strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
+		if (verbose && includewal != NO_WAL)
+			pg_log_info("write-ahead log end point: %s", xlogend);
+		PQclear(res);
 
-		if (sqlstate &&
-			strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("checksum error occurred");
-			checksum_failure = true;
+			const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+			if (sqlstate &&
+				strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
+			{
+				pg_log_error("checksum error occurred");
+				checksum_failure = true;
+			}
+			else
+			{
+				pg_log_error("final receive failed: %s",
+							 PQerrorMessage(conn));
+			}
+			exit(1);
 		}
-		else
+	}
+
+	if (numWorkers > 1)
+	{
+		/*
+		 * Finish up the LIST_TABLESPACES command execution and make sure we
+		 * have CommandComplete.
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("final receive failed: %s",
+			pg_log_error("could not get data for '%s': %s", "LIST_TABLESPACES",
 						 PQerrorMessage(conn));
+			exit(1);
 		}
-		exit(1);
+		res = PQgetResult(conn);
+
+		backupinfo = palloc0(sizeof(BackupInfo));
+		backupinfo->backupstate = PB_FETCH_REL_LIST;
+
+		/* copy starting WAL location */
+		strlcpy(backupinfo->xlogstart, xlogstart, sizeof(backupinfo->xlogstart));
+		create_parallel_workers(backupinfo);
+		parallel_backup_run(backupinfo);
+		/* copy ending WAL location */
+		strlcpy(xlogend, backupinfo->xlogend, sizeof(xlogend));
 	}
 
+	/* Write recovery contents */
+	if (format == 'p' && writerecoveryconf)
+		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+
 	if (bgchild > 0)
 	{
 #ifndef WIN32
@@ -2066,6 +2321,7 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"jobs", required_argument, NULL, 'j'},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2093,7 +2349,7 @@ main(int argc, char **argv)
 
 	atexit(cleanup_directories_atexit);
 
-	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2234,6 +2490,9 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 'j':			/* number of jobs */
+				numWorkers = atoi(optarg);
+				break;
 			default:
 
 				/*
@@ -2348,6 +2607,30 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (numWorkers <= 0)
+	{
+		pg_log_error("invalid number of parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (format != 'p' && numWorkers > 1)
+	{
+		pg_log_error("parallel jobs are only supported with 'plain' format");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (maxrate > 0 && numWorkers > 1)
+	{
+		pg_log_error("--max-rate is not supported with parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2420,3 +2703,670 @@ main(int argc, char **argv)
 	success = true;
 	return 0;
 }
+
+/*
+ * Thread entry point for a backup worker; arg is the worker's WorkerState.
+ */
+static void *
+worker_run(void *arg)
+{
+	WorkerState *wstate = (WorkerState *) arg;
+
+	worker_get_files(wstate);
+
+	return NULL;
+}
+
+/* Open one connection per worker and start the worker threads. */
+static void
+create_parallel_workers(BackupInfo *backupinfo)
+{
+	int			status;
+
+	workers = (WorkerState *) palloc(sizeof(WorkerState) * numWorkers);
+	backupinfo->activeworkers = numWorkers; /* set before any thread runs */
+
+	for (int i = 0; i < numWorkers; i++)
+	{
+		WorkerState *worker = &workers[i];
+
+		worker->backupinfo = backupinfo;
+		worker->bytesread = 0;
+		worker->workerid = i;
+		worker->conn = GetConnection();
+		if (worker->conn == NULL)
+		{
+			numWorkers = i;		/* cleanup_workers() joins only started threads */
+			exit(1);
+		}
+		status = pthread_create(&worker->worker, NULL, worker_run, worker);
+		if (status != 0)
+		{
+			numWorkers = i;		/* this thread was never started */
+			pg_log_error("failed to create thread: %s", strerror(status));
+			exit(1);
+		}
+		if (verbose)
+			pg_log_info("backup worker (%d) created, %d", i, status);
+	}
+}
+
+/*
+ * Main-thread state machine of a parallel backup: fetches the file lists,
+ * waits for the workers to drain them, stops the backup and cleans up.
+ */
+static void
+parallel_backup_run(BackupInfo *backupinfo)
+{
+	uint64_t	totalread = 0;
+
+	while (1)
+	{
+		char	   *filename = NULL;
+
+		switch (backupinfo->backupstate)
+		{
+			case PB_FETCH_REL_LIST: /* get the list of files to fetch */
+				backupinfo->backupstate = PB_FETCH_REL_FILES;
+				/* retrieve backup file list from the server. */
+				get_backup_filelist(conn, backupinfo);
+				/* unblock any workers waiting on the condition */
+				pthread_cond_broadcast(&data_ready);
+				break;
+			case PB_FETCH_REL_FILES:	/* fetch files from server */
+				if (backupinfo->activeworkers == 0)
+				{
+					backupinfo->backupstate = PB_STOP_BACKUP;
+					free_filelist(backupinfo);
+				}
+				break;
+			case PB_FETCH_WAL_LIST: /* get the list of WAL files to fetch */
+				backupinfo->backupstate = PB_FETCH_WAL_FILES;
+				get_wal_filelist(conn, backupinfo, backupinfo->xlogstart, backupinfo->xlogend);
+				/* unblock any workers waiting on the condition */
+				pthread_cond_broadcast(&data_ready);
+				break;
+			case PB_FETCH_WAL_FILES:	/* fetch WAL files from server */
+				if (backupinfo->activeworkers == 0)
+				{
+					backupinfo->backupstate = PB_BACKUP_COMPLETE;
+				}
+				break;
+			case PB_STOP_BACKUP:
+
+				/*
+				 * All relation files have been fetched, time to stop the
+				 * backup, making sure to fetch the WAL files first (if needs
+				 * be).
+				 */
+				if (includewal == FETCH_WAL)
+					backupinfo->backupstate = PB_FETCH_WAL_LIST;
+				else
+					backupinfo->backupstate = PB_BACKUP_COMPLETE;
+
+				/* get the pg_control file at last. */
+				receive_file(conn, "global/pg_control", tablespacecount - 1);
+				stop_backup();
+				break;
+			case PB_BACKUP_COMPLETE:
+
+				/*
+				 * All relation and WAL files, (if needed) have been fetched,
+				 * now we can safely stop all workers and finish up.
+				 */
+				cleanup_workers();
+				if (showprogress)
+				{
+					workers_progress_report(totalread, NULL, true);
+					if (isatty(fileno(stderr)))
+						fprintf(stderr, "\n");	/* Need to move to next line */
+				}
+
+				/* nothing more to do here */
+				return;
+				break;
+			default:
+				/* shouldn't come here. */
+				pg_log_error("unexpected backup state: %d",
+							 backupinfo->backupstate);
+				exit(1);
+				break;
+		}
+
+		/* update and report progress */
+		totalread = 0;
+		for (int i = 0; i < numWorkers; i++)
+		{
+			WorkerState *worker = &workers[i];
+
+			totalread += worker->bytesread;
+		}
+		totalread += backupinfo->bytes_skipped;
+
+		if (backupinfo->curr != NULL)
+			filename = backupinfo->curr->path;
+
+		workers_progress_report(totalread, filename, false);
+		pg_usleep(100000);
+	}
+}
+
+/*
+ * Tell the workers to stop, wait for them to exit and close their
+ * connections.
+ */
+static void
+cleanup_workers(void)
+{
+	/* non-parallel backup, or the workers were already stopped and cleaned up */
+	if (!backupinfo || backupinfo->workersdone)
+		return;
+
+	/* set the flag under fetch_mutex, which workers hold when deciding to wait */
+	pthread_mutex_lock(&fetch_mutex);
+	backupinfo->workersdone = true;
+	pthread_cond_broadcast(&data_ready);
+	pthread_mutex_unlock(&fetch_mutex);
+
+	for (int i = 0; i < numWorkers; i++)
+	{
+		pthread_join(workers[i].worker, NULL);
+		PQfinish(workers[i].conn);
+	}
+	free_filelist(backupinfo);
+}
+
+/*
+ * Run STOP_BACKUP: record the WAL end location in backupinfo->xlogend and
+ * write the returned backup_label contents into the backup.
+ */
+static void
+stop_backup(void)
+{
+	PGresult   *res = NULL;
+	char	   *basebkp;
+
+	basebkp = psprintf("STOP_BACKUP %s",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not execute STOP BACKUP \"%s\"",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	/* Get the stop position */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get write-ahead log end position from server: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("no write-ahead log end position returned from server");
+		exit(1);
+	}
+
+	/* retrieve the end wal location. */
+	strlcpy(backupinfo->xlogend, PQgetvalue(res, 0, 0),
+			sizeof(backupinfo->xlogend));
+
+	/* retrieve the backup_label file contents and write them to the backup */
+	writefile("backup_label", PQgetvalue(res, 0, 2));
+
+	PQclear(res);
+
+	/*
+	 * Finish up the Stop command execution and make sure we have
+	 * CommandComplete and ReadyForQuery response.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	PQclear(PQgetResult(conn)); /* consume the result that ends the command */
+
+	if (verbose && includewal != NO_WAL)
+		pg_log_info("write-ahead log end point: %s", backupinfo->xlogend);
+}
+
+/*
+ * Retrieves the list of files available in $PGDATA from the server.
+ */
+static void
+get_backup_filelist(PGconn *conn, BackupInfo *backupInfo)
+{
+	PGresult   *res = NULL;
+	char	   *basebkp;
+	BackupFile *tail = NULL;	/* last list entry; private while building */
+
+	for (int i = 0; i < tablespacecount; i++)
+	{
+		bool		basetablespace;
+		char	   *tablespace;
+		int			numFiles;
+
+		/*
+		 * Query server to fetch the file list for given tablespace name. If
+		 * the tablespace name is empty, it will fetch files list of 'base'
+		 * tablespace.
+		 */
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tablespace = PQgetvalue(tablespacehdr, i, 1);
+
+		basebkp = psprintf("LIST_FILES '%s'",
+						   basetablespace ? "" : tablespace);
+		if (PQsendQuery(conn, basebkp) == 0)
+		{
+			pg_log_error("could not send replication command \"%s\": %s",
+						 "LIST_FILES", PQerrorMessage(conn));
+			exit(1);
+		}
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not list backup files: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		numFiles = PQntuples(res);
+		for (int j = 0; j < numFiles; j++)
+		{
+			BackupFile *file;
+			char	   *path = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+			int32		size = atol(PQgetvalue(res, j, 2));
+			time_t		mtime = atol(PQgetvalue(res, j, 3));
+
+			/*
+			 * In 'plain' format, create backup directories first.
+			 */
+			if (format == 'p' && type == 'd')
+			{
+				/*
+				 * directory entries are skipped. however, a tar header size
+				 * was included for them in totalsize_kb, so we need to add it
+				 * for progress reporting purpose.
+				 */
+				backupInfo->bytes_skipped += 512;
+				create_backup_dirs(basetablespace, tablespace, path);
+				continue;
+			}
+
+			if (format == 'p' && type == 'l')
+			{
+				/*
+				 * symlink entries are skipped. however, a tar header size was
+				 * included for them in totalsize_kb, so we need to add it for
+				 * progress reporting purpose.
+				 */
+				backupInfo->bytes_skipped += 512;
+				create_tblspc_symlink(path);
+				continue;
+			}
+
+			file = (BackupFile *) palloc0(sizeof(BackupFile));	/* zeroed: next must start NULL */
+			strlcpy(file->path, path, MAXPGPATH);
+			file->type = type;
+			file->size = size;
+			file->mtime = mtime;
+			file->tsindex = i;
+
+			/* add to the files list */
+			backupInfo->totalfiles++;
+			if (tail == NULL)
+				backupInfo->files = file;
+			else
+				tail->next = file;
+			tail = file;
+		}
+
+		PQclear(res);
+
+		/* Finish the LIST_FILES command: CommandComplete, then the final result. */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data for '%s': %s", "LIST_FILES",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		PQclear(res);
+		PQclear(PQgetResult(conn));
+		pfree(basebkp);
+	}
+
+	/* Hand the complete list to the workers; curr is what they pop from. */
+	pthread_mutex_lock(&fetch_mutex);
+	backupInfo->curr = backupInfo->files;
+	pthread_mutex_unlock(&fetch_mutex);
+}
+
+/*
+ * Retrieve WAL file list from the server based on the starting wal location
+ * and ending wal location.
+ */
+static void
+get_wal_filelist(PGconn *conn, BackupInfo *backupInfo, char *xlogstart, char *xlogend)
+{
+	PGresult   *res = NULL;
+	char	   *basebkp;
+	int			numWals;
+	BackupFile *tail = NULL;	/* last list entry; private while building */
+
+	basebkp = psprintf("LIST_WAL_FILES START_WAL_LOCATION '%s' END_WAL_LOCATION '%s'",
+					   xlogstart, xlogend);
+
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "LIST_WAL_FILES", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not list wal files: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+
+	numWals = PQntuples(res);
+	for (int i = 0; i < numWals; i++)
+	{
+		BackupFile *file = (BackupFile *) palloc0(sizeof(BackupFile));
+
+		if (tail == NULL)
+			backupInfo->files = file;
+		else
+			tail->next = file;
+		tail = file;
+
+		strlcpy(file->path, PQgetvalue(res, i, 0), MAXPGPATH);
+		file->tsindex = tablespacecount - 1;
+		backupInfo->totalfiles++;
+	}
+	PQclear(res);
+
+	/* Finish the LIST_WAL_FILES command: CommandComplete, then the final result. */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data for '%s': %s", "LIST_WAL_FILES",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	PQclear(PQgetResult(conn));
+
+	/* Hand the complete list to the workers; curr is what they pop from. */
+	pthread_mutex_lock(&fetch_mutex);
+	backupInfo->curr = backupInfo->files;
+	pthread_mutex_unlock(&fetch_mutex);
+}
+
+/* Free every entry of the files list and reset the list head and file count. */
+static void
+free_filelist(BackupInfo *backupInfo)
+{
+	/* walk the list from its head; curr is NULL once the walk is done */
+	if (backupInfo->files != NULL)
+	{
+		backupInfo->curr = backupInfo->files;
+		while (backupInfo->curr != NULL)
+		{
+			BackupFile *file = backupInfo->curr;
+
+			backupInfo->curr = file->next;
+
+			pfree(file);
+		}
+
+		backupInfo->files = NULL;
+		backupInfo->totalfiles = 0;
+	}
+}
+
+/*
+ * Worker function to process and retrieve the files from the server. If the
+ * files list is empty, it will wait for it to be filled. Otherwise picks the
+ * next file in the list.
+ */
+static int
+worker_get_files(WorkerState *wstate)
+{
+	BackupFile *fetchfile = NULL;
+	BackupInfo *backupinfo = wstate->backupinfo;
+
+	while (!backupinfo->workersdone)
+	{
+		pthread_mutex_lock(&fetch_mutex);
+		while (backupinfo->curr == NULL && !backupinfo->workersdone)
+		{
+			/*
+			 * Nothing to fetch: wait until the list is filled or the workers
+			 * are told to stop. pthread_cond_wait releases the mutex while
+			 * waiting and re-acquires it before returning; the condition is
+			 * re-tested under the mutex, which also covers spurious wakeups.
+			 */
+			backupinfo->activeworkers--;
+			pthread_cond_wait(&data_ready, &fetch_mutex);
+			backupinfo->activeworkers++;
+		}
+
+		fetchfile = backupinfo->curr;
+		if (fetchfile != NULL)
+		{
+			backupinfo->totalfiles--;
+			backupinfo->curr = fetchfile->next;
+		}
+		pthread_mutex_unlock(&fetch_mutex);
+
+		if (fetchfile != NULL)
+		{
+			wstate->bytesread +=
+				receive_file(wstate->conn, fetchfile->path, fetchfile->tsindex);
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Fetch one file from the server and unpack it; returns the bytes received.
+ */
+static int
+receive_file(PGconn *conn, char *file, int tsIndex)
+{
+	PGresult   *res;
+	int			bytesread;
+	PQExpBuffer buf = createPQExpBuffer();
+
+	/*
+	 * Build a query of the form:
+	 *
+	 * SEND_FILES ( 'base/1/1245/32683' ) [options]
+	 */
+	appendPQExpBuffer(buf, "SEND_FILES ( '%s' )", file);
+
+	/* add options */
+	appendPQExpBuffer(buf, " START_WAL_LOCATION '%s' %s",
+					  backupinfo->xlogstart,
+					  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+
+	/* a file that cannot be requested must abort the backup, not be skipped */
+	if (!conn || PQsendQuery(conn, buf->data) == 0)
+	{
+		pg_log_error("could not send files list \"%s\"",
+					 conn ? PQerrorMessage(conn) : "no connection");
+		exit(1);
+	}
+
+	destroyPQExpBuffer(buf);
+
+	/* process file contents, also count bytesRead for progress */
+	bytesread = ReceiveAndUnpackTarFile(conn, tablespacehdr, tsIndex);
+
+	/*
+	 * Finish up the SEND_FILES command execution and make sure we have
+	 * CommandComplete.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data for '%s': %s", "SEND_FILES",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* consume the result that ends the command */
+	PQclear(PQgetResult(conn));
+
+	return bytesread;
+}
+
+/*
+ * Create directory 'name' of the backup. For the base tablespace it is created
+ * under basedir; otherwise the leading tablespace path in 'name' is replaced
+ * by get_tablespace_mapping(tablespace), which applies any -T mapping.
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		dirpath[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (basetablespace)
+		snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name);
+	else
+	{
+		Assert(tablespace != NULL);
+		snprintf(dirpath, sizeof(dirpath), "%s/%s",
+				 get_tablespace_mapping(tablespace), (name + strlen(tablespace) + 1));
+	}
+
+	if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0)
+	{
+		if (errno != EEXIST)	/* an already existing directory is not an error */
+		{
+			pg_log_error("could not create directory \"%s\": %m",
+						 dirpath);
+			exit(1);
+		}
+	}
+}
+
+/*
+ * Create the pg_tblspc symlink 'filename', honoring --tablespace-mapping. The
+ * tablespace is found by exact match of the link name against the OID column.
+ */
+static void
+create_tblspc_symlink(char *filename)
+{
+	const char *sep = strrchr(filename, '/');	/* link name follows the last '/' */
+
+	for (int i = 0; i < tablespacecount; i++)
+	{
+		char	   *tsoid = PQgetvalue(tablespacehdr, i, 0);
+
+		if (strcmp(sep ? sep + 1 : filename, tsoid) == 0)
+		{
+			char	   *linkloc = psprintf("%s/%s", basedir, filename);
+			const char *mapped_tblspc_path = get_tablespace_mapping(PQgetvalue(tablespacehdr, i, 1));
+
+#ifdef HAVE_SYMLINK
+			if (symlink(mapped_tblspc_path, linkloc) != 0)
+			{
+				pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+							 linkloc, mapped_tblspc_path);
+				exit(1);
+			}
+#else
+			pg_log_error("symlinks are not supported on this platform");
+			exit(1);
+#endif
+			free(linkloc);
+			break;
+		}
+	}
+}
+
+/*
+ * Write the string buf to basedir/path, creating or truncating the file.
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (buf[0] != '\0' && fwrite(buf, strlen(buf), 1, f) != 1)	/* empty buf: nothing to write */
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
+
+/* Return the server's max_wal_senders setting, or -1 if it cannot be read. */
+static int
+fetch_max_wal_senders(PGconn *conn)
+{
+	PGresult   *res;
+	int			max_wal_senders;
+
+	Assert(conn != NULL);
+
+	res = PQexec(conn, "SHOW max_wal_senders");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SHOW max_wal_senders", PQerrorMessage(conn));
+
+		PQclear(res);
+		return -1;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 1)
+	{
+		pg_log_error("could not fetch max wal senders: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 1);
+
+		PQclear(res);
+		return -1;
+	}
+
+	max_wal_senders = atoi(PQgetvalue(res, 0, 0));
+	PQclear(res);
+
+	return max_wal_senders;
+}
-- 
2.21.1 (Apple Git-122.3)