0003-Parallel-Backup-Backend-Replication-commands_v9.patch
application/octet-stream
Filename: 0003-Parallel-Backup-Backend-Replication-commands_v9.patch
Type: application/octet-stream
Part: 3
Message:
Re: WIP/PoC for parallel backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v9-0003
Subject: Parallel Backup - Backend Replication commands
| File | + | − |
|---|---|---|
| src/backend/access/transam/xlog.c | 2 | 2 |
| src/backend/replication/basebackup.c | 507 | 22 |
| src/backend/replication/repl_gram.y | 221 | 44 |
| src/backend/replication/repl_scanner.l | 8 | 0 |
| src/include/nodes/replnodes.h | 12 | 0 |
| src/include/replication/basebackup.h | 1 | 1 |
From ab91e2c9078bfe42fb9306314304c558a41b7632 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 27 Jan 2020 18:32:42 +0500
Subject: [PATCH 3/6] Parallel Backup - Backend Replication commands
This feature adds the following replication commands to the backend replication
system, to help facilitate taking a full backup in parallel using multiple
connections.
- START_BACKUP [LABEL '<label>'] [FAST]
This command instructs the server to get prepared for performing an
online backup.
- STOP_BACKUP [NOWAIT]
This command instructs the server that online backup is finished. It
will bring the system out of backup mode.
- LIST_TABLESPACES [PROGRESS]
This command instructs the server to return a list of tablespaces.
- LIST_FILES [TABLESPACE]
This command instructs the server to return a list of files for a
given tablespace, base tablespace if TABLESPACE is empty.
- LIST_WAL_FILES [START_WAL_LOCATION 'X/X'] [END_WAL_LOCATION 'X/X']
This command instructs the server to return a list of WAL files between
the given locations.
- SEND_FILES '(' FILE, FILE... ')' [START_WAL_LOCATION 'X/X']
[NOVERIFY_CHECKSUMS]
Instructs the server to send the contents of the requested FILE(s).
---
src/backend/access/transam/xlog.c | 4 +-
src/backend/replication/basebackup.c | 529 ++++++++++++++++++++++++-
src/backend/replication/repl_gram.y | 265 +++++++++++--
src/backend/replication/repl_scanner.l | 8 +
src/include/nodes/replnodes.h | 12 +
src/include/replication/basebackup.h | 2 +-
6 files changed, 751 insertions(+), 69 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5670141126..4189b056c88 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -11128,7 +11128,7 @@ do_pg_abort_backup(int code, Datum arg)
if (emit_warning)
ereport(WARNING,
- (errmsg("aborting backup due to backend exiting before pg_stop_back up was called")));
+ (errmsg("aborting backup due to backend exiting while a non-exclusive backup is in progress")));
}
/*
@@ -12377,7 +12377,7 @@ collect_tablespaces(List **tablespaces, StringInfo tblspcmapfile,
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(buflinkpath.data);
ti->rpath = relpath ? pstrdup(relpath) : NULL;
- ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+ ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
if (tablespaces)
*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index abc3bad01ee..a294d77da50 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -39,6 +39,8 @@
#include "storage/ipc.h"
#include "storage/reinit.h"
#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/relcache.h"
#include "utils/timestamp.h"
@@ -52,11 +54,22 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ XLogRecPtr startwallocation;
+ XLogRecPtr endwallocation;
+ char *tablespace;
} basebackup_options;
+typedef struct /* one entry of a file listing built by add_to_filelist() */
+{
+ char path[MAXPGPATH]; /* path as handed to add_to_filelist(); copied with strlcpy */
+ char type; /* 'f' regular file, 'd' directory, 'l' link */
+ size_t size; /* size in bytes; callers pass -1 for directories */
+ time_t mtime; /* modification time; list_wal_files() passes 0 */
+} BackupFile;
+
static int64 sendDir(const char *path, int basepathlen, bool dryrun,
- List *tablespaces, bool sendtblspclinks);
+ List *tablespaces, bool sendtblspclinks, List **filelist);
static bool sendFile(const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid);
static void sendFileWithContent(const char *filename, const char *content);
@@ -70,12 +83,28 @@ static void perform_base_backup(basebackup_options *opt);
static List *collect_wal_files(XLogRecPtr startptr, XLogRecPtr endptr,
List **historyFileList);
static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli, StringInfo label);
+static void SendFilesHeader(List *files);
static int compareWalFileNames(const ListCell *a, const ListCell *b);
static void throttle(size_t increment);
static void update_basebackup_progress(int64 delta);
static bool is_checksummed_file(const char *fullpath, const char *filename);
+static void start_backup(basebackup_options *opt);
+static void stop_backup(basebackup_options *opt);
+static void list_tablespaces(basebackup_options *opt);
+static void list_files(basebackup_options *opt);
+static void list_wal_files(basebackup_options *opt);
+static void send_files(basebackup_options *opt, List *filenames,
+ bool missing_ok);
+static void add_to_filelist(List **filelist, char *path, char type,
+ size_t size, time_t mtime);
+
+/*
+ * Store label file during non-exclusive backups.
+ */
+static StringInfo label_file;
+
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
@@ -303,7 +332,7 @@ perform_base_backup(basebackup_options *opt)
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
- ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+ ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
tablespaces = lappend(tablespaces, ti);
/*
@@ -336,7 +365,7 @@ perform_base_backup(basebackup_options *opt)
}
/* Send the starting position of the backup */
- SendXlogRecPtrResult(startptr, starttli);
+ SendXlogRecPtrResult(startptr, starttli, NULL);
/* Send tablespace header */
SendBackupHeader(tablespaces);
@@ -391,10 +420,10 @@ perform_base_backup(basebackup_options *opt)
if (tblspc_map_file && opt->sendtblspcmapfile)
{
sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
- sendDir(".", 1, false, tablespaces, false);
+ sendDir(".", 1, false, tablespaces, false, NULL);
}
else
- sendDir(".", 1, false, tablespaces, true);
+ sendDir(".", 1, false, tablespaces, true, NULL);
/* ... and pg_control after everything else. */
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -405,7 +434,7 @@ perform_base_backup(basebackup_options *opt)
sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
}
else
- sendTablespace(ti->path, false);
+ sendTablespace(ti->path, false, NULL);
/*
* If we're including WAL, and this is the main data directory we
@@ -568,7 +597,7 @@ perform_base_backup(basebackup_options *opt)
/* Send CopyDone message for the last tar file */
pq_putemptymessage('c');
}
- SendXlogRecPtrResult(endptr, endtli);
+ SendXlogRecPtrResult(endptr, endtli, NULL);
if (total_checksum_failures)
{
@@ -726,6 +755,9 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_maxrate = false;
bool o_tablespace_map = false;
bool o_noverify_checksums = false;
+ bool o_startwallocation = false;
+ bool o_endwallocation = false;
+ bool o_tablespace = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -814,12 +846,47 @@ parse_basebackup_options(List *options, basebackup_options *opt)
noverify_checksums = true;
o_noverify_checksums = true;
}
+ else if (strcmp(defel->defname, "start_wal_location") == 0)
+ {
+ bool have_error = false;
+ char *startwallocation;
+
+ if (o_startwallocation)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ startwallocation = strVal(defel->arg);
+ opt->startwallocation = pg_lsn_in_internal(startwallocation, &have_error);
+ o_startwallocation = true;
+ }
+ else if (strcmp(defel->defname, "end_wal_location") == 0)
+ {
+ bool have_error = false;
+ char *endwallocation;
+
+ if (o_endwallocation)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ endwallocation = strVal(defel->arg);
+ opt->endwallocation = pg_lsn_in_internal(endwallocation, &have_error);
+ o_endwallocation = true;
+ }
+ else if (strcmp(defel->defname, "tablespace") == 0)
+ {
+ if (o_tablespace)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ opt->tablespace = strVal(defel->arg);
+ o_tablespace = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
- if (opt->label == NULL)
- opt->label = "base backup";
}
@@ -837,6 +904,15 @@ SendBaseBackup(BaseBackupCmd *cmd)
parse_basebackup_options(cmd->options, &opt);
+ /* default value for label, if not specified. */
+ if (opt.label == NULL)
+ {
+ if (cmd->cmdtag == BASE_BACKUP)
+ opt.label = "base backup";
+ else
+ opt.label = "start backup";
+ }
+
WalSndSetState(WALSNDSTATE_BACKUP);
if (update_process_title)
@@ -848,7 +924,34 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg);
}
- perform_base_backup(&opt);
+ switch (cmd->cmdtag)
+ {
+ case BASE_BACKUP:
+ perform_base_backup(&opt);
+ break;
+ case START_BACKUP:
+ start_backup(&opt);
+ break;
+ case LIST_TABLESPACES:
+ list_tablespaces(&opt);
+ break;
+ case LIST_FILES:
+ list_files(&opt);
+ break;
+ case SEND_FILES:
+ send_files(&opt, cmd->backupfiles, true);
+ break;
+ case STOP_BACKUP:
+ stop_backup(&opt);
+ break;
+ case LIST_WAL_FILES:
+ list_wal_files(&opt);
+ break;
+ default:
+ elog(ERROR, "unrecognized replication command tag: %u",
+ cmd->cmdtag);
+ break;
+ }
}
static void
@@ -936,18 +1039,18 @@ SendBackupHeader(List *tablespaces)
}
/*
- * Send a single resultset containing just a single
- * XLogRecPtr record (in text format)
+ * Send a single resultset containing XLogRecPtr record (in text format)
+ * TimelineID and backup label.
*/
static void
-SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli, StringInfo label)
{
StringInfoData buf;
char str[MAXFNAMELEN];
Size len;
pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 2); /* 2 fields */
+ pq_sendint16(&buf, 3); /* 3 fields */
/* Field headers */
pq_sendstring(&buf, "recptr");
@@ -970,11 +1073,19 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
pq_sendint16(&buf, -1);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
+
+ pq_sendstring(&buf, "label");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, TEXTOID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 2); /* number of columns */
+ pq_sendint16(&buf, 3); /* number of columns */
len = snprintf(str, sizeof(str),
"%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
@@ -985,12 +1096,109 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
pq_sendint32(&buf, len);
pq_sendbytes(&buf, str, len);
+ if (label)
+ {
+ len = label->len;
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, label->data, len);
+ }
+ else
+ {
+ pq_sendint32(&buf, -1); /* NULL */
+ }
+
pq_endmessage(&buf);
/* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT");
}
+
+/*
+ * Send a result set describing the given list of BackupFile entries, one row
+ * per entry: path, type ('f' for a regular file, 'd' for a directory, 'l' for
+ * a link), file size and modification time.
+ *
+ * The list is released with list_free() before the CommandComplete message is
+ * sent, so the caller must not use it afterwards.
+ */
+static void
+SendFilesHeader(List *files)
+{
+ StringInfoData buf;
+ ListCell *lc;
+
+ /* Construct and send the list of files */
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 4); /* number of fields */
+
+ /* First field - file name */
+ pq_sendstring(&buf, "path");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, TEXTOID); /* type oid */
+ pq_sendint16(&buf, -1); /* typlen */
+ pq_sendint32(&buf, 0); /* typmod */
+ pq_sendint16(&buf, 0); /* format code */
+
+ /* Second field - entry type ('f', 'd' or 'l') */
+ pq_sendstring(&buf, "type");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, CHAROID);
+ pq_sendint16(&buf, 1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Third field - size */
+ pq_sendstring(&buf, "size");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, INT8OID);
+ pq_sendint16(&buf, 8);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Fourth field - mtime */
+ pq_sendstring(&buf, "mtime");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, INT8OID);
+ pq_sendint16(&buf, 8);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ foreach(lc, files)
+ {
+ BackupFile *file = (BackupFile *) lfirst(lc);
+ Size len;
+
+ /* Send one datarow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 4); /* number of columns */
+
+ /* send path: length word followed by the bytes, no terminator */
+ len = strlen(file->path);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, file->path, len);
+
+ /* send type: a single byte */
+ pq_sendint32(&buf, 1);
+ pq_sendbyte(&buf, file->type);
+
+ /* send size */
+ send_int8_string(&buf, file->size);
+
+ /* send mtime */
+ send_int8_string(&buf, file->mtime);
+
+ pq_endmessage(&buf);
+ }
+
+ list_free(files); /* NOTE(review): presumably frees only the list cells, not the BackupFile entries - confirm */
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
/*
* Inject a file with given name and content in the output tar stream.
*/
@@ -1044,7 +1252,7 @@ sendFileWithContent(const char *filename, const char *content)
* Only used to send auxiliary tablespaces, not PGDATA.
*/
int64
-sendTablespace(char *path, bool dryrun)
+sendTablespace(char *path, bool dryrun, List **filelist)
{
int64 size;
char pathbuf[MAXPGPATH];
@@ -1073,11 +1281,11 @@ sendTablespace(char *path, bool dryrun)
return 0;
}
+ add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
dryrun);
-
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), dryrun, NIL, true);
+ size += sendDir(pathbuf, strlen(path), dryrun, NIL, true, filelist);
return size;
}
@@ -1096,7 +1304,7 @@ sendTablespace(char *path, bool dryrun)
*/
static int64
sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
- bool sendtblspclinks)
+ bool sendtblspclinks, List **filelist)
{
DIR *dir;
struct dirent *de;
@@ -1254,6 +1462,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+ add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
excludeFound = true;
break;
@@ -1270,6 +1480,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+ add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
continue;
}
@@ -1291,6 +1503,10 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
dryrun);
+ add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
+ add_to_filelist(filelist, "./pg_wal/archive_status", 'd', -1,
+ statbuf.st_mtime);
+
continue; /* don't recurse into pg_wal */
}
@@ -1320,6 +1536,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
+ add_to_filelist(filelist, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
&statbuf, dryrun);
#else
@@ -1346,6 +1563,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
*/
size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
dryrun);
+ add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
/*
* Call ourselves recursively for a directory, unless it happens
@@ -1376,13 +1594,15 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks);
+ size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks, filelist);
}
else if (S_ISREG(statbuf.st_mode))
{
bool sent = false;
- if (!dryrun)
+ add_to_filelist(filelist, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+ if (!dryrun && filelist == NULL)
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? atooid(lastDir + 1) : InvalidOid);
@@ -1867,3 +2087,268 @@ update_basebackup_progress(int64 delta)
pgstat_progress_update_multi_param(nparam, index, val);
}
+
+/*
+ * start_backup - put the server into non-exclusive backup mode.
+ *
+ * Calls do_pg_start_backup() with opt->label and opt->fastcheckpoint and
+ * replies with a single result set carrying the backup start location and
+ * timeline; the label column of that result set is sent as NULL.  The
+ * backup_label contents produced by do_pg_start_backup() are kept in the
+ * file-level label_file buffer so that stop_backup() can pass them to
+ * do_pg_stop_backup() and return them to the client.
+ *
+ * Raises an ERROR if this session already has a non-exclusive backup in
+ * progress; starting another one would overwrite (and leak) the label_file
+ * buffer belonging to the first.
+ */
+static void
+start_backup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	tblspc_map_file;
+	MemoryContext oldcontext;
+
+	if (get_backup_status() == SESSION_BACKUP_NON_EXCLUSIVE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("a backup is already in progress in this session")));
+
+	/* The label file must be long-lived, since it is read in stop_backup(). */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	label_file = makeStringInfo();
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * The tablespace map is not used here, but do_pg_start_backup() requires
+	 * the argument, so provide a scratch buffer.
+	 */
+	tblspc_map_file = makeStringInfo();
+
+	register_persistent_abort_backup_handler();
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  label_file, NULL, tblspc_map_file, false, false);
+
+	/* send startptr and starttli to frontend */
+	SendXlogRecPtrResult(startptr, starttli, NULL);
+
+	/* free the tablespace map buffer. */
+	pfree(tblspc_map_file->data);
+	pfree(tblspc_map_file);
+}
+
+/*
+ * stop_backup() - ends an online backup started with START_BACKUP
+ *
+ * Calls do_pg_stop_backup() with the label contents saved by start_backup()
+ * (waiting for WAL archiving unless opt->nowait is set) and replies with a
+ * single result set carrying the backup end location, the timeline and the
+ * backup label contents.  The saved label buffer is freed afterwards.
+ *
+ * Raises an ERROR if the session is not in non-exclusive backup mode, or if
+ * it is but no label was saved by START_BACKUP (label_file is NULL), since
+ * there is then nothing to pass to do_pg_stop_backup().
+ */
+static void
+stop_backup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+
+	if (get_backup_status() != SESSION_BACKUP_NON_EXCLUSIVE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("non-exclusive backup is not in progress")));
+
+	/* Guard against a backup that was not started through START_BACKUP. */
+	if (label_file == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("backup was not started with START_BACKUP")));
+
+	/*
+	 * Stop the non-exclusive backup. Return a copy of the backup label so it
+	 * can be written to disk by the caller.
+	 */
+	endptr = do_pg_stop_backup(label_file->data, !opt->nowait, &endtli);
+	SendXlogRecPtrResult(endptr, endtli, label_file);
+
+	/* Free structures allocated in TopMemoryContext */
+	pfree(label_file->data);
+	pfree(label_file);
+	label_file = NULL;
+}
+
+/*
+ * list_tablespaces() - report the tablespaces to the client
+ *
+ * Gathers the tablespace entries with collect_tablespaces(), appends one
+ * more entry standing for the base directory, and sends the whole list with
+ * SendBackupHeader().  Sizes are only computed when opt->progress is set;
+ * otherwise the base directory entry carries -1.
+ */
+static void
+list_tablespaces(basebackup_options *opt)
+{
+	List	   *tblspc_list = NIL;
+	StringInfo	mapbuf = makeStringInfo();
+	tablespaceinfo *basedir;
+
+	/* mapbuf is only here because collect_tablespaces() is handed one. */
+	collect_tablespaces(&tblspc_list, mapbuf, opt->progress, false);
+
+	/* The base directory goes last, as an otherwise zeroed entry. */
+	basedir = palloc0(sizeof(tablespaceinfo));
+	if (opt->progress)
+		basedir->size = sendDir(".", 1, true, tblspc_list, true, NULL);
+	else
+		basedir->size = -1;
+	tblspc_list = lappend(tblspc_list, basedir);
+
+	SendBackupHeader(tblspc_list);
+	list_free(tblspc_list);
+}
+
+/*
+ * list_files() - sends a list of files available in given tablespace.
+ *
+ * When opt->tablespace names a tablespace location, the listing is collected
+ * with sendTablespace(); when it is NULL (LIST_FILES issued without an
+ * argument) or empty, the data directory is listed with sendDir().  Both are
+ * run in dry-run mode, so only the file list is built and no file contents
+ * are sent.  The result is sent with SendFilesHeader().
+ */
+static void
+list_files(basebackup_options *opt)
+{
+	List	   *files = NIL;
+	int			datadirpathlen;
+
+	datadirpathlen = strlen(DataDir);
+
+	/*
+	 * Calculate the relative path of temporary statistics directory in order
+	 * to skip the files which are located in that directory later.
+	 */
+	if (is_absolute_path(pgstat_stat_directory) &&
+		strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+	else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory);
+	else
+		statrelpath = pgstat_stat_directory;
+
+	/*
+	 * opt->tablespace stays NULL when the option was not given, so it must be
+	 * checked before looking at its contents.
+	 */
+	if (opt->tablespace != NULL && opt->tablespace[0] != '\0')
+		sendTablespace(opt->tablespace, true, &files);
+	else
+		sendDir(".", 1, true, NIL, true, &files);
+
+	SendFilesHeader(files);
+}
+
+/*
+ * list_wal_files() - report the WAL files lying between
+ * opt->startwallocation and opt->endwallocation.
+ *
+ * Every name returned by collect_wal_files() is listed as a regular file
+ * under XLOGDIR, with wal_segment_size as its size and 0 as its mtime.  The
+ * timeline history list filled in by collect_wal_files() is not reported.
+ */
+static void
+list_wal_files(basebackup_options *opt)
+{
+	List	   *history_files = NIL;
+	List	   *segments;
+	List	   *result = NIL;
+	ListCell   *cell;
+
+	segments = collect_wal_files(opt->startwallocation, opt->endwallocation,
+								 &history_files);
+
+	foreach(cell, segments)
+	{
+		char	   *segname = (char *) lfirst(cell);
+		char		segpath[MAXPGPATH];
+
+		snprintf(segpath, sizeof(segpath), XLOGDIR "/%s", segname);
+		add_to_filelist(&result, segpath, 'f', wal_segment_size, 0);
+	}
+
+	SendFilesHeader(result);
+}
+
+/*
+ * send_files() - sends the actual files to the caller
+ *
+ * Sends a CopyOutResponse, then every regular file named in 'filenames' via
+ * sendFile(), then CopyDone.  Only regular files are sent: directories,
+ * symlinks and other special files are skipped with a WARNING, and files
+ * that no longer exist are skipped silently.  Any other lstat() failure
+ * raises an ERROR, as does a non-zero checksum failure count once all files
+ * have been sent.  Nothing at all is sent when 'filenames' is empty.
+ *
+ * The name passed to sendFile() as the archive member name is the path with
+ * a leading "./" removed or, for an absolute path, the part starting at the
+ * separator in front of TABLESPACE_VERSION_DIRECTORY.
+ *
+ * opt->startwallocation is copied into the file-level 'startptr' before any
+ * file is sent.  'missing_ok' is currently not consulted: sendFile() is
+ * always called with missing_ok = true.
+ */
+static void
+send_files(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+
+	if (list_length(filenames) <= 0)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Disable throttling. */
+	throttling_counter = -1;
+
+	/* set backup start location. */
+	startptr = opt->startwallocation;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+		int			basepathlen = 0;	/* recomputed for every file */
+
+		pathbuf = (char *) strVal(lfirst(lc));
+		if (is_absolute_path(pathbuf))
+		{
+			char	   *basepath;
+
+			/*
+			 * 'pathbuf' points to the tablespace location, but we only want
+			 * to include the version directory in it that belongs to us.
+			 * Without a version directory in the path nothing is stripped,
+			 * rather than reusing the offset computed for an earlier file.
+			 */
+			basepath = strstr(pathbuf, TABLESPACE_VERSION_DIRECTORY);
+			if (basepath)
+				basepathlen = basepath - pathbuf - 1;
+		}
+		else if (pathbuf[0] == '.' && pathbuf[1] == '/')
+			basepathlen = 2;
+
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/*
+		 * Only entertain requests for regular files; skip any directories or
+		 * special files.
+		 */
+		if (S_ISREG(statbuf.st_mode))
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen, &statbuf, true, InvalidOid);
+		}
+		else
+			ereport(WARNING,
+					(errmsg("skipping special file or directory \"%s\"", pathbuf)));
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures.  If there are failures across multiple
+	 * processes, this may not report the total checksum failure count, but
+	 * it will error out, terminating the backup.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+/*
+ * Append a freshly allocated BackupFile describing 'path' to *filelist.
+ *
+ * Does nothing when 'filelist' is NULL, which lets callers invoke it
+ * unconditionally whether or not a listing is being collected.  The path is
+ * copied with strlcpy() into the entry's fixed-size buffer.
+ */
+static void
+add_to_filelist(List **filelist, char *path, char type, size_t size,
+				time_t mtime)
+{
+	BackupFile *entry;
+
+	/* No list to fill in: nothing to do. */
+	if (filelist == NULL)
+		return;
+
+	entry = (BackupFile *) palloc(sizeof(BackupFile));
+	entry->mtime = mtime;
+	entry->size = size;
+	entry->type = type;
+	strlcpy(entry->path, path, sizeof(entry->path));
+
+	*filelist = lappend(*filelist, entry);
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 14fcd532218..16e5402d55d 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,13 +87,28 @@ static SQLCmd *make_sqlcmd(void);
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_LIST_TABLESPACES
+%token K_LIST_FILES
+%token K_SEND_FILES
+%token K_STOP_BACKUP
+%token K_LIST_WAL_FILES
+%token K_START_WAL_LOCATION
+%token K_END_WAL_LOCATION
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
timeline_history show sql_cmd
-%type <list> base_backup_opt_list
-%type <defelt> base_backup_opt
+%type <list> base_backup_opt_list start_backup_opt_list stop_backup_opt_list
+ list_tablespace_opt_list list_files_opt_list
+ list_wal_files_opt_list send_backup_files_opt_list
+ backup_files backup_files_list
+%type <defelt> base_backup_opt backup_opt_label backup_opt_progress
+ backup_opt_fast backup_opt_wal backup_opt_nowait
+ backup_opt_maxrate backup_opt_tsmap backup_opt_chksum
+ backup_opt_start_wal_loc backup_opt_end_wal_loc
+ backup_opt_tablespace start_backup_opt send_backup_files_opt
%type <uintval> opt_timeline
%type <list> plugin_options plugin_opt_list
%type <defelt> plugin_opt_elem
@@ -153,69 +168,231 @@ var_name: IDENT { $$ = $1; }
{ $$ = psprintf("%s.%s", $1, $3); }
;
-/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
- */
base_backup:
+ /*
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
+ */
K_BASE_BACKUP base_backup_opt_list
{
BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
cmd->options = $2;
+ cmd->cmdtag = BASE_BACKUP;
$$ = (Node *) cmd;
}
- ;
-
-base_backup_opt_list:
- base_backup_opt_list base_backup_opt
- { $$ = lappend($1, $2); }
- | /* EMPTY */
- { $$ = NIL; }
- ;
-
-base_backup_opt:
- K_LABEL SCONST
- {
- $$ = makeDefElem("label",
- (Node *)makeString($2), -1);
- }
- | K_PROGRESS
+ /* START_BACKUP [LABEL '<label>'] [FAST] */
+ | K_START_BACKUP start_backup_opt_list
{
- $$ = makeDefElem("progress",
- (Node *)makeInteger(true), -1);
- }
- | K_FAST
- {
- $$ = makeDefElem("fast",
- (Node *)makeInteger(true), -1);
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = START_BACKUP;
+ $$ = (Node *) cmd;
}
- | K_WAL
+ /* STOP_BACKUP [NOWAIT] */
+ | K_STOP_BACKUP stop_backup_opt_list
{
- $$ = makeDefElem("wal",
- (Node *)makeInteger(true), -1);
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = STOP_BACKUP;
+ $$ = (Node *) cmd;
}
- | K_NOWAIT
+ /* LIST_TABLESPACES [PROGRESS] */
+ | K_LIST_TABLESPACES list_tablespace_opt_list
{
- $$ = makeDefElem("nowait",
- (Node *)makeInteger(true), -1);
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = LIST_TABLESPACES;
+ $$ = (Node *) cmd;
}
- | K_MAX_RATE UCONST
+ /* LIST_FILES [TABLESPACE] */
+ | K_LIST_FILES list_files_opt_list
{
- $$ = makeDefElem("max_rate",
- (Node *)makeInteger($2), -1);
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = LIST_FILES;
+ $$ = (Node *) cmd;
}
- | K_TABLESPACE_MAP
+ /* LIST_WAL_FILES [START_WAL_LOCATION 'X/X'] [END_WAL_LOCATION 'X/X'] */
+ | K_LIST_WAL_FILES list_wal_files_opt_list
{
- $$ = makeDefElem("tablespace_map",
- (Node *)makeInteger(true), -1);
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $2;
+ cmd->cmdtag = LIST_WAL_FILES;
+ $$ = (Node *) cmd;
}
- | K_NOVERIFY_CHECKSUMS
+ /*
+ * SEND_FILES '(' 'FILE' [, ...] ')' [START_WAL_LOCATION 'X/X']
+ * [NOVERIFY_CHECKSUMS]
+ */
+ | K_SEND_FILES backup_files send_backup_files_opt_list
{
- $$ = makeDefElem("noverify_checksums",
- (Node *)makeInteger(true), -1);
+ BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ cmd->options = $3;
+ cmd->cmdtag = SEND_FILES;
+ cmd->backupfiles = $2;
+ $$ = (Node *) cmd;
}
;
+base_backup_opt_list: /* BASE_BACKUP: zero or more options, in any order */
+ base_backup_opt_list base_backup_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+base_backup_opt: /* any single option accepted by BASE_BACKUP */
+ backup_opt_label { $$ = $1; }
+ | backup_opt_progress { $$ = $1; }
+ | backup_opt_fast { $$ = $1; }
+ | backup_opt_wal { $$ = $1; }
+ | backup_opt_nowait { $$ = $1; }
+ | backup_opt_maxrate { $$ = $1; }
+ | backup_opt_tsmap { $$ = $1; }
+ | backup_opt_chksum { $$ = $1; }
+ ;
+
+start_backup_opt_list: /* START_BACKUP: zero or more options, in any order */
+ start_backup_opt_list start_backup_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+start_backup_opt: /* START_BACKUP accepts only LABEL and FAST */
+ backup_opt_label { $$ = $1; }
+ | backup_opt_fast { $$ = $1; }
+ ;
+
+stop_backup_opt_list: /* STOP_BACKUP: at most one NOWAIT */
+ backup_opt_nowait
+ { $$ = list_make1($1); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+list_tablespace_opt_list: /* LIST_TABLESPACES: at most one PROGRESS */
+ backup_opt_progress
+ { $$ = list_make1($1); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+list_files_opt_list: /* LIST_FILES: at most one tablespace string */
+ backup_opt_tablespace
+ { $$ = list_make1($1); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+list_wal_files_opt_list: /* NOTE(review): both options are required here, in this order, although the LIST_WAL_FILES syntax comment shows them as optional */
+ backup_opt_start_wal_loc backup_opt_end_wal_loc
+ { $$ = list_make2($1, $2); }
+ ;
+
+send_backup_files_opt_list: /* SEND_FILES: zero or more options, in any order */
+ send_backup_files_opt_list send_backup_files_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+backup_files: /* parenthesized file list; may be omitted entirely, giving NIL */
+ '(' backup_files_list ')'
+ { $$ = $2; }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+backup_files_list: /* one or more comma-separated string constants, kept as String nodes */
+ SCONST
+ { $$ = list_make1(makeString($1)); }
+ | backup_files_list ',' SCONST
+ { $$ = lappend($1, makeString($3)); }
+ ;
+
+send_backup_files_opt: /* SEND_FILES accepts only NOVERIFY_CHECKSUMS and START_WAL_LOCATION */
+ backup_opt_chksum { $$ = $1; }
+ | backup_opt_start_wal_loc { $$ = $1; }
+ ;
+
+backup_opt_label: /* LABEL '<label>' -> DefElem "label" (string) */
+ K_LABEL SCONST
+ {
+ $$ = makeDefElem("label",
+ (Node *)makeString($2), -1);
+ };
+
+backup_opt_progress: /* PROGRESS -> DefElem "progress" (true) */
+ K_PROGRESS
+ {
+ $$ = makeDefElem("progress",
+ (Node *)makeInteger(true), -1);
+ };
+
+backup_opt_fast: /* FAST -> DefElem "fast" (true) */
+ K_FAST
+ {
+ $$ = makeDefElem("fast",
+ (Node *)makeInteger(true), -1);
+ };
+
+backup_opt_wal: /* WAL -> DefElem "wal" (true) */
+ K_WAL
+ {
+ $$ = makeDefElem("wal",
+ (Node *)makeInteger(true), -1);
+ };
+
+backup_opt_nowait: /* NOWAIT -> DefElem "nowait" (true) */
+ K_NOWAIT
+ {
+ $$ = makeDefElem("nowait",
+ (Node *)makeInteger(true), -1);
+ };
+
+backup_opt_maxrate: /* MAX_RATE n -> DefElem "max_rate" (integer) */
+ K_MAX_RATE UCONST
+ {
+ $$ = makeDefElem("max_rate",
+ (Node *)makeInteger($2), -1);
+ };
+
+backup_opt_tsmap: /* TABLESPACE_MAP -> DefElem "tablespace_map" (true) */
+ K_TABLESPACE_MAP
+ {
+ $$ = makeDefElem("tablespace_map",
+ (Node *)makeInteger(true), -1);
+ };
+
+backup_opt_chksum: /* NOVERIFY_CHECKSUMS -> DefElem "noverify_checksums" (true) */
+ K_NOVERIFY_CHECKSUMS
+ {
+ $$ = makeDefElem("noverify_checksums",
+ (Node *)makeInteger(true), -1);
+ };
+
+backup_opt_start_wal_loc: /* START_WAL_LOCATION 'X/X' -> DefElem "start_wal_location" (string) */
+ K_START_WAL_LOCATION SCONST
+ {
+ $$ = makeDefElem("start_wal_location",
+ (Node *)makeString($2), -1);
+ };
+
+backup_opt_end_wal_loc: /* END_WAL_LOCATION 'X/X' -> DefElem "end_wal_location" (string) */
+ K_END_WAL_LOCATION SCONST
+ {
+ $$ = makeDefElem("end_wal_location",
+ (Node *)makeString($2), -1);
+ };
+
+backup_opt_tablespace: /* bare string constant, no keyword -> DefElem "tablespace" (string) */
+ SCONST
+ {
+ $$ = makeDefElem("tablespace", /* option name matched in parse_basebackup_options() */
+ (Node *)makeString($1), -1);
+ };
+
create_replication_slot:
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 14c9a1e798a..faa00cfd0ee 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,14 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
WAIT { return K_WAIT; }
+START_BACKUP { return K_START_BACKUP; }
+LIST_FILES { return K_LIST_FILES; }
+LIST_TABLESPACES { return K_LIST_TABLESPACES; }
+SEND_FILES { return K_SEND_FILES; }
+STOP_BACKUP { return K_STOP_BACKUP; }
+LIST_WAL_FILES { return K_LIST_WAL_FILES; }
+START_WAL_LOCATION { return K_START_WAL_LOCATION; }
+END_WAL_LOCATION { return K_END_WAL_LOCATION; }
"," { return ','; }
";" { return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 5456141a8ab..c046ea39ae9 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,16 @@ typedef enum ReplicationKind
REPLICATION_KIND_LOGICAL
} ReplicationKind;
+typedef enum BackupCmdTag /* which backup command a BaseBackupCmd node carries */
+{
+ BASE_BACKUP, /* dispatched to perform_base_backup() */
+ START_BACKUP, /* dispatched to start_backup() */
+ LIST_TABLESPACES, /* dispatched to list_tablespaces() */
+ LIST_FILES, /* dispatched to list_files() */
+ LIST_WAL_FILES, /* dispatched to list_wal_files() */
+ SEND_FILES, /* dispatched to send_files() */
+ STOP_BACKUP /* dispatched to stop_backup() */
+} BackupCmdTag;
/* ----------------------
* IDENTIFY_SYSTEM command
@@ -42,6 +52,8 @@ typedef struct BaseBackupCmd
{
NodeTag type;
List *options;
+ BackupCmdTag cmdtag;
+ List *backupfiles;
} BaseBackupCmd;
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index e0210def6f3..3bc85d4c3e2 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
extern void SendBaseBackup(BaseBackupCmd *cmd);
-extern int64 sendTablespace(char *path, bool dryrun);
+extern int64 sendTablespace(char *path, bool dryrun, List **filelist);
#endif /* _BASEBACKUP_H */
--
2.21.1 (Apple Git-122.3)