From 945cd4b33f3b98bddf849fcca3c2a091248f0142 Mon Sep 17 00:00:00 2001
From: Asif Rehman
Date: Mon, 27 Jan 2020 18:56:21 +0500
Subject: [PATCH 4/6] Parallel Backup - pg_basebackup

Implements the replication commands added in the backend replication
system and adds support for --jobs=NUM in pg_basebackup to take a full
backup in parallel using multiple connections. The utility will collect
a list of files from the server first and then workers will copy files
(one by one) over COPY protocol. The WAL files are also copied in a
similar manner.
---
 src/bin/pg_basebackup/pg_basebackup.c | 1080 +++++++++++++++++++++++--
 1 file changed, 1015 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 48bd838803b..7e392889809 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -13,6 +13,7 @@
 
 #include "postgres_fe.h"
 
+#include <pthread.h>
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
@@ -85,12 +86,65 @@ typedef struct UnpackTarState
 	const char *mapped_tblspc_path;
 	pgoff_t		current_len_left;
 	int			current_padding;
+	size_t		current_bytes_read;	/* bytes received, for progress reports */
 	FILE	   *file;
 } UnpackTarState;
 
 typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
 								   void *callback_data);
 
+typedef struct BackupFile
+{
+	char		path[MAXPGPATH];
+	char		type;
+	int32		size;
+	time_t		mtime;
+
+	int			tsindex;		/* index of tsInfo this file belongs to. */
+	struct BackupFile *next;	/* next file in the list, or NULL */
+} BackupFile;
+
+typedef enum BackupState
+{
+	PB_FETCH_REL_LIST,
+	PB_FETCH_REL_FILES,
+	PB_FETCH_WAL_LIST,
+	PB_FETCH_WAL_FILES,
+	PB_STOP_BACKUP,
+	PB_BACKUP_COMPLETE
+} BackupState;
+
+typedef struct BackupInfo
+{
+	int			totalfiles;		/* number of files not yet handed out */
+	uint64		bytes_skipped;	/* tar-header bytes of entries not fetched */
+	char		xlogstart[64];
+	char		xlogend[64];
+	BackupFile *files;			/* head of the list of files to fetch */
+	BackupFile *curr;			/* next file to hand out to a worker */
+	BackupState backupstate;
+	bool		workersdone;	/* tells the workers to exit; shared with them */
+	int			activeworkers;	/* workers not waiting on data_ready */
+} BackupInfo;
+
+typedef struct WorkerState
+{
+	pthread_t	worker;
+	int			workerid;
+	BackupInfo *backupinfo;
+	PGconn	   *conn;			/* this worker's own replication connection */
+	uint64		bytesread;		/* bytes fetched, summed for progress */
+} WorkerState;
+
+static BackupInfo *backupinfo = NULL;
+static WorkerState *workers = NULL;
+
+/* lock to be used for fetching file from the files list. */
+static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* condition to be used when the files list is filled. */
+static pthread_cond_t data_ready = PTHREAD_COND_INITIALIZER;
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -144,6 +198,9 @@ static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
 
+static int	numWorkers = 1;
+static PGresult *tablespacehdr;
+
 /* Progress counters */
 static uint64 totalsize_kb;
 static uint64 totaldone;
@@ -174,10 +231,12 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
 static void progress_report(int tablespacenum, const char *filename, bool force);
+static void workers_progress_report(uint64 totalBytesRead,
+									const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
-static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static uint64 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
 										 void *callback_data);
 static void BaseBackup(void);
@@ -188,6 +247,22 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 static const char *get_tablespace_mapping(const char *dir);
 static void tablespace_list_append(const char *arg);
 
+static void *worker_run(void *arg);
+static void create_parallel_workers(BackupInfo *backupInfo);
+static void parallel_backup_run(BackupInfo *backupInfo);
+static void cleanup_workers(void);
+static void stop_backup(void);
+static void get_backup_filelist(PGconn *conn, BackupInfo *backupInfo);
+static void get_wal_filelist(PGconn *conn, BackupInfo *backupInfo,
+							 char *xlogstart, char *xlogend);
+static void free_filelist(BackupInfo *backupInfo);
+static int	worker_get_files(WorkerState *wstate);
+static uint64 receive_file(PGconn *conn, char *file, int tsIndex);
+static void create_backup_dirs(bool basetablespace, char *tablespace,
+							   char *name);
+static void create_tblspc_symlink(char *filename);
+static void writefile(char *path, char *buf);
+static int	fetch_max_wal_senders(PGconn *conn);
 
 static void
 cleanup_directories_atexit(void)
@@ -239,6 +314,8 @@ cleanup_directories_atexit(void)
 static void
 disconnect_atexit(void)
 {
+	cleanup_workers();
+
 	if (conn != NULL)
 		PQfinish(conn);
 }
@@ -386,6 +463,7 @@ usage(void)
 	printf(_("      --no-slot          prevent creation of temporary replication slot\n"));
 	printf(_("      --no-verify-checksums\n"
 			 "                         do not verify checksums\n"));
