0004-backend-changes-for-parallel-backup_v6.patch

application/octet-stream

Filename: 0004-backend-changes-for-parallel-backup_v6.patch
Type: application/octet-stream
Part: 4
Message: Re: WIP/PoC for parallel backup

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v6-0004
Subject: backend changes for parallel backup
File+
src/backend/access/transam/xlog.c 1 1
src/backend/replication/basebackup.c 527 14
src/backend/replication/repl_gram.y 172 34
src/backend/replication/repl_scanner.l 6 0
src/include/nodes/replnodes.h 10 0
src/include/replication/basebackup.h 1 1
From e49af53b950e2cfa5a38ad4a5db21651f1374c59 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Sun, 13 Oct 2019 22:59:28 +0500
Subject: [PATCH 4/7] backend changes for parallel backup

---
 src/backend/access/transam/xlog.c      |   2 +-
 src/backend/replication/basebackup.c   | 541 ++++++++++++++++++++++++-
 src/backend/replication/repl_gram.y    | 206 ++++++++--
 src/backend/replication/repl_scanner.l |   6 +
 src/include/nodes/replnodes.h          |  10 +
 src/include/replication/basebackup.h   |   2 +-
 6 files changed, 717 insertions(+), 50 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 451fe6c0d1..445aad291e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12279,7 +12279,7 @@ collectTablespaces(List **tablespaces, StringInfo tblspcmapfile,
 		ti->oid = pstrdup(de->d_name);
 		ti->path = pstrdup(buflinkpath.data);
 		ti->rpath = relpath ? pstrdup(relpath) : NULL;
-		ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+		ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
 
 		if (tablespaces)
 			*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index b679f36021..e55b156092 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
 #include "utils/ps_status.h"
 #include "utils/relcache.h"
 #include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 typedef struct
 {
@@ -51,11 +52,20 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	XLogRecPtr	wal_location;
 } basebackup_options;
 
+typedef struct					/* one entry of the backup file list */
+{
+	char		path[MAXPGPATH];	/* path as passed to addToBackupFileList */
+	char		type;			/* 'f' file, 'd' directory, 'l' symlink */
+	int32		size;			/* st_size, or -1 for directories */
+	time_t		mtime;			/* st_mtime of the entry */
+} BackupFile;
+
 
 static int64 sendDir(const char *path, int basepathlen, bool dryrun,
-					 List *tablespaces, bool sendtblspclinks);
+					 List *tablespaces, bool sendtblspclinks, List **filelist);
 static bool sendFile(const char *readfilename, const char *tarfilename,
 					 struct stat *statbuf, bool missing_ok, Oid dboid);
 static void sendFileWithContent(const char *filename, const char *content);
@@ -75,6 +85,13 @@ static void throttle(size_t increment);
 static void setup_throttle(int maxrate);
 static bool is_checksummed_file(const char *fullpath, const char *filename);
 
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendBackupFileList(void);
+static void SendBackupFiles(basebackup_options *opt, List *filenames, bool missing_ok);
+static void addToBackupFileList(List **filelist, char *path, char type, int32 size,
+								time_t mtime);
+
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
 
@@ -289,7 +306,7 @@ perform_base_backup(basebackup_options *opt)
 
 	/* Add a node for the base directory at the end */
 	ti = palloc0(sizeof(tablespaceinfo));
-	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
 	tablespaces = lappend(tablespaces, ti);
 
 	/* Send tablespace header */
@@ -323,10 +340,10 @@ perform_base_backup(basebackup_options *opt)
 			if (tblspc_map_file && opt->sendtblspcmapfile)
 			{
 				sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
-				sendDir(".", 1, false, tablespaces, false);
+				sendDir(".", 1, false, tablespaces, false, NULL);
 			}
 			else
-				sendDir(".", 1, false, tablespaces, true);
+				sendDir(".", 1, false, tablespaces, true, NULL);
 
 			/* ... and pg_control after everything else. */
 			if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -337,7 +354,7 @@ perform_base_backup(basebackup_options *opt)
 			sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
 		}
 		else
