0001-Add-write-combining-to-checkpoint-buffer-writes.patch
text/x-patch
Filename: 0001-Add-write-combining-to-checkpoint-buffer-writes.patch
Type: text/x-patch
Part: 0
Message:
Re: Checkpointer write combining
From 9aba47eea0f5a856d023fbc32ab61ec62cacf22a Mon Sep 17 00:00:00 2001
From: Soumya S Murali <soumyamurali.work@gmail.com>
Date: Mon, 15 Dec 2025 14:08:42 +0530
Subject: [PATCH] Add write-combining to checkpoint buffer writes.
Signed-off-by: Soumya S Murali <soumyamurali.work@gmail.com>
---
src/backend/storage/buffer/bufmgr.c | 750 ++++++++++++++++------------
src/backend/storage/page/bufpage.c | 19 +
src/include/storage/buf_internals.h | 22 +
src/include/storage/bufmgr.h | 4 +-
4 files changed, 485 insertions(+), 310 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f373cead95f..2ea5311aed2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -537,6 +537,12 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
+static bool PrepareBufferForCheckpoint(BufferDesc *bufdesc,
+ XLogRecPtr *lsn);
+static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
+static void CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring,
+ IOContext io_context);
+static int FlushBufferBatch(BufWriteBatch *batch, IOContext io_context);
static void FindAndDropRelationBuffers(RelFileLocator rlocator,
ForkNumber forkNum,
BlockNumber nForkBlock,
@@ -2331,34 +2337,35 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
ReservePrivateRefCountEntry();
ResourceOwnerEnlarge(CurrentResourceOwner);
- /* we return here if a prospective victim buffer gets used concurrently */
-again:
+ /* retry using loop instead of goto */
+ for (;;)
+ {
- /*
- * Select a victim buffer. The buffer is returned pinned and owned by
- * this backend.
- */
- buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
- buf = BufferDescriptorGetBuffer(buf_hdr);
+ /*
+ * Select a victim buffer. The buffer is returned pinned and owned by
+ * this backend.
+ */
+ buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
+ buf = BufferDescriptorGetBuffer(buf_hdr);
- /*
- * We shouldn't have any other pins for this buffer.
- */
- CheckBufferIsPinnedOnce(buf);
+ /*
+ * We shouldn't have any other pins for this buffer.
+ */
+ CheckBufferIsPinnedOnce(buf);
- /*
- * If the buffer was dirty, try to write it out. There is a race
- * condition here, in that someone might dirty it after we released the
- * buffer header lock above, or even while we are writing it out (since
- * our share-lock won't prevent hint-bit updates). We will recheck the
- * dirty bit after re-locking the buffer header.
- */
- if (buf_state & BM_DIRTY)
- {
- LWLock *content_lock;
+ /*
+ * If the buffer was dirty, try to write it out. There is a race
+ * condition here, in that someone might dirty it after we released the
+ * buffer header lock above, or even while we are writing it out (since
+ * our share-lock won't prevent hint-bit updates). We will recheck the
+ * dirty bit after re-locking the buffer header.
+ */
+ if (buf_state & BM_DIRTY)
+ {
+ LWLock *content_lock;
- Assert(buf_state & BM_TAG_VALID);
- Assert(buf_state & BM_VALID);
+ Assert(buf_state & BM_TAG_VALID);
+ Assert(buf_state & BM_VALID);
/*
* We need a share-lock on the buffer contents to write it out (else
@@ -2374,16 +2381,16 @@ again:
* one just happens to be trying to split the page the first one got
* from StrategyGetBuffer.)
*/
- content_lock = BufferDescriptorGetContentLock(buf_hdr);
- if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
- {
+ content_lock = BufferDescriptorGetContentLock(buf_hdr);
+ if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+ {
/*
* Someone else has locked the buffer, so give it up and loop back
* to get another one.
*/
- UnpinBuffer(buf_hdr);
- goto again;
- }
+ UnpinBuffer(buf_hdr);
+ continue;
+ }
/*
* If using a nondefault strategy, and writing the buffer would
@@ -2392,35 +2399,34 @@ again:
* lock to inspect the page LSN, so this can't be done inside
* StrategyGetBuffer.
*/
- if (strategy != NULL)
- {
- XLogRecPtr lsn;
+ if (strategy != NULL)
+ {
+ XLogRecPtr lsn;
- /* Read the LSN while holding buffer header lock */
- buf_state = LockBufHdr(buf_hdr);
- lsn = BufferGetLSN(buf_hdr);
- UnlockBufHdr(buf_hdr);
+ /* Read the LSN while holding buffer header lock */
+ buf_state = LockBufHdr(buf_hdr);
+ lsn = BufferGetLSN(buf_hdr);
+ UnlockBufHdr(buf_hdr);
- if (XLogNeedsFlush(lsn)
- && StrategyRejectBuffer(strategy, buf_hdr, from_ring))
- {
- LWLockRelease(content_lock);
- UnpinBuffer(buf_hdr);
- goto again;
+ if (XLogNeedsFlush(lsn)
+ && StrategyRejectBuffer(strategy, buf_hdr, from_ring))
+ {
+ LWLockRelease(content_lock);
+ UnpinBuffer(buf_hdr);
+ continue;
+ }
}
- }
- /* OK, do the I/O */
- FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
- LWLockRelease(content_lock);
+ /* OK, do the I/O */
+ FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
+ LWLockRelease(content_lock);
- ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+ ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
&buf_hdr->tag);
- }
-
+ }
- if (buf_state & BM_VALID)
- {
+ if (buf_state & BM_VALID)
+ {
/*
* When a BufferAccessStrategy is in use, blocks evicted from shared
* buffers are counted as IOOP_EVICT in the corresponding context
@@ -2437,32 +2443,33 @@ again:
* we may have been forced to release the buffer due to concurrent
* pinners or erroring out.
*/
- pgstat_count_io_op(IOOBJECT_RELATION, io_context,
- from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0);
- }
+ pgstat_count_io_op(IOOBJECT_RELATION, io_context,
+ from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0);
+ }
- /*
- * If the buffer has an entry in the buffer mapping table, delete it. This
- * can fail because another backend could have pinned or dirtied the
- * buffer.
- */
- if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
- {
- UnpinBuffer(buf_hdr);
- goto again;
- }
+ /*
+ * If the buffer has an entry in the buffer mapping table, delete it. This
+ * can fail because another backend could have pinned or dirtied the
+ * buffer.
+ */
+ if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr))
+ {
+ UnpinBuffer(buf_hdr);
+ continue;
+ }
- /* a final set of sanity checks */
+ /* a final set of sanity checks */
#ifdef USE_ASSERT_CHECKING
- buf_state = pg_atomic_read_u32(&buf_hdr->state);
+ buf_state = pg_atomic_read_u32(&buf_hdr->state);
- Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
- Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
+ Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
+ Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
- CheckBufferIsPinnedOnce(buf);
+ CheckBufferIsPinnedOnce(buf);
#endif
- return buf;
+ return buf;
+ } /* end of for(;;) */
}
/*
@@ -3329,6 +3336,92 @@ TrackNewBufferPin(Buffer buf)
#define ST_DEFINE
#include "lib/sort_template.h"
+static void
+ResetBufWriteBatch(BufWriteBatch *batch)
+{
+	WritebackContextInit(&batch->wb_context, &checkpoint_flush_after);
+	batch->reln = NULL;			/* smgr handle is looked up again on demand */
+	batch->max_lsn = InvalidXLogRecPtr; /* no page LSN recorded yet */
+	batch->n = 0;				/* batch holds no buffers */
+}
+
+/*
+ * SyncOneBuffer -- process the buffer identified by buf_id during syncing.
+ *
+ * If skip_recently_used is true, we don't write currently-pinned buffers, nor
+ * buffers marked recently used, as these are not replacement candidates.
+ *
+ * Returns a bitmask containing the following flag bits:
+ * BUF_WRITTEN: we wrote the buffer.
+ * BUF_REUSABLE: buffer is available for replacement, ie, it has
+ * pin count 0 and usage count 0.
+ *
+ * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
+ * after locking it, but we don't care all that much.)
+ */
+static int
+SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
+{
+ BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+ int result = 0;
+ uint32 buf_state;
+ BufferTag tag;
+
+ /* Make sure we can handle the pin */
+ ReservePrivateRefCountEntry();
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+
+ /*
+ * Check whether buffer needs writing.
+ *
+ * We can make this check without taking the buffer content lock so long
+ * as we mark pages dirty in access methods *before* logging changes with
+ * XLogInsert(): if someone marks the buffer dirty just after our check we
+ * don't worry because our checkpoint.redo points before log record for
+ * upcoming changes and so we are not required to write such dirty buffer.
+ */
+ buf_state = LockBufHdr(bufHdr);
+
+ if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
+ BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
+ {
+ result |= BUF_REUSABLE;
+ }
+ else if (skip_recently_used)
+ {
+ /* Caller told us not to write recently-used buffers */
+ UnlockBufHdr(bufHdr);
+ return result;
+ }
+
+ if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
+ {
+ /* It's clean, so nothing to do */
+ UnlockBufHdr(bufHdr);
+ return result;
+ }
+
+ /*
+ * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the
+ * buffer is clean by the time we've locked it.)
+ */
+ PinBuffer_Locked(bufHdr);
+
+ FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+ /* Copy the tag while still pinned; it is needed after the unpin below */
+ tag = bufHdr->tag;
+
+ UnpinBuffer(bufHdr);
+
+ /*
+ * SyncOneBuffer() is only called by checkpointer and bgwriter, so
+ * IOContext will always be IOCONTEXT_NORMAL.
+ */
+ ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+
+ return result | BUF_WRITTEN;
+}
+
/*
* BufferSync -- Write out all dirty buffers in the pool.
*
@@ -3344,144 +3437,81 @@ BufferSync(int flags)
{
uint32 buf_state;
int buf_id;
- int num_to_scan;
- int num_spaces;
- int num_processed;
- int num_written;
+ int num_to_scan = 0;
+ int num_spaces = 0;
+ int num_processed = 0;
+ int num_written = 0;
CkptTsStatus *per_ts_stat = NULL;
Oid last_tsid;
binaryheap *ts_heap;
- int i;
- uint32 mask = BM_DIRTY;
WritebackContext wb_context;
+ uint32 mask = BM_DIRTY;
+ int i;
- /*
- * Unless this is a shutdown checkpoint or we have been explicitly told,
- * we write only permanent, dirty buffers. But at shutdown or end of
- * recovery, we write all dirty buffers.
- */
- if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
+ /* Determine which buffers must be written */
+ if (!((flags & (CHECKPOINT_IS_SHUTDOWN |
+ CHECKPOINT_END_OF_RECOVERY |
CHECKPOINT_FLUSH_UNLOGGED))))
mask |= BM_PERMANENT;
- /*
- * Loop over all buffers, and mark the ones that need to be written with
- * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we
- * can estimate how much work needs to be done.
- *
- * This allows us to write only those pages that were dirty when the
- * checkpoint began, and not those that get dirtied while it proceeds.
- * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
- * later in this function, or by normal backends or the bgwriter cleaning
- * scan, the flag is cleared. Any buffer dirtied after this point won't
- * have the flag set.
- *
- * Note that if we fail to write some buffer, we may leave buffers with
- * BM_CHECKPOINT_NEEDED still set. This is OK since any such buffer would
- * certainly need to be written for the next checkpoint attempt, too.
- */
- num_to_scan = 0;
+ /* identify dirty buffers at checkpoint start */
for (buf_id = 0; buf_id < NBuffers; buf_id++)
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
uint32 set_bits = 0;
- /*
- * Header spinlock is enough to examine BM_DIRTY, see comment in
- * SyncOneBuffer.
- */
buf_state = LockBufHdr(bufHdr);
if ((buf_state & mask) == mask)
{
- CkptSortItem *item;
+ CkptSortItem *item = &CkptBufferIds[num_to_scan++];
set_bits = BM_CHECKPOINT_NEEDED;
- item = &CkptBufferIds[num_to_scan++];
- item->buf_id = buf_id;
- item->tsId = bufHdr->tag.spcOid;
- item->relNumber = BufTagGetRelNumber(&bufHdr->tag);
- item->forkNum = BufTagGetForkNum(&bufHdr->tag);
- item->blockNum = bufHdr->tag.blockNum;
+ item->buf_id = buf_id;
+ item->tsId = bufHdr->tag.spcOid;
+ item->relNumber = bufHdr->tag.relNumber;
+ item->forkNum = bufHdr->tag.forkNum;
+ item->blockNum = bufHdr->tag.blockNum;
}
UnlockBufHdrExt(bufHdr, buf_state,
- set_bits, 0,
- 0);
+ set_bits, 0, 0);
- /* Check for barrier events in case NBuffers is large. */
if (ProcSignalBarrierPending)
ProcessProcSignalBarrier();
}
if (num_to_scan == 0)
- return; /* nothing to do */
+ return;
WritebackContextInit(&wb_context, &checkpoint_flush_after);
TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
- /*
- * Sort buffers that need to be written to reduce the likelihood of random
- * IO. The sorting is also important for the implementation of balancing
- * writes between tablespaces. Without balancing writes we'd potentially
- * end up writing to the tablespaces one-by-one; possibly overloading the
- * underlying system.
- */
+ /* Sort by tablespace */
sort_checkpoint_bufferids(CkptBufferIds, num_to_scan);
- num_spaces = 0;
-
- /*
- * Allocate progress status for each tablespace with buffers that need to
- * be flushed. This requires the to-be-flushed array to be sorted.
- */
+ /* Build per-tablespace progress tracking */
last_tsid = InvalidOid;
+
for (i = 0; i < num_to_scan; i++)
{
+ Oid cur_tsid = CkptBufferIds[i].tsId;
CkptTsStatus *s;
- Oid cur_tsid;
-
- cur_tsid = CkptBufferIds[i].tsId;
- /*
- * Grow array of per-tablespace status structs, every time a new
- * tablespace is found.
- */
- if (last_tsid == InvalidOid || last_tsid != cur_tsid)
+ if (last_tsid == InvalidOid || cur_tsid != last_tsid)
{
- Size sz;
-
num_spaces++;
-
- /*
- * Not worth adding grow-by-power-of-2 logic here - even with a
- * few hundred tablespaces this should be fine.
- */
- sz = sizeof(CkptTsStatus) * num_spaces;
-
- if (per_ts_stat == NULL)
- per_ts_stat = (CkptTsStatus *) palloc(sz);
- else
- per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
+ per_ts_stat = (num_spaces == 1)
+ ? palloc(sizeof(CkptTsStatus))
+ : repalloc(per_ts_stat, sizeof(CkptTsStatus) * num_spaces);
s = &per_ts_stat[num_spaces - 1];
memset(s, 0, sizeof(*s));
- s->tsId = cur_tsid;
-
- /*
- * The first buffer in this tablespace. As CkptBufferIds is sorted
- * by tablespace all (s->num_to_scan) buffers in this tablespace
- * will follow afterwards.
- */
+ s->tsId = cur_tsid;
s->index = i;
- /*
- * progress_slice will be determined once we know how many buffers
- * are in each tablespace, i.e. after this loop.
- */
-
last_tsid = cur_tsid;
}
else
@@ -3491,117 +3521,132 @@ BufferSync(int flags)
s->num_to_scan++;
- /* Check for barrier events. */
if (ProcSignalBarrierPending)
ProcessProcSignalBarrier();
}
- Assert(num_spaces > 0);
-
- /*
- * Build a min-heap over the write-progress in the individual tablespaces,
- * and compute how large a portion of the total progress a single
- * processed buffer is.
- */
ts_heap = binaryheap_allocate(num_spaces,
ts_ckpt_progress_comparator,
NULL);
for (i = 0; i < num_spaces; i++)
{
- CkptTsStatus *ts_stat = &per_ts_stat[i];
-
- ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
+ CkptTsStatus *st = &per_ts_stat[i];
- binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+ st->progress_slice = (float8) num_to_scan / st->num_to_scan;
+ binaryheap_add_unordered(ts_heap, PointerGetDatum(st));
}
binaryheap_build(ts_heap);
- /*
- * Iterate through to-be-checkpointed buffers and write the ones (still)
- * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
- * tablespaces; otherwise the sorting would lead to only one tablespace
- * receiving writes at a time, making inefficient use of the hardware.
- */
- num_processed = 0;
- num_written = 0;
+ /* write combining state */
+ BufWriteBatch batch;
+ ResetBufWriteBatch(&batch);
+
while (!binaryheap_empty(ts_heap))
{
- BufferDesc *bufHdr = NULL;
- CkptTsStatus *ts_stat = (CkptTsStatus *)
- DatumGetPointer(binaryheap_first(ts_heap));
+ CkptTsStatus *ts_stat =
+ (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap));
buf_id = CkptBufferIds[ts_stat->index].buf_id;
- Assert(buf_id != -1);
-
- bufHdr = GetBufferDescriptor(buf_id);
+ Assert(buf_id >= 0);
+ BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
num_processed++;
/*
- * We don't need to acquire the lock here, because we're only looking
- * at a single bit. It's possible that someone else writes the buffer
- * and clears the flag right after we check, but that doesn't matter
- * since SyncOneBuffer will then do nothing. However, there is a
- * further race condition: it's conceivable that between the time we
- * examine the bit here and the time SyncOneBuffer acquires the lock,
- * someone else not only wrote the buffer but replaced it with another
- * page and dirtied it. In that improbable case, SyncOneBuffer will
- * write the buffer though we didn't need to. It doesn't seem worth
- * guarding against this, though.
+ * Write Combining:
+ * We accumulate buffers that still have BM_CHECKPOINT_NEEDED.
*/
if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
{
- if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+
+ bool start_new;
+ BufferDesc *buf = bufHdr;
+ BufferTag tag = buf->tag;
+
+ /* Extract locator once */
+ RelFileLocator rlocator = BufTagGetRelFileLocator(&tag);
+
+ start_new = false;
+
+ if (batch.n == 0)
+ {
+ batch.rlocator = rlocator;
+ batch.forkno = tag.forkNum;
+ batch.start = tag.blockNum;
+ }
+ else
{
- TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
- PendingCheckpointerStats.buffers_written++;
- num_written++;
+ bool same_rel = RelFileLocatorEquals(batch.rlocator, rlocator);
+ bool same_fork = (batch.forkno == tag.forkNum);
+ bool contiguous = (tag.blockNum == batch.start + batch.n);
+
+ if (!(same_rel && same_fork && contiguous))
+ start_new = true;
+ }
+
+ if (start_new)
+ {
+ /* Flush accumulated batch */
+ int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+ num_written += w;
+ PendingCheckpointerStats.buffers_written += w;
+ ResetBufWriteBatch(&batch);
+
+ /* initialize new batch */
+ batch.rlocator = BufTagGetRelFileLocator(&tag);
+ batch.forkno = tag.forkNum;
+ batch.start = tag.blockNum;
+ }
+
+ /* Add buffer to batch */
+ batch.bufdescs[batch.n++] = buf;
+
+ /* Track max LSN */
+ XLogRecPtr lsn;
+ BufferNeedsWALFlush(buf, &lsn);
+ if (lsn > batch.max_lsn)
+ batch.max_lsn = lsn;
+
+ /* If full, flush immediately */
+ if (batch.n == MAX_IO_COMBINE_LIMIT)
+ {
+ int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+ num_written += w;
+ PendingCheckpointerStats.buffers_written += w;
+ ResetBufWriteBatch(&batch);
}
}
- /*
- * Measure progress independent of actually having to flush the buffer
- * - otherwise writing become unbalanced.
- */
+ /* Progress accounting */
ts_stat->progress += ts_stat->progress_slice;
ts_stat->num_scanned++;
ts_stat->index++;
- /* Have all the buffers from the tablespace been processed? */
if (ts_stat->num_scanned == ts_stat->num_to_scan)
- {
binaryheap_remove_first(ts_heap);
- }
else
- {
- /* update heap with the new progress */
binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
- }
- /*
- * Sleep to throttle our I/O rate.
- *
- * (This will check for barrier events even if it doesn't sleep.)
- */
CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
}
- /*
- * Issue all pending flushes. Only checkpointer calls BufferSync(), so
- * IOContext will always be IOCONTEXT_NORMAL.
- */
+ /* Flush trailing batch */
+ if (batch.n > 0)
+ {
+ int w = FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+ num_written += w;
+ PendingCheckpointerStats.buffers_written += w;
+ ResetBufWriteBatch(&batch);
+ }
+
+ /* Complete writeback */
IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL);
pfree(per_ts_stat);
- per_ts_stat = NULL;
binaryheap_free(ts_heap);
- /*
- * Update checkpoint statistics. As noted above, this doesn't include
- * buffers written by other backends or bgwriter scan.
- */
CheckpointStats.ckpt_bufs_written += num_written;
TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
@@ -3786,6 +3831,7 @@ BgBufferSync(WritebackContext *wb_context)
* a true average we want a fast-attack, slow-decline behavior: we
* immediately follow any increase.
*/
+
if (smoothed_alloc <= (float) recent_alloc)
smoothed_alloc = recent_alloc;
else
@@ -3841,8 +3887,7 @@ BgBufferSync(WritebackContext *wb_context)
/* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{
- int sync_state = SyncOneBuffer(next_to_clean, true,
- wb_context);
+ int sync_state = SyncOneBuffer(next_to_clean, true, wb_context);
if (++next_to_clean >= NBuffers)
{
@@ -3902,83 +3947,6 @@ BgBufferSync(WritebackContext *wb_context)
return (bufs_to_lap == 0 && recent_alloc == 0);
}
-/*
- * SyncOneBuffer -- process a single buffer during syncing.
- *
- * If skip_recently_used is true, we don't write currently-pinned buffers, nor
- * buffers marked recently used, as these are not replacement candidates.
- *
- * Returns a bitmask containing the following flag bits:
- * BUF_WRITTEN: we wrote the buffer.
- * BUF_REUSABLE: buffer is available for replacement, ie, it has
- * pin count 0 and usage count 0.
- *
- * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
- * after locking it, but we don't care all that much.)
- */
-static int
-SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
-{
- BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
- int result = 0;
- uint32 buf_state;
- BufferTag tag;
-
- /* Make sure we can handle the pin */
- ReservePrivateRefCountEntry();
- ResourceOwnerEnlarge(CurrentResourceOwner);
-
- /*
- * Check whether buffer needs writing.
- *
- * We can make this check without taking the buffer content lock so long
- * as we mark pages dirty in access methods *before* logging changes with
- * XLogInsert(): if someone marks the buffer dirty just after our check we
- * don't worry because our checkpoint.redo points before log record for
- * upcoming changes and so we are not required to write such dirty buffer.
- */
- buf_state = LockBufHdr(bufHdr);
-
- if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
- BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
- {
- result |= BUF_REUSABLE;
- }
- else if (skip_recently_used)
- {
- /* Caller told us not to write recently-used buffers */
- UnlockBufHdr(bufHdr);
- return result;
- }
-
- if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
- {
- /* It's clean, so nothing to do */
- UnlockBufHdr(bufHdr);
- return result;
- }
-
- /*
- * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the
- * buffer is clean by the time we've locked it.)
- */
- PinBuffer_Locked(bufHdr);
-
- FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
-
- tag = bufHdr->tag;
-
- UnpinBuffer(bufHdr);
-
- /*
- * SyncOneBuffer() is only called by checkpointer and bgwriter, so
- * IOContext will always be IOCONTEXT_NORMAL.
- */
- ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
-
- return result | BUF_WRITTEN;
-}
-
/*
* AtEOXact_Buffers - clean up at end of transaction.
*
@@ -4412,6 +4380,169 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
error_context_stack = errcallback.previous;
}
+/*
+ * BufferNeedsWALFlush
+ *
+ * Returns true if the buffer is BM_PERMANENT and XLogNeedsFlush() reports
+ * that WAL up to its page LSN has not been flushed yet.
+ *
+ * *lsn is always set: to the page LSN for a BM_PERMANENT buffer (even when
+ * the result is false), otherwise to InvalidXLogRecPtr.
+ */
+static bool
+BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
+{
+	XLogRecPtr	page_lsn = InvalidXLogRecPtr;
+	uint32		buf_state;
+
+	/* The header lock covers both the flag test and the LSN read */
+	buf_state = LockBufHdr(bufdesc);
+	if (buf_state & BM_PERMANENT)
+		page_lsn = BufferGetLSN(bufdesc);
+	UnlockBufHdr(bufdesc);
+
+	/* Publish the LSN unconditionally; callers use it to track a maximum */
+	*lsn = page_lsn;
+
+	/* Not permanent, or the page LSN itself is invalid: nothing to flush */
+	if (!XLogRecPtrIsValid(page_lsn))
+		return false;
+
+	/* Otherwise the answer is whatever the WAL machinery reports */
+	return XLogNeedsFlush(page_lsn);
+}
+
+/*
+ * CleanVictimBuffer
+ *
+ * Prepare one buffer for a later batched write: take its content lock
+ * exclusively, run PrepareBufferForCheckpoint() on it, and release the lock.
+ * The page itself is not written here.
+ *
+ * bufdesc: buffer to prepare; the caller must already hold a pin on it
+ *		(required by this function's contract, not checked here).
+ * from_ring, io_context: not used yet; kept so the definition matches the
+ *		prototype declared earlier in the file.
+ *
+ * NOTE(review): nothing visible in this patch calls this function, and no
+ * definition of PrepareBufferForCheckpoint() is visible either -- confirm
+ * both exist elsewhere, or drop them.
+ */
+static void
+CleanVictimBuffer(BufferDesc *bufdesc, bool from_ring, IOContext io_context)
+{
+	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
+	LWLock	   *content_lock = BufferDescriptorGetContentLock(bufdesc);
+
+	/* Not used yet; referenced only to avoid unused-parameter warnings */
+	(void) from_ring;
+	(void) io_context;
+
+	/*
+	 * Take the exclusive lock directly.  A share lock that is acquired and
+	 * then dropped in order to "upgrade" would protect nothing, because the
+	 * buffer can change between the two acquisitions.
+	 */
+	LWLockAcquire(content_lock, LW_EXCLUSIVE);
+
+	/*
+	 * Prepare the buffer once, passing the address of max_lsn as the
+	 * prototype requires.  There is no out-parameter through which the LSN
+	 * could reach a caller, so it and the result are deliberately unused.
+	 */
+	(void) PrepareBufferForCheckpoint(bufdesc, &max_lsn);
+
+	LWLockRelease(content_lock);
+}
+
+/*
+ * FlushBufferBatch
+ *
+ * Write the batch->n buffers in batch->bufdescs[] with one smgrwritev()
+ * call.  The caller must have filled the batch with consecutive blocks,
+ * starting at batch->start, of a single relation fork.
+ *
+ * Steps: flush WAL up to batch->max_lsn if that is valid, open the smgr
+ * relation if batch->reln is NULL, set page checksums in place, write, count
+ * the I/O, then schedule and issue writeback for every block.
+ *
+ * io_context is used for the I/O statistics and the writeback requests.
+ *
+ * Returns the number of buffers written (0 for an empty batch).  The batch
+ * itself is not reset here.
+ *
+ * NOTE(review): unlike the SyncOneBuffer() path, nothing here pins or
+ * content-locks the buffers, no buffer state flags are updated after the
+ * write, and checksums are set on the shared pages themselves.  Confirm how
+ * BM_DIRTY / BM_CHECKPOINT_NEEDED get cleared and how concurrent page
+ * changes are excluded.
+ */
+static int
+FlushBufferBatch(BufWriteBatch *batch, IOContext io_context)
+{
+	uint32		n = batch->n;
+	BlockNumber blknums[MAX_IO_COMBINE_LIMIT];
+	Page		pages[MAX_IO_COMBINE_LIMIT];
+	instr_time	io_start;
+	ErrorContextCallback errcallback;
+
+	/* Nothing to do */
+	if (n == 0)
+		return 0;
+
+	/*
+	 * Error context for cleaner messages on I/O failure.  arg is handed to
+	 * the callback, so it must not be left uninitialized; point it at the
+	 * first buffer of the batch.
+	 */
+	errcallback.callback = shared_buffer_write_error_callback;
+	errcallback.arg = batch->bufdescs[0];
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* WAL up to the highest page LSN must be flushed before the pages */
+	if (XLogRecPtrIsValid(batch->max_lsn))
+		XLogFlush(batch->max_lsn);
+
+	/* Open smgr relation if needed */
+	if (batch->reln == NULL)
+		batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+	/* Gather page pointers (no page data is copied) and block numbers */
+	for (uint32 i = 0; i < n; i++)
+	{
+		blknums[i] = batch->start + i;
+		pages[i] = (Page) BufHdrGetBlock(batch->bufdescs[i]);
+	}
+
+	/* Compute checksums over the batch */
+	PageSetBatchChecksumInplace(pages, blknums, n);
+
+	/* Write all pages with a single smgr call */
+	io_start = pgstat_prepare_io_time(track_io_timing);
+	smgrwritev(batch->reln, batch->forkno, batch->start,
+			   (const void **) pages, n, false);
+
+	/* One combined write of n blocks: count one op and n * BLCKSZ bytes */
+	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE,
+							io_start, 1, (uint64) n * BLCKSZ);
+
+	/*
+	 * Schedule writeback for each block and issue it before returning:
+	 * resetting the batch re-initializes batch->wb_context, which would
+	 * discard requests that are still pending.
+	 */
+	for (uint32 i = 0; i < n; i++)
+		ScheduleBufferTagForWriteback(&batch->wb_context, io_context,
+									  &batch->bufdescs[i]->tag);
+	IssuePendingWritebacks(&batch->wb_context, io_context);
+
+	/* restore error callback */
+	error_context_stack = errcallback.previous;
+
+	return (int) n;
+}
+
/*
* Convenience wrapper around FlushBuffer() that locks/unlocks the buffer
* before/after calling FlushBuffer().
@@ -5083,12 +5214,13 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
}
else
{
- RelFileLocator rlocator;
+ RelFileLocator rlocator = BufTagGetRelFileLocator(&bufHdr->tag);
- rlocator = BufTagGetRelFileLocator(&bufHdr->tag);
- srelent = bsearch(&rlocator,
- srels, nrels, sizeof(SMgrSortArray),
- rlocator_comparator);
+ srelent = bsearch(&rlocator,
+ srels,
+ nrels,
+ sizeof(SMgrSortArray),
+ rlocator_comparator);
}
/* buffer doesn't belong to any of the given relfilelocators; skip it */
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index aac6e695954..408a4abc444 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1546,3 +1546,22 @@ PageSetChecksumInplace(Page page, BlockNumber blkno)
((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno);
}
+
+/*
+ * Batch form of PageSetChecksumInplace(): checksum pages[i] as block
+ * blknos[i] for i < length; no-op if checksums are off, new pages skipped.
+ */
+void
+PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length)
+{
+	if (!DataChecksumsEnabled())
+		return;
+
+	for (uint32 idx = 0; idx < length; idx++)
+	{
+		PageHeader	hdr = (PageHeader) pages[idx];
+
+		if (!PageIsNew(pages[idx]))
+			hdr->pd_checksum = pg_checksum_page(pages[idx], blknos[idx]);
+	}
+}
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 5400c56a965..6a75233c1e6 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -482,6 +482,28 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
{
ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc);
}
+/*
+ * Used to write out multiple blocks at a time in a combined IO. bufdescs
+ * contains buffer descriptors for buffers containing adjacent blocks of the
+ * same fork of the same relation.
+ */
+typedef struct BufWriteBatch
+{
+ RelFileLocator rlocator; /* relation the batched blocks belong to */
+ ForkNumber forkno; /* fork the batched blocks belong to */
+ SMgrRelation reln; /* smgr handle; NULL until FlushBufferBatch opens it */
+
+ BlockNumber start; /* block number of bufdescs[0] */
+
+ XLogRecPtr max_lsn; /* highest page LSN recorded for the batch */
+
+ WritebackContext wb_context; /* writeback requests for written blocks */
+
+ uint32 n; /* number of entries used in bufdescs[] */
+ BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT];
+} BufWriteBatch;
+
+
/*
* Internal buffer management routines
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 9f6785910e0..f9e23838e48 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -223,7 +223,9 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy,
bool permanent);
-
+extern void PageSetBatchChecksumInplace(Page *pages,
+ const BlockNumber *blknos,
+ uint32 length);
extern bool StartReadBuffer(ReadBuffersOperation *operation,
Buffer *buffer,
BlockNumber blocknum,
--
2.34.1