0004-Parallel-Backup-pg_basebackup_v9.patch
application/octet-stream
Filename: 0004-Parallel-Backup-pg_basebackup_v9.patch
Type: application/octet-stream
Part: 1
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
The parsed metadata as JSON — format, series position, and per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v9-0004
Subject: Parallel Backup - pg_basebackup
| File | + | − |
|---|---|---|
| src/bin/pg_basebackup/pg_basebackup.c | 1015 | 65 |
From 945cd4b33f3b98bddf849fcca3c2a091248f0142 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 27 Jan 2020 18:56:21 +0500
Subject: [PATCH 4/6] Parallel Backup - pg_basebackup
Implements the replication commands added in the backend replication
system and adds support for --jobs=NUM in pg_basebackup to take a full
backup in parallel using multiple connections. The utility will collect
a list of files from the server first and then workers will copy files
(one by one) over the COPY protocol. The WAL files are copied in a similar
manner.
---
src/bin/pg_basebackup/pg_basebackup.c | 1080 +++++++++++++++++++++++--
1 file changed, 1015 insertions(+), 65 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 48bd838803b..7e392889809 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -13,6 +13,7 @@
#include "postgres_fe.h"
+#include <pthread.h>
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
@@ -85,12 +86,65 @@ typedef struct UnpackTarState
const char *mapped_tblspc_path;
pgoff_t current_len_left;
int current_padding;
+ size_t current_bytes_read;
FILE *file;
} UnpackTarState;
typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
void *callback_data);
+/*
+ * A file (or WAL file) that a parallel worker has to fetch from the server.
+ * Entries are chained into a singly-linked list hanging off BackupInfo.
+ */
+typedef struct BackupFile
+{
+	char		path[MAXPGPATH];	/* path as reported by the server */
+	char		type;			/* entry type reported by LIST_FILES */
+	int32		size;			/* size as reported by LIST_FILES */
+	time_t		mtime;			/* mtime as reported by LIST_FILES */
+
+	int			tsindex;		/* row of tablespacehdr this file belongs to */
+	struct BackupFile *next;	/* next entry in the list, or NULL */
+} BackupFile;
+
+/* States of the main-thread state machine run by parallel_backup_run(). */
+typedef enum BackupState
+{
+	PB_FETCH_REL_LIST,			/* fetch the list of data files */
+	PB_FETCH_REL_FILES,			/* workers are copying data files */
+	PB_FETCH_WAL_LIST,			/* fetch the list of WAL files */
+	PB_FETCH_WAL_FILES,			/* workers are copying WAL files */
+	PB_STOP_BACKUP,				/* fetch pg_control and send STOP_BACKUP */
+	PB_BACKUP_COMPLETE			/* stop the workers and finish */
+} BackupState;
+
+/*
+ * State of a parallel backup shared between the main thread and the workers.
+ * The workers update the list cursor (curr), totalfiles and activeworkers
+ * while holding fetch_mutex.
+ */
+typedef struct BackupInfo
+{
+	int			totalfiles;		/* queued files not yet handed out */
+	uint64		bytes_skipped;	/* tar header bytes of entries that are not
+								 * fetched; used for progress reporting only */
+	char		xlogstart[64];	/* WAL start location of the backup */
+	char		xlogend[64];	/* WAL end location, set by stop_backup() */
+	BackupFile *files;			/* head of the list of BackupFile entries */
+	BackupFile *curr;			/* next entry to hand out, NULL if none */
+	BackupState backupstate;	/* current state of parallel_backup_run() */
+	bool		workersdone;	/* set to make the workers exit */
+	int			activeworkers;	/* workers not waiting on data_ready */
+} BackupInfo;
+
+/* Per-worker state, one entry per parallel job. */
+typedef struct WorkerState
+{
+	pthread_t	worker;			/* thread running worker_run() */
+	int			workerid;		/* 0 .. numWorkers - 1 */
+	BackupInfo *backupinfo;		/* shared backup state */
+	PGconn	   *conn;			/* this worker's own connection */
+	uint64		bytesread;		/* bytes received by this worker (progress) */
+} WorkerState;
+
+/* Shared state of the running parallel backup; NULL in non-parallel mode. */
+static BackupInfo *backupinfo = NULL;
+
+/* Array of numWorkers entries, allocated by create_parallel_workers(). */
+static WorkerState *workers = NULL;
+
+/* Protects the files list cursor and the counters in BackupInfo. */
+static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* Broadcast when a new files list is ready or the workers must stop. */
+static pthread_cond_t data_ready = PTHREAD_COND_INITIALIZER;
+
/*
* pg_xlog has been renamed to pg_wal in version 10. This version number
* should be compared with PQserverVersion().
@@ -144,6 +198,9 @@ static bool found_existing_xlogdir = false;
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
+/* number of parallel jobs (-j/--jobs); 1 means a regular, non-parallel backup */
+static int numWorkers = 1;
+/* tablespace list: header result of BASE_BACKUP, or result of LIST_TABLESPACES */
+static PGresult *tablespacehdr;
+
/* Progress counters */
static uint64 totalsize_kb;
static uint64 totaldone;
@@ -174,10 +231,12 @@ static PQExpBuffer recoveryconfcontents = NULL;
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
static void progress_report(int tablespacenum, const char *filename, bool force);
+static void workers_progress_report(uint64 totalBytesRead,
+ const char *filename, bool force);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static int ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
void *callback_data);
static void BaseBackup(void);
@@ -188,6 +247,22 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
static const char *get_tablespace_mapping(const char *dir);
static void tablespace_list_append(const char *arg);
+/* Parallel backup (-j/--jobs) support */
+static void *worker_run(void *arg);
+static void create_parallel_workers(BackupInfo *backupInfo);
+static void parallel_backup_run(BackupInfo *backupInfo);
+static void cleanup_workers(void);
+static void stop_backup(void);
+static void get_backup_filelist(PGconn *conn, BackupInfo *backupInfo);
+static void get_wal_filelist(PGconn *conn, BackupInfo *backupInfo,
+ char *xlogstart, char *xlogend);
+static void free_filelist(BackupInfo *backupInfo);
+static int worker_get_files(WorkerState *wstate);
+static int receive_file(PGconn *conn, char *file, int tsIndex);
+static void create_backup_dirs(bool basetablespace, char *tablespace,
+ char *name);
+static void create_tblspc_symlink(char *filename);
+static void writefile(char *path, char *buf);
+static int fetch_max_wal_senders(PGconn *conn);
static void
cleanup_directories_atexit(void)
@@ -239,6 +314,8 @@ cleanup_directories_atexit(void)
static void
disconnect_atexit(void)
{
+ /* Stop the parallel backup workers (if any) before closing the main connection. */
+ cleanup_workers();
+
if (conn != NULL)
PQfinish(conn);
}
@@ -386,6 +463,7 @@ usage(void)
printf(_(" --no-slot prevent creation of temporary replication slot\n"));
printf(_(" --no-verify-checksums\n"
" do not verify checksums\n"));
+ printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=CONNSTR connection string\n"));
@@ -733,6 +811,94 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
}
}
+/*
+ * Report the overall progress of a parallel backup on stderr.
+ *
+ * totalBytesRead is the number of bytes received so far.  With --verbose the
+ * given file name is shown as well; pass NULL to clear it (used for the last
+ * call).  Output is limited to once per second unless force is true.
+ */
+static void
+workers_progress_report(uint64 totalBytesRead, const char *filename, bool force)
+{
+	pg_time_t	now;
+	uint64		donekb = totalBytesRead / 1024;
+	int			percent = 0;
+	int			donewidth;
+	char		totalBytesRead_str[32];
+	char		totalsize_str[32];
+
+	if (!showprogress)
+		return;
+
+	now = time(NULL);
+	if (!force && now == last_progress_report)
+		return;					/* Max once per second */
+	last_progress_report = now;
+
+	if (totalsize_kb != 0)
+		percent = (int) (donekb * 100 / totalsize_kb);
+
+	/*
+	 * The total is only an estimate (always wrong when WAL is included), so
+	 * never let the done column exceed it: cap the percentage and grow the
+	 * total instead.
+	 */
+	percent = (percent > 100) ? 100 : percent;
+	if (donekb > totalsize_kb)
+		totalsize_kb = donekb;
+
+	/*
+	 * Format the numbers separately to keep platform-dependent format codes
+	 * out of the translatable strings.
+	 */
+	snprintf(totalBytesRead_str, sizeof(totalBytesRead_str), INT64_FORMAT, donekb);
+	snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize_kb);
+	donewidth = (int) strlen(totalsize_str);
+
+#define VERBOSE_FILENAME_LENGTH 35
+
+	if (!verbose)
+	{
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied"),
+				donewidth, totalBytesRead_str, totalsize_str, percent);
+	}
+	else if (filename == NULL)
+	{
+		/* No file name: blank out the file part of the status line. */
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"),
+				donewidth, totalBytesRead_str, totalsize_str, percent,
+				VERBOSE_FILENAME_LENGTH + 5, "");
+	}
+	else
+	{
+		size_t		namelen = strlen(filename);
+		bool		shorten = namelen > VERBOSE_FILENAME_LENGTH;
+		int			shownwidth = shorten ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH;
+		const char *shown = shorten ? filename + namelen - VERBOSE_FILENAME_LENGTH + 3 : filename;
+
+		/* Over-long names are cut at the front and prefixed with "..." */
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"),
+				donewidth, totalBytesRead_str, totalsize_str, percent,
+				shorten ? "..." : "", shownwidth, shownwidth, shown);
+	}
+
+	fputs(isatty(fileno(stderr)) ? "\r" : "\n", stderr);
+}
/*
* Print a progress report based on the global variables. If verbose output
@@ -749,7 +915,7 @@ progress_report(int tablespacenum, const char *filename, bool force)
char totalsize_str[32];
pg_time_t now;
- if (!showprogress)
+ if (!showprogress || numWorkers > 1)
return;
now = time(NULL);
@@ -1439,7 +1605,7 @@ get_tablespace_mapping(const char *dir)
* specified directory. If it's for another tablespace, it will be restored
* in the original or mapped directory.
*/
-static void
+static int
ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
{
UnpackTarState state;
@@ -1470,13 +1636,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
exit(1);
}
- if (basetablespace && writerecoveryconf)
- WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
/*
* No data is synced here, everything is done for all tablespaces at the
* end.
*/
+
+ return state.current_bytes_read;
}
static void
@@ -1499,6 +1664,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
exit(1);
}
totaldone += 512;
+ state->current_bytes_read += 512;
state->current_len_left = read_tar_number(&copybuf[124], 12);
@@ -1630,6 +1796,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
fclose(state->file);
state->file = NULL;
totaldone += r;
+ state->current_bytes_read += r;
return;
}
@@ -1639,6 +1806,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
exit(1);
}
totaldone += r;
+ state->current_bytes_read += r;
progress_report(state->tablespacenum, state->filename, false);
state->current_len_left -= r;
@@ -1706,6 +1874,24 @@ BaseBackup(void)
exit(1);
}
+ if (numWorkers > 1)
+ {
+ int max_wal_senders = fetch_max_wal_senders(conn);
+
+ /*
+ * In parallel backup mode, pg_basebackup opens numWorkers + 2
+ * connections. One of the two additional connections is used by the
+ * main application while the other one is used if WAL streaming is
+ * enabled (-X Stream).
+ */
+ if (numWorkers + 2 > max_wal_senders)
+ {
+ pg_log_error("number of requested workers exceeds max_wal_senders (currently %d)",
+ max_wal_senders);
+ exit(1);
+ }
+ }
+
/*
* Build contents of configuration file if requested
*/
@@ -1738,16 +1924,26 @@ BaseBackup(void)
fprintf(stderr, "\n");
}
- basebkp =
- psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
- escaped_label,
- showprogress ? "PROGRESS" : "",
- includewal == FETCH_WAL ? "WAL" : "",
- fastcheckpoint ? "FAST" : "",
- includewal == NO_WAL ? "" : "NOWAIT",
- maxrate_clause ? maxrate_clause : "",
- format == 't' ? "TABLESPACE_MAP" : "",
- verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ if (numWorkers <= 1)
+ {
+ basebkp =
+ psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+ escaped_label,
+ showprogress ? "PROGRESS" : "",
+ includewal == FETCH_WAL ? "WAL" : "",
+ fastcheckpoint ? "FAST" : "",
+ includewal == NO_WAL ? "" : "NOWAIT",
+ maxrate_clause ? maxrate_clause : "",
+ format == 't' ? "TABLESPACE_MAP" : "",
+ verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ }
+ else
+ {
+ basebkp =
+ psprintf("START_BACKUP LABEL '%s' %s",
+ escaped_label,
+ fastcheckpoint ? "FAST" : "");
+ }
if (PQsendQuery(conn, basebkp) == 0)
{
@@ -1794,10 +1990,36 @@ BaseBackup(void)
pg_log_info("write-ahead log start point: %s on timeline %u",
xlogstart, starttli);
+ if (numWorkers > 1)
+ {
+ /*
+ * Finish up the START_BACKUP command execution and make sure we have
+ * CommandComplete.
+ */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not get data for '%s': %s", "START_BACKUP",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+ res = PQgetResult(conn);
+
+ basebkp = psprintf("LIST_TABLESPACES %s",
+ showprogress ? "PROGRESS" : "");
+
+ if (PQsendQuery(conn, basebkp) == 0)
+ {
+ pg_log_error("could not send replication command \"%s\": %s",
+ "LIST_TABLESPACES", PQerrorMessage(conn));
+ exit(1);
+ }
+ }
+
/*
* Get the header
*/
- res = PQgetResult(conn);
+ tablespacehdr = res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
pg_log_error("could not get backup header: %s",
@@ -1853,65 +2075,98 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
- for (i = 0; i < PQntuples(res); i++)
- {
- if (format == 't')
- ReceiveTarFile(conn, res, i);
- else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
-
- if (showprogress)
+ if (numWorkers <= 1)
{
- progress_report(PQntuples(res), NULL, true);
- if (isatty(fileno(stderr)))
- fprintf(stderr, "\n"); /* Need to move to next line */
- }
+ /*
+ * Start receiving chunks
+ */
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ if (format == 't')
+ ReceiveTarFile(conn, res, i);
+ else
+ ReceiveAndUnpackTarFile(conn, res, i);
+ } /* Loop over all tablespaces */
- PQclear(res);
+ if (showprogress)
+ {
+ progress_report(PQntuples(tablespacehdr), NULL, true);
+ if (isatty(fileno(stderr)))
+ fprintf(stderr, "\n"); /* Need to move to next line */
+ }
- /*
- * Get the stop position
- */
- res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- pg_log_error("could not get write-ahead log end position from server: %s",
- PQerrorMessage(conn));
- exit(1);
- }
- if (PQntuples(res) != 1)
- {
- pg_log_error("no write-ahead log end position returned from server");
- exit(1);
- }
- strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
- if (verbose && includewal != NO_WAL)
- pg_log_info("write-ahead log end point: %s", xlogend);
- PQclear(res);
+ PQclear(res);
- res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+ /*
+ * Get the stop position
+ */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not get write-ahead log end position from server: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ pg_log_error("no write-ahead log end position returned from server");
+ exit(1);
+ }
+ strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
+ if (verbose && includewal != NO_WAL)
+ pg_log_info("write-ahead log end point: %s", xlogend);
+ PQclear(res);
- if (sqlstate &&
- strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("checksum error occurred");
- checksum_failure = true;
+ const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+ if (sqlstate &&
+ strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
+ {
+ pg_log_error("checksum error occurred");
+ checksum_failure = true;
+ }
+ else
+ {
+ pg_log_error("final receive failed: %s",
+ PQerrorMessage(conn));
+ }
+ exit(1);
}
- else
+ }
+
+ if (numWorkers > 1)
+ {
+ /*
+ * Finish up the LIST_TABLESPACES command execution and make sure we
+ * have CommandComplete.
+ */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("final receive failed: %s",
+ pg_log_error("could not get data for '%s': %s", "LIST_TABLESPACES",
PQerrorMessage(conn));
+ exit(1);
}
- exit(1);
+ res = PQgetResult(conn);
+
+ backupinfo = palloc0(sizeof(BackupInfo));
+ backupinfo->backupstate = PB_FETCH_REL_LIST;
+
+ /* copy starting WAL location */
+ strlcpy(backupinfo->xlogstart, xlogstart, sizeof(backupinfo->xlogstart));
+ create_parallel_workers(backupinfo);
+ parallel_backup_run(backupinfo);
+ /* copy ending WAL location */
+ strlcpy(xlogend, backupinfo->xlogend, sizeof(xlogend));
}
+ /* Write recovery contents */
+ if (format == 'p' && writerecoveryconf)
+ WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+
if (bgchild > 0)
{
#ifndef WIN32
@@ -2066,6 +2321,7 @@ main(int argc, char **argv)
{"waldir", required_argument, NULL, 1},
{"no-slot", no_argument, NULL, 2},
{"no-verify-checksums", no_argument, NULL, 3},
+ {"jobs", required_argument, NULL, 'j'},
{NULL, 0, NULL, 0}
};
int c;
@@ -2093,7 +2349,7 @@ main(int argc, char **argv)
atexit(cleanup_directories_atexit);
- while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+ while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2234,6 +2490,9 @@ main(int argc, char **argv)
case 3:
verify_checksums = false;
break;
+ case 'j': /* number of jobs */
+ numWorkers = atoi(optarg);
+ break;
default:
/*
@@ -2348,6 +2607,30 @@ main(int argc, char **argv)
}
}
+ if (numWorkers <= 0)
+ {
+ pg_log_error("invalid number of parallel jobs");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ if (format != 'p' && numWorkers > 1)
+ {
+ pg_log_error("parallel jobs are only supported with 'plain' format");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ if (maxrate > 0 && numWorkers > 1)
+ {
+ pg_log_error("--max-rate is not supported with parallel jobs");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
@@ -2420,3 +2703,670 @@ main(int argc, char **argv)
success = true;
return 0;
}
+
+/*
+ * Thread entry point for a parallel backup worker.  The argument is the
+ * worker's WorkerState; the thread result is always NULL.
+ */
+static void *
+worker_run(void *arg)
+{
+	(void) worker_get_files((WorkerState *) arg);
+	return NULL;
+}
+
+/*
+ * Open one connection per worker and start the worker threads, all sharing
+ * *backupinfo.
+ *
+ * activeworkers counts workers that are not waiting for work; it starts at
+ * numWorkers and each worker decrements it (under fetch_mutex) while waiting.
+ */
+static void
+create_parallel_workers(BackupInfo *backupinfo)
+{
+	workers = (WorkerState *) palloc0(sizeof(WorkerState) * numWorkers);
+
+	/*
+	 * Account for every worker before any thread exists.  Incrementing the
+	 * counter after each pthread_create() would race with workers that are
+	 * already running and updating it under fetch_mutex.
+	 */
+	backupinfo->activeworkers = numWorkers;
+
+	for (int i = 0; i < numWorkers; i++)
+	{
+		WorkerState *worker = &workers[i];
+		int			status;
+
+		worker->backupinfo = backupinfo;
+		worker->bytesread = 0;
+		worker->workerid = i;
+		worker->conn = GetConnection();
+		if (worker->conn == NULL)
+		{
+			/*
+			 * NOTE: assumes GetConnection() already reported the error.
+			 * Only workers 0 .. i-1 exist; shrink numWorkers so that the
+			 * exit-time cleanup does not join threads never started.
+			 */
+			numWorkers = i;
+			exit(1);
+		}
+
+		status = pthread_create(&worker->worker, NULL, worker_run, worker);
+		if (status != 0)
+		{
+			/* pthread_create() returns the error number, it does not set errno */
+			errno = status;
+			pg_log_error("failed to create thread: %m");
+			PQfinish(worker->conn);
+			numWorkers = i;
+			exit(1);
+		}
+
+		if (verbose)
+			pg_log_info("backup worker (%d) created, %d", i, status);
+	}
+}
+
+/*
+ * Wake every worker waiting on data_ready.  The broadcast is sent while
+ * holding fetch_mutex, so it cannot fall between a worker's check of the
+ * shared list (made under the mutex) and its pthread_cond_wait() call.
+ */
+static void
+wake_workers(void)
+{
+	pthread_mutex_lock(&fetch_mutex);
+	pthread_cond_broadcast(&data_ready);
+	pthread_mutex_unlock(&fetch_mutex);
+}
+
+/*
+ * Return true once the current file list is drained: no file is left to hand
+ * out and no worker is busy.  Testing activeworkers alone is not enough,
+ * because workers that have just been woken still count as waiting until
+ * they reacquire fetch_mutex.
+ */
+static bool
+workers_drained(BackupInfo *backupinfo)
+{
+	bool		drained;
+
+	pthread_mutex_lock(&fetch_mutex);
+	drained = (backupinfo->curr == NULL && backupinfo->activeworkers == 0);
+	pthread_mutex_unlock(&fetch_mutex);
+
+	return drained;
+}
+
+/*
+ * This is the main function that controls the workers, assigns them work
+ * and does the cleanup.  It runs the BackupState machine on the main
+ * connection, polling every 100 ms and reporting progress in between:
+ *
+ *   REL_LIST -> REL_FILES -> STOP_BACKUP -> [WAL_LIST -> WAL_FILES] -> COMPLETE
+ *
+ * The WAL states are only used when includewal == FETCH_WAL.
+ */
+static void
+parallel_backup_run(BackupInfo *backupinfo)
+{
+	uint64		totalread = 0;
+
+	while (1)
+	{
+		char	   *filename = NULL;
+
+		switch (backupinfo->backupstate)
+		{
+			case PB_FETCH_REL_LIST:	/* get the list of files to fetch */
+				backupinfo->backupstate = PB_FETCH_REL_FILES;
+				/* retrieve backup file list from the server. */
+				get_backup_filelist(conn, backupinfo);
+				/* unblock any workers waiting on the condition */
+				wake_workers();
+				break;
+			case PB_FETCH_REL_FILES:	/* fetch files from server */
+				if (workers_drained(backupinfo))
+				{
+					backupinfo->backupstate = PB_STOP_BACKUP;
+
+					/* workers read curr under this lock; freeing touches it */
+					pthread_mutex_lock(&fetch_mutex);
+					free_filelist(backupinfo);
+					pthread_mutex_unlock(&fetch_mutex);
+				}
+				break;
+			case PB_FETCH_WAL_LIST:	/* get the list of WAL files to fetch */
+				backupinfo->backupstate = PB_FETCH_WAL_FILES;
+				get_wal_filelist(conn, backupinfo, backupinfo->xlogstart, backupinfo->xlogend);
+				/* unblock any workers waiting on the condition */
+				wake_workers();
+				break;
+			case PB_FETCH_WAL_FILES:	/* fetch WAL files from server */
+				if (workers_drained(backupinfo))
+					backupinfo->backupstate = PB_BACKUP_COMPLETE;
+				break;
+			case PB_STOP_BACKUP:
+
+				/*
+				 * All relation files have been fetched, time to stop the
+				 * backup, making sure to fetch the WAL files first (if needs
+				 * be).
+				 */
+				if (includewal == FETCH_WAL)
+					backupinfo->backupstate = PB_FETCH_WAL_LIST;
+				else
+					backupinfo->backupstate = PB_BACKUP_COMPLETE;
+
+				/* get the pg_control file at last. */
+				receive_file(conn, "global/pg_control", tablespacecount - 1);
+				stop_backup();
+				break;
+			case PB_BACKUP_COMPLETE:
+
+				/*
+				 * All relation and WAL files (if needed) have been fetched,
+				 * now we can safely stop all workers and finish up.
+				 */
+				cleanup_workers();
+				if (showprogress)
+				{
+					workers_progress_report(totalread, NULL, true);
+					if (isatty(fileno(stderr)))
+						fprintf(stderr, "\n");	/* Need to move to next line */
+				}
+				/* nothing more to do here */
+				return;
+			default:
+				/* shouldn't come here. */
+				pg_log_error("unexpected backup state: %d",
+							 backupinfo->backupstate);
+				exit(1);
+		}
+
+		/*
+		 * Update and report progress.  The per-worker counters are read
+		 * without locking; they only feed the progress display.
+		 */
+		totalread = backupinfo->bytes_skipped;
+		for (int i = 0; i < numWorkers; i++)
+			totalread += workers[i].bytesread;
+
+		/*
+		 * Read curr under the lock.  The entry stays valid afterwards since,
+		 * during normal operation, only this thread frees list entries.
+		 */
+		pthread_mutex_lock(&fetch_mutex);
+		if (backupinfo->curr != NULL)
+			filename = backupinfo->curr->path;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		workers_progress_report(totalread, filename, false);
+		pg_usleep(100000);
+	}
+}
+
+/*
+ * Stop the parallel workers: tell them to exit, wait for them, close their
+ * connections and free the file list.  Does nothing for a non-parallel
+ * backup or when the workers have already been told to stop.
+ *
+ * disconnect_atexit() calls this, so it can also run inside an exit() that
+ * was called from a worker thread (e.g. after a receive error).
+ */
+static void
+cleanup_workers(void)
+{
+	/* nothing to do for a non-parallel backup */
+	if (!backupinfo)
+		return;
+
+	/*
+	 * Test and set workersdone, and wake the workers, while holding
+	 * fetch_mutex: a worker that checks the flag under the mutex either sees
+	 * it set or is already waiting when the broadcast is sent.  This also
+	 * makes concurrent callers perform the cleanup only once.
+	 */
+	pthread_mutex_lock(&fetch_mutex);
+	if (backupinfo->workersdone)
+	{
+		/* workers have already been stopped and cleanup has been done. */
+		pthread_mutex_unlock(&fetch_mutex);
+		return;
+	}
+	backupinfo->workersdone = true;
+	pthread_cond_broadcast(&data_ready);
+	pthread_mutex_unlock(&fetch_mutex);
+
+	if (workers != NULL)
+	{
+		for (int i = 0; i < numWorkers; i++)
+		{
+			/* a thread must not join itself (possible when exiting from a worker) */
+			if (!pthread_equal(workers[i].worker, pthread_self()))
+				pthread_join(workers[i].worker, NULL);
+			PQfinish(workers[i].conn);
+		}
+	}
+	free_filelist(backupinfo);
+}
+
+/*
+ * Take the server out of backup mode with STOP_BACKUP on the main
+ * connection.  Stores the returned WAL end location (column 0) in
+ * backupinfo->xlogend and writes the returned backup_label contents
+ * (column 2) into the backup directory.
+ */
+static void
+stop_backup(void)
+{
+	PGresult   *res;
+	char	   *basebkp;
+
+	basebkp = psprintf("STOP_BACKUP %s",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "STOP_BACKUP", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	/*
+	 * Get the stop position
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get write-ahead log end position from server: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("no write-ahead log end position returned from server");
+		exit(1);
+	}
+
+	/* column 2 (backup_label) is read below; make sure it exists */
+	if (PQnfields(res) < 3)
+	{
+		pg_log_error("unexpected response to STOP_BACKUP: got %d fields, expected %d or more",
+					 PQnfields(res), 3);
+		exit(1);
+	}
+
+	/* retrieve the end wal location. */
+	strlcpy(backupinfo->xlogend, PQgetvalue(res, 0, 0),
+			sizeof(backupinfo->xlogend));
+
+	/* retrieve the backup_label file contents and write them to the backup */
+	writefile("backup_label", PQgetvalue(res, 0, 2));
+
+	PQclear(res);
+
+	/*
+	 * Finish up the STOP_BACKUP command execution and make sure we have
+	 * CommandComplete.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data for '%s': %s", "STOP_BACKUP",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	/* consume the NULL result that ends the command */
+	PQclear(PQgetResult(conn));
+
+	if (verbose && includewal != NO_WAL)
+		pg_log_info("write-ahead log end point: %s", backupinfo->xlogend);
+}
+
+/*
+ * Retrieve the list of files to back up from the server, issuing one
+ * LIST_FILES command per row of tablespacehdr.
+ *
+ * In plain format, directories and symlinks are created immediately instead
+ * of being queued; their tar header size is added to bytes_skipped so the
+ * progress report still adds up.  All other entries are queued for the
+ * workers.  The new entries are built privately and published under
+ * fetch_mutex in one step; afterwards backupInfo->curr points at the head
+ * of the list.
+ */
+static void
+get_backup_filelist(PGconn *conn, BackupInfo *backupInfo)
+{
+	BackupFile *head = NULL;
+	BackupFile *tail = NULL;
+	int			nfiles = 0;
+
+	for (int i = 0; i < tablespacecount; i++)
+	{
+		PGresult   *res;
+		char	   *basebkp;
+		bool		basetablespace;
+		char	   *tablespace;
+		int			numFiles;
+
+		/*
+		 * Query server to fetch the file list for given tablespace name. If
+		 * the tablespace name is empty, it will fetch files list of 'base'
+		 * tablespace.
+		 */
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tablespace = PQgetvalue(tablespacehdr, i, 1);
+
+		basebkp = psprintf("LIST_FILES '%s'",
+						   basetablespace ? "" : tablespace);
+		if (PQsendQuery(conn, basebkp) == 0)
+		{
+			pg_log_error("could not send replication command \"%s\": %s",
+						 "LIST_FILES", PQerrorMessage(conn));
+			exit(1);
+		}
+		pfree(basebkp);
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not list backup files: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		numFiles = PQntuples(res);
+		for (int j = 0; j < numFiles; j++)
+		{
+			BackupFile *file;
+			char	   *path = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+			int32		size = atol(PQgetvalue(res, j, 2));
+			time_t		mtime = atol(PQgetvalue(res, j, 3));
+
+			/*
+			 * In 'plain' format, directories and symlinks are created right
+			 * away instead of being fetched.  A tar header size was included
+			 * for them in totalsize_kb, so account for it here for progress
+			 * reporting purposes.
+			 */
+			if (format == 'p' && type == 'd')
+			{
+				backupInfo->bytes_skipped += 512;
+				create_backup_dirs(basetablespace, tablespace, path);
+				continue;
+			}
+			if (format == 'p' && type == 'l')
+			{
+				backupInfo->bytes_skipped += 512;
+				create_tblspc_symlink(path);
+				continue;
+			}
+
+			/* palloc0: the list relies on the last entry's next being NULL */
+			file = (BackupFile *) palloc0(sizeof(BackupFile));
+			strlcpy(file->path, path, MAXPGPATH);
+			file->type = type;
+			file->size = size;
+			file->mtime = mtime;
+			file->tsindex = i;
+
+			if (tail == NULL)
+				head = file;
+			else
+				tail->next = file;
+			tail = file;
+			nfiles++;
+		}
+
+		PQclear(res);
+
+		/*
+		 * Finish up the LIST_FILES command execution and make sure we have
+		 * CommandComplete.
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data for '%s': %s", "LIST_FILES",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		PQclear(res);
+		/* consume the NULL result that ends the command */
+		PQclear(PQgetResult(conn));
+	}
+
+	/*
+	 * Publish the new entries in one step; worker threads are already
+	 * running and take entries from curr under fetch_mutex.
+	 */
+	pthread_mutex_lock(&fetch_mutex);
+	if (head != NULL)
+	{
+		if (backupInfo->files == NULL)
+			backupInfo->files = head;
+		else
+		{
+			BackupFile *last = backupInfo->files;
+
+			while (last->next != NULL)
+				last = last->next;
+			last->next = head;
+		}
+	}
+	backupInfo->totalfiles += nfiles;
+	/* point curr to the head of list. */
+	backupInfo->curr = backupInfo->files;
+	pthread_mutex_unlock(&fetch_mutex);
+}
+
+/*
+ * Retrieve the list of WAL files between xlogstart and xlogend from the
+ * server (LIST_WAL_FILES) and queue them for the workers.  Every WAL file is
+ * tagged with the last row of tablespacehdr (tablespacecount - 1).  The new
+ * entries are built privately and published under fetch_mutex in one step;
+ * afterwards backupInfo->curr points at the head of the list.
+ */
+static void
+get_wal_filelist(PGconn *conn, BackupInfo *backupInfo, char *xlogstart, char *xlogend)
+{
+	PGresult   *res;
+	char	   *basebkp;
+	BackupFile *head = NULL;
+	BackupFile *tail = NULL;
+	int			numWals;
+
+	basebkp = psprintf("LIST_WAL_FILES START_WAL_LOCATION '%s' END_WAL_LOCATION '%s'",
+					   xlogstart, xlogend);
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "LIST_WAL_FILES", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not list wal files: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	numWals = PQntuples(res);
+	for (int i = 0; i < numWals; i++)
+	{
+		BackupFile *file = (BackupFile *) palloc0(sizeof(BackupFile));
+
+		strlcpy(file->path, PQgetvalue(res, i, 0), MAXPGPATH);
+		file->tsindex = tablespacecount - 1;
+
+		if (tail == NULL)
+			head = file;
+		else
+			tail->next = file;
+		tail = file;
+	}
+	PQclear(res);
+
+	/*
+	 * Finish up the LIST_WAL_FILES command execution and make sure we have
+	 * CommandComplete.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data for '%s': %s", "LIST_WAL_FILES",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	/* consume the NULL result that ends the command */
+	PQclear(PQgetResult(conn));
+
+	/* Publish the new entries to the (already running) workers in one step. */
+	pthread_mutex_lock(&fetch_mutex);
+	if (head != NULL)
+	{
+		if (backupInfo->files == NULL)
+			backupInfo->files = head;
+		else
+		{
+			BackupFile *last = backupInfo->files;
+
+			while (last->next != NULL)
+				last = last->next;
+			last->next = head;
+		}
+	}
+	backupInfo->totalfiles += numWals;
+	/* point curr to the head of list. */
+	backupInfo->curr = backupInfo->files;
+	pthread_mutex_unlock(&fetch_mutex);
+}
+
+/*
+ * Free every entry of backupInfo->files and reset the list, its cursor
+ * (curr) and totalfiles.  Does nothing when the list is already empty.
+ *
+ * A local iterator is used so that the shared curr cursor, which workers
+ * read, never points at an entry that is being freed.
+ */
+static void
+free_filelist(BackupInfo *backupInfo)
+{
+	BackupFile *file = backupInfo->files;
+
+	if (file == NULL)
+		return;
+
+	backupInfo->curr = NULL;
+	while (file != NULL)
+	{
+		BackupFile *next = file->next;
+
+		pfree(file);
+		file = next;
+	}
+
+	backupInfo->files = NULL;
+	backupInfo->totalfiles = 0;
+}
+
+/*
+ * Worker loop: repeatedly take the next entry from the shared file list and
+ * fetch it on this worker's connection, adding the received byte count to
+ * wstate->bytesread.  When the list is empty, wait on data_ready; while
+ * waiting the worker is not counted in backupinfo->activeworkers.  Returns
+ * 0 once backupinfo->workersdone is set.
+ */
+static int
+worker_get_files(WorkerState *wstate)
+{
+	BackupInfo *backupinfo = wstate->backupinfo;
+
+	for (;;)
+	{
+		BackupFile *fetchfile;
+
+		pthread_mutex_lock(&fetch_mutex);
+
+		/*
+		 * Wait until there is a file to process or we are told to stop.  The
+		 * predicate is re-tested in a loop because pthread_cond_wait() may
+		 * wake up spuriously, and workersdone is tested here, under the
+		 * mutex, rather than only in an unlocked loop condition.
+		 */
+		while (backupinfo->curr == NULL && !backupinfo->workersdone)
+		{
+			backupinfo->activeworkers--;
+			pthread_cond_wait(&data_ready, &fetch_mutex);
+			backupinfo->activeworkers++;
+		}
+
+		if (backupinfo->workersdone)
+		{
+			pthread_mutex_unlock(&fetch_mutex);
+			break;
+		}
+
+		fetchfile = backupinfo->curr;
+		backupinfo->totalfiles--;
+		backupinfo->curr = fetchfile->next;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		wstate->bytesread +=
+			receive_file(wstate->conn, fetchfile->path, fetchfile->tsindex);
+	}
+
+	return 0;
+}
+
+/*
+ * Fetch one file from the server with SEND_FILES on the given connection and
+ * unpack it into the directory of tablespace row tsIndex of tablespacehdr.
+ *
+ * Returns the number of bytes received, as counted by
+ * ReceiveAndUnpackTarFile(), for progress reporting.  Failures are fatal:
+ * returning normally would leave the file silently missing from the backup.
+ */
+static int
+receive_file(PGconn *conn, char *file, int tsIndex)
+{
+	PGresult   *res;
+	PQExpBuffer buf;
+	int			bytesread;
+
+	if (!conn)
+	{
+		pg_log_error("could not fetch file \"%s\": no connection to server",
+					 file);
+		exit(1);
+	}
+
+	/*
+	 * Fetch a single file from the server. To fetch the file, build a query
+	 * in form of:
+	 *
+	 * SEND_FILES ('base/1/1245/32683') [options]
+	 */
+	buf = createPQExpBuffer();
+	appendPQExpBuffer(buf, "SEND_FILES ( '%s' )", file);
+
+	/* add options */
+	appendPQExpBuffer(buf, " START_WAL_LOCATION '%s' %s",
+					  backupinfo->xlogstart,
+					  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+
+	if (PQsendQuery(conn, buf->data) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_FILES", PQerrorMessage(conn));
+		exit(1);
+	}
+	destroyPQExpBuffer(buf);
+
+	/* process file contents, also count bytes read for progress */
+	bytesread = ReceiveAndUnpackTarFile(conn, tablespacehdr, tsIndex);
+
+	/*
+	 * Finish up the SEND_FILES command execution and make sure we have
+	 * CommandComplete.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data for '%s': %s", "SEND_FILES",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+	/* consume the NULL result that ends the command */
+	PQclear(PQgetResult(conn));
+
+	return bytesread;
+}
+
+/*
+ * Create a directory of the backup, including missing parents.  For the base
+ * tablespace, name is relative to basedir.  For other tablespaces, name
+ * starts with the tablespace location; that prefix is replaced by the
+ * location's mapping (-T/--tablespace-mapping, via get_tablespace_mapping()).
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		dirpath[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (basetablespace)
+		snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name);
+	else
+	{
+		size_t		tslen;
+		const char *relpath;
+		const char *mapped;
+
+		Assert(tablespace != NULL);
+		tslen = strlen(tablespace);
+
+		/*
+		 * name must be the tablespace location itself or a path below it;
+		 * anything else would make the relative-path computation below run
+		 * past the end of name.
+		 */
+		if (strncmp(name, tablespace, tslen) != 0 ||
+			!(name[tslen] == '\0' || name[tslen] == '/' ||
+			  (tslen > 0 && tablespace[tslen - 1] == '/')))
+		{
+			pg_log_error("directory \"%s\" is not located in tablespace \"%s\"",
+						 name, tablespace);
+			exit(1);
+		}
+
+		relpath = name + tslen;
+		while (*relpath == '/')
+			relpath++;
+
+		mapped = get_tablespace_mapping(tablespace);
+		if (*relpath == '\0')
+			strlcpy(dirpath, mapped, sizeof(dirpath));	/* the location itself */
+		else
+			snprintf(dirpath, sizeof(dirpath), "%s/%s", mapped, relpath);
+	}
+
+	if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0)
+	{
+		if (errno != EEXIST)
+		{
+			pg_log_error("could not create directory \"%s\": %m",
+						 dirpath);
+			exit(1);
+		}
+	}
+}
+
+/*
+ * Create the tablespace symlink filename (relative to basedir; presumably of
+ * the form pg_tblspc/<oid>) inside the backup.  The link points to the
+ * location, after --tablespace-mapping, of the tablespace whose OID equals
+ * the last path component of filename.  Nothing is created if no tablespace
+ * matches.
+ */
+static void
+create_tblspc_symlink(char *filename)
+{
+	const char *slash = strrchr(filename, '/');
+	const char *linkname = slash ? slash + 1 : filename;
+
+	for (int i = 0; i < tablespacecount; i++)
+	{
+		char	   *linkloc;
+		const char *mapped_tblspc_path;
+
+		/* the base tablespace row has no OID, hence no symlink */
+		if (PQgetisnull(tablespacehdr, i, 0))
+			continue;
+
+		/* compare whole names, so that OID 1234 does not match 12345 */
+		if (strcmp(linkname, PQgetvalue(tablespacehdr, i, 0)) != 0)
+			continue;
+
+		linkloc = psprintf("%s/%s", basedir, filename);
+		mapped_tblspc_path = get_tablespace_mapping(PQgetvalue(tablespacehdr, i, 1));
+
+#ifdef HAVE_SYMLINK
+		if (symlink(mapped_tblspc_path, linkloc) != 0)
+		{
+			pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+						 linkloc, mapped_tblspc_path);
+			exit(1);
+		}
+#else
+		pg_log_error("symlinks are not supported on this platform");
+		exit(1);
+#endif
+		pfree(linkloc);
+		break;
+	}
+}
+
+/*
+ * Write the NUL-terminated string buf to the file basedir/path, creating or
+ * truncating it.  Any failure is fatal.
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+	size_t		len = strlen(buf);
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/*
+	 * fwrite() reports 0 items for a zero-length write, which is not an
+	 * error, so only check it when there is something to write.
+	 */
+	if (len > 0 && fwrite(buf, len, 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
+
+/*
+ * Return the server's max_wal_senders setting, read with
+ * "SHOW max_wal_senders" on the given connection.  Any failure is fatal, so
+ * the caller never compares against a bogus value.
+ */
+static int
+fetch_max_wal_senders(PGconn *conn)
+{
+	PGresult   *res;
+	int			max_wal_senders;
+
+	/* check connection existence */
+	Assert(conn != NULL);
+
+	res = PQexec(conn, "SHOW max_wal_senders");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SHOW max_wal_senders", PQerrorMessage(conn));
+		PQclear(res);
+		exit(1);
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 1)
+	{
+		pg_log_error("could not fetch max wal senders: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 1);
+		PQclear(res);
+		exit(1);
+	}
+
+	max_wal_senders = atoi(PQgetvalue(res, 0, 0));
+	PQclear(res);
+
+	return max_wal_senders;
+}
--
2.21.1 (Apple Git-122.3)