-			sendTablespace(ti->path, false);
+			sendTablespace(ti->path, false, NULL);
 
 		/*
 		 * If we're including WAL, and this is the main data directory we
@@ -409,6 +426,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_wal_location = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -497,12 +515,24 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "start_wal_location") == 0)
+		{
+			bool		have_error = false;
+			char	   *wal_location;
+
+			if (o_wal_location)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			wal_location = strVal(defel->arg);
+			opt->wal_location = pg_lsn_in_internal(wal_location, &have_error);
+			o_wal_location = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
 	}
-	if (opt->label == NULL)
-		opt->label = "base backup";
 }
 
 
@@ -520,6 +550,15 @@ SendBaseBackup(BaseBackupCmd *cmd)
 
 	parse_basebackup_options(cmd->options, &opt);
 
+	/* default value for label, if not specified. */
+	if (opt.label == NULL)
+	{
+		if (cmd->cmdtag == BASE_BACKUP)
+			opt.label = "base backup";
+		else
+			opt.label = "start backup";
+	}
+
 	WalSndSetState(WALSNDSTATE_BACKUP);
 
 	if (update_process_title)
@@ -531,7 +570,29 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg, false);
 	}
 
-	perform_base_backup(&opt);
+	switch (cmd->cmdtag)
+	{
+		case BASE_BACKUP:
+			perform_base_backup(&opt);
+			break;
+		case START_BACKUP:
+			StartBackup(&opt);
+			break;
+		case SEND_BACKUP_FILELIST:
+			SendBackupFileList();
+			break;
+		case SEND_BACKUP_FILES:
+			SendBackupFiles(&opt, cmd->backupfiles, true);
+			break;
+		case STOP_BACKUP:
+			StopBackup(&opt);
+			break;
+
+		default:
+			elog(ERROR, "unrecognized replication command tag: %u",
+				 cmd->cmdtag);
+			break;
+	}
 }
 
 static void
@@ -674,6 +735,61 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 	pq_puttextmessage('C', "SELECT");
 }
 
+/*
+ * Send a single-row result set: backup label text and tablespace map (or NULL).
+ */
+static void
+SendStartBackupResult(StringInfo labelfile, StringInfo tblspc_map_file)
+{
+	StringInfoData buf;
+	Size		len;
+
+	pq_beginmessage(&buf, 'T'); /* RowDescription */
+	pq_sendint16(&buf, 2);		/* 2 fields */
+
+	/* Field headers */
+	pq_sendstring(&buf, "label");
+	pq_sendint32(&buf, 0);		/* table oid */
+	pq_sendint16(&buf, 0);		/* attnum */
+	pq_sendint32(&buf, TEXTOID);	/* type oid */
+	pq_sendint16(&buf, -1);		/* typlen */
+	pq_sendint32(&buf, 0);		/* typmod */
+	pq_sendint16(&buf, 0);		/* format code */
+
+	pq_sendstring(&buf, "tablespacemap");
+	pq_sendint32(&buf, 0);		/* table oid */
+	pq_sendint16(&buf, 0);		/* attnum */
+	pq_sendint32(&buf, TEXTOID);	/* type oid */
+	pq_sendint16(&buf, -1);		/* typlen */
+	pq_sendint32(&buf, 0);		/* typmod */
+	pq_sendint16(&buf, 0);		/* format code */
+	pq_endmessage(&buf);
+
+	/* Data row */
+	pq_beginmessage(&buf, 'D');
+	pq_sendint16(&buf, 2);		/* number of columns */
+
+	len = labelfile->len;
+	pq_sendint32(&buf, len);
+	pq_sendbytes(&buf, labelfile->data, len);
+
+	if (tblspc_map_file)
+	{
+		len = tblspc_map_file->len;
+		pq_sendint32(&buf, len);
+		pq_sendbytes(&buf, tblspc_map_file->data, len);
+	}
+	else
+	{
+		pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
+	}
+
+	pq_endmessage(&buf);
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
 /*
  * Inject a file with given name and content in the output tar stream.
  */
@@ -725,7 +841,7 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool dryrun)
+sendTablespace(char *path, bool dryrun, List **filelist)
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -754,11 +870,11 @@ sendTablespace(char *path, bool dryrun)
 		return 0;
 	}
 
+	addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 	size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
 						   dryrun);
-
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), dryrun, NIL, true);
+	size += sendDir(pathbuf, strlen(path), dryrun, NIL, true, filelist);
 
 	return size;
 }
@@ -777,7 +893,7 @@ sendTablespace(char *path, bool dryrun)
  */
 static int64
 sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
-		bool sendtblspclinks)
+		bool sendtblspclinks, List **filelist)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -931,6 +1047,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
 			{
 				elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+				addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 				size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
 				excludeFound = true;
 				break;
@@ -947,6 +1065,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 		if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
 		{
 			elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+			addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 			size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
 			continue;
 		}
@@ -968,6 +1088,10 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
 									dryrun);
 
