0005-Parallel-Backup-pg_basebackup_v7.patch

application/octet-stream

Filename: 0005-Parallel-Backup-pg_basebackup_v7.patch
Type: application/octet-stream
Part: 4
Message: Re: WIP/PoC for parallel backup

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v7-0005
Subject: Parallel Backup - pg_basebackup
File+
src/bin/pg_basebackup/pg_basebackup.c 735 31
From bcfe471e924a88cffda223577335ca0c878a3c9c Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 14 Oct 2019 17:28:58 +0500
Subject: [PATCH 5/7] Parallel Backup - pg_basebackup

Implements client-side support for the replication commands added to the
backend and adds support for --jobs=NUM in pg_basebackup to take a full
backup in parallel using multiple connections. The utility first collects
the list of files from the server; the workers then copy the files
(one by one) over the COPY protocol.
---
 src/bin/pg_basebackup/pg_basebackup.c | 766 ++++++++++++++++++++++++--
 1 file changed, 735 insertions(+), 31 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 16886fbe71..c8a36a0c12 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -13,6 +13,7 @@
 
 #include "postgres_fe.h"
 
+#include <pthread.h>
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
@@ -85,12 +86,65 @@ typedef struct UnpackTarState
 	const char *mapped_tblspc_path;
 	pgoff_t		current_len_left;
 	int			current_padding;
+	size_t		current_bytes_read;
 	FILE	   *file;
 } UnpackTarState;
 
 typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
 								   void *callback_data);
 
+typedef struct
+{
+	char		path[MAXPGPATH];	/* path as sent in the server's file list */
+	char		type;			/* entry type sent by the server */
+	int32		size;			/* size sent by the server; the sort key */
+	time_t		mtime;			/* modification time sent by the server */
+
+	int			tsIndex;		/* index of tsInfo this file belongs to. */
+} BackupFile;
+
+typedef struct
+{
+	Oid			tblspcOid;		/* parsed from column 0 of the backup header */
+	char	   *tablespace;		/* column 1 of the backup header; used as
+								 * the tablespace path when not 'base' */
+	int			numFiles;		/* number of files */
+	BackupFile *backupFiles;	/* list of files in a tablespace */
+} TablespaceInfo;
+
+typedef struct
+{
+	int			tablespacecount;	/* number of entries in tsInfo */
+	int			totalfiles;		/* number of entries in files */
+	int			numWorkers;		/* number of worker threads/connections */
+
+	char		xlogstart[64];	/* sent as START_WAL_LOCATION to SEND_FILES */
+	char	   *backup_label;	/* backup_label contents from the server */
+	char	   *tablespace_map; /* tablespace_map contents; NULL if not sent */
+
+	TablespaceInfo *tsInfo;		/* per-tablespace file lists */
+	BackupFile **files;			/* list of BackupFile pointers */
+	int			fileIndex;		/* index of file to be fetched */
+
+	PGconn	  **workerConns;	/* one connection per worker */
+} BackupInfo;
+
+typedef struct
+{
+	BackupInfo *backupInfo;		/* backup state shared by all workers */
+	uint64		bytesRead;		/* bytes this worker has received so far */
+
+	int			workerid;		/* index into workers and workerConns */
+	pthread_t	worker;			/* handle of the worker thread */
+
+	bool		terminated;		/* set by workerRun() when the worker is done */
+} WorkerState;
+
+BackupInfo *backupInfo = NULL;
+WorkerState *workers = NULL;
+
+static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -144,6 +198,9 @@ static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
 
+static int	numWorkers = 1;
+static PGresult *tablespacehdr;
+
 /* Progress counters */
 static uint64 totalsize_kb;
 static uint64 totaldone;
@@ -174,10 +231,11 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
 static void progress_report(int tablespacenum, const char *filename, bool force);
+static void workers_progress_report(uint64 totalBytesRead, const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static int	ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
 										 void *callback_data);
 static void BaseBackup(void);
@@ -188,6 +246,18 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 static const char *get_tablespace_mapping(const char *dir);
 static void tablespace_list_append(const char *arg);
 
