0004-backend-changes-for-parallel-backup_v4.patch

application/octet-stream

Filename: 0004-backend-changes-for-parallel-backup_v4.patch
Type: application/octet-stream
Part: 3
Message: Re: WIP/PoC for parallel backup

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch returns the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v4-0004
Subject: backend changes for parallel backup
File+
src/backend/access/transam/xlog.c 1 1
src/backend/replication/basebackup.c 514 12
src/backend/replication/repl_gram.y 72 0
src/backend/replication/repl_scanner.l 7 0
src/include/nodes/replnodes.h 10 0
src/include/replication/basebackup.h 1 1
From 42818d0ebcdfa119e27e95ed6428cc7026a38143 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Sun, 13 Oct 2019 22:59:28 +0500
Subject: [PATCH 4/6] backend changes for parallel backup

---
 src/backend/access/transam/xlog.c      |   2 +-
 src/backend/replication/basebackup.c   | 526 ++++++++++++++++++++++++-
 src/backend/replication/repl_gram.y    |  72 ++++
 src/backend/replication/repl_scanner.l |   7 +
 src/include/nodes/replnodes.h          |  10 +
 src/include/replication/basebackup.h   |   2 +-
 6 files changed, 605 insertions(+), 14 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index aa7d82a045..842b317c8d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12265,7 +12265,7 @@ collectTablespaces(List **tablespaces, StringInfo tblspcmapfile,
 		ti->oid = pstrdup(de->d_name);
 		ti->path = pstrdup(buflinkpath.data);
 		ti->rpath = relpath ? pstrdup(relpath) : NULL;
-		ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+		ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
 
 		if (tablespaces)
 			*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index b8e3daf711..a0a6e816b0 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
 #include "utils/ps_status.h"
 #include "utils/relcache.h"
 #include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 
 typedef struct
@@ -52,11 +53,21 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	const char *tablespace_path;
+	XLogRecPtr	wal_location;
 } basebackup_options;
 
+typedef struct					/* one entry of the list built by addToBackupFileList() */
+{
+	char		path[MAXPGPATH];	/* path as passed in; strlcpy truncates to fit */
+	char		type;			/* 'f' regular file, 'd' directory, 'l' symlink */
+	int32		size;			/* st_size, or -1 for 'd'; NOTE(review): int32 may truncate */
+	time_t		mtime;			/* st_mtime of the entry */
+} BackupFile;
+
 
 static int64 sendDir(const char *path, int basepathlen, bool dryrun,
-					 List *tablespaces, bool sendtblspclinks);
+					 List *tablespaces, bool sendtblspclinks, List **filelist);
 static bool sendFile(const char *readfilename, const char *tarfilename,
 					 struct stat *statbuf, bool missing_ok, Oid dboid);
 static void sendFileWithContent(const char *filename, const char *content);
@@ -76,6 +87,13 @@ static void throttle(size_t increment);
 static void setup_throttle(int maxrate);
 static bool is_checksummed_file(const char *fullpath, const char *filename);
 
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendBackupFileList(basebackup_options *opt);
+static void SendBackupFiles(basebackup_options *opt, List *filenames, bool missing_ok);
+static void addToBackupFileList(List **filelist, char *path, char type, int32 size,
+								time_t mtime);
+
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
 
@@ -290,7 +308,7 @@ perform_base_backup(basebackup_options *opt)
 
 	/* Add a node for the base directory at the end */
 	ti = palloc0(sizeof(tablespaceinfo));
-	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
 	tablespaces = lappend(tablespaces, ti);
 
 	/* Send tablespace header */
@@ -324,10 +342,10 @@ perform_base_backup(basebackup_options *opt)
 			if (tblspc_map_file && opt->sendtblspcmapfile)
 			{
 				sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
-				sendDir(".", 1, false, tablespaces, false);
+				sendDir(".", 1, false, tablespaces, false, NULL);
 			}
 			else
-				sendDir(".", 1, false, tablespaces, true);
+				sendDir(".", 1, false, tablespaces, true, NULL);
 
 			/* ... and pg_control after everything else. */
 			if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -338,7 +356,7 @@ perform_base_backup(basebackup_options *opt)
 			sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
 		}
 		else