+	printf(_("  -j, --jobs=NUM         use this many parallel jobs to backup\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -733,6 +811,94 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found)
 	}
 }
 
+/*
+ * Print a progress report of worker threads. If verbose output
+ * is enabled, also print the current file name.
+ *
+ * Progress report is written at maximum once per second, unless the
+ * force parameter is set to true.
+ */
+static void
+workers_progress_report(uint64 totalBytesRead, const char *filename, bool force)
+{
+	int			percent;
+	char		totalBytesRead_str[32];
+	char		totalsize_str[32];
+	pg_time_t	now;
+
+	if (!showprogress)
+		return;
+
+	now = time(NULL);
+	if (now == last_progress_report && !force)
+		return;					/* Max once per second */
+
+	last_progress_report = now;
+	percent = totalsize_kb ? (int) ((totalBytesRead / 1024) * 100 / totalsize_kb) : 0;
+
+	/*
+	 * Avoid overflowing past 100% or the full size. This may make the total
+	 * size number change as we approach the end of the backup (the estimate
+	 * will always be wrong if WAL is included), but that's better than having
+	 * the done column be bigger than the total.
+	 */
+	if (percent > 100)
+		percent = 100;
+	if (totalBytesRead / 1024 > totalsize_kb)
+		totalsize_kb = totalBytesRead / 1024;
+
+	/*
+	 * Separate step to keep platform-dependent format code out of
+	 * translatable strings.  And we only test for INT64_FORMAT availability
+	 * in snprintf, not fprintf.
+	 */
+	snprintf(totalBytesRead_str, sizeof(totalBytesRead_str), INT64_FORMAT,
+			 totalBytesRead / 1024);
+	snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize_kb);
+
+#define VERBOSE_FILENAME_LENGTH 35
+
+	if (verbose)
+	{
+		if (!filename)
+
+			/*
+			 * No filename given, so clear the status line (used for last
+			 * call)
+			 */
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"),
+					(int) strlen(totalsize_str),
+					totalBytesRead_str, totalsize_str,
+					percent,
+					VERBOSE_FILENAME_LENGTH + 5, "");
+		else
+		{
+			bool		truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH);
+
+			fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"),
+					(int) strlen(totalsize_str), totalBytesRead_str, totalsize_str,
+					percent,
+			/* Prefix with "..." if we do leading truncation */
+					truncate ? "..." : "",
+					truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+					truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH,
+			/* Truncate filename at beginning if it's too long */
+					truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename);
+		}
+	}
+	else
+	{
+		fprintf(stderr, _("%*s/%s kB (%d%%) copied"),
+				(int) strlen(totalsize_str),
+				totalBytesRead_str, totalsize_str,
+				percent);
+	}
+
+	if (isatty(fileno(stderr)))
+		fprintf(stderr, "\r");
+	else
+		fprintf(stderr, "\n");
+}
 
 /*
  * Print a progress report based on the global variables. If verbose output
@@ -749,7 +915,7 @@ progress_report(int tablespacenum, const char *filename, bool force)
 	char		totalsize_str[32];
 	pg_time_t	now;
 
-	if (!showprogress)
+	if (!showprogress || numWorkers > 1)
 		return;
 
 	now = time(NULL);
@@ -1439,7 +1605,7 @@ get_tablespace_mapping(const char *dir)
  * specified directory. If it's for another tablespace, it will be restored
  * in the original or mapped directory.
  */
-static void
+static uint64
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
 	UnpackTarState state;
@@ -1470,13 +1636,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 		exit(1);
 	}
 
-	if (basetablespace && writerecoveryconf)
-		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
-
 	/*
 	 * No data is synced here, everything is done for all tablespaces at the
 	 * end.
 	 */
+
+	return state.current_bytes_read;
 }
 
 static void
@@ -1499,6 +1664,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			exit(1);
 		}
 		totaldone += 512;
+		state->current_bytes_read += 512;
 
 		state->current_len_left = read_tar_number(&copybuf[124], 12);
 
@@ -1630,6 +1796,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			fclose(state->file);
 			state->file = NULL;
 			totaldone += r;
+			state->current_bytes_read += r;
 			return;
 		}
 
@@ -1639,6 +1806,7 @@ ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
 			exit(1);
 		}
 		totaldone += r;
+		state->current_bytes_read += r;
 		progress_report(state->tablespacenum, state->filename, false);
 
 		state->current_len_left -= r;
@@ -1706,6 +1874,24 @@ BaseBackup(void)
 		exit(1);
 	}
 
+	if (numWorkers > 1)
+	{
+		int			max_wal_senders = fetch_max_wal_senders(conn);
+
+		/*
+		 * Parallel mode opens numWorkers + 2 connections: one per worker, the
+		 * main connection, and one for WAL streaming (-X stream).
+		 */
+		if (max_wal_senders < 0)
+			exit(1);			/* error already reported */
+		if (numWorkers + 2 > max_wal_senders)
+		{
+			pg_log_error("number of requested workers exceeds max_wal_senders (currently %d)",
+						 max_wal_senders);
+			exit(1);
+		}
+	}
+
 	/*
 	 * Build contents of configuration file if requested
 	 */