+static void ParallelBackupRun(BackupInfo *backupInfo);
+static void StopBackup(BackupInfo *backupInfo);
+static void GetBackupFileList(PGconn *conn, BackupInfo *backupInfo);
+static int	GetBackupFile(WorkerState *wstate);
+static BackupFile *getNextFile(BackupInfo *backupInfo);
+static int	compareFileSize(const void *a, const void *b);
+static void read_label_tblspcmap(PGconn *conn, char **backup_label, char **tablespace_map);
+static void create_backup_dirs(bool basetablespace, char *tablespace, char *name);
+static void create_tblspc_symlink(char *filename);
+static void writefile(char *path, char *buf);
+static void *workerRun(void *arg);
+
 
 static void
 cleanup_directories_atexit(void)
@@ -239,6 +309,18 @@ cleanup_directories_atexit(void)
 static void
 disconnect_atexit(void)
 {
+	/* close the worker connections opened for a parallel backup, if any */
+	if (backupInfo && backupInfo->workerConns != NULL)
+	{
+		int			i;
+
+		for (i = 0; i < numWorkers; i++)
+		{
+			if (backupInfo->workerConns[i] != NULL)
+				PQfinish(backupInfo->workerConns[i]);
+		}
+	}
+
 	if (conn != NULL)
 		PQfinish(conn);
 }
@@ -386,6 +468,7 @@ usage(void)
 	printf(_("      --no-slot          prevent creation of temporary replication slot\n"));
 	printf(_("      --no-verify-checksums\n"
 			 "                         do not verify checksums\n"));
+	printf(_("  -j, --jobs=NUM         use this many parallel jobs to backup\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -732,6 +815,94 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
 	}
 }
 
+/*
+ * Print a progress report for the worker threads to stderr.
+ * totalBytesRead is the number of bytes received so far. With verbose
+ * output, filename (if not NULL) is printed too, cut from the left if long.
+ *
+ * At most one report per second is written, unless force is true.
+ */
+static void
+workers_progress_report(uint64 totalBytesRead, const char *filename, bool force)
+{
+	int			percent;
+	char		totalBytesRead_str[32];
+	char		totalsize_str[32];
+	pg_time_t	now;
+
+	if (!showprogress)
+		return;
+
+	now = time(NULL);
+	if (now == last_progress_report && !force)
+		return;					/* Max once per second */
+
+	last_progress_report = now;
+	percent = totalsize_kb ? (int) ((totalBytesRead / 1024) * 100 / totalsize_kb) : 0;
+
+	/*
+	 * Avoid overflowing past 100% or the full size. This may make the total
+	 * size number change as we approach the end of the backup (the estimate
+	 * will always be wrong if WAL is included), but that's better than having
+	 * the done column be bigger than the total.
+	 */
+	if (percent > 100)
+		percent = 100;
+	if (totalBytesRead / 1024 > totalsize_kb)
+		totalsize_kb = totalBytesRead / 1024;
+
+	/*
+	 * Separate step to keep platform-dependent format code out of
+	 * translatable strings.  And we only test for INT64_FORMAT availability
+	 * in snprintf, not fprintf.
+	 */
+	snprintf(totalBytesRead_str, sizeof(totalBytesRead_str), INT64_FORMAT,
+			 totalBytesRead / 1024);
+	snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize_kb);
+
+#define VERBOSE_FILENAME_LENGTH 35
+
+	if (verbose)
+	{
+		if (!filename)
+
+			/*
+			 * No filename given, so clear the status line (used for last
+			 * call)
+			 */
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"),
+					(int) strlen(totalsize_str),
+					totalBytesRead_str, totalsize_str,
+					percent,
+					VERBOSE_FILENAME_LENGTH + 5, "");
+		else
+		{
+			bool		truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH);
+
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"),
+					(int) strlen(totalsize_str), totalBytesRead_str, totalsize_str,
+					percent,
+			/* Prefix with "..." if we do leading truncation */
+					truncate ? "..." : "",
+					truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+					truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+			/* Truncate filename at beginning if it's too long */
+					truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename);
+		}
+	}
+	else
+	{
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied"),
+				(int) strlen(totalsize_str),
+				totalBytesRead_str, totalsize_str,
+				percent);
+	}
+
+	if (isatty(fileno(stderr)))
+		fprintf(stderr, "\r");
+	else
+		fprintf(stderr, "\n");
+}
 
 /*
  * Print a progress report based on the global variables. If verbose output
@@ -748,7 +919,7 @@ progress_report(int tablespacenum, const char *filename, bool force)
 	char		totalsize_str[32];
 	pg_time_t	now;
 
-	if (!showprogress)
+	if (!showprogress || numWorkers > 1)
 		return;
 
 	now = time(NULL);
@@ -1409,8 +1580,10 @@ get_tablespace_mapping(const char *dir)
 	canonicalize_path(canon_dir);
 
 	for (cell = tablespace_dirs.head; cell; cell = cell->next)
+	{
 		if (strcmp(canon_dir, cell->old_dir) == 0)
 			return cell->new_dir;
+	}
 
 	return dir;
 }
@@ -1425,7 +1598,7 @@ get_tablespace_mapping(const char *dir)
  * specified directory. If it's for another tablespace, it will be restored
  * in the original or mapped directory.
  */