-			sendTablespace(ti->path, false);
+			sendTablespace(ti->path, false, NULL);
 
 		/*
 		 * If we're including WAL, and this is the main data directory we
@@ -410,6 +428,8 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_tablespace_path = false;
+	bool		o_wal_location = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -498,6 +518,29 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "tablespace_path") == 0)
+		{
+			if (o_tablespace_path)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			opt->tablespace_path = strVal(defel->arg);
+			o_tablespace_path = true;
+		}
+		else if (strcmp(defel->defname, "start_wal_location") == 0)
+		{
+			bool		have_error = false;
+			char	   *wal_location;
+
+			if (o_wal_location)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			wal_location = strVal(defel->arg);
+			opt->wal_location = pg_lsn_in_internal(wal_location, &have_error);
+			o_wal_location = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -532,7 +575,29 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg, false);
 	}
 
-	perform_base_backup(&opt);
+	switch (cmd->cmdtag)
+	{
+		case BASE_BACKUP:
+			perform_base_backup(&opt);
+			break;
+		case START_BACKUP:
+			StartBackup(&opt);
+			break;
+		case SEND_BACKUP_FILELIST:
+			SendBackupFileList(&opt);
+			break;
+		case SEND_BACKUP_FILES:
+			SendBackupFiles(&opt, cmd->backupfiles, true);
+			break;
+		case STOP_BACKUP:
+			StopBackup(&opt);
+			break;
+
+		default:
+			elog(ERROR, "unrecognized replication command tag: %u",
+				 cmd->cmdtag);
+			break;
+	}
 }
 
 static void
@@ -675,6 +740,61 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 	pq_puttextmessage('C', "SELECT");
 }
 
+/*
+ * Send a single-row result set with two text columns: "label" carries the
+ * contents of labelfile and "tablespacemap" the contents of
+ * tblspc_map_file.  The second column is sent as NULL when tblspc_map_file
+ * is NULL.
+ */
+static void
+SendStartBackupResult(StringInfo labelfile, StringInfo tblspc_map_file)
+{
+	const char *colnames[2] = {"label", "tablespacemap"};
+	StringInfoData msg;
+	Size		nbytes;
+	int			col;
+
+	/* RowDescription: both columns carry identical text attributes */
+	pq_beginmessage(&msg, 'T');
+	pq_sendint16(&msg, 2);		/* 2 fields */
+
+	for (col = 0; col < 2; col++)
+	{
+		pq_sendstring(&msg, colnames[col]);
+		pq_sendint32(&msg, 0);	/* table oid */
+		pq_sendint16(&msg, 0);	/* attnum */
+		pq_sendint32(&msg, TEXTOID);	/* type oid */
+		pq_sendint16(&msg, -1);	/* typlen */
+		pq_sendint32(&msg, 0);	/* typmod */
+		pq_sendint16(&msg, 0);	/* format code */
+	}
+	pq_endmessage(&msg);
+
+	/* DataRow */
+	pq_beginmessage(&msg, 'D');
+	pq_sendint16(&msg, 2);		/* number of columns */
+
+	/* column 1: backup label, always present */
+	nbytes = labelfile->len;
+	pq_sendint32(&msg, nbytes);
+	pq_sendbytes(&msg, labelfile->data, nbytes);
+
+	/* column 2: tablespace map, or NULL */
+	if (tblspc_map_file == NULL)
+		pq_sendint32(&msg, -1); /* Length = -1 ==> NULL */
+	else
+	{
+		nbytes = tblspc_map_file->len;
+		pq_sendint32(&msg, nbytes);
+		pq_sendbytes(&msg, tblspc_map_file->data, nbytes);
+	}
+
+	pq_endmessage(&msg);
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
 /*
  * Inject a file with given name and content in the output tar stream.
  */
@@ -726,7 +846,7 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool dryrun)
+sendTablespace(char *path, bool dryrun, List **filelist)
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -755,11 +875,11 @@ sendTablespace(char *path, bool dryrun)
 		return 0;
 	}
 
+	addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 	size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
 						   dryrun);
-
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), dryrun, NIL, true);
+	size += sendDir(pathbuf, strlen(path), dryrun, NIL, true, filelist);
 
 	return size;
 }
@@ -778,7 +898,7 @@ sendTablespace(char *path, bool dryrun)
  */
 static int64
 sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
-		bool sendtblspclinks)
+		bool sendtblspclinks, List **filelist)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -932,6 +1052,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
 			{
 				elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+				addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 				size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
 				excludeFound = true;
 				break;
@@ -948,6 +1070,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 		if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
 		{
 			elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+			addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 			size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
 			continue;
 		}
@@ -969,6 +1093,10 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
 									dryrun);
 
