0003-Parallel-Backup-Backend-Replication-commands_v9.patch

application/octet-stream

Filename: 0003-Parallel-Backup-Backend-Replication-commands_v9.patch
Type: application/octet-stream
Part: 3
Message: Re: WIP/PoC for parallel backup

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch returns the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch v9-0003
Subject: Parallel Backup - Backend Replication commands
File+
src/backend/access/transam/xlog.c 2 2
src/backend/replication/basebackup.c 507 22
src/backend/replication/repl_gram.y 221 44
src/backend/replication/repl_scanner.l 8 0
src/include/nodes/replnodes.h 12 0
src/include/replication/basebackup.h 1 1
From ab91e2c9078bfe42fb9306314304c558a41b7632 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 27 Jan 2020 18:32:42 +0500
Subject: [PATCH 3/6] Parallel Backup - Backend Replication commands

This feature adds the following replication commands to the backend replication
system, to help facilitate taking a full backup in parallel using multiple
connections.

	- START_BACKUP [LABEL '<label>'] [FAST]
	This command instructs the server to get prepared for performing an
	online backup.

	- STOP_BACKUP [NOWAIT]
	This command instructs the server that online backup is finished. It
	will bring the system out of backup mode.

	- LIST_TABLESPACES [PROGRESS]
	This command instructs the server to return a list of tablespaces.

	- LIST_FILES [TABLESPACE]
	This command instructs the server to return a list of files for the
	given tablespace, or for the base directory if TABLESPACE is empty or omitted.

	- LIST_WAL_FILES START_WAL_LOCATION 'X/X' END_WAL_LOCATION 'X/X'
	This command instructs the server to return a list of WAL files between
	the given locations. Both locations are required.

	- SEND_FILES '(' FILE, FILE... ')' [START_WAL_LOCATION 'X/X']
			[NOVERIFY_CHECKSUMS]
	Instructs the server to send the contents of the requested FILE(s).
---
 src/backend/access/transam/xlog.c      |   4 +-
 src/backend/replication/basebackup.c   | 529 ++++++++++++++++++++++++-
 src/backend/replication/repl_gram.y    | 265 +++++++++++--
 src/backend/replication/repl_scanner.l |   8 +
 src/include/nodes/replnodes.h          |  12 +
 src/include/replication/basebackup.h   |   2 +-
 6 files changed, 751 insertions(+), 69 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5670141126..4189b056c88 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -11128,7 +11128,7 @@ do_pg_abort_backup(int code, Datum arg)
 
 	if (emit_warning)
 		ereport(WARNING,
-				(errmsg("aborting backup due to backend exiting before pg_stop_back up was called")));
+				(errmsg("aborting backup due to backend exiting while a non-exclusive backup is in progress")));
 }
 
 /*
@@ -12377,7 +12377,7 @@ collect_tablespaces(List **tablespaces, StringInfo tblspcmapfile,
 		ti->oid = pstrdup(de->d_name);
 		ti->path = pstrdup(buflinkpath.data);
 		ti->rpath = relpath ? pstrdup(relpath) : NULL;
-		ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+		ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
 
 		if (tablespaces)
 			*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index abc3bad01ee..a294d77da50 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -39,6 +39,8 @@
 #include "storage/ipc.h"
 #include "storage/reinit.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/relcache.h"
 #include "utils/timestamp.h"
@@ -52,11 +54,22 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	XLogRecPtr	startwallocation;
+	XLogRecPtr	endwallocation;
+	char	   *tablespace;
 } basebackup_options;
 
+typedef struct
+{
+	char		path[MAXPGPATH];	/* path as given to add_to_filelist() */
+	char		type;			/* 'f' file, 'd' directory, 'l' symlink */
+	size_t		size;			/* dirs are added with -1 (wraps in size_t) */
+	time_t		mtime;			/* 0 for entries from list_wal_files() */
+} BackupFile;					/* one row of a SendFilesHeader() result */
+
 
 static int64 sendDir(const char *path, int basepathlen, bool dryrun,
-					 List *tablespaces, bool sendtblspclinks);
+					 List *tablespaces, bool sendtblspclinks, List **filelist);
 static bool sendFile(const char *readfilename, const char *tarfilename,
 					 struct stat *statbuf, bool missing_ok, Oid dboid);
 static void sendFileWithContent(const char *filename, const char *content);
