0002-backend-changes-for-parallel-backup.patch
application/octet-stream
Filename: 0002-backend-changes-for-parallel-backup.patch
Type: application/octet-stream
Part: 1
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0002
Subject: backend changes for parallel backup
| File | + | − |
|---|---|---|
| src/backend/access/transam/xlog.c | 1 | 1 |
| src/backend/replication/basebackup.c | 580 | 9 |
| src/backend/replication/repl_gram.y | 72 | 0 |
| src/backend/replication/repl_scanner.l | 7 | 0 |
| src/include/nodes/replnodes.h | 10 | 0 |
| src/include/replication/basebackup.h | 1 | 1 |
From b60e132d16ddd026b48fc095a285458725ec74ca Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Sun, 13 Oct 2019 22:59:28 +0500
Subject: [PATCH 2/4] backend changes for parallel backup
---
src/backend/access/transam/xlog.c | 2 +-
src/backend/replication/basebackup.c | 589 ++++++++++++++++++++++++-
src/backend/replication/repl_gram.y | 72 +++
src/backend/replication/repl_scanner.l | 7 +
src/include/nodes/replnodes.h | 10 +
src/include/replication/basebackup.h | 2 +-
6 files changed, 671 insertions(+), 11 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 54a430d041..eafa531389 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12249,7 +12249,7 @@ collectTablespaces(List **tablespaces, StringInfo tblspcmapfile,
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(buflinkpath.data);
ti->rpath = relpath ? pstrdup(relpath) : NULL;
- ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+ ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
if (tablespaces)
*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 5f25f5848d..e77e0114e1 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
#include "utils/ps_status.h"
#include "utils/relcache.h"
#include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
typedef struct
@@ -52,11 +53,34 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ bool exclusive;
+ XLogRecPtr lsn;
} basebackup_options;
+/*
+ * One entry of the file list collected while walking a data directory or
+ * tablespace (see STORE_BACKUPFILE).
+ *
+ * size is 64 bits wide so that st_size values of 2GB or more are not
+ * truncated; it is reported to the client as an int8 column.
+ */
+typedef struct
+{
+	char		name[MAXPGPATH];	/* path as seen by the directory walk */
+	char		type;			/* 'f' = regular file, 'd' = directory,
+								 * 'l' = symbolic link */
+	int64		size;			/* st_size for files and links, -1 for
+								 * directories */
+	time_t		mtime;			/* st_mtime of the entry */
+} BackupFile;
+
+/*
+ * Append a freshly allocated BackupFile describing one path to the list that
+ * _backupfiles (a List **) points to.  Does nothing when _backupfiles is
+ * NULL, so callers that only want to send or size files can pass NULL.
+ *
+ * The name is copied with strlcpy() and is therefore silently truncated to
+ * MAXPGPATH - 1 bytes.  Arguments are parenthesized and the temporary uses a
+ * name unlikely to collide with caller expressions; note that _backupfiles
+ * is still evaluated more than once.
+ */
+#define STORE_BACKUPFILE(_backupfiles, _name, _type, _size, _mtime) \
+	do { \
+		if ((_backupfiles) != NULL) { \
+			BackupFile *bf_entry_ = palloc0(sizeof(BackupFile)); \
+			strlcpy(bf_entry_->name, (_name), sizeof(bf_entry_->name)); \
+			bf_entry_->type = (_type); \
+			bf_entry_->size = (_size); \
+			bf_entry_->mtime = (_mtime); \
+			*(_backupfiles) = lappend(*(_backupfiles), bf_entry_); \
+		} \
+	} while(0)
static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
List *tablespaces, bool sendtblspclinks);
+static int64 sendDir_(const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, List **files);
static bool sendFile(const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid);
static void sendFileWithContent(const char *filename, const char *content);
@@ -76,6 +100,12 @@ static void throttle(size_t increment);
static void setup_throttle(int maxrate);
static bool is_checksummed_file(const char *fullpath, const char *filename);
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendFileList(basebackup_options *opt);
+static void SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok);
+static char *readfile(const char *readfilename, bool missing_ok);
+
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
@@ -337,7 +367,7 @@ perform_base_backup(basebackup_options *opt)
sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
}
else
- sendTablespace(ti->path, false);
+ sendTablespace(ti->path, false, NULL);
/*
* If we're including WAL, and this is the main data directory we
@@ -412,6 +442,8 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_maxrate = false;
bool o_tablespace_map = false;
bool o_noverify_checksums = false;
+ bool o_exclusive = false;
+ bool o_lsn = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -500,6 +532,30 @@ parse_basebackup_options(List *options, basebackup_options *opt)
noverify_checksums = true;
o_noverify_checksums = true;
}
+ else if (strcmp(defel->defname, "exclusive") == 0)
+ {
+ if (o_exclusive)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ opt->exclusive = intVal(defel->arg);
+ o_exclusive = true;
+ }
+ else if (strcmp(defel->defname, "lsn") == 0)
+ {
+ bool have_error = false;
+ char *lsn;
+
+ if (o_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ lsn = strVal(defel->arg);
+ opt->lsn = pg_lsn_in_internal(lsn, &have_error);
+ o_lsn = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@@ -534,7 +590,29 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg, false);
}
- perform_base_backup(&opt);
+ switch (cmd->cmdtag)
+ {
+ case BASE_BACKUP:
+ perform_base_backup(&opt);
+ break;
+ case START_BACKUP:
+ StartBackup(&opt);
+ break;
+ case SEND_FILE_LIST:
+ SendFileList(&opt);
+ break;
+ case SEND_FILES_CONTENT:
+ SendFilesContents(&opt, cmd->backupfiles, true);
+ break;
+ case STOP_BACKUP:
+ StopBackup(&opt);
+ break;
+
+ default:
+ elog(ERROR, "unrecognized replication command tag: %u",
+ cmd->cmdtag);
+ break;
+ }
}
static void
@@ -677,6 +755,61 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
pq_puttextmessage('C', "SELECT");
}
+/*
+ * Send a single result set holding the backup label and the tablespace map.
+ *
+ * The result has one row with two text columns, "label" and "tablespacemap".
+ * The second column is sent as NULL when tblspc_map_file is NULL.
+ */
+static void
+SendStartBackupResult(StringInfo labelfile, StringInfo tblspc_map_file)
+{
+	static const char *const colnames[] = {"label", "tablespacemap"};
+	StringInfoData msg;
+	int			col;
+
+	/* RowDescription: both columns are described identically, as text */
+	pq_beginmessage(&msg, 'T');
+	pq_sendint16(&msg, 2);		/* 2 fields */
+	for (col = 0; col < 2; col++)
+	{
+		pq_sendstring(&msg, colnames[col]);
+		pq_sendint32(&msg, 0);	/* table oid */
+		pq_sendint16(&msg, 0);	/* attnum */
+		pq_sendint32(&msg, TEXTOID);	/* type oid */
+		pq_sendint16(&msg, -1);
+		pq_sendint32(&msg, 0);
+		pq_sendint16(&msg, 0);
+	}
+	pq_endmessage(&msg);
+
+	/* DataRow */
+	pq_beginmessage(&msg, 'D');
+	pq_sendint16(&msg, 2);		/* number of columns */
+
+	pq_sendint32(&msg, labelfile->len);
+	pq_sendbytes(&msg, labelfile->data, labelfile->len);
+
+	if (tblspc_map_file == NULL)
+		pq_sendint32(&msg, -1); /* Length = -1 ==> NULL */
+	else
+	{
+		pq_sendint32(&msg, tblspc_map_file->len);
+		pq_sendbytes(&msg, tblspc_map_file->data, tblspc_map_file->len);
+	}
+
+	pq_endmessage(&msg);
+
+	/* CommandComplete */
+	pq_puttextmessage('C', "SELECT");
+}
+
/*
* Inject a file with given name and content in the output tar stream.
*/
@@ -728,7 +861,7 @@ sendFileWithContent(const char *filename, const char *content)
* Only used to send auxiliary tablespaces, not PGDATA.
*/
int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char *path, bool sizeonly, List **files)
{
int64 size;
char pathbuf[MAXPGPATH];
@@ -757,11 +890,11 @@ sendTablespace(char *path, bool sizeonly)
return 0;
}
+ STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
sizeonly);
-
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+ size += sendDir_(pathbuf, strlen(path), sizeonly, NIL, true, files);
return size;
}
@@ -779,8 +912,16 @@ sendTablespace(char *path, bool sizeonly)
* as it will be sent separately in the tablespace_map file.
*/
static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks)
+sendDir(const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks)
+{
+ return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL);
+}
+
+/* Same as sendDir(), except that it also returns a list of filenames in PGDATA */
+static int64
+sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
+ bool sendtblspclinks, List **files)
{
DIR *dir;
struct dirent *de;
@@ -934,6 +1075,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+ STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
excludeFound = true;
break;
@@ -950,6 +1093,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+ STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
continue;
}
@@ -971,6 +1116,9 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
sizeonly);
+ STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
+ STORE_BACKUPFILE(files, "./pg_wal/archive_status", 'd', -1, statbuf.st_mtime);
+
continue; /* don't recurse into pg_wal */
}
@@ -1000,6 +1148,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
+ STORE_BACKUPFILE(files, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
&statbuf, sizeonly);
#else
@@ -1026,6 +1175,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
*/
size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
sizeonly);
+ STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
+
/*
* Call ourselves recursively for a directory, unless it happens
@@ -1056,13 +1207,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+ size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files);
}
else if (S_ISREG(statbuf.st_mode))
{
bool sent = false;
- if (!sizeonly)
+ STORE_BACKUPFILE(files, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+ if (!sizeonly && files == NULL)
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
@@ -1767,3 +1920,421 @@ setup_throttle(int maxrate)
throttling_counter = -1;
}
}
+
+/*
+ * StartBackup - prepare to start an online backup.
+ *
+ * Calls do_pg_start_backup() and then sends the client, in order: the
+ * starting WAL location and timeline, the tablespace header, and a one-row
+ * result set with the contents of the backup_label and tablespace_map files.
+ *
+ * opt->exclusive selects an exclusive backup (do_pg_start_backup() is then
+ * given a NULL label buffer).  opt->progress requests a size estimate of the
+ * base directory; opt->sendtblspcmapfile controls whether the tablespace map
+ * is returned at all.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	labelfile;
+	StringInfo	tblspc_map_file = NULL;
+	int			datadirpathlen;
+	List	   *tablespaces = NIL;
+
+	datadirpathlen = strlen(DataDir);
+
+	backup_started_in_recovery = RecoveryInProgress();
+
+	labelfile = makeStringInfo();
+	tblspc_map_file = makeStringInfo();
+
+	total_checksum_failures = 0;
+
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  opt->exclusive ? NULL : labelfile, &tablespaces,
+								  tblspc_map_file,
+								  opt->progress, opt->sendtblspcmapfile);
+
+	/*
+	 * Once do_pg_start_backup has been called, ensure that any failure causes
+	 * us to abort the backup so we don't "leak" a backup counter. For this
+	 * reason, *all* functionality between do_pg_start_backup() and the end of
+	 * this function should be inside the error cleanup block!
+	 */
+
+	PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+	{
+		tablespaceinfo *ti;
+
+		SendXlogRecPtrResult(startptr, starttli);
+
+		/*
+		 * Calculate the relative path of temporary statistics directory in
+		 * order to skip the files which are located in that directory later.
+		 */
+		if (is_absolute_path(pgstat_stat_directory) &&
+			strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+			statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+		else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+			statrelpath = psprintf("./%s", pgstat_stat_directory);
+		else
+			statrelpath = pgstat_stat_directory;
+
+		/* Add a node for the base directory at the end */
+		ti = palloc0(sizeof(tablespaceinfo));
+		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+		tablespaces = lappend(tablespaces, ti);
+
+		/* Send tablespace header */
+		SendBackupHeader(tablespaces);
+
+		/* Setup and activate network throttling, if client requested it */
+		setup_throttle(opt->maxrate);
+
+		/*
+		 * In exclusive mode, pg_start_backup creates the backup_label and
+		 * tablespace_map files and does not return their contents in
+		 * *labelfile and *tblspcmapfile.  So we read them from these files
+		 * to return to the frontend.
+		 *
+		 * In non-exclusive mode, the contents of these files are available
+		 * in *labelfile and *tblspcmapfile and are returned directly.
+		 */
+		if (opt->exclusive)
+		{
+			char	   *contents;
+
+			resetStringInfo(labelfile);
+			resetStringInfo(tblspc_map_file);
+
+			/* backup_label must exist after a successful exclusive start */
+			contents = readfile(BACKUP_LABEL_FILE, false);
+			appendStringInfoString(labelfile, contents);
+			pfree(contents);
+
+			if (opt->sendtblspcmapfile)
+			{
+				/*
+				 * A missing tablespace_map is not treated as an error: it is
+				 * reported to the client as a NULL column below.
+				 * NOTE(review): presumably the file is only written when
+				 * there are tablespaces to map -- confirm against
+				 * do_pg_start_backup().
+				 */
+				contents = readfile(TABLESPACE_MAP, true);
+				if (contents != NULL)
+				{
+					appendStringInfoString(tblspc_map_file, contents);
+					pfree(contents);
+				}
+			}
+		}
+
+		/* An empty or unrequested map is sent as NULL */
+		if ((tblspc_map_file && tblspc_map_file->len <= 0) ||
+			!opt->sendtblspcmapfile)
+			tblspc_map_file = NULL;
+
+		/* send backup_label and tablespace_map to frontend */
+		SendStartBackupResult(labelfile, tblspc_map_file);
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+}
+
+/*
+ * StopBackup() - ends an online backup
+ *
+ * Called at the end of an online backup.  Inside a COPY OUT response it
+ * sends the pg_control file, calls do_pg_stop_backup(), and optionally sends
+ * WAL segments; after CopyDone it reports the ending WAL location and
+ * timeline.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+	StringInfoData copyout;
+	struct stat controlstat;
+	TimeLineID	stoptli;
+	XLogRecPtr	stopptr;
+	char	   *label;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* CopyOutResponse */
+	pq_beginmessage(&copyout, 'H');
+	pq_sendbyte(&copyout, 0);	/* overall format */
+	pq_sendint16(&copyout, 0);	/* natts */
+	pq_endmessage(&copyout);
+
+	/* pg_control is sent before the backup is stopped */
+	if (lstat(XLOG_CONTROL_FILE, &controlstat) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						XLOG_CONTROL_FILE)));
+	sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &controlstat, false, InvalidOid);
+
+	/* stop backup: only a non-exclusive backup passes the label along */
+	label = opt->exclusive ? NULL : (char *) opt->label;
+	stopptr = do_pg_stop_backup(label, !opt->nowait, &stoptli);
+
+	if (opt->includewal)
+		include_wal_files(stopptr);
+
+	pq_putemptymessage('c');	/* CopyDone */
+	SendXlogRecPtrResult(stopptr, stoptli);
+}
+
+/*
+ * SendFileList() - sends a list of filenames to frontend
+ *
+ * Collects the names of the files necessary for a complete backup and sends
+ * them to the client.  For every tablespace (the base directory is handled
+ * last) a RowDescription is sent followed by one DataRow per collected
+ * entry, with the columns filename (text), type (char), size (int8) and
+ * mtime (int8).  A single CommandComplete ends the whole response.
+ */
+static void
+SendFileList(basebackup_options *opt)
+{
+	StringInfoData buf;
+	ListCell   *tblspc_cell;
+	List	   *tablespaces = NIL;
+	StringInfo	tblspc_map_file;
+	tablespaceinfo *basedir;
+
+	tblspc_map_file = makeStringInfo();
+	collectTablespaces(&tablespaces, tblspc_map_file, false, false);
+
+	/* Add a node for the base directory at the end */
+	basedir = palloc0(sizeof(tablespaceinfo));
+	tablespaces = lappend(tablespaces, basedir);
+
+	foreach(tblspc_cell, tablespaces)
+	{
+		List	   *backupFiles = NIL;
+		ListCell   *file_cell;
+		tablespaceinfo *ti = (tablespaceinfo *) lfirst(tblspc_cell);
+
+		/*
+		 * A NULL path marks the base directory node added above.
+		 *
+		 * NOTE(review): sizeonly is passed as false here; confirm that the
+		 * tar-header helpers do not emit data to the client while the list
+		 * is only being collected.
+		 */
+		if (ti->path == NULL)
+			sendDir_(".", 1, false, NIL, !opt->sendtblspcmapfile, &backupFiles);
+		else
+			sendTablespace(ti->path, false, &backupFiles);
+
+		/* Construct and send the list of filenames */
+		pq_beginmessage(&buf, 'T'); /* RowDescription */
+		pq_sendint16(&buf, 4);	/* n field */
+
+		/* First field - file name */
+		pq_sendstring(&buf, "filename");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, TEXTOID);
+		pq_sendint16(&buf, -1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Second field - type */
+		pq_sendstring(&buf, "type");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, CHAROID);
+		pq_sendint16(&buf, 1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - size */
+		pq_sendstring(&buf, "size");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Fourth field - mtime */
+		pq_sendstring(&buf, "mtime");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_endmessage(&buf);
+
+		/*
+		 * Use a cell variable distinct from the outer loop's so that walking
+		 * the file list can never disturb the tablespace iteration.
+		 */
+		foreach(file_cell, backupFiles)
+		{
+			BackupFile *backupFile = (BackupFile *) lfirst(file_cell);
+			Size		len;
+
+			/* Send one datarow message */
+			pq_beginmessage(&buf, 'D');
+			pq_sendint16(&buf, 4);	/* number of columns */
+
+			/* send file name */
+			len = strlen(backupFile->name);
+			pq_sendint32(&buf, len);
+			pq_sendbytes(&buf, backupFile->name, len);
+
+			/* send type */
+			pq_sendint32(&buf, 1);
+			pq_sendbyte(&buf, backupFile->type);
+
+			/* send size */
+			send_int8_string(&buf, backupFile->size);
+
+			/* send mtime */
+			send_int8_string(&buf, backupFile->mtime);
+
+			pq_endmessage(&buf);
+		}
+
+		/*
+		 * Release the list together with its BackupFile entries.  Unlike a
+		 * bare pfree(), this is also safe when nothing was collected and the
+		 * list is still NIL.
+		 */
+		list_free_deep(backupFiles);
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * SendFilesContents() - sends the actual files to the caller
+ *
+ * Sends the given file(s) to the caller using the COPY protocol: a
+ * CopyOutResponse, then a tar header (and, for regular files, the contents)
+ * per entry of filenames, then CopyDone.
+ *
+ * filenames is a list of string nodes holding server-side paths; an empty
+ * list sends nothing at all.  If missing_ok is true, entries that no longer
+ * exist are skipped silently; otherwise a missing entry raises an ERROR.
+ * An ERROR is also raised after CopyDone if any checksum failure was
+ * counted while sending.
+ */
+static void
+SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	bool		basetablespace = true;
+	int			basepathlen = 1;
+
+	if (list_length(filenames) <= 0)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/*
+	 * LABEL is reused here to identify the tablespace path on the server.
+	 * It is not an absolute path in case of the 'base' tablespace.
+	 */
+	if (is_absolute_path(opt->label))
+	{
+		basepathlen = strlen(opt->label);
+		basetablespace = false;
+	}
+
+	/* set backup start location. */
+	startptr = opt->lsn;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+
+		pathbuf = (char *) strVal(lfirst(lc));
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			/*
+			 * A file that went away since the list was built is only
+			 * tolerated when the caller said so.
+			 */
+			if (errno != ENOENT || !missing_ok)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			continue;
+		}
+
+		/* Allow symbolic links in pg_tblspc only */
+		if (strstr(pathbuf, "./pg_tblspc") != NULL &&
+#ifndef WIN32
+			S_ISLNK(statbuf.st_mode)
+#else
+			pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			char		linkpath[MAXPGPATH];
+			int			rllen;
+
+			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+			if (rllen < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not read symbolic link \"%s\": %m",
+								pathbuf)));
+			if (rllen >= sizeof(linkpath))
+				ereport(ERROR,
+						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+						 errmsg("symbolic link \"%s\" target is too long",
+								pathbuf)));
+			linkpath[rllen] = '\0';
+
+			/*
+			 * NOTE(review): every other branch strips the base prefix
+			 * (pathbuf + basepathlen + 1) from the tar member name, but this
+			 * one sends the full path -- confirm which form the client
+			 * expects.
+			 */
+			_tarWriteHeader(pathbuf, linkpath, &statbuf, false);
+		}
+		else if (S_ISDIR(statbuf.st_mode))
+		{
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else if (
+#ifndef WIN32
+				 S_ISLNK(statbuf.st_mode)
+#else
+				 pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			/*
+			 * If symlink, write it as a directory. File symlinks are only
+			 * allowed in pg_tblspc.
+			 */
+			statbuf.st_mode = S_IFDIR | pg_dir_create_mode;
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, InvalidOid);
+		}
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures. If there are failures across multiple
+	 * processes it may not report the total checksum count, but it will
+	 * error out, terminating the backup.
+	 */
+	if (total_checksum_failures)
+	{
+		/* Cast so the argument matches %lld whatever the counter's width */
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures",
+							(long long) total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+/*
+ * readfile() - read a whole file into a palloc'd, NUL-terminated buffer
+ *
+ * Returns the buffer, which the caller may pfree.  Returns NULL if the file
+ * does not exist and missing_ok is true.  Any other failure to stat, open,
+ * read or close the file raises an ERROR, as does reading fewer bytes than
+ * stat() reported.  A zero-length file yields an empty string.
+ */
+static char *
+readfile(const char *readfilename, bool missing_ok)
+{
+	struct stat statbuf;
+	FILE	   *fp;
+	char	   *data;
+	size_t		nread;
+
+	if (stat(readfilename, &statbuf))
+	{
+		if (errno == ENOENT && missing_ok)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						readfilename)));
+	}
+
+	fp = AllocateFile(readfilename, "r");
+	if (!fp)
+	{
+		/* The file may have vanished between stat() and open */
+		if (errno == ENOENT && missing_ok)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	/*
+	 * Count bytes rather than whole-file items: asking fread() for one item
+	 * of st_size bytes returns 0 for an empty file, which would be
+	 * indistinguishable from a read failure.
+	 */
+	data = palloc(statbuf.st_size + 1);
+	nread = fread(data, 1, statbuf.st_size, fp);
+	data[statbuf.st_size] = '\0';
+
+	/* Close the file */
+	if (nread != (size_t) statbuf.st_size || ferror(fp) || FreeFile(fp))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						readfilename)));
+
+	return data;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..bba437c785 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,12 @@ static SQLCmd *make_sqlcmd(void);
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_FILE_LIST
+%token K_SEND_FILES_CONTENT
+%token K_STOP_BACKUP
+%token K_EXCLUSIVE
+%token K_LSN
%type <node> command
%type <node> base_backup start_replication start_logical_replication
@@ -102,6 +108,8 @@ static SQLCmd *make_sqlcmd(void);
%type <boolval> opt_temporary
%type <list> create_slot_opt_list
%type <defelt> create_slot_opt
+%type <list> backup_files backup_files_list
+%type <node> backup_file
%%
@@ -162,6 +170,36 @@ base_backup:
{
BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
cmd->options = $2;
+ cmd->cmdtag = BASE_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_START_BACKUP base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = START_BACKUP;
+ $$ = (Node *) cmd;
+ }
+ | K_SEND_FILE_LIST base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = SEND_FILE_LIST;
+ $$ = (Node *) cmd;
+ }
+ | K_SEND_FILES_CONTENT backup_files base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $3;
+ cmd->cmdtag = SEND_FILES_CONTENT;
+ cmd->backupfiles = $2;
+ $$ = (Node *) cmd;
+ }
+ | K_STOP_BACKUP base_backup_opt_list
+ {
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = STOP_BACKUP;
$$ = (Node *) cmd;
}
;
@@ -214,6 +252,40 @@ base_backup_opt:
$$ = makeDefElem("noverify_checksums",
(Node *)makeInteger(true), -1);
}
+ | K_EXCLUSIVE
+ {
+ $$ = makeDefElem("exclusive",
+ (Node *)makeInteger(true), -1);
+ }
+ | K_LSN SCONST
+ {
+ $$ = makeDefElem("lsn",
+ (Node *)makeString($2), -1);
+ }
+ ;
+
+backup_files: /* optional parenthesized file list given to SEND_FILES_CONTENT */
+ '(' backup_files_list ')'
+ {
+ $$ = $2; /* pass the inner list through unchanged */
+ }
+ | /* EMPTY */
+ { $$ = NIL; } /* no list given: yield an empty list */
+ ;
+
+backup_files_list: /* one or more comma-separated backup_file entries */
+ backup_file
+ {
+ $$ = list_make1($1); /* first entry starts a new list */
+ }
+ | backup_files_list ',' backup_file
+ {
+ $$ = lappend($1, $3); /* later entries are appended in input order */
+ }
+ ;
+
+backup_file:
+ SCONST { $$ = (Node *) makeString($1); }
;
create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..f97fe804ff 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,13 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
WAIT { return K_WAIT; }
+START_BACKUP { return K_START_BACKUP; }
+SEND_FILE_LIST { return K_SEND_FILE_LIST; }
+SEND_FILES_CONTENT { return K_SEND_FILES_CONTENT; }
+STOP_BACKUP { return K_STOP_BACKUP; }
+EXCLUSIVE { return K_EXCLUSIVE; }
+LSN { return K_LSN; }
+
"," { return ','; }
";" { return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..1a224122a2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,14 @@ typedef enum ReplicationKind
REPLICATION_KIND_LOGICAL
} ReplicationKind;
+typedef enum BackupCmdTag /* which backup command a BaseBackupCmd carries */
+{
+ BASE_BACKUP, /* set by the existing base_backup rule; runs perform_base_backup() */
+ START_BACKUP, /* START_BACKUP keyword; dispatched to StartBackup() */
+ SEND_FILE_LIST, /* SEND_FILE_LIST keyword; dispatched to SendFileList() */
+ SEND_FILES_CONTENT, /* SEND_FILES_CONTENT keyword; dispatched to SendFilesContents() */
+ STOP_BACKUP /* STOP_BACKUP keyword; dispatched to StopBackup() */
+} BackupCmdTag;
/* ----------------------
* IDENTIFY_SYSTEM command
@@ -42,6 +50,8 @@ typedef struct BaseBackupCmd
{
NodeTag type;
List *options;
+ BackupCmdTag cmdtag;
+ List *backupfiles;
} BaseBackupCmd;
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9f0b..9e792af99d 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
extern void SendBaseBackup(BaseBackupCmd *cmd);
-extern int64 sendTablespace(char *path, bool sizeonly);
+extern int64 sendTablespace(char *path, bool sizeonly, List **files);
#endif /* _BASEBACKUP_H */
--
2.21.0 (Apple Git-122)