+			addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
+			addToBackupFileList(filelist, "./pg_wal/archive_status", 'd', -1,
+								statbuf.st_mtime);
+
 			continue;			/* don't recurse into pg_wal */
 		}
 
@@ -998,6 +1126,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 								pathbuf)));
 			linkpath[rllen] = '\0';
 
+			addToBackupFileList(filelist, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
 									&statbuf, dryrun);
 #else
@@ -1024,6 +1153,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			 */
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
 									dryrun);
+			addToBackupFileList(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 
 			/*
 			 * Call ourselves recursively for a directory, unless it happens
@@ -1054,13 +1184,15 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks);
+				size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks, filelist);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
 			bool		sent = false;
 
-			if (!dryrun)
+			addToBackupFileList(filelist, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+			if (!dryrun && filelist == NULL)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
 								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
 
@@ -1765,3 +1897,373 @@ setup_throttle(int maxrate)
 		throttling_counter = -1;
 	}
 }
+
+/*
+ * StartBackup - prepare to start an online backup.
+ *
+ * Calls do_pg_start_backup(), then sends the start WAL location and timeline,
+ * the tablespace header, and the backup_label and tablespace_map contents.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	labelfile;
+	StringInfo	tblspc_map_file = NULL;
+	int			datadirpathlen;
+	List	   *tablespaces = NIL;
+	tablespaceinfo *ti;
+
+	datadirpathlen = strlen(DataDir);
+
+	backup_started_in_recovery = RecoveryInProgress();
+
+	labelfile = makeStringInfo();
+	tblspc_map_file = makeStringInfo();
+
+	total_checksum_failures = 0;
+
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  labelfile, &tablespaces,
+								  tblspc_map_file,
+								  opt->progress, opt->sendtblspcmapfile);
+
+	/*
+	 * Once do_pg_start_backup has been called, ensure that any failure causes
+	 * us to abort the backup so we don't "leak" a backup counter. For this
+	 * reason, register base_backup_cleanup as a before_shmem_exit handler so
+	 * that it is always called when the process exits. On success,
+	 * do_pg_stop_backup will have taken the system out of backup mode and this
+	 * callback will have no effect; otherwise the required cleanup will be
+	 * done in any case.
+	 */
+	before_shmem_exit(base_backup_cleanup, (Datum) 0);
+
+	SendXlogRecPtrResult(startptr, starttli);
+
+	/*
+	 * Calculate the relative path of the temporary statistics directory in
+	 * order to skip the files which are located in that directory later.
+	 */
+	if (is_absolute_path(pgstat_stat_directory) &&
+		strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+	else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory);
+	else
+		statrelpath = pgstat_stat_directory;
+
+	/* Add a node for the base directory at the end; size only if PROGRESS */
+	ti = palloc0(sizeof(tablespaceinfo));
+	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
+	tablespaces = lappend(tablespaces, ti);
+
+	/* Send tablespace header */
+	SendBackupHeader(tablespaces);
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	if ((tblspc_map_file && tblspc_map_file->len <= 0) ||
+		!opt->sendtblspcmapfile)
+		tblspc_map_file = NULL;
+
+	/* send backup_label and tablespace_map (NULL if empty or not requested) */
+	SendStartBackupResult(labelfile, tblspc_map_file);
+}
+
+/*
+ * StopBackup() - ends an online backup
+ *
+ * The function is called at the end of an online backup. It sends out the
+ * pg_control file, optionally WAL segments, and the ending WAL location.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+	struct stat statbuf;
+	StringInfoData buf;
+	char	   *labelfile = NULL;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	/* ... and pg_control after everything else. */
+	if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						XLOG_CONTROL_FILE)));
+	sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+	/* stop backup; the LABEL option value is handed over as the label file */
+	labelfile = (char *) opt->label;
+	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+	if (opt->includewal)
+		include_wal_files(endptr);
+
+	pq_putemptymessage('c');	/* CopyDone */
+	SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * SendBackupFileList() - sends a list of filenames to frontend
+ *
+ * For every tablespace, and finally the base directory, a dry run collects
+ * the entries and sends a RowDescription followed by one DataRow per entry.
+ */
+static void
+SendBackupFileList(basebackup_options *opt)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	List	   *tablespaces = NIL;
+	StringInfo	tblspc_map_file = NULL;
+
+	tblspc_map_file = makeStringInfo();
+	collectTablespaces(&tablespaces, tblspc_map_file, false, false);
+
+	/* Add a node for the base directory at the end */
+	tablespaces = lappend(tablespaces, palloc0(sizeof(tablespaceinfo)));
+
+	foreach(lc, tablespaces)
+	{
+		List	   *filelist = NIL;
+		tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+		ListCell   *flc;		/* own cursor, so the outer lc is not clobbered */
+
+		if (ti->path == NULL)
+			sendDir(".", 1, true, NIL, !opt->sendtblspcmapfile, &filelist);
+		else
+			sendTablespace(ti->path, true, &filelist);
+
+		/* Construct and send the list of filenames */
+		pq_beginmessage(&buf, 'T'); /* RowDescription */
+		pq_sendint16(&buf, 4);	/* n field */
+
+		/* First field - file name */
+		pq_sendstring(&buf, "filename");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, TEXTOID);
+		pq_sendint16(&buf, -1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Second field - type */
+		pq_sendstring(&buf, "type");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, CHAROID);
+		pq_sendint16(&buf, 1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - size */
+		pq_sendstring(&buf, "size");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Fourth field - mtime */
+		pq_sendstring(&buf, "mtime");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_endmessage(&buf);
+
+		foreach(flc, filelist)
+		{
+			BackupFile *backupFile = (BackupFile *) lfirst(flc);
+			Size		len;
+
+			/* Send one datarow message */
+			pq_beginmessage(&buf, 'D');
+			pq_sendint16(&buf, 4);	/* number of columns */
+
+			/* send file name */
+			len = strlen(backupFile->path);
+			pq_sendint32(&buf, len);
+			pq_sendbytes(&buf, backupFile->path, len);
+
+			/* send type */
+			pq_sendint32(&buf, 1);
+			pq_sendbyte(&buf, backupFile->type);
+
+			/* send size */
+			send_int8_string(&buf, backupFile->size);
+
+			/* send mtime */
+			send_int8_string(&buf, backupFile->mtime);
+
+			pq_endmessage(&buf);
+		}
+
+		list_free_deep(filelist);	/* NIL-safe, and frees the entries too */
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * SendBackupFiles() - sends the actual files to the caller
+ *
+ * The function sends out the given file(s) over to the caller using the COPY
+ * protocol.  Entries that no longer exist are skipped when missing_ok is true.
+ * NOTE(review): the client-supplied paths are used as-is; confirm that the
+ * caller is trusted, or validate them before lstat()/sendFile().
+ */
+static void
+SendBackupFiles(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	int			basepathlen = 1;
+
+	if (list_length(filenames) <= 0)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* tablespace_path is NULL unless the TABLESPACE_PATH option was given */
+	if (opt->tablespace_path != NULL &&
+		is_absolute_path(opt->tablespace_path))
+		basepathlen = strlen(opt->tablespace_path);
+
+	/* set backup start location. */
+	startptr = opt->wal_location;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+
+		pathbuf = (char *) strVal(lfirst(lc));
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT || !missing_ok)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/* Allow symbolic links in pg_tblspc only */
+		if (strstr(pathbuf, "./pg_tblspc") != NULL &&
+#ifndef WIN32
+			S_ISLNK(statbuf.st_mode)
+#else
+			pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			char		linkpath[MAXPGPATH];
+			int			rllen;
+
+			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+			if (rllen < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not read symbolic link \"%s\": %m",
+								pathbuf)));
+			if (rllen >= sizeof(linkpath))
+				ereport(ERROR,
+						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+						 errmsg("symbolic link \"%s\" target is too long",
+								pathbuf)));
+			linkpath[rllen] = '\0';
+
+			_tarWriteHeader(pathbuf, linkpath, &statbuf, false);
+		}
+		else if (S_ISDIR(statbuf.st_mode))
+		{
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else if (
+#ifndef WIN32
+				 S_ISLNK(statbuf.st_mode)
+#else
+				 pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			/*
+			 * If symlink, write it as a directory. file symlinks only allowed
+			 * in pg_tblspc
+			 */
+			statbuf.st_mode = S_IFDIR | pg_dir_create_mode;
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, missing_ok, InvalidOid);
+		}
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures. If there are failures across multiple
+	 * processes it may not report total checksum count, but it will error
+	 * out,terminating the backup.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+/*
+ * Append a BackupFile entry for path to *filelist (no-op if filelist is NULL).
+ */
+static void
+addToBackupFileList(List **filelist,  char *path, char type, int32 size,
+					time_t mtime)
+{
+	BackupFile *entry;
+
+	/* Callers pass NULL when no file list is being collected */
+	if (filelist == NULL)
+		return;
+
+	entry = (BackupFile *) palloc0(sizeof(BackupFile));
+	entry->type = type;
+	entry->size = size;
+	entry->mtime = mtime;
+	strlcpy(entry->path, path, sizeof(entry->path));
+	*filelist = lappend(*filelist, entry);
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..5619837ebe 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,12 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_BACKUP_FILELIST
+%token K_SEND_BACKUP_FILES
+%token K_STOP_BACKUP
+%token K_START_WAL_LOCATION
+%token K_TABLESPACE_PATH
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -102,6 +108,8 @@ static SQLCmd *make_sqlcmd(void);
 %type <boolval>	opt_temporary
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
+%type <list>	backup_files backup_files_list
+%type <node>	backup_file
 
 %%
 
@@ -162,6 +170,36 @@ base_backup:
 				{
 					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
 					cmd->options = $2;
+					cmd->cmdtag = BASE_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_START_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = START_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_BACKUP_FILELIST base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = SEND_BACKUP_FILELIST;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_BACKUP_FILES backup_files base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $3;
+					cmd->cmdtag = SEND_BACKUP_FILES;
+					cmd->backupfiles = $2;
+					$$ = (Node *) cmd;
+				}
+			| K_STOP_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = STOP_BACKUP;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -214,6 +252,40 @@ base_backup_opt:
 				  $$ = makeDefElem("noverify_checksums",
 								   (Node *)makeInteger(true), -1);
 				}