@@ -70,12 +83,28 @@ static void perform_base_backup(basebackup_options *opt);
 static List *collect_wal_files(XLogRecPtr startptr, XLogRecPtr endptr,
 							   List **historyFileList);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli, StringInfo label);
+static void SendFilesHeader(List *files);
 static int	compareWalFileNames(const ListCell *a, const ListCell *b);
 static void throttle(size_t increment);
 static void update_basebackup_progress(int64 delta);
 static bool is_checksummed_file(const char *fullpath, const char *filename);
 
+static void start_backup(basebackup_options *opt);
+static void stop_backup(basebackup_options *opt);
+static void list_tablespaces(basebackup_options *opt);
+static void list_files(basebackup_options *opt);
+static void list_wal_files(basebackup_options *opt);
+static void send_files(basebackup_options *opt, List *filenames,
+					   bool missing_ok);
+static void add_to_filelist(List **filelist, char *path, char type,
+							size_t size, time_t mtime);
+
+/*
+ * backup_label from start_backup(), kept (TopMemoryContext) for stop_backup().
+ */
+static StringInfo label_file;
+
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
 
@@ -303,7 +332,7 @@ perform_base_backup(basebackup_options *opt)
 
 		/* Add a node for the base directory at the end */
 		ti = palloc0(sizeof(tablespaceinfo));
-		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;
 		tablespaces = lappend(tablespaces, ti);
 
 		/*
@@ -336,7 +365,7 @@ perform_base_backup(basebackup_options *opt)
 		}
 
 		/* Send the starting position of the backup */
-		SendXlogRecPtrResult(startptr, starttli);
+		SendXlogRecPtrResult(startptr, starttli, NULL);
 
 		/* Send tablespace header */
 		SendBackupHeader(tablespaces);
@@ -391,10 +420,10 @@ perform_base_backup(basebackup_options *opt)
 				if (tblspc_map_file && opt->sendtblspcmapfile)
 				{
 					sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
-					sendDir(".", 1, false, tablespaces, false);
+					sendDir(".", 1, false, tablespaces, false, NULL);
 				}
 				else
-					sendDir(".", 1, false, tablespaces, true);
+					sendDir(".", 1, false, tablespaces, true, NULL);
 
 				/* ... and pg_control after everything else. */
 				if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -405,7 +434,7 @@ perform_base_backup(basebackup_options *opt)
 				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
 			}
 			else
-				sendTablespace(ti->path, false);
+				sendTablespace(ti->path, false, NULL);
 
 			/*
 			 * If we're including WAL, and this is the main data directory we
@@ -568,7 +597,7 @@ perform_base_backup(basebackup_options *opt)
 		/* Send CopyDone message for the last tar file */
 		pq_putemptymessage('c');
 	}
-	SendXlogRecPtrResult(endptr, endtli);
+	SendXlogRecPtrResult(endptr, endtli, NULL);
 
 	if (total_checksum_failures)
 	{
@@ -726,6 +755,9 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_startwallocation = false;
+	bool		o_endwallocation = false;
+	bool		o_tablespace = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -814,12 +846,47 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "start_wal_location") == 0)
+		{
+			bool		have_error = false;
+			char	   *startwallocation;
+
+			if (o_startwallocation)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			startwallocation = strVal(defel->arg);
+			opt->startwallocation = pg_lsn_in_internal(startwallocation, &have_error);
+			o_startwallocation = true;
+		}
+		else if (strcmp(defel->defname, "end_wal_location") == 0)
+		{
+			bool		have_error = false;
+			char	   *endwallocation;
+
+			if (o_endwallocation)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			endwallocation = strVal(defel->arg);
+			opt->endwallocation = pg_lsn_in_internal(endwallocation, &have_error);
+			o_endwallocation = true;
+		}
+		else if (strcmp(defel->defname, "tablespace") == 0)
+		{
+			if (o_tablespace)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			opt->tablespace = strVal(defel->arg);
+			o_tablespace = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
 	}
-	if (opt->label == NULL)
-		opt->label = "base backup";
 }
 
 
@@ -837,6 +904,15 @@ SendBaseBackup(BaseBackupCmd *cmd)
 
 	parse_basebackup_options(cmd->options, &opt);
 
