incremental_basebackup_v0.patch

text/x-patch

Filename: incremental_basebackup_v0.patch
Type: text/x-patch
Part: 0
Message: Re: block-level incremental backup

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: unified
Series: patch v0
File+
src/backend/access/transam/xlog.c 1 1
src/backend/replication/basebackup.c 310 26
src/backend/replication/repl_gram.y 8 1
src/backend/replication/repl_scanner.l 1 0
src/bin/pg_basebackup/pg_basebackup.c 124 2
src/include/replication/basebackup.h 1 2
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13e0d23..e757bba 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -10459,7 +10459,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 			ti->oid = pstrdup(de->d_name);
 			ti->path = pstrdup(buflinkpath.data);
 			ti->rpath = relpath ? pstrdup(relpath) : NULL;
-			ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+			ti->size = infotbssize ? sendTablespace(fullpath, true, InvalidXLogRecPtr) : -1;
 
 			if (tablespaces)
 				*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index c2978a9..3560da1 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
 #include "utils/ps_status.h"
 #include "utils/relcache.h"
 #include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 
 typedef struct
@@ -52,13 +53,22 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	XLogRecPtr	prev_backup_start_lsn;
 } basebackup_options;
 
 
 static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
-					 List *tablespaces, bool sendtblspclinks);
+					 List *tablespaces, bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn);
 static bool sendFile(const char *readfilename, const char *tarfilename,
-					 struct stat *statbuf, bool missing_ok, Oid dboid);
+					 struct stat *statbuf, bool missing_ok, Oid dboid,
+					 XLogRecPtr prev_backup_start_lsn);
+static bool sendFileMap(const char *readfilename, const char *tarfilename,
+		struct stat *statbuf, bool missing_ok, Oid dboid,
+		XLogRecPtr prev_backup_start_lsn, int *expected_write_size);
+static bool sendFilePartial(const char *readfilename, const char *tarfilename,
+		struct stat *statbuf, bool missing_ok, Oid dboid,
+		XLogRecPtr prev_backup_start_lsn, int expected_write_size);
+
 static void sendFileWithContent(const char *filename, const char *content);
 static int64 _tarWriteHeader(const char *filename, const char *linktarget,
 							 struct stat *statbuf, bool sizeonly);
@@ -275,7 +285,8 @@ perform_base_backup(basebackup_options *opt)
 
 		/* Add a node for the base directory at the end */
 		ti = palloc0(sizeof(tablespaceinfo));
-		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces,
+										   true, opt->prev_backup_start_lsn) : -1;
 		tablespaces = lappend(tablespaces, ti);
 
 		/* Send tablespace header */
@@ -331,10 +342,10 @@ perform_base_backup(basebackup_options *opt)
 				if (tblspc_map_file && opt->sendtblspcmapfile)
 				{
 					sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
-					sendDir(".", 1, false, tablespaces, false);
+					sendDir(".", 1, false, tablespaces, false, opt->prev_backup_start_lsn);
 				}
 				else
-					sendDir(".", 1, false, tablespaces, true);
+					sendDir(".", 1, false, tablespaces, true, opt->prev_backup_start_lsn);
 
 				/* ... and pg_control after everything else. */
 				if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -342,10 +353,10 @@ perform_base_backup(basebackup_options *opt)
 							(errcode_for_file_access(),
 							 errmsg("could not stat file \"%s\": %m",
 									XLOG_CONTROL_FILE)));
-				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, InvalidXLogRecPtr);
 			}
 			else
-				sendTablespace(ti->path, false);
+				sendTablespace(ti->path, false, opt->prev_backup_start_lsn);
 
 			/*
 			 * If we're including WAL, and this is the main data directory we
@@ -592,7 +603,7 @@ perform_base_backup(basebackup_options *opt)
 						(errcode_for_file_access(),
 						 errmsg("could not stat file \"%s\": %m", pathbuf)));
 
-			sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
+			sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, InvalidXLogRecPtr);
 
 			/* unconditionally mark file as archived */
 			StatusFilePath(pathbuf, fname, ".done");