@@ -1738,16 +1924,26 @@ BaseBackup(void)
 		fprintf(stderr, "\n");
 	}
 
-	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
-				 escaped_label,
-				 showprogress ? "PROGRESS" : "",
-				 includewal == FETCH_WAL ? "WAL" : "",
-				 fastcheckpoint ? "FAST" : "",
-				 includewal == NO_WAL ? "" : "NOWAIT",
-				 maxrate_clause ? maxrate_clause : "",
-				 format == 't' ? "TABLESPACE_MAP" : "",
-				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+	if (numWorkers <= 1)
+	{
+		basebkp =
+			psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+					 escaped_label,
+					 showprogress ? "PROGRESS" : "",
+					 includewal == FETCH_WAL ? "WAL" : "",
+					 fastcheckpoint ? "FAST" : "",
+					 includewal == NO_WAL ? "" : "NOWAIT",
+					 maxrate_clause ? maxrate_clause : "",
+					 format == 't' ? "TABLESPACE_MAP" : "",
+					 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+	}
+	else
+	{
+		basebkp =
+			psprintf("START_BACKUP LABEL '%s' %s",
+					 escaped_label,
+					 fastcheckpoint ? "FAST" : "");
+	}
 
 	if (PQsendQuery(conn, basebkp) == 0)
 	{
@@ -1794,10 +1990,36 @@ BaseBackup(void)
 		pg_log_info("write-ahead log start point: %s on timeline %u",
 					xlogstart, starttli);
 
+	if (numWorkers > 1)
+	{
+		/*
+		 * Finish up the START_BACKUP command execution and make sure we have
+		 * CommandComplete.
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data for '%s': %s", "START_BACKUP",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		res = PQgetResult(conn);
+
+		basebkp = psprintf("LIST_TABLESPACES %s",
+						   showprogress ? "PROGRESS" : "");
+
+		if (PQsendQuery(conn, basebkp) == 0)
+		{
+			pg_log_error("could not send replication command \"%s\": %s",
+						 "LIST_TABLESPACES", PQerrorMessage(conn));
+			exit(1);
+		}
+	}
+
 	/*
 	 * Get the header
 	 */
-	res = PQgetResult(conn);
+	tablespacehdr = res = PQgetResult(conn);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("could not get backup header: %s",
@@ -1853,65 +2075,98 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/*
-	 * Start receiving chunks
-	 */
-	for (i = 0; i < PQntuples(res); i++)
-	{
-		if (format == 't')
-			ReceiveTarFile(conn, res, i);
-		else
-			ReceiveAndUnpackTarFile(conn, res, i);
-	}							/* Loop over all tablespaces */
-
-	if (showprogress)
+	if (numWorkers <= 1)
 	{
-		progress_report(PQntuples(res), NULL, true);
-		if (isatty(fileno(stderr)))
-			fprintf(stderr, "\n");	/* Need to move to next line */
-	}
+		/*
+		 * Start receiving chunks
+		 */
+		for (i = 0; i < PQntuples(res); i++)
+		{
+			if (format == 't')
+				ReceiveTarFile(conn, res, i);
+			else
+				ReceiveAndUnpackTarFile(conn, res, i);
+		}						/* Loop over all tablespaces */
 
-	PQclear(res);
+		if (showprogress)
+		{
+			progress_report(PQntuples(tablespacehdr), NULL, true);
+			if (isatty(fileno(stderr)))
+				fprintf(stderr, "\n");	/* Need to move to next line */
+		}
 
-	/*
-	 * Get the stop position
-	 */
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
-	{
-		pg_log_error("could not get write-ahead log end position from server: %s",
-					 PQerrorMessage(conn));
-		exit(1);
-	}
-	if (PQntuples(res) != 1)
-	{
-		pg_log_error("no write-ahead log end position returned from server");
-		exit(1);
-	}
-	strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
-	if (verbose && includewal != NO_WAL)
-		pg_log_info("write-ahead log end point: %s", xlogend);
-	PQclear(res);
+		PQclear(res);
 
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-	{
-		const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+		/*
+		 * Get the stop position
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get write-ahead log end position from server: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) != 1)
+		{
+			pg_log_error("no write-ahead log end position returned from server");
+			exit(1);
+		}
+		strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
+		if (verbose && includewal != NO_WAL)
+			pg_log_info("write-ahead log end point: %s", xlogend);
+		PQclear(res);
 
-		if (sqlstate &&
-			strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("checksum error occurred");
-			checksum_failure = true;
+			const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+			if (sqlstate &&
+				strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
+			{
+				pg_log_error("checksum error occurred");
+				checksum_failure = true;
+			}
+			else
+			{
+				pg_log_error("final receive failed: %s",
+							 PQerrorMessage(conn));
+			}
+			exit(1);
 		}
-		else
+	}
+
+	if (numWorkers > 1)
+	{
+		/*
+		 * Finish up the LIST_TABLESPACES command execution and make sure we
+		 * have CommandComplete.
+		 */
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("final receive failed: %s",
+			pg_log_error("could not get data for '%s': %s", "LIST_TABLESPACES",
 						 PQerrorMessage(conn));
+			exit(1);
 		}
-		exit(1);
+		res = PQgetResult(conn);
+
+		backupinfo = palloc0(sizeof(BackupInfo));
+		backupinfo->backupstate = PB_FETCH_REL_LIST;
+
+		/* copy starting WAL location */
+		strlcpy(backupinfo->xlogstart, xlogstart, sizeof(backupinfo->xlogstart));
+		create_parallel_workers(backupinfo);
+		parallel_backup_run(backupinfo);
+		/* copy ending WAL location */
+		strlcpy(xlogend, backupinfo->xlogend, sizeof(xlogend));
 	}
 
+	/* Write recovery contents */
+	if (format == 'p' && writerecoveryconf)
+		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+
 	if (bgchild > 0)
 	{
 #ifndef WIN32
@@ -2066,6 +2321,7 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"jobs", required_argument, NULL, 'j'},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2093,7 +2349,7 @@ main(int argc, char **argv)
 
 	atexit(cleanup_directories_atexit);
 
-	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2234,6 +2490,9 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 'j':			/* number of jobs */
+				numWorkers = atoi(optarg);
+				break;
 			default:
 
 				/*
@@ -2348,6 +2607,30 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (numWorkers <= 0)
+	{
+		pg_log_error("invalid number of parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (format != 'p' && numWorkers > 1)
+	{
+		pg_log_error("parallel jobs are only supported with 'plain' format");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (maxrate > 0 && numWorkers > 1)
+	{
+		pg_log_error("--max-rate is not supported with parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2420,3 +2703,784 @@ main(int argc, char **argv)
 	success = true;
 	return 0;
 }
+
+/*
+ * Worker thread entry point: fetch files until cleanup_workers() tells the
+ * worker to stop.
+ */
+static void *
+worker_run(void *arg)
+{
+	WorkerState *wstate = (WorkerState *) arg;
+
+	worker_get_files(wstate);
+
+	return NULL;
+}
+
+/*
+ * Number of entries at the start of workers[] whose thread was actually
+ * started.  Only those may be joined by cleanup_workers(); the remaining
+ * entries are never started if create_parallel_workers() fails half-way.
+ */
+static int	nstarted_workers = 0;
+
+/*
+ * Append str to buf as a single-quoted literal for a replication command,
+ * doubling any embedded single quotes so that file names containing a quote
+ * do not break the command.
+ */
+static void
+append_quoted_literal(PQExpBuffer buf, const char *str)
+{
+	appendPQExpBufferChar(buf, '\'');
+	for (const char *p = str; *p != '\0'; p++)
+	{
+		if (*p == '\'')
+			appendPQExpBufferChar(buf, '\'');
+		appendPQExpBufferChar(buf, *p);
+	}
+	appendPQExpBufferChar(buf, '\'');
+}
+
+/*
+ * Consume the CommandComplete result of the replication command cmdname on
+ * conn, then the NULL result that ends it, so that the connection is ready
+ * for the next command.  Exits on error.
+ */
+static void
+finish_command(PGconn *conn, const char *cmdname)
+{
+	PGresult   *res = PQgetResult(conn);
+
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data for '%s': %s", cmdname,
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * Hand a completely built list of files to the workers.  The list is only
+ * made visible here, in one step under fetch_mutex, so that no worker can
+ * see a half-linked list.
+ */
+static void
+publish_filelist(BackupInfo *backupInfo, BackupFile *head, int nfiles)
+{
+	pthread_mutex_lock(&fetch_mutex);
+	backupInfo->files = head;
+	backupInfo->curr = head;
+	backupInfo->totalfiles = nfiles;
+	pthread_mutex_unlock(&fetch_mutex);
+}
+
+/*
+ * Open one replication connection per worker and start the worker threads.
+ *
+ * backupinfo->activeworkers counts workers that are not parked on
+ * data_ready; it is only read or written while holding fetch_mutex.
+ */
+static void
+create_parallel_workers(BackupInfo *backupinfo)
+{
+	workers = (WorkerState *) palloc0(sizeof(WorkerState) * numWorkers);
+
+	for (int i = 0; i < numWorkers; i++)
+	{
+		WorkerState *worker = &workers[i];
+		int			status;
+
+		worker->backupinfo = backupinfo;
+		worker->bytesread = 0;
+		worker->workerid = i;
+		worker->conn = GetConnection();
+		if (!worker->conn)
+			/* Error message already written in GetConnection() */
+			exit(1);
+
+		/* count the worker as active until it parks itself on data_ready */
+		pthread_mutex_lock(&fetch_mutex);
+		backupinfo->activeworkers++;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		status = pthread_create(&worker->worker, NULL, worker_run, worker);
+		if (status != 0)
+		{
+			/* pthread_create() returns the error code; it does not set errno */
+			pg_log_error("could not create thread: %s", strerror(status));
+			exit(1);
+		}
+		nstarted_workers++;
+
+		if (verbose)
+			pg_log_info("backup worker (%d) created", i);
+	}
+}
+
+/*
+ * Report whether the current phase is finished: every queued file has been
+ * handed out and every worker is parked waiting for more work.  A worker
+ * that has been woken but has not re-acquired fetch_mutex yet is not counted
+ * as active, but it re-checks curr under the lock before taking a file, so
+ * it cannot pick up anything once curr is NULL.
+ */
+static bool
+workers_idle(BackupInfo *backupinfo)
+{
+	bool		idle;
+
+	pthread_mutex_lock(&fetch_mutex);
+	idle = (backupinfo->curr == NULL && backupinfo->activeworkers == 0);
+	pthread_mutex_unlock(&fetch_mutex);
+
+	return idle;
+}
+
+/*
+ * Return the number of bytes accounted for so far: the bytes fetched by all
+ * workers plus the tar-header bytes of directory and symlink entries that
+ * were skipped.  If filename is not NULL, also copy the path of the next
+ * queued file into it (an empty string if the queue is empty).
+ */
+static uint64
+parallel_progress(BackupInfo *backupinfo, char *filename, size_t len)
+{
+	uint64		total = backupinfo->bytes_skipped;
+
+	pthread_mutex_lock(&fetch_mutex);
+	for (int i = 0; i < numWorkers; i++)
+		total += workers[i].bytesread;
+	if (filename != NULL)
+	{
+		if (backupinfo->curr != NULL)
+			strlcpy(filename, backupinfo->curr->path, len);
+		else
+			filename[0] = '\0';
+	}
+	pthread_mutex_unlock(&fetch_mutex);
+
+	return total;
+}
+
+/*
+ * Main-thread driver of a parallel backup.  It steps through BackupState:
+ * fetch and publish the file list, wait until the workers have drained it,
+ * fetch pg_control and stop the backup, optionally do the same for the WAL
+ * files, and finally shut the workers down.  While waiting it reports
+ * progress and polls every 100ms.
+ */
+static void
+parallel_backup_run(BackupInfo *backupinfo)
+{
+	while (1)
+	{
+		char		filename[MAXPGPATH];
+		uint64		totalread;
+
+		switch (backupinfo->backupstate)
+		{
+			case PB_FETCH_REL_LIST:	/* get the list of files to fetch */
+				backupinfo->backupstate = PB_FETCH_REL_FILES;
+				/* retrieve backup file list from the server. */
+				get_backup_filelist(conn, backupinfo);
+				/* unblock any workers waiting on the condition */
+				pthread_cond_broadcast(&data_ready);
+				break;
+			case PB_FETCH_REL_FILES:	/* fetch files from server */
+				if (workers_idle(backupinfo))
+				{
+					backupinfo->backupstate = PB_STOP_BACKUP;
+					free_filelist(backupinfo);
+				}
+				break;
+			case PB_FETCH_WAL_LIST:	/* get the list of WAL files to fetch */
+				backupinfo->backupstate = PB_FETCH_WAL_FILES;
+				get_wal_filelist(conn, backupinfo, backupinfo->xlogstart,
+								 backupinfo->xlogend);
+				/* unblock any workers waiting on the condition */
+				pthread_cond_broadcast(&data_ready);
+				break;
+			case PB_FETCH_WAL_FILES:	/* fetch WAL files from server */
+				if (workers_idle(backupinfo))
+					backupinfo->backupstate = PB_BACKUP_COMPLETE;
+				break;
+			case PB_STOP_BACKUP:
+
+				/*
+				 * All relation files have been fetched, time to stop the
+				 * backup, making sure to fetch the WAL files first (if needs
+				 * be).
+				 */
+				if (includewal == FETCH_WAL)
+					backupinfo->backupstate = PB_FETCH_WAL_LIST;
+				else
+					backupinfo->backupstate = PB_BACKUP_COMPLETE;
+
+				/* get the pg_control file at last. */
+				receive_file(conn, "global/pg_control", tablespacecount - 1);
+				stop_backup();
+				break;
+			case PB_BACKUP_COMPLETE:
+
+				/*
+				 * All relation and WAL files (if needed) have been fetched,
+				 * now we can safely stop all workers and finish up.
+				 */
+				cleanup_workers();
+				if (showprogress)
+				{
+					workers_progress_report(parallel_progress(backupinfo, NULL, 0),
+											NULL, true);
+					if (isatty(fileno(stderr)))
+						fprintf(stderr, "\n");	/* Need to move to next line */
+				}
+
+				/* nothing more to do here */
+				return;
+			default:
+				/* shouldn't come here. */
+				pg_log_error("unexpected backup state: %d",
+							 backupinfo->backupstate);
+				exit(1);
+		}
+
+		/* update and report progress */
+		totalread = parallel_progress(backupinfo, filename, sizeof(filename));
+		workers_progress_report(totalread, filename[0] != '\0' ? filename : NULL,
+								false);
+		pg_usleep(100000);
+	}
+}
+
+/*
+ * Stop the worker threads, wait for them to finish and close their
+ * connections.  Safe to call more than once; it is also called from
+ * disconnect_atexit(), possibly in a worker thread that called exit().
+ */
+static void
+cleanup_workers(void)
+{
+	bool		alreadydone;
+
+	/* nothing to do for a non-parallel backup, or before workers exist */
+	if (backupinfo == NULL || workers == NULL)
+		return;
+
+	pthread_mutex_lock(&fetch_mutex);
+	alreadydone = backupinfo->workersdone;
+	backupinfo->workersdone = true;
+	pthread_mutex_unlock(&fetch_mutex);
+
+	/* workers have already been stopped and cleanup has been done. */
+	if (alreadydone)
+		return;
+
+	/* wake up any workers waiting on the condition */
+	pthread_cond_broadcast(&data_ready);
+
+	for (int i = 0; i < nstarted_workers; i++)
+	{
+		/* never try to join ourselves if exit() was called by a worker */
+		if (!pthread_equal(workers[i].worker, pthread_self()))
+			pthread_join(workers[i].worker, NULL);
+	}
+	for (int i = 0; i < numWorkers; i++)
+	{
+		if (workers[i].conn != NULL)
+			PQfinish(workers[i].conn);
+	}
+	free_filelist(backupinfo);
+}
+
+/*
+ * Take the system out of backup mode with STOP_BACKUP on the main
+ * connection, record the end WAL location in backupinfo->xlogend and write
+ * the returned backup_label contents into the backup directory.
+ */
+static void
+stop_backup(void)
+{
+	PGresult   *res;
+	char	   *basebkp;
+
+	basebkp = psprintf("STOP_BACKUP %s",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "STOP_BACKUP", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	/*
+	 * Get the stop position
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get write-ahead log end position from server: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		pg_log_error("no write-ahead log end position returned from server");
+		exit(1);
+	}
+	/* column 2 must hold the backup_label contents */
+	if (PQnfields(res) < 3)
+	{
+		pg_log_error("unexpected result from server: got %d fields, expected %d or more",
+					 PQnfields(res), 3);
+		exit(1);
+	}
+
+	/* retrieve the end wal location. */
+	strlcpy(backupinfo->xlogend, PQgetvalue(res, 0, 0),
+			sizeof(backupinfo->xlogend));
+
+	/* retrieve the backup_label file contents and write them to the backup */
+	writefile("backup_label", PQgetvalue(res, 0, 2));
+
+	PQclear(res);
+
+	finish_command(conn, "STOP_BACKUP");
+
+	if (verbose && includewal != NO_WAL)
+		pg_log_info("write-ahead log end point: %s", backupinfo->xlogend);
+}
+
+/*
+ * Retrieve the list of files of every tablespace from the server with
+ * LIST_FILES.  In plain format, directories and tablespace symlinks are
+ * created right away; every other entry is queued, and the complete list
+ * is then handed to the workers in one step.
+ */
+static void
+get_backup_filelist(PGconn *conn, BackupInfo *backupInfo)
+{
+	BackupFile *head = NULL;
+	BackupFile *tail = NULL;
+	int			nfiles = 0;
+
+	for (int i = 0; i < tablespacecount; i++)
+	{
+		PGresult   *res;
+		PQExpBuffer cmd;
+		bool		basetablespace;
+		char	   *tablespace;
+		int			numFiles;
+
+		/*
+		 * Query server to fetch the file list for given tablespace name. If
+		 * the tablespace name is empty, it will fetch files list of 'base'
+		 * tablespace.
+		 */
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tablespace = PQgetvalue(tablespacehdr, i, 1);
+
+		cmd = createPQExpBuffer();
+		appendPQExpBufferStr(cmd, "LIST_FILES ");
+		append_quoted_literal(cmd, basetablespace ? "" : tablespace);
+		if (PQsendQuery(conn, cmd->data) == 0)
+		{
+			pg_log_error("could not send replication command \"%s\": %s",
+						 "LIST_FILES", PQerrorMessage(conn));
+			exit(1);
+		}
+		destroyPQExpBuffer(cmd);
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not list backup files: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		numFiles = PQntuples(res);
+		for (int j = 0; j < numFiles; j++)
+		{
+			BackupFile *file;
+			char	   *path = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+
+			/*
+			 * In 'plain' format, create backup directories and tablespace
+			 * symlinks right away.  They are not fetched, but a tar header
+			 * was included for them in totalsize_kb, so account for it in
+			 * the progress report.
+			 */
+			if (format == 'p' && type == 'd')
+			{
+				backupInfo->bytes_skipped += 512;
+				create_backup_dirs(basetablespace, tablespace, path);
+				continue;
+			}
+			if (format == 'p' && type == 'l')
+			{
+				backupInfo->bytes_skipped += 512;
+				create_tblspc_symlink(path);
+				continue;
+			}
+
+			/* palloc0 so that file->next starts out NULL */
+			file = (BackupFile *) palloc0(sizeof(BackupFile));
+			strlcpy(file->path, path, MAXPGPATH);
+			file->type = type;
+			file->size = atol(PQgetvalue(res, j, 2));
+			file->mtime = atol(PQgetvalue(res, j, 3));
+			file->tsindex = i;
+
+			if (tail == NULL)
+				head = file;
+			else
+				tail->next = file;
+			tail = file;
+			nfiles++;
+		}
+
+		PQclear(res);
+
+		finish_command(conn, "LIST_FILES");
+	}
+
+	publish_filelist(backupInfo, head, nfiles);
+}
+
+/*
+ * Retrieve from the server the list of WAL files between xlogstart and
+ * xlogend with LIST_WAL_FILES, and hand it to the workers.
+ */
+static void
+get_wal_filelist(PGconn *conn, BackupInfo *backupInfo, char *xlogstart, char *xlogend)
+{
+	PGresult   *res;
+	char	   *basebkp;
+	BackupFile *head = NULL;
+	BackupFile *tail = NULL;
+	int			numWals;
+
+	basebkp = psprintf("LIST_WAL_FILES START_WAL_LOCATION '%s' END_WAL_LOCATION '%s'",
+					   xlogstart, xlogend);
+
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "LIST_WAL_FILES", PQerrorMessage(conn));
+		exit(1);
+	}
+	pfree(basebkp);
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not list wal files: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	numWals = PQntuples(res);
+	for (int i = 0; i < numWals; i++)
+	{
+		BackupFile *file = (BackupFile *) palloc0(sizeof(BackupFile));
+
+		strlcpy(file->path, PQgetvalue(res, i, 0), MAXPGPATH);
+		/* same tablespace row as pg_control; presumably the base tablespace */
+		file->tsindex = tablespacecount - 1;
+
+		if (tail == NULL)
+			head = file;
+		else
+			tail->next = file;
+		tail = file;
+	}
+	PQclear(res);
+
+	finish_command(conn, "LIST_WAL_FILES");
+
+	publish_filelist(backupInfo, head, numWals);
+}
+
+/*
+ * Free the current file list.  The list is detached under fetch_mutex so a
+ * worker can never pick up an entry that is being freed; callers must make
+ * sure no worker is still using an entry it has already taken.
+ */
+static void
+free_filelist(BackupInfo *backupInfo)
+{
+	BackupFile *file;
+
+	pthread_mutex_lock(&fetch_mutex);
+	file = backupInfo->files;
+	backupInfo->files = NULL;
+	backupInfo->curr = NULL;
+	backupInfo->totalfiles = 0;
+	pthread_mutex_unlock(&fetch_mutex);
+
+	while (file != NULL)
+	{
+		BackupFile *next = file->next;
+
+		pfree(file);
+		file = next;
+	}
+}
+
+/*
+ * Worker loop: take the next file from the shared list and fetch it, parking
+ * on data_ready while the list is empty, until cleanup_workers() sets
+ * workersdone.  All access to the shared list and counters happens under
+ * fetch_mutex; the network transfer itself runs without the lock.
+ */
+static int
+worker_get_files(WorkerState *wstate)
+{
+	BackupInfo *backupinfo = wstate->backupinfo;
+
+	for (;;)
+	{
+		BackupFile *fetchfile;
+		uint64		nbytes;
+
+		pthread_mutex_lock(&fetch_mutex);
+
+		/* loop, since pthread_cond_wait() may wake up spuriously */
+		while (backupinfo->curr == NULL && !backupinfo->workersdone)
+		{
+			backupinfo->activeworkers--;
+			pthread_cond_wait(&data_ready, &fetch_mutex);
+			backupinfo->activeworkers++;
+		}
+
+		if (backupinfo->workersdone)
+		{
+			pthread_mutex_unlock(&fetch_mutex);
+			break;
+		}
+
+		fetchfile = backupinfo->curr;
+		backupinfo->curr = fetchfile->next;
+		backupinfo->totalfiles--;
+		pthread_mutex_unlock(&fetch_mutex);
+
+		nbytes = receive_file(wstate->conn, fetchfile->path, fetchfile->tsindex);
+
+		/* bytesread is summed by the main thread for progress reporting */
+		pthread_mutex_lock(&fetch_mutex);
+		wstate->bytesread += nbytes;
+		pthread_mutex_unlock(&fetch_mutex);
+	}
+
+	return 0;
+}
+
+/*
+ * Fetch a single file from the server with SEND_FILES on conn and unpack it
+ * into the directory of tablespace row tsIndex of tablespacehdr.  Returns
+ * the byte count reported by ReceiveAndUnpackTarFile(), for progress
+ * reporting; exits on error.
+ */
+static uint64
+receive_file(PGconn *conn, char *file, int tsIndex)
+{
+	PQExpBuffer buf;
+	uint64		bytesread;
+
+	Assert(conn != NULL);
+
+	/*
+	 * Build a query in the form of:
+	 *
+	 * SEND_FILES ( 'base/1/1245/32683' ) START_WAL_LOCATION 'X/X' [options]
+	 */
+	buf = createPQExpBuffer();
+	appendPQExpBufferStr(buf, "SEND_FILES ( ");
+	append_quoted_literal(buf, file);
+	appendPQExpBuffer(buf, " ) START_WAL_LOCATION '%s' %s",
+					  backupinfo->xlogstart,
+					  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+
+	if (PQsendQuery(conn, buf->data) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_FILES", PQerrorMessage(conn));
+		exit(1);
+	}
+	destroyPQExpBuffer(buf);
+
+	/* process file contents, also count bytes read for progress */
+	bytesread = ReceiveAndUnpackTarFile(conn, tablespacehdr, tsIndex);
+
+	finish_command(conn, "SEND_FILES");
+
+	return bytesread;
+}
+
+/*
+ * Create backup directories while taking care of tablespace path.  If a
+ * tablespace mapping (with -T) is given then the directory is created on
+ * the mapped path.
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		dirpath[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (basetablespace)
+		snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name);
+	else
+	{
+		const char *mapped;
+		size_t		tslen;
+
+		Assert(tablespace != NULL);
+		mapped = get_tablespace_mapping(tablespace);
+		tslen = strlen(tablespace);
+
+		/*
+		 * name presumably has the form "<tablespace>/<relative path>"; strip
+		 * that prefix.  Treat a name no longer than the prefix as the
+		 * tablespace directory itself instead of reading past its end.
+		 */
+		if (strlen(name) > tslen + 1)
+			snprintf(dirpath, sizeof(dirpath), "%s/%s", mapped, name + tslen + 1);
+		else
+			strlcpy(dirpath, mapped, sizeof(dirpath));
+	}
+
+	if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0 && errno != EEXIST)
+	{
+		pg_log_error("could not create directory \"%s\": %m", dirpath);
+		exit(1);
+	}
+}
+
+/*
+ * Create a symlink in pg_tblspc and apply any tablespace mapping given on
+ * the command line (--tablespace-mapping).  The link is matched to its
+ * tablespace by comparing the last path component of filename with the
+ * tablespace OID exactly; a substring match could pick the wrong tablespace
+ * (e.g. OID 1638 inside "pg_tblspc/16385").
+ */
+static void
+create_tblspc_symlink(char *filename)
+{
+	const char *linkname = strrchr(filename, '/');
+
+	linkname = linkname ? linkname + 1 : filename;
+
+	for (int i = 0; i < tablespacecount; i++)
+	{
+		char	   *linkloc;
+		const char *mapped_tblspc_path;
+
+		/* the base tablespace has no OID and no symlink */
+		if (PQgetisnull(tablespacehdr, i, 0) ||
+			strcmp(linkname, PQgetvalue(tablespacehdr, i, 0)) != 0)
+			continue;
+
+		linkloc = psprintf("%s/%s", basedir, filename);
+		mapped_tblspc_path = get_tablespace_mapping(PQgetvalue(tablespacehdr, i, 1));
+
+#ifdef HAVE_SYMLINK
+		if (symlink(mapped_tblspc_path, linkloc) != 0)
+		{
+			pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+						 linkloc, mapped_tblspc_path);
+			exit(1);
+		}
+#else
+		pg_log_error("symlinks are not supported on this platform");
+		exit(1);
+#endif
+		pfree(linkloc);
+		break;
+	}
+}
+
+/*
+ * Write buf to basedir/path, creating or truncating the file.  Binary mode
+ * keeps the contents byte-for-byte as sent by the server.
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+	size_t		len = strlen(buf);
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, PG_BINARY_W);
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/* fwrite() of zero bytes returns 0, which is not an error */
+	if (len > 0 && fwrite(buf, len, 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
+
+/*
+ * Return the server's max_wal_senders setting, or -1 (after reporting an
+ * error) if it could not be determined.
+ */
+static int
+fetch_max_wal_senders(PGconn *conn)
+{
+	PGresult   *res;
+	int			max_wal_senders;
+
+	/* check connection existence */
+	Assert(conn != NULL);
+
+	res = PQexec(conn, "SHOW max_wal_senders");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SHOW max_wal_senders", PQerrorMessage(conn));
+
+		PQclear(res);
+		return -1;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 1)
+	{
+		pg_log_error("could not fetch max wal senders: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 1);
+
+		PQclear(res);
+		return -1;
+	}
+
+	max_wal_senders = atoi(PQgetvalue(res, 0, 0));
+	PQclear(res);
+
+	return max_wal_senders;
+}
-- 
2.21.1 (Apple Git-122.3)