+			addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
+			addToBackupFileList(filelist, "./pg_wal/archive_status", 'd', -1,
+								statbuf.st_mtime);
+
 			continue;			/* don't recurse into pg_wal */
 		}
 
@@ -997,6 +1121,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 								pathbuf)));
 			linkpath[rllen] = '\0';
 
+			addToBackupFileList(filelist, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
 									&statbuf, dryrun);
 #else
@@ -1023,6 +1148,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			 */
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
 									dryrun);
+			addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 
 			/*
 			 * Call ourselves recursively for a directory, unless it happens
@@ -1053,13 +1179,15 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks);
+				size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks, filelist);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
 			bool		sent = false;
 
-			if (!dryrun)
+			addToBackupFileList(filelist, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+			if (!dryrun && filelist == NULL)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
 								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
 
@@ -1764,3 +1892,388 @@ setup_throttle(int maxrate)
 		throttling_counter = -1;
 	}
 }
+
+/*
+ * StartBackup - prepare to start an online backup.
+ *
+ * Calls do_pg_start_backup() and sends back the start WAL location and timeline,
+ * the tablespace header, and the backup_label and tablespace_map contents.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	labelfile;
+	StringInfo	tblspc_map_file = NULL;
+	int			datadirpathlen;
+	List	   *tablespaces = NIL;
+	tablespaceinfo *ti;
+
+	datadirpathlen = strlen(DataDir);
+
+	backup_started_in_recovery = RecoveryInProgress();
+
+	labelfile = makeStringInfo();
+	tblspc_map_file = makeStringInfo();
+
+	total_checksum_failures = 0;
+
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  labelfile, &tablespaces,
+								  tblspc_map_file,
+								  opt->progress, opt->sendtblspcmapfile);
+
+	/*
+	 * Once do_pg_start_backup has been called, ensure that any failure causes
+	 * us to abort the backup so we don't "leak" a backup counter. For this
+	 * reason, register base_backup_cleanup as a before_shmem_exit handler, so
+	 * that it is always called when the process exits. On success,
+	 * do_pg_stop_backup will have taken the system out of backup mode and the
+	 * callback will have no effect; otherwise the required cleanup is done
+	 * at process exit.
+	 */
+	before_shmem_exit(base_backup_cleanup, (Datum) 0);
+
+	SendXlogRecPtrResult(startptr, starttli);
+
+	/*
+	 * Calculate the relative path of temporary statistics directory in
+	 * order to skip the files which are located in that directory later.
+	 */
+	if (is_absolute_path(pgstat_stat_directory) &&
+		strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+	else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory);
+	else
+		statrelpath = pgstat_stat_directory;
+
+	/* Add a node for the base directory at the end */
+	ti = palloc0(sizeof(tablespaceinfo));
+	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
+	tablespaces = lappend(tablespaces, ti);
+
+	/* Send tablespace header */
+	SendBackupHeader(tablespaces);
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* Send NULL for the map if it is empty or the client did not ask for it */
+	if ((tblspc_map_file && tblspc_map_file->len <= 0) ||
+		!opt->sendtblspcmapfile)
+		tblspc_map_file = NULL;
+
+	/* send backup_label and tablespace_map to frontend */
+	SendStartBackupResult(labelfile, tblspc_map_file);
+}
+
+/*
+ * StopBackup() - ends an online backup
+ *
+ * Called at the end of an online backup. It sends out the pg_control file,
+ * optionally WAL segments, and finally the ending WAL location and timeline.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+	struct stat statbuf;
+	StringInfoData buf;
+	char	   *labelfile = NULL;
+
+	if (get_backup_status() != SESSION_BACKUP_NON_EXCLUSIVE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("non-exclusive backup is not in progress")));
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	/* ... and pg_control after everything else. */
+	if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						XLOG_CONTROL_FILE)));
+	sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+	/* stop backup; NOTE(review): opt->label is passed as labelfile -- confirm */
+	labelfile = (char *) opt->label;
+	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+	if (opt->includewal)
+		include_wal_files(endptr);
+
+	pq_putemptymessage('c');	/* CopyDone */
+	SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * SendBackupFileList() - sends a list of filenames to frontend
+ *
+ * For the base directory and each tablespace, collects the backup entries with
+ * a dry-run directory walk and sends them (path, type, size, mtime) as rows.
+ */
+static void
+SendBackupFileList(void)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	List	   *tablespaces = NIL;
+	StringInfo	tblspc_map_file = NULL;
+	tablespaceinfo *ti;
+
+	tblspc_map_file = makeStringInfo();
+	collectTablespaces(&tablespaces, tblspc_map_file, false, false);
+
+	/* Add a node for the base directory at the end */
+	ti = palloc0(sizeof(tablespaceinfo));
+	tablespaces = lappend(tablespaces, ti);
+
+	foreach(lc, tablespaces)
+	{
+		List	   *filelist = NULL;
+		ListCell   *filecell;	/* own cursor; must not reuse the outer lc */
+
+		ti = (tablespaceinfo *) lfirst(lc);
+		if (ti->path == NULL)
+			sendDir(".", 1, true, NIL, true, &filelist);
+		else
+			sendTablespace(ti->path, true, &filelist);
+
+		/* Construct and send the list of filenames */
+		pq_beginmessage(&buf, 'T'); /* RowDescription */
+		pq_sendint16(&buf, 4);	/* n field */
+
+		/* First field - file name */
+		pq_sendstring(&buf, "path");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, TEXTOID);
+		pq_sendint16(&buf, -1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Second field - type */
+		pq_sendstring(&buf, "type");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, CHAROID);
+		pq_sendint16(&buf, 1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - size */
+		pq_sendstring(&buf, "size");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Fourth field - mtime */
+		pq_sendstring(&buf, "mtime");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_endmessage(&buf);
+
+		foreach(filecell, filelist)
+		{
+			BackupFile *backupFile = (BackupFile *) lfirst(filecell);
+			Size		len;
+
+			/* Send one datarow message */
+			pq_beginmessage(&buf, 'D');
+			pq_sendint16(&buf, 4);	/* number of columns */
+
+			/* send path */
+			len = strlen(backupFile->path);
+			pq_sendint32(&buf, len);
+			pq_sendbytes(&buf, backupFile->path, len);
+
+			/* send type */
+			pq_sendint32(&buf, 1);
+			pq_sendbyte(&buf, backupFile->type);
+
+			/* send size */
+			send_int8_string(&buf, backupFile->size);
+
+			/* send mtime */
+			send_int8_string(&buf, backupFile->mtime);
+
+			pq_endmessage(&buf);
+		}
+
+		/* Free the list and the BackupFile entries it owns (NIL is fine). */
+		list_free_deep(filelist);
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * SendBackupFiles() - sends the actual files to the caller
+ *
+ * Sends each named file or directory entry to the client over the COPY
+ * protocol; entries that have vanished are skipped. missing_ok is unused.
+ */
+static void
+SendBackupFiles(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+
+	if (list_length(filenames) <= 0)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* set backup start location. */
+	startptr = opt->wal_location;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+		int			basepathlen = 1;	/* reset per file: skips leading "./" */
+
+		pathbuf = (char *) strVal(lfirst(lc));
+		if (is_absolute_path(pathbuf))
+		{
+			char *basepath;
+
+			/*
+			 * 'pathbuf' points to the tablespace location, but we only want to
+			 * include the version directory in it that belongs to us.
+			 */
+			basepath = strstr(pathbuf, TABLESPACE_VERSION_DIRECTORY);
+			if (basepath)
+				basepathlen = basepath - pathbuf - 1;
+		}
+
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/* Allow symbolic links in pg_tblspc only */
+		if (strstr(pathbuf, "./pg_tblspc") != NULL &&
+#ifndef WIN32
+			S_ISLNK(statbuf.st_mode)
+#else
+			pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			char		linkpath[MAXPGPATH];
+			int			rllen;
+
+			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+			if (rllen < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not read symbolic link \"%s\": %m",
+								pathbuf)));
+			if (rllen >= sizeof(linkpath))
+				ereport(ERROR,
+						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+						 errmsg("symbolic link \"%s\" target is too long",
+								pathbuf)));
+			linkpath[rllen] = '\0';
+
+			_tarWriteHeader(pathbuf, linkpath, &statbuf, false);
+		}
+		else if (S_ISDIR(statbuf.st_mode))
+		{
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else if (
+#ifndef WIN32
+				 S_ISLNK(statbuf.st_mode)
+#else
+				 pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			/*
+			 * If symlink, write it as a directory. file symlinks only allowed
+			 * in pg_tblspc
+			 */
+			statbuf.st_mode = S_IFDIR | pg_dir_create_mode;
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, InvalidOid);
+		}
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures. If there are failures across multiple
+	 * processes it may not report total checksum count, but it will error
+	 * out, terminating the backup.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", (long long) total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+/*
+ * Append a new BackupFile entry to *filelist; does nothing if filelist is NULL.
+ */
+static void
+addToBackupFileList(List **filelist,  char *path, char type, int32 size,
+					time_t mtime)
+{
+	BackupFile *backupFile;
+
+	if (filelist)
+	{
+		backupFile = (BackupFile *) palloc0(sizeof(BackupFile));
+		strlcpy(backupFile->path, path, sizeof(backupFile->path));	/* may truncate */
+		backupFile->type = type;
+		backupFile->size = size;	/* NOTE(review): int32, but callers pass st_size */
+		backupFile->mtime = mtime;
+
+		*filelist = lappend(*filelist, backupFile);
+	}
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..225c35efdb 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,13 +87,24 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_BACKUP_FILELIST
+%token K_SEND_BACKUP_FILES
+%token K_STOP_BACKUP
+%token K_START_WAL_LOCATION
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
 				timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
+				start_backup_opt_list stop_backup_opt_list
+				send_backup_files_opt_list
 %type <defelt>	base_backup_opt
+				backup_opt_label backup_opt_progress backup_opt_maxrate
+				backup_opt_fast backup_opt_tsmap backup_opt_wal backup_opt_nowait
+				backup_opt_chksum backup_opt_wal_loc
+				start_backup_opt stop_backup_opt send_backup_files_opt
 %type <uintval>	opt_timeline
 %type <list>	plugin_options plugin_opt_list
 %type <defelt>	plugin_opt_elem
@@ -102,6 +113,8 @@ static SQLCmd *make_sqlcmd(void);
 %type <boolval>	opt_temporary
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
+%type <list>	backup_files backup_files_list
+%type <node>	backup_file
 
 %%
 
@@ -162,10 +175,61 @@ base_backup:
 				{
 					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
 					cmd->options = $2;
+					cmd->cmdtag = BASE_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_START_BACKUP start_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = START_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_BACKUP_FILELIST
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = NIL;
+					cmd->cmdtag = SEND_BACKUP_FILELIST;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_BACKUP_FILES backup_files send_backup_files_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $3;
+					cmd->cmdtag = SEND_BACKUP_FILES;
+					cmd->backupfiles = $2;
+					$$ = (Node *) cmd;
+				}
+			| K_STOP_BACKUP stop_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = STOP_BACKUP;
 					$$ = (Node *) cmd;
 				}
 			;
 
