0005-pg_basebackup-changes-for-parallel-backup_v6.patch
application/octet-stream
Filename: 0005-pg_basebackup-changes-for-parallel-backup_v6.patch
Type: application/octet-stream
Part: 2
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v6-0005
Subject: pg_basebackup changes for parallel backup.
| File | + | − |
|---|---|---|
| src/bin/pg_basebackup/pg_basebackup.c | 689 | 47 |
From d4663820abbec2944e5c65500e574691a251e170 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 14 Oct 2019 17:28:58 +0500
Subject: [PATCH 5/7] pg_basebackup changes for parallel backup.
---
src/bin/pg_basebackup/pg_basebackup.c | 736 ++++++++++++++++++++++++--
1 file changed, 689 insertions(+), 47 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a9d162a7da..f63c106130 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -19,6 +19,7 @@
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
+#include <pthread.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
@@ -41,6 +42,7 @@
#include "receivelog.h"
#include "replication/basebackup.h"
#include "streamutil.h"
+#include "fe_utils/simple_list.h"
#define ERRCODE_DATA_CORRUPTED "XX001"
@@ -57,6 +59,57 @@ typedef struct TablespaceList
TablespaceListCell *tail;
} TablespaceList;
+/*
+ * One entry of the backup file list.  Entries are filled in by
+ * GetBackupFileList() from the rows of the SEND_BACKUP_FILELIST result.
+ */
+typedef struct
+{
+ char path[MAXPGPATH]; /* file path, copied from result column 0 */
+ char type; /* first character of result column 1; 'd' entries are passed to create_backup_dirs() in plain format */
+ int32 size; /* result column 2 parsed with atol(); sort key of compareFileSize(). NOTE(review): narrowed to int32 -- confirm no listed file can exceed 2GB */
+ time_t mtime; /* result column 3 parsed with atol() */
+
+ int tsIndex; /* index of tsInfo this file belongs to. */
+} BackupFile;
+
+/*
+ * Per-tablespace slice of the backup file list, built by GetBackupFileList()
+ * from one row of the backup header result (tablespacehdr) plus one file-list
+ * result.
+ */
+typedef struct
+{
+ Oid tblspcOid; /* column 0 of the backup header row, parsed with atol() */
+ /*
+  * Column 1 of the backup header row.  The pointer refers to storage owned
+  * by the tablespacehdr result; it is not copied.  create_backup_dirs()
+  * uses it as a path prefix of the file names and as the key for
+  * get_tablespace_mapping().
+  * NOTE(review): PQgetvalue() presumably yields "" rather than NULL for a
+  * null field, so this is likely never NULL -- confirm.
+  */
+ char *tablespace; /* tablespace name or NULL if 'base' tablespace */
+ int numFiles; /* number of files */
+ BackupFile *backupFiles; /* array of numFiles entries, sorted by descending size */
+} TablespaceInfo;
+
+/*
+ * State of one parallel backup, shared by the main thread and all workers.
+ * Allocated in BaseBackup() when more than one worker is requested.
+ */
+typedef struct
+{
+ int tablespacecount; /* number of entries in tsInfo */
+ int totalfiles; /* number of entries in files; summed over all tablespaces */
+ int numWorkers; /* copy of the global numWorkers at setup time */
+
+ char xlogstart[64]; /* WAL start location, sent as START_WAL_LOCATION with every file request */
+ char *backup_label; /* set by read_label_tblspcmap(); written to "backup_label" in plain format and sent back with STOP_BACKUP */
+ char *tablespace_map; /* set by read_label_tblspcmap() only when the server returned a non-null value */
+
+ TablespaceInfo *tsInfo; /* array of tablespacecount entries */
+ BackupFile **files; /* list of BackupFile pointers */
+ int fileIndex; /* index of file to be fetched; advanced under fetch_mutex by getNextFile() */
+
+ PGconn **workerConns; /* one connection per worker, indexed by WorkerState.workerid */
+} BackupInfo;
+
+/*
+ * Per-thread state of a backup worker.  The array of these is allocated in
+ * ParallelBackupRun(), which also polls bytesRead and terminated while the
+ * workers are running.
+ */
+typedef struct
+{
+ BackupInfo *backupInfo; /* shared backup state */
+ uint64 bytesRead; /* running sum of the values returned by ReceiveAndUnpackTarFile() for this worker */
+
+ int workerid; /* index of this worker; selects its entry in workerConns */
+ pthread_t worker; /* thread handle filled in by pthread_create() */
+
+ bool terminated; /* set by workerRun() once the worker is done. NOTE(review): plain bool read from another thread without synchronization -- confirm this is acceptable */
+} WorkerState;
+
+/*
+ * Parallel backup state.  Both types are private to this file, so the
+ * variables get internal linkage like the other file-scope state here.
+ */
+static BackupInfo *backupInfo = NULL;	/* allocated in BaseBackup() when numWorkers > 1 */
+static WorkerState *workers = NULL;	/* allocated in ParallelBackupRun() */
+
+/* Serializes updates of BackupInfo.fileIndex in getNextFile(). */
+static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER;
+
/*
* pg_xlog has been renamed to pg_wal in version 10. This version number
* should be compared with PQserverVersion().
@@ -110,6 +163,9 @@ static bool found_existing_xlogdir = false;
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
+static int numWorkers = 1;
+static PGresult *tablespacehdr;
+
/* Progress counters */
static uint64 totalsize_kb;
static uint64 totaldone;
@@ -140,9 +196,10 @@ static PQExpBuffer recoveryconfcontents = NULL;
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
static void progress_report(int tablespacenum, const char *filename, bool force);
+static void workers_progress_report(uint64 totalBytesRead, const char *filename, bool force);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static int ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
@@ -151,6 +208,17 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
static const char *get_tablespace_mapping(const char *dir);
static void tablespace_list_append(const char *arg);
+static void ParallelBackupRun(BackupInfo *backupInfo);
+static void StopBackup(BackupInfo *backupInfo);
+static void GetBackupFileList(PGconn *conn, BackupInfo *backupInfo);
+static int GetBackupFile(WorkerState *wstate);
+static BackupFile *getNextFile(BackupInfo *backupInfo);
+static int compareFileSize(const void *a, const void *b);
+static void read_label_tblspcmap(PGconn *conn, char **backup_label, char **tablespace_map);
+static void create_backup_dirs(bool basetablespace, char *tablespace, char *name);
+static void writefile(char *path, char *buf);
+static void *workerRun(void *arg);
+
static void
cleanup_directories_atexit(void)
@@ -202,6 +270,17 @@ cleanup_directories_atexit(void)
static void
disconnect_atexit(void)
{
+ /* close worker connections */
+ if (backupInfo && backupInfo->workerConns != NULL)
+ {
+ int i;
+ for (i = 0; i < numWorkers; i++)
+ {
+ if (backupInfo->workerConns[i] != NULL)
+ PQfinish(backupInfo->workerConns[i]);
+ }
+ }
+
if (conn != NULL)
PQfinish(conn);
}
@@ -349,6 +428,7 @@ usage(void)
printf(_(" --no-slot prevent creation of temporary replication slot\n"));
printf(_(" --no-verify-checksums\n"
" do not verify checksums\n"));
+ printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=CONNSTR connection string\n"));
@@ -695,6 +775,93 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
}
}
+/*
+ * Report the combined progress of the worker threads on stderr.
+ *
+ * totalBytesRead is the number of bytes received so far by all workers.
+ * When verbose output is enabled and filename is not NULL, the name is shown
+ * as the current file (truncated at the front if it is too long).
+ *
+ * Nothing is printed unless progress reporting is enabled.  At most one
+ * report per second is written, unless force is true.
+ */
+static void
+workers_progress_report(uint64 totalBytesRead, const char *filename, bool force)
+{
+	uint64		done_kb = totalBytesRead / 1024;
+	int			pct;
+	char		done_str[32];
+	char		total_str[32];
+	pg_time_t	stamp;
+
+	if (!showprogress)
+		return;
+
+	stamp = time(NULL);
+	if (!force && stamp == last_progress_report)
+		return;					/* Max once per second */
+	last_progress_report = stamp;
+
+	if (totalsize_kb == 0)
+		pct = 0;
+	else
+		pct = (int) (done_kb * 100 / totalsize_kb);
+
+	/*
+	 * Never show more than 100% or a done column larger than the total.  The
+	 * displayed total may therefore grow towards the end of the backup (the
+	 * estimate is always off when WAL is included), which is preferable to
+	 * overshooting it.
+	 */
+	if (pct > 100)
+		pct = 100;
+	if (done_kb > totalsize_kb)
+		totalsize_kb = done_kb;
+
+	/*
+	 * Format the numbers separately so the platform-dependent format code
+	 * stays out of the translatable strings; INT64_FORMAT is only known to
+	 * work with snprintf, not fprintf.
+	 */
+	snprintf(done_str, sizeof(done_str), INT64_FORMAT, done_kb);
+	snprintf(total_str, sizeof(total_str), INT64_FORMAT, totalsize_kb);
+
+#define VERBOSE_FILENAME_LENGTH 35
+
+	if (!verbose)
+	{
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied"),
+				(int) strlen(total_str),
+				done_str, total_str,
+				pct);
+	}
+	else if (filename == NULL)
+	{
+		/* No file name (final call): pad so the status line is cleared */
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"),
+				(int) strlen(total_str),
+				done_str, total_str,
+				pct,
+				VERBOSE_FILENAME_LENGTH + 5, "");
+	}
+	else
+	{
+		size_t		namelen = strlen(filename);
+		bool		clipped = (namelen > VERBOSE_FILENAME_LENGTH);
+		/* leave room for the "..." prefix when the name is clipped */
+		int			shown = clipped ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH;
+		/* keep the tail of an over-long name */
+		const char *tail = clipped ? filename + namelen - VERBOSE_FILENAME_LENGTH + 3 : filename;
+
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"),
+				(int) strlen(total_str), done_str, total_str,
+				pct,
+				clipped ? "..." : "",
+				shown,
+				shown,
+				tail);
+	}
+
+	/* Overwrite the line on a terminal, otherwise emit one line per report */
+	if (!isatty(fileno(stderr)))
+		fprintf(stderr, "\n");
+	else
+		fprintf(stderr, "\r");
+}
/*
* Print a progress report based on the global variables. If verbose output
@@ -711,7 +878,7 @@ progress_report(int tablespacenum, const char *filename, bool force)
char totalsize_str[32];
pg_time_t now;
- if (!showprogress)
+ if (!showprogress || numWorkers > 1)
return;
now = time(NULL);
@@ -1381,7 +1548,7 @@ get_tablespace_mapping(const char *dir)
* specified directory. If it's for another tablespace, it will be restored
* in the original or mapped directory.
*/
-static void
+static int
ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
{
char current_path[MAXPGPATH];
@@ -1392,6 +1559,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
bool basetablespace;
char *copybuf = NULL;
FILE *file = NULL;
+ int readBytes = 0;
basetablespace = PQgetisnull(res, rownum, 0);
if (basetablespace)
@@ -1455,7 +1623,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
pg_log_error("invalid tar block header size: %d", r);
exit(1);
}
- totaldone += 512;
+ readBytes += 512;
current_len_left = read_tar_number(&copybuf[124], 12);
@@ -1486,21 +1654,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* Directory
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
+
+ /*
+ * In parallel mode, we create directories before fetching
+ * files, so it's OK if a directory already exists.
+ */
if (mkdir(filename, pg_dir_create_mode) != 0)
{
- /*
- * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
- * clusters) will have been created by the wal
- * receiver process. Also, when the WAL directory
- * location was specified, pg_wal (or pg_xlog) has
- * already been created as a symbolic link before
- * starting the actual backup. So just ignore creation
- * failures on related directories.
- */
- if (!((pg_str_endswith(filename, "/pg_wal") ||
- pg_str_endswith(filename, "/pg_xlog") ||
- pg_str_endswith(filename, "/archive_status")) &&
- errno == EEXIST))
+ if (errno != EEXIST)
{
pg_log_error("could not create directory \"%s\": %m",
filename);
@@ -1585,7 +1746,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
*/
fclose(file);
file = NULL;
- totaldone += r;
+ readBytes += r;
continue;
}
@@ -1594,7 +1755,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
pg_log_error("could not write to file \"%s\": %m", filename);
exit(1);
}
- totaldone += r;
+ readBytes += r;
+ totaldone = readBytes;
progress_report(rownum, filename, false);
current_len_left -= r;
@@ -1622,13 +1784,11 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
if (copybuf != NULL)
PQfreemem(copybuf);
- if (basetablespace && writerecoveryconf)
- WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
/*
* No data is synced here, everything is done for all tablespaces at the
* end.
*/
+ return readBytes;
}
@@ -1715,16 +1875,28 @@ BaseBackup(void)
fprintf(stderr, "\n");
}
- basebkp =
- psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
- escaped_label,
- showprogress ? "PROGRESS" : "",
- includewal == FETCH_WAL ? "WAL" : "",
- fastcheckpoint ? "FAST" : "",
- includewal == NO_WAL ? "" : "NOWAIT",
- maxrate_clause ? maxrate_clause : "",
- format == 't' ? "TABLESPACE_MAP" : "",
- verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ if (numWorkers <= 1)
+ {
+ basebkp =
+ psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+ escaped_label,
+ showprogress ? "PROGRESS" : "",
+ includewal == FETCH_WAL ? "WAL" : "",
+ fastcheckpoint ? "FAST" : "",
+ includewal == NO_WAL ? "" : "NOWAIT",
+ maxrate_clause ? maxrate_clause : "",
+ format == 't' ? "TABLESPACE_MAP" : "",
+ verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ }
+ else
+ {
+ basebkp =
+ psprintf("START_BACKUP LABEL '%s' %s %s %s",
+ escaped_label,
+ showprogress ? "PROGRESS" : "",
+ fastcheckpoint ? "FAST" : "",
+ format == 't' ? "TABLESPACE_MAP" : "");
+ }
if (PQsendQuery(conn, basebkp) == 0)
{
@@ -1774,7 +1946,7 @@ BaseBackup(void)
/*
* Get the header
*/
- res = PQgetResult(conn);
+ tablespacehdr = res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
pg_log_error("could not get backup header: %s",
@@ -1830,24 +2002,74 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
- for (i = 0; i < PQntuples(res); i++)
+ if (numWorkers > 1)
{
- if (format == 't')
- ReceiveTarFile(conn, res, i);
- else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
+ int j = 0,
+ k = 0;
- if (showprogress)
+ backupInfo = palloc0(sizeof(BackupInfo));
+ backupInfo->workerConns = (PGconn **) palloc0(sizeof(PGconn *) * numWorkers);
+ backupInfo->tablespacecount = tablespacecount;
+ backupInfo->numWorkers = numWorkers;
+ strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart));
+
+ read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map);
+
+ /* Retrieve the backup file list from the server. */
+ GetBackupFileList(conn, backupInfo);
+
+ /*
+ * add backup_label in backup, (for tar format, ReceiveTarFile() will
+ * take care of it).
+ */
+ if (format == 'p')
+ writefile("backup_label", backupInfo->backup_label);
+
+ /*
+ * Flatten the file list to avoid unnecessary locks and enable sequential
+ * access to the file list (creating an array of BackupFile structure pointers).
+ */
+ backupInfo->files =
+ (BackupFile **) palloc0(sizeof(BackupFile *) * backupInfo->totalfiles);
+ for (i = 0; i < backupInfo->tablespacecount; i++)
+ {
+ TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+
+ for (j = 0; j < curTsInfo->numFiles; j++)
+ {
+ backupInfo->files[k] = &curTsInfo->backupFiles[j];
+ k++;
+ }
+ }
+
+ ParallelBackupRun(backupInfo);
+ StopBackup(backupInfo);
+ }
+ else
{
- progress_report(PQntuples(res), NULL, true);
- if (isatty(fileno(stderr)))
- fprintf(stderr, "\n"); /* Need to move to next line */
+ /*
+ * Start receiving chunks
+ */
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ if (format == 't')
+ ReceiveTarFile(conn, res, i);
+ else
+ ReceiveAndUnpackTarFile(conn, res, i);
+ } /* Loop over all tablespaces */
+
+ if (showprogress)
+ {
+ progress_report(PQntuples(tablespacehdr), NULL, true);
+ if (isatty(fileno(stderr)))
+ fprintf(stderr, "\n"); /* Need to move to next line */
+ }
}
+ /* Write recovery contents */
+ if (format == 'p' && writerecoveryconf)
+ WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+
PQclear(res);
/*
@@ -2043,6 +2265,7 @@ main(int argc, char **argv)
{"waldir", required_argument, NULL, 1},
{"no-slot", no_argument, NULL, 2},
{"no-verify-checksums", no_argument, NULL, 3},
+ {"jobs", required_argument, NULL, 'j'},
{NULL, 0, NULL, 0}
};
int c;
@@ -2070,7 +2293,7 @@ main(int argc, char **argv)
atexit(cleanup_directories_atexit);
- while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+ while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2211,6 +2434,9 @@ main(int argc, char **argv)
case 3:
verify_checksums = false;
break;
+ case 'j': /* number of jobs */
+ numWorkers = atoi(optarg);
+ break;
default:
/*
@@ -2325,6 +2551,22 @@ main(int argc, char **argv)
}
}
+ if (numWorkers <= 0)
+ {
+ pg_log_error("invalid number of parallel jobs");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ if (format != 'p' && numWorkers > 1)
+ {
+ pg_log_error("parallel jobs are only supported with 'plain' format");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
@@ -2397,3 +2639,403 @@ main(int argc, char **argv)
success = true;
return 0;
}
+
+/*
+ * Thread entry point of a backup worker.
+ *
+ * arg is the WorkerState of this worker.  The worker fetches files via
+ * GetBackupFile() until the shared list is exhausted and then marks itself
+ * as terminated so that ParallelBackupRun() can stop polling.
+ *
+ * GetBackupFile() returns non-zero when it could not request a file.  That
+ * must not be ignored: otherwise the worker would end quietly and the backup
+ * would be reported as successful with files missing.  As with the other
+ * fatal errors raised while receiving data, the whole program is aborted.
+ */
+static void *
+workerRun(void *arg)
+{
+	WorkerState *wstate = (WorkerState *) arg;
+
+	if (GetBackupFile(wstate) != 0)
+	{
+		pg_log_error("backup worker (%d) failed", wstate->workerid);
+		exit(1);
+	}
+
+	wstate->terminated = true;
+	return NULL;
+}
+
+/*
+ * Runs the worker threads and updates progress until all workers have
+ * terminated/completed.
+ *
+ * For every worker a separate connection is opened and stored in
+ * backupInfo->workerConns, then a thread running workerRun() is started.
+ * The calling thread polls the workers every 100ms to report progress and
+ * finally joins them.  Any failure to set up a worker is fatal.
+ */
+static void
+ParallelBackupRun(BackupInfo *backupInfo)
+{
+	int			status,
+				i;
+	bool		threadsActive = true;
+	uint64		totalBytes = 0;
+
+	workers = (WorkerState *) palloc0(sizeof(WorkerState) * numWorkers);
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		WorkerState *worker = &workers[i];
+
+		worker->backupInfo = backupInfo;
+		worker->workerid = i;
+		worker->bytesRead = 0;
+		worker->terminated = false;
+
+		/* A worker without a connection cannot fetch anything. */
+		backupInfo->workerConns[i] = GetConnection();
+		if (backupInfo->workerConns[i] == NULL)
+		{
+			pg_log_error("could not establish connection for backup worker (%d)", i);
+			exit(1);
+		}
+
+		/*
+		 * pthread_create() reports failure through its return value and does
+		 * not set errno, so %m cannot be used here.
+		 */
+		status = pthread_create(&worker->worker, NULL, workerRun, worker);
+		if (status != 0)
+		{
+			pg_log_error("failed to create thread: %s", strerror(status));
+			exit(1);
+		}
+
+		if (verbose)
+			pg_log_info("backup worker (%d) created, %d", i, status);
+	}
+
+	/*
+	 * This is the main thread for updating progress. It waits for workers to
+	 * complete and gets updated status during every loop iteration.
+	 */
+	while (threadsActive)
+	{
+		char	   *filename = NULL;
+		int			nextIndex;
+
+		threadsActive = false;
+		totalBytes = 0;
+
+		for (i = 0; i < numWorkers; i++)
+		{
+			WorkerState *worker = &workers[i];
+
+			totalBytes += worker->bytesRead;
+			threadsActive |= !worker->terminated;
+		}
+
+		/*
+		 * Take one snapshot of the shared index under the mutex.  Reading it
+		 * once for the bounds check and again for the array access would let
+		 * a worker advance it in between and index past the end of files[].
+		 */
+		pthread_mutex_lock(&fetch_mutex);
+		nextIndex = backupInfo->fileIndex;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		if (nextIndex < backupInfo->totalfiles)
+			filename = backupInfo->files[nextIndex]->path;
+
+		workers_progress_report(totalBytes, filename, false);
+		pg_usleep(100000);
+	}
+
+	/*
+	 * Join the workers so their resources are released, and recompute the
+	 * total afterwards: a worker may have added bytes between the moment its
+	 * counter was read and the moment its terminated flag was seen.
+	 */
+	totalBytes = 0;
+	for (i = 0; i < numWorkers; i++)
+	{
+		status = pthread_join(workers[i].worker, NULL);
+		if (status != 0)
+		{
+			pg_log_error("failed to join thread: %s", strerror(status));
+			exit(1);
+		}
+		totalBytes += workers[i].bytesRead;
+	}
+
+	if (showprogress)
+	{
+		workers_progress_report(totalBytes, NULL, true);
+		if (isatty(fileno(stderr)))
+			fprintf(stderr, "\n");	/* Need to move to next line */
+	}
+}
+
+/*
+ * Take the system out of backup mode.
+ *
+ * Sends STOP_BACKUP with the label kept in backupInfo over the main
+ * connection and then unpacks the data the server sends in reply.  Failure
+ * to send the command is fatal.
+ *
+ * NOTE(review): a NULL result is handed to ReceiveAndUnpackTarFile(); this
+ * presumably relies on PQgetisnull() treating a NULL result as a null field
+ * so that the data is unpacked as the base tablespace -- confirm.
+ */
+static void
+StopBackup(BackupInfo *backupInfo)
+{
+	const char *walopt = (includewal == FETCH_WAL) ? "WAL" : "";
+	const char *waitopt = (includewal == NO_WAL) ? "" : "NOWAIT";
+	PGresult   *nores = NULL;
+	char	   *stopcmd;
+
+	stopcmd = psprintf("STOP_BACKUP LABEL '%s' %s %s",
+					   backupInfo->backup_label,
+					   walopt,
+					   waitopt);
+
+	if (!PQsendQuery(conn, stopcmd))
+	{
+		pg_log_error("could not execute STOP BACKUP \"%s\"",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/* receive pg_control and wal files */
+	ReceiveAndUnpackTarFile(conn, nores, tablespacecount);
+	PQclear(nores);
+}
+
+/*
+ * Retrieve the backup file list from the server and populate the
+ * TablespaceInfo array to keep track of tablespaces and their files.
+ *
+ * conn is the connection on which the backup was started; backupInfo must
+ * already carry tablespacecount.  On return backupInfo->tsInfo holds one
+ * entry per tablespace, each with its files sorted by descending size, and
+ * backupInfo->totalfiles has been increased by the number of listed files.
+ * In plain format the listed directories are created as a side effect.
+ *
+ * Every result obtained from the connection is released here.  Any protocol
+ * error is fatal.
+ */
+static void
+GetBackupFileList(PGconn *conn, BackupInfo *backupInfo)
+{
+	TablespaceInfo *tsInfo;
+	PGresult   *res = NULL;
+	int			i;
+
+	backupInfo->tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount);
+	tsInfo = backupInfo->tsInfo;
+
+	/*
+	 * Get list of files.  The command is a constant, so no buffer needs to
+	 * be allocated for it.
+	 */
+	if (PQsendQuery(conn, "SEND_BACKUP_FILELIST") == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_BACKUP_FILELIST", PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/*
+	 * The list of files is grouped by tablespaces, and we want to fetch them
+	 * in the same order.
+	 */
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		bool		basetablespace;
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get backup header: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		/* tablespace identity comes from the backup header, not from res */
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tsInfo[i].tblspcOid = atol(PQgetvalue(tablespacehdr, i, 0));
+		tsInfo[i].tablespace = PQgetvalue(tablespacehdr, i, 1);
+		tsInfo[i].numFiles = PQntuples(res);
+		tsInfo[i].backupFiles = palloc0(sizeof(BackupFile) * tsInfo[i].numFiles);
+
+		/* keep count of all files in backup */
+		backupInfo->totalfiles += tsInfo[i].numFiles;
+
+		for (int j = 0; j < tsInfo[i].numFiles; j++)
+		{
+			char	   *path = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+			int32		size = atol(PQgetvalue(res, j, 2));
+			time_t		mtime = atol(PQgetvalue(res, j, 3));
+
+			/*
+			 * In 'plain' format, create backup directories first.
+			 */
+			if (format == 'p' && type == 'd')
+				create_backup_dirs(basetablespace, tsInfo[i].tablespace, path);
+
+			/* path is copied because res is released below */
+			strlcpy(tsInfo[i].backupFiles[j].path, path, MAXPGPATH);
+			tsInfo[i].backupFiles[j].type = type;
+			tsInfo[i].backupFiles[j].size = size;
+			tsInfo[i].backupFiles[j].mtime = mtime;
+			tsInfo[i].backupFiles[j].tsIndex = i;
+		}
+
+		/* sort files in descending order, based on size */
+		qsort(tsInfo[i].backupFiles, tsInfo[i].numFiles,
+			  sizeof(BackupFile), &compareFileSize);
+		PQclear(res);
+	}
+
+	/* command completion */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* consume the end-of-command marker; release it should it be a result */
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * Retrieve and write backup files from the server.  The file list is
+ * provided by the worker state: the worker repeatedly takes the next file
+ * from the shared list, requests it over its own connection and writes it
+ * to the backup directory, until the list is exhausted.
+ *
+ * The number of bytes received is accumulated in wstate->bytesRead.
+ *
+ * Returns 0 when the list has been exhausted, 1 when the worker has no
+ * connection or a request could not be sent.  An error while receiving a
+ * file is fatal.
+ */
+static int
+GetBackupFile(WorkerState *wstate)
+{
+	PGresult   *res = NULL;
+	PGconn	   *worker_conn = NULL;
+	BackupFile *fetchFile = NULL;
+	BackupInfo *backupInfo = NULL;
+
+	backupInfo = wstate->backupInfo;
+	worker_conn = backupInfo->workerConns[wstate->workerid];
+
+	/*
+	 * Check the connection before taking a file off the shared list, so that
+	 * a worker that cannot fetch anything does not consume (and thereby
+	 * drop) a file.
+	 */
+	if (!worker_conn)
+		return 1;
+
+	while ((fetchFile = getNextFile(backupInfo)) != NULL)
+	{
+		PQExpBuffer buf = createPQExpBuffer();
+		int			sent;
+
+		/*
+		 * build query in form of: SEND_BACKUP_FILES ( 'base/1/1245/32683' )
+		 * [options]
+		 *
+		 * NOTE(review): the path is inserted without quoting; confirm that
+		 * listed paths can never contain a single quote.
+		 */
+		appendPQExpBuffer(buf, "SEND_BACKUP_FILES ( '%s' )", fetchFile->path);
+
+		/* add options */
+		appendPQExpBuffer(buf, " START_WAL_LOCATION '%s' %s %s",
+						  backupInfo->xlogstart,
+						  format == 't' ? "TABLESPACE_MAP" : "",
+						  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+		if (maxrate > 0)
+			appendPQExpBuffer(buf, " MAX_RATE %u", maxrate);
+
+		/* the buffer is not needed once the query has been handed over */
+		sent = PQsendQuery(worker_conn, buf->data);
+		destroyPQExpBuffer(buf);
+
+		if (sent == 0)
+		{
+			pg_log_error("could not send files list \"%s\"",
+						 PQerrorMessage(worker_conn));
+			return 1;
+		}
+
+		/* process file contents, also count bytesRead for progress */
+		wstate->bytesRead +=
+			ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, fetchFile->tsIndex);
+
+		res = PQgetResult(worker_conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data stream: %s",
+						 PQerrorMessage(worker_conn));
+			exit(1);
+		}
+		PQclear(res);
+
+		/* consume the end-of-command marker; release it should it be a result */
+		res = PQgetResult(worker_conn);
+		PQclear(res);
+	}
+
+	return 0;
+}
+
+/*
+ * Hand out the next file of the shared list to the calling worker.
+ *
+ * The shared index is read and advanced while holding fetch_mutex, and the
+ * claimed position is kept in a local variable, so no two workers can get
+ * the same position and no file is skipped.  Returns NULL once the claimed
+ * position lies past the end of the list.
+ */
+static BackupFile*
+getNextFile(BackupInfo *backupInfo)
+{
+	int			claimed;
+
+	pthread_mutex_lock(&fetch_mutex);
+	claimed = backupInfo->fileIndex;
+	backupInfo->fileIndex = claimed + 1;
+	pthread_mutex_unlock(&fetch_mutex);
+
+	return (claimed < backupInfo->totalfiles) ? backupInfo->files[claimed] : NULL;
+}
+
+/*
+ * qsort comparator for BackupFile entries: larger files sort first, entries
+ * of equal size compare as equal.
+ */
+static int
+compareFileSize(const void *a, const void *b)
+{
+	const BackupFile *lhs = (const BackupFile *) a;
+	const BackupFile *rhs = (const BackupFile *) b;
+
+	if (lhs->size == rhs->size)
+		return 0;
+
+	return (lhs->size > rhs->size) ? -1 : 1;
+}
+
+/*
+ * Read the backup label and the tablespace map that the server sends after
+ * the backup header.
+ *
+ * On return *backuplabel points to a newly allocated copy of the first
+ * column.  *tblspc_map is set to a newly allocated copy of the second column
+ * only if that column is not null; otherwise it is left untouched.  The
+ * caller owns the copies.
+ *
+ * The values are copied because the pointers returned by PQgetvalue() belong
+ * to the result and become invalid when it is cleared; handing them out
+ * directly only worked as long as the result was leaked.  All results read
+ * here are released.  Any protocol error is fatal.
+ */
+static void
+read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map)
+{
+	PGresult   *res = NULL;
+
+	Assert(backuplabel != NULL);
+	Assert(tblspc_map != NULL);
+
+	/*
+	 * Get Backup label and tablespace map data.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) < 1)
+	{
+		pg_log_error("no data returned from server");
+		exit(1);
+	}
+
+	*backuplabel = pg_strdup(PQgetvalue(res, 0, 0));	/* backup_label */
+	if (!PQgetisnull(res, 0, 1))
+		*tblspc_map = pg_strdup(PQgetvalue(res, 0, 1)); /* tablespace_map */
+	PQclear(res);
+
+	/* command completion */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* consume the end-of-command marker; release it should it be a result */
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * Create a backup directory, taking the tablespace path into account.
+ *
+ * For the base tablespace, name is created below basedir.  Otherwise the
+ * leading tablespace path (and the separator following it) is stripped from
+ * name and the rest is created below the directory returned by
+ * get_tablespace_mapping(), i.e. on the mapped path if a mapping was given
+ * with -T.  Missing parent directories are created as well.  A directory
+ * that already exists is not an error; any other failure is fatal.
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		dirpath[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (!basetablespace)
+	{
+		const char *relname;
+
+		Assert(tablespace != NULL);
+
+		/* skip "<tablespace>/" at the start of the name */
+		relname = name + strlen(tablespace) + 1;
+		snprintf(dirpath, sizeof(dirpath), "%s/%s",
+				 get_tablespace_mapping(tablespace), relname);
+	}
+	else
+		snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name);
+
+	if (pg_mkdir_p(dirpath, pg_dir_create_mode) == 0)
+		return;
+
+	/* the directory may have been created earlier */
+	if (errno == EEXIST)
+		return;
+
+	pg_log_error("could not create directory \"%s\": %m",
+				 dirpath);
+	exit(1);
+}
+
+/*
+ * General function for writing to a file; creates one if it doesn't exist
+ *
+ * path is taken relative to basedir; buf is a NUL-terminated string whose
+ * contents (without the terminator) become the contents of the file.  An
+ * existing file is truncated.  Any failure is fatal.
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+	size_t		len = strlen(buf);
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/*
+	 * fwrite() returns 0 when asked to write an item of size 0, which must
+	 * not be mistaken for a failure: an empty buffer simply yields an empty
+	 * file.
+	 */
+	if (len > 0 && fwrite(buf, len, 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/* fclose() flushes, so a failure here is a write failure as well */
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
--
2.21.0 (Apple Git-122.2)