incremental_basebackup_v0.patch
text/x-patch
Filename: incremental_basebackup_v0.patch
Type: text/x-patch
Part: 0
Message:
Re: block-level incremental backup
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: unified
Series: patch v0
| File | + | − |
|---|---|---|
| src/backend/access/transam/xlog.c | 1 | 1 |
| src/backend/replication/basebackup.c | 310 | 26 |
| src/backend/replication/repl_gram.y | 8 | 1 |
| src/backend/replication/repl_scanner.l | 1 | 0 |
| src/bin/pg_basebackup/pg_basebackup.c | 124 | 2 |
| src/include/replication/basebackup.h | 1 | 2 |
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13e0d23..e757bba 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -10459,7 +10459,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(buflinkpath.data);
ti->rpath = relpath ? pstrdup(relpath) : NULL;
- ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+ ti->size = infotbssize ? sendTablespace(fullpath, true, InvalidXLogRecPtr) : -1;
if (tablespaces)
*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index c2978a9..3560da1 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
#include "utils/ps_status.h"
#include "utils/relcache.h"
#include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
typedef struct
@@ -52,13 +53,22 @@ typedef struct
bool includewal;
uint32 maxrate;
bool sendtblspcmapfile;
+ XLogRecPtr prev_backup_start_lsn;
} basebackup_options;
static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
- List *tablespaces, bool sendtblspclinks);
+ List *tablespaces, bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn);
static bool sendFile(const char *readfilename, const char *tarfilename,
- struct stat *statbuf, bool missing_ok, Oid dboid);
+ struct stat *statbuf, bool missing_ok, Oid dboid,
+ XLogRecPtr prev_backup_start_lsn);
+static bool sendFileMap(const char *readfilename, const char *tarfilename,
+ struct stat *statbuf, bool missing_ok, Oid dboid,
+ XLogRecPtr prev_backup_start_lsn, int *expected_write_size);
+static bool sendFilePartial(const char *readfilename, const char *tarfilename,
+ struct stat *statbuf, bool missing_ok, Oid dboid,
+ XLogRecPtr prev_backup_start_lsn, int expected_write_size);
+
static void sendFileWithContent(const char *filename, const char *content);
static int64 _tarWriteHeader(const char *filename, const char *linktarget,
struct stat *statbuf, bool sizeonly);
@@ -275,7 +285,8 @@ perform_base_backup(basebackup_options *opt)
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
- ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+ ti->size = opt->progress ? sendDir(".", 1, true, tablespaces,
+ true, opt->prev_backup_start_lsn) : -1;
tablespaces = lappend(tablespaces, ti);
/* Send tablespace header */
@@ -331,10 +342,10 @@ perform_base_backup(basebackup_options *opt)
if (tblspc_map_file && opt->sendtblspcmapfile)
{
sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
- sendDir(".", 1, false, tablespaces, false);
+ sendDir(".", 1, false, tablespaces, false, opt->prev_backup_start_lsn);
}
else
- sendDir(".", 1, false, tablespaces, true);
+ sendDir(".", 1, false, tablespaces, true, opt->prev_backup_start_lsn);
/* ... and pg_control after everything else. */
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -342,10 +353,10 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
XLOG_CONTROL_FILE)));
- sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+ sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, InvalidXLogRecPtr);
}
else
- sendTablespace(ti->path, false);
+ sendTablespace(ti->path, false, opt->prev_backup_start_lsn);
/*
* If we're including WAL, and this is the main data directory we
@@ -592,7 +603,7 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", pathbuf)));
- sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
+ sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, InvalidXLogRecPtr);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
@@ -650,6 +661,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_maxrate = false;
bool o_tablespace_map = false;
bool o_noverify_checksums = false;
+ bool o_prev_backup_start_lsn = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -738,6 +750,25 @@ parse_basebackup_options(List *options, basebackup_options *opt)
noverify_checksums = true;
o_noverify_checksums = true;
}
+ else if (strcmp(defel->defname, "prev_backup_start_lsn") == 0)
+ {
+ char *prev_backup_start_lsn_str;
+ XLogRecPtr prev_backup_start_lsn;
+ bool have_error = false;
+
+ if (o_prev_backup_start_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ prev_backup_start_lsn_str = strVal(defel->arg);
+ elog(WARNING, "prev_backup_start_lsn_str: %s", prev_backup_start_lsn_str);
+ prev_backup_start_lsn = pg_lsn_in_internal(prev_backup_start_lsn_str, &have_error);
+ //TODO handle parsing error
+
+ opt->prev_backup_start_lsn = (XLogRecPtr) prev_backup_start_lsn;
+ o_prev_backup_start_lsn = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@@ -966,7 +997,9 @@ sendFileWithContent(const char *filename, const char *content)
* Only used to send auxiliary tablespaces, not PGDATA.
*/
int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char* path, bool sizeonly,
+ XLogRecPtr prev_backup_start_lsn)
+
{
int64 size;
char pathbuf[MAXPGPATH];
@@ -999,7 +1032,9 @@ sendTablespace(char *path, bool sizeonly)
sizeonly);
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+
+ size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true,
+ prev_backup_start_lsn);
return size;
}
@@ -1018,7 +1053,7 @@ sendTablespace(char *path, bool sizeonly)
*/
static int64
sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks)
+ bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn)
{
DIR *dir;
struct dirent *de;
@@ -1294,7 +1329,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+ size += sendDir(pathbuf, basepathlen, sizeonly,
+ tablespaces, sendtblspclinks, prev_backup_start_lsn);
}
else if (S_ISREG(statbuf.st_mode))
{
@@ -1302,7 +1338,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
if (!sizeonly)
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
- true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
+ true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid,
+ prev_backup_start_lsn);
if (sent || sizeonly)
{
@@ -1363,10 +1400,14 @@ is_checksummed_file(const char *fullpath, const char *filename)
*
* Returns true if the file was successfully sent, false if 'missing_ok',
* and the file did not exist.
+ *
+ * If prev_backup_start_lsn is not InvalidXLogRecPtr, send .partial file,
+ * containing blocks for incremental backup and .blockmap file.
*/
+
static bool
sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf,
- bool missing_ok, Oid dboid)
+ bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn)
{
FILE *fp;
BlockNumber blkno = 0;
@@ -1383,6 +1424,21 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
int segmentno = 0;
char *segmentpath;
bool verify_checksum = false;
+ bool file_has_map = false;
+ int expected_write_size = 0;
+
+ /* Send map, if requested. */
+ if (prev_backup_start_lsn)
+ file_has_map = sendFileMap(readfilename, tarfilename, statbuf,
+ missing_ok, dboid, prev_backup_start_lsn, &expected_write_size);
+
+ /*
+ * If possible, send the incremental version of the file;
+ * all non-relation files are sent in full by the code below.
+ */
+ if (file_has_map)
+ return sendFilePartial(readfilename, tarfilename, statbuf,
+ missing_ok, dboid, prev_backup_start_lsn, expected_write_size);
fp = AllocateFile(readfilename, "rb");
if (fp == NULL)
@@ -1447,6 +1503,8 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
verify_checksum = false;
}
+ /* Iterate over pages to get the info we need:
+ * either checksum verification or collecting a block map. */
if (verify_checksum)
{
for (i = 0; i < cnt / BLCKSZ; i++)
@@ -1468,15 +1526,15 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
if (phdr->pd_checksum != checksum)
{
/*
- * Retry the block on the first failure. It's
- * possible that we read the first 4K page of the
- * block just before postgres updated the entire block
- * so it ends up looking torn to us. We only need to
- * retry once because the LSN should be updated to
- * something we can ignore on the next pass. If the
- * error happens again then it is a true validation
- * failure.
- */
+ * Retry the block on the first failure. It's
+ * possible that we read the first 4K page of the
+ * block just before postgres updated the entire block
+ * so it ends up looking torn to us. We only need to
+ * retry once because the LSN should be updated to
+ * something we can ignore on the next pass. If the
+ * error happens again then it is a true validation
+ * failure.
+ */
if (block_retry == false)
{
/* Reread the failed block */
@@ -1484,7 +1542,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
{
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not fseek in file \"%s\": %m",
+ errmsg("could not fseek in file \"%s\": %m",
readfilename)));
}
@@ -1492,7 +1550,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
{
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not reread block %d of file \"%s\": %m",
+ errmsg("could not reread block %d of file \"%s\": %m",
blkno, readfilename)));
}
@@ -1500,7 +1558,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
{
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not fseek in file \"%s\": %m",
+ errmsg("could not fseek in file \"%s\": %m",
readfilename)));
}
@@ -1593,6 +1651,232 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
}
+/*
+ * sendFileMap
+ *
+ * If 'readfilename' looks like a relation data file, scan it and send a tar
+ * member named "<tarfilename>.blockmap".  The member holds an array of
+ * BlockNumber values (written as raw in-memory bytes) listing every page
+ * that is not new and whose LSN is greater than prev_backup_start_lsn.
+ * *expected_write_size is increased by BLCKSZ for each block recorded.
+ *
+ * Returns true if a map was sent.  Returns false, sending nothing, if the
+ * file is not treated as a relation data file, or if it does not exist and
+ * 'missing_ok' is set.  'statbuf' is not modified.  'dboid' is currently
+ * unused.
+ */
+static bool
+sendFileMap(const char *readfilename, const char *tarfilename, struct stat *statbuf,
+            bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn,
+            int *expected_write_size)
+{
+    FILE       *fp;
+    BlockNumber blkno = 0;
+    char        buf[TAR_SEND_SIZE];
+    off_t       cnt;
+    int         i;
+    pgoff_t     len = 0;
+    char       *page;
+    size_t      pad;
+    char       *tarfilename_blockmap;
+    BlockNumber *pagemap;
+    char       *filename;
+    struct stat mapstat;
+    int         pagemap_real_size;
+    int         n_blocks_to_send = 0;
+
+    Assert(prev_backup_start_lsn != InvalidXLogRecPtr);
+
+    /*
+     * Get the filename (excluding path).  As last_dir_separator()
+     * includes the last directory separator, we chop that off by
+     * incrementing the pointer.
+     */
+    filename = last_dir_separator(readfilename) + 1;
+
+    /*
+     * Non-relation files get no map.  Relation file names always start
+     * with a digit, which also rules out temporary files ("t<digit>...");
+     * names containing '_' are fork files and are excluded as well.
+     * The cast keeps isdigit() defined for bytes outside the ASCII range.
+     */
+    if (!is_checksummed_file(readfilename, filename) ||
+        !S_ISREG(statbuf->st_mode) ||
+        !isdigit((unsigned char) filename[0]) ||
+        strchr(filename, '_') != NULL)
+    {
+        elog(INFO, "sendFileMap %s, no datafile", filename);
+        return false;
+    }
+    elog(INFO, "sendFileMap %s, datafile", filename);
+
+    fp = AllocateFile(readfilename, "rb");
+    if (fp == NULL)
+    {
+        if (errno == ENOENT && missing_ok)
+            return false;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open file \"%s\": %m", readfilename)));
+    }
+
+    /* allocate a pagemap large enough to list every block of the file */
+    pagemap = palloc0((statbuf->st_size / BLCKSZ) * sizeof(BlockNumber));
+
+    while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+    {
+        /* iterate over pages to collect a map */
+        for (i = 0; i < cnt / BLCKSZ; i++)
+        {
+            page = buf + BLCKSZ * i;
+            /* add block to map */
+            if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn)
+            {
+                pagemap[n_blocks_to_send] = blkno;
+                /* NOTE(review): per-block INFO output looks like debugging aid */
+                elog(INFO, "expected_write_size %d add to map blkno %d pagemap[n_blocks_to_send] %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X",
+                     *expected_write_size, blkno, pagemap[n_blocks_to_send], readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page),
+                     (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
+                *expected_write_size += BLCKSZ;
+                n_blocks_to_send++;
+            }
+            blkno++;
+        }
+
+        len += cnt;
+
+        if (len >= statbuf->st_size)
+        {
+            /*
+             * Reached end of file.  The file could be longer, if it was
+             * extended while we were sending it, but for a base backup we can
+             * ignore such extended data.  It will be restored from WAL.
+             */
+            break;
+        }
+    }
+
+    /* The data file is not needed any more once the map is built. */
+    FreeFile(fp);
+
+    pagemap_real_size = n_blocks_to_send * sizeof(BlockNumber);
+
+    /*
+     * Write the tar header from a private copy of the stat data, so the
+     * caller's statbuf keeps describing the data file itself.
+     */
+    mapstat = *statbuf;
+    mapstat.st_size = pagemap_real_size;
+    tarfilename_blockmap = psprintf("%s.blockmap", tarfilename);
+    _tarWriteHeader(tarfilename_blockmap, NULL, &mapstat, false);
+    pfree(tarfilename_blockmap);
+
+    if (pagemap_real_size > 0)
+    {
+        if (pq_putmessage('d', (char *) pagemap, pagemap_real_size))
+            ereport(ERROR,
+                    (errmsg("base backup could not send data, aborting backup")));
+
+        /*
+         * Pad to 512 byte boundary, per tar format requirements.  (This small
+         * piece of data is probably not worth throttling.)
+         */
+        pad = ((pagemap_real_size + 511) & ~511) - pagemap_real_size;
+        if (pad > 0)
+        {
+            MemSet(buf, 0, pad);
+            if (pq_putmessage('d', buf, pad))
+                ereport(ERROR,
+                        (errmsg("base backup could not send data, aborting backup")));
+        }
+    }
+
+    pfree(pagemap);
+    return true;
+}
+
+
+/*
+ * sendFilePartial
+ *
+ * Send a tar member named "<tarfilename>.partial" containing, in file order,
+ * the pages of 'readfilename' that are not new and whose LSN is greater than
+ * prev_backup_start_lsn.  'expected_write_size' is the member size written
+ * into the tar header; the data sent is zero-filled up to that size if fewer
+ * pages qualify, and never exceeds it.
+ *
+ * Returns true if the member was sent.  Returns false, sending nothing, if
+ * the file does not exist and 'missing_ok' is set.  'statbuf' is not
+ * modified.  'dboid' is currently unused.
+ *
+ * NOTE(review): the set of qualifying pages is recomputed here instead of
+ * being taken from the map built earlier, so pages modified in between can
+ * make the two disagree.  The size cap below only keeps the tar stream well
+ * formed; it does not reconcile the contents with the map -- confirm how
+ * this should be handled.
+ */
+static bool
+sendFilePartial(const char *readfilename, const char *tarfilename, struct stat *statbuf,
+                bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn,
+                int expected_write_size)
+{
+    FILE       *fp;
+    BlockNumber blkno = 0;
+    char        buf[TAR_SEND_SIZE];
+    char        sendbuf[TAR_SEND_SIZE];
+    int         n_blocks_to_send = 0;
+    off_t       cnt;
+    int         i;
+    pgoff_t     len = 0;
+    char       *page;
+    char       *tarfilename_partial;
+    int         pad;
+    struct stat partstat;
+    int         write_len = 0;
+
+    Assert(prev_backup_start_lsn != InvalidXLogRecPtr);
+
+    /*
+     * Open the file before emitting anything: once the tar header is on the
+     * wire we can no longer back out by returning false.
+     */
+    fp = AllocateFile(readfilename, "rb");
+    if (fp == NULL)
+    {
+        if (errno == ENOENT && missing_ok)
+            return false;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open file \"%s\": %m", readfilename)));
+    }
+
+    /* Header is built from a private copy so the caller's statbuf is untouched. */
+    partstat = *statbuf;
+    partstat.st_size = expected_write_size;
+    tarfilename_partial = psprintf("%s.partial", tarfilename);
+    _tarWriteHeader(tarfilename_partial, NULL, &partstat, false);
+    pfree(tarfilename_partial);
+
+    while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+    {
+        /* collect the qualifying pages of this chunk into sendbuf */
+        for (i = 0; i < cnt / BLCKSZ; i++)
+        {
+            page = buf + BLCKSZ * i;
+
+            /* never send more than the size promised in the tar header */
+            if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn &&
+                write_len + (n_blocks_to_send + 1) * BLCKSZ <= expected_write_size)
+            {
+                elog(INFO, "add to sendbuf blkno %d, n_blocks_to_send %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X",
+                     blkno, n_blocks_to_send, readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page),
+                     (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
+                memcpy(sendbuf + BLCKSZ * n_blocks_to_send, page, BLCKSZ);
+                n_blocks_to_send++;
+            }
+            blkno++;
+        }
+
+        /* Skip the send entirely when this chunk contributed no pages. */
+        if (n_blocks_to_send > 0)
+        {
+            int         send_len = n_blocks_to_send * BLCKSZ;
+
+            elog(INFO, "send n_blocks_to_send %d of file %s",
+                 n_blocks_to_send, readfilename);
+            /* Send the chunk as a CopyData message */
+            if (pq_putmessage('d', sendbuf, send_len))
+                ereport(ERROR,
+                        (errmsg("base backup could not send data, aborting backup")));
+            write_len += send_len;
+            throttle(send_len);
+            n_blocks_to_send = 0;
+        }
+
+        len += cnt;
+
+        if (len >= statbuf->st_size)
+        {
+            /*
+             * Reached end of file.  The file could be longer, if it was
+             * extended while we were sending it, but for a base backup we can
+             * ignore such extended data.  It will be restored from WAL.
+             */
+            break;
+        }
+    }
+
+    FreeFile(fp);
+
+    /* Fewer pages than announced: fill the rest of the member with zeroes. */
+    if (write_len < expected_write_size)
+    {
+        MemSet(buf, 0, sizeof(buf));
+        while (write_len < expected_write_size)
+        {
+            cnt = Min(sizeof(buf), expected_write_size - write_len);
+            if (pq_putmessage('d', buf, cnt))
+                ereport(ERROR,
+                        (errmsg("base backup could not send data, aborting backup")));
+            write_len += cnt;
+            throttle(cnt);
+        }
+    }
+
+    /* Pad to 512 byte boundary, per tar format requirements */
+    pad = ((write_len + 511) & ~511) - write_len;
+    if (pad > 0)
+    {
+        MemSet(buf, 0, pad);
+        if (pq_putmessage('d', buf, pad))
+            ereport(ERROR,
+                    (errmsg("base backup could not send data, aborting backup")));
+    }
+
+    return true;
+}
+
+
static int64
_tarWriteHeader(const char *filename, const char *linktarget,
struct stat *statbuf, bool sizeonly)
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc..cb883a8 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,7 @@ static SQLCmd *make_sqlcmd(void);
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_PREV_BACKUP_START_LSN
%type <node> command
%type <node> base_backup start_replication start_logical_replication
@@ -103,6 +104,7 @@ static SQLCmd *make_sqlcmd(void);
%type <list> create_slot_opt_list
%type <defelt> create_slot_opt
+
%%
firstcmd: command opt_semicolon
@@ -155,7 +157,7 @@ var_name: IDENT { $$ = $1; }
/*
* BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
+ * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS] [PREV_BACKUP_START_LSN 'start_lsn']
*/
base_backup:
K_BASE_BACKUP base_backup_opt_list
@@ -213,6 +215,11 @@ base_backup_opt:
{
$$ = makeDefElem("noverify_checksums",
(Node *)makeInteger(true), -1);
+ }
+ | K_PREV_BACKUP_START_LSN SCONST
+ {
+ $$ = makeDefElem("prev_backup_start_lsn",
+ (Node *)makeString($2), -1);
}
;
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb..042e148 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,7 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
WAIT { return K_WAIT; }
+PREV_BACKUP_START_LSN { return K_PREV_BACKUP_START_LSN; }
"," { return ','; }
";" { return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 15f43f9..bd2930e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -106,6 +106,12 @@ static bool create_slot = false;
static bool no_slot = false;
static bool verify_checksums = true;
+
+static char* prev_backup_start_lsn = NULL;
+static char* prev_backup_start_lsn_str = NULL;
+static char* incremental_basedir = NULL;
+static bool merge_backups = false;
+
static bool success = false;
static bool made_new_pgdata = false;
static bool found_existing_pgdata = false;
@@ -150,6 +156,7 @@ static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void GenerateRecoveryConf(PGconn *conn);
static void WriteRecoveryConf(void);
static void BaseBackup(void);
+static void MergeBackups(void);
static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished);
@@ -1473,6 +1480,9 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
*/
snprintf(filename, sizeof(filename), "%s/%s", current_path,
copybuf);
+
+ pg_log_info("filename %s current_len_left %ld", filename, current_len_left);
+
if (filename[strlen(filename) - 1] == '/')
{
/*
@@ -1863,8 +1873,12 @@ BaseBackup(void)
fprintf(stderr, "\n");
}
+ if (prev_backup_start_lsn)
+ prev_backup_start_lsn_str = psprintf("PREV_BACKUP_START_LSN \'%s\'",
+ prev_backup_start_lsn);
+
basebkp =
- psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+ psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal == FETCH_WAL ? "WAL" : "",
@@ -1872,7 +1886,9 @@ BaseBackup(void)
includewal == NO_WAL ? "" : "NOWAIT",
maxrate_clause ? maxrate_clause : "",
format == 't' ? "TABLESPACE_MAP" : "",
- verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+ verify_checksums ? "" : "NOVERIFY_CHECKSUMS",
+ prev_backup_start_lsn_str?prev_backup_start_lsn_str:""
+ );
if (PQsendQuery(conn, basebkp) == 0)
{
@@ -2158,6 +2174,88 @@ BaseBackup(void)
pg_log_info("base backup completed");
}
+/*
+ * name_has_suffix
+ *
+ * Return true if 'name' is longer than 'suffix' and ends with it.
+ */
+static bool
+name_has_suffix(const char *name, const char *suffix)
+{
+    size_t      namelen = strlen(name);
+    size_t      suffixlen = strlen(suffix);
+
+    return namelen > suffixlen &&
+        strcmp(name + namelen - suffixlen, suffix) == 0;
+}
+
+/*
+ * walkdir
+ *
+ * Recursively walk the directory 'path', pairing every entry with the path
+ * of the same name under 'basepath'.  Symbolic links are not followed.
+ *
+ * For each regular file this currently only logs the action that applies:
+ * a "<name>.partial" file maps onto "<basepath>/<name>" (incremental), a
+ * "<name>.blockmap" file is skipped, and any other file maps onto the file
+ * of the same name under 'basepath'.  Errors opening, reading or stat'ing
+ * are logged and the walk continues.
+ */
+static void
+walkdir(const char *path, const char *basepath)
+{
+    DIR        *dir;
+    struct dirent *de;
+
+    dir = opendir(path);
+    if (dir == NULL)
+    {
+        pg_log_error("could not open directory \"%s\": %m", path);
+        return;
+    }
+
+    /* errno is cleared first so a NULL from readdir() can be told from EOF */
+    while (errno = 0, (de = readdir(dir)) != NULL)
+    {
+        char        subpath[MAXPGPATH * 2];
+        char        basesubpath[MAXPGPATH * 2];
+        struct stat fst;
+
+        if (strcmp(de->d_name, ".") == 0 ||
+            strcmp(de->d_name, "..") == 0)
+            continue;
+
+        snprintf(subpath, sizeof(subpath), "%s/%s", path, de->d_name);
+        snprintf(basesubpath, sizeof(basesubpath), "%s/%s", basepath, de->d_name);
+
+        /* lstat(), so that symlinks are not processed */
+        if (lstat(subpath, &fst) < 0)
+        {
+            pg_log_error("could not stat file \"%s\": %m", subpath);
+            continue;
+        }
+
+        if (S_ISDIR(fst.st_mode))
+        {
+            walkdir(subpath, basesubpath);
+            continue;
+        }
+
+        if (!S_ISREG(fst.st_mode))
+            continue;
+
+        if (name_has_suffix(de->d_name, ".partial"))
+        {
+            /* handle incremental files */
+            char        basicfilename[MAXPGPATH * 2];
+            char        mappath[MAXPGPATH * 2];
+            char        topath[MAXPGPATH * 2];
+            int         stemlen = (int) (strlen(de->d_name) - strlen(".partial"));
+
+            /*
+             * Strip the suffix.  snprintf() with a precision always
+             * NUL-terminates, which strncpy() does not do when the source
+             * is longer than the count.
+             */
+            snprintf(basicfilename, sizeof(basicfilename), "%.*s",
+                     stemlen, de->d_name);
+            snprintf(mappath, sizeof(mappath), "%s.blockmap", basicfilename);
+            pg_log_info("incremental basic %s, map %s, partial %s",
+                        basicfilename, mappath, de->d_name);
+
+            /*
+             * Build the target from the stripped name instead of searching
+             * the whole path, where a directory name could match first.
+             */
+            snprintf(topath, sizeof(topath), "%s/%s", basepath, basicfilename);
+            pg_log_info("incremental move from %s to %s", subpath, topath);
+        }
+        else if (!name_has_suffix(de->d_name, ".blockmap"))
+        {
+            /* .blockmap files are skipped; everything else is a plain file */
+            pg_log_info("non-incremental move from %s to %s", subpath, basesubpath);
+        }
+    }
+
+    if (errno)
+        pg_log_error("could not read directory \"%s\": %m", path);
+
+    (void) closedir(dir);
+}
+
+
+/*
+ * MergeBackups
+ *
+ * Walk all files in incremental_basedir, pairing each one with its
+ * counterpart in basedir.  A file that has no ".blockmap" is meant to
+ * simply replace the file in basedir; a file that has a ".blockmap" is
+ * meant to be read block by block and applied to the file in basedir.
+ */
+static void
+MergeBackups(void)
+{
+    walkdir(incremental_basedir, basedir);
+}
int
main(int argc, char **argv)
@@ -2191,6 +2289,9 @@ main(int argc, char **argv)
{"waldir", required_argument, NULL, 1},
{"no-slot", no_argument, NULL, 2},
{"no-verify-checksums", no_argument, NULL, 3},
+ {"prev-backup-start-lsn", required_argument, NULL, 5},
+ {"incremental-pgdata", required_argument, NULL, 6},
+ {"merge-backups", no_argument, NULL, 7},
{NULL, 0, NULL, 0}
};
int c;
@@ -2359,6 +2460,15 @@ main(int argc, char **argv)
case 3:
verify_checksums = false;
break;
+ case 5:
+ prev_backup_start_lsn = pg_strdup(optarg);
+ break;
+ case 6:
+ incremental_basedir = pg_strdup(optarg);
+ break;
+ case 7:
+ merge_backups = true;
+ break;
default:
/*
@@ -2393,6 +2503,18 @@ main(int argc, char **argv)
exit(1);
}
+ if (merge_backups && (incremental_basedir == NULL))
+ {
+ pg_log_error("no target incremental directory specified");
+ exit(1);
+ }
+
+ if (merge_backups && incremental_basedir)
+ {
+ MergeBackups();
+ return 0;
+ }
+
/*
* Mutually exclusive arguments
*/
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9..974c126 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,5 @@ typedef struct
extern void SendBaseBackup(BaseBackupCmd *cmd);
-extern int64 sendTablespace(char *path, bool sizeonly);
-
+extern int64 sendTablespace(char *path, bool sizeonly, XLogRecPtr prev_backup_start_lsn);
#endif /* _BASEBACKUP_H */