-static void
+static int
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
 	UnpackTarState state;
@@ -1456,13 +1629,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 		exit(1);
 	}
 
-	if (basetablespace && writerecoveryconf)
-		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
 	/*
 	 * No data is synced here, everything is done for all tablespaces at the
 	 * end.
 	 */
+
+	return state.current_bytes_read;
 }
 
 static void
@@ -1485,6 +1657,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			exit(1);
 		}
 		totaldone += 512;
+		state->current_bytes_read += 512;
 
 		state->current_len_left = read_tar_number(&copybuf[124], 12);
 
@@ -1616,6 +1789,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			fclose(state->file);
 			state->file = NULL;
 			totaldone += r;
+			state->current_bytes_read += r;
 			return;
 		}
 
@@ -1625,6 +1799,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			exit(1);
 		}
 		totaldone += r;
+		state->current_bytes_read += r;
 		progress_report(state->tablespacenum, state->filename, false);
 
 		state->current_len_left -= r;
@@ -1724,16 +1899,28 @@ BaseBackup(void)
 			fprintf(stderr, "\n");
 	}
 
-	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
-				 escaped_label,
-				 showprogress ? "PROGRESS" : "",
-				 includewal == FETCH_WAL ? "WAL" : "",
-				 fastcheckpoint ? "FAST" : "",
-				 includewal == NO_WAL ? "" : "NOWAIT",
-				 maxrate_clause ? maxrate_clause : "",
-				 format == 't' ? "TABLESPACE_MAP" : "",
-				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+	if (numWorkers <= 1)
+	{
+		basebkp =
+			psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+					 escaped_label,
+					 showprogress ? "PROGRESS" : "",
+					 includewal == FETCH_WAL ? "WAL" : "",
+					 fastcheckpoint ? "FAST" : "",
+					 includewal == NO_WAL ? "" : "NOWAIT",
+					 maxrate_clause ? maxrate_clause : "",
+					 format == 't' ? "TABLESPACE_MAP" : "",
+					 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+	}
+	else
+	{
+		basebkp =
+			psprintf("START_BACKUP LABEL '%s' %s %s %s",
+					 escaped_label,
+					 showprogress ? "PROGRESS" : "",
+					 fastcheckpoint ? "FAST" : "",
+					 format == 't' ? "TABLESPACE_MAP" : "");
+	}
 
 	if (PQsendQuery(conn, basebkp) == 0)
 	{
@@ -1783,7 +1970,7 @@ BaseBackup(void)
 	/*
 	 * Get the header
 	 */
-	res = PQgetResult(conn);
+	tablespacehdr = res = PQgetResult(conn);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("could not get backup header: %s",
@@ -1839,24 +2026,75 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/*
-	 * Start receiving chunks
-	 */
-	for (i = 0; i < PQntuples(res); i++)
+	if (numWorkers > 1)
 	{
-		if (format == 't')
-			ReceiveTarFile(conn, res, i);
-		else
-			ReceiveAndUnpackTarFile(conn, res, i);
-	}							/* Loop over all tablespaces */
+		int			j = 0,
+					k = 0;
 
-	if (showprogress)
+		backupInfo = palloc0(sizeof(BackupInfo));
+		backupInfo->workerConns = (PGconn **) palloc0(sizeof(PGconn *) * numWorkers);
+		backupInfo->tablespacecount = tablespacecount;
+		backupInfo->numWorkers = numWorkers;
+		strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart));
+
+		read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map);
+
+		/* retrieve backup file list from the server. */
+		GetBackupFileList(conn, backupInfo);
+
+		/*
+		 * add backup_label in backup, (for tar format, ReceiveTarFile() will
+		 * take care of it).
+		 */
+		if (format == 'p')
+			writefile("backup_label", backupInfo->backup_label);
+
+		/*
+		 * Flatten the per-tablespace file lists into a single array of
+		 * BackupFile pointers, so that the workers can claim files
+		 * sequentially through one shared index.
+		 */
+		backupInfo->files =
+			(BackupFile **) palloc0(sizeof(BackupFile *) * backupInfo->totalfiles);
+		for (i = 0; i < backupInfo->tablespacecount; i++)
+		{
+			TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+
+			for (j = 0; j < curTsInfo->numFiles; j++)
+			{
+				backupInfo->files[k] = &curTsInfo->backupFiles[j];
+				k++;
+			}
+		}
+
+		ParallelBackupRun(backupInfo);
+		StopBackup(backupInfo);
+	}
+	else
 	{
-		progress_report(PQntuples(res), NULL, true);
-		if (isatty(fileno(stderr)))
-			fprintf(stderr, "\n");	/* Need to move to next line */
+		/*
+		 * Start receiving chunks
+		 */
+		for (i = 0; i < PQntuples(res); i++)
+		{
+			if (format == 't')
+				ReceiveTarFile(conn, res, i);
+			else
+				ReceiveAndUnpackTarFile(conn, res, i);
+		}						/* Loop over all tablespaces */
+
+		if (showprogress)
+		{
+			progress_report(PQntuples(tablespacehdr), NULL, true);
+			if (isatty(fileno(stderr)))
+				fprintf(stderr, "\n");	/* Need to move to next line */
+		}
 	}
 