+	/* default value for label, if not specified. */
+	if (opt.label == NULL)
+	{
+		if (cmd->cmdtag == BASE_BACKUP)
+			opt.label = "base backup";
+		else
+			opt.label = "start backup";
+	}
+
 	WalSndSetState(WALSNDSTATE_BACKUP);
 
 	if (update_process_title)
@@ -848,7 +924,34 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg);
 	}
 
-	perform_base_backup(&opt);
+	switch (cmd->cmdtag)
+	{
+		case BASE_BACKUP:
+			perform_base_backup(&opt);
+			break;
+		case START_BACKUP:
+			start_backup(&opt);
+			break;
+		case LIST_TABLESPACES:
+			list_tablespaces(&opt);
+			break;
+		case LIST_FILES:
+			list_files(&opt);
+			break;
+		case SEND_FILES:
+			send_files(&opt, cmd->backupfiles, true);
+			break;
+		case STOP_BACKUP:
+			stop_backup(&opt);
+			break;
+		case LIST_WAL_FILES:
+			list_wal_files(&opt);
+			break;
+		default:
+			elog(ERROR, "unrecognized replication command tag: %u",
+				 cmd->cmdtag);
+			break;
+	}
 }
 
 static void
@@ -936,18 +1039,18 @@ SendBackupHeader(List *tablespaces)
 }
 
 /*
- * Send a single resultset containing just a single
- * XLogRecPtr record (in text format)
+ * Send a single resultset containing an XLogRecPtr (in text format), the
+ * TimeLineID and the backup label (NULL when no label is given).
  */
 static void
-SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli, StringInfo label)
 {
 	StringInfoData buf;
 	char		str[MAXFNAMELEN];
 	Size		len;
 
 	pq_beginmessage(&buf, 'T'); /* RowDescription */
-	pq_sendint16(&buf, 2);		/* 2 fields */
+	pq_sendint16(&buf, 3);		/* 3 fields */
 
 	/* Field headers */
 	pq_sendstring(&buf, "recptr");
@@ -970,11 +1073,19 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 	pq_sendint16(&buf, -1);
 	pq_sendint32(&buf, 0);
 	pq_sendint16(&buf, 0);
+
+	pq_sendstring(&buf, "label");
+	pq_sendint32(&buf, 0);		/* table oid */
+	pq_sendint16(&buf, 0);		/* attnum */
+	pq_sendint32(&buf, TEXTOID);	/* type oid */
+	pq_sendint16(&buf, -1);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
 	pq_endmessage(&buf);
 
 	/* Data row */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint16(&buf, 2);		/* number of columns */
+	pq_sendint16(&buf, 3);		/* number of columns */
 
 	len = snprintf(str, sizeof(str),
 				   "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
@@ -985,12 +1096,109 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 	pq_sendint32(&buf, len);
 	pq_sendbytes(&buf, str, len);
 
+	if (label)
+	{
+		len = label->len;
+		pq_sendint32(&buf, len);
+		pq_sendbytes(&buf, label->data, len);
+	}
+	else
+	{
+		pq_sendint32(&buf, -1); /* NULL */
+	}
+
 	pq_endmessage(&buf);
 
 	/* Send a CommandComplete message */
 	pq_puttextmessage('C', "SELECT");
 }
 
+
+/*
+ * Sends a resultset with one row per file: path, type ('f' regular file, 'd'
+ * directory, 'l' symlink), size and mtime. Frees the list and its entries.
+ */
+static void
+SendFilesHeader(List *files)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+
+	/* Construct and send the list of files */
+
+	pq_beginmessage(&buf, 'T'); /* RowDescription */
+	pq_sendint16(&buf, 4);		/* 4 fields */
+
+	/* First field - file name */
+	pq_sendstring(&buf, "path");
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_sendint32(&buf, TEXTOID);
+	pq_sendint16(&buf, -1);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+
+	/* Second field - type */
+	pq_sendstring(&buf, "type");
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_sendint32(&buf, CHAROID);
+	pq_sendint16(&buf, 1);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+
+	/* Third field - size */
+	pq_sendstring(&buf, "size");
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_sendint32(&buf, INT8OID);
+	pq_sendint16(&buf, 8);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+
+	/* Fourth field - mtime */
+	pq_sendstring(&buf, "mtime");
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_sendint32(&buf, INT8OID);
+	pq_sendint16(&buf, 8);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_endmessage(&buf);
+
+	foreach(lc, files)
+	{
+		BackupFile *file = (BackupFile *) lfirst(lc);
+		Size		len;
+
+		/* Send one datarow message */
+		pq_beginmessage(&buf, 'D');
+		pq_sendint16(&buf, 4);	/* number of columns */
+
+		/* send path */
+		len = strlen(file->path);
+		pq_sendint32(&buf, len);
+		pq_sendbytes(&buf, file->path, len);
+
+		/* send type */
+		pq_sendint32(&buf, 1);
+		pq_sendbyte(&buf, file->type);
+
+		/* send size */
+		send_int8_string(&buf, file->size);
+
+		/* send mtime */
+		send_int8_string(&buf, file->mtime);
+
+		pq_endmessage(&buf);
+	}
+
+	list_free_deep(files);		/* entries are palloc'd BackupFiles too */
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
 /*
  * Inject a file with given name and content in the output tar stream.
  */
@@ -1044,7 +1252,7 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool dryrun)
+sendTablespace(char *path, bool dryrun, List **filelist)
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -1073,11 +1281,11 @@ sendTablespace(char *path, bool dryrun)
 		return 0;
 	}
 
