v6-0007-Implement-checkpointer-data-write-combining.patch
text/x-patch
Filename: v6-0007-Implement-checkpointer-data-write-combining.patch
Type: text/x-patch
Part: 6
Message:
Re: Checkpointer write combining
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch v6-0007
Subject: Implement checkpointer data write combining
| File | + | − |
|---|---|---|
| src/backend/storage/buffer/bufmgr.c | 210 | 26 |
| src/backend/utils/probes.d | 1 | 1 |
From 898db59ca9a02f8eb5481caa66dca6cd02f30082 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 2 Sep 2025 15:42:29 -0400
Subject: [PATCH v6 7/9] Implement checkpointer data write combining
When the checkpointer writes out dirty buffers, writing multiple
contiguous blocks as a single IO is a substantial performance
improvement. The checkpointer is usually bottlenecked on IO, so issuing
larger IOs leads to increased write throughput and faster checkpoints.
Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
---
src/backend/storage/buffer/bufmgr.c | 236 +++++++++++++++++++++++++---
src/backend/utils/probes.d | 2 +-
2 files changed, 211 insertions(+), 27 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2ea91e777e2..2475b1c85be 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -512,6 +512,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf);
static void UnpinBufferNoOwner(BufferDesc *buf);
+static uint32 CheckpointerMaxBatchSize(void);
static void BufferSync(int flags);
static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
@@ -3319,7 +3320,6 @@ UnpinBufferNoOwner(BufferDesc *buf)
static void
BufferSync(int flags)
{
- uint32 buf_state;
int buf_id;
int num_to_scan;
int num_spaces;
@@ -3331,6 +3331,8 @@ BufferSync(int flags)
int i;
int mask = BM_DIRTY;
WritebackContext wb_context;
+ uint32 max_batch_size;
+ BufWriteBatch batch;
/*
* Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3361,6 +3363,7 @@ BufferSync(int flags)
for (buf_id = 0; buf_id < NBuffers; buf_id++)
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+ uint32 buf_state;
/*
* Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3501,48 +3504,212 @@ BufferSync(int flags)
*/
num_processed = 0;
num_written = 0;
+ max_batch_size = CheckpointerMaxBatchSize();
while (!binaryheap_empty(ts_heap))
{
+ BlockNumber limit = max_batch_size;
BufferDesc *bufHdr = NULL;
CkptTsStatus *ts_stat = (CkptTsStatus *)
DatumGetPointer(binaryheap_first(ts_heap));
+ int ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan;
+ int processed = 0;
- buf_id = CkptBufferIds[ts_stat->index].buf_id;
- Assert(buf_id != -1);
+ batch.start = InvalidBlockNumber;
+ batch.max_lsn = InvalidXLogRecPtr;
+ batch.n = 0;
- bufHdr = GetBufferDescriptor(buf_id);
+ while (batch.n < limit)
+ {
+ uint32 buf_state;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ LWLock *content_lock;
+ CkptSortItem item;
- num_processed++;
+ if (ProcSignalBarrierPending)
+ ProcessProcSignalBarrier();
- /*
- * We don't need to acquire the lock here, because we're only looking
- * at a single bit. It's possible that someone else writes the buffer
- * and clears the flag right after we check, but that doesn't matter
- * since SyncOneBuffer will then do nothing. However, there is a
- * further race condition: it's conceivable that between the time we
- * examine the bit here and the time SyncOneBuffer acquires the lock,
- * someone else not only wrote the buffer but replaced it with another
- * page and dirtied it. In that improbable case, SyncOneBuffer will
- * write the buffer though we didn't need to. It doesn't seem worth
- * guarding against this, though.
- */
- if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
- {
- if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+ /* Check if we are done with this tablespace */
+ if (ts_stat->index + processed >= ts_end)
+ break;
+
+ item = CkptBufferIds[ts_stat->index + processed];
+
+ buf_id = item.buf_id;
+ Assert(buf_id != -1);
+
+ bufHdr = GetBufferDescriptor(buf_id);
+
+ /*
+ * If this is the first block of the batch, then check if we need
+ * to open a new relation. Open the relation now because we have
+ * to determine the maximum IO size based on how many blocks
+ * remain in the file.
+ */
+ if (!BlockNumberIsValid(batch.start))
+ {
+ Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0);
+ batch.rlocator.spcOid = item.tsId;
+ batch.rlocator.dbOid = item.dbId;
+ batch.rlocator.relNumber = item.relNumber;
+ batch.forkno = item.forkNum;
+ batch.start = item.blockNum;
+ batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER);
+ limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start);
+ limit = Min(max_batch_size, limit);
+ limit = Min(GetAdditionalPinLimit(), limit);
+
+ /*
+ * Always allow at least one block, even if we are out of spare
+ * pins: breaking out here with nothing processed would leave
+ * this tablespace's position unchanged and make no progress.
+ */
+ limit = Max(limit, 1);
+ Assert(limit <= max_batch_size);
+ }
+
+ /*
+ * Once we hit blocks from the next relation or fork of the
+ * relation, break out of the loop and issue the IO we've built up
+ * so far. It is important that we don't increment processed
+ * because we want to start the next IO with this item.
+ */
+ if (item.dbId != batch.rlocator.dbOid)
+ break;
+
+ if (item.relNumber != batch.rlocator.relNumber)
+ break;
+
+ if (item.forkNum != batch.forkno)
+ break;
+
+ /*
+ * If the next block is not contiguous, we can't include it in the
+ * IO we will issue. Break out of the loop and issue what we have
+ * so far. Do not count this item as processed -- otherwise we
+ * will end up skipping it.
+ */
+ if (item.blockNum != batch.start + batch.n)
+ break;
+
+ /*
+ * We don't need to acquire the lock here, because we're only
+ * looking at a single bit. It's possible that someone else writes
+ * the buffer and clears the flag right after we check, but that
+ * doesn't matter since StartBufferIO will then return false. If
+ * the buffer doesn't need checkpointing, don't include it in the
+ * batch we are building. We're done with the item, so count it as
+ * processed and break out of the loop to issue the IO we have
+ * built so far.
+ */
+ if (!(pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED))
+ {
+ processed++;
+ break;
+ }
+
+ ReservePrivateRefCountEntry();
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+
+ buf_state = LockBufHdr(bufHdr);
+
+ /*
+ * If the buffer is no longer valid and dirty, there is nothing to
+ * write. We're done with the item, so count it as processed and
+ * break out of the loop to issue the IO built up so far.
+ */
+ if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
+ {
+ processed++;
+ UnlockBufHdr(bufHdr, buf_state);
+ break;
+ }
+
+ PinBuffer_Locked(bufHdr);
+
+ /*
+ * There is a race condition here: it's conceivable that between
+ * the time we examined the buffer header for BM_CHECKPOINT_NEEDED
+ * above and the time we acquired the buffer header lock, someone
+ * else not only wrote the buffer but replaced it with another page
+ * and dirtied it. In that improbable case, we will write the buffer
+ * though we didn't need to. It doesn't seem worth guarding
+ * against this, though.
+ */
+ content_lock = BufferDescriptorGetContentLock(bufHdr);
+
+ /*
+ * We are willing to wait for the content lock on the first buffer
+ * in the batch. However, for subsequent buffers, waiting could lead
+ * to deadlock. We have to eventually flush all eligible buffers,
+ * though. So, if we fail to acquire the lock on a subsequent
+ * buffer, we break out and issue the IO we've built up so far.
+ * Then we come back and start a new IO with that buffer as the
+ * starting buffer. As such, we must not count the item as
+ * processed if we end up failing to acquire the content lock.
+ */
+ if (batch.n == 0)
+ LWLockAcquire(content_lock, LW_SHARED);
+ else if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
{
- TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
- PendingCheckpointerStats.buffers_written++;
- num_written++;
+ UnpinBuffer(bufHdr);
+ break;
+ }
+
+ /*
+ * If the buffer doesn't need IO, count the item as processed,
+ * release the buffer, and break out of the loop to issue the IO
+ * we have built up so far.
+ */
+ if (!StartBufferIO(bufHdr, false, true))
+ {
+ processed++;
+ LWLockRelease(content_lock);
+ UnpinBuffer(bufHdr);
+ break;
}
+
+ buf_state = LockBufHdr(bufHdr);
+ lsn = BufferGetLSN(bufHdr);
+ buf_state &= ~BM_JUST_DIRTIED;
+ UnlockBufHdr(bufHdr, buf_state);
+
+ /*
+ * Keep track of the max LSN so that we can be sure to flush
+ * enough WAL before flushing data from the buffers. See comment
+ * in DoFlushBuffer() for more on why we don't consider the LSNs
+ * of unlogged relations.
+ */
+ if ((buf_state & BM_PERMANENT) && lsn > batch.max_lsn)
+ batch.max_lsn = lsn;
+
+ batch.bufdescs[batch.n++] = bufHdr;
+ processed++;
}
/*
* Measure progress independent of actually having to flush the buffer
* - otherwise writing become unbalanced.
*/
- ts_stat->progress += ts_stat->progress_slice;
- ts_stat->num_scanned++;
- ts_stat->index++;
+ num_processed += processed;
+ ts_stat->progress += ts_stat->progress_slice * processed;
+ ts_stat->num_scanned += processed;
+ ts_stat->index += processed;
+
+ /*
+ * If we built up an IO, issue it. There's a chance we didn't find any
+ * items referencing buffers that needed flushing this time, but we
+ * still want to check if we should update the heap if we examined and
+ * processed the items.
+ */
+ if (batch.n > 0)
+ {
+ FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+ CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context);
+
+ TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n);
+ PendingCheckpointerStats.buffers_written += batch.n;
+ num_written += batch.n;
+ }
/* Have all the buffers from the tablespace been processed? */
if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -4268,6 +4435,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
DoFlushBuffer(buf, reln, io_object, io_context, lsn);
}
+/*
+ * Maximum number of blocks the checkpointer writes in one batch: the lesser
+ * of our pin limit and io_combine_limit, but never less than one block.
+ */
+static uint32
+CheckpointerMaxBatchSize(void)
+{
+ uint32 result;
+ uint32 pin_limit = GetPinLimit();
+
+ /* Honor both the pin limit and io_combine_limit. */
+ result = Min(pin_limit, io_combine_limit);
+ result = Max(result, 1);
+ Assert(result <= MAX_IO_COMBINE_LIMIT);
+ return result;
+}
+
/*
* Given a buffer descriptor, start, from a strategy ring, strategy, that
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 36dd4f8375b..d6970731ba9 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -68,7 +68,7 @@ provider postgresql {
probe buffer__checkpoint__sync__start();
probe buffer__checkpoint__done();
probe buffer__sync__start(int, int);
- probe buffer__sync__written(int);
+ probe buffer__batch__sync__written(BlockNumber);
probe buffer__sync__done(int, int, int);
probe deadlock__found();
--
2.43.0