0001-Initial-POC-on-parallel-backup.patch
application/octet-stream
Filename: 0001-Initial-POC-on-parallel-backup.patch
Type: application/octet-stream
Part: 0
Message:
WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
Returns the parsed metadata as JSON (format, series position, per-file stats) — never the diff bytes.
API reference →
Format: format-patch
Series: patch 0001
Subject: Initial POC on parallel backup
| File | + | − |
|---|---|---|
| src/backend/replication/basebackup.c | 533 | 232 |
| src/backend/replication/repl_gram.y | 52 | 1 |
| src/backend/replication/repl_scanner.l | 4 | 0 |
| src/bin/pg_basebackup/pg_basebackup.c | 268 | 17 |
| src/include/nodes/replnodes.h | 8 | 0 |
From ffa6d0946af34d78e59eb5b82f1572f2537fffeb Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Wed, 21 Aug 2019 18:35:45 +0500
Subject: [PATCH] Initial POC on parallel backup
---
src/backend/replication/basebackup.c | 765 +++++++++++++++++--------
src/backend/replication/repl_gram.y | 53 +-
src/backend/replication/repl_scanner.l | 4 +
src/bin/pg_basebackup/pg_basebackup.c | 285 ++++++++-
src/include/nodes/replnodes.h | 8 +
5 files changed, 865 insertions(+), 250 deletions(-)
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index c91f66dcbe..9cbee408ff 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -52,11 +52,26 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ bool parallel;
} basebackup_options;
+typedef struct
+{
+ bool isdir;
+ char *path;
+} pathinfo;
+
+#define MAKE_PATHINFO(a, b) \
+ do { \
+ pi = palloc0(sizeof(pathinfo)); \
+ pi->isdir = a; \
+ pi->path = pstrdup(b); \
+ } while(0)
static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
List *tablespaces, bool sendtblspclinks);
+static int64 sendDir_(const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, List **files);
static bool sendFile(const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid);
static void sendFileWithContent(const char *filename, const char *content);
@@ -74,12 +89,18 @@ static int compareWalFileNames(const ListCell *a, const ListCell *b);
static void throttle(size_t increment);
static bool is_checksummed_file(const char *fullpath, const char *filename);
+static void StopBackup(basebackup_options *opt);
+static void SendBackupFileList(List *tablespaces);
+static void SendFilesContents(List *files, bool missing_ok);
+static void includeWALFiles(basebackup_options *opt, XLogRecPtr endptr, TimeLineID endtli);
+
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
/* Relative path of temporary statistics directory */
static char *statrelpath = NULL;
+#define TMP_BACKUP_LABEL_FILE BACKUP_LABEL_FILE".tmp"
/*
* Size of each block sent into the tar stream for larger files.
*/
@@ -305,6 +326,33 @@ perform_base_backup(basebackup_options *opt)
throttling_counter = -1;
}
+ /*
+ * In parallel mode, we do not stop the backup or send the files right away.
+ * Instead, we only send the list of file names in the $PGDATA directory.
+ */
+ if (opt->parallel)
+ {
+ /* Save the backup label into a temporary file for now, so that STOP_BACKUP can send it to pg_basebackup later. */
+ FILE *fp = AllocateFile(TMP_BACKUP_LABEL_FILE, "w");
+ if (!fp)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ TMP_BACKUP_LABEL_FILE)));
+ if (fwrite(labelfile->data, labelfile->len, 1, fp) != 1 ||
+ fflush(fp) != 0 ||
+ pg_fsync(fileno(fp)) != 0 ||
+ ferror(fp) ||
+ FreeFile(fp))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ TMP_BACKUP_LABEL_FILE)));
+
+ SendBackupFileList(tablespaces);
+ return;
+ }
+
/* Send off our tablespaces one by one */
foreach(lc, tablespaces)
{
@@ -367,234 +415,8 @@ perform_base_backup(basebackup_options *opt)
if (opt->includewal)
- {
- /*
- * We've left the last tar file "open", so we can now append the
- * required WAL files to it.
- */
- char pathbuf[MAXPGPATH];
- XLogSegNo segno;
- XLogSegNo startsegno;
- XLogSegNo endsegno;
- struct stat statbuf;
- List *historyFileList = NIL;
- List *walFileList = NIL;
- char firstoff[MAXFNAMELEN];
- char lastoff[MAXFNAMELEN];
- DIR *dir;
- struct dirent *de;
- ListCell *lc;
- TimeLineID tli;
-
- /*
- * I'd rather not worry about timelines here, so scan pg_wal and
- * include all WAL files in the range between 'startptr' and 'endptr',
- * regardless of the timeline the file is stamped with. If there are
- * some spurious WAL files belonging to timelines that don't belong in
- * this server's history, they will be included too. Normally there
- * shouldn't be such files, but if there are, there's little harm in
- * including them.
- */
- XLByteToSeg(startptr, startsegno, wal_segment_size);
- XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
- XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
- XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
-
- dir = AllocateDir("pg_wal");
- while ((de = ReadDir(dir, "pg_wal")) != NULL)
- {
- /* Does it look like a WAL segment, and is it in the range? */
- if (IsXLogFileName(de->d_name) &&
- strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
- strcmp(de->d_name + 8, lastoff + 8) <= 0)
- {
- walFileList = lappend(walFileList, pstrdup(de->d_name));
- }
- /* Does it look like a timeline history file? */
- else if (IsTLHistoryFileName(de->d_name))
- {
- historyFileList = lappend(historyFileList, pstrdup(de->d_name));
- }
- }
- FreeDir(dir);
-
- /*
- * Before we go any further, check that none of the WAL segments we
- * need were removed.
- */
- CheckXLogRemoved(startsegno, ThisTimeLineID);
-
- /*
- * Sort the WAL filenames. We want to send the files in order from
- * oldest to newest, to reduce the chance that a file is recycled
- * before we get a chance to send it over.
- */
- list_sort(walFileList, compareWalFileNames);
-
- /*
- * There must be at least one xlog file in the pg_wal directory, since
- * we are doing backup-including-xlog.
- */
- if (walFileList == NIL)
- ereport(ERROR,
- (errmsg("could not find any WAL files")));
-
- /*
- * Sanity check: the first and last segment should cover startptr and
- * endptr, with no gaps in between.
- */
- XLogFromFileName((char *) linitial(walFileList),
- &tli, &segno, wal_segment_size);
- if (segno != startsegno)
- {
- char startfname[MAXFNAMELEN];
+ includeWALFiles(opt, endptr, endtli);
- XLogFileName(startfname, ThisTimeLineID, startsegno,
- wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", startfname)));
- }
- foreach(lc, walFileList)
- {
- char *walFileName = (char *) lfirst(lc);
- XLogSegNo currsegno = segno;
- XLogSegNo nextsegno = segno + 1;
-
- XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
- if (!(nextsegno == segno || currsegno == segno))
- {
- char nextfname[MAXFNAMELEN];
-
- XLogFileName(nextfname, ThisTimeLineID, nextsegno,
- wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", nextfname)));
- }
- }
- if (segno != endsegno)
- {
- char endfname[MAXFNAMELEN];
-
- XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", endfname)));
- }
-
- /* Ok, we have everything we need. Send the WAL files. */
- foreach(lc, walFileList)
- {
- char *walFileName = (char *) lfirst(lc);
- FILE *fp;
- char buf[TAR_SEND_SIZE];
- size_t cnt;
- pgoff_t len = 0;
-
- snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
- XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
-
- fp = AllocateFile(pathbuf, "rb");
- if (fp == NULL)
- {
- int save_errno = errno;
-
- /*
- * Most likely reason for this is that the file was already
- * removed by a checkpoint, so check for that to get a better
- * error message.
- */
- CheckXLogRemoved(segno, tli);
-
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m", pathbuf)));
- }
-
- if (fstat(fileno(fp), &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m",
- pathbuf)));
- if (statbuf.st_size != wal_segment_size)
- {
- CheckXLogRemoved(segno, tli);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("unexpected WAL file size \"%s\"", walFileName)));
- }
-
- /* send the WAL file itself */
- _tarWriteHeader(pathbuf, NULL, &statbuf, false);
-
- while ((cnt = fread(buf, 1,
- Min(sizeof(buf), wal_segment_size - len),
- fp)) > 0)
- {
- CheckXLogRemoved(segno, tli);
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
-
- len += cnt;
- throttle(cnt);
-
- if (len == wal_segment_size)
- break;
- }
-
- if (len != wal_segment_size)
- {
- CheckXLogRemoved(segno, tli);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("unexpected WAL file size \"%s\"", walFileName)));
- }
-
- /* wal_segment_size is a multiple of 512, so no need for padding */
-
- FreeFile(fp);
-
- /*
- * Mark file as archived, otherwise files can get archived again
- * after promotion of a new node. This is in line with
- * walreceiver.c always doing an XLogArchiveForceDone() after a
- * complete segment.
- */
- StatusFilePath(pathbuf, walFileName, ".done");
- sendFileWithContent(pathbuf, "");
- }
-
- /*
- * Send timeline history files too. Only the latest timeline history
- * file is required for recovery, and even that only if there happens
- * to be a timeline switch in the first WAL segment that contains the
- * checkpoint record, or if we're taking a base backup from a standby
- * server and the target timeline changes while the backup is taken.
- * But they are small and highly useful for debugging purposes, so
- * better include them all, always.
- */
- foreach(lc, historyFileList)
- {
- char *fname = lfirst(lc);
-
- snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
-
- if (lstat(pathbuf, &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m", pathbuf)));
-
- sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
-
- /* unconditionally mark file as archived */
- StatusFilePath(pathbuf, fname, ".done");
- sendFileWithContent(pathbuf, "");
- }
-
- /* Send CopyDone message for the last tar file */
- pq_putemptymessage('c');
- }
SendXlogRecPtrResult(endptr, endtli);
if (total_checksum_failures)
@@ -638,6 +460,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_maxrate = false;
bool o_tablespace_map = false;
bool o_noverify_checksums = false;
+ bool o_parallel = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -726,6 +549,16 @@ parse_basebackup_options(List *options, basebackup_options *opt)
noverify_checksums = true;
o_noverify_checksums = true;
}
+ else if (strcmp(defel->defname, "parallel") == 0)
+ {
+ if (o_parallel)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ opt->parallel = true;
+ o_parallel = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@@ -760,7 +593,12 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg, false);
}
- perform_base_backup(&opt);
+ if (cmd->cmdtag == SEND_FILES_CONTENT)
+ SendFilesContents(cmd->backupfiles, true);
+ else if (cmd->cmdtag == STOP_BACKUP)
+ StopBackup(&opt);
+ else
+ perform_base_backup(&opt);
}
static void
@@ -1004,9 +842,16 @@ sendTablespace(char *path, bool sizeonly)
* information in the tar file. If not, we can skip that
* as it will be sent separately in the tablespace_map file.
*/
+
+static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks)
+{
+ return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL);
+}
+
static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks)
+sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
+ bool sendtblspclinks, List **files)
{
DIR *dir;
struct dirent *de;
@@ -1160,6 +1005,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+ if (files != NULL)
+ {
+ pathinfo *pi;
+
+ MAKE_PATHINFO(true, pathbuf);
+ *files = lappend(*files, pi);
+ }
+
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
excludeFound = true;
break;
@@ -1197,6 +1051,13 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
sizeonly);
+ if (files != NULL)
+ {
+ pathinfo *pi;
+
+ MAKE_PATHINFO(true, pathbuf);
+ *files = lappend(*files, pi);
+ }
continue; /* don't recurse into pg_wal */
}
@@ -1282,13 +1143,29 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+ {
+ if (files != NULL)
+ {
+ pathinfo *pi;
+
+ MAKE_PATHINFO(true, pathbuf);
+ *files = lappend(*files, pi);
+ }
+ size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files);
+ }
}
else if (S_ISREG(statbuf.st_mode))
{
bool sent = false;
- if (!sizeonly)
+ if (files != NULL)
+ {
+ pathinfo *pi;
+
+ MAKE_PATHINFO(false, pathbuf);
+ *files = lappend(*files, pi);
+ }
+ else if (!sizeonly)
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
@@ -1711,3 +1588,427 @@ throttle(size_t increment)
*/
throttled_last = GetCurrentTimestamp();
}
+
+
+static void
+StopBackup(basebackup_options *opt)
+{
+ TimeLineID endtli;
+ XLogRecPtr endptr;
+ char *labelfile;
+ struct stat statbuf;
+ int r;
+ StringInfoData buf;
+
+ /* Disable throttling. */
+ throttling_counter = -1;
+
+ /* Start a COPY stream carrying the saved backup label and pg_control files. */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+
+ /* Send the saved label file, stored as backup_label in the archive. */
+ if (lstat(TMP_BACKUP_LABEL_FILE, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ TMP_BACKUP_LABEL_FILE)));
+ sendFile(TMP_BACKUP_LABEL_FILE, BACKUP_LABEL_FILE, &statbuf, false, InvalidOid);
+
+ /* Read the saved label file into a buffer; do_pg_stop_backup() needs its contents. */
+ FILE *lfp = AllocateFile(TMP_BACKUP_LABEL_FILE, "r");
+ if (!lfp)
+ {
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ TMP_BACKUP_LABEL_FILE)));
+ }
+
+ labelfile = palloc(statbuf.st_size + 1);
+ r = fread(labelfile, statbuf.st_size, 1, lfp);
+ labelfile[statbuf.st_size] = '\0';
+
+
+ /* ... and pg_control after everything else. */
+ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ XLOG_CONTROL_FILE)));
+ sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+ pq_putemptymessage('c'); /* CopyDone */
+
+ /* stop backup */
+ endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+ /*
+ * FIXME: opt->includewal is not available here, so includeWALFiles() is called unconditionally. We should
+ * add an includewal option to the STOP_BACKUP command that pg_basebackup sends.
+ */
+
+ // if (opt->includewal)
+ includeWALFiles(opt, endptr, endtli);
+
+ /* send ending wal record. */
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+static void
+SendBackupFileList(List *tablespaces)
+{
+ StringInfoData buf;
+ ListCell *lc;
+
+ List *files = NIL;
+ foreach(lc, tablespaces)
+ {
+ tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+ if (ti->path == NULL)
+ sendDir_(".", 1, false, NIL, true, &files);
+ else
+ sendDir_(ti->path, 1, false, NIL, true, &files);
+ }
+
+ /* Put the temporary backup label file at the front of the list. */
+ pathinfo *pi;
+ MAKE_PATHINFO(false, TMP_BACKUP_LABEL_FILE);
+ files = lcons(pi, files);
+
+ /* Construct and send the directory information */
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 2); /* 2 fields */
+
+ /* First field - isdirectory */
+ pq_sendstring(&buf, "isDir");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, INT4OID); /* type oid */
+ pq_sendint16(&buf, 4); /* typlen */
+ pq_sendint32(&buf, 0); /* typmod */
+ pq_sendint16(&buf, 0); /* format code */
+
+ /* Second field - file path */
+ pq_sendstring(&buf, "path");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, TEXTOID);
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ pq_endmessage(&buf);
+
+ foreach(lc, files)
+ {
+ pathinfo *pi = (pathinfo *) lfirst(lc);
+ char *path = pi->path;
+
+ /* Send one datarow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 2); /* number of columns */
+
+ int32 isdir = pi->isdir ? 1 : 0;
+ send_int8_string(&buf, isdir);
+
+ Size len = strlen(path);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, path, len);
+
+ pq_endmessage(&buf);
+ }
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+ }
+
+static void
+SendFilesContents(List *files, bool missing_ok)
+{
+ StringInfoData buf;
+ ListCell *lc;
+
+ /* Disable throttling. */
+ throttling_counter = -1;
+
+ /* Send CopyOutResponse message */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+
+ foreach(lc, files)
+ {
+ Value *strval = lfirst(lc);
+ char *pathbuf = (char *) strVal(strval);
+
+ /* Send the requested file. */
+ struct stat statbuf;
+ if (lstat(pathbuf, &statbuf) != 0)
+ {
+ if (errno != ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file or directory \"%s\": %m",
+ pathbuf)));
+
+ /* If the file went away while scanning, it's not an error. */
+ continue;
+ }
+
+ /*
+ * TODO: perhaps create a directory entry in the tar file, to avoid having to create
+ * directories manually in pg_basebackup.c.
+ */
+// if (S_ISDIR(statbuf.st_mode))
+// {
+// bool skip_this_dir = false;
+// ListCell *lc;
+//
+// /*
+// * Store a directory entry in the tar file so we can get the
+// * permissions right.
+// */
+//
+// _tarWriteHeader(pathbuf, NULL, &statbuf, false);
+// }
+ sendFile(pathbuf, pathbuf, &statbuf, true, InvalidOid);
+ }
+
+ pq_putemptymessage('c'); /* CopyDone */
+ return;
+}
+
+static void
+includeWALFiles(basebackup_options *opt, XLogRecPtr endptr, TimeLineID endtli)
+{
+ /*
+ * The caller must leave the last tar file "open" (no CopyDone sent yet), so that
+ * the required WAL files can be appended to it; CopyDone is sent at the end here.
+ */
+ char pathbuf[MAXPGPATH];
+ XLogSegNo segno;
+ XLogSegNo startsegno;
+ XLogSegNo endsegno;
+ struct stat statbuf;
+ List *historyFileList = NIL;
+ List *walFileList = NIL;
+ char firstoff[MAXFNAMELEN];
+ char lastoff[MAXFNAMELEN];
+ DIR *dir;
+ struct dirent *de;
+ ListCell *lc;
+ TimeLineID tli;
+
+ /*
+ * I'd rather not worry about timelines here, so scan pg_wal and
+ * include all WAL files in the range between 'startptr' and 'endptr',
+ * regardless of the timeline the file is stamped with. If there are
+ * some spurious WAL files belonging to timelines that don't belong in
+ * this server's history, they will be included too. Normally there
+ * shouldn't be such files, but if there are, there's little harm in
+ * including them.
+ */
+ XLByteToSeg(startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
+ XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
+ XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
+
+ dir = AllocateDir("pg_wal");
+ while ((de = ReadDir(dir, "pg_wal")) != NULL)
+ {
+ /* Does it look like a WAL segment, and is it in the range? */
+ if (IsXLogFileName(de->d_name) &&
+ strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
+ strcmp(de->d_name + 8, lastoff + 8) <= 0)
+ {
+ walFileList = lappend(walFileList, pstrdup(de->d_name));
+ }
+ /* Does it look like a timeline history file? */
+ else if (IsTLHistoryFileName(de->d_name))
+ {
+ historyFileList = lappend(historyFileList, pstrdup(de->d_name));
+ }
+ }
+ FreeDir(dir);
+
+ /*
+ * Before we go any further, check that none of the WAL segments we
+ * need were removed.
+ */
+ CheckXLogRemoved(startsegno, ThisTimeLineID);
+
+ /*
+ * Sort the WAL filenames. We want to send the files in order from
+ * oldest to newest, to reduce the chance that a file is recycled
+ * before we get a chance to send it over.
+ */
+ list_sort(walFileList, compareWalFileNames);
+
+ /*
+ * There must be at least one xlog file in the pg_wal directory, since
+ * we are doing backup-including-xlog.
+ */
+ if (walFileList == NIL)
+ ereport(ERROR,
+ (errmsg("could not find any WAL files")));
+
+ /*
+ * Sanity check: the first and last segment should cover startptr and
+ * endptr, with no gaps in between.
+ */
+ XLogFromFileName((char *) linitial(walFileList),
+ &tli, &segno, wal_segment_size);
+ if (segno != startsegno)
+ {
+ char startfname[MAXFNAMELEN];
+
+ XLogFileName(startfname, ThisTimeLineID, startsegno,
+ wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", startfname)));
+ }
+ foreach(lc, walFileList)
+ {
+ char *walFileName = (char *) lfirst(lc);
+ XLogSegNo currsegno = segno;
+ XLogSegNo nextsegno = segno + 1;
+
+ XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+ if (!(nextsegno == segno || currsegno == segno))
+ {
+ char nextfname[MAXFNAMELEN];
+
+ XLogFileName(nextfname, ThisTimeLineID, nextsegno,
+ wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", nextfname)));
+ }
+ }
+ if (segno != endsegno)
+ {
+ char endfname[MAXFNAMELEN];
+
+ XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", endfname)));
+ }
+
+ /* Ok, we have everything we need. Send the WAL files. */
+ foreach(lc, walFileList)
+ {
+ char *walFileName = (char *) lfirst(lc);
+ FILE *fp;
+ char buf[TAR_SEND_SIZE];
+ size_t cnt;
+ pgoff_t len = 0;
+
+ snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
+ XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+
+ fp = AllocateFile(pathbuf, "rb");
+ if (fp == NULL)
+ {
+ int save_errno = errno;
+
+ /*
+ * Most likely reason for this is that the file was already
+ * removed by a checkpoint, so check for that to get a better
+ * error message.
+ */
+ CheckXLogRemoved(segno, tli);
+
+ errno = save_errno;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", pathbuf)));
+ }
+
+ if (fstat(fileno(fp), &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ pathbuf)));
+ if (statbuf.st_size != wal_segment_size)
+ {
+ CheckXLogRemoved(segno, tli);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected WAL file size \"%s\"", walFileName)));
+ }
+
+ /* send the WAL file itself */
+ _tarWriteHeader(pathbuf, NULL, &statbuf, false);
+
+ while ((cnt = fread(buf, 1,
+ Min(sizeof(buf), wal_segment_size - len),
+ fp)) > 0)
+ {
+ CheckXLogRemoved(segno, tli);
+ /* Send the chunk as a CopyData message */
+ if (pq_putmessage('d', buf, cnt))
+ ereport(ERROR,
+ (errmsg("base backup could not send data, aborting backup")));
+
+ len += cnt;
+ throttle(cnt);
+
+ if (len == wal_segment_size)
+ break;
+ }
+
+ if (len != wal_segment_size)
+ {
+ CheckXLogRemoved(segno, tli);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected WAL file size \"%s\"", walFileName)));
+ }
+
+ /* wal_segment_size is a multiple of 512, so no need for padding */
+
+ FreeFile(fp);
+
+ /*
+ * Mark file as archived, otherwise files can get archived again
+ * after promotion of a new node. This is in line with
+ * walreceiver.c always doing an XLogArchiveForceDone() after a
+ * complete segment.
+ */
+ StatusFilePath(pathbuf, walFileName, ".done");
+ sendFileWithContent(pathbuf, "");
+ }
+
+ /*
+ * Send timeline history files too. Only the latest timeline history
+ * file is required for recovery, and even that only if there happens
+ * to be a timeline switch in the first WAL segment that contains the
+ * checkpoint record, or if we're taking a base backup from a standby
+ * server and the target timeline changes while the backup is taken.
+ * But they are small and highly useful for debugging purposes, so
+ * better include them all, always.
+ */
+ foreach(lc, historyFileList)
+ {
+ char *fname = lfirst(lc);
+
+ snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
+
+ if (lstat(pathbuf, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", pathbuf)));
+
+ sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
+
+ /* unconditionally mark file as archived */
+ StatusFilePath(pathbuf, fname, ".done");
+ sendFileWithContent(pathbuf, "");
+ }
+
+ /* Send CopyDone message for the last tar file */
+ pq_putemptymessage('c');
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..56b6934e43 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,10 @@ static SQLCmd *make_sqlcmd(void);
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_PARALLEL
+%token K_START_BACKUP
+%token K_SEND_FILES_CONTENT
+%token K_STOP_BACKUP
%type <node> command
%type <node> base_backup start_replication start_logical_replication
@@ -102,6 +106,8 @@ static SQLCmd *make_sqlcmd(void);
%type <boolval> opt_temporary
%type <list> create_slot_opt_list
%type <defelt> create_slot_opt
+%type <list> backup_files backup_files_list
+%type <node> backup_file
%%
@@ -155,13 +161,29 @@ var_name: IDENT { $$ = $1; }
/*
* BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
+ * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS] [PARALLEL]
*/
base_backup:
K_BASE_BACKUP base_backup_opt_list
{
BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
cmd->options = $2;
+ cmd->cmdtag = BASE_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_SEND_FILES_CONTENT backup_files
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = NIL;
+ cmd->cmdtag = SEND_FILES_CONTENT;
+ cmd->backupfiles = $2;
+ $$ = (Node *) cmd;
+ }
+ | K_STOP_BACKUP
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = NIL;
+ cmd->cmdtag = STOP_BACKUP;
$$ = (Node *) cmd;
}
;
@@ -214,6 +236,35 @@ base_backup_opt:
$$ = makeDefElem("noverify_checksums",
(Node *)makeInteger(true), -1);
}
+ | K_PARALLEL
+ {
+ $$ = makeDefElem("parallel",
+ (Node *)makeInteger(true), -1);
+ }
+ ;
+
+backup_files:
+ '(' backup_files_list ')'
+ {
+ $$ = $2;
+ }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+backup_files_list:
+ backup_file
+ {
+ $$ = list_make1($1);
+ }
+ | backup_files_list ',' backup_file
+ {
+ $$ = lappend($1, $3);
+ }
+ ;
+
+backup_file:
+ SCONST { $$ = (Node *) makeString($1); }
;
create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..87a38046c0 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,10 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
WAIT { return K_WAIT; }
+PARALLEL { return K_PARALLEL; }
+START_BACKUP { return K_START_BACKUP; }
+SEND_FILES_CONTENT { return K_SEND_FILES_CONTENT; }
+STOP_BACKUP { return K_STOP_BACKUP; }
"," { return ','; }
";" { return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 9207109ba3..ed58d06316 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -40,6 +40,7 @@
#include "receivelog.h"
#include "replication/basebackup.h"
#include "streamutil.h"
+#include "fe_utils/simple_list.h"
#define ERRCODE_DATA_CORRUPTED "XX001"
@@ -105,6 +106,7 @@ static bool temp_replication_slot = true;
static bool create_slot = false;
static bool no_slot = false;
static bool verify_checksums = true;
+static int numWorkers = 1;
static bool success = false;
static bool made_new_pgdata = false;
@@ -114,6 +116,9 @@ static bool found_existing_xlogdir = false;
static bool made_tablespace_dirs = false;
static bool found_tablespace_dirs = false;
+PGconn **conn_list = NULL;
+int *worker_process;
+
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
@@ -157,6 +162,11 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
static const char *get_tablespace_mapping(const char *dir);
static void tablespace_list_append(const char *arg);
+static void ParallelBackupEnd(void);
+static int getFiles(SimpleStringList *files);
+static SimpleStringList** divideFilesList(SimpleStringList *files, int numFiles);
+static void create_workers_and_fetch(SimpleStringList **worker_files);
+
static void
cleanup_directories_atexit(void)
@@ -355,6 +365,7 @@ usage(void)
printf(_(" --no-slot prevent creation of temporary replication slot\n"));
printf(_(" --no-verify-checksums\n"
" do not verify checksums\n"));
+ printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n"));
printf(_(" -d, --dbname=CONNSTR connection string\n"));
@@ -1477,6 +1488,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
*/
snprintf(filename, sizeof(filename), "%s/%s", current_path,
copybuf);
+
if (filename[strlen(filename) - 1] == '/')
{
/*
@@ -1867,7 +1879,23 @@ BaseBackup(void)
fprintf(stderr, "\n");
}
- basebkp =
+ if (numWorkers > 1)
+ {
+ basebkp =
+ psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s %s",
+ escaped_label,
+ showprogress ? "PROGRESS" : "",
+ includewal == FETCH_WAL ? "WAL" : "",
+ fastcheckpoint ? "FAST" : "",
+ includewal == NO_WAL ? "" : "NOWAIT",
+ maxrate_clause ? maxrate_clause : "",
+ format == 't' ? "TABLESPACE_MAP" : "",
+ verify_checksums ? "" : "NOVERIFY_CHECKSUMS",
+ (numWorkers > 1) ? "PARALLEL" : "");
+ }
+ else
+ {
+ basebkp =
psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
@@ -1877,6 +1905,8 @@ BaseBackup(void)
maxrate_clause ? maxrate_clause : "",
format == 't' ? "TABLESPACE_MAP" : "",
verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ }
+
if (PQsendQuery(conn, basebkp) == 0)
{
@@ -1982,24 +2012,87 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}
- /*
- * Start receiving chunks
- */
- for (i = 0; i < PQntuples(res); i++)
+ if (numWorkers > 1)
{
- if (format == 't')
- ReceiveTarFile(conn, res, i);
- else
- ReceiveAndUnpackTarFile(conn, res, i);
- } /* Loop over all tablespaces */
+ SimpleStringList files = {NULL, NULL};
+ SimpleStringList **worker_files;
- if (showprogress)
- {
- progress_report(PQntuples(res), NULL, true);
- if (isatty(fileno(stderr)))
- fprintf(stderr, "\n"); /* Need to move to next line */
+ /*
+ * Get the list of files and directories to back up (one row per path)
+ */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not get backup header: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+ if (PQntuples(res) < 1)
+ {
+ pg_log_error("no data returned from server");
+ exit(1);
+ }
+
+ int num_files = 0;
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ bool isdir = atoi(PQgetvalue(res, i, 0));
+ const char *path = PQgetvalue(res, i, 1);
+
+ /* create directories while traversing */
+ if (isdir)
+ {
+ bool created;
+ bool found;
+ char current_path[MAXPGPATH];
+
+ if (includewal == STREAM_WAL &&
+ (pg_str_endswith(path, "/pg_wal") ||
+ pg_str_endswith(path, "/pg_xlog") ||
+ pg_str_endswith(path, "/archive_status")))
+ continue;
+
+ snprintf(current_path, sizeof(current_path), "%s/%s", basedir, path + 2);
+ verify_dir_is_empty_or_create(current_path, &created, &found);
+ }
+
+ else
+ {
+ num_files++;
+ simple_string_list_append(&files, path);
+ }
+ }
+
+ res = PQgetResult(conn); //NoData
+ res = PQgetResult(conn); //CopyDone
+
+ worker_files = divideFilesList(&files, num_files);
+ create_workers_and_fetch(worker_files);
+
+ pg_log_info("total files in $PGDTA: %d", num_files);
+
+ ParallelBackupEnd();
}
+ else
+ {
+ /*
+ * Start receiving chunks
+ */
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ if (format == 't')
+ ReceiveTarFile(conn, res, i);
+ else
+ ReceiveAndUnpackTarFile(conn, res, i);
+ } /* Loop over all tablespaces */
+ if (showprogress)
+ {
+ progress_report(PQntuples(res), NULL, true);
+ if (isatty(fileno(stderr)))
+ fprintf(stderr, "\n"); /* Need to move to next line */
+ }
+ }
PQclear(res);
/*
@@ -2195,6 +2288,7 @@ main(int argc, char **argv)
{"waldir", required_argument, NULL, 1},
{"no-slot", no_argument, NULL, 2},
{"no-verify-checksums", no_argument, NULL, 3},
+ {"jobs", required_argument, NULL, 'j'},
{NULL, 0, NULL, 0}
};
int c;
@@ -2222,7 +2316,7 @@ main(int argc, char **argv)
atexit(cleanup_directories_atexit);
- while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+ while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2363,6 +2457,9 @@ main(int argc, char **argv)
case 3:
verify_checksums = false;
break;
+ case 'j': /* number of jobs */
+ numWorkers = atoi(optarg);
+ break;
default:
/*
@@ -2477,6 +2574,22 @@ main(int argc, char **argv)
}
}
+ if (numWorkers <= 0)
+ {
+ pg_log_error("invalid number of parallel jobs");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ if (numWorkers > 1 && format != 'p')
+ {
+ pg_log_error("Worker can only be specified in plain mode");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
@@ -2545,7 +2658,145 @@ main(int argc, char **argv)
}
BaseBackup();
-
success = true;
return 0;
}
+
+/*
+ * ParallelBackupEnd
+ *
+ * Send STOP_BACKUP on the main connection and unpack the COPY stream the
+ * server answers with (expected to carry backup_label and pg_control).
+ * Exits the program if the command cannot be sent.
+ *
+ * FIXME: add "WAL" to the command, to handle -X FETCH command option.
+ */
+static void
+ParallelBackupEnd(void)
+{
+	if (PQsendQuery(conn, "STOP_BACKUP") == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "STOP_BACKUP", PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/*
+	 * Receive backup_label and pg_control files.  There is no tablespace
+	 * result set for this stream, hence the NULL result and row 0.
+	 *
+	 * NOTE(review): this relies on ReceiveAndUnpackTarFile() treating a NULL
+	 * result as the base tablespace (libpq's PQgetisnull() reports NULL for a
+	 * NULL result), so the files land in basedir -- confirm.
+	 */
+	ReceiveAndUnpackTarFile(conn, NULL, 0);
+}
+
+/*
+ * getFiles
+ *
+ * Body of a parallel backup worker.  It runs in a forked child, and its
+ * result is passed to _exit().  It opens its own replication connection,
+ * requests the contents of every file in "files" with one
+ * SEND_FILES_CONTENT command, and unpacks the resulting COPY stream.
+ *
+ * A leading "./" on a list entry is stripped before the entry is sent.  Each
+ * name is sent as a quoted literal with embedded single quotes doubled, so
+ * unusual file names cannot break the command.
+ *
+ * Returns 0 on success and 1 on failure.  An empty list counts as success,
+ * and no connection is opened for it.
+ */
+static int
+getFiles(SimpleStringList *files)
+{
+	SimpleStringListCell *cell;
+	PQExpBuffer buf;
+	PGconn	   *worker_conn;
+	PGresult   *res;
+	bool		first = true;
+	int			sent;
+	int			rc = 0;
+
+	/* nothing assigned to this worker: don't connect or send an empty list */
+	if (files->head == NULL)
+		return 0;
+
+	/* build: SEND_FILES_CONTENT ('base/1/1245/32683', 'base/1/1245/32684', ...) */
+	buf = createPQExpBuffer();
+	appendPQExpBufferStr(buf, "SEND_FILES_CONTENT ( ");
+	for (cell = files->head; cell; cell = cell->next)
+	{
+		const char *str = cell->val;
+
+		/* skip a leading "./" */
+		if (str[0] == '.' && str[1] == '/')
+			str += 2;
+
+		if (!first)
+			appendPQExpBufferStr(buf, " ,");
+		first = false;
+
+		/* quoted literal; an embedded quote is written twice */
+		appendPQExpBufferChar(buf, '\'');
+		for (; *str != '\0'; str++)
+		{
+			if (*str == '\'')
+				appendPQExpBufferChar(buf, '\'');
+			appendPQExpBufferChar(buf, *str);
+		}
+		appendPQExpBufferChar(buf, '\'');
+	}
+	appendPQExpBufferStr(buf, " )");
+
+	worker_conn = GetConnection();
+	if (!worker_conn)
+	{
+		destroyPQExpBuffer(buf);
+		return 1;
+	}
+
+	sent = PQsendQuery(worker_conn, buf->data);
+	destroyPQExpBuffer(buf);
+	if (sent == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_FILES_CONTENT", PQerrorMessage(worker_conn));
+		PQfinish(worker_conn);
+		return 1;
+	}
+
+	/*
+	 * main() only allows more than one worker in plain format, so the stream
+	 * is unpacked rather than written as a tar file.  There is no tablespace
+	 * result row for this stream, hence the NULL result and row 0.
+	 *
+	 * NOTE(review): this relies on ReceiveAndUnpackTarFile() treating a NULL
+	 * result as the base tablespace (libpq's PQgetisnull() reports NULL for a
+	 * NULL result) -- confirm.  If that function also writes recovery
+	 * configuration for the base tablespace, every worker would do so.
+	 */
+	ReceiveAndUnpackTarFile(worker_conn, NULL, 0);
+
+	/*
+	 * Collect the results that follow the COPY stream until libpq reports
+	 * none left.  This prevents result leaks and surfaces server errors.  A
+	 * further COPY result means another stream is pending, which a worker
+	 * does not expect.
+	 */
+	while ((res = PQgetResult(worker_conn)) != NULL)
+	{
+		ExecStatusType status = PQresultStatus(res);
+
+		PQclear(res);
+		if (status == PGRES_FATAL_ERROR)
+		{
+			pg_log_error("could not receive files: %s",
+						 PQerrorMessage(worker_conn));
+			rc = 1;
+			break;
+		}
+		if (status == PGRES_COPY_OUT || status == PGRES_COPY_IN ||
+			status == PGRES_COPY_BOTH)
+		{
+			pg_log_error("unexpected COPY stream after file contents");
+			rc = 1;
+			break;
+		}
+	}
+
+	PQfinish(worker_conn);
+	return rc;
+}
+
+/*
+ * divideFilesList
+ *
+ * Split "files" into numWorkers lists of consecutive entries whose lengths
+ * differ by at most one.  With numFiles = q * numWorkers + r, the first r
+ * workers get q + 1 files and the rest get q.  Some lists are empty when
+ * numFiles < numWorkers.
+ *
+ * numFiles is expected to be the length of "files".  If the list is longer,
+ * the surplus goes to the last worker instead of past the end of the array.
+ * numWorkers must be positive; main() rejects values <= 0.
+ *
+ * Returns a palloc'd array of numWorkers pointers, each to a palloc'd list
+ * filled with simple_string_list_append().
+ */
+static SimpleStringList **
+divideFilesList(SimpleStringList *files, int numFiles)
+{
+	SimpleStringList **worker_files;
+	SimpleStringListCell *cell;
+	int			base_share = numFiles / numWorkers;
+	int			remainder = numFiles % numWorkers;
+	int			worker = 0;
+	int			assigned = 0;	/* files given to "worker" so far */
+	int			quota;
+	int			i;
+
+	/* an array of numWorkers pointers, each to an initially empty list */
+	worker_files = (SimpleStringList **) palloc0(sizeof(SimpleStringList *) * numWorkers);
+	for (i = 0; i < numWorkers; i++)
+		worker_files[i] = (SimpleStringList *) palloc0(sizeof(SimpleStringList));
+
+	quota = base_share + (worker < remainder ? 1 : 0);
+	for (cell = files->head; cell; cell = cell->next)
+	{
+		/*
+		 * Move past workers that are full or have a zero quota.  The last
+		 * worker takes any surplus.
+		 */
+		while (assigned >= quota && worker < numWorkers - 1)
+		{
+			pg_log_debug("%d files for worker %d", assigned, worker);
+			worker++;
+			assigned = 0;
+			quota = base_share + (worker < remainder ? 1 : 0);
+		}
+
+		simple_string_list_append(worker_files[worker], cell->val);
+		assigned++;
+	}
+	pg_log_debug("%d files for worker %d", assigned, worker);
+
+	return worker_files;
+}
+
+
+/*
+ * create_workers_and_fetch
+ *
+ * Fork numWorkers children.  Child i fetches worker_files[i] with getFiles()
+ * and terminates with _exit(), so the atexit handlers registered in main()
+ * do not run in the child on that path.  The children's PIDs are stored in
+ * the file-level worker_process array.
+ *
+ * The parent then blocks until every child has terminated.  It exits with an
+ * error if any child failed, so the caller does not move on (e.g. to
+ * STOP_BACKUP) while files are still being copied.
+ *
+ * NOTE(review): an exit() reached inside a child (rather than the _exit()
+ * below) would run those atexit handlers in the child -- confirm that this
+ * is acceptable.  fork()/waitpid() are POSIX-only, so a WIN32 build needs
+ * another approach.
+ */
+static void
+create_workers_and_fetch(SimpleStringList **worker_files)
+{
+	bool		failed = false;
+	int			i;
+
+	worker_process = (int *) palloc(sizeof(int) * numWorkers);
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		worker_process[i] = fork();
+		if (worker_process[i] == 0)
+		{
+			/* in child process */
+			_exit(getFiles(worker_files[i]));
+		}
+		else if (worker_process[i] < 0)
+		{
+			int			j;
+
+			pg_log_error("could not create background process: %m");
+			/* don't leave the workers started so far running without us */
+			for (j = 0; j < i; j++)
+				kill(worker_process[j], SIGTERM);
+			exit(1);
+		}
+
+		pg_log_info("process (%d) created", worker_process[i]);
+	}
+
+	/*
+	 * Block until each worker is done.  Polling with WNOHANG would return at
+	 * once while workers are still running.
+	 */
+	for (i = 0; i < numWorkers; i++)
+	{
+		int			status;
+		pid_t		r;
+
+		do
+		{
+			r = waitpid(worker_process[i], &status, 0);
+		} while (r < 0 && errno == EINTR);
+
+		if (r < 0)
+		{
+			pg_log_error("could not wait for child process: %m");
+			exit(1);
+		}
+		if (status != 0)
+		{
+			pg_log_error("parallel backup worker (%d) failed: %s",
+						 worker_process[i], wait_result_to_str(status));
+			failed = true;
+		}
+	}
+
+	if (failed)
+		exit(1);
+}
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..b4127864c2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,12 @@ typedef enum ReplicationKind
REPLICATION_KIND_LOGICAL
} ReplicationKind;
+/*
+ * BackupCmdTag
+ *
+ * Which backup-related replication command a BaseBackupCmd node was parsed
+ * from (stored in its cmdtag field).
+ */
+typedef enum BackupCmdTag
+{
+	BASE_BACKUP = 0,			/* BASE_BACKUP */
+	SEND_FILES_CONTENT = 1,		/* SEND_FILES_CONTENT ('file', ...) */
+	STOP_BACKUP = 2				/* STOP_BACKUP */
+} BackupCmdTag;
/* ----------------------
* IDENTIFY_SYSTEM command
@@ -42,6 +48,8 @@ typedef struct BaseBackupCmd
{
NodeTag type;
List *options;
+ BackupCmdTag cmdtag;
+ List *backupfiles;
} BaseBackupCmd;
--
2.20.1 (Apple Git-117)