0005-pg_basebackup-changes-for-parallel-backup_v4.patch

application/octet-stream

Filename: 0005-pg_basebackup-changes-for-parallel-backup_v4.patch
Type: application/octet-stream
Part: 2
Message: Re: WIP/PoC for parallel backup

Patch

Same data as JSON: `GET /api/v1/attachments/:id/patch` returns the parsed metadata (format, series position, per-file stats), never the diff bytes. See the API reference.
Format: format-patch
Series: patch v4-0005
Subject: pg_basebackup changes for parallel backup.
Files changed (insertions / deletions):
src/bin/pg_basebackup/pg_basebackup.c +672 / -38
From 16b77550d4e4e185b6bb45176301212db0edb09b Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 14 Oct 2019 17:28:58 +0500
Subject: [PATCH 5/6] pg_basebackup changes for parallel backup.

---
 src/bin/pg_basebackup/pg_basebackup.c | 710 ++++++++++++++++++++++++--
 1 file changed, 672 insertions(+), 38 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a9d162a7da..9dd7c62933 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -19,6 +19,7 @@
 #include <sys/wait.h>
 #include <signal.h>
 #include <time.h>
+#include <pthread.h>
 #ifdef HAVE_SYS_SELECT_H
 #include <sys/select.h>
 #endif
@@ -41,6 +42,7 @@
 #include "receivelog.h"
 #include "replication/basebackup.h"
 #include "streamutil.h"
+#include "fe_utils/simple_list.h"
 
 #define ERRCODE_DATA_CORRUPTED	"XX001"
 
@@ -57,6 +59,57 @@ typedef struct TablespaceList
 	TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct
+{
+	char		path[MAXPGPATH];	/* path as listed by the server */
+	char		type;			/* file type as listed ('d' = directory) */
+	int32		size;			/* size as listed; used to sort the list */
+	time_t		mtime;			/* modification time as listed */
+
+	int			tsIndex;	/* index of tsInfo this file belongs to. */
+} BackupFile;
+
+typedef struct
+{
+	Oid			tblspcOid;	 /* OID from the tablespace header (0 if NULL) */
+	char	   *tablespace;	 /* tablespace path from the header ("" if NULL) */
+	int			numFiles;	 /* number of files */
+	BackupFile *backupFiles; /* list of files in a tablespace */
+} TablespaceInfo;
+
+typedef struct
+{
+	int			tablespacecount;	/* number of tablespace header rows */
+	int			totalfiles;		/* files in all tablespaces */
+	int			numWorkers;		/* number of worker threads */
+
+	char		xlogstart[64];	/* backup start WAL location */
+	char	   *backup_label;	/* backup_label contents */
+	char	   *tablespace_map; /* tablespace_map contents, or NULL */
+
+	TablespaceInfo *tsInfo;
+	BackupFile	  **files;		/* list of BackupFile pointers */
+	int				fileIndex;	/* next file to hand out; see fetch_mutex */
+
+	PGconn	**workerConns;		/* one connection per worker */
+} BackupInfo;
+
+typedef struct
+{
+	BackupInfo *backupInfo;
+	uint64		bytesRead;		/* bytes received so far, for progress */
+
+	int			workerid;		/* index into workers and workerConns */
+	pthread_t	worker;
+
+	bool	terminated;			/* set once the worker thread is done */
+} WorkerState;
+
+static BackupInfo *backupInfo = NULL;
+static WorkerState *workers = NULL;
+
+static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER;	/* guards fileIndex */
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -110,6 +163,9 @@ static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
 
+static int	numWorkers = 1;		/* number of parallel jobs (-j) */
+static PGresult *tablespacehdr;	/* tablespace header of the backup */
+
 /* Progress counters */
 static uint64 totalsize_kb;
 static uint64 totaldone;
@@ -140,9 +196,10 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
 static void progress_report(int tablespacenum, const char *filename, bool force);