@@ -650,6 +661,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_prev_backup_start_lsn = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -738,6 +750,25 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "prev_backup_start_lsn") == 0)
+		{
+			char *prev_backup_start_lsn_str;
+			XLogRecPtr prev_backup_start_lsn;
+			bool		have_error = false;
+
+			if (o_prev_backup_start_lsn)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			prev_backup_start_lsn_str = strVal(defel->arg);
+			elog(WARNING, "prev_backup_start_lsn_str: %s", prev_backup_start_lsn_str);
+			prev_backup_start_lsn = pg_lsn_in_internal(prev_backup_start_lsn_str, &have_error);
+			//TODO handle parsing error
+
+			opt->prev_backup_start_lsn = (XLogRecPtr) prev_backup_start_lsn;
+			o_prev_backup_start_lsn = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -966,7 +997,9 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char* path, bool sizeonly,
+					   XLogRecPtr prev_backup_start_lsn)
+
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -999,7 +1032,9 @@ sendTablespace(char *path, bool sizeonly)
 						   sizeonly);
 
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+
+	size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true,
+							prev_backup_start_lsn);
 
 	return size;
 }
@@ -1018,7 +1053,7 @@ sendTablespace(char *path, bool sizeonly)
  */
 static int64
 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
-		bool sendtblspclinks)
+		bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -1294,7 +1329,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+				size += sendDir(pathbuf, basepathlen, sizeonly,
+										tablespaces, sendtblspclinks, prev_backup_start_lsn);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
@@ -1302,7 +1338,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 
 			if (!sizeonly)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
-								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
+								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid,
+										prev_backup_start_lsn);
 
 			if (sent || sizeonly)
 			{
@@ -1363,10 +1400,14 @@ is_checksummed_file(const char *fullpath, const char *filename)
  *
  * Returns true if the file was successfully sent, false if 'missing_ok',
  * and the file did not exist.
+ *
+ * If prev_backup_start_lsn is not InvalidXLogRecPtr, relation files are sent
+ * as a .blockmap file plus a .partial file holding only the changed blocks.
  */
+
 static bool
 sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf,
-		 bool missing_ok, Oid dboid)
+		 bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn)
 {
 	FILE	   *fp;
 	BlockNumber blkno = 0;
@@ -1383,6 +1424,21 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 	int			segmentno = 0;
 	char	   *segmentpath;
 	bool		verify_checksum = false;
+	bool		file_has_map = false;
+	int expected_write_size = 0;
+
+	/* Send map, if requested. */
+	if (prev_backup_start_lsn)
+		file_has_map = sendFileMap(readfilename, tarfilename, statbuf,
+					missing_ok, dboid, prev_backup_start_lsn, &expected_write_size);
+
+	/*
+	 * If possible, send the incremental version of the file;
+	 * all non-relation files will be sent by the code below.
+	 */
+	if (file_has_map)
+		return sendFilePartial(readfilename, tarfilename, statbuf,
+					missing_ok, dboid, prev_backup_start_lsn, expected_write_size);
 
 	fp = AllocateFile(readfilename, "rb");
 	if (fp == NULL)
@@ -1447,6 +1503,8 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 			verify_checksum = false;
 		}
 
