0003-pg_basebackup-changes-for-parallel-backup.patch
application/octet-stream
Filename: 0003-pg_basebackup-changes-for-parallel-backup.patch
Type: application/octet-stream
Part: 2
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0003
Subject: pg_basebackup changes for parallel backup.
| File | + | − |
|---|---|---|
| src/bin/pg_basebackup/pg_basebackup.c | 548 | 35 |
From 245b10802490fafeba7b17779e5c2860fbc1181c Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 14 Oct 2019 17:28:58 +0500
Subject: [PATCH 3/4] pg_basebackup changes for parallel backup.
---
src/bin/pg_basebackup/pg_basebackup.c | 583 ++++++++++++++++++++++++--
1 file changed, 548 insertions(+), 35 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 55ef13926d..311c1f94ca 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -41,6 +41,7 @@
#include "receivelog.h"
#include "replication/basebackup.h"
#include "streamutil.h"
+#include "fe_utils/simple_list.h"
#define ERRCODE_DATA_CORRUPTED "XX001"
@@ -57,6 +58,37 @@ typedef struct TablespaceList
TablespaceListCell *tail;
} TablespaceList;
+/*
+ * One row of the file list returned by SEND_FILE_LIST; filled in by
+ * GetBackupFilesList().
+ */
+typedef struct
+{
+ char name[MAXPGPATH]; /* result column 0, copied with strlcpy() */
+ char type; /* first character of result column 1; 'd' is handled as a directory */
+ int32 size; /* result column 2; sort key used by compareFileSize().
+ * NOTE(review): int32 cannot hold sizes above 2GB -- confirm
+ * the server never reports one */
+ time_t mtime; /* result column 3 */
+} BackupFile;
+
+/*
+ * File list of a single tablespace; GetBackupFilesList() builds one entry
+ * per row of the backup header result (tablespacehdr).
+ */
+typedef struct
+{
+ Oid tblspcOid; /* parsed from backup header column 0 */
+ char *tablespace; /* backup header column 1, pointing into tablespacehdr.
+ * NOTE(review): create_backup_dirs() uses it as a path
+ * prefix, so this looks like a location rather than a
+ * name -- confirm */
+ int numFiles; /* number of files */
+ BackupFile *backupFiles; /* list of files in tablespace */
+} TablespaceInfo;
+
+/*
+ * Everything a parallel backup needs to share between the leader and its
+ * workers.
+ */
+typedef struct
+{
+ int tablespacecount; /* number of entries in tsInfo */
+ int numWorkers; /* copy of the -j/--jobs setting */
+
+ char xlogstart[64]; /* start position, sent as the LSN option of SEND_FILES_CONTENT */
+ char *backup_label; /* set by read_label_tblspcmap() */
+ char *tablespace_map; /* set by read_label_tblspcmap(); left NULL when the server returns a null value */
+
+ TablespaceInfo *tsInfo; /* per-tablespace file lists */
+ SimpleStringList **worker_files; /* file names to fetch, indexed [tablespace][worker] */
+} BackupInfo;
+
+/* Allocated by BaseBackup() only when numWorkers > 1; NULL otherwise. */
+static BackupInfo *backupInfo = NULL;
+
/*
* pg_xlog has been renamed to pg_wal in version 10. This version number
* should be compared with PQserverVersion().
@@ -110,6 +142,10 @@ static bool found_existing_xlogdir = false;
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
+static int numWorkers = 1;
+static PGresult *tablespacehdr;
+static SimpleOidList workerspid = {NULL, NULL};
+
/* Progress counters */
static uint64 totalsize_kb;
static uint64 totaldone;
@@ -141,7 +177,7 @@ static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
static void progress_report(int tablespacenum, const char *filename, bool force);
-static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
@@ -151,6 +187,16 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
static const char *get_tablespace_mapping(const char *dir);
static void tablespace_list_append(const char *arg);
+static void ParallelBackupEnd(void);
+static void GetBackupFilesList(PGconn *conn, BackupInfo *binfo);
+static int ReceiveFiles(BackupInfo *backupInfo, int worker);
+static int compareFileSize(const void *a, const void *b);
+static void create_workers_and_fetch(BackupInfo *backupInfo);
+static void read_label_tblspcmap(PGconn *conn, char **backup_label, char **tablespace_map);
+static void create_backup_dirs(bool basetablespace, char *tablespace, char *name);
+static void writefile(char *path, char *buf);
+static int simple_list_length(SimpleStringList *list);
+
static void
cleanup_directories_atexit(void)
@@ -349,6 +395,7 @@ usage(void)
printf(_(" --no-slot prevent creation of temporary replication slot\n"));
printf(_(" --no-verify-checksums\n"
" do not verify checksums\n"));
+ printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=CONNSTR connection string\n"));
@@ -921,7 +968,7 @@ writeTarData(
* No attempt to inspect or validate the contents of the file is done.
*/
static void
-ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
+ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker)
{
char filename[MAXPGPATH];
char *copybuf = NULL;
@@ -978,7 +1025,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/base.%d.tar.gz", basedir, worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
ztarfile = gzopen(filename, "wb");
if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
@@ -991,7 +1041,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
else
#endif
{
- snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/base.%d.tar", basedir, worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
tarfile = fopen(filename, "wb");
}
}
@@ -1004,8 +1057,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
- PQgetvalue(res, rownum, 0));
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/%s.%d.tar.gz", basedir,
+ PQgetvalue(res, rownum, 0), worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
+ PQgetvalue(res, rownum, 0));
ztarfile = gzopen(filename, "wb");
if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
@@ -1018,8 +1075,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
else
#endif
{
- snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
- PQgetvalue(res, rownum, 0));
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/%s.%d.tar", basedir,
+ PQgetvalue(res, rownum, 0), worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
+ PQgetvalue(res, rownum, 0));
tarfile = fopen(filename, "wb");
}
}
@@ -1082,6 +1143,45 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
MemSet(zerobuf, 0, sizeof(zerobuf));
+ if (numWorkers > 1 && basetablespace && worker == 0)
+ {
+ char header[512];
+ int padding;
+ int len;
+
+ /* add backup_label and tablespace_map files to the tar */
+ len = strlen(backupInfo->backup_label);
+ tarCreateHeader(header,
+ "backup_label",
+ NULL,
+ len,
+ pg_file_create_mode, 04000, 02000,
+ time(NULL));
+
+ padding = ((len + 511) & ~511) - len;
+ WRITE_TAR_DATA(header, sizeof(header));
+ WRITE_TAR_DATA(backupInfo->backup_label, len);
+ if (padding)
+ WRITE_TAR_DATA(zerobuf, padding);
+
+ if (backupInfo->tablespace_map)
+ {
+ len = strlen(backupInfo->tablespace_map);
+ tarCreateHeader(header,
+ "tablespace_map",
+ NULL,
+ len,
+ pg_file_create_mode, 04000, 02000,
+ time(NULL));
+
+ padding = ((len + 511) & ~511) - len;
+ WRITE_TAR_DATA(header, sizeof(header));
+ WRITE_TAR_DATA(backupInfo->tablespace_map, len);
+ if (padding)
+ WRITE_TAR_DATA(zerobuf, padding);
+ }
+ }
+
if (basetablespace && writerecoveryconf)
{
char header[512];
@@ -1475,6 +1575,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
*/
snprintf(filename, sizeof(filename), "%s/%s", current_path,
copybuf);
+
if (filename[strlen(filename) - 1] == '/')
{
/*
@@ -1486,21 +1587,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* Directory
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
+
+ /*
+ * In parallel mode, we create directories before fetching
+ * files, so it is OK if a directory already exists.
+ */
if (mkdir(filename, pg_dir_create_mode) != 0)
{
- /*
- * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
- * clusters) will have been created by the wal
- * receiver process. Also, when the WAL directory
- * location was specified, pg_wal (or pg_xlog) has
- * already been created as a symbolic link before
- * starting the actual backup. So just ignore creation
- * failures on related directories.
- */
- if (!((pg_str_endswith(filename, "/pg_wal") ||
- pg_str_endswith(filename, "/pg_xlog") ||
- pg_str_endswith(filename, "/archive_status")) &&
- errno == EEXIST))
+ if (errno != EEXIST)
{
pg_log_error("could not create directory \"%s\": %m",
filename);
@@ -1528,8 +1622,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* can map them too.)
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
-
mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
+
if (symlink(mapped_tblspc_path, filename) != 0)
{
pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
@@ -1716,7 +1810,8 @@ BaseBackup(void)
}
basebkp =
- psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+ psprintf("%s LABEL '%s' %s %s %s %s %s %s %s",
+ (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal == FETCH_WAL ? "WAL" : "",
@@ -1774,7 +1869,7 @@ BaseBackup(void)
/*
* Get the header
*/
- res = PQgetResult(conn);
+ tablespacehdr = res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
pg_log_error("could not get backup header: %s",
@@ -1830,20 +1925,62 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
- for (i = 0; i < PQntuples(res); i++)
+ if (numWorkers > 1)
{
- if (format == 't')
- ReceiveTarFile(conn, res, i);
- else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
+ backupInfo = palloc0(sizeof(BackupInfo));
+
+ backupInfo->tablespacecount = tablespacecount;
+ backupInfo->numWorkers = numWorkers;
+ strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart));
+ read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map);
+
+ /* Retrieve the list of backup files from the server. */
+ GetBackupFilesList(conn, backupInfo);
+
+ /*
+ * Add backup_label to the backup (for tar format, ReceiveTarFile() will
+ * take care of it).
+ */
+ if (format == 'p')
+ writefile("backup_label", backupInfo->backup_label);
+
+ /*
+ * The backup files list is already in descending order, distribute it
+ * to workers.
+ */
+ backupInfo->worker_files = palloc0(sizeof(SimpleStringList) * tablespacecount);
+ for (i = 0; i < backupInfo->tablespacecount; i++)
+ {
+ TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+
+ backupInfo->worker_files[i] = palloc0(sizeof(SimpleStringList) * numWorkers);
+ for (int j = 0; j < curTsInfo->numFiles; j++)
+ {
+ simple_string_list_append(&backupInfo->worker_files[i][j % numWorkers],
+ curTsInfo->backupFiles[j].name);
+ }
+ }
+
+ create_workers_and_fetch(backupInfo);
+ ParallelBackupEnd();
+ }
+ else
+ {
+ /*
+ * Start receiving chunks
+ */
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ if (format == 't')
+ ReceiveTarFile(conn, res, i, 0);
+ else
+ ReceiveAndUnpackTarFile(conn, res, i);
+ } /* Loop over all tablespaces */
+ }
if (showprogress)
{
- progress_report(PQntuples(res), NULL, true);
+ progress_report(PQntuples(tablespacehdr), NULL, true);
if (isatty(fileno(stderr)))
fprintf(stderr, "\n"); /* Need to move to next line */
}
@@ -2043,6 +2180,7 @@ main(int argc, char **argv)
{"waldir", required_argument, NULL, 1},
{"no-slot", no_argument, NULL, 2},
{"no-verify-checksums", no_argument, NULL, 3},
+ {"jobs", required_argument, NULL, 'j'},
{NULL, 0, NULL, 0}
};
int c;
@@ -2070,7 +2208,7 @@ main(int argc, char **argv)
atexit(cleanup_directories_atexit);
- while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+ while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2211,6 +2349,9 @@ main(int argc, char **argv)
case 3:
verify_checksums = false;
break;
+ case 'j': /* number of jobs */
+ numWorkers = atoi(optarg);
+ break;
default:
/*
@@ -2325,6 +2466,14 @@ main(int argc, char **argv)
}
}
+ if (numWorkers <= 0)
+ {
+ pg_log_error("invalid number of parallel jobs");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
@@ -2397,3 +2546,367 @@ main(int argc, char **argv)
success = true;
return 0;
}
+
+/*
+ * ParallelBackupEnd
+ *
+ * Finish a parallel backup: issue STOP_BACKUP on the main connection and
+ * receive the stream that follows it (pg_control and WAL files, per the
+ * original author's note).  Exits the process if the command cannot be
+ * sent.
+ */
+static void
+ParallelBackupEnd(void)
+{
+	PGresult   *res = NULL;
+	const char *walopt = (includewal == FETCH_WAL) ? "WAL" : "";
+	const char *waitopt = (includewal == NO_WAL) ? "" : "NOWAIT";
+	char	   *stopcmd;
+
+	stopcmd = psprintf("STOP_BACKUP LABEL '%s' %s %s",
+					   backupInfo->backup_label,
+					   walopt,
+					   waitopt);
+
+	if (!PQsendQuery(conn, stopcmd))
+	{
+		pg_log_error("could not execute STOP BACKUP \"%s\"",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/*
+	 * There is no header result for this stream, so "res" stays NULL and the
+	 * row number is one past the last tablespace.  NOTE(review): presumably
+	 * the receivers then treat the stream as the base tablespace -- confirm.
+	 */
+	if (format != 't')
+		ReceiveAndUnpackTarFile(conn, res, tablespacecount);
+	else
+		ReceiveTarFile(conn, res, tablespacecount, numWorkers);
+
+	PQclear(res);
+}
+
+/*
+ * GetBackupFilesList
+ *
+ * Send SEND_FILE_LIST on "conn" and fill backupInfo->tsInfo with one entry
+ * per tablespace.  The server is expected to return one tuple result per
+ * tablespace, in the same order as the rows of the backup header
+ * (tablespacehdr), followed by a command-complete result.
+ *
+ * In plain format the directories named in the list are created right away.
+ * Each tablespace's files end up sorted by size, largest first.
+ *
+ * Any protocol error is fatal: the function logs it and exits.
+ */
+static void
+GetBackupFilesList(PGconn *conn, BackupInfo *backupInfo)
+{
+	TablespaceInfo *tsInfo;
+	PGresult   *res;
+	char	   *basebkp;
+	int			i;
+
+	tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount);
+	backupInfo->tsInfo = tsInfo;
+
+	/*
+	 * Get list of files.
+	 */
+	basebkp = psprintf("SEND_FILE_LIST %s",
+					   format == 't' ? "TABLESPACE_MAP" : "");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_FILE_LIST", PQerrorMessage(conn));
+		exit(1);
+	}
+	/* the command text is not needed once it has been sent */
+	pfree(basebkp);
+
+	/*
+	 * The list of files is grouped by tablespaces, and we want to fetch them
+	 * in the same order.
+	 */
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		TablespaceInfo *cur = &tsInfo[i];
+		bool		basetablespace;
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get backup header: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+
+		/* Oid is unsigned, so parse it as such rather than through atol() */
+		cur->tblspcOid = (Oid) strtoul(PQgetvalue(tablespacehdr, i, 0), NULL, 10);
+
+		/* points into tablespacehdr, which must stay alive while this is used */
+		cur->tablespace = PQgetvalue(tablespacehdr, i, 1);
+		cur->numFiles = PQntuples(res);
+		cur->backupFiles = palloc0(sizeof(BackupFile) * cur->numFiles);
+
+		for (int j = 0; j < cur->numFiles; j++)
+		{
+			BackupFile *file = &cur->backupFiles[j];
+			char	   *name = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+
+			/*
+			 * In 'plain' format, create backup directories first.
+			 */
+			if (format == 'p' && type == 'd')
+				create_backup_dirs(basetablespace, cur->tablespace, name);
+
+			strlcpy(file->name, name, MAXPGPATH);
+			file->type = type;
+
+			/*
+			 * NOTE(review): BackupFile.size is int32, so a reported size
+			 * above 2GB would not fit -- confirm the server cannot send one.
+			 */
+			file->size = atol(PQgetvalue(res, j, 2));
+			file->mtime = atol(PQgetvalue(res, j, 3));
+		}
+
+		/* sort files in descending order, based on size */
+		qsort(cur->backupFiles, cur->numFiles,
+			  sizeof(BackupFile), &compareFileSize);
+		PQclear(res);
+	}
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* consume the result that ends the command, releasing it if non-NULL */
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * ReceiveFiles
+ *
+ * Body of one backup worker.  Opens its own connection and, for every
+ * tablespace that has files assigned to this worker
+ * (backupInfo->worker_files[tablespace][worker]), sends a
+ * SEND_FILES_CONTENT command and receives the resulting stream in the
+ * selected output format.
+ *
+ * Returns 0 on success and 1 on failure; the caller passes the value to
+ * _exit().  All failures detected here are reported through the return
+ * value, and every libpq object is released before returning.
+ */
+static int
+ReceiveFiles(BackupInfo *backupInfo, int worker)
+{
+	PGconn	   *worker_conn;
+	int			i;
+
+	/* without a connection nothing below can work, so check it up front */
+	worker_conn = GetConnection();
+	if (!worker_conn)
+		return 1;
+
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+		SimpleStringList *files = &backupInfo->worker_files[i][worker];
+		SimpleStringListCell *cell;
+		PQExpBuffer buf;
+		PGresult   *res;
+
+		/* nothing assigned to this worker in this tablespace */
+		if (files->head == NULL)
+			continue;
+
+		/*
+		 * build query in form of: SEND_FILES_CONTENT ('base/1/1245/32683',
+		 * 'base/1/1245/32683', ...) [options]
+		 */
+		buf = createPQExpBuffer();
+		appendPQExpBuffer(buf, "SEND_FILES_CONTENT (");
+		for (cell = files->head; cell; cell = cell->next)
+		{
+			if (cell != files->tail)
+				appendPQExpBuffer(buf, "'%s' ,", cell->val);
+			else
+				appendPQExpBuffer(buf, "'%s'", cell->val);
+		}
+		appendPQExpBufferStr(buf, ")");
+
+		/*
+		 * Add backup options to the command. we are reusing the LABEL here to
+		 * keep the original tablespace path on the server.
+		 */
+		appendPQExpBuffer(buf, " LABEL '%s' LSN '%s' %s %s",
+						  curTsInfo->tablespace,
+						  backupInfo->xlogstart,
+						  format == 't' ? "TABLESPACE_MAP" : "",
+						  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+		if (maxrate > 0)
+			appendPQExpBuffer(buf, " MAX_RATE %u", maxrate);
+
+		if (PQsendQuery(worker_conn, buf->data) == 0)
+		{
+			pg_log_error("could not send files list \"%s\"",
+						 PQerrorMessage(worker_conn));
+			destroyPQExpBuffer(buf);
+			PQfinish(worker_conn);
+			return 1;
+		}
+		destroyPQExpBuffer(buf);
+
+		if (format == 't')
+			ReceiveTarFile(worker_conn, tablespacehdr, i, worker);
+		else
+			ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, i);
+
+		res = PQgetResult(worker_conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data stream: %s",
+						 PQerrorMessage(worker_conn));
+			PQclear(res);
+			PQfinish(worker_conn);
+			return 1;
+		}
+		PQclear(res);
+
+		/* consume the result that ends the command before sending the next */
+		res = PQgetResult(worker_conn);
+		PQclear(res);
+	}
+
+	PQfinish(worker_conn);
+
+	return 0;
+}
+
+/*
+ * qsort comparator for BackupFile: orders entries by size, largest first.
+ * Entries of equal size compare as equal.
+ */
+static int
+compareFileSize(const void *a, const void *b)
+{
+	const BackupFile *lhs = (const BackupFile *) a;
+	const BackupFile *rhs = (const BackupFile *) b;
+
+	/* -1 when lhs is larger, 1 when it is smaller, 0 otherwise */
+	return (lhs->size < rhs->size) - (lhs->size > rhs->size);
+}
+
+/*
+ * Send SIGTERM to every recorded backup worker except "except".  Pass -1 to
+ * signal all of them.
+ */
+static void
+terminate_backup_workers(pid_t except)
+{
+	SimpleOidListCell *cell;
+
+	for (cell = workerspid.head; cell; cell = cell->next)
+	{
+		pid_t		victim = (pid_t) cell->val;
+
+		if (victim == except)
+			continue;
+
+		kill(victim, SIGTERM);
+		pg_log_error("backup worker killed %d", (int) victim);
+	}
+}
+
+/*
+ * create_workers_and_fetch
+ *
+ * Fork numWorkers child processes, each running ReceiveFiles() for its own
+ * worker number, then wait for all of them.
+ *
+ * A worker counts as failed when it exits with a non-zero status or is
+ * terminated by a signal.  On the first failure (or if fork()/waitpid()
+ * itself fails) the remaining workers are sent SIGTERM and the process
+ * exits with status 1.
+ */
+static void
+create_workers_and_fetch(BackupInfo *backupInfo)
+{
+	int			status;
+	pid_t		pid;
+	int			i;
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		pid = fork();
+		if (pid == 0)
+		{
+			/* in child process */
+			_exit(ReceiveFiles(backupInfo, i));
+		}
+		else if (pid < 0)
+		{
+			pg_log_error("could not create backup worker: %m");
+
+			/* do not leave already-started workers running on their own */
+			terminate_backup_workers((pid_t) -1);
+			exit(1);
+		}
+
+		/* parent: remember the worker so it can be signalled later */
+		simple_oid_list_append(&workerspid, (Oid) pid);
+		if (verbose)
+			pg_log_info("backup worker (%d) created", (int) pid);
+	}
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		/* retry when interrupted; "status" is only valid on success */
+		do
+		{
+			pid = waitpid(-1, &status, 0);
+		} while (pid < 0 && errno == EINTR);
+
+		if (pid < 0)
+		{
+			pg_log_error("could not wait for backup worker: %m");
+			terminate_backup_workers((pid_t) -1);
+			exit(1);
+		}
+
+		if (WIFEXITED(status) && WEXITSTATUS(status) == 0)
+			continue;			/* this worker finished cleanly */
+
+		if (WIFSIGNALED(status))
+			pg_log_error("backup worker (%d) was terminated by signal %d",
+						 (int) pid, WTERMSIG(status));
+		else
+			pg_log_error("backup worker (%d) failed with code %d",
+						 (int) pid, WEXITSTATUS(status));
+
+		/* error. kill other workers and exit. */
+		terminate_backup_workers(pid);
+		exit(1);
+	}
+}
+
+/*
+ * read_label_tblspcmap
+ *
+ * Read the next result on "conn", which is expected to be a single row
+ * holding the backup label (column 0) and, optionally, the tablespace map
+ * (column 1), followed by a command-complete result.
+ *
+ * On return *backuplabel is a freshly allocated copy of the label, and
+ * *tblspc_map is either a freshly allocated copy of the map or NULL when the
+ * server sent a null value.  The copies do not depend on any PGresult, so
+ * every result obtained here is released before returning.
+ *
+ * Any protocol error is fatal: the function logs it and exits.
+ */
+static void
+read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map)
+{
+	PGresult   *res = NULL;
+
+	Assert(backuplabel != NULL);
+	Assert(tblspc_map != NULL);
+
+	/*
+	 * Get Backup label and tablespace map data.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) < 1)
+	{
+		pg_log_error("no data returned from server");
+		exit(1);
+	}
+
+	/* copy the values out: PQgetvalue() storage belongs to the result */
+	*backuplabel = pg_strdup(PQgetvalue(res, 0, 0));	/* backup_label */
+	if (!PQgetisnull(res, 0, 1))
+		*tblspc_map = pg_strdup(PQgetvalue(res, 0, 1)); /* tablespace_map */
+	else
+		*tblspc_map = NULL;
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* consume the result that ends the command */
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+/*
+ * create_backup_dirs
+ *
+ * Create the local directory corresponding to the server-side directory
+ * "name", including missing parents.  For the base tablespace the directory
+ * goes under basedir; otherwise it goes under the mapped location of
+ * "tablespace".  An already existing directory is not an error; any other
+ * failure is logged and the process exits.
+ */
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char		target[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (!basetablespace)
+	{
+		const char *relname;
+
+		Assert(tablespace != NULL);
+
+		/*
+		 * Skip strlen(tablespace) + 1 characters of "name".
+		 * NOTE(review): presumably "name" starts with the tablespace path
+		 * followed by a separator -- confirm against the server's output.
+		 */
+		relname = name + strlen(tablespace) + 1;
+		snprintf(target, sizeof(target), "%s/%s",
+				 get_tablespace_mapping(tablespace), relname);
+	}
+	else
+		snprintf(target, sizeof(target), "%s/%s", basedir, name);
+
+	/* success, or the directory was already there: nothing more to do */
+	if (pg_mkdir_p(target, pg_dir_create_mode) == 0 || errno == EEXIST)
+		return;
+
+	pg_log_error("could not create directory \"%s\": %m",
+				 target);
+	exit(1);
+}
+
+/*
+ * writefile
+ *
+ * Create (or truncate) the file "path" under basedir and write the
+ * NUL-terminated string "buf" to it.  An empty string produces an empty
+ * file.  Failure to open, write or close the file is logged and the process
+ * exits.
+ */
+static void
+writefile(char *path, char *buf)
+{
+	FILE	   *f;
+	char		pathbuf[MAXPGPATH];
+	size_t		len = strlen(buf);
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/*
+	 * Skip fwrite() for an empty buffer: with a zero item size it returns 0,
+	 * which would otherwise be mistaken for a write failure.
+	 */
+	if (len > 0 && fwrite(buf, len, 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	/* fclose() flushes buffered data, so its failure is a write failure too */
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
+
+/*
+ * simple_list_length
+ *
+ * Count the cells of a SimpleStringList by walking it from head to end.
+ * Returns 0 for an empty list.
+ */
+static int
+simple_list_length(SimpleStringList *list)
+{
+	SimpleStringListCell *cur = list->head;
+	int			count = 0;
+
+	while (cur != NULL)
+	{
+		count++;
+		cur = cur->next;
+	}
+
+	return count;
+}
--
2.21.0 (Apple Git-122)