+start_backup_opt_list:
+	start_backup_opt_list start_backup_opt
+		{ $$ = lappend($1, $2); }
+	| /* EMPTY */
+		{ $$ = NIL; }
+			;
+
+stop_backup_opt_list:
+	stop_backup_opt_list stop_backup_opt
+		{ $$ = lappend($1, $2); }
+	| /* EMPTY */
+		{ $$ = NIL; }
+	;
+
+send_backup_files_opt_list:
+	send_backup_files_opt_list send_backup_files_opt
+		{ $$ = lappend($1, $2); }
+	| /* EMPTY */
+		{ $$ = NIL; }
+	;
+
 base_backup_opt_list:
 			base_backup_opt_list base_backup_opt
 				{ $$ = lappend($1, $2); }
@@ -173,49 +237,123 @@ base_backup_opt_list:
 				{ $$ = NIL; }
 			;
 
+start_backup_opt:
+		backup_opt_label		{ $$ = $1; }
+		| backup_opt_progress	{ $$ = $1; }
+		| backup_opt_fast		{ $$ = $1; }
+		| backup_opt_tsmap		{ $$ = $1; }
+			;
+
+stop_backup_opt:
+		backup_opt_label		{ $$ = $1; }
+		| backup_opt_wal 		{ $$ = $1; }
+		| backup_opt_nowait		{ $$ = $1; }
+			;
+
+send_backup_files_opt:
+			backup_opt_maxrate		{ $$ = $1; }
+			| backup_opt_chksum		{ $$ = $1; }
+			| backup_opt_wal_loc	{ $$ = $1; }
+			;
+
 base_backup_opt:
-			K_LABEL SCONST
-				{
-				  $$ = makeDefElem("label",
-								   (Node *)makeString($2), -1);
-				}
-			| K_PROGRESS
-				{
-				  $$ = makeDefElem("progress",
-								   (Node *)makeInteger(true), -1);
-				}
-			| K_FAST
-				{
-				  $$ = makeDefElem("fast",
-								   (Node *)makeInteger(true), -1);
-				}
-			| K_WAL
-				{
-				  $$ = makeDefElem("wal",
-								   (Node *)makeInteger(true), -1);
-				}
-			| K_NOWAIT
-				{
-				  $$ = makeDefElem("nowait",
-								   (Node *)makeInteger(true), -1);
-				}
-			| K_MAX_RATE UCONST
+			backup_opt_label		{ $$ = $1; }
+			| backup_opt_progress	{ $$ = $1; }
+			| backup_opt_fast		{ $$ = $1; }
+			| backup_opt_wal 		{ $$ = $1; }
+			| backup_opt_nowait		{ $$ = $1; }
+			| backup_opt_maxrate	{ $$ = $1; }
+			| backup_opt_tsmap		{ $$ = $1; }
+			| backup_opt_chksum		{ $$ = $1; }
+			;
+
+backup_opt_label:
+	K_LABEL SCONST
+	{
+	  $$ = makeDefElem("label",
+					   (Node *)makeString($2), -1);
+	};
+
+backup_opt_progress:
+	K_PROGRESS
+	{
+	  $$ = makeDefElem("progress",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_fast:
+	K_FAST
+	{
+	  $$ = makeDefElem("fast",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_wal:
+	K_WAL
+	{
+	  $$ = makeDefElem("wal",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_nowait:
+	K_NOWAIT
+	{
+	  $$ = makeDefElem("nowait",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_maxrate:
+	K_MAX_RATE UCONST
+	{
+	  $$ = makeDefElem("max_rate",
+					   (Node *)makeInteger($2), -1);
+	};
+
+backup_opt_tsmap:
+	K_TABLESPACE_MAP
+	{
+	  $$ = makeDefElem("tablespace_map",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_chksum:
+	K_NOVERIFY_CHECKSUMS
+	{
+	  $$ = makeDefElem("noverify_checksums",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_wal_loc:
+	K_START_WAL_LOCATION SCONST
+	{
+	  $$ = makeDefElem("start_wal_location",
+					   (Node *)makeString($2), -1);
+	};
+
+backup_files:
+			'(' backup_files_list ')'
 				{
-				  $$ = makeDefElem("max_rate",
-								   (Node *)makeInteger($2), -1);
+					$$ = $2;
 				}
-			| K_TABLESPACE_MAP
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+backup_files_list:
+			backup_file
 				{
-				  $$ = makeDefElem("tablespace_map",
-								   (Node *)makeInteger(true), -1);
+					$$ = list_make1($1);
 				}
-			| K_NOVERIFY_CHECKSUMS
+			| backup_files_list ',' backup_file
 				{
-				  $$ = makeDefElem("noverify_checksums",
-								   (Node *)makeInteger(true), -1);
+					$$ = lappend($1, $3);
 				}
 			;
 
+backup_file:
+			SCONST							{ $$ = (Node *) makeString($1); }
+			;
+
 create_replication_slot:
 			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
 			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..0a88639239 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,12 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+START_BACKUP		{ return K_START_BACKUP; }
+SEND_BACKUP_FILELIST	{ return K_SEND_BACKUP_FILELIST; }
+SEND_BACKUP_FILES	{ return K_SEND_BACKUP_FILES; }
+STOP_BACKUP			{ return K_STOP_BACKUP; }
+START_WAL_LOCATION	{ return K_START_WAL_LOCATION; }
+
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..3685f260b5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,14 @@ typedef enum ReplicationKind
 	REPLICATION_KIND_LOGICAL
 } ReplicationKind;
 
+typedef enum BackupCmdTag		/* selects the handler in SendBaseBackup() */
+{
+	BASE_BACKUP,				/* handled by perform_base_backup() */
+	START_BACKUP,				/* handled by StartBackup() */
+	SEND_BACKUP_FILELIST,		/* handled by SendBackupFileList() */
+	SEND_BACKUP_FILES,			/* handled by SendBackupFiles() */
+	STOP_BACKUP					/* handled by StopBackup() */
+} BackupCmdTag;
 
 /* ----------------------
  *		IDENTIFY_SYSTEM command
@@ -42,6 +50,8 @@ typedef struct BaseBackupCmd
 {
 	NodeTag		type;
 	List	   *options;
+	BackupCmdTag cmdtag;
+	List	   *backupfiles;
 } BaseBackupCmd;
 
 
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index b55917b9b6..5202e4160b 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool dryrun);
+extern int64 sendTablespace(char *path, bool dryrun, List **filelist);
 
 #endif							/* _BASEBACKUP_H */
-- 
2.21.0 (Apple Git-122.2)