+			| K_START_WAL_LOCATION SCONST
+				{
+				  $$ = makeDefElem("start_wal_location",
+								   (Node *)makeString($2), -1);
+				}
+			| K_TABLESPACE_PATH SCONST
+				{
+				  $$ = makeDefElem("tablespace_path",
+								   (Node *)makeString($2), -1);
+				}
+			;
+
+backup_files:					/* file list of SEND_BACKUP_FILES; may be omitted */
+			'(' backup_files_list ')'
+				{
+					$$ = $2;
+				}
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+backup_files_list:				/* one or more comma-separated file names */
+			backup_file
+				{
+					$$ = list_make1($1);
+				}
+			| backup_files_list ',' backup_file
+				{
+					$$ = lappend($1, $3);
+				}
+			;
+
+backup_file:					/* one quoted file name, wrapped in a String node */
+			SCONST							{ $$ = (Node *) makeString($1); }
 			;
 
 create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..c57ff02d39 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,13 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+START_BACKUP		{ return K_START_BACKUP; }
+SEND_BACKUP_FILELIST	{ return K_SEND_BACKUP_FILELIST; }
+SEND_BACKUP_FILES	{ return K_SEND_BACKUP_FILES; }
+STOP_BACKUP			{ return K_STOP_BACKUP; }
+START_WAL_LOCATION	{ return K_START_WAL_LOCATION; }
+TABLESPACE_PATH		{ return K_TABLESPACE_PATH; }
+
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..3685f260b5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,14 @@ typedef enum ReplicationKind
 	REPLICATION_KIND_LOGICAL
 } ReplicationKind;
 
+typedef enum BackupCmdTag		/* which backup command a BaseBackupCmd carries */
+{
+	BASE_BACKUP,				/* whole base backup in a single command */
+	START_BACKUP,				/* start backup; send label and tablespace map */
+	SEND_BACKUP_FILELIST,		/* send the list of files to back up */
+	SEND_BACKUP_FILES,			/* send the files named in backupfiles */
+	STOP_BACKUP					/* stop backup; send pg_control and end location */
+} BackupCmdTag;
 
 /* ----------------------
  *		IDENTIFY_SYSTEM command
@@ -42,6 +50,8 @@ typedef struct BaseBackupCmd
 {
 	NodeTag		type;
 	List	   *options;
+	BackupCmdTag cmdtag;
+	List	   *backupfiles;
 } BaseBackupCmd;
 
 
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index b55917b9b6..5202e4160b 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool dryrun);
+extern int64 sendTablespace(char *path, bool dryrun, List **filelist);
 
 #endif							/* _BASEBACKUP_H */
-- 
2.21.0 (Apple Git-122)