+	add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 	size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
 						   dryrun);
-
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), dryrun, NIL, true);
+	size += sendDir(pathbuf, strlen(path), dryrun, NIL, true, filelist);
 
 	return size;
 }
@@ -1096,7 +1304,7 @@ sendTablespace(char *path, bool dryrun)
  */
 static int64
 sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
-		bool sendtblspclinks)
+		bool sendtblspclinks, List **filelist)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -1254,6 +1462,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
 			{
 				elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+				add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 				size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
 				excludeFound = true;
 				break;
@@ -1270,6 +1480,8 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 		if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
 		{
 			elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+			add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 			size += _tarWriteDir(pathbuf, basepathlen, &statbuf, dryrun);
 			continue;
 		}
@@ -1291,6 +1503,10 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
 									dryrun);
 
+			add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
+			add_to_filelist(filelist, "./pg_wal/archive_status", 'd', -1,
+							statbuf.st_mtime);
+
 			continue;			/* don't recurse into pg_wal */
 		}
 
@@ -1320,6 +1536,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 								pathbuf)));
 			linkpath[rllen] = '\0';
 
+			add_to_filelist(filelist, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
 									&statbuf, dryrun);
 #else
@@ -1346,6 +1563,7 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 			 */
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
 									dryrun);
+			add_to_filelist(filelist, pathbuf, 'd', -1, statbuf.st_mtime);
 
 			/*
 			 * Call ourselves recursively for a directory, unless it happens
@@ -1376,13 +1594,15 @@ sendDir(const char *path, int basepathlen, bool dryrun, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks);
+				size += sendDir(pathbuf, basepathlen, dryrun, tablespaces, sendtblspclinks, filelist);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
 			bool		sent = false;
 
-			if (!dryrun)
+			add_to_filelist(filelist, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+			if (!dryrun && filelist == NULL)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
 								true, isDbDir ? atooid(lastDir + 1) : InvalidOid);
 
@@ -1867,3 +2087,268 @@ update_basebackup_progress(int64 delta)
 
 	pgstat_progress_update_multi_param(nparam, index, val);
 }
+
+/*
+ * start_backup - start a non-exclusive online backup and send back the
+ * starting WAL location and timeline; backup_label is kept for stop_backup.
+ */
+static void
+start_backup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	tblspc_map_file;
+	MemoryContext oldcontext;
+
+	/* Only one backup per session, else label_file would be overwritten. */
+	if (get_backup_status() == SESSION_BACKUP_NON_EXCLUSIVE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("a backup is already in progress in this session")));
+
+	/* Label file must be long-lived, since it's read in stop_backup. */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	label_file = makeStringInfo();
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Not used, but do_pg_start_backup requires a tablespace map buffer. */
+	tblspc_map_file = makeStringInfo();
+
+	register_persistent_abort_backup_handler();
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  label_file, NULL, tblspc_map_file, false, false);
+
+	/* send startptr and starttli to frontend; the label goes out at stop */
+	SendXlogRecPtrResult(startptr, starttli, NULL);
+
+	pfree(tblspc_map_file->data);
+	pfree(tblspc_map_file);
+}
+
+/*
+ * stop_backup() - ends an online backup started by START_BACKUP, sending the
+ * ending WAL location, timeline and backup_label contents to the caller.
+ * label_file is only set if START_BACKUP ran in this session, so check it.
+ */
+static void
+stop_backup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+
+	if (get_backup_status() != SESSION_BACKUP_NON_EXCLUSIVE ||
+		label_file == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("non-exclusive backup is not in progress")));
+
+	/*
+	 * Stop the non-exclusive backup. Return a copy of the backup label so it
+	 * can be written to disk by the caller.
+	 */
+	endptr = do_pg_stop_backup(label_file->data, !opt->nowait, &endtli);
+	SendXlogRecPtrResult(endptr, endtli, label_file);
+
+	/* Free structures allocated in TopMemoryContext */
+	pfree(label_file->data);
+	pfree(label_file);
+	label_file = NULL;
+}
+
+/*
+ * list_tablespaces() - send all tablespace entries plus the base directory (last)
+ */
+static void
+list_tablespaces(basebackup_options *opt)
+{
+	StringInfo	tblspc_map_file;
+	List	   *tablespaces = NIL;
+	tablespaceinfo *ti;
+
+	tblspc_map_file = makeStringInfo();	/* required argument; contents unused here */
+	collect_tablespaces(&tablespaces, tblspc_map_file, opt->progress, false);
+
+	/* Add a node for the base directory at the end */
+	ti = palloc0(sizeof(tablespaceinfo));
+	ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true, NULL) : -1;	/* -1 unless PROGRESS */
+	tablespaces = lappend(tablespaces, ti);
+
+	SendBackupHeader(tablespaces);
+	list_free(tablespaces);
+}
+
+/*
+ * list_files() - send the files under the given tablespace path, or under the
+ * data directory when no tablespace is given.
+ */
+static void
+list_files(basebackup_options *opt)
+{
+	List	   *files = NIL;
+	int			datadirpathlen = strlen(DataDir);
+
+	/*
+	 * Calculate the relative path of temporary statistics directory in order
+	 * to skip the files which are located in that directory later.
+	 */
+	if (is_absolute_path(pgstat_stat_directory) &&
+		strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+	else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+		statrelpath = psprintf("./%s", pgstat_stat_directory);
+	else
+		statrelpath = pgstat_stat_directory;
+
+	/* The tablespace option may be omitted (NULL) or empty: list PGDATA. */
+	if (opt->tablespace != NULL && opt->tablespace[0] != '\0')
+		sendTablespace(opt->tablespace, true, &files);
+	else
+		sendDir(".", 1, true, NIL, true, &files);
+
+	SendFilesHeader(files);
+}
+
+/*
+ * list_wal_files() - send the list of WAL files between the start and end WAL
+ * locations. NOTE(review): historyFileList is never listed; confirm intended.
+ */
+static void
+list_wal_files(basebackup_options *opt)
+{
+	List	   *historyFileList = NIL;
+	List	   *walFileList = NIL;
+	List	   *files = NIL;
+	ListCell   *lc;
+
+	walFileList = collect_wal_files(opt->startwallocation, opt->endwallocation,
+									&historyFileList);
+	foreach(lc, walFileList)
+	{
+		char		pathbuf[MAXPGPATH];
+		char	   *walFileName = (char *) lfirst(lc);
+
+		snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
+		add_to_filelist(&files, pathbuf, 'f', wal_segment_size, 0);	/* mtime sent as 0 */
+	}
+
+	SendFilesHeader(files);
+}
+
+/*
+ * send_files() - sends the actual files to the caller
+ *
+ * Sends the given file(s) to the caller using the COPY protocol. Only regular
+ * files are sent; directories and other special files are skipped with a
+ * WARNING. Missing files are skipped if missing_ok, otherwise it's an error.
+ */
+static void
+send_files(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	int			basepathlen = 0;
+
+	if (filenames == NIL)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Disable throttling. */
+	throttling_counter = -1;
+
+	/* set backup start location. */
+	startptr = opt->startwallocation;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+
+		pathbuf = (char *) strVal(lfirst(lc));
+
+		/*
+		 * NOTE(review): client-supplied paths are not checked to lie inside
+		 * the data directory or a tablespace; consider validating them.
+		 */
+		if (is_absolute_path(pathbuf))
+		{
+			char	   *basepath;
+
+			/*
+			 * 'pathbuf' points to the tablespace location, but we only want
+			 * to include the version directory in it that belongs to us. If
+			 * it's not found, don't reuse the length from a previous file.
+			 */
+			basepath = strstr(pathbuf, TABLESPACE_VERSION_DIRECTORY);
+			basepathlen = basepath ? basepath - pathbuf - 1 : 0;
+		}
+		else if (pathbuf[0] == '.' && pathbuf[1] == '/')
+			basepathlen = 2;
+		else
+			basepathlen = 0;
+
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT || !missing_ok)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/* Only send regular files; skip directories and special files. */
+		if (S_ISREG(statbuf.st_mode))
+			sendFile(pathbuf, pathbuf + basepathlen, &statbuf, missing_ok,
+					 InvalidOid);
+		else
+			ereport(WARNING,
+					(errmsg("skipping special file or directory \"%s\"", pathbuf)));
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures. If failures are spread across multiple
+	 * processes the total count may not be reported, but the backup will
+	 * still error out and terminate.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+/*
+ * Append a BackupFile entry describing 'path' to *filelist. Does nothing when
+ * filelist is NULL, which is what callers that are not listing files pass.
+ */
+static void
+add_to_filelist(List **filelist, char *path, char type, size_t size,
+				time_t mtime)
+{
+	BackupFile *entry;
+
+	if (filelist == NULL)
+		return;
+
+	entry = (BackupFile *) palloc(sizeof(BackupFile));
+	strlcpy(entry->path, path, sizeof(entry->path));
+	entry->type = type;
+	entry->size = size;
+	entry->mtime = mtime;
+	*filelist = lappend(*filelist, entry);
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 14fcd532218..16e5402d55d 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,13 +87,28 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_LIST_TABLESPACES
+%token K_LIST_FILES
+%token K_SEND_FILES
+%token K_STOP_BACKUP
+%token K_LIST_WAL_FILES
+%token K_START_WAL_LOCATION
+%token K_END_WAL_LOCATION
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
 				timeline_history show sql_cmd
-%type <list>	base_backup_opt_list
-%type <defelt>	base_backup_opt
+%type <list>	base_backup_opt_list start_backup_opt_list stop_backup_opt_list
+				list_tablespace_opt_list list_files_opt_list
+				list_wal_files_opt_list send_backup_files_opt_list
+				backup_files backup_files_list
+%type <defelt>	base_backup_opt backup_opt_label backup_opt_progress
+				backup_opt_fast backup_opt_wal backup_opt_nowait
+				backup_opt_maxrate backup_opt_tsmap backup_opt_chksum
+				backup_opt_start_wal_loc backup_opt_end_wal_loc
+				backup_opt_tablespace start_backup_opt send_backup_files_opt
 %type <uintval>	opt_timeline
 %type <list>	plugin_options plugin_opt_list
 %type <defelt>	plugin_opt_elem
@@ -153,69 +168,231 @@ var_name:	IDENT	{ $$ = $1; }
 				{ $$ = psprintf("%s.%s", $1, $3); }
 		;
 
-/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
- */
 base_backup:
+			/*
+			 * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+			 * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
+			 */
 			K_BASE_BACKUP base_backup_opt_list
 				{
 					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
 					cmd->options = $2;
+					cmd->cmdtag = BASE_BACKUP;
 					$$ = (Node *) cmd;
 				}
-			;
-
-base_backup_opt_list:
-			base_backup_opt_list base_backup_opt
-				{ $$ = lappend($1, $2); }
-			| /* EMPTY */
-				{ $$ = NIL; }
-			;
-
-base_backup_opt:
-			K_LABEL SCONST
-				{
-				  $$ = makeDefElem("label",
-								   (Node *)makeString($2), -1);
-				}
-			| K_PROGRESS
+			/* START_BACKUP [LABEL '<label>'] [FAST] */
+			| K_START_BACKUP start_backup_opt_list
 				{
-				  $$ = makeDefElem("progress",
-								   (Node *)makeInteger(true), -1);
-				}
-			| K_FAST
-				{
-				  $$ = makeDefElem("fast",
-								   (Node *)makeInteger(true), -1);
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = START_BACKUP;
+					$$ = (Node *) cmd;
 				}
-			| K_WAL
+			/* STOP_BACKUP [NOWAIT] */
+			| K_STOP_BACKUP stop_backup_opt_list
 				{
-				  $$ = makeDefElem("wal",
-								   (Node *)makeInteger(true), -1);
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = STOP_BACKUP;
+					$$ = (Node *) cmd;
 				}
-			| K_NOWAIT
+			/* LIST_TABLESPACES [PROGRESS] */
+			| K_LIST_TABLESPACES list_tablespace_opt_list
 				{
-				  $$ = makeDefElem("nowait",
-								   (Node *)makeInteger(true), -1);
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = LIST_TABLESPACES;
+					$$ = (Node *) cmd;
 				}
-			| K_MAX_RATE UCONST
+			/* LIST_FILES ['<tablespace path>'] */
+			| K_LIST_FILES list_files_opt_list
 				{
-				  $$ = makeDefElem("max_rate",
-								   (Node *)makeInteger($2), -1);
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = LIST_FILES;
+					$$ = (Node *) cmd;
 				}
-			| K_TABLESPACE_MAP
+			/* LIST_WAL_FILES START_WAL_LOCATION 'X/X' END_WAL_LOCATION 'X/X' */
+			| K_LIST_WAL_FILES list_wal_files_opt_list
 				{
-				  $$ = makeDefElem("tablespace_map",
-								   (Node *)makeInteger(true), -1);
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = LIST_WAL_FILES;
+					$$ = (Node *) cmd;
 				}
-			| K_NOVERIFY_CHECKSUMS
+			/*
+			 * SEND_FILES '(' 'FILE' [, ...] ')' [START_WAL_LOCATION 'X/X']
+			 *		[NOVERIFY_CHECKSUMS]
+			 */
+			| K_SEND_FILES backup_files send_backup_files_opt_list
 				{
-				  $$ = makeDefElem("noverify_checksums",
-								   (Node *)makeInteger(true), -1);
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $3;
+					cmd->cmdtag = SEND_FILES;
+					cmd->backupfiles = $2;
+					$$ = (Node *) cmd;
 				}
 			;
 
+base_backup_opt_list:
+			base_backup_opt_list base_backup_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+base_backup_opt:			/* options accepted by BASE_BACKUP */
+			backup_opt_label		{ $$ = $1; }
+			| backup_opt_progress	{ $$ = $1; }
+			| backup_opt_fast		{ $$ = $1; }
+			| backup_opt_wal 		{ $$ = $1; }
+			| backup_opt_nowait		{ $$ = $1; }
+			| backup_opt_maxrate	{ $$ = $1; }
+			| backup_opt_tsmap		{ $$ = $1; }
+			| backup_opt_chksum		{ $$ = $1; }
+			;
+
+start_backup_opt_list:		/* START_BACKUP: any of LABEL, FAST */
+			start_backup_opt_list start_backup_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+start_backup_opt:
+			backup_opt_label		{ $$ = $1; }
+			| backup_opt_fast		{ $$ = $1; }
+			;
+
+stop_backup_opt_list:		/* STOP_BACKUP: at most one NOWAIT */
+			backup_opt_nowait
+				{ $$ = list_make1($1); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+list_tablespace_opt_list:	/* LIST_TABLESPACES: optional PROGRESS */
+			backup_opt_progress
+				{ $$ = list_make1($1); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+list_files_opt_list:		/* LIST_FILES: optional tablespace path */
+			backup_opt_tablespace
+				{ $$ = list_make1($1); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+list_wal_files_opt_list:	/* both locations required, start first */
+			backup_opt_start_wal_loc backup_opt_end_wal_loc
+				{ $$ = list_make2($1, $2); }
+			;
+
+send_backup_files_opt_list:	/* SEND_FILES trailing options */
+			send_backup_files_opt_list send_backup_files_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+backup_files:				/* '(' 'file' [, ...] ')', or nothing */
+			'(' backup_files_list ')'
+				{ $$ = $2; }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+backup_files_list:			/* String nodes, in the order given */
+			SCONST
+				{ $$ = list_make1(makeString($1)); }
+			| backup_files_list ',' SCONST
+				{ $$ = lappend($1, makeString($3)); }
+			;
+
+send_backup_files_opt:
+			backup_opt_chksum		{ $$ = $1; }
+			| backup_opt_start_wal_loc	{ $$ = $1; }
+			;
+
+backup_opt_label:
+	K_LABEL SCONST
+	{
+	  $$ = makeDefElem("label",
+					   (Node *)makeString($2), -1);
+	};
+
+backup_opt_progress:
+	K_PROGRESS
+	{
+	  $$ = makeDefElem("progress",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_fast:
+	K_FAST
+	{
+	  $$ = makeDefElem("fast",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_wal:
+	K_WAL
+	{
+	  $$ = makeDefElem("wal",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_nowait:
+	K_NOWAIT
+	{
+	  $$ = makeDefElem("nowait",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_maxrate:
+	K_MAX_RATE UCONST
+	{
+	  $$ = makeDefElem("max_rate",
+					   (Node *)makeInteger($2), -1);
+	};
+
+backup_opt_tsmap:
+	K_TABLESPACE_MAP
+	{
+	  $$ = makeDefElem("tablespace_map",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_chksum:
+	K_NOVERIFY_CHECKSUMS
+	{
+	  $$ = makeDefElem("noverify_checksums",
+					   (Node *)makeInteger(true), -1);
+	};
+
+backup_opt_start_wal_loc:	/* LSN text, parsed by parse_basebackup_options() */
+	K_START_WAL_LOCATION SCONST
+	{
+	  $$ = makeDefElem("start_wal_location",
+					   (Node *)makeString($2), -1);
+	};
+
+backup_opt_end_wal_loc:		/* LSN text, parsed by parse_basebackup_options() */
+	K_END_WAL_LOCATION SCONST
+	{
+	  $$ = makeDefElem("end_wal_location",
+					   (Node *)makeString($2), -1);
+	};
+
+backup_opt_tablespace:		/* bare string: the tablespace path for LIST_FILES */
+	SCONST
+	{
+	  $$ = makeDefElem("tablespace",
+					   (Node *)makeString($1), -1);
+	};
+
 create_replication_slot:
 			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
 			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 14c9a1e798a..faa00cfd0ee 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,14 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+START_BACKUP		{ return K_START_BACKUP; }
+LIST_FILES			{ return K_LIST_FILES; }
+LIST_TABLESPACES	{ return K_LIST_TABLESPACES; }
+SEND_FILES			{ return K_SEND_FILES; }
+STOP_BACKUP			{ return K_STOP_BACKUP; }
+LIST_WAL_FILES		{ return K_LIST_WAL_FILES; }
+START_WAL_LOCATION	{ return K_START_WAL_LOCATION; }
+END_WAL_LOCATION	{ return K_END_WAL_LOCATION; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 5456141a8ab..c046ea39ae9 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,16 @@ typedef enum ReplicationKind
 	REPLICATION_KIND_LOGICAL
 } ReplicationKind;
 
+typedef enum BackupCmdTag		/* which backup command a BaseBackupCmd holds */
+{
+	BASE_BACKUP,				/* dispatched to perform_base_backup() */
+	START_BACKUP,				/* dispatched to start_backup() */
+	LIST_TABLESPACES,			/* dispatched to list_tablespaces() */
+	LIST_FILES,					/* dispatched to list_files() */
+	LIST_WAL_FILES,				/* dispatched to list_wal_files() */
+	SEND_FILES,					/* dispatched to send_files() */
+	STOP_BACKUP					/* dispatched to stop_backup() */
+} BackupCmdTag;
 
 /* ----------------------
  *		IDENTIFY_SYSTEM command
@@ -42,6 +52,8 @@ typedef struct BaseBackupCmd
 {
 	NodeTag		type;
 	List	   *options;
+	BackupCmdTag cmdtag;
+	List	   *backupfiles;
 } BaseBackupCmd;
 
 
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index e0210def6f3..3bc85d4c3e2 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool dryrun);
+extern int64 sendTablespace(char *path, bool dryrun, List **filelist);
 
 #endif							/* _BASEBACKUP_H */
-- 
2.21.1 (Apple Git-122.3)