0001-parallel-backup.patch
application/octet-stream
Filename: 0001-parallel-backup.patch
Type: application/octet-stream
Part: 0
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0001
Subject: parallel backup
| File | + | − |
|---|---|---|
| src/backend/access/transam/xlog.c | 1 | 1 |
| src/backend/replication/basebackup.c | 826 | 252 |
| src/backend/replication/repl_gram.y | 58 | 0 |
| src/backend/replication/repl_scanner.l | 5 | 0 |
| src/bin/pg_basebackup/pg_basebackup.c | 326 | 34 |
| src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl | 571 | 0 |
| src/include/nodes/replnodes.h | 9 | 0 |
| src/include/replication/basebackup.h | 1 | 1 |
From 8c29c68ff24413d8d01478080d9741b0b231d848 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Thu, 3 Oct 2019 23:41:55 +0500
Subject: [PATCH] parallel backup
---
src/backend/access/transam/xlog.c | 2 +-
src/backend/replication/basebackup.c | 1078 +++++++++++++----
src/backend/replication/repl_gram.y | 58 +
src/backend/replication/repl_scanner.l | 5 +
src/bin/pg_basebackup/pg_basebackup.c | 360 +++++-
.../t/040_pg_basebackup_parallel.pl | 571 +++++++++
src/include/nodes/replnodes.h | 9 +
src/include/replication/basebackup.h | 2 +-
8 files changed, 1797 insertions(+), 288 deletions(-)
create mode 100644 src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 790e2c8714..3dc2ebd7dc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -10477,7 +10477,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(buflinkpath.data);
ti->rpath = relpath ? pstrdup(relpath) : NULL;
- ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+ ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
if (tablespaces)
*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index d0f210de8c..fe906dbfdf 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -52,11 +52,31 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ int32 worker;
} basebackup_options;
+typedef struct
+{
+ char path[MAXPGPATH];
+ bool isdir;
+ int64 size; /* st_size in bytes; -1 for directories */
+} pathinfo;
+/* Append a pathinfo for _path to *_filenames, unless _filenames is NULL. */
+#define STORE_PATHINFO(_filenames, _path, _isdir, _size) \
+ do { \
+ if ((_filenames) != NULL) { \
+ pathinfo *pi = palloc0(sizeof(pathinfo)); \
+ strlcpy(pi->path, (_path), sizeof(pi->path)); \
+ pi->isdir = (_isdir); \
+ pi->size = (_size); \
+ *(_filenames) = lappend(*(_filenames), pi); \
+ } \
+ } while(0)
static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
List *tablespaces, bool sendtblspclinks);
+static int64 sendDir_(const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, List **files);
static bool sendFile(const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid);
static void sendFileWithContent(const char *filename, const char *content);
@@ -71,15 +91,26 @@ static void perform_base_backup(basebackup_options *opt);
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const ListCell *a, const ListCell *b);
+static int compareFileSize(const ListCell *a, const ListCell *b);
static void throttle(size_t increment);
static bool is_checksummed_file(const char *fullpath, const char *filename);
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendBackupFileList(basebackup_options *opt, List *tablespaces);
+static void SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok);
+static void include_wal_files(XLogRecPtr endptr, TimeLineID endtli);
+static void setup_throttle(int maxrate);
+static char *readfile(const char *readfilename, bool missing_ok);
+
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
/* Relative path of temporary statistics directory */
static char *statrelpath = NULL;
+#define BACKUP_LABEL_FILE_TMP BACKUP_LABEL_FILE ".tmp"
+#define TABLESPACE_MAP_TMP TABLESPACE_MAP ".tmp"
/*
* Size of each block sent into the tar stream for larger files.
*/
@@ -192,6 +223,14 @@ static const char *const excludeFiles[] =
BACKUP_LABEL_FILE,
TABLESPACE_MAP,
+ /*
+ * Skip backup_label.tmp and tablespace_map.tmp. These temporary files are
+ * written by StartBackup, their contents are sent by SendFilesContents,
+ * and they are removed by StopBackup.
+ */
+ BACKUP_LABEL_FILE_TMP,
+ TABLESPACE_MAP_TMP,
+
"postmaster.pid",
"postmaster.opts",
@@ -294,28 +333,7 @@ perform_base_backup(basebackup_options *opt)
SendBackupHeader(tablespaces);
/* Setup and activate network throttling, if client requested it */
- if (opt->maxrate > 0)
- {
- throttling_sample =
- (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
-
- /*
- * The minimum amount of time for throttling_sample bytes to be
- * transferred.
- */
- elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
-
- /* Enable throttling. */
- throttling_counter = 0;
-
- /* The 'real data' starts now (header was ignored). */
- throttled_last = GetCurrentTimestamp();
- }
- else
- {
- /* Disable throttling. */
- throttling_counter = -1;
- }
+ setup_throttle(opt->maxrate);
/* Send off our tablespaces one by one */
foreach(lc, tablespaces)
@@ -357,7 +375,7 @@ perform_base_backup(basebackup_options *opt)
sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
}
else
- sendTablespace(ti->path, false);
+ sendTablespace(ti->path, false, NULL);
/*
* If we're including WAL, and this is the main data directory we
@@ -384,227 +402,7 @@ perform_base_backup(basebackup_options *opt)
* We've left the last tar file "open", so we can now append the
* required WAL files to it.
*/
- char pathbuf[MAXPGPATH];
- XLogSegNo segno;
- XLogSegNo startsegno;
- XLogSegNo endsegno;
- struct stat statbuf;
- List *historyFileList = NIL;
- List *walFileList = NIL;
- char firstoff[MAXFNAMELEN];
- char lastoff[MAXFNAMELEN];
- DIR *dir;
- struct dirent *de;
- ListCell *lc;
- TimeLineID tli;
-
- /*
- * I'd rather not worry about timelines here, so scan pg_wal and
- * include all WAL files in the range between 'startptr' and 'endptr',
- * regardless of the timeline the file is stamped with. If there are
- * some spurious WAL files belonging to timelines that don't belong in
- * this server's history, they will be included too. Normally there
- * shouldn't be such files, but if there are, there's little harm in
- * including them.
- */
- XLByteToSeg(startptr, startsegno, wal_segment_size);
- XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
- XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
- XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
-
- dir = AllocateDir("pg_wal");
- while ((de = ReadDir(dir, "pg_wal")) != NULL)
- {
- /* Does it look like a WAL segment, and is it in the range? */
- if (IsXLogFileName(de->d_name) &&
- strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
- strcmp(de->d_name + 8, lastoff + 8) <= 0)
- {
- walFileList = lappend(walFileList, pstrdup(de->d_name));
- }
- /* Does it look like a timeline history file? */
- else if (IsTLHistoryFileName(de->d_name))
- {
- historyFileList = lappend(historyFileList, pstrdup(de->d_name));
- }
- }
- FreeDir(dir);
-
- /*
- * Before we go any further, check that none of the WAL segments we
- * need were removed.
- */
- CheckXLogRemoved(startsegno, ThisTimeLineID);
-
- /*
- * Sort the WAL filenames. We want to send the files in order from
- * oldest to newest, to reduce the chance that a file is recycled
- * before we get a chance to send it over.
- */
- list_sort(walFileList, compareWalFileNames);
-
- /*
- * There must be at least one xlog file in the pg_wal directory, since
- * we are doing backup-including-xlog.
- */
- if (walFileList == NIL)
- ereport(ERROR,
- (errmsg("could not find any WAL files")));
-
- /*
- * Sanity check: the first and last segment should cover startptr and
- * endptr, with no gaps in between.
- */
- XLogFromFileName((char *) linitial(walFileList),
- &tli, &segno, wal_segment_size);
- if (segno != startsegno)
- {
- char startfname[MAXFNAMELEN];
-
- XLogFileName(startfname, ThisTimeLineID, startsegno,
- wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", startfname)));
- }
- foreach(lc, walFileList)
- {
- char *walFileName = (char *) lfirst(lc);
- XLogSegNo currsegno = segno;
- XLogSegNo nextsegno = segno + 1;
-
- XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
- if (!(nextsegno == segno || currsegno == segno))
- {
- char nextfname[MAXFNAMELEN];
-
- XLogFileName(nextfname, ThisTimeLineID, nextsegno,
- wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", nextfname)));
- }
- }
- if (segno != endsegno)
- {
- char endfname[MAXFNAMELEN];
-
- XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", endfname)));
- }
-
- /* Ok, we have everything we need. Send the WAL files. */
- foreach(lc, walFileList)
- {
- char *walFileName = (char *) lfirst(lc);
- FILE *fp;
- char buf[TAR_SEND_SIZE];
- size_t cnt;
- pgoff_t len = 0;
-
- snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
- XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
-
- fp = AllocateFile(pathbuf, "rb");
- if (fp == NULL)
- {
- int save_errno = errno;
-
- /*
- * Most likely reason for this is that the file was already
- * removed by a checkpoint, so check for that to get a better
- * error message.
- */
- CheckXLogRemoved(segno, tli);
-
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m", pathbuf)));
- }
-
- if (fstat(fileno(fp), &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m",
- pathbuf)));
- if (statbuf.st_size != wal_segment_size)
- {
- CheckXLogRemoved(segno, tli);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("unexpected WAL file size \"%s\"", walFileName)));
- }
-
- /* send the WAL file itself */
- _tarWriteHeader(pathbuf, NULL, &statbuf, false);
-
- while ((cnt = fread(buf, 1,
- Min(sizeof(buf), wal_segment_size - len),
- fp)) > 0)
- {
- CheckXLogRemoved(segno, tli);
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
-
- len += cnt;
- throttle(cnt);
-
- if (len == wal_segment_size)
- break;
- }
-
- CHECK_FREAD_ERROR(fp, pathbuf);
-
- if (len != wal_segment_size)
- {
- CheckXLogRemoved(segno, tli);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("unexpected WAL file size \"%s\"", walFileName)));
- }
-
- /* wal_segment_size is a multiple of 512, so no need for padding */
-
- FreeFile(fp);
-
- /*
- * Mark file as archived, otherwise files can get archived again
- * after promotion of a new node. This is in line with
- * walreceiver.c always doing an XLogArchiveForceDone() after a
- * complete segment.
- */
- StatusFilePath(pathbuf, walFileName, ".done");
- sendFileWithContent(pathbuf, "");
- }
-
- /*
- * Send timeline history files too. Only the latest timeline history
- * file is required for recovery, and even that only if there happens
- * to be a timeline switch in the first WAL segment that contains the
- * checkpoint record, or if we're taking a base backup from a standby
- * server and the target timeline changes while the backup is taken.
- * But they are small and highly useful for debugging purposes, so
- * better include them all, always.
- */
- foreach(lc, historyFileList)
- {
- char *fname = lfirst(lc);
-
- snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
-
- if (lstat(pathbuf, &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m", pathbuf)));
-
- sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
-
- /* unconditionally mark file as archived */
- StatusFilePath(pathbuf, fname, ".done");
- sendFileWithContent(pathbuf, "");
- }
+ include_wal_files(endptr, endtli);
/* Send CopyDone message for the last tar file */
pq_putemptymessage('c');
@@ -637,6 +435,24 @@ compareWalFileNames(const ListCell *a, const ListCell *b)
return strcmp(fna + 8, fnb + 8);
}
+/*
+ * list_sort comparator for pathinfo entries: larger size sorts first.
+ * Directories are stored with size -1, so they sort after all other entries.
+ */
+static int
+compareFileSize(const ListCell *a, const ListCell *b)
+{
+ pathinfo *fna = (pathinfo *) lfirst(a);
+ pathinfo *fnb = (pathinfo *) lfirst(b);
+
+ if (fna->size > fnb->size)
+ return -1;
+ if (fna->size < fnb->size)
+ return 1;
+ return 0;
+
+}
+
/*
* Parse the base backup options passed down by the parser
*/
@@ -652,8 +468,10 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_maxrate = false;
bool o_tablespace_map = false;
bool o_noverify_checksums = false;
+ bool o_worker = false;
MemSet(opt, 0, sizeof(*opt));
+ opt->worker = -1;
foreach(lopt, options)
{
DefElem *defel = (DefElem *) lfirst(lopt);
@@ -740,6 +558,16 @@ parse_basebackup_options(List *options, basebackup_options *opt)
noverify_checksums = true;
o_noverify_checksums = true;
}
+ else if (strcmp(defel->defname, "worker") == 0)
+ {
+ if (o_worker)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ opt->worker = intVal(defel->arg);
+ o_worker = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@@ -774,7 +602,26 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg, false);
}
- perform_base_backup(&opt);
+ switch (cmd->cmdtag)
+ {
+ case BASE_BACKUP:
+ perform_base_backup(&opt);
+ break;
+ case START_BACKUP:
+ StartBackup(&opt);
+ break;
+ case SEND_FILES_CONTENT:
+ SendFilesContents(&opt, cmd->backupfiles, true);
+ break;
+ case STOP_BACKUP:
+ StopBackup(&opt);
+ break;
+
+ default:
+ elog(ERROR, "unrecognized replication command tag: %u",
+ cmd->cmdtag);
+ break;
+ }
}
static void
@@ -968,7 +815,7 @@ sendFileWithContent(const char *filename, const char *content)
* Only used to send auxiliary tablespaces, not PGDATA.
*/
int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char *path, bool sizeonly, List **files)
{
int64 size;
char pathbuf[MAXPGPATH];
@@ -997,11 +844,11 @@ sendTablespace(char *path, bool sizeonly)
return 0;
}
+ STORE_PATHINFO(files, pathbuf, true, -1);
size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
sizeonly);
-
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+ size += sendDir_(pathbuf, strlen(path), sizeonly, NIL, true, files);
return size;
}
@@ -1019,8 +866,16 @@ sendTablespace(char *path, bool sizeonly)
* as it will be sent separately in the tablespace_map file.
*/
static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks)
+sendDir(const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks)
+{
+ return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL);
+}
+
+/* Like sendDir(); if files != NULL, records entries in *files and sends no file contents */
+static int64
+sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
+ bool sendtblspclinks, List **files)
{
DIR *dir;
struct dirent *de;
@@ -1174,6 +1029,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+ STORE_PATHINFO(files, pathbuf, true, -1);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
excludeFound = true;
break;
@@ -1190,6 +1047,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+ STORE_PATHINFO(files, pathbuf, true, -1);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
continue;
}
@@ -1211,6 +1070,9 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
sizeonly);
+ STORE_PATHINFO(files, pathbuf, true, -1);
+ STORE_PATHINFO(files, "./pg_wal/archive_status", true, -1);
+
continue; /* don't recurse into pg_wal */
}
@@ -1240,6 +1102,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
+ STORE_PATHINFO(files, pathbuf, false, statbuf.st_size);
size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
&statbuf, sizeonly);
#else
@@ -1266,6 +1129,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
*/
size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
sizeonly);
+ STORE_PATHINFO(files, pathbuf, true, -1);
+
/*
* Call ourselves recursively for a directory, unless it happens
@@ -1296,13 +1161,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+ size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files);
}
else if (S_ISREG(statbuf.st_mode))
{
bool sent = false;
- if (!sizeonly)
+ STORE_PATHINFO(files, pathbuf, false, statbuf.st_size);
+
+ if (!sizeonly && files == NULL)
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
@@ -1743,3 +1610,710 @@ throttle(size_t increment)
*/
throttled_last = GetCurrentTimestamp();
}
+
+/*
+ * StartBackup() - handles START_BACKUP, the first step of a parallel backup.
+ * Like perform_base_backup() it calls do_pg_start_backup(), but it neither
+ * sends file contents nor stops the backup; it replies with the file list.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+ TimeLineID starttli;
+ StringInfo labelfile;
+ StringInfo tblspc_map_file = NULL;
+ int datadirpathlen;
+ List *tablespaces = NIL;
+
+ datadirpathlen = strlen(DataDir);
+
+ backup_started_in_recovery = RecoveryInProgress();
+
+ labelfile = makeStringInfo();
+ tblspc_map_file = makeStringInfo();
+
+ total_checksum_failures = 0;
+
+ startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+ labelfile, &tablespaces,
+ tblspc_map_file,
+ opt->progress, opt->sendtblspcmapfile);
+
+ /*
+ * Once do_pg_start_backup has been called, ensure that any failure causes
+ * us to abort the backup so we don't "leak" a backup counter. NOTE(review):
+ * in parallel mode this block ends when StartBackup() returns, while
+ * do_pg_stop_backup() runs later in StopBackup() without such protection.
+ */
+
+ PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+ {
+ tablespaceinfo *ti;
+ FILE *fp;
+
+ SendXlogRecPtrResult(startptr, starttli);
+
+ /*
+ * Calculate the relative path of temporary statistics directory in
+ * order to skip the files which are located in that directory later.
+ */
+ if (is_absolute_path(pgstat_stat_directory) &&
+ strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+ statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+ else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+ statrelpath = psprintf("./%s", pgstat_stat_directory);
+ else
+ statrelpath = pgstat_stat_directory;
+
+ /* Add a node for the base directory at the end */
+ ti = palloc0(sizeof(tablespaceinfo));
+ ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+ tablespaces = lappend(tablespaces, ti);
+
+ /* Send tablespace header */
+ SendBackupHeader(tablespaces);
+
+ /* Setup and activate network throttling, if client requested it */
+ setup_throttle(opt->maxrate);
+
+ /*
+ * backup_label and tablespace_map are saved to temporary files for
+ * later use by STOP_BACKUP and SEND_FILES_CONTENT. NOTE(review): the
+ * names are fixed, so concurrent parallel backups would clobber them.
+ */
+ fp = AllocateFile(BACKUP_LABEL_FILE_TMP, "w");
+ if (!fp)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ BACKUP_LABEL_FILE_TMP)));
+ if (fwrite(labelfile->data, labelfile->len, 1, fp) != 1 ||
+ fflush(fp) != 0 ||
+ pg_fsync(fileno(fp)) != 0 ||
+ ferror(fp) ||
+ FreeFile(fp))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ BACKUP_LABEL_FILE_TMP)));
+
+ if (opt->sendtblspcmapfile && tblspc_map_file->len > 0)
+ {
+ fp = AllocateFile(TABLESPACE_MAP_TMP, "w");
+ if (!fp)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ TABLESPACE_MAP_TMP)));
+ if (fwrite(tblspc_map_file->data, tblspc_map_file->len, 1, fp) != 1 ||
+ fflush(fp) != 0 ||
+ pg_fsync(fileno(fp)) != 0 ||
+ ferror(fp) ||
+ FreeFile(fp))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ TABLESPACE_MAP_TMP)));
+ }
+
+ /* send out the list of files to back up, one result set per tablespace */
+ SendBackupFileList(opt, tablespaces);
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+}
+
+/*
+ * StopBackup() - ends a parallel backup
+ *
+ * Handles STOP_BACKUP: sends pg_control, calls do_pg_stop_backup() for the
+ * backup begun by StartBackup(), and appends WAL files if requested.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+ TimeLineID endtli;
+ XLogRecPtr endptr;
+ struct stat statbuf;
+ StringInfoData buf;
+ char *labelfile;
+
+ /* Setup and activate network throttling, if client requested it */
+ setup_throttle(opt->maxrate);
+
+ /* read the backup_label saved by StartBackup; do_pg_stop_backup needs it */
+ labelfile = readfile(BACKUP_LABEL_FILE_TMP, false);
+ /* Send CopyOutResponse message */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+
+ /* Send pg_control; the client presumably has all other files by now. */
+ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ XLOG_CONTROL_FILE)));
+ sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+ /* stop backup; NOTE(review): not under PG_ENSURE_ERROR_CLEANUP here */
+ endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+ if (opt->includewal)
+ include_wal_files(endptr, endtli);
+
+ pq_putemptymessage('c'); /* CopyDone */
+ SendXlogRecPtrResult(endptr, endtli);
+ /* best-effort removal of StartBackup's temporary files; errors ignored */
+ unlink(BACKUP_LABEL_FILE_TMP);
+ unlink(TABLESPACE_MAP_TMP);
+}
+
+/*
+ * SendBackupFileList() - sends the list of paths to back up
+ *
+ * For each tablespace, sends a RowDescription and one (path, isdir, size)
+ * row per entry; a single CommandComplete follows the last tablespace.
+ */
+static void
+SendBackupFileList(basebackup_options *opt, List *tablespaces)
+{
+ StringInfoData buf;
+ ListCell *lc, *flc;
+
+ foreach(lc, tablespaces)
+ {
+ List *filenames = NIL;
+ tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+ /* sizeonly = true: collect paths only, never send tar headers here */
+ if (ti->path == NULL)
+ sendDir_(".", 1, true, NIL, !opt->sendtblspcmapfile, &filenames);
+ else
+ sendTablespace(ti->path, true, &filenames);
+
+ /* sort the files in descending order, based on file size */
+ list_sort(filenames, compareFileSize);
+
+ /* Construct and send the list of filenames */
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 3); /* 3 fields */
+
+ /* First field - file path */
+ pq_sendstring(&buf, "path");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, TEXTOID);
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Second field - is_dir */
+ pq_sendstring(&buf, "isdir");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, BOOLOID);
+ pq_sendint16(&buf, 1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Third field - size */
+ pq_sendstring(&buf, "size");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, INT8OID);
+ pq_sendint16(&buf, 8);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ foreach(flc, filenames)
+ {
+ pathinfo *pi = (pathinfo *) lfirst(flc);
+ Size len;
+
+ /* Send one datarow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 3); /* number of columns */
+
+ /* send file name */
+ len = strlen(pi->path);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, pi->path, len);
+
+ /* send isdir */
+ pq_sendint32(&buf, 1);
+ pq_sendbytes(&buf, pi->isdir ? "t" : "f", 1);
+
+ /* send size */
+ send_int8_string(&buf, pi->size);
+
+ pq_endmessage(&buf);
+ }
+ /* list_free_deep also frees the pathinfo entries, and accepts NIL */
+ list_free_deep(filenames);
+ }
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * SendFilesContents() - sends the requested files to the client
+ *
+ * The files are sent as tar entries over a single COPY OUT stream.
+ */
+static void
+SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+ StringInfoData buf;
+ char *labelfile;
+ ListCell *lc;
+ char startxlogfilename[MAXFNAMELEN];
+ bool basetablespace = true;
+ int basepathlen = 1;
+ char ch;
+ uint32 hi,
+ lo;
+
+ if (list_length(filenames) <= 0)
+ return;
+
+ total_checksum_failures = 0;
+
+ /* Setup and activate network throttling, if client requested it */
+ setup_throttle(opt->maxrate);
+
+ /*
+ * LABEL is reused here to identify the tablespace path on the server; it
+ * is not an absolute path (presumably empty) for the 'base' tablespace.
+ */
+ if (is_absolute_path(opt->label))
+ {
+ basepathlen = strlen(opt->label);
+ basetablespace = false;
+ }
+
+ /* retrieve the backup start location from the saved backup_label file. */
+ labelfile = readfile(BACKUP_LABEL_FILE_TMP, false);
+ if (sscanf(labelfile, "START WAL LOCATION: %X/%X (file %24s)%c",
+ &hi, &lo, startxlogfilename,
+ &ch) != 4 || ch != '\n')
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE_TMP)));
+ startptr = ((uint64) hi) << 32 | lo;
+
+ /* Send CopyOutResponse message */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+
+ if (opt->worker == 0 && basetablespace) /* 'base' tablespace */
+ {
+ /* Send BACKUP_LABEL_FILE file */
+ sendFileWithContent(BACKUP_LABEL_FILE, labelfile);
+
+ /* Send TABLESPACE_MAP file */
+ if (opt->sendtblspcmapfile)
+ {
+ char *mapfile = readfile(TABLESPACE_MAP_TMP, true);
+
+ if (mapfile)
+ {
+ sendFileWithContent(TABLESPACE_MAP, mapfile);
+ pfree(mapfile);
+ }
+ }
+ }
+
+ foreach(lc, filenames)
+ {
+ struct stat statbuf;
+ char *pathbuf;
+
+ pathbuf = (char *) strVal(lfirst(lc));
+ if (lstat(pathbuf, &statbuf) != 0)
+ {
+ if (errno != ENOENT || !missing_ok)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file or directory \"%s\": %m",
+ pathbuf)));
+
+ /* If the file went away while scanning, it's not an error. */
+ continue;
+ }
+
+ /* Allow symbolic links in pg_tblspc only */
+ if (strstr(pathbuf, "./pg_tblspc") != NULL &&
+#ifndef WIN32
+ S_ISLNK(statbuf.st_mode)
+#else
+ pgwin32_is_junction(pathbuf)
+#endif
+ )
+ {
+ char linkpath[MAXPGPATH];
+ int rllen;
+
+ rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+ if (rllen < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read symbolic link \"%s\": %m",
+ pathbuf)));
+ if (rllen >= sizeof(linkpath))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("symbolic link \"%s\" target is too long",
+ pathbuf)));
+ linkpath[rllen] = '\0';
+ /* make the tar entry name relative, as sendDir() does for links */
+ _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf, false);
+ }
+ else if (S_ISDIR(statbuf.st_mode))
+ {
+ _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+ }
+ else if (
+#ifndef WIN32
+ S_ISLNK(statbuf.st_mode)
+#else
+ pgwin32_is_junction(pathbuf)
+#endif
+ )
+ {
+ /*
+ * If symlink, write it as a directory. file symlinks only allowed
+ * in pg_tblspc
+ */
+ statbuf.st_mode = S_IFDIR | pg_dir_create_mode;
+ _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+ }
+ else
+ {
+ /* send file to client */
+ sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, missing_ok, InvalidOid);
+ }
+ }
+
+ pq_putemptymessage('c'); /* CopyDone */
+
+ /*
+ * Check for checksum failures. The counter is reset at the start of this
+ * call, so it may not include failures seen by other workers; any failure
+ * still raises an ERROR here, terminating the backup.
+ */
+ if (total_checksum_failures)
+ {
+ if (total_checksum_failures > 1)
+ ereport(WARNING,
+ (errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("checksum verification failure during base backup")));
+ }
+}
+
+static void
+include_wal_files(XLogRecPtr endptr, TimeLineID endtli)
+{
+ /*
+ * We've left the last tar file "open", so we can now append the required
+ * WAL files to it.
+ */
+ char pathbuf[MAXPGPATH];
+ XLogSegNo segno;
+ XLogSegNo startsegno;
+ XLogSegNo endsegno;
+ struct stat statbuf;
+ List *historyFileList = NIL;
+ List *walFileList = NIL;
+ char firstoff[MAXFNAMELEN];
+ char lastoff[MAXFNAMELEN];
+ DIR *dir;
+ struct dirent *de;
+ ListCell *lc;
+ TimeLineID tli;
+
+ /*
+ * I'd rather not worry about timelines here, so scan pg_wal and include
+ * all WAL files in the range between 'startptr' and 'endptr', regardless
+ * of the timeline the file is stamped with. If there are some spurious
+ * WAL files belonging to timelines that don't belong in this server's
+ * history, they will be included too. Normally there shouldn't be such
+ * files, but if there are, there's little harm in including them.
+ */
+ XLByteToSeg(startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
+ XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
+ XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
+
+ dir = AllocateDir("pg_wal");
+ while ((de = ReadDir(dir, "pg_wal")) != NULL)
+ {
+ /* Does it look like a WAL segment, and is it in the range? */
+ if (IsXLogFileName(de->d_name) &&
+ strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
+ strcmp(de->d_name + 8, lastoff + 8) <= 0)
+ {
+ walFileList = lappend(walFileList, pstrdup(de->d_name));
+ }
+ /* Does it look like a timeline history file? */
+ else if (IsTLHistoryFileName(de->d_name))
+ {
+ historyFileList = lappend(historyFileList, pstrdup(de->d_name));
+ }
+ }
+ FreeDir(dir);
+
+ /*
+ * Before we go any further, check that none of the WAL segments we need
+ * were removed.
+ */
+ CheckXLogRemoved(startsegno, ThisTimeLineID);
+
+ /*
+ * Sort the WAL filenames. We want to send the files in order from oldest
+ * to newest, to reduce the chance that a file is recycled before we get a
+ * chance to send it over.
+ */
+ list_sort(walFileList, compareWalFileNames);
+
+ /*
+ * There must be at least one xlog file in the pg_wal directory, since we
+ * are doing backup-including-xlog.
+ */
+ if (walFileList == NIL)
+ ereport(ERROR,
+ (errmsg("could not find any WAL files")));
+
+ /*
+ * Sanity check: the first and last segment should cover startptr and
+ * endptr, with no gaps in between.
+ */
+ XLogFromFileName((char *) linitial(walFileList),
+ &tli, &segno, wal_segment_size);
+ if (segno != startsegno)
+ {
+ char startfname[MAXFNAMELEN];
+
+ XLogFileName(startfname, ThisTimeLineID, startsegno,
+ wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", startfname)));
+ }
+ foreach(lc, walFileList)
+ {
+ char *walFileName = (char *) lfirst(lc);
+ XLogSegNo currsegno = segno;
+ XLogSegNo nextsegno = segno + 1;
+
+ XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+ if (!(nextsegno == segno || currsegno == segno))
+ {
+ char nextfname[MAXFNAMELEN];
+
+ XLogFileName(nextfname, ThisTimeLineID, nextsegno,
+ wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", nextfname)));
+ }
+ }
+ if (segno != endsegno)
+ {
+ char endfname[MAXFNAMELEN];
+
+ XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", endfname)));
+ }
+
+ /* Ok, we have everything we need. Send the WAL files. */
+ foreach(lc, walFileList)
+ {
+ char *walFileName = (char *) lfirst(lc);
+ FILE *fp;
+ char buf[TAR_SEND_SIZE];
+ size_t cnt;
+ pgoff_t len = 0;
+
+ snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
+ XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+
+ fp = AllocateFile(pathbuf, "rb");
+ if (fp == NULL)
+ {
+ int save_errno = errno;
+
+ /*
+ * Most likely reason for this is that the file was already
+ * removed by a checkpoint, so check for that to get a better
+ * error message.
+ */
+ CheckXLogRemoved(segno, tli);
+
+ errno = save_errno;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", pathbuf)));
+ }
+
+ if (fstat(fileno(fp), &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ pathbuf)));
+ if (statbuf.st_size != wal_segment_size)
+ {
+ CheckXLogRemoved(segno, tli);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected WAL file size \"%s\"", walFileName)));
+ }
+
+ /* send the WAL file itself */
+ _tarWriteHeader(pathbuf, NULL, &statbuf, false);
+
+ while ((cnt = fread(buf, 1,
+ Min(sizeof(buf), wal_segment_size - len),
+ fp)) > 0)
+ {
+ CheckXLogRemoved(segno, tli);
+ /* Send the chunk as a CopyData message */
+ if (pq_putmessage('d', buf, cnt))
+ ereport(ERROR,
+ (errmsg("base backup could not send data, aborting backup")));
+
+ len += cnt;
+ throttle(cnt);
+
+ if (len == wal_segment_size)
+ break;
+ }
+
+ if (len != wal_segment_size)
+ {
+ CheckXLogRemoved(segno, tli);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected WAL file size \"%s\"", walFileName)));
+ }
+
+ /* wal_segment_size is a multiple of 512, so no need for padding */
+
+ FreeFile(fp);
+
+ /*
+ * Mark file as archived, otherwise files can get archived again after
+ * promotion of a new node. This is in line with walreceiver.c always
+ * doing an XLogArchiveForceDone() after a complete segment.
+ */
+ StatusFilePath(pathbuf, walFileName, ".done");
+ sendFileWithContent(pathbuf, "");
+ }
+
+ /*
+ * Send timeline history files too. Only the latest timeline history file
+ * is required for recovery, and even that only if there happens to be a
+ * timeline switch in the first WAL segment that contains the checkpoint
+ * record, or if we're taking a base backup from a standby server and the
+ * target timeline changes while the backup is taken. But they are small
+ * and highly useful for debugging purposes, so better include them all,
+ * always.
+ */
+ foreach(lc, historyFileList)
+ {
+ char *fname = lfirst(lc);
+
+ snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
+
+ if (lstat(pathbuf, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", pathbuf)));
+
+ sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
+
+ /* unconditionally mark file as archived */
+ StatusFilePath(pathbuf, fname, ".done");
+ sendFileWithContent(pathbuf, "");
+ }
+}
+
+/*
+ * setup_throttle - configure rate limiting of the data sent to the client.
+ *
+ * maxrate is the requested limit in kB per second (it is scaled by 1024
+ * below); a value <= 0 turns throttling off, which is recorded by setting
+ * throttling_counter to -1.
+ */
+static void
+setup_throttle(int maxrate)
+{
+ if (maxrate <= 0)
+ {
+ /* Throttling not requested: switch it off and stop here. */
+ throttling_counter = -1;
+ return;
+ }
+
+ /* Number of bytes that may be sent per throttling interval. */
+ throttling_sample =
+ (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
+
+ /* Shortest time (in usec) allowed for one sample's worth of data. */
+ elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+ /* A zero byte counter enables throttling. */
+ throttling_counter = 0;
+
+ /* Timing starts now; the header sent earlier is not counted. */
+ throttled_last = GetCurrentTimestamp();
+}
+
+/*
+ * Read the whole of readfilename into a palloc'd, NUL-terminated buffer.
+ * Returns NULL if the file does not exist and missing_ok is true; every
+ * other failure is reported with ereport(ERROR).
+ */
+static char *
+readfile(const char *readfilename, bool missing_ok)
+{
+ struct stat statbuf;
+ FILE *fp;
+ char *data;
+ size_t r;
+
+ fp = AllocateFile(readfilename, "r");
+ if (!fp)
+ {
+ if (errno == ENOENT && missing_ok)
+ return NULL;
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", readfilename)));
+ }
+
+ /* stat the open stream, so the size belongs to the file we will read */
+ if (fstat(fileno(fp), &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ readfilename)));
+
+ data = palloc(statbuf.st_size + 1);
+ /* fread() of a zero-size item returns 0, so treat an empty file as read */
+ r = (statbuf.st_size > 0) ? fread(data, statbuf.st_size, 1, fp) : 1;
+ data[statbuf.st_size] = '\0';
+ /* a short read, a stream error or a failed close are all fatal */
+ if (r != 1 || ferror(fp) || FreeFile(fp))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ readfilename)));
+ return data;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..88e384bf3c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,10 @@ static SQLCmd *make_sqlcmd(void);
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_FILES_CONTENT
+%token K_STOP_BACKUP
+%token K_WORKER
%type <node> command
%type <node> base_backup start_replication start_logical_replication
@@ -102,6 +106,8 @@ static SQLCmd *make_sqlcmd(void);
%type <boolval> opt_temporary
%type <list> create_slot_opt_list
%type <defelt> create_slot_opt
+%type <list> backup_files backup_files_list
+%type <node> backup_file
%%
@@ -162,6 +168,29 @@ base_backup:
{
BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
cmd->options = $2;
+ cmd->cmdtag = BASE_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_START_BACKUP base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = START_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_SEND_FILES_CONTENT backup_files base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $3;
+ cmd->cmdtag = SEND_FILES_CONTENT;
+ cmd->backupfiles = $2;
+ $$ = (Node *) cmd;
+ }
+ | K_STOP_BACKUP base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = STOP_BACKUP;
$$ = (Node *) cmd;
}
;
@@ -214,6 +243,35 @@ base_backup_opt:
$$ = makeDefElem("noverify_checksums",
(Node *)makeInteger(true), -1);
}
+ | K_WORKER UCONST
+ {
+ $$ = makeDefElem("worker",
+ (Node *)makeInteger($2), -1);
+ }
+ ;
+
+/* Optional parenthesized file list; when omitted, the list is empty. */
+backup_files:
+ '(' backup_files_list ')' { $$ = $2; }
+ | /* EMPTY */ { $$ = NIL; }
+ ;
+
+/* One or more comma-separated file names, kept in input order. */
+backup_files_list:
+ backup_files_list ',' backup_file
+ {
+ $$ = lappend($1, $3);
+ }
+ | backup_file
+ {
+ $$ = list_make1($1);
+ }
+ ;
+
+/* A single file name (an SCONST token), wrapped in a String node. */
+backup_file:
+ SCONST
+ { $$ = (Node *) makeString($1); }
;
create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..4836828c39 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,11 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
WAIT { return K_WAIT; }
+START_BACKUP { return K_START_BACKUP; }
+SEND_FILES_CONTENT { return K_SEND_FILES_CONTENT; }
+STOP_BACKUP { return K_STOP_BACKUP; }
+WORKER { return K_WORKER; }
+
"," { return ','; }
";" { return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 55ef13926d..5139dcbe03 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -41,6 +41,7 @@
#include "receivelog.h"
#include "replication/basebackup.h"
#include "streamutil.h"
+#include "fe_utils/simple_list.h"
#define ERRCODE_DATA_CORRUPTED "XX001"
@@ -57,6 +58,15 @@ typedef struct TablespaceList
TablespaceListCell *tail;
} TablespaceList;
+/* Per-tablespace assignment of files to the parallel backup workers. */
+typedef struct WorkerFiles
+{
+ int num_files;	/* number of files listed for this tablespace */
+ char *tspath;	/* tablespace path taken from the header result */
+ SimpleStringList *worker_files;	/* array with one file list per worker */
+} WorkerFiles;
+
+
/*
* pg_xlog has been renamed to pg_wal in version 10. This version number
* should be compared with PQserverVersion().
@@ -110,6 +120,10 @@ static bool found_existing_xlogdir = false;
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
+static int numWorkers = 1;
+static PGresult *tablespacehdr;
+static SimpleOidList workerspid = {NULL, NULL};
+
/* Progress counters */
static uint64 totalsize_kb;
static uint64 totaldone;
@@ -141,7 +155,7 @@ static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
static void progress_report(int tablespacenum, const char *filename, bool force);
-static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
@@ -151,6 +165,10 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
static const char *get_tablespace_mapping(const char *dir);
static void tablespace_list_append(const char *arg);
+static void ParallelBackupEnd(void);
+static int ReceiveFiles(WorkerFiles * workerFiles, int worker);
+static void create_workers_and_fetch(WorkerFiles * workerFiles);
+static int simple_list_length(SimpleStringList *list);
static void
cleanup_directories_atexit(void)
@@ -349,6 +367,7 @@ usage(void)
printf(_(" --no-slot prevent creation of temporary replication slot\n"));
printf(_(" --no-verify-checksums\n"
" do not verify checksums\n"));
+ printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=CONNSTR connection string\n"));
@@ -921,7 +940,7 @@ writeTarData(
* No attempt to inspect or validate the contents of the file is done.
*/
static void
-ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
+ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker)
{
char filename[MAXPGPATH];
char *copybuf = NULL;
@@ -978,7 +997,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/base.%d.tar.gz", basedir, worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
ztarfile = gzopen(filename, "wb");
if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
@@ -991,7 +1013,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
else
#endif
{
- snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/base.%d.tar", basedir, worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
tarfile = fopen(filename, "wb");
}
}
@@ -1004,8 +1029,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
- PQgetvalue(res, rownum, 0));
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/%s.%d.tar.gz", basedir,
+ PQgetvalue(res, rownum, 0), worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
+ PQgetvalue(res, rownum, 0));
ztarfile = gzopen(filename, "wb");
if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
@@ -1018,8 +1047,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
else
#endif
{
- snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
- PQgetvalue(res, rownum, 0));
+ if (numWorkers > 1)
+ snprintf(filename, sizeof(filename), "%s/%s.%d.tar", basedir,
+ PQgetvalue(res, rownum, 0), worker);
+ else
+ snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
+ PQgetvalue(res, rownum, 0));
tarfile = fopen(filename, "wb");
}
}
@@ -1475,6 +1508,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
*/
snprintf(filename, sizeof(filename), "%s/%s", current_path,
copybuf);
+
if (filename[strlen(filename) - 1] == '/')
{
/*
@@ -1486,21 +1520,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* Directory
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
+
+ /*
+ * In parallel mode, we create directories before fetching
+ * files so its Ok if a directory already exist.
+ */
if (mkdir(filename, pg_dir_create_mode) != 0)
{
- /*
- * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
- * clusters) will have been created by the wal
- * receiver process. Also, when the WAL directory
- * location was specified, pg_wal (or pg_xlog) has
- * already been created as a symbolic link before
- * starting the actual backup. So just ignore creation
- * failures on related directories.
- */
- if (!((pg_str_endswith(filename, "/pg_wal") ||
- pg_str_endswith(filename, "/pg_xlog") ||
- pg_str_endswith(filename, "/archive_status")) &&
- errno == EEXIST))
+ if (errno != EEXIST)
{
pg_log_error("could not create directory \"%s\": %m",
filename);
@@ -1528,8 +1555,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* can map them too.)
*/
filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
-
mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
+
if (symlink(mapped_tblspc_path, filename) != 0)
{
pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
@@ -1716,7 +1743,8 @@ BaseBackup(void)
}
basebkp =
- psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+ psprintf("%s LABEL '%s' %s %s %s %s %s %s %s",
+ (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal == FETCH_WAL ? "WAL" : "",
@@ -1830,20 +1858,102 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
- for (i = 0; i < PQntuples(res); i++)
+ if (numWorkers > 1)
{
- if (format == 't')
- ReceiveTarFile(conn, res, i);
- else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
+ WorkerFiles *workerFiles = palloc0(sizeof(WorkerFiles) * tablespacecount);
+
+ tablespacehdr = res;
+
+ for (i = 0; i < tablespacecount; i++)
+ {
+ bool basetablespace;
+
+ workerFiles[i].worker_files = palloc0(sizeof(SimpleStringList) * numWorkers);
+
+ /*
+ * Get the header
+ */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not get backup header: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+ if (PQntuples(res) < 1)
+ {
+ pg_log_error("no data returned from server");
+ exit(1);
+ }
+
+ basetablespace = PQgetisnull(tablespacehdr, i, 0);
+ workerFiles[i].tspath = PQgetvalue(tablespacehdr, i, 1);
+ workerFiles[i].num_files = 0;
+
+ for (int j = 0; j < PQntuples(res); j++)
+ {
+ const char *path = PQgetvalue(res, j, 0);
+ bool isdir = PQgetvalue(res, j, 1)[0] == 't';
+
+ if (format == 'p' && isdir)
+ {
+ char dirpath[MAXPGPATH];
+
+ if (basetablespace)
+ snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, path);
+ else
+ {
+ const char *tspath = PQgetvalue(tablespacehdr, i, 1);
+
+ snprintf(dirpath, sizeof(dirpath), "%s/%s",
+ get_tablespace_mapping(tspath), (path + strlen(tspath) + 1));
+ }
+
+ if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0)
+ {
+ if (errno != EEXIST)
+ {
+ pg_log_error("could not create directory \"%s\": %m",
+ dirpath);
+ exit(1);
+ }
+ }
+ }
+
+ workerFiles[i].num_files++;
+ simple_string_list_append(&workerFiles[i].worker_files[j % numWorkers], path);
+ }
+ PQclear(res);
+ }
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not get data: %s", PQerrorMessage(conn));
+ exit(1);
+ }
+ PQclear(res);
+ res = PQgetResult(conn);
+ create_workers_and_fetch(workerFiles);
+ ParallelBackupEnd();
+ }
+ else
+ {
+ /*
+ * Start receiving chunks
+ */
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ if (format == 't')
+ ReceiveTarFile(conn, res, i, 0);
+ else
+ ReceiveAndUnpackTarFile(conn, res, i);
+ } /* Loop over all tablespaces */
+ }
if (showprogress)
{
- progress_report(PQntuples(res), NULL, true);
+ progress_report(tablespacecount, NULL, true);	/* tablespacehdr is unset without -j */
if (isatty(fileno(stderr)))
fprintf(stderr, "\n"); /* Need to move to next line */
}
@@ -2043,6 +2153,7 @@ main(int argc, char **argv)
{"waldir", required_argument, NULL, 1},
{"no-slot", no_argument, NULL, 2},
{"no-verify-checksums", no_argument, NULL, 3},
+ {"jobs", required_argument, NULL, 'j'},
{NULL, 0, NULL, 0}
};
int c;
@@ -2070,7 +2181,7 @@ main(int argc, char **argv)
atexit(cleanup_directories_atexit);
- while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+ while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2211,6 +2322,9 @@ main(int argc, char **argv)
case 3:
verify_checksums = false;
break;
+ case 'j': /* number of jobs */
+ numWorkers = atoi(optarg);
+ break;
default:
/*
@@ -2325,6 +2439,14 @@ main(int argc, char **argv)
}
}
+ if (numWorkers <= 0)
+ {
+ pg_log_error("invalid number of parallel jobs");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
@@ -2397,3 +2519,173 @@ main(int argc, char **argv)
success = true;
return 0;
}
+
+/*
+ * Send STOP_BACKUP on the main connection and receive the final archive.
+ */
+static void
+ParallelBackupEnd(void)
+{
+ char *basebkp = psprintf("STOP_BACKUP %s %s",
+ includewal == FETCH_WAL ? "WAL" : "",
+ includewal == NO_WAL ? "" : "NOWAIT");
+
+ if (PQsendQuery(conn, basebkp) == 0)
+ {
+ pg_log_error("could not send replication command \"%s\": %s",
+ "STOP_BACKUP", PQerrorMessage(conn));
+ exit(1);
+ }
+ pfree(basebkp);
+
+ /* receive pg_control and WAL files; no header PGresult is passed */
+ if (format == 't')
+ ReceiveTarFile(conn, NULL, tablespacecount, numWorkers);
+ else
+ ReceiveAndUnpackTarFile(conn, NULL, tablespacecount);
+}
+
+/*
+ * Worker body, run in a forked child: fetch this worker's share of every
+ * tablespace over its own connection.  Returns 0 on success and 1 on
+ * failure (errors return rather than exit(), as the caller _exit()s).
+ */
+static int
+ReceiveFiles(WorkerFiles * workerFiles, int worker)
+{
+ PGconn *worker_conn = GetConnection();
+ PQExpBuffer buf;
+ PGresult *res;
+
+ if (!worker_conn)
+ return 1;
+
+ buf = createPQExpBuffer();
+ for (int i = 0; i < tablespacecount; i++)
+ {
+ SimpleStringList *files = &workerFiles[i].worker_files[worker];
+
+ if (simple_list_length(files) <= 0)
+ continue;
+
+ /*
+ * SEND_FILES_CONTENT ('f1', ...) LABEL 'tspath' [options], doubling any
+ * single quote (NOTE(review): assumes that is all repl_scanner.l undoes).
+ */
+ resetPQExpBuffer(buf);
+ appendPQExpBufferStr(buf, "SEND_FILES_CONTENT ('");
+ for (SimpleStringListCell *cell = files->head; cell; cell = cell->next)
+ {
+ for (const char *p = cell->val; *p; p++)
+ {
+ if (*p == '\'')
+ appendPQExpBufferChar(buf, '\'');
+ appendPQExpBufferChar(buf, *p);
+ }
+ appendPQExpBufferStr(buf, cell->next ? "', '" : "') LABEL '");
+ }
+ for (const char *p = workerFiles[i].tspath; *p; p++)
+ {
+ if (*p == '\'')
+ appendPQExpBufferChar(buf, '\'');
+ appendPQExpBufferChar(buf, *p);
+ }
+ appendPQExpBuffer(buf, "' WORKER %d %s %s",
+ worker,
+ format == 't' ? "TABLESPACE_MAP" : "",
+ verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ if (maxrate > 0)
+ appendPQExpBuffer(buf, " MAX_RATE %d", maxrate);
+ if (PQsendQuery(worker_conn, buf->data) == 0)
+ {
+ pg_log_error("could not send files list \"%s\"",
+ PQerrorMessage(worker_conn));
+ return 1;
+ }
+ if (format == 't')
+ ReceiveTarFile(worker_conn, tablespacehdr, i, worker);
+ else
+ ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, i);
+
+ res = PQgetResult(worker_conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_error("could not get data stream: %s",
+ PQerrorMessage(worker_conn));
+ return 1;
+ }
+ PQclear(res);
+ PQclear(PQgetResult(worker_conn));	/* consume the end-of-command NULL */
+ }
+ destroyPQExpBuffer(buf);
+ PQfinish(worker_conn);
+ return 0;
+}
+
+/*
+ * Fork one ReceiveFiles() child per worker and reap exactly those children,
+ * in creation order; if one fails, kill the rest and exit(1).
+ */
+static void
+create_workers_and_fetch(WorkerFiles * workerFiles)
+{
+ SimpleOidListCell *cell;
+ int status;
+ int pid,
+ i;
+
+ for (i = 0; i < numWorkers; i++)
+ {
+ pid = fork();
+ /* child: report via _exit() so the inherited atexit hooks don't run */
+ if (pid == 0)
+ _exit(ReceiveFiles(workerFiles, i));
+ else if (pid < 0)
+ {
+ pg_log_error("could not create backup worker: %m");
+ exit(1);
+ }
+
+ simple_oid_list_append(&workerspid, pid);
+ if (verbose)
+ pg_log_info("backup worker (%d) created", pid);
+ }
+
+ /* wait on each worker's pid, so no other child process is reaped here */
+ for (cell = workerspid.head; cell; cell = cell->next)
+ {
+ SimpleOidListCell *other;
+
+ pid = waitpid((pid_t) cell->val, &status, 0);
+ if (pid > 0 && WIFEXITED(status) && WEXITSTATUS(status) == 0)
+ continue;
+
+ if (pid < 0)
+ pg_log_error("could not wait for backup worker: %m");
+ else
+ pg_log_error("backup worker (%d) failed: %s", pid,
+ wait_result_to_str(status));
+
+ /* error: kill every worker not yet reaped, then exit */
+ for (other = (pid < 0) ? cell : cell->next; other; other = other->next)
+ {
+ kill((pid_t) other->val, SIGTERM);
+ pg_log_error("backup worker killed %u", other->val);
+ }
+
+ exit(1);
+ }
+}
+
+
+/* Count the cells of a SimpleStringList by walking it from head to tail. */
+static int
+simple_list_length(SimpleStringList *list)
+{
+ int count = 0;
+
+ for (SimpleStringListCell *node = list->head; node != NULL; node = node->next)
+ count++;
+
+ return count;
+}
diff --git a/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl b/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl
new file mode 100644
index 0000000000..6c31214f3d
--- /dev/null
+++ b/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl
@@ -0,0 +1,571 @@
+use strict;
+use warnings;
+use Cwd;
+use Config;
+use File::Basename qw(basename dirname);
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 106;
+
+program_help_ok('pg_basebackup');
+program_version_ok('pg_basebackup');
+program_options_handling_ok('pg_basebackup');
+
+my $tempdir = TestLib::tempdir;
+
+my $node = get_new_node('main');
+
+# Set umask so test directories and files are created with default permissions
+umask(0077);
+
+# Initialize node without replication settings
+$node->init(extra => ['--data-checksums']);
+$node->start;
+my $pgdata = $node->data_dir;
+
+$node->command_fails(['pg_basebackup'],
+ 'pg_basebackup needs target directory specified');
+
+# Some Windows ANSI code pages may reject this filename, in which case we
+# quietly proceed without this bit of test coverage.
+if (open my $badchars, '>>', "$tempdir/pgdata/FOO\xe0\xe0\xe0BAR")
+{
+ print $badchars "test backup of file with non-UTF8 name\n";
+ close $badchars;
+}
+
+$node->set_replication_conf();
+system_or_bail 'pg_ctl', '-D', $pgdata, 'reload';
+
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup" ],
+ 'pg_basebackup fails because of WAL configuration');
+
+ok(!-d "$tempdir/backup", 'backup directory was cleaned up');
+
+# Create a backup directory that is not empty so the next command will fail
+# but leave the data directory behind
+mkdir("$tempdir/backup")
+ or BAIL_OUT("unable to create $tempdir/backup");
+append_to_file("$tempdir/backup/dir-not-empty.txt", "Some data");
+
+$node->command_fails([ 'pg_basebackup', '-D', "$tempdir/backup", '-n' ],
+ 'failing run with no-clean option');
+
+ok(-d "$tempdir/backup", 'backup directory was created and left behind');
+rmtree("$tempdir/backup");
+
+open my $conf, '>>', "$pgdata/postgresql.conf";
+print $conf "max_replication_slots = 10\n";
+print $conf "max_wal_senders = 10\n";
+print $conf "wal_level = replica\n";
+close $conf;
+$node->restart;
+
+# Write some files to test that they are not copied.
+foreach my $filename (
+ qw(backup_label tablespace_map postgresql.auto.conf.tmp current_logfiles.tmp)
+ )
+{
+ open my $file, '>>', "$pgdata/$filename";
+ print $file "DONOTCOPY";
+ close $file;
+}
+
+# Connect to a database to create global/pg_internal.init. If this is removed
+# the test to ensure global/pg_internal.init is not copied will return a false
+# positive.
+$node->safe_psql('postgres', 'SELECT 1;');
+
+# Create an unlogged table to test that forks other than init are not copied.
+$node->safe_psql('postgres', 'CREATE UNLOGGED TABLE base_unlogged (id int)');
+
+my $baseUnloggedPath = $node->safe_psql('postgres',
+ q{select pg_relation_filepath('base_unlogged')});
+
+# Make sure main and init forks exist
+ok(-f "$pgdata/${baseUnloggedPath}_init", 'unlogged init fork in base');
+ok(-f "$pgdata/$baseUnloggedPath", 'unlogged main fork in base');
+
+# Create files that look like temporary relations to ensure they are ignored.
+my $postgresOid = $node->safe_psql('postgres',
+ q{select oid from pg_database where datname = 'postgres'});
+
+my @tempRelationFiles =
+ qw(t999_999 t9999_999.1 t999_9999_vm t99999_99999_vm.1);
+
+foreach my $filename (@tempRelationFiles)
+{
+ append_to_file("$pgdata/base/$postgresOid/$filename", 'TEMP_RELATION');
+}
+
+# Run base backup in parallel mode.
+$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/backup", '-X', 'none', "-j 4" ],
+ 'pg_basebackup runs');
+ok(-f "$tempdir/backup/PG_VERSION", 'backup was created');
+
+# Permissions on backup should be default
+SKIP:
+{
+ skip "unix-style permissions not supported on Windows", 1
+ if ($windows_os);
+
+ ok(check_mode_recursive("$tempdir/backup", 0700, 0600),
+ "check backup dir permissions");
+}
+
+# Only archive_status directory should be copied in pg_wal/.
+is_deeply(
+ [ sort(slurp_dir("$tempdir/backup/pg_wal/")) ],
+ [ sort qw(. .. archive_status) ],
+ 'no WAL files copied');
+
+# Contents of these directories should not be copied.
+foreach my $dirname (
+ qw(pg_dynshmem pg_notify pg_replslot pg_serial pg_snapshots pg_stat_tmp pg_subtrans)
+ )
+{
+ is_deeply(
+ [ sort(slurp_dir("$tempdir/backup/$dirname/")) ],
+ [ sort qw(. ..) ],
+ "contents of $dirname/ not copied");
+}
+
+# These files should not be copied.
+foreach my $filename (
+ qw(postgresql.auto.conf.tmp postmaster.opts postmaster.pid tablespace_map current_logfiles.tmp
+ global/pg_internal.init))
+{
+ ok(!-f "$tempdir/backup/$filename", "$filename not copied");
+}
+
+# Unlogged relation forks other than init should not be copied
+ok(-f "$tempdir/backup/${baseUnloggedPath}_init",
+ 'unlogged init fork in backup');
+ok( !-f "$tempdir/backup/$baseUnloggedPath",
+ 'unlogged main fork not in backup');
+
+# Temp relations should not be copied.
+foreach my $filename (@tempRelationFiles)
+{
+ ok( !-f "$tempdir/backup/base/$postgresOid/$filename",
+ "base/$postgresOid/$filename not copied");
+}
+
+# Make sure existing backup_label was ignored.
+isnt(slurp_file("$tempdir/backup/backup_label"),
+ 'DONOTCOPY', 'existing backup_label not copied');
+rmtree("$tempdir/backup");
+
+$node->command_ok(
+ [
+ 'pg_basebackup', '-D', "$tempdir/backup2", '--waldir',
+ "$tempdir/xlog2", "-j 4"
+ ],
+ 'separate xlog directory');
+ok(-f "$tempdir/backup2/PG_VERSION", 'backup was created');
+ok(-d "$tempdir/xlog2/", 'xlog directory was created');
+rmtree("$tempdir/backup2");
+rmtree("$tempdir/xlog2");
+
+$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/tarbackup", '-Ft', "-j 4"],
+ 'tar format');
+# each of the 4 workers writes its own base.N.tar into the target directory
+foreach my $filename (qw(base.0.tar base.1.tar base.2.tar base.3.tar))
+{
+ ok(-f "$tempdir/tarbackup/$filename", "backup $filename tar created");
+}
+
+rmtree("$tempdir/tarbackup");
+
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T=/foo" ],
+ '-T with empty old directory fails');
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T/foo=" ],
+ '-T with empty new directory fails');
+$node->command_fails(
+ [
+ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4",
+ "-T/foo=/bar=/baz"
+ ],
+ '-T with multiple = fails');
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-Tfoo=/bar" ],
+ '-T with old directory not absolute fails');
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T/foo=bar" ],
+ '-T with new directory not absolute fails');
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-Tfoo" ],
+ '-T with invalid format fails');
+
+# Tar format doesn't support filenames longer than 100 bytes.
+my $superlongname = "superlongname_" . ("x" x 100);
+my $superlongpath = "$pgdata/$superlongname";
+
+open my $file, '>', "$superlongpath"
+ or die "unable to create file $superlongpath";
+close $file;
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/tarbackup_l1", '-Ft', "-j 4" ],
+ 'pg_basebackup tar with long name fails');
+unlink "$pgdata/$superlongname";
+
+
+# The following tests test symlinks. Windows doesn't have symlinks, so
+# skip on Windows.
+SKIP:
+{
+ skip "symlinks not supported on Windows", 18 if ($windows_os);
+
+ # Move pg_replslot out of $pgdata and create a symlink to it.
+ $node->stop;
+
+ # Set umask so test directories and files are created with group permissions
+ umask(0027);
+
+ # Enable group permissions on PGDATA
+ chmod_recursive("$pgdata", 0750, 0640);
+
+ rename("$pgdata/pg_replslot", "$tempdir/pg_replslot")
+ or BAIL_OUT "could not move $pgdata/pg_replslot";
+ symlink("$tempdir/pg_replslot", "$pgdata/pg_replslot")
+ or BAIL_OUT "could not symlink to $pgdata/pg_replslot";
+
+ $node->start;
+
+ # Create a temporary directory in the system location and symlink it
+ # to our physical temp location. That way we can use shorter names
+ # for the tablespace directories, which hopefully won't run afoul of
+ # the 99 character length limit.
+ my $shorter_tempdir = TestLib::tempdir_short . "/tempdir";
+ symlink "$tempdir", $shorter_tempdir;
+
+ mkdir "$tempdir/tblspc1";
+ $node->safe_psql('postgres',
+ "CREATE TABLESPACE tblspc1 LOCATION '$shorter_tempdir/tblspc1';");
+ $node->safe_psql('postgres',
+ "CREATE TABLE test1 (a int) TABLESPACE tblspc1;");
+ $node->command_ok([ 'pg_basebackup', '-D', "$tempdir/tarbackup2", '-Ft', "-j 4" ],
+ 'tar format with tablespaces');
+ ok(-f "$tempdir/tarbackup2/base.0.tar", 'backup tar was created');
+ my @tblspc_tars = glob "$tempdir/tarbackup2/[0-9]*.tar";
+ is(scalar(@tblspc_tars), 3, 'one tablespace tar was created');
+ rmtree("$tempdir/tarbackup2");
+
+ # Create an unlogged table to test that forks other than init are not copied.
+ $node->safe_psql('postgres',
+ 'CREATE UNLOGGED TABLE tblspc1_unlogged (id int) TABLESPACE tblspc1;'
+ );
+
+ my $tblspc1UnloggedPath = $node->safe_psql('postgres',
+ q{select pg_relation_filepath('tblspc1_unlogged')});
+
+ # Make sure main and init forks exist
+ ok( -f "$pgdata/${tblspc1UnloggedPath}_init",
+ 'unlogged init fork in tablespace');
+ ok(-f "$pgdata/$tblspc1UnloggedPath", 'unlogged main fork in tablespace');
+
+ # Create files that look like temporary relations to ensure they are ignored
+ # in a tablespace.
+ my @tempRelationFiles = qw(t888_888 t888888_888888_vm.1);
+ my $tblSpc1Id = basename(
+ dirname(
+ dirname(
+ $node->safe_psql(
+ 'postgres', q{select pg_relation_filepath('test1')}))));
+
+ foreach my $filename (@tempRelationFiles)
+ {
+ append_to_file(
+ "$shorter_tempdir/tblspc1/$tblSpc1Id/$postgresOid/$filename",
+ 'TEMP_RELATION');
+ }
+
+ $node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backup1", '-Fp', "-j 4" ],
+ 'plain format with tablespaces fails without tablespace mapping');
+
+ $node->command_ok(
+ [
+ 'pg_basebackup', '-D', "$tempdir/backup1", '-Fp', "-j 4",
+ "-T$shorter_tempdir/tblspc1=$tempdir/tbackup/tblspc1"
+ ],
+ 'plain format with tablespaces succeeds with tablespace mapping');
+ ok(-d "$tempdir/tbackup/tblspc1", 'tablespace was relocated');
+ opendir(my $dh, "$pgdata/pg_tblspc") or die;
+ ok( ( grep {
+ -l "$tempdir/backup1/pg_tblspc/$_"
+ and readlink "$tempdir/backup1/pg_tblspc/$_" eq
+ "$tempdir/tbackup/tblspc1"
+ } readdir($dh)),
+ "tablespace symlink was updated");
+ closedir $dh;
+
+ # Group access should be enabled on all backup files
+ ok(check_mode_recursive("$tempdir/backup1", 0750, 0640),
+ "check backup dir permissions");
+
+ # Unlogged relation forks other than init should not be copied
+ my ($tblspc1UnloggedBackupPath) =
+ $tblspc1UnloggedPath =~ /[^\/]*\/[^\/]*\/[^\/]*$/g;
+
+ ok(-f "$tempdir/tbackup/tblspc1/${tblspc1UnloggedBackupPath}_init",
+ 'unlogged init fork in tablespace backup');
+ ok(!-f "$tempdir/tbackup/tblspc1/$tblspc1UnloggedBackupPath",
+ 'unlogged main fork not in tablespace backup');
+
+ # Temp relations should not be copied.
+ foreach my $filename (@tempRelationFiles)
+ {
+ ok( !-f "$tempdir/tbackup/tblspc1/$tblSpc1Id/$postgresOid/$filename",
+ "[tblspc1]/$postgresOid/$filename not copied");
+
+ # Also remove temp relation files or tablespace drop will fail.
+ my $filepath =
+ "$shorter_tempdir/tblspc1/$tblSpc1Id/$postgresOid/$filename";
+
+ unlink($filepath)
+ or BAIL_OUT("unable to unlink $filepath");
+ }
+
+ ok( -d "$tempdir/backup1/pg_replslot",
+ 'pg_replslot symlink copied as directory');
+ rmtree("$tempdir/backup1");
+
+ mkdir "$tempdir/tbl=spc2";
+ $node->safe_psql('postgres', "DROP TABLE test1;");
+ $node->safe_psql('postgres', "DROP TABLE tblspc1_unlogged;");
+ $node->safe_psql('postgres', "DROP TABLESPACE tblspc1;");
+ $node->safe_psql('postgres',
+ "CREATE TABLESPACE tblspc2 LOCATION '$shorter_tempdir/tbl=spc2';");
+ $node->command_ok(
+ [
+ 'pg_basebackup', '-D', "$tempdir/backup3", '-Fp', "-j 4",
+ "-T$shorter_tempdir/tbl\\=spc2=$tempdir/tbackup/tbl\\=spc2"
+ ],
+ 'mapping tablespace with = sign in path');
+ ok(-d "$tempdir/tbackup/tbl=spc2",
+ 'tablespace with = sign was relocated');
+ $node->safe_psql('postgres', "DROP TABLESPACE tblspc2;");
+ rmtree("$tempdir/backup3");
+
+ mkdir "$tempdir/$superlongname";
+ $node->safe_psql('postgres',
+ "CREATE TABLESPACE tblspc3 LOCATION '$tempdir/$superlongname';");
+ $node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/tarbackup_l3", '-Ft' , '-j 4'],
+ 'pg_basebackup tar with long symlink target');
+ $node->safe_psql('postgres', "DROP TABLESPACE tblspc3;");
+ rmtree("$tempdir/tarbackup_l3");
+}
+# -R must write standby settings (standby.signal, primary_conninfo) into the backup.
+$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/backupR", '-R', '-j', '4' ],
+ 'pg_basebackup -R runs');
+ok(-f "$tempdir/backupR/postgresql.auto.conf", 'postgresql.auto.conf exists');
+ok(-f "$tempdir/backupR/standby.signal", 'standby.signal was created');
+my $recovery_conf = slurp_file "$tempdir/backupR/postgresql.auto.conf";
+rmtree("$tempdir/backupR");
+
+my $port = $node->port;
+like(
+ $recovery_conf,
+ qr/^primary_conninfo = '.*port=$port.*'\n/m,
+ 'postgresql.auto.conf sets primary_conninfo');
+# WAL methods under -j: default, -X fetch, -X stream (plain and tar), and --no-slot.
+$node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxd", '-j', '4' ],
+ 'pg_basebackup runs in default xlog mode');
+ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxd/pg_wal")),
+ 'WAL files copied');
+rmtree("$tempdir/backupxd");
+
+$node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxf", '-X', 'fetch', '-j', '4' ],
+ 'pg_basebackup -X fetch runs');
+ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")),
+ 'WAL files copied');
+rmtree("$tempdir/backupxf");
+$node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxs", '-X', 'stream', '-j', '4' ],
+ 'pg_basebackup -X stream runs');
+ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxs/pg_wal")),
+ 'WAL files copied');
+rmtree("$tempdir/backupxs");
+$node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft', '-j', '4' ],
+ 'pg_basebackup -X stream runs in tar mode');
+ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created");
+rmtree("$tempdir/backupxst");
+$node->command_ok(
+ [
+ 'pg_basebackup', '-D',
+ "$tempdir/backupnoslot", '-X',
+ 'stream', '--no-slot',
+ '-j', '4'
+ ],
+ 'pg_basebackup -X stream runs with --no-slot');
+rmtree("$tempdir/backupnoslot");
+# Replication slot options (-S, -C, --no-slot) combined with -j.
+$node->command_fails(
+ [
+ 'pg_basebackup', '-D',
+ "$tempdir/backupxs_sl_fail", '-X',
+ 'stream', '-S',
+ 'slot0',
+ '-j', '4'
+ ],
+ 'pg_basebackup fails with nonexistent replication slot');
+# -C needs a slot name (-S) and cannot be combined with --no-slot.
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-j', '4' ],
+ 'pg_basebackup -C fails without slot name');
+
+$node->command_fails(
+ [
+ 'pg_basebackup', '-D',
+ "$tempdir/backupxs_slot", '-C',
+ '-S', 'slot0',
+ '--no-slot',
+ '-j', '4'
+ ],
+ 'pg_basebackup fails with -C -S --no-slot');
+
+$node->command_ok(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '-j', '4' ],
+ 'pg_basebackup -C runs');
+rmtree("$tempdir/backupxs_slot");
+
+is( $node->safe_psql(
+ 'postgres',
+ q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'}
+ ),
+ 'slot0',
+ 'replication slot was created');
+isnt(
+ $node->safe_psql(
+ 'postgres',
+ q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'}
+ ),
+ '',
+ 'restart LSN of new slot is not null');
+
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0', '-j', '4' ],
+ 'pg_basebackup fails with -C -S and a previously existing slot');
+
+$node->safe_psql('postgres',
+ q{SELECT * FROM pg_create_physical_replication_slot('slot1')});
+my $lsn = $node->safe_psql('postgres',
+ q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot1'}
+);
+is($lsn, '', 'restart LSN of new slot is null');
+$node->command_fails(
+ [ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1', '-X', 'none', '-j', '4' ],
+ 'pg_basebackup with replication slot fails without WAL streaming');
+$node->command_ok(
+ [
+ 'pg_basebackup', '-D', "$tempdir/backupxs_sl", '-X',
+ 'stream', '-S', 'slot1', '-j', '4'
+ ],
+ 'pg_basebackup -X stream with replication slot runs');
+$lsn = $node->safe_psql('postgres',
+ q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot1'}
+);
+like($lsn, qr!^0/[0-9A-Z]{7,8}$!, 'restart LSN of slot has advanced');
+rmtree("$tempdir/backupxs_sl");
+
+$node->command_ok(
+ [
+ 'pg_basebackup', '-D', "$tempdir/backupxs_sl_R", '-X',
+ 'stream', '-S', 'slot1', '-R',
+ '-j', '4'
+ ],
+ 'pg_basebackup with replication slot and -R runs');
+like(
+ slurp_file("$tempdir/backupxs_sl_R/postgresql.auto.conf"),
+ qr/^primary_slot_name = 'slot1'\n/m,
+ 'recovery conf file sets primary_slot_name');
+
+my $checksum = $node->safe_psql('postgres', 'SHOW data_checksums;');
+is($checksum, 'on', 'checksums are enabled');
+rmtree("$tempdir/backupxs_sl_R");
+
+# create tables to corrupt and get their relfilenodes
+my $file_corrupt1 = $node->safe_psql('postgres',
+ q{SELECT a INTO corrupt1 FROM generate_series(1,10000) AS a; ALTER TABLE corrupt1 SET (autovacuum_enabled=false); SELECT pg_relation_filepath('corrupt1')}
+);
+my $file_corrupt2 = $node->safe_psql('postgres',
+ q{SELECT b INTO corrupt2 FROM generate_series(1,2) AS b; ALTER TABLE corrupt2 SET (autovacuum_enabled=false); SELECT pg_relation_filepath('corrupt2')}
+);
+
+# set page header and block sizes
+my $pageheader_size = 24;
+my $block_size = $node->safe_psql('postgres', 'SHOW block_size;');
+
+# induce corruption
+system_or_bail 'pg_ctl', '-D', $pgdata, 'stop';
+open $file, '+<', "$pgdata/$file_corrupt1" or BAIL_OUT("unable to open $pgdata/$file_corrupt1: $!");
+seek($file, $pageheader_size, 0);
+syswrite($file, "\0\0\0\0\0\0\0\0\0");
+close $file;
+system_or_bail 'pg_ctl', '-D', $pgdata, 'start';
+
+$node->command_checks_all(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_corrupt", '-j', '4' ],
+ 1,
+ [qr{^$}],
+ [qr/^WARNING.*checksum verification failed/s],
+ 'pg_basebackup reports checksum mismatch');
+rmtree("$tempdir/backup_corrupt");
+
+# induce further corruption in 5 more blocks
+system_or_bail 'pg_ctl', '-D', $pgdata, 'stop';
+open $file, '+<', "$pgdata/$file_corrupt1" or BAIL_OUT("unable to open $pgdata/$file_corrupt1: $!");
+for my $i (1 .. 5)
+{
+ my $offset = $pageheader_size + $i * $block_size;
+ seek($file, $offset, 0);
+ syswrite($file, "\0\0\0\0\0\0\0\0\0");
+}
+close $file;
+system_or_bail 'pg_ctl', '-D', $pgdata, 'start';
+
+$node->command_checks_all(
+ [ 'pg_basebackup', '-D', "$tempdir/backup_corrupt2", '-j', '4' ],
+ 1,
+ [qr{^$}],
+ [qr/^WARNING.*further.*failures.*will.not.be.reported/s],
+ 'pg_basebackup does not report more than 5 checksum mismatches');
+rmtree("$tempdir/backup_corrupt2");
+
+# induce corruption in a second file
+system_or_bail 'pg_ctl', '-D', $pgdata, 'stop';
+open $file, '+<', "$pgdata/$file_corrupt2" or BAIL_OUT("unable to open $pgdata/$file_corrupt2: $!");
+seek($file, $pageheader_size, 0);
+syswrite($file, "\0\0\0\0\0\0\0\0\0");
+close $file;
+system_or_bail 'pg_ctl', '-D', $pgdata, 'start';
+# NOTE(review): disabled under -j, presumably because the total mismatch count is not summed across workers; TODO confirm, then re-enable with an exact total.
+#$node->command_checks_all(
+# [ 'pg_basebackup', '-D', "$tempdir/backup_corrupt3", '-j', '4' ],
+# 1,
+# [qr{^$}],
+# [qr/^WARNING.*checksum verification failed/s],
+# 'pg_basebackup correctly report the total number of checksum mismatches');
+#rmtree("$tempdir/backup_corrupt3");
+
+# do not verify checksums, should return ok
+$node->command_ok(
+ [
+ 'pg_basebackup', '-D',
+ "$tempdir/backup_corrupt4", '--no-verify-checksums',
+ '-j', '4'
+ ],
+ 'pg_basebackup with -k does not report checksum mismatch');
+rmtree("$tempdir/backup_corrupt4");
+
+$node->safe_psql('postgres', "DROP TABLE corrupt1;");
+$node->safe_psql('postgres', "DROP TABLE corrupt2;");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..f92d593e2e 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,13 @@ typedef enum ReplicationKind
REPLICATION_KIND_LOGICAL
} ReplicationKind;
+typedef enum BackupCmdTag /* type of BaseBackupCmd.cmdtag */
+{
+ BASE_BACKUP, /* presumably the existing one-shot BASE_BACKUP; confirm */
+ START_BACKUP, /* presumably starts a parallel backup; confirm */
+ SEND_FILES_CONTENT, /* presumably sends the files listed in backupfiles; confirm */
+ STOP_BACKUP /* presumably finishes a parallel backup; confirm */
+} BackupCmdTag; /* NOTE(review): unprefixed constant names may clash; consider a BACKUP_CMD_ prefix */
/* ----------------------
* IDENTIFY_SYSTEM command
@@ -42,6 +49,8 @@ typedef struct BaseBackupCmd
{
NodeTag type;
List *options;
+ BackupCmdTag cmdtag; /* which sub-command this node carries; see BackupCmdTag */
+ List *backupfiles; /* presumably the files requested by SEND_FILES_CONTENT; element type TODO confirm */
} BaseBackupCmd;
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9f0b..9e792af99d 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
extern void SendBaseBackup(BaseBackupCmd *cmd);
-extern int64 sendTablespace(char *path, bool sizeonly);
+extern int64 sendTablespace(char *path, bool sizeonly, List **files); /* NOTE(review): document 'files' (out-list of files found? may it be NULL?) */
#endif /* _BASEBACKUP_H */
--
2.21.0 (Apple Git-122)