0004-Parallel-Backup-Backend-Replication-commands_v7.patch
application/octet-stream
Filename: 0004-Parallel-Backup-Backend-Replication-commands_v7.patch
Type: application/octet-stream
Part: 1
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v7-0004
Subject: Parallel Backup - Backend Replication commands
| File | + | − |
|---|---|---|
| src/backend/access/transam/xlog.c | 1 | 1 |
| src/backend/replication/basebackup.c | 489 | 14 |
| src/backend/replication/repl_gram.y | 167 | 34 |
| src/backend/replication/repl_scanner.l | 6 | 0 |
| src/include/nodes/replnodes.h | 10 | 0 |
| src/include/replication/basebackup.h | 1 | 1 |
From 6ccc7e8970e3ccf9183627692c50a235e05c8c68 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Sun, 13 Oct 2019 22:59:28 +0500
Subject: [PATCH 4/7] Parallel Backup - Backend Replication commands
This feature adds four new replication commands to the backend replication
system, to help facilitate taking a full backup in parallel using multiple
connections.
- START_BACKUP [ LABEL 'label' ] [ PROGRESS ] [ FAST ] [ TABLESPACE_MAP ]
This command instructs the server to get prepared for performing an
online backup.
- STOP_BACKUP [ LABEL 'label' ] [ WAL ] [ NOWAIT ]
This command instructs the server that online backup is finished. It
will bring the system out of backup mode.
- SEND_FILE_LIST
Instructs the server to return a list of the files and directories that
are available in the $PGDATA directory.
- SEND_FILES ( 'FILE' [, ...] ) [ MAX_RATE rate ] [ NOVERIFY_CHECKSUMS ]
[ START_WAL_LOCATION 'lsn' ]
Instructs the server to send the contents of the requested FILE(s).
---
src/backend/access/transam/xlog.c | 2 +-
src/backend/replication/basebackup.c | 503 ++++++++++++++++++++++++-
src/backend/replication/repl_gram.y | 201 ++++++++--
src/backend/replication/repl_scanner.l | 6 +
src/include/nodes/replnodes.h | 10 +
src/include/replication/basebackup.h | 2 +-
6 files changed, 674 insertions(+), 50 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c20dc447f1..f8d9e0655a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12288,7 +12288,7 @@ collectTablespaces(List **tablespaces, StringInfo tblspcmapfile,
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(buflinkpath.data);
ti->rpath = relpath ? pstrdup(relpath) : NULL;
- ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+ ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
if (tablespaces)
*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index dee590f16a..30d06d72a9 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -38,6 +38,7 @@
#include "storage/ipc.h"
#include "storage/reinit.h"
#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/relcache.h"
#include "utils/timestamp.h"
@@ -51,11 +52,20 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ XLogRecPtr startwallocation;
} basebackup_options;
+/*
+ * One entry of the file list built by addToBackupFileList().
+ */
+typedef struct
+{
+ char path[MAXPGPATH]; /* path as given by the caller, cut to MAXPGPATH */
+ char type; /* callers in this patch pass 'f', 'd' or 'l' */
+ size_t size; /* st_size; callers pass -1 for directories.
+               * NOTE(review): size_t is unsigned, so -1 wraps;
+               * consider a signed 64-bit type. */
+ time_t mtime; /* st_mtime taken from the caller's stat buffer */
+} BackupFile;
+
static int64 sendDir(const char *path, int basepathlen, bool dryrun,
- List *tablespaces, bool sendtblspclinks);
+ List *tablespaces, bool sendtblspclinks, List **filelist);
static bool sendFile(const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid);
static void sendFileWithContent(const char *filename, const char *content);
@@ -75,6 +85,13 @@ static void throttle(size_t increment);
static void setup_throttle(int maxrate);
static bool is_checksummed_file(const char *fullpath, const char *filename);
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendFileList(void);
+static void SendFiles(basebackup_options *opt, List *filenames, bool missing_ok);
+static void addToBackupFileList(List **filelist, char *path, char type, size_t size,
+ time_t mtime);
+
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
@@ -289,7 +306,7 @@ perform_base_backup(basebackup_options *opt)
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
- ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+ ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
tablespaces = lappend(tablespaces, ti);
/* Send tablespace header */
@@ -323,10 +340,10 @@ perform_base_backup(basebackup_options *opt)
if (tblspc_map_file && opt->sendtblspcmapfile)
{
sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
- sendDir(".", 1, false, tablespaces, false);
+ sendDir(".", 1, false, tablespaces, false, NULL);
}
else
- sendDir(".", 1, false, tablespaces, true);
+ sendDir(".", 1, false, tablespaces, true, NULL);
/* ... and pg_control after everything else. */
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -337,7 +354,7 @@ perform_base_backup(basebackup_options *opt)
sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
}
else
- sendTablespace(ti->path, false);
+ sendTablespace(ti->path, false, NULL);
/*
* If we're including WAL, and this is the main data directory we
@@ -409,6 +426,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_maxrate = false;
bool o_tablespace_map = false;
bool o_noverify_checksums = false;
+ bool o_startwallocation = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -497,12 +515,24 @@ parse_basebackup_options(List *options, basebackup_options *opt)
noverify_checksums = true;
o_noverify_checksums = true;
}
+		else if (strcmp(defel->defname, "start_wal_location") == 0)
+		{
+			bool		have_error = false;
+			char	   *startwallocation;
+
+			if (o_startwallocation)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			startwallocation = strVal(defel->arg);
+			opt->startwallocation = pg_lsn_in_internal(startwallocation,
+													   &have_error);
+
+			/*
+			 * pg_lsn_in_internal() reports malformed input only through
+			 * have_error, so it must be checked here; otherwise a bogus
+			 * string would be accepted silently as the start location.
+			 */
+			if (have_error)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+						 errmsg("invalid input syntax for type %s: \"%s\"",
+								"pg_lsn", startwallocation)));
+
+			o_startwallocation = true;
+		}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
- if (opt->label == NULL)
- opt->label = "base backup";
}
@@ -520,6 +550,15 @@ SendBaseBackup(BaseBackupCmd *cmd)
parse_basebackup_options(cmd->options, &opt);
+ /* default value for label, if not specified. */
+ if (opt.label == NULL)
+ {
+ if (cmd->cmdtag == BASE_BACKUP)
+ opt.label = "base backup";
+ else
+ opt.label = "start backup";
+ }
+
WalSndSetState(WALSNDSTATE_BACKUP);
if (update_process_title)
@@ -531,7 +570,29 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg, false);
}
- perform_base_backup(&opt);
+ switch (cmd->cmdtag)
+ {
+ case BASE_BACKUP:
+ perform_base_backup(&opt);
+ break;
+ case START_BACKUP:
+ StartBackup(&opt);
+ break;
+ case SEND_FILE_LIST:
+ SendFileList();
+ break;
+ case SEND_FILES:
+ SendFiles(&opt, cmd->backupfiles, true);
+ break;
+ case STOP_BACKUP:
+ StopBackup(&opt);
+ break;
+
+ default:
+ elog(ERROR, "unrecognized replication command tag: %u",
+ cmd->cmdtag);
+ break;
+ }
}
static void
@@ -674,6 +735,61 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
pq_puttextmessage('C', "SELECT");
}
+/*
+ * Send one result set made of a single row with two text columns: the
+ * backup label contents and the tablespace map contents.  The second column
+ * is sent as NULL when tblspc_map_file is NULL.
+ */
+static void
+SendStartBackupResult(StringInfo labelfile, StringInfo tblspc_map_file)
+{
+	static const char *const colnames[] = {"label", "tablespacemap"};
+	const int	ncols = 2;
+	StringInfoData msg;
+	int			col;
+
+	/* RowDescription: both columns carry the same text-type descriptor */
+	pq_beginmessage(&msg, 'T');
+	pq_sendint16(&msg, ncols);
+	for (col = 0; col < ncols; col++)
+	{
+		pq_sendstring(&msg, colnames[col]);
+		pq_sendint32(&msg, 0);	/* table oid */
+		pq_sendint16(&msg, 0);	/* attnum */
+		pq_sendint32(&msg, TEXTOID);	/* type oid */
+		pq_sendint16(&msg, -1);
+		pq_sendint32(&msg, 0);
+		pq_sendint16(&msg, 0);
+	}
+	pq_endmessage(&msg);
+
+	/* DataRow */
+	pq_beginmessage(&msg, 'D');
+	pq_sendint16(&msg, ncols);
+
+	pq_sendint32(&msg, labelfile->len);
+	pq_sendbytes(&msg, labelfile->data, labelfile->len);
+
+	if (tblspc_map_file == NULL)
+		pq_sendint32(&msg, -1); /* Length = -1 ==> NULL */
+	else
+	{
+		pq_sendint32(&msg, tblspc_map_file->len);
+		pq_sendbytes(&msg, tblspc_map_file->data, tblspc_map_file->len);
+	}
+
+	pq_endmessage(&msg);
+
+	/* CommandComplete */
+	pq_puttextmessage('C', "SELECT");
+}
+
+
/*
* Inject a file with given name and content in the output tar stream.
*/
@@ -725,7 +841,7 @@ sendFileWithContent(const char *filename, const char *content)
* Only used to send auxiliary tablespaces, not PGDATA.
*/
int64
-sendTablespace(char *path, bool dryrun)
+sendTablespace(char *path, bool dryrun, List **filelist)
{
int64 size;
char pathbuf[MAXPGPATH];
@@ -754,11 +870,11 @@ sendTablespace(char *path, bool dryrun)
return 0;
}
+ addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
dryrun);
-
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), dryrun, NIL, true);
+ size += sendDir(pathbuf, strlen(path), dryrun, NIL, true, filelist);
return size;
}
@@ -777,7 +893,7 @@ sendTablespace(char *path, bool dryrun)
*/
static int64
sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
- bool sendtblspclinks)
+ bool sendtblspclinks, List **filelist)
{
DIR *dir;
struct dirent *de;
@@ -931,6 +1047,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+ addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
excludeFound = true;
break;
@@ -947,6 +1065,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+ addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
continue;
}
@@ -968,6 +1088,10 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
dryrun);
+ addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
+ addToBackupFileList(filelist, "./pg_wal/archive_status", 'd', -1,
+ statbuf.st_mtime);
+
continue; /* don't recurse into pg_wal */
}
@@ -997,6 +1121,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
+ addToBackupFileList(filelist, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
&statbuf, dryrun);
#else
@@ -1023,6 +1148,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
*/
size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
dryrun);
+ addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
/*
* Call ourselves recursively for a directory, unless it happens
@@ -1053,13 +1179,15 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks);
+ size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks, filelist);
}
else if (S_ISREG(statbuf.st_mode))
{
bool sent = false;
- if (!dryrun)
+ addToBackupFileList(filelist, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+ if (!dryrun && filelist == NULL)
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
@@ -1762,3 +1890,350 @@ setup_throttle(int maxrate)
throttling_counter = -1;
}
}
+
+/*
+ * StartBackup - prepare to start an online backup.
+ *
+ * Calls do_pg_start_backup() and then sends, in this order: the starting WAL
+ * location and timeline, the tablespace header, and one row holding the
+ * contents of the backup_label and tablespace_map files.  The tablespace_map
+ * column is NULL when the map is empty or was not requested.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+	/* set once the exit callback has been registered in this process */
+	static bool cleanup_registered = false;
+
+	TimeLineID	starttli;
+	StringInfo	labelfile;
+	StringInfo	tblspc_map_file;
+	int			datadirpathlen;
+	List	   *tablespaces = NIL;
+	tablespaceinfo *ti;
+
+	datadirpathlen = strlen(DataDir);
+
+	backup_started_in_recovery = RecoveryInProgress();
+
+	labelfile = makeStringInfo();
+	tblspc_map_file = makeStringInfo();
+
+	total_checksum_failures = 0;
+
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  labelfile, &tablespaces,
+								  tblspc_map_file,
+								  opt->progress, opt->sendtblspcmapfile);
+
+	/*
+	 * Once do_pg_start_backup has been called, ensure that any failure causes
+	 * us to abort the backup so we don't "leak" a backup counter.  For this
+	 * reason, register base_backup_cleanup as a before_shmem_exit handler,
+	 * so that it is always called when the process exits.
+	 *
+	 * Register it only once per process: the number of before_shmem_exit
+	 * slots is limited, and a session issuing START_BACKUP repeatedly would
+	 * otherwise use them up and have the cleanup run several times at exit.
+	 */
+	if (!cleanup_registered)
+	{
+		before_shmem_exit(base_backup_cleanup, (Datum) 0);
+		cleanup_registered = true;
+	}
+
+	SendXlogRecPtrResult(startptr, starttli);
+
+	/*
+	 * Calculate the relative path of temporary statistics directory in order
+	 * to skip the files which are located in that directory later.
+	 */
+	if (is_absolute_path(pgstat_stat_directory) &&
+		strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+	else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory);
+	else
+		statrelpath = pgstat_stat_directory;
+
+	/* Add a node for the base directory at the end */
+	ti = palloc0(sizeof(tablespaceinfo));
+	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
+	tablespaces = lappend(tablespaces, ti);
+
+	/* Send tablespace header */
+	SendBackupHeader(tablespaces);
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* makeStringInfo() never returns NULL, so only the length needs testing */
+	if (tblspc_map_file->len <= 0 || !opt->sendtblspcmapfile)
+		tblspc_map_file = NULL;
+
+	/* send backup_label and tablespace_map to frontend */
+	SendStartBackupResult(labelfile, tblspc_map_file);
+}
+
+
+/*
+ * StopBackup() - ends an online backup
+ *
+ * Raises an error unless get_backup_status() reports a non-exclusive backup
+ * for this session.  Otherwise it opens a COPY OUT stream, sends the
+ * pg_control file, calls do_pg_stop_backup(), sends the WAL files when
+ * opt->includewal is set, ends the stream with CopyDone, and finally sends
+ * the ending WAL location and timeline.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+ TimeLineID endtli;
+ XLogRecPtr endptr;
+ struct stat statbuf;
+ StringInfoData buf;
+ char *labelfile = NULL;
+
+ if (get_backup_status() != SESSION_BACKUP_NON_EXCLUSIVE)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("non-exclusive backup is not in progress")));
+
+ /*
+  * Setup and activate network throttling, if client requested it.
+  * NOTE(review): the STOP_BACKUP grammar in this patch has no MAX_RATE
+  * option, so maxrate presumably stays at its zeroed default here.
+  */
+ setup_throttle(opt->maxrate);
+
+ /* Send CopyOutResponse message */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+
+ /* Send pg_control; failing to stat it is an error. */
+ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ XLOG_CONTROL_FILE)));
+ sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+ /*
+  * Stop the backup.  The value of the LABEL option is handed to
+  * do_pg_stop_backup() as its first argument.
+  * NOTE(review): presumably the client is expected to pass the
+  * backup_label contents in LABEL here -- confirm; when LABEL is
+  * omitted, SendBaseBackup() substitutes "start backup".
+  */
+ labelfile = (char *) opt->label;
+ endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+ if (opt->includewal)
+ include_wal_files(endptr);
+
+ pq_putemptymessage('c'); /* CopyDone */
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+
+/*
+ * SendFileList() - sends a list of filenames to frontend
+ *
+ * For each tablespace returned by collectTablespaces(), and then for the
+ * base data directory, the file list is collected with a dry-run directory
+ * scan and sent as a result set with the columns (path, type, size, mtime).
+ * One CommandComplete message is sent after all result sets.
+ */
+static void
+SendFileList(void)
+{
+	StringInfoData buf;
+	ListCell   *tblspc_cell;
+	List	   *tablespaces = NIL;
+	StringInfo	tblspc_map_file;
+	tablespaceinfo *basedir;
+
+	tblspc_map_file = makeStringInfo();
+	collectTablespaces(&tablespaces, tblspc_map_file, false, false);
+
+	/* Add a node for the base directory at the end */
+	basedir = palloc0(sizeof(tablespaceinfo));
+	tablespaces = lappend(tablespaces, basedir);
+
+	foreach(tblspc_cell, tablespaces)
+	{
+		tablespaceinfo *ti = (tablespaceinfo *) lfirst(tblspc_cell);
+		List	   *filelist = NIL;
+		ListCell   *file_cell;	/* separate cell: must not disturb the outer
+								 * loop's iteration variable */
+
+		/* The zeroed base-directory node is recognized by its NULL path */
+		if (ti->path == NULL)
+			sendDir(".", 1, true, NIL, true, &filelist);
+		else
+			sendTablespace(ti->path, true, &filelist);
+
+		/* Construct and send the list of filenames */
+		pq_beginmessage(&buf, 'T'); /* RowDescription */
+		pq_sendint16(&buf, 4);	/* n field */
+
+		/* First field - path */
+		pq_sendstring(&buf, "path");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, TEXTOID);
+		pq_sendint16(&buf, -1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Second field - type */
+		pq_sendstring(&buf, "type");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, CHAROID);
+		pq_sendint16(&buf, 1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - size */
+		pq_sendstring(&buf, "size");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Fourth field - mtime */
+		pq_sendstring(&buf, "mtime");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_endmessage(&buf);
+
+		foreach(file_cell, filelist)
+		{
+			BackupFile *backupFile = (BackupFile *) lfirst(file_cell);
+			Size		len;
+
+			/* Send one datarow message */
+			pq_beginmessage(&buf, 'D');
+			pq_sendint16(&buf, 4);	/* number of columns */
+
+			/* send path */
+			len = strlen(backupFile->path);
+			pq_sendint32(&buf, len);
+			pq_sendbytes(&buf, backupFile->path, len);
+
+			/* send type */
+			pq_sendint32(&buf, 1);
+			pq_sendbyte(&buf, backupFile->type);
+
+			/* send size */
+			send_int8_string(&buf, backupFile->size);
+
+			/* send mtime */
+			send_int8_string(&buf, backupFile->mtime);
+
+			pq_endmessage(&buf);
+		}
+
+		/* Release the list together with the BackupFile entries it owns */
+		list_free_deep(filelist);
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+
+/*
+ * SendFiles() - sends the actual files to the caller
+ *
+ * Sends the given file(s) to the client using the COPY protocol.  Only
+ * regular files are sent; anything else, such as a directory or a symlink,
+ * is skipped with a WARNING.  A file that does not exist is skipped when
+ * missing_ok is true and raises an error otherwise.  Nothing is sent when
+ * the list is empty.
+ *
+ * After CopyDone, an error is raised if any checksum verification failure
+ * was counted while sending.
+ */
+static void
+SendFiles(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+
+	if (filenames == NIL)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* set backup start location. */
+	startptr = opt->startwallocation;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf = (char *) strVal(lfirst(lc));
+
+		/*
+		 * Length of the prefix stripped from the name sent to the client.
+		 * It is recomputed for every file, so that a tablespace path does
+		 * not affect the relative paths that follow it in the list.
+		 */
+		int			basepathlen = 1;
+
+		if (is_absolute_path(pathbuf))
+		{
+			char	   *basepath;
+
+			/*
+			 * 'pathbuf' points to the tablespace location, but we only want
+			 * to include the version directory in it that belongs to us.
+			 */
+			basepath = strstr(pathbuf, TABLESPACE_VERSION_DIRECTORY);
+			if (basepath)
+				basepathlen = basepath - pathbuf - 1;
+		}
+
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT || !missing_ok)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/*
+		 * Only entertain requests for regular files; skip any directories or
+		 * special files.
+		 */
+		if (S_ISREG(statbuf.st_mode))
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
+					 missing_ok, InvalidOid);
+		}
+		else
+			ereport(WARNING,
+					(errmsg("skipping special file or directory \"%s\"", pathbuf)));
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures.  The counter covers only the files sent
+	 * by this command, so with several connections the reported number may
+	 * not be the total for the whole backup, but any failure still raises
+	 * an error here.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+
+/*
+ * Append a newly allocated BackupFile built from the arguments to *filelist.
+ * Nothing is done when filelist is NULL.
+ */
+static void
+addToBackupFileList(List **filelist, char *path, char type, size_t size,
+					time_t mtime)
+{
+	BackupFile *entry;
+
+	/* No list to fill in */
+	if (filelist == NULL)
+		return;
+
+	entry = (BackupFile *) palloc0(sizeof(BackupFile));
+	entry->mtime = mtime;
+	entry->size = size;
+	entry->type = type;
+	strlcpy(entry->path, path, sizeof(entry->path));
+
+	*filelist = lappend(*filelist, entry);
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..0aa781ebdc 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,13 +87,24 @@ static SQLCmd *make_sqlcmd(void);
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_FILE_LIST
+%token K_SEND_FILES
+%token K_STOP_BACKUP
+%token K_START_WAL_LOCATION
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
timeline_history show sql_cmd
%type <list> base_backup_opt_list
+ start_backup_opt_list stop_backup_opt_list
+ send_backup_files_opt_list
%type <defelt> base_backup_opt
+ backup_opt_label backup_opt_progress backup_opt_maxrate
+ backup_opt_fast backup_opt_tsmap backup_opt_wal backup_opt_nowait
+ backup_opt_chksum backup_opt_wal_loc
+ start_backup_opt stop_backup_opt send_backup_files_opt
%type <uintval> opt_timeline
%type <list> plugin_options plugin_opt_list
%type <defelt> plugin_opt_elem
@@ -102,6 +113,7 @@ static SQLCmd *make_sqlcmd(void);
%type <boolval> opt_temporary
%type <list> create_slot_opt_list
%type <defelt> create_slot_opt
+%type <list> backup_files backup_files_list
%%
@@ -162,10 +174,61 @@ base_backup:
{
BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
cmd->options = $2;
+ cmd->cmdtag = BASE_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_START_BACKUP start_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = START_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_SEND_FILE_LIST
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = NIL;
+ cmd->cmdtag = SEND_FILE_LIST;
+ $$ = (Node *) cmd;
+ }
+ | K_SEND_FILES backup_files send_backup_files_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $3;
+ cmd->cmdtag = SEND_FILES;
+ cmd->backupfiles = $2;
+ $$ = (Node *) cmd;
+ }
+ | K_STOP_BACKUP stop_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = STOP_BACKUP;
$$ = (Node *) cmd;
}
;
+/* Zero or more START_BACKUP options, collected into a List */
+start_backup_opt_list:
+ start_backup_opt_list start_backup_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+/* Zero or more STOP_BACKUP options, collected into a List */
+stop_backup_opt_list:
+ stop_backup_opt_list stop_backup_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+/* Zero or more SEND_FILES options, collected into a List */
+send_backup_files_opt_list:
+ send_backup_files_opt_list send_backup_files_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+
base_backup_opt_list:
base_backup_opt_list base_backup_opt
{ $$ = lappend($1, $2); }
@@ -173,46 +236,116 @@ base_backup_opt_list:
{ $$ = NIL; }
;
+/* Options accepted by START_BACKUP: LABEL, PROGRESS, FAST, TABLESPACE_MAP */
+start_backup_opt:
+ backup_opt_label { $$ = $1; }
+ | backup_opt_progress { $$ = $1; }
+ | backup_opt_fast { $$ = $1; }
+ | backup_opt_tsmap { $$ = $1; }
+ ;
+
+/* Options accepted by STOP_BACKUP: LABEL, WAL, NOWAIT */
+stop_backup_opt:
+ backup_opt_label { $$ = $1; }
+ | backup_opt_wal { $$ = $1; }
+ | backup_opt_nowait { $$ = $1; }
+ ;
+
+/* Options accepted by SEND_FILES: MAX_RATE, NOVERIFY_CHECKSUMS, START_WAL_LOCATION */
+send_backup_files_opt:
+ backup_opt_maxrate { $$ = $1; }
+ | backup_opt_chksum { $$ = $1; }
+ | backup_opt_wal_loc { $$ = $1; }
+ ;
+
+
base_backup_opt:
- K_LABEL SCONST
- {
- $$ = makeDefElem("label",
- (Node *)makeString($2), -1);
- }
- | K_PROGRESS
- {
- $$ = makeDefElem("progress",
- (Node *)makeInteger(true), -1);
- }
- | K_FAST
- {
- $$ = makeDefElem("fast",
- (Node *)makeInteger(true), -1);
- }
- | K_WAL
- {
- $$ = makeDefElem("wal",
- (Node *)makeInteger(true), -1);
- }
- | K_NOWAIT
- {
- $$ = makeDefElem("nowait",
- (Node *)makeInteger(true), -1);
- }
- | K_MAX_RATE UCONST
+ backup_opt_label { $$ = $1; }
+ | backup_opt_progress { $$ = $1; }
+ | backup_opt_fast { $$ = $1; }
+ | backup_opt_wal { $$ = $1; }
+ | backup_opt_nowait { $$ = $1; }
+ | backup_opt_maxrate { $$ = $1; }
+ | backup_opt_tsmap { $$ = $1; }
+ | backup_opt_chksum { $$ = $1; }
+ ;
+
+/*
+ * Individual backup options.  Each rule builds one DefElem; the per-command
+ * option rules combine them.
+ */
+
+/* LABEL 'string' */
+backup_opt_label:
+ K_LABEL SCONST
+ {
+ $$ = makeDefElem("label",
+ (Node *)makeString($2), -1);
+ };
+
+/* PROGRESS */
+backup_opt_progress:
+ K_PROGRESS
+ {
+ $$ = makeDefElem("progress",
+ (Node *)makeInteger(true), -1);
+ };
+
+/* FAST */
+backup_opt_fast:
+ K_FAST
+ {
+ $$ = makeDefElem("fast",
+ (Node *)makeInteger(true), -1);
+ };
+
+/* WAL */
+backup_opt_wal:
+ K_WAL
+ {
+ $$ = makeDefElem("wal",
+ (Node *)makeInteger(true), -1);
+ };
+
+/* NOWAIT */
+backup_opt_nowait:
+ K_NOWAIT
+ {
+ $$ = makeDefElem("nowait",
+ (Node *)makeInteger(true), -1);
+ };
+
+/* MAX_RATE unsigned-integer */
+backup_opt_maxrate:
+ K_MAX_RATE UCONST
+ {
+ $$ = makeDefElem("max_rate",
+ (Node *)makeInteger($2), -1);
+ };
+
+/* TABLESPACE_MAP */
+backup_opt_tsmap:
+ K_TABLESPACE_MAP
+ {
+ $$ = makeDefElem("tablespace_map",
+ (Node *)makeInteger(true), -1);
+ };
+
+/* NOVERIFY_CHECKSUMS */
+backup_opt_chksum:
+ K_NOVERIFY_CHECKSUMS
+ {
+ $$ = makeDefElem("noverify_checksums",
+ (Node *)makeInteger(true), -1);
+ };
+
+/* START_WAL_LOCATION 'string'; the string is parsed as an LSN later */
+backup_opt_wal_loc:
+ K_START_WAL_LOCATION SCONST
+ {
+ $$ = makeDefElem("start_wal_location",
+ (Node *)makeString($2), -1);
+ };
+
+
+backup_files:
+ '(' backup_files_list ')'
{
- $$ = makeDefElem("max_rate",
- (Node *)makeInteger($2), -1);
+ $$ = $2;
}
- | K_TABLESPACE_MAP
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+backup_files_list:
+ SCONST
{
- $$ = makeDefElem("tablespace_map",
- (Node *)makeInteger(true), -1);
+ $$ = list_make1(makeString($1));
}
- | K_NOVERIFY_CHECKSUMS
+ | backup_files_list ',' SCONST
{
- $$ = makeDefElem("noverify_checksums",
- (Node *)makeInteger(true), -1);
+ $$ = lappend($1, makeString($3));
}
;
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..d2e2dfe1e9 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,12 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
WAIT { return K_WAIT; }
+START_BACKUP { return K_START_BACKUP; }
+SEND_FILE_LIST { return K_SEND_FILE_LIST; }
+SEND_FILES { return K_SEND_FILES; }
+STOP_BACKUP { return K_STOP_BACKUP; }
+START_WAL_LOCATION { return K_START_WAL_LOCATION; }
+
"," { return ','; }
";" { return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..eac4802c7e 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,14 @@ typedef enum ReplicationKind
REPLICATION_KIND_LOGICAL
} ReplicationKind;
+/*
+ * Identifies which backup-related replication command a BaseBackupCmd node
+ * carries; SendBaseBackup() dispatches on it.
+ */
+typedef enum BackupCmdTag
+{
+ BASE_BACKUP, /* BASE_BACKUP: runs perform_base_backup() */
+ START_BACKUP, /* START_BACKUP: runs StartBackup() */
+ SEND_FILE_LIST, /* SEND_FILE_LIST: runs SendFileList() */
+ SEND_FILES, /* SEND_FILES: runs SendFiles() */
+ STOP_BACKUP /* STOP_BACKUP: runs StopBackup() */
+} BackupCmdTag;
/* ----------------------
* IDENTIFY_SYSTEM command
@@ -42,6 +50,8 @@ typedef struct BaseBackupCmd
{
NodeTag type;
List *options;
+ BackupCmdTag cmdtag;
+ List *backupfiles;
} BaseBackupCmd;
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index b55917b9b6..5202e4160b 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
extern void SendBaseBackup(BaseBackupCmd *cmd);
-extern int64 sendTablespace(char *path, bool dryrun);
+extern int64 sendTablespace(char *path, bool dryrun, List **filelist);
#endif /* _BASEBACKUP_H */
--
2.21.0 (Apple Git-122.2)