bugfix_early_archiving_v1.0.patch
application/octet-stream
Filename: bugfix_early_archiving_v1.0.patch
Type: application/octet-stream
Part: 1
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a1256a1..520bcd2 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -644,6 +644,9 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */
+ /*
+ * End LSNs of WAL records that start and end in different WAL segments,
+ * stored by XLogMarkEndRecPtrIfNeeded() and cleared again once archive
+ * notification for the earlier segment(s) has been done.  XLOGbuffers
+ * entries, zero-filled in XLOGShmemInit().
+ */
+ XLogRecPtr *CrossBoundaryEndRecPtrs;
+ /* Highest WAL segment number already notified to the archiver. */
+ XLogSegNo latestArchiveNotifiedSegNo;
+
/*
* Shared copy of ThisTimeLineID. Does not change after end-of-recovery.
* If we created a new timeline when the system was started up,
@@ -968,6 +971,11 @@ static void WALInsertLockAcquireExclusive(void);
static void WALInsertLockRelease(void);
static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
+/*
+ * Archive notification that waits until WAL records crossing a segment
+ * boundary are complete (primary: flushed; standby: read during replay).
+ */
+static void XLogMarkEndRecPtrIfNeeded(XLogRecPtr start, XLogRecPtr end);
+static void XLogArchiveNotifySegmentsInPrimary(void);
+static void XLogArchiveNotifySegInStandby(XLogSegNo segno);
+static void XLogArchiveNotifySegmentsInStandby(XLogRecPtr start, XLogRecPtr end);
+
/*
* Insert an XLOG record represented by an already-constructed chain of data
* chunks. This is a low-level routine; to construct the WAL record header
@@ -1641,6 +1649,9 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
CurrPos = MAXALIGN64(CurrPos);
}
+ /*
+ * Remember where a record that crosses a segment boundary ends, so that
+ * archive notification for the earlier segment(s) can wait until the
+ * whole record has been flushed.
+ */
+ if (XLogArchivingActive())
+ XLogMarkEndRecPtrIfNeeded(StartPos, EndPos);
+
if (CurrPos != EndPos)
elog(PANIC, "space reserved for WAL record does not match what was written");
}
@@ -2567,11 +2578,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
- * This is also the right place to notify the Archiver that the
- * segment is ready to copy to archival storage, and to update the
- * timer for archive_timeout, and to signal for a checkpoint if
- * too many logfile segments have been used since the last
- * checkpoint.
+ * This is also the right place to update the timer for
+ * archive_timeout, and to signal for a checkpoint if too many
+ * logfile segments have been used since the last checkpoint.
+ * Archiver notification is left to
+ * XLogArchiveNotifySegmentsInPrimary(), which returns early while
+ * the marked cross-boundary record is not yet completely flushed.
*/
if (finishing_seg)
{
@@ -2583,7 +2592,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
- XLogArchiveNotifySeg(openLogSegNo);
+ XLogArchiveNotifySegmentsInPrimary();
XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2653,6 +2662,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
WalSndWakeupRequest();
LogwrtResult.Flush = LogwrtResult.Write;
+
+ /*
+ * A record crossing a segment boundary may only now have been
+ * completely flushed, so retry notifying the archiver.
+ */
+ if (XLogArchivingActive())
+ XLogArchiveNotifySegmentsInPrimary();
}
/*
@@ -2673,6 +2685,89 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
}
}
+/*
+ * If the WAL record that starts at "start" and ends at "end" lies in more
+ * than one WAL segment, store "end" in CrossBoundaryEndRecPtrs at slot
+ * XLogRecPtrToBufIdx(end).  Records within a single segment are ignored.
+ *
+ * NOTE(review): called from CopyXLogRecordToWAL(), presumably while holding
+ * only a WAL insertion lock, and read by XLogArchiveNotifySegmentsInPrimary()
+ * from XLogWrite(); confirm the store is guaranteed visible to that reader
+ * before the record can be flushed.  A later crossing record whose end maps
+ * to the same slot silently overwrites an entry not yet consumed.
+ */
+static void
+XLogMarkEndRecPtrIfNeeded(XLogRecPtr start, XLogRecPtr end)
+{
+ XLogSegNo start_segno;
+ XLogSegNo end_segno;
+
+ XLByteToSeg(start, start_segno, wal_segment_size);
+ XLByteToSeg(end, end_segno, wal_segment_size);
+ if (start_segno != end_segno)
+ XLogCtl->CrossBoundaryEndRecPtrs[XLogRecPtrToBufIdx(end)] = end;
+}
+
+/*
+ * Called from XLogWrite() after LogwrtResult.Flush has been advanced.  If the
+ * CrossBoundaryEndRecPtrs slot for the current flush position holds a record
+ * end that is already flushed, notify the archiver of every segment after
+ * latestArchiveNotifiedSegNo up to the one before the segment containing the
+ * flush position, then clear the slot and advance latestArchiveNotifiedSegNo.
+ *
+ * NOTE(review): the slot is looked up by XLogRecPtrToBufIdx(Flush), while
+ * XLogMarkEndRecPtrIfNeeded() stores it by XLogRecPtrToBufIdx(record end).
+ * They only coincide when the flush stops on the same buffer slot as the end
+ * of the crossing record; otherwise the mark is not consumed and notification
+ * is postponed until some later flush lands on a marked slot.  Conversely, a
+ * stale mark left in the Flush slot could permit notification while the
+ * record that crosses into the current segment is still partially flushed.
+ * Confirm whether this is acceptable.
+ */
+static void
+XLogArchiveNotifySegmentsInPrimary(void)
+{
+ XLogSegNo target_segno;
+ XLogSegNo flushing_segno;
+ XLogSegNo latest_target_segno;
+ int idx;
+
+ idx = XLogRecPtrToBufIdx(LogwrtResult.Flush);
+
+ /* Is WAL record crossing from previous segment flushed completely? */
+ if (XLogRecPtrIsInvalid(XLogCtl->CrossBoundaryEndRecPtrs[idx]) ||
+ LogwrtResult.Flush < XLogCtl->CrossBoundaryEndRecPtrs[idx])
+ return; /* No, the WAL record is not flushed completely. */
+
+ /* OK. Notification against previous segment is safe. */
+ XLByteToSeg(LogwrtResult.Flush, flushing_segno, wal_segment_size);
+ latest_target_segno = flushing_segno - 1;
+
+ /* These segments have to be notified. */
+ for (target_segno = XLogCtl->latestArchiveNotifiedSegNo + 1;
+ target_segno <= latest_target_segno; ++target_segno)
+ XLogArchiveNotifySeg(target_segno);
+
+ /* Consume the mark and remember how far notification has got. */
+ XLogCtl->CrossBoundaryEndRecPtrs[idx] = InvalidXLogRecPtr;
+ XLogCtl->latestArchiveNotifiedSegNo = latest_target_segno;
+}
+
+/*
+ * Create the archive status file for WAL segment "segno" of timeline
+ * ThisTimeLineID on a standby.  Unless XLogArchivingAlways() is true, the
+ * segment is force-marked done with XLogArchiveForceDone(); otherwise it is
+ * queued for archiving with XLogArchiveNotify().  Nothing happens if
+ * XLogArchiveIsReadyOrDone() reports an existing status, or if the segment
+ * file is not present in XLOGDIR.
+ */
+static void
+XLogArchiveNotifySegInStandby(XLogSegNo segno)
+{
+	char		xlogfname[MAXFNAMELEN];
+	char		xlogPath[MAXPGPATH];
+	struct stat stat_buf;
+
+	XLogFileName(xlogfname, ThisTimeLineID, segno, wal_segment_size);
+	snprintf(xlogPath, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+
+	/*
+	 * In case of switching TLI, the last segment file in the previous TLI
+	 * may have been deleted because it is not needed (the same segment file
+	 * in the new TLI is read instead), so a missing file is expected.  Any
+	 * other stat() failure is reported rather than silently ignored, because
+	 * skipping the segment leaves it without an archive status file.
+	 */
+	if (stat(xlogPath, &stat_buf) != 0)
+	{
+		if (errno != ENOENT)
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", xlogPath)));
+		return;
+	}
+
+	/* Already marked ready or done; leave the existing status alone. */
+	if (XLogArchiveIsReadyOrDone(xlogfname))
+		return;
+
+	if (!XLogArchivingAlways())
+		XLogArchiveForceDone(xlogfname);
+	else
+		XLogArchiveNotify(xlogfname);
+}
+
+/*
+ * Called from StartupXLOG's redo loop in standby mode with the start and end
+ * LSNs of the record about to be replayed.  If the record crosses one or more
+ * segment boundaries, every segment from the one containing "start" up to,
+ * but not including, the one containing "end" is complete, so create its
+ * archive status file.  latestArchiveNotifiedSegNo is then set to the last
+ * segment handled (end_segno - 1).  This matches the convention of
+ * XLogArchiveNotifySegmentsInPrimary() and of the end-of-recovery
+ * initialization.
+ */
+static void
+XLogArchiveNotifySegmentsInStandby(XLogRecPtr start, XLogRecPtr end)
+{
+	XLogSegNo	target_segno;
+	XLogSegNo	start_segno;
+	XLogSegNo	end_segno;
+
+	XLByteToSeg(start, start_segno, wal_segment_size);
+	XLByteToSeg(end, end_segno, wal_segment_size);
+
+	/*
+	 * The record lies within one segment, which is not complete yet.
+	 * Nothing to notify, and latestArchiveNotifiedSegNo must not advance.
+	 */
+	if (start_segno == end_segno)
+		return;
+
+	for (target_segno = start_segno; target_segno < end_segno; ++target_segno)
+		XLogArchiveNotifySegInStandby(target_segno);
+
+	/* end_segno itself is still being written; it was not notified. */
+	XLogCtl->latestArchiveNotifiedSegNo = end_segno - 1;
+}
+
/*
* Record the LSN for an asynchronous transaction commit/abort
* and nudge the WALWriter if there is work for it to do.
@@ -5062,6 +5157,8 @@ XLOGShmemSize(void)
/* WAL insertion locks, plus alignment */
size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
+ /* CrossBoundaryEndRecPtrs array */
+ size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* xlblocks array */
size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
/* extra alignment padding for XLOG I/O buffers */
@@ -5145,6 +5242,9 @@ XLOGShmemInit(void)
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
+ /*
+ * CrossBoundaryEndRecPtrs array, XLOGbuffers entries like xlblocks.  It is
+ * zero-filled; XLogArchiveNotifySegmentsInPrimary() treats invalid entries
+ * as "no mark" (presumably relying on InvalidXLogRecPtr being 0).
+ */
+ XLogCtl->CrossBoundaryEndRecPtrs = (XLogRecPtr *) allocptr;
+ memset(XLogCtl->CrossBoundaryEndRecPtrs, 0, sizeof(XLogRecPtr) * XLOGbuffers);
+ allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
/* WAL insertion locks. Ensure they're aligned to the full padded size */
allocptr += sizeof(WALInsertLockPadded) -
@@ -7296,6 +7396,18 @@ StartupXLOG(void)
/* Check that it's OK to switch to this TLI */
checkTimeLineSwitch(EndRecPtr, newTLI, prevTLI);
+ /*
+ * Before switching to the new timeline, create the archive status
+ * file for the last segment of the current (old) timeline: the one
+ * XLByteToPrevSeg() derives from ReadRecPtr.  This must happen while
+ * ThisTimeLineID is still the old TLI, because
+ * XLogArchiveNotifySegInStandby() builds the file name from it.
+ */
+ if (StandbyMode)
+ {
+ XLogSegNo lastSegNo;
+
+ XLByteToPrevSeg(ReadRecPtr, lastSegNo, wal_segment_size);
+ XLogArchiveNotifySegInStandby(lastSegNo);
+ }
+
/* Following WAL records should be run with new TLI */
ThisTimeLineID = newTLI;
switchedTLI = true;
@@ -7303,6 +7415,13 @@ StartupXLOG(void)
}
/*
+ * If this record crosses one or more segment boundaries, the segment
+ * it started in (and any it spans entirely) is now complete, so its
+ * archive status file can be created.
+ */
+ if (StandbyMode)
+ XLogArchiveNotifySegmentsInStandby(ReadRecPtr, EndRecPtr);
+
+ /*
* Update shared replayEndRecPtr before replaying this record,
* so that XLogFlush will update minRecoveryPoint correctly.
*/
@@ -7703,6 +7822,13 @@ StartupXLOG(void)
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
+ /*
+ * Treat every segment before the one containing EndOfLog as already
+ * notified; XLogArchiveNotifySegmentsInPrimary() resumes from there.
+ */
+ if (XLogArchivingActive())
+ {
+ XLogSegNo endLogSegNo;
+ XLByteToSeg(EndOfLog, endLogSegNo, wal_segment_size);
+ XLogCtl->latestArchiveNotifiedSegNo = endLogSegNo - 1;
+ }
+
/*
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
* record before resource manager writes cleanup WAL records or checkpoint
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index d1ad75d..cb6b3bf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -621,15 +621,6 @@ WalReceiverMain(void)
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
-
- /*
- * Create .done file forcibly to prevent the streamed segment from
- * being archived later.
- */
- if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
- XLogArchiveForceDone(xlogfname);
- else
- XLogArchiveNotify(xlogfname);
+
+ /*
+ * The archive status file for the streamed segment is not created
+ * here; StartupXLOG does it (XLogArchiveNotifySegInStandby) once
+ * replay reads a record that crosses out of this segment.
+ */
}
recvFile = -1;
@@ -928,15 +919,6 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
-
- /*
- * Create .done file forcibly to prevent the streamed segment
- * from being archived later.
- */
- if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
- XLogArchiveForceDone(xlogfname);
- else
- XLogArchiveNotify(xlogfname);
+
+ /*
+ * The archive status file for the streamed segment is not created
+ * here; StartupXLOG does it (XLogArchiveNotifySegInStandby) once
+ * replay reads a record that crosses out of this segment.
+ */
}
recvFile = -1;