+		/* Iterate over pages to get the info we need:
+		 * either checksum verification or collecting a map. */
 		if (verify_checksum)
 		{
 			for (i = 0; i < cnt / BLCKSZ; i++)
@@ -1468,15 +1526,15 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 					if (phdr->pd_checksum != checksum)
 					{
 						/*
-						 * Retry the block on the first failure.  It's
-						 * possible that we read the first 4K page of the
-						 * block just before postgres updated the entire block
-						 * so it ends up looking torn to us.  We only need to
-						 * retry once because the LSN should be updated to
-						 * something we can ignore on the next pass.  If the
-						 * error happens again then it is a true validation
-						 * failure.
-						 */
+						* Retry the block on the first failure.  It's
+						* possible that we read the first 4K page of the
+						* block just before postgres updated the entire block
+						* so it ends up looking torn to us.  We only need to
+						* retry once because the LSN should be updated to
+						* something we can ignore on the next pass.  If the
+						* error happens again then it is a true validation
+						* failure.
+						*/
 						if (block_retry == false)
 						{
 							/* Reread the failed block */
@@ -1484,7 +1542,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 							{
 								ereport(ERROR,
 										(errcode_for_file_access(),
-										 errmsg("could not fseek in file \"%s\": %m",
+										errmsg("could not fseek in file \"%s\": %m",
 												readfilename)));
 							}
 
@@ -1492,7 +1550,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 							{
 								ereport(ERROR,
 										(errcode_for_file_access(),
-										 errmsg("could not reread block %d of file \"%s\": %m",
+										errmsg("could not reread block %d of file \"%s\": %m",
 												blkno, readfilename)));
 							}
 
@@ -1500,7 +1558,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 							{
 								ereport(ERROR,
 										(errcode_for_file_access(),
-										 errmsg("could not fseek in file \"%s\": %m",
+										errmsg("could not fseek in file \"%s\": %m",
 												readfilename)));
 							}
 
@@ -1593,6 +1651,232 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 }
 
 
+/*
+ * sendFileMap: send "<tarfilename>.blockmap", a tar member holding the raw
+ * BlockNumber of every block that is not new and whose page LSN is greater
+ * than prev_backup_start_lsn; *expected_write_size grows by BLCKSZ per block.
+ *
+ * Returns false, sending nothing, if the file is filtered out below or is
+ * missing with missing_ok set; true once the map is sent.  dboid is unused.
+ */
+static bool
+sendFileMap(const char *readfilename, const char *tarfilename, struct stat *statbuf,
+		 bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn,
+		 int *expected_write_size)
+{
+	FILE	   *fp;
+	BlockNumber blkno = 0;
+	char		buf[TAR_SEND_SIZE];
+	off_t		cnt;
+	int			i;
+	pgoff_t		len = 0;
+	char	   *page;
+	size_t		pad;
+	char	   *tarfilename_blockmap;
+	BlockNumber *pagemap;
+	char	   *filename;
+	off_t		statbuf_size = statbuf->st_size;	/* not int: avoid truncation */
+	int			pagemap_real_size;
+	int			n_blocks_to_send = 0;
+
+	Assert(prev_backup_start_lsn != InvalidXLogRecPtr);
+
+	/* Bare file name: skip past the last directory separator. */
+	filename = last_dir_separator(readfilename) + 1;
+
+	/*
+	 * Only main-fork relation segments get a block map; for anything else
+	 * return false so that the caller sends the file in full.  The casts to
+	 * unsigned char keep isdigit() defined for bytes with the high bit set.
+	 */
+	if (!is_checksummed_file(readfilename, filename) ||
+		!S_ISREG(statbuf->st_mode) ||
+		(filename[0] == 't' && isdigit((unsigned char) filename[1])) ||	/* temp files */
+		!isdigit((unsigned char) filename[0]) ||	/* relfiles start with a digit */
+		strstr(filename, "_") != NULL)	/* fork files */
+	{
+		elog(DEBUG2, "sendFileMap %s, no datafile", filename);
+		return false;
+	}
+	elog(DEBUG2, "sendFileMap %s, datafile", filename);
+
+	fp = AllocateFile(readfilename, "rb");
+	if (fp == NULL)
+	{
+		if (errno == ENOENT && missing_ok)
+			return false;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	/* Built only after the early returns above so that it is never leaked. */
+	tarfilename_blockmap = psprintf("%s.blockmap", tarfilename);
+
+	/* allocate pagemap of the size enough to write all file blocks */
+	pagemap = palloc0((statbuf->st_size / BLCKSZ) * sizeof(BlockNumber));
+
+	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+	{
+		/* iterate over pages to collect a map */
+		for (i = 0; i < cnt / BLCKSZ; i++)
+		{
+			page = buf + BLCKSZ * i;
+			if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn)
+			{
+				pagemap[n_blocks_to_send] = blkno;
+				elog(DEBUG2, "expected_write_size %d add to map blkno %d pagemap[n_blocks_to_send] %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X",
+					 *expected_write_size, blkno, pagemap[n_blocks_to_send], readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page),
+					 (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
+				*expected_write_size += BLCKSZ;
+				n_blocks_to_send++;
+			}
+			blkno++;
+		}
+
+		len += cnt;
+
+		/*
+		 * Stop at the stat'ed size; data appended meanwhile is restored from WAL.
+		 */
+		if (len >= statbuf->st_size)
+			break;
+	}
+
+	pagemap_real_size = n_blocks_to_send * sizeof(BlockNumber);
+
+	/* Put the map size into statbuf for the header call, then restore it. */
+	statbuf->st_size = pagemap_real_size;
+	_tarWriteHeader(tarfilename_blockmap, NULL, statbuf, false);
+	statbuf->st_size = statbuf_size;
+
+	if (pagemap_real_size)
+	{
+		if (pq_putmessage('d', (char *) pagemap, pagemap_real_size))
+			ereport(ERROR,
+					(errmsg("base backup could not send data, aborting backup")));
+
+		/* Pad to 512 byte boundary, per tar format requirements. */
+		pad = ((pagemap_real_size + 511) & ~511) - pagemap_real_size;
+		if (pad > 0)
+		{
+			MemSet(buf, 0, pad);
+			pq_putmessage('d', buf, pad);
+		}
+	}
+
+	FreeFile(fp);
+	pfree(pagemap);
+	pfree(tarfilename_blockmap);
+	return true;
+}
+
+/*
+ * sendFilePartial: send "<tarfilename>.partial", holding back to back the
+ * blocks of readfilename that are not new and whose page LSN is greater than
+ * prev_backup_start_lsn.  expected_write_size is the member size put in the
+ * tar header; the body is capped at, or zero-filled up to, that many bytes.
+ * Returns false if the file is missing and missing_ok is set, else true.
+ * NOTE(review): the file is read again here, so the blocks selected may
+ * differ from an earlier map pass if pages changed in between -- confirm.
+ */
+static bool
+sendFilePartial(const char *readfilename, const char *tarfilename, struct stat *statbuf,
+		 bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn,
+		 int expected_write_size)
+{
+	FILE	   *fp;
+	BlockNumber blkno = 0;
+	char		buf[TAR_SEND_SIZE];
+	char		sendbuf[TAR_SEND_SIZE];
+	int			n_blocks_to_send = 0;
+	off_t		cnt;
+	int			i;
+	pgoff_t		len = 0;
+	char	   *page;
+	char	   *tarfilename_partial;
+	int			pad;
+	off_t		statbuf_size = statbuf->st_size;	/* not int: avoid truncation */
+	int			write_len = 0;
+
+	Assert(prev_backup_start_lsn != InvalidXLogRecPtr);
+
+	/* Open first: a vanished file must not leave a tar header without body. */
+	fp = AllocateFile(readfilename, "rb");
+	if (fp == NULL)
+	{
+		if (errno == ENOENT && missing_ok)
+			return false;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	tarfilename_partial = psprintf("%s.partial", tarfilename);
+	statbuf->st_size = expected_write_size;
+	_tarWriteHeader(tarfilename_partial, NULL, statbuf, false);
+	statbuf->st_size = statbuf_size;
+
+	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+	{
+		/* Collect this chunk's changed pages, never exceeding the header size. */
+		for (i = 0; i < cnt / BLCKSZ; i++)
+		{
+			page = buf + BLCKSZ * i;
+			if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn &&
+				write_len + (n_blocks_to_send + 1) * BLCKSZ <= expected_write_size)
+			{
+				elog(DEBUG2, "add to sendbuf blkno %d, n_blocks_to_send %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X",
+					 blkno, n_blocks_to_send, readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page),
+					 (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
+				memcpy(sendbuf + BLCKSZ * n_blocks_to_send, page, BLCKSZ);
+				n_blocks_to_send++;
+			}
+			blkno++;
+		}
+
+		if (n_blocks_to_send > 0)	/* never send an empty CopyData message */
+		{
+			elog(DEBUG2, "send n_blocks_to_send %d of file %s", n_blocks_to_send, readfilename);
+			if (pq_putmessage('d', sendbuf, n_blocks_to_send * BLCKSZ))
+				ereport(ERROR,
+						(errmsg("base backup could not send data, aborting backup")));
+			write_len += n_blocks_to_send * BLCKSZ;
+			n_blocks_to_send = 0;
+		}
+
+		len += cnt;
+		/* Stop at the stat'ed size; later extensions are restored from WAL. */
+		if (len >= statbuf->st_size)
+			break;
+	}
+
+	/* Fewer pages qualified than announced: zero-fill up to the header size. */
+	if (write_len < expected_write_size)
+	{
+		MemSet(buf, 0, sizeof(buf));
+		while (write_len < expected_write_size)
+		{
+			cnt = Min(sizeof(buf), expected_write_size - write_len);
+			pq_putmessage('d', buf, cnt);
+			write_len += cnt;
+			throttle(cnt);
+		}
+	}
+
+	/* Pad to 512 byte boundary, per tar format requirements */
+	pad = ((write_len + 511) & ~511) - write_len;
+	if (pad > 0)
+	{
+		MemSet(buf, 0, pad);
+		pq_putmessage('d', buf, pad);
+	}
+
+	FreeFile(fp);
+	pfree(tarfilename_partial);
+	return true;
+}
+
 static int64
 _tarWriteHeader(const char *filename, const char *linktarget,
 				struct stat *statbuf, bool sizeonly)
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc..cb883a8 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_PREV_BACKUP_START_LSN
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -103,6 +104,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
 
+
 %%
 
 firstcmd: command opt_semicolon
@@ -155,7 +157,7 @@ var_name:	IDENT	{ $$ = $1; }
 
 /*
  * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
+ * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS] [PREV_BACKUP_START_LSN 'start_lsn']
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -213,6 +215,11 @@ base_backup_opt:
 				{
 				  $$ = makeDefElem("noverify_checksums",
 								   (Node *)makeInteger(true), -1);
+			}
+			| K_PREV_BACKUP_START_LSN SCONST
+				{
+				  $$ = makeDefElem("prev_backup_start_lsn",
+								   (Node *)makeString($2), -1);
 				}
 			;
 
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb..042e148 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,7 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+PREV_BACKUP_START_LSN	{ return K_PREV_BACKUP_START_LSN; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 15f43f9..bd2930e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -106,6 +106,12 @@ static bool create_slot = false;
 static bool no_slot = false;
 static bool verify_checksums = true;
 
+
+static char* prev_backup_start_lsn = NULL;
+static char* prev_backup_start_lsn_str = NULL;
+static char* incremental_basedir = NULL;
+static bool merge_backups = false;
+
 static bool success = false;
 static bool made_new_pgdata = false;
 static bool found_existing_pgdata = false;
@@ -150,6 +156,7 @@ static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void GenerateRecoveryConf(PGconn *conn);
 static void WriteRecoveryConf(void);
 static void BaseBackup(void);
+static void MergeBackups(void);
 
 static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 								 bool segment_finished);
@@ -1473,6 +1480,9 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 			 */
 			snprintf(filename, sizeof(filename), "%s/%s", current_path,
 					 copybuf);
+
+			pg_log_info("filename %s current_len_left %ld", filename, current_len_left);
+
 			if (filename[strlen(filename) - 1] == '/')
 			{
 				/*
@@ -1863,8 +1873,12 @@ BaseBackup(void)
 			fprintf(stderr, "\n");
 	}
 
+	if (prev_backup_start_lsn)
+		prev_backup_start_lsn_str = psprintf("PREV_BACKUP_START_LSN \'%s\'",
+											 prev_backup_start_lsn);
+
 	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s %s",
 				 escaped_label,
 				 showprogress ? "PROGRESS" : "",
 				 includewal == FETCH_WAL ? "WAL" : "",
@@ -1872,7 +1886,9 @@ BaseBackup(void)
 				 includewal == NO_WAL ? "" : "NOWAIT",
 				 maxrate_clause ? maxrate_clause : "",
 				 format == 't' ? "TABLESPACE_MAP" : "",
-				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS",
+				 prev_backup_start_lsn_str?prev_backup_start_lsn_str:""
+				);
 
 	if (PQsendQuery(conn, basebkp) == 0)
 	{
@@ -2158,6 +2174,88 @@ BaseBackup(void)
 		pg_log_info("base backup completed");
 }
 
+/*
+ * walkdir: recursively walk 'path' and log, for each regular file, where it
+ * would go under 'basepath'.  Nothing is moved yet.  Symlinks are not
+ * followed and ".blockmap" files are skipped.
+ */
+static void
+walkdir(const char *path, const char *basepath)
+{
+	DIR		   *dir;
+	struct dirent *de;
+
+	dir = opendir(path);
+	if (dir == NULL)
+	{
+		pg_log_error("could not open directory \"%s\": %m", path);
+		return;
+	}
+
+	while (errno = 0, (de = readdir(dir)) != NULL)
+	{
+		char		subpath[MAXPGPATH * 2];
+		char		basesubpath[MAXPGPATH * 2];
+		struct stat fst;
+
+		if (strcmp(de->d_name, ".") == 0 ||
+			strcmp(de->d_name, "..") == 0)
+			continue;
+
+		snprintf(subpath, sizeof(subpath), "%s/%s", path, de->d_name);
+		snprintf(basesubpath, sizeof(basesubpath), "%s/%s", basepath, de->d_name);
+
+		/* lstat(), not stat(): don't process symlinks */
+		if (lstat(subpath, &fst) < 0)
+		{
+			pg_log_error("could not stat file \"%s\": %m", subpath);
+			continue;
+		}
+
+		if (S_ISREG(fst.st_mode))
+		{
+			char	   *partial_suffix = strstr(de->d_name, ".partial");
+
+			if (partial_suffix != NULL)	/* handle incremental files */
+			{
+				char		basicfilename[MAXPGPATH * 2];
+				char		mappath[MAXPGPATH * 2];
+				char		topath[MAXPGPATH * 2];
+				int			baselen = (int) (partial_suffix - de->d_name);
+				/* "%.*s" copies the prefix and, unlike strncpy(), NUL-terminates */
+				snprintf(basicfilename, sizeof(basicfilename), "%.*s", baselen, de->d_name);
+				snprintf(mappath, sizeof(mappath), "%s.blockmap", basicfilename);
+				snprintf(topath, sizeof(topath), "%s/%s", basepath, basicfilename);
+				pg_log_info("incremental basic %s, map %s, partial %s", basicfilename, mappath, de->d_name);
+				pg_log_info("incremental move from %s to %s", subpath, topath);
+			}
+			else if (strstr(de->d_name, ".blockmap") == NULL)	/* skip .blockmap files */
+			{
+				pg_log_info("non-incremental move from %s to %s", subpath, basesubpath);
+			}
+		}
+		else if (S_ISDIR(fst.st_mode))
+			walkdir(subpath, basesubpath);
+	}
+
+	if (errno)
+		pg_log_error("could not read directory \"%s\": %m", path);
+
+	(void) closedir(dir);
+}
+
+static void
+MergeBackups(void)
+{
+	/*
+	 * Walk all files in incremental_basedir.  Files without a ".blockmap"
+	 * are meant to replace the same-named file in basedir; files with a
+	 * ".blockmap" are meant to be applied block by block to the file in
+	 * basedir.  For now walkdir() only logs the moves it would perform.
+	 * TODO: implement the actual copy/merge.
+	 */
+	walkdir(incremental_basedir, basedir);
+}
 
 int
 main(int argc, char **argv)
@@ -2191,6 +2289,9 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"prev-backup-start-lsn", required_argument, NULL, 5},
+		{"incremental-pgdata", required_argument, NULL, 6},
+		{"merge-backups", no_argument, NULL, 7},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2359,6 +2460,15 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 5:
+				prev_backup_start_lsn = pg_strdup(optarg);
+				break;
+			case 6:
+				incremental_basedir = pg_strdup(optarg);
+				break;
+			case 7:
+				merge_backups = true;
+				break;
 			default:
 
 				/*
@@ -2393,6 +2503,18 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (merge_backups && (incremental_basedir == NULL))
+	{
+		pg_log_error("no target incremental directory specified");
+		exit(1);
+	}
+
+	if (merge_backups && incremental_basedir)
+	{
+		MergeBackups();
+		return 0;
+	}
+
 	/*
 	 * Mutually exclusive arguments
 	 */
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9..974c126 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,5 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool sizeonly);
-
+extern int64 sendTablespace(char *path, bool sizeonly, XLogRecPtr prev_backup_start_lsn);
 #endif							/* _BASEBACKUP_H */