+	/* Write recovery contents */
+	if (format == 'p' && writerecoveryconf)
+		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+
 	PQclear(res);
 
 	/*
@@ -2052,6 +2290,7 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"jobs", required_argument, NULL, 'j'},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2079,7 +2318,7 @@ main(int argc, char **argv)
 
 	atexit(cleanup_directories_atexit);
 
-	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2220,6 +2459,9 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 'j':			/* number of jobs */
+				numWorkers = atoi(optarg);
+				break;
 			default:
 
 				/*
@@ -2334,6 +2576,22 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (numWorkers <= 0)
+	{
+		pg_log_error("invalid number of parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (format != 'p' && numWorkers > 1)
+	{
+		pg_log_error("parallel jobs are only supported with 'plain' format");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2406,3 +2664,449 @@ main(int argc, char **argv)
 	success = true;
 	return 0;
 }
+
+/*
+ * Thread worker. A failed fetch aborts the whole backup instead of being lost.
+ */
+static void *
+workerRun(void *arg)
+{
+	WorkerState *wstate = (WorkerState *) arg;
+
+	if (GetBackupFile(wstate) != 0)
+		exit(1);
+	wstate->terminated = true;
+	return NULL;
+}
+
+/*
+ * Runs the worker threads and updates progress until all workers have
+ * terminated/completed, then joins them. Exits on connection/thread failure.
+ */
+static void
+ParallelBackupRun(BackupInfo *backupInfo)
+{
+	int			status,
+				i;
+	bool		threadsActive = true;
+	uint64		totalBytes = 0;
+
+	workers = (WorkerState *) palloc0(sizeof(WorkerState) * numWorkers);
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		WorkerState *worker = &workers[i];
+
+		worker->backupInfo = backupInfo;
+		worker->workerid = i;
+
+		/* presumably GetConnection() already reported why it failed */
+		backupInfo->workerConns[i] = GetConnection();
+		if (backupInfo->workerConns[i] == NULL)
+			exit(1);
+		/* pthread_create() returns the error number; it does not set errno */
+		status = pthread_create(&worker->worker, NULL, workerRun, worker);
+		if (status != 0)
+		{
+			pg_log_error("failed to create thread: %s", strerror(status));
+			exit(1);
+		}
+
+		if (verbose)
+			pg_log_info("backup worker (%d) created, %d", i, status);
+	}
+
+	/* Main thread: poll the workers and report progress until all finish. */
+	while (threadsActive)
+	{
+		char	   *filename = NULL;
+
+		threadsActive = false;
+		totalBytes = 0;
+		for (i = 0; i < numWorkers; i++)
+		{
+			totalBytes += workers[i].bytesRead;
+			threadsActive |= !workers[i].terminated;
+		}
+
+		if (backupInfo->fileIndex < backupInfo->totalfiles)
+			filename = backupInfo->files[backupInfo->fileIndex]->path;
+
+		workers_progress_report(totalBytes, filename, false);
+		pg_usleep(100000);
+	}
+
+	/* Reap the finished threads so that their resources are released. */
+	for (i = 0; i < numWorkers; i++)
+		pthread_join(workers[i].worker, NULL);
+
+	if (showprogress)
+	{
+		workers_progress_report(totalBytes, NULL, true);
+		if (isatty(fileno(stderr)))
+			fprintf(stderr, "\n");	/* Need to move to next line */
+	}
+}
+
+/*
+ * Take the system out of backup mode. The label is escaped first because it
+ * is embedded in a quoted string literal of the replication command.
+ */
+static void
+StopBackup(BackupInfo *backupInfo)
+{
+	size_t		len = strlen(backupInfo->backup_label);
+	char	   *label = palloc(len * 2 + 1);
+	char	   *basebkp;
+
+	PQescapeStringConn(conn, label, backupInfo->backup_label, len, NULL);
+	basebkp = psprintf("STOP_BACKUP LABEL '%s' %s %s", label,
+					   includewal == FETCH_WAL ? "WAL" : "",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not execute STOP BACKUP \"%s\"",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	/* receive pg_control and wal files */
+	ReceiveAndUnpackTarFile(conn, NULL, tablespacecount);
+}
+
+/*
+ * Retrieve backup file list from the server and populate TablespaceInfo struct
+ * to keep track of tablespaces and its files. In plain format, directories and
+ * symlinks are created right away and are not queued for the workers.
+ */
+static void
+GetBackupFileList(PGconn *conn, BackupInfo *backupInfo)
+{
+	TablespaceInfo *tsInfo;
+	PGresult   *res = NULL;
+	int			i;
+
+	backupInfo->tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount);
+	tsInfo = backupInfo->tsInfo;
+
+	/* Get list of files. */
+	if (PQsendQuery(conn, "SEND_FILE_LIST") == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_FILE_LIST", PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/*
+	 * The list of files is grouped by tablespaces, and we want to fetch them
+	 * in the same order.
+	 */
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		bool		basetablespace;
+		int			ntuples;
+		int			nfiles = 0;
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get backup header: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		ntuples = PQntuples(res);
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tsInfo[i].tblspcOid = atol(PQgetvalue(tablespacehdr, i, 0));
+		tsInfo[i].tablespace = PQgetvalue(tablespacehdr, i, 1);
+		tsInfo[i].backupFiles = palloc0(sizeof(BackupFile) * ntuples);
+
+		for (int j = 0; j < ntuples; j++)
+		{
+			BackupFile *file;
+			char	   *path = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+
+			/* In 'plain' format, create backup directories first. */
+			if (format == 'p' && type == 'd')
+			{
+				create_backup_dirs(basetablespace, tsInfo[i].tablespace, path);
+				continue;
+			}
+
+			if (format == 'p' && type == 'l')
+			{
+				create_tblspc_symlink(path);
+				continue;
+			}
+
+			/* Pack kept entries densely so no empty slot reaches a worker. */
+			file = &tsInfo[i].backupFiles[nfiles++];
+			strlcpy(file->path, path, MAXPGPATH);
+			file->type = type;
+			file->size = atol(PQgetvalue(res, j, 2));
+			file->mtime = atol(PQgetvalue(res, j, 3));
+			file->tsIndex = i;
+		}
+
+		/* count only the entries that were actually queued above */
+		tsInfo[i].numFiles = nfiles;
+		backupInfo->totalfiles += nfiles;
+
+		/* sort files in descending order, based on size */
+		qsort(tsInfo[i].backupFiles, tsInfo[i].numFiles,
+			  sizeof(BackupFile), &compareFileSize);
+		PQclear(res);
+	}
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	PQclear(PQgetResult(conn));
+}
+
+/*
+ * Worker loop: repeatedly claim the next file from the shared list, request
+ * it with SEND_FILES and unpack it into the backup directory. Returns 0 when
+ * the list is exhausted and 1 if there is no connection or a send fails.
+ */
+static int
+GetBackupFile(WorkerState *wstate)
+{
+	BackupInfo *backupInfo = wstate->backupInfo;
+	PGconn	   *worker_conn = backupInfo->workerConns[wstate->workerid];
+	BackupFile *fetchFile;
+	PGresult   *res;
+
+	/* Without a connection nothing can be fetched; don't claim any file. */
+	if (!worker_conn)
+		return 1;
+
+	while ((fetchFile = getNextFile(backupInfo)) != NULL)
+	{
+		PQExpBuffer buf = createPQExpBuffer();
+		bool		sent;
+
+		/*
+		 * Fetch a single file from the server. To fetch the file, build a
+		 * query in form of:
+		 *
+		 * SEND_FILES ('base/1/1245/32683') [options]
+		 */
+		appendPQExpBuffer(buf, "SEND_FILES ( '%s' )", fetchFile->path);
+		/* add options */
+		appendPQExpBuffer(buf, " START_WAL_LOCATION '%s' %s %s",
+						  backupInfo->xlogstart,
+						  format == 't' ? "TABLESPACE_MAP" : "",
+						  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+		if (maxrate > 0)
+			appendPQExpBuffer(buf, " MAX_RATE %u", maxrate);
+
+		/* The query text is not needed once sent; free it on every path. */
+		sent = (PQsendQuery(worker_conn, buf->data) != 0);
+		destroyPQExpBuffer(buf);
+		if (!sent)
+		{
+			pg_log_error("could not send files list \"%s\"",
+						 PQerrorMessage(worker_conn));
+			return 1;
+		}
+
+		/* process file contents, also count bytesRead for progress */
+		wstate->bytesRead +=
+			ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, fetchFile->tsIndex);
+
+		res = PQgetResult(worker_conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data stream: %s",
+						 PQerrorMessage(worker_conn));
+			exit(1);
+		}
+		/* Release the result, then drain the next PQgetResult() as before. */
+		PQclear(res);
+		PQclear(PQgetResult(worker_conn));
+	}
+
+	return 0;
+}
+
+/*
+ * Hand out the next unclaimed entry of the flattened file list, or NULL once
+ * the list is exhausted. The shared index is read and advanced while holding
+ * fetch_mutex, so every index value is given to exactly one caller.
+ */
+static BackupFile *
+getNextFile(BackupInfo *backupInfo)
+{
+	int			claimed;
+
+	pthread_mutex_lock(&fetch_mutex);
+	claimed = backupInfo->fileIndex;
+	backupInfo->fileIndex = claimed + 1;
+	pthread_mutex_unlock(&fetch_mutex);
+
+	/* the index keeps growing past the end; such claims yield NULL */
+	return (claimed < backupInfo->totalfiles) ?
+		backupInfo->files[claimed] : NULL;
+}
+
+/*
+ * qsort comparator for BackupFile: orders by size, largest first.
+ */
+static int
+compareFileSize(const void *a, const void *b)
+{
+	const BackupFile *lhs = (const BackupFile *) a;
+	const BackupFile *rhs = (const BackupFile *) b;
+
+	if (lhs->size == rhs->size)
+		return 0;
+
+	return (lhs->size > rhs->size) ? -1 : 1;
+}
+
+/* Read backup label and tablespace map from conn; outputs are own copies. */
+static void
+read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map)
+{
+	PGresult   *res = NULL;
+
+	Assert(backuplabel != NULL);
+	Assert(tblspc_map != NULL);
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get data: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) < 1)
+	{
+		pg_log_error("no data returned from server");
+		exit(1);
+	}
+
+	/* Copy the values out so the result can be released right away. */
+	*backuplabel = pg_strdup(PQgetvalue(res, 0, 0));	/* backup_label */
+	if (!PQgetisnull(res, 0, 1))
+		*tblspc_map = pg_strdup(PQgetvalue(res, 0, 1)); /* tablespace_map */
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* Fetch and release the next PQgetResult(), as the original did. */
+	PQclear(PQgetResult(conn));
+}
+
+/*
+ * Create the directory "name" inside the backup. Entries of the base
+ * tablespace go under basedir; all others go under the tablespace's (possibly
+ * -T mapped) path, using the part of "name" that follows that path.
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		target[MAXPGPATH];
+	const char *parent = basedir;
+	const char *relname = name;
+
+	Assert(name != NULL);
+
+	if (!basetablespace)
+	{
+		Assert(tablespace != NULL);
+		parent = get_tablespace_mapping(tablespace);
+		/* presumably name starts with "<tablespace>/"; skip that prefix */
+		relname = name + strlen(tablespace) + 1;
+	}
+
+	snprintf(target, sizeof(target), "%s/%s", parent, relname);
+
+	/* an already existing directory is not an error */
+	if (pg_mkdir_p(target, pg_dir_create_mode) != 0 && errno != EEXIST)
+	{
+		pg_log_error("could not create directory \"%s\": %m", target);
+		exit(1);
+	}
+}
+
+/*
+ * Create a symlink in pg_tblspc, honoring --tablespace-mapping. The link name
+ * (last path component) must exactly equal a tablespace OID from the header.
+ */
+static void
+create_tblspc_symlink(char *filename)
+{
+	const char *slash = strrchr(filename, '/');
+	const char *linkname = slash ? slash + 1 : filename;
+	int			i;
+
+	for (i = 0; i < tablespacecount; i++)
+	{
+		/* a NULL OID reads as "", and a substring test would match any name */
+		if (!PQgetisnull(tablespacehdr, i, 0) &&
+			strcmp(linkname, PQgetvalue(tablespacehdr, i, 0)) == 0)
+		{
+			char	   *linkloc = psprintf("%s/%s", basedir, filename);
+			const char *mapped_tblspc_path = get_tablespace_mapping(PQgetvalue(tablespacehdr, i, 1));
+#ifdef HAVE_SYMLINK
+			if (symlink(mapped_tblspc_path, linkloc) != 0)
+			{
+				pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+							 linkloc, mapped_tblspc_path);
+				exit(1);
+			}
+#else
+			pg_log_error("symlinks are not supported on this platform");
+			exit(1);
+#endif
+			free(linkloc);
+			break;
+		}
+	}
+}
+
+/*
+ * Write the NUL-terminated string buf to basedir/path (opened with "w").
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/* fwrite() returns 0 for a zero-length item, which is not an error */
+	if (buf[0] != '\0' && fwrite(buf, strlen(buf), 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
-- 
2.21.0 (Apple Git-122.2)