From ffa6d0946af34d78e59eb5b82f1572f2537fffeb Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Wed, 21 Aug 2019 18:35:45 +0500 Subject: [PATCH] Initial POC on parallel backup --- src/backend/replication/basebackup.c | 765 +++++++++++++++++-------- src/backend/replication/repl_gram.y | 53 +- src/backend/replication/repl_scanner.l | 4 + src/bin/pg_basebackup/pg_basebackup.c | 285 ++++++++- src/include/nodes/replnodes.h | 8 + 5 files changed, 865 insertions(+), 250 deletions(-) diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index c91f66dcbe..9cbee408ff 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -52,11 +52,26 @@ typedef struct bool includewal; uint32 maxrate; bool sendtblspcmapfile; + bool parallel; } basebackup_options;
+typedef struct
+{
+	bool		isdir;			/* true for a directory entry */
+	char	   *path;			/* pstrdup'd copy of the path */
+} pathinfo;
+
+#define MAKE_PATHINFO(a, b) \
+	do {	/* result lands in the caller's in-scope "pathinfo *pi" */ \
+		pi = palloc0(sizeof(pathinfo)); \
+		pi->isdir = (a); \
+		pi->path = pstrdup(b); \
+	} while(0)
static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks); +static int64 sendDir_(const char *path, int basepathlen, bool sizeonly, + List *tablespaces, bool sendtblspclinks, List **files); static bool sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid); static void sendFileWithContent(const char *filename, const char *content); @@ -74,12 +89,18 @@ static int compareWalFileNames(const ListCell *a, const ListCell *b); static void throttle(size_t increment); static bool is_checksummed_file(const char *fullpath, const char *filename); +static void StopBackup(basebackup_options *opt); +static void SendBackupFileList(List *tablespaces); +static void SendFilesContents(List *files, bool missing_ok); +static void includeWALFiles(basebackup_options *opt, XLogRecPtr endptr, TimeLineID endtli); + /* Was the backup currently in-progress initiated in recovery mode?
*/ static bool backup_started_in_recovery = false; /* Relative path of temporary statistics directory */ static char *statrelpath = NULL; +#define TMP_BACKUP_LABEL_FILE BACKUP_LABEL_FILE".tmp" /* * Size of each block sent into the tar stream for larger files. */ @@ -305,6 +326,33 @@ perform_base_backup(basebackup_options *opt) throttling_counter = -1; } + /* + * In the parallel mode, we will not be closing the backup or sending the files right away. + * Instead we will only send the list of file names in the $PGDATA directory. + */ + if (opt->parallel) + { + /* Save the backup label into a temp file for now, so that stop backup can send it to pg_basebackup later on. */ + FILE *fp = AllocateFile(TMP_BACKUP_LABEL_FILE, "w"); + if (!fp) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + TMP_BACKUP_LABEL_FILE))); + if (fwrite(labelfile->data, labelfile->len, 1, fp) != 1 || + fflush(fp) != 0 || + pg_fsync(fileno(fp)) != 0 || + ferror(fp) || + FreeFile(fp)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + TMP_BACKUP_LABEL_FILE))); + + SendBackupFileList(tablespaces); + return; + } + /* Send off our tablespaces one by one */ foreach(lc, tablespaces) { @@ -367,234 +415,8 @@ perform_base_backup(basebackup_options *opt) if (opt->includewal) - { - /* - * We've left the last tar file "open", so we can now append the - * required WAL files to it. - */ - char pathbuf[MAXPGPATH]; - XLogSegNo segno; - XLogSegNo startsegno; - XLogSegNo endsegno; - struct stat statbuf; - List *historyFileList = NIL; - List *walFileList = NIL; - char firstoff[MAXFNAMELEN]; - char lastoff[MAXFNAMELEN]; - DIR *dir; - struct dirent *de; - ListCell *lc; - TimeLineID tli; - - /* - * I'd rather not worry about timelines here, so scan pg_wal and - * include all WAL files in the range between 'startptr' and 'endptr', - * regardless of the timeline the file is stamped with.
If there are - * some spurious WAL files belonging to timelines that don't belong in - * this server's history, they will be included too. Normally there - * shouldn't be such files, but if there are, there's little harm in - * including them. - */ - XLByteToSeg(startptr, startsegno, wal_segment_size); - XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size); - XLByteToPrevSeg(endptr, endsegno, wal_segment_size); - XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size); - - dir = AllocateDir("pg_wal"); - while ((de = ReadDir(dir, "pg_wal")) != NULL) - { - /* Does it look like a WAL segment, and is it in the range? */ - if (IsXLogFileName(de->d_name) && - strcmp(de->d_name + 8, firstoff + 8) >= 0 && - strcmp(de->d_name + 8, lastoff + 8) <= 0) - { - walFileList = lappend(walFileList, pstrdup(de->d_name)); - } - /* Does it look like a timeline history file? */ - else if (IsTLHistoryFileName(de->d_name)) - { - historyFileList = lappend(historyFileList, pstrdup(de->d_name)); - } - } - FreeDir(dir); - - /* - * Before we go any further, check that none of the WAL segments we - * need were removed. - */ - CheckXLogRemoved(startsegno, ThisTimeLineID); - - /* - * Sort the WAL filenames. We want to send the files in order from - * oldest to newest, to reduce the chance that a file is recycled - * before we get a chance to send it over. - */ - list_sort(walFileList, compareWalFileNames); - - /* - * There must be at least one xlog file in the pg_wal directory, since - * we are doing backup-including-xlog. - */ - if (walFileList == NIL) - ereport(ERROR, - (errmsg("could not find any WAL files"))); - - /* - * Sanity check: the first and last segment should cover startptr and - * endptr, with no gaps in between. 
- */ - XLogFromFileName((char *) linitial(walFileList), - &tli, &segno, wal_segment_size); - if (segno != startsegno) - { - char startfname[MAXFNAMELEN]; + includeWALFiles(opt, endptr, endtli); - XLogFileName(startfname, ThisTimeLineID, startsegno, - wal_segment_size); - ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", startfname))); - } - foreach(lc, walFileList) - { - char *walFileName = (char *) lfirst(lc); - XLogSegNo currsegno = segno; - XLogSegNo nextsegno = segno + 1; - - XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); - if (!(nextsegno == segno || currsegno == segno)) - { - char nextfname[MAXFNAMELEN]; - - XLogFileName(nextfname, ThisTimeLineID, nextsegno, - wal_segment_size); - ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", nextfname))); - } - } - if (segno != endsegno) - { - char endfname[MAXFNAMELEN]; - - XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size); - ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", endfname))); - } - - /* Ok, we have everything we need. Send the WAL files. */ - foreach(lc, walFileList) - { - char *walFileName = (char *) lfirst(lc); - FILE *fp; - char buf[TAR_SEND_SIZE]; - size_t cnt; - pgoff_t len = 0; - - snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName); - XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); - - fp = AllocateFile(pathbuf, "rb"); - if (fp == NULL) - { - int save_errno = errno; - - /* - * Most likely reason for this is that the file was already - * removed by a checkpoint, so check for that to get a better - * error message. 
- */ - CheckXLogRemoved(segno, tli); - - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", pathbuf))); - } - - if (fstat(fileno(fp), &statbuf) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - pathbuf))); - if (statbuf.st_size != wal_segment_size) - { - CheckXLogRemoved(segno, tli); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("unexpected WAL file size \"%s\"", walFileName))); - } - - /* send the WAL file itself */ - _tarWriteHeader(pathbuf, NULL, &statbuf, false); - - while ((cnt = fread(buf, 1, - Min(sizeof(buf), wal_segment_size - len), - fp)) > 0) - { - CheckXLogRemoved(segno, tli); - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); - - len += cnt; - throttle(cnt); - - if (len == wal_segment_size) - break; - } - - if (len != wal_segment_size) - { - CheckXLogRemoved(segno, tli); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("unexpected WAL file size \"%s\"", walFileName))); - } - - /* wal_segment_size is a multiple of 512, so no need for padding */ - - FreeFile(fp); - - /* - * Mark file as archived, otherwise files can get archived again - * after promotion of a new node. This is in line with - * walreceiver.c always doing an XLogArchiveForceDone() after a - * complete segment. - */ - StatusFilePath(pathbuf, walFileName, ".done"); - sendFileWithContent(pathbuf, ""); - } - - /* - * Send timeline history files too. Only the latest timeline history - * file is required for recovery, and even that only if there happens - * to be a timeline switch in the first WAL segment that contains the - * checkpoint record, or if we're taking a base backup from a standby - * server and the target timeline changes while the backup is taken. - * But they are small and highly useful for debugging purposes, so - * better include them all, always. 
- */ - foreach(lc, historyFileList) - { - char *fname = lfirst(lc); - - snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname); - - if (lstat(pathbuf, &statbuf) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", pathbuf))); - - sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid); - - /* unconditionally mark file as archived */ - StatusFilePath(pathbuf, fname, ".done"); - sendFileWithContent(pathbuf, ""); - } - - /* Send CopyDone message for the last tar file */ - pq_putemptymessage('c'); - } SendXlogRecPtrResult(endptr, endtli); if (total_checksum_failures) @@ -638,6 +460,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_maxrate = false; bool o_tablespace_map = false; bool o_noverify_checksums = false; + bool o_parallel = false; MemSet(opt, 0, sizeof(*opt)); foreach(lopt, options) @@ -726,6 +549,16 @@ parse_basebackup_options(List *options, basebackup_options *opt) noverify_checksums = true; o_noverify_checksums = true; } + else if (strcmp(defel->defname, "parallel") == 0) + { + if (o_parallel) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + + opt->parallel = true; + o_parallel = true; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -760,7 +593,12 @@ SendBaseBackup(BaseBackupCmd *cmd) set_ps_display(activitymsg, false); } - perform_base_backup(&opt); + if (cmd->cmdtag == SEND_FILES_CONTENT) + SendFilesContents(cmd->backupfiles, true); + else if (cmd->cmdtag == STOP_BACKUP) + StopBackup(&opt); + else + perform_base_backup(&opt); } static void @@ -1004,9 +842,16 @@ sendTablespace(char *path, bool sizeonly) * information in the tar file. If not, we can skip that * as it will be sent separately in the tablespace_map file. 
*/
+static int64
+sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
+		bool sendtblspclinks)
+{
+	return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL);	/* NULL: no path list is collected */
+}
+ static int64 -sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, - bool sendtblspclinks) +sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces, + bool sendtblspclinks, List **files) { DIR *dir; struct dirent *de; @@ -1160,6 +1005,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0) { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name); + + if (files != NULL) + { + pathinfo *pi; + + MAKE_PATHINFO(true, pathbuf); + *files = lappend(*files, pi); + } + size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly); excludeFound = true; break; @@ -1197,6 +1051,13 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf, sizeonly); + if (files != NULL) + { + pathinfo *pi; + + MAKE_PATHINFO(true, pathbuf); + *files = lappend(*files, pi); + } continue; /* don't recurse into pg_wal */ } @@ -1282,13 +1143,29 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, skip_this_dir = true; if (!skip_this_dir) - size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks); + { + if (files != NULL) + { + pathinfo *pi; + + MAKE_PATHINFO(true, pathbuf); + *files = lappend(*files, pi); + } + size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files); + } } else if (S_ISREG(statbuf.st_mode)) { bool sent = false; - if (!sizeonly) + if (files != NULL) + { + pathinfo *pi; + + MAKE_PATHINFO(false, pathbuf); + *files = lappend(*files, pi); + } + else if (!sizeonly) sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, isDbDir ?
pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid); @@ -1711,3 +1588,427 @@ throttle(size_t increment) */ throttled_last = GetCurrentTimestamp(); }
+
+/* Finish a parallel backup: send backup_label and pg_control, then stop it. */
+static void
+StopBackup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+	char	   *labelfile;
+	struct stat statbuf;
+	FILE	   *lfp;
+	StringInfoData buf;
+
+	/* Disable throttling. */
+	throttling_counter = -1;
+
+	/* Send CopyOutResponse; backup_label and pg_control follow in this stream */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	/* Send the label saved at backup start under its final name. */
+	if (lstat(TMP_BACKUP_LABEL_FILE, &statbuf) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						TMP_BACKUP_LABEL_FILE)));
+	sendFile(TMP_BACKUP_LABEL_FILE, BACKUP_LABEL_FILE, &statbuf, false, InvalidOid);
+
+	/* Read the whole label back in; do_pg_stop_backup needs its text. */
+	lfp = AllocateFile(TMP_BACKUP_LABEL_FILE, "r");
+	if (!lfp)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						TMP_BACKUP_LABEL_FILE)));
+
+	labelfile = palloc(statbuf.st_size + 1);
+	if (fread(labelfile, 1, statbuf.st_size, lfp) != (size_t) statbuf.st_size)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						TMP_BACKUP_LABEL_FILE)));
+	labelfile[statbuf.st_size] = '\0';
+	FreeFile(lfp);
+
+	/* ... and pg_control after everything else. */
+	if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						XLOG_CONTROL_FILE)));
+	sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/* stop backup */
+	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+	/*
+	 * FIXME: opt->includewal is not available here, so WAL is always included;
+	 * the STOP_BACKUP command sent by pg_basebackup should carry that option.
+	 */
+	includeWALFiles(opt, endptr, endtli);
+
+	/* send ending wal record.
*/
+	SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Send the client the list of paths that make up the backup as a two-column
+ * result set (isDir, path), followed by CommandComplete.
+ */
+static void
+SendBackupFileList(List *tablespaces)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	List	   *files = NIL;
+	pathinfo   *pi;
+
+	/*
+	 * Collect names only: sizeonly = true keeps the _tarWriteHeader() and
+	 * _tarWriteDir() calls in sendDir_ from writing to the client here.
+	 */
+	foreach(lc, tablespaces)
+	{
+		tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+		sendDir_(ti->path == NULL ? "." : ti->path, 1, true, NIL, true, &files);
+	}
+
+	/* add backup label file */
+	MAKE_PATHINFO(false, TMP_BACKUP_LABEL_FILE);
+	files = lcons(pi, files);
+
+	/* Construct and send the directory information */
+	pq_beginmessage(&buf, 'T'); /* RowDescription */
+	pq_sendint16(&buf, 2);		/* 2 fields */
+
+	/* First field - isdirectory */
+	pq_sendstring(&buf, "isDir");
+	pq_sendint32(&buf, 0);		/* table oid */
+	pq_sendint16(&buf, 0);		/* attnum */
+	pq_sendint32(&buf, INT4OID);	/* type oid */
+	pq_sendint16(&buf, 4);		/* typlen */
+	pq_sendint32(&buf, 0);		/* typmod */
+	pq_sendint16(&buf, 0);		/* format code */
+
+	/* Second field - file path */
+	pq_sendstring(&buf, "path");
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_sendint32(&buf, TEXTOID);
+	pq_sendint16(&buf, -1);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+
+	pq_endmessage(&buf);
+
+	foreach(lc, files)
+	{
+		pathinfo   *entry = (pathinfo *) lfirst(lc);
+		Size		len = strlen(entry->path);
+
+		/* Send one datarow message */
+		pq_beginmessage(&buf, 'D');
+		pq_sendint16(&buf, 2);		/* number of columns */
+		send_int8_string(&buf, entry->isdir ? 1 : 0);
+		pq_sendint32(&buf, len);
+		pq_sendbytes(&buf, entry->path, len);
+		pq_endmessage(&buf);
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+static void
+SendFilesContents(List *files, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+
+	/* Disable throttling.
*/
+	throttling_counter = -1;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	/*
+	 * Stream every requested path in list order.  Each entry of 'files' is
+	 * read with strVal(), so the list must hold String Value nodes.
+	 */
+	foreach(lc, files)
+	{
+		char	   *pathbuf = (char *) strVal((Value *) lfirst(lc));
+		struct stat statbuf;
+
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			/*
+			 * The path may have been removed since the file list was built.
+			 * That is tolerated only when the caller passed missing_ok;
+			 * any other stat failure is always an error.
+			 */
+			if (errno == ENOENT && missing_ok)
+				continue;
+
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file or directory \"%s\": %m",
+							pathbuf)));
+		}
+
+		/*
+		 * TODO: perhaps create directory entry in the tar file, to avoid
+		 * the need of manually creating directories in pg_basebackup.c
+		 */
+
+		/*
+		 * The path doubles as the name sent to the client.  missing_ok is
+		 * forwarded so that sendFile presumably applies the same policy if
+		 * the file disappears between the lstat above and the open.
+		 */
+		sendFile(pathbuf, pathbuf, &statbuf, missing_ok, InvalidOid);
+	}
+
+	/* End of the COPY stream for this batch of files. */
+	pq_putemptymessage('c');	/* CopyDone */
+}
+
+static void
+includeWALFiles(basebackup_options *opt, XLogRecPtr endptr, TimeLineID endtli)
+{
+	/*
+	 * We've left the last tar file "open", so we can now append the
+	 * required WAL files to it.
+	 */
+	char		pathbuf[MAXPGPATH];
+	XLogSegNo	segno;
+	XLogSegNo	startsegno;
+	XLogSegNo	endsegno;
+	struct stat statbuf;
+	List	   *historyFileList = NIL;
+	List	   *walFileList = NIL;
+	char		firstoff[MAXFNAMELEN];
+	char		lastoff[MAXFNAMELEN];
+	DIR		   *dir;
+	struct dirent *de;
+	ListCell   *lc;
+	TimeLineID	tli;
+
+	/*
+	 * I'd rather not worry about timelines here, so scan pg_wal and
+	 * include all WAL files in the range between 'startptr' and 'endptr',
+	 * regardless of the timeline the file is stamped with.  If there are
+	 * some spurious WAL files belonging to timelines that don't belong in
+	 * this server's history, they will be included too.
Normally there
+	 * shouldn't be such files, but if there are, there's little harm in
+	 * including them.  NOTE(review): startptr is neither a parameter nor a local of this function -- confirm it is declared at file scope.
+	 */
+	XLByteToSeg(startptr, startsegno, wal_segment_size);
+	XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
+	XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
+	XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
+
+	dir = AllocateDir("pg_wal");
+	while ((de = ReadDir(dir, "pg_wal")) != NULL)
+	{
+		/* Does it look like a WAL segment, and is it in the range (names compared from offset 8, presumably past the timeline part)? */
+		if (IsXLogFileName(de->d_name) &&
+			strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
+			strcmp(de->d_name + 8, lastoff + 8) <= 0)
+		{
+			walFileList = lappend(walFileList, pstrdup(de->d_name));
+		}
+		/* Does it look like a timeline history file? */
+		else if (IsTLHistoryFileName(de->d_name))
+		{
+			historyFileList = lappend(historyFileList, pstrdup(de->d_name));
+		}
+	}
+	FreeDir(dir);
+
+	/*
+	 * Before we go any further, check that none of the WAL segments we
+	 * need were removed.
+	 */
+	CheckXLogRemoved(startsegno, ThisTimeLineID);
+
+	/*
+	 * Sort the WAL filenames.  We want to send the files in order from
+	 * oldest to newest, to reduce the chance that a file is recycled
+	 * before we get a chance to send it over.
+	 */
+	list_sort(walFileList, compareWalFileNames);
+
+	/*
+	 * There must be at least one xlog file in the pg_wal directory, since
+	 * we are doing backup-including-xlog.
+	 */
+	if (walFileList == NIL)
+		ereport(ERROR,
+				(errmsg("could not find any WAL files")));
+
+	/*
+	 * Sanity check: the first and last segment should cover startptr and
+	 * endptr, with no gaps in between.
+ */ + XLogFromFileName((char *) linitial(walFileList), + &tli, &segno, wal_segment_size); + if (segno != startsegno) + { + char startfname[MAXFNAMELEN]; + + XLogFileName(startfname, ThisTimeLineID, startsegno, + wal_segment_size); + ereport(ERROR, + (errmsg("could not find WAL file \"%s\"", startfname))); + } + foreach(lc, walFileList) + { + char *walFileName = (char *) lfirst(lc); + XLogSegNo currsegno = segno; + XLogSegNo nextsegno = segno + 1; + + XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); + if (!(nextsegno == segno || currsegno == segno)) + { + char nextfname[MAXFNAMELEN]; + + XLogFileName(nextfname, ThisTimeLineID, nextsegno, + wal_segment_size); + ereport(ERROR, + (errmsg("could not find WAL file \"%s\"", nextfname))); + } + } + if (segno != endsegno) + { + char endfname[MAXFNAMELEN]; + + XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size); + ereport(ERROR, + (errmsg("could not find WAL file \"%s\"", endfname))); + } + + /* Ok, we have everything we need. Send the WAL files. */ + foreach(lc, walFileList) + { + char *walFileName = (char *) lfirst(lc); + FILE *fp; + char buf[TAR_SEND_SIZE]; + size_t cnt; + pgoff_t len = 0; + + snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName); + XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); + + fp = AllocateFile(pathbuf, "rb"); + if (fp == NULL) + { + int save_errno = errno; + + /* + * Most likely reason for this is that the file was already + * removed by a checkpoint, so check for that to get a better + * error message. 
+ */ + CheckXLogRemoved(segno, tli); + + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pathbuf))); + } + + if (fstat(fileno(fp), &statbuf) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + pathbuf))); + if (statbuf.st_size != wal_segment_size) + { + CheckXLogRemoved(segno, tli); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected WAL file size \"%s\"", walFileName))); + } + + /* send the WAL file itself */ + _tarWriteHeader(pathbuf, NULL, &statbuf, false); + + while ((cnt = fread(buf, 1, + Min(sizeof(buf), wal_segment_size - len), + fp)) > 0) + { + CheckXLogRemoved(segno, tli); + /* Send the chunk as a CopyData message */ + if (pq_putmessage('d', buf, cnt)) + ereport(ERROR, + (errmsg("base backup could not send data, aborting backup"))); + + len += cnt; + throttle(cnt); + + if (len == wal_segment_size) + break; + } + + if (len != wal_segment_size) + { + CheckXLogRemoved(segno, tli); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected WAL file size \"%s\"", walFileName))); + } + + /* wal_segment_size is a multiple of 512, so no need for padding */ + + FreeFile(fp); + + /* + * Mark file as archived, otherwise files can get archived again + * after promotion of a new node. This is in line with + * walreceiver.c always doing an XLogArchiveForceDone() after a + * complete segment. + */ + StatusFilePath(pathbuf, walFileName, ".done"); + sendFileWithContent(pathbuf, ""); + } + + /* + * Send timeline history files too. Only the latest timeline history + * file is required for recovery, and even that only if there happens + * to be a timeline switch in the first WAL segment that contains the + * checkpoint record, or if we're taking a base backup from a standby + * server and the target timeline changes while the backup is taken. + * But they are small and highly useful for debugging purposes, so + * better include them all, always. 
+ */ + foreach(lc, historyFileList) + { + char *fname = lfirst(lc); + + snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname); + + if (lstat(pathbuf, &statbuf) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", pathbuf))); + + sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid); + + /* unconditionally mark file as archived */ + StatusFilePath(pathbuf, fname, ".done"); + sendFileWithContent(pathbuf, ""); + } + + /* Send CopyDone message for the last tar file */ + pq_putemptymessage('c'); +} diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index c4e11cc4e8..56b6934e43 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -87,6 +87,10 @@ static SQLCmd *make_sqlcmd(void); %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT +%token K_PARALLEL +%token K_START_BACKUP +%token K_SEND_FILES_CONTENT +%token K_STOP_BACKUP %type command %type base_backup start_replication start_logical_replication @@ -102,6 +106,8 @@ static SQLCmd *make_sqlcmd(void); %type opt_temporary %type create_slot_opt_list %type create_slot_opt +%type backup_files backup_files_list +%type backup_file %% @@ -155,13 +161,29 @@ var_name: IDENT { $$ = $1; } /* * BASE_BACKUP [LABEL '