+static void workers_progress_report(uint64 totalBytesRead, const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static uint64 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup(void);
 
 static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
@@ -151,6 +208,17 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 static const char *get_tablespace_mapping(const char *dir);
 static void tablespace_list_append(const char *arg);
 
+static void ParallelBackupRun(BackupInfo *backupInfo);
+static void StopBackup(BackupInfo *backupInfo);
+static void GetBackupFileList(PGconn *conn, BackupInfo *backupInfo);
+static int	GetBackupFile(WorkerState *wstate);
+static BackupFile *getNextFile(BackupInfo *backupInfo);
+static int	compareFileSize(const void *a, const void *b);
+static void read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map);
+static void create_backup_dirs(bool basetablespace, char *tablespace, char *name);
+static void writefile(char *path, char *buf);
+static void *workerRun(void *arg);
+
 
 static void
 cleanup_directories_atexit(void)
@@ -202,6 +270,17 @@ cleanup_directories_atexit(void)
 static void
 disconnect_atexit(void)
 {
+	/* close worker connections, if a parallel backup opened any */
+	if (backupInfo && backupInfo->workerConns != NULL)
+	{
+		int			i;
+		for (i = 0; i < numWorkers; i++)
+		{
+			if (backupInfo->workerConns[i] != NULL)
+				PQfinish(backupInfo->workerConns[i]);
+		}
+	}
+
 	if (conn != NULL)
 		PQfinish(conn);
 }
@@ -349,6 +428,7 @@ usage(void)
 	printf(_("      --no-slot          prevent creation of temporary replication slot\n"));
 	printf(_("      --no-verify-checksums\n"
 			 "                         do not verify checksums\n"));
+	printf(_("  -j, --jobs=NUM         use this many parallel jobs to backup\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -695,6 +775,93 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
 	}
 }
 
+/*
+ * Parallel-backup counterpart of progress_report(): totalBytesRead is the
+ * total received by all workers.  If verbose output is enabled, also print
+ * the current file name.
+ *
+ * Output is written at most once per second unless force is true.
+ */
+static void
+workers_progress_report(uint64 totalBytesRead, const char *filename, bool force)
+{
+	int			percent;
+	char		totalBytesRead_str[32];
+	char		totalsize_str[32];
+	pg_time_t	now;
+
+	if (!showprogress)
+		return;
+
+	now = time(NULL);
+	if (now == last_progress_report && !force)
+		return;					/* Max once per second */
+
+	last_progress_report = now;
+	percent = totalsize_kb ? (int) ((totalBytesRead / 1024) * 100 / totalsize_kb) : 0;
+
+	/*
+	 * Avoid overflowing past 100% or the full size. This may make the total
+	 * size number change as we approach the end of the backup (the estimate
+	 * will always be wrong if WAL is included), but that's better than having
+	 * the done column be bigger than the total.
+	 */
+	if (percent > 100)
+		percent = 100;
+	if (totalBytesRead / 1024 > totalsize_kb)
+		totalsize_kb = totalBytesRead / 1024;
+
+	/*
+	 * Separate step to keep platform-dependent format code out of
+	 * translatable strings.  And we only test for INT64_FORMAT availability
+	 * in snprintf, not fprintf.
+	 */
+	snprintf(totalBytesRead_str, sizeof(totalBytesRead_str), INT64_FORMAT,
+			 totalBytesRead / 1024);
+	snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize_kb);
+
+#define VERBOSE_FILENAME_LENGTH 35
+
+	if (verbose)
+	{
+		if (!filename)
+
+			/*
+			 * No filename given, so clear the status line (used for last
+			 * call)
+			 */
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"),
+					(int) strlen(totalsize_str),
+					totalBytesRead_str, totalsize_str,
+					percent,
+					VERBOSE_FILENAME_LENGTH + 5, "");
+		else
+		{
+			bool		truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH);
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"),
+					(int) strlen(totalsize_str), totalBytesRead_str, totalsize_str,
+					percent,
+					/* Prefix with "..." if we do leading truncation */
+							truncate ? "..." : "",
+							truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+							truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+					/* Truncate filename at beginning if it's too long */
+							truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename);
+		}
+	}
+	else
+	{
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied"),
+				(int) strlen(totalsize_str),
+				totalBytesRead_str, totalsize_str,
+				percent);
+	}
+
+	if (isatty(fileno(stderr)))
+		fprintf(stderr, "\r");
+	else
+		fprintf(stderr, "\n");
+}
 
 /*
  * Print a progress report based on the global variables. If verbose output
@@ -711,7 +878,7 @@ progress_report(int tablespacenum, const char *filename, bool force)
 	char		totalsize_str[32];
 	pg_time_t	now;
 
-	if (!showprogress)
+	if (!showprogress || numWorkers > 1)	/* see workers_progress_report() */
 		return;
 
 	now = time(NULL);
@@ -1381,7 +1548,7 @@ get_tablespace_mapping(const char *dir)
  * specified directory. If it's for another tablespace, it will be restored
  * in the original or mapped directory.
  */
-static void
+static uint64
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
 	char		current_path[MAXPGPATH];
@@ -1392,6 +1559,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 	bool		basetablespace;
 	char	   *copybuf = NULL;
 	FILE	   *file = NULL;
+	uint64		readBytes = 0;	/* bytes consumed by this call */
 
 	basetablespace = PQgetisnull(res, rownum, 0);
 	if (basetablespace)
@@ -1455,7 +1623,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 				pg_log_error("invalid tar block header size: %d", r);
 				exit(1);
 			}
 			totaldone += 512;
+			readBytes += 512;
 
 			current_len_left = read_tar_number(&copybuf[124], 12);
 
@@ -1486,21 +1655,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					 * Directory
 					 */
 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
+					/*
+					 * Ignore EEXIST (pg_wal, and parallel mode's pre-created dirs).
+					 */
 					if (mkdir(filename, pg_dir_create_mode) != 0)
 					{
-						/*
-						 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
-						 * clusters) will have been created by the wal
-						 * receiver process. Also, when the WAL directory
-						 * location was specified, pg_wal (or pg_xlog) has
-						 * already been created as a symbolic link before
-						 * starting the actual backup. So just ignore creation
-						 * failures on related directories.
-						 */
-						if (!((pg_str_endswith(filename, "/pg_wal") ||
-							   pg_str_endswith(filename, "/pg_xlog") ||
-							   pg_str_endswith(filename, "/archive_status")) &&
-							  errno == EEXIST))
+						if (errno != EEXIST)
 						{
 							pg_log_error("could not create directory \"%s\": %m",
 										 filename);
@@ -1585,7 +1745,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 				 */
 				fclose(file);
 				file = NULL;
 				totaldone += r;
+				readBytes += r;
 				continue;
 			}
 
@@ -1594,7 +1755,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 				pg_log_error("could not write to file \"%s\": %m", filename);
 				exit(1);
 			}
 			totaldone += r;
+			readBytes += r;
 			progress_report(rownum, filename, false);
 
 			current_len_left -= r;
@@ -1622,13 +1784,11 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 	if (copybuf != NULL)
 		PQfreemem(copybuf);
 
-	if (basetablespace && writerecoveryconf)
-		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
 	/*
 	 * No data is synced here, everything is done for all tablespaces at the
 	 * end.
 	 */
+	return readBytes;
 }
 
 
@@ -1716,7 +1876,8 @@ BaseBackup(void)
 	}
 
 	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+		psprintf("%s LABEL '%s' %s %s %s %s %s %s %s",
+				 (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP",
 				 escaped_label,
 				 showprogress ? "PROGRESS" : "",
 				 includewal == FETCH_WAL ? "WAL" : "",
@@ -1774,7 +1935,7 @@ BaseBackup(void)
 	/*
 	 * Get the header
 	 */
-	res = PQgetResult(conn);
+	tablespacehdr = res = PQgetResult(conn);	/* also used by parallel mode */
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("could not get backup header: %s",
@@ -1830,24 +1991,74 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/*
-	 * Start receiving chunks
-	 */
-	for (i = 0; i < PQntuples(res); i++)
+	if (numWorkers > 1)
 	{
-		if (format == 't')
-			ReceiveTarFile(conn, res, i);
-		else
-			ReceiveAndUnpackTarFile(conn, res, i);
-	}							/* Loop over all tablespaces */
+		int j = 0,
+			k = 0;
 
-	if (showprogress)
+		backupInfo = palloc0(sizeof(BackupInfo));
+		backupInfo->workerConns = (PGconn **) palloc0(sizeof(PGconn *) * numWorkers);
+		backupInfo->tablespacecount = tablespacecount;
+		backupInfo->numWorkers = numWorkers;
+		strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart));
+
+		read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map);
+
+		/* Retrieve the backup file list from the server. */
+		GetBackupFileList(conn, backupInfo);
+
+		/*
+		 * Write backup_label into the target directory.  (main() only allows
+		 * parallel mode with plain format, so this test is just defensive.)
+		 */
+		if (format == 'p')
+			writefile("backup_label", backupInfo->backup_label);
+
+		/*
+		 * Flatten the per-tablespace file lists into one array of BackupFile
+		 * pointers, so that workers can claim files by advancing one index.
+		 */
+		backupInfo->files =
+			(BackupFile **) palloc0(sizeof(BackupFile *) * backupInfo->totalfiles);
+		for (i = 0; i < backupInfo->tablespacecount; i++)
+		{
+			TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+
+			for (j = 0; j < curTsInfo->numFiles; j++)
+			{
+				backupInfo->files[k] = &curTsInfo->backupFiles[j];
+				k++;
+			}
+		}
+
+		ParallelBackupRun(backupInfo);
+		StopBackup(backupInfo);
+	}
+	else
 	{
-		progress_report(PQntuples(res), NULL, true);
-		if (isatty(fileno(stderr)))
-			fprintf(stderr, "\n");	/* Need to move to next line */
+		/*
+		 * Start receiving chunks
+		 */
+		for (i = 0; i < PQntuples(res); i++)
+		{
+			if (format == 't')
+				ReceiveTarFile(conn, res, i);
+			else
+				ReceiveAndUnpackTarFile(conn, res, i);
+		}						/* Loop over all tablespaces */
+
+		if (showprogress)
+		{
+			progress_report(PQntuples(tablespacehdr), NULL, true);
+			if (isatty(fileno(stderr)))
+				fprintf(stderr, "\n");	/* Need to move to next line */
+		}
 	}
 
+	/* Write recovery configuration (plain format only) */
+	if (format == 'p' && writerecoveryconf)
+		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+
 	PQclear(res);
 
 	/*
@@ -2043,6 +2254,7 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"jobs", required_argument, NULL, 'j'},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2070,7 +2282,7 @@ main(int argc, char **argv)
 
 	atexit(cleanup_directories_atexit);
 
-	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2211,6 +2423,9 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 'j':			/* number of parallel jobs */
+				numWorkers = atoi(optarg);	/* range-checked after parsing */
+				break;
 			default:
 
 				/*
@@ -2325,6 +2540,22 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (numWorkers <= 0)	/* includes non-numeric -j (atoi() gives 0) */
+	{
+		pg_log_error("invalid number of parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (format != 'p' && numWorkers > 1)	/* workers unpack files directly */
+	{
+		pg_log_error("parallel jobs are only supported with 'plain' format");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2397,3 +2628,531 @@ main(int argc, char **argv)
 	success = true;
 	return 0;
 }
+
+/*
+ * Escape a string so that it can be embedded between single quotes in a
+ * replication command.  The result is palloc'd.
+ */
+static char *
+escape_quoted(PGconn *conn, const char *str)
+{
+	size_t		len = strlen(str);
+	char	   *escaped = palloc(2 * len + 1);	/* worst case size */
+	int			error;
+
+	PQescapeStringConn(conn, escaped, str, len, &error);
+	if (error)
+	{
+		pg_log_error("could not escape string: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	return escaped;
+}
+
+/*
+ * Thread worker: fetch files from the shared list until it is exhausted.
+ *
+ * GetBackupFile() returns non-zero after logging an error.  Treat that as
+ * fatal, so that a failed worker cannot go unnoticed.
+ */
+static void *
+workerRun(void *arg)
+{
+	WorkerState *wstate = (WorkerState *) arg;
+
+	if (GetBackupFile(wstate) != 0)
+		exit(1);
+
+	/* ParallelBackupRun() reads this flag under the same mutex */
+	pthread_mutex_lock(&fetch_mutex);
+	wstate->terminated = true;
+	pthread_mutex_unlock(&fetch_mutex);
+
+	return NULL;
+}
+
+/*
+ * Start one worker thread per job, each with its own connection, report
+ * progress until all of them have finished, and then join them.
+ */
+static void
+ParallelBackupRun(BackupInfo *backupInfo)
+{
+	int			status,
+				i;
+	bool		threadsActive = true;
+	uint64		totalBytes = 0;
+
+	workers = (WorkerState *) palloc0(sizeof(WorkerState) * numWorkers);
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		WorkerState *worker = &workers[i];
+
+		worker->backupInfo = backupInfo;
+		worker->workerid = i;
+		worker->bytesRead = 0;
+		worker->terminated = false;
+
+		/* GetConnection() has already reported the error if it failed */
+		backupInfo->workerConns[i] = GetConnection();
+		if (!backupInfo->workerConns[i])
+			exit(1);
+
+		status = pthread_create(&worker->worker, NULL, workerRun, worker);
+		if (status != 0)
+		{
+			/* pthread_create() returns an error number; it doesn't set errno */
+			errno = status;
+			pg_log_error("could not create thread: %m");
+			exit(1);
+		}
+
+		if (verbose)
+			pg_log_info("backup worker (%d) created", i);
+	}
+
+	/*
+	 * This is the main thread for updating progress.  The workers update
+	 * their state concurrently, so read it under fetch_mutex.
+	 */
+	while (threadsActive)
+	{
+		const char *filename = NULL;
+
+		threadsActive = false;
+		totalBytes = 0;
+
+		pthread_mutex_lock(&fetch_mutex);
+		for (i = 0; i < numWorkers; i++)
+		{
+			totalBytes += workers[i].bytesRead;
+			threadsActive |= !workers[i].terminated;
+		}
+		if (backupInfo->fileIndex < backupInfo->totalfiles)
+			filename = backupInfo->files[backupInfo->fileIndex]->path;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		workers_progress_report(totalBytes, filename, false);
+		pg_usleep(100000);
+	}
+
+	/* every worker has finished; release the thread resources */
+	for (i = 0; i < numWorkers; i++)
+	{
+		status = pthread_join(workers[i].worker, NULL);
+		if (status != 0)
+		{
+			errno = status;
+			pg_log_error("could not join thread: %m");
+			exit(1);
+		}
+	}
+
+	if (showprogress)
+	{
+		workers_progress_report(totalBytes, NULL, true);
+		if (isatty(fileno(stderr)))
+			fprintf(stderr, "\n");	/* Need to move to next line */
+	}
+}
+
+/*
+ * Take the system out of backup mode.
+ *
+ * STOP_BACKUP is sent on the main connection along with the backup_label
+ * contents read at the start of the backup.  The tar stream that follows
+ * (pg_control and, if requested, WAL files) is unpacked into the base
+ * tablespace's directory.  Results after the stream are not read here.
+ */
+static void
+StopBackup(BackupInfo *backupInfo)
+{
+	char	   *escaped_label;
+	char	   *basebkp;
+	int			baserow = -1;
+	int			i;
+
+	/* the label text may contain quotes (e.g. from -l), so escape it */
+	escaped_label = escape_quoted(conn, backupInfo->backup_label);
+	basebkp = psprintf("STOP_BACKUP LABEL '%s' %s %s",
+					   escaped_label,
+					   includewal == FETCH_WAL ? "WAL" : "",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "STOP_BACKUP", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+	pfree(escaped_label);
+
+	/*
+	 * Use the header row of the base tablespace (NULL OID) instead of passing
+	 * a NULL result and relying on how libpq treats that.
+	 */
+	for (i = 0; i < PQntuples(tablespacehdr); i++)
+	{
+		if (PQgetisnull(tablespacehdr, i, 0))
+		{
+			baserow = i;
+			break;
+		}
+	}
+	if (baserow < 0)
+	{
+		pg_log_error("backup header does not include the base tablespace");
+		exit(1);
+	}
+
+	/* receive pg_control and wal files */
+	ReceiveAndUnpackTarFile(conn, tablespacehdr, baserow);
+}
+
+/*
+ * Retrieve the backup file list from the server and populate
+ * backupInfo->tsInfo to keep track of tablespaces and their files.
+ *
+ * One result is read per tablespace header row, with rows of (path, type,
+ * size, mtime).  In plain format, directories are created as they are
+ * listed.  Each tablespace's files are sorted largest first.
+ */
+static void
+GetBackupFileList(PGconn *conn, BackupInfo *backupInfo)
+{
+	TablespaceInfo *tsInfo;
+	PGresult   *res;
+	char	   *basebkp;
+	int			i;
+
+	backupInfo->tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount);
+	tsInfo = backupInfo->tsInfo;
+
+	/*
+	 * Get list of files.
+	 */
+	basebkp = psprintf("SEND_BACKUP_FILELIST %s",
+					   format == 't' ? "TABLESPACE_MAP" : "");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_BACKUP_FILELIST", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	/*
+	 * The list of files is grouped by tablespaces, and we want to fetch them
+	 * in the same order.
+	 */
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		bool		basetablespace;
+		int			j;
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get backup file list: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tsInfo[i].tblspcOid = atooid(PQgetvalue(tablespacehdr, i, 0));
+		tsInfo[i].tablespace = PQgetvalue(tablespacehdr, i, 1);
+		tsInfo[i].numFiles = PQntuples(res);
+		tsInfo[i].backupFiles = palloc0(sizeof(BackupFile) * tsInfo[i].numFiles);
+
+		/* keep count of all files in backup */
+		backupInfo->totalfiles += tsInfo[i].numFiles;
+
+		for (j = 0; j < tsInfo[i].numFiles; j++)
+		{
+			BackupFile *bf = &tsInfo[i].backupFiles[j];
+			char	   *path = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+
+			/* In 'plain' format, create backup directories first. */
+			if (format == 'p' && type == 'd')
+				create_backup_dirs(basetablespace, tsInfo[i].tablespace, path);
+
+			/* copy the values, as res is cleared below */
+			strlcpy(bf->path, path, MAXPGPATH);
+			bf->type = type;
+			bf->size = atol(PQgetvalue(res, j, 2));
+			bf->mtime = atol(PQgetvalue(res, j, 3));
+			bf->tsIndex = i;
+		}
+
+		/* sort files in descending order, based on size */
+		qsort(tsInfo[i].backupFiles, tsInfo[i].numFiles,
+			  sizeof(BackupFile), compareFileSize);
+		PQclear(res);
+	}
+
+	/* expect CommandComplete after the lists, then drain the next result */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * Fetch files for one worker until the shared file list is exhausted.
+ *
+ * Each file is requested with SEND_BACKUP_FILES on the worker's own
+ * connection and unpacked into the backup directory.  The bytes received
+ * are added to wstate->bytesRead, under fetch_mutex, for progress reports.
+ *
+ * Returns 0 on success, or 1 after logging an error if the worker has no
+ * connection or a request could not be sent.  Other errors exit.
+ */
+static int
+GetBackupFile(WorkerState *wstate)
+{
+	BackupInfo *backupInfo = wstate->backupInfo;
+	PGconn	   *worker_conn = backupInfo->workerConns[wstate->workerid];
+	BackupFile *fetchFile;
+
+	/* check before claiming a file, so that no file is silently skipped */
+	if (!worker_conn)
+	{
+		pg_log_error("backup worker (%d) has no connection", wstate->workerid);
+		return 1;
+	}
+
+	while ((fetchFile = getNextFile(backupInfo)) != NULL)
+	{
+		TablespaceInfo *curTsInfo = &backupInfo->tsInfo[fetchFile->tsIndex];
+		PQExpBuffer buf = createPQExpBuffer();
+		char	   *path = escape_quoted(worker_conn, fetchFile->path);
+		char	   *tblspc = escape_quoted(worker_conn, curTsInfo->tablespace);
+		PGresult   *res;
+		uint64		bytes;
+
+		/*
+		 * build query in form of: SEND_BACKUP_FILES ('base/1/1245/32683',
+		 * 'base/1/1245/32683', ...) [options]
+		 */
+		appendPQExpBuffer(buf, "SEND_BACKUP_FILES ( '%s' )", path);
+
+		/* add options */
+		appendPQExpBuffer(buf, " TABLESPACE_PATH '%s' START_WAL_LOCATION '%s' %s %s",
+						  tblspc,
+						  backupInfo->xlogstart,
+						  format == 't' ? "TABLESPACE_MAP" : "",
+						  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+		if (maxrate > 0)
+			appendPQExpBuffer(buf, " MAX_RATE %u", maxrate);
+		pfree(path);
+		pfree(tblspc);
+
+		if (PQsendQuery(worker_conn, buf->data) == 0)
+		{
+			pg_log_error("could not send replication command \"%s\": %s",
+						 "SEND_BACKUP_FILES", PQerrorMessage(worker_conn));
+			destroyPQExpBuffer(buf);
+			return 1;
+		}
+		destroyPQExpBuffer(buf);
+
+		/* process file contents, and count the bytes for progress reporting */
+		bytes = ReceiveAndUnpackTarFile(worker_conn, tablespacehdr,
+										fetchFile->tsIndex);
+		pthread_mutex_lock(&fetch_mutex);
+		wstate->bytesRead += bytes;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		res = PQgetResult(worker_conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data stream: %s",
+						 PQerrorMessage(worker_conn));
+			exit(1);
+		}
+		PQclear(res);
+
+		/* drain the next result before issuing another command */
+		res = PQgetResult(worker_conn);
+		PQclear(res);
+	}
+
+	return 0;
+}
+
+/*
+ * Claim the next file to fetch, or return NULL once all have been claimed.
+ *
+ * fileIndex is read and incremented under fetch_mutex, so each file is
+ * handed to exactly one worker.
+ */
+static BackupFile *
+getNextFile(BackupInfo *backupInfo)
+{
+	int			fileIndex;
+
+	pthread_mutex_lock(&fetch_mutex);
+	fileIndex = backupInfo->fileIndex++;
+	pthread_mutex_unlock(&fetch_mutex);
+
+	if (fileIndex >= backupInfo->totalfiles)
+		return NULL;
+
+	return backupInfo->files[fileIndex];
+}
+
+/* qsort comparator for BackupFile: larger files sort first */
+static int
+compareFileSize(const void *a, const void *b)
+{
+	const BackupFile *v1 = (const BackupFile *) a;
+	const BackupFile *v2 = (const BackupFile *) b;
+
+	if (v1->size > v2->size)
+		return -1;
+	if (v1->size < v2->size)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Read the backup_label and tablespace_map contents from the next result on
+ * conn.  *tblspc_map is left unchanged if the server sends NULL for it.
+ *
+ * The returned strings point into that result, which is therefore never
+ * cleared.
+ */
+static void
+read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map)
+{
+	PGresult   *res = NULL;
+
+	Assert(backuplabel != NULL);
+	Assert(tblspc_map != NULL);
+
+	/*
+	 * Get Backup label and tablespace map data.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) < 1)
+	{
+		pg_log_error("no data returned from server");
+		exit(1);
+	}
+
+	*backuplabel = PQgetvalue(res, 0, 0);	/* backup_label */
+	if (!PQgetisnull(res, 0, 1))
+		*tblspc_map = PQgetvalue(res, 0, 1);	/* tablespace_map */
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * Create a backup directory.  For the base tablespace, name is relative to
+ * the target directory.  Otherwise name must start with the tablespace path,
+ * and the rest of it is created below the tablespace's mapped location (-T).
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		dirpath[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (basetablespace)
+		snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name);
+	else
+	{
+		size_t		tslen;
+
+		Assert(tablespace != NULL);
+		tslen = strlen(tablespace);
+
+		/* check the prefix, so that we never skip past the end of name */
+		if (strncmp(name, tablespace, tslen) != 0 ||
+			(name[tslen] != '/' && name[tslen] != '\0'))
+		{
+			pg_log_error("directory \"%s\" is not in tablespace \"%s\"",
+						 name, tablespace);
+			exit(1);
+		}
+
+		if (name[tslen] == '\0')
+			strlcpy(dirpath, get_tablespace_mapping(tablespace), sizeof(dirpath));
+		else
+			snprintf(dirpath, sizeof(dirpath), "%s/%s",
+					 get_tablespace_mapping(tablespace), name + tslen + 1);
+	}
+
+	if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0)
+	{
+		if (errno != EEXIST)
+		{
+			pg_log_error("could not create directory \"%s\": %m",
+						 dirpath);
+			exit(1);
+		}
+	}
+}
+
+/*
+ * Write buf to the file "path" below the target directory, creating or
+ * truncating it.  Exits on failure.
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+	size_t		len = strlen(buf);
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/* fwrite() of zero bytes returns 0 even though nothing failed */
+	if (len > 0 && fwrite(buf, len, 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
-- 
2.21.0 (Apple Git-122)