From 5e9ebf1c1cafdb5805671d3c57cfeadab5e8c434 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 2 Sep 2025 15:42:29 -0400
Subject: [PATCH v5 7/9] Implement checkpointer data write combining

When the checkpointer writes out dirty buffers, writing multiple
contiguous blocks as a single IO is a substantial performance
improvement. The checkpointer is usually bottlenecked on IO, so issuing
larger IOs leads to increased write throughput and faster checkpoints.
---
 src/backend/storage/buffer/bufmgr.c | 232 ++++++++++++++++++++++++----
 src/backend/utils/probes.d          |   2 +-
 2 files changed, 207 insertions(+), 27 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 246f675333e..e7c789dffd7 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -512,6 +512,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf);
 static void UnpinBufferNoOwner(BufferDesc *buf);
+static uint32 checkpointer_max_batch_size(void);
 static void BufferSync(int flags);
 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
@@ -3313,7 +3314,6 @@ UnpinBufferNoOwner(BufferDesc *buf)
 static void
 BufferSync(int flags)
 {
-	uint32		buf_state;
 	int			buf_id;
 	int			num_to_scan;
 	int			num_spaces;
@@ -3325,6 +3325,8 @@ BufferSync(int flags)
 	int			i;
 	int			mask = BM_DIRTY;
 	WritebackContext wb_context;
+	uint32		max_batch_size;
+	BufWriteBatch batch;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3355,6 +3357,7 @@ BufferSync(int flags)
 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+		uint32		buf_state;
 
 		/*
 		 * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3495,48 +3498,208 @@ BufferSync(int flags)
 	 */
 	num_processed = 0;
 	num_written = 0;
+	max_batch_size = checkpointer_max_batch_size();
 	while (!binaryheap_empty(ts_heap))
 	{
+		BlockNumber limit = max_batch_size;
 		BufferDesc *bufHdr = NULL;
 		CkptTsStatus *ts_stat = (CkptTsStatus *)
 			DatumGetPointer(binaryheap_first(ts_heap));
-
-		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
-
-		bufHdr = GetBufferDescriptor(buf_id);
-
-		num_processed++;
+		int			ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan;
+		int			processed = 0;
 
 		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing.  However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to.  It doesn't seem worth
-		 * guarding against this, though.
+		 * Each batch will have exactly one start and one max lsn and one
+		 * length.
 		 */
-		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
+		batch.start = InvalidBlockNumber;
+		batch.max_lsn = InvalidXLogRecPtr;
+		batch.n = 0;
+
+		while (batch.n < limit)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			uint32		buf_state;
+			XLogRecPtr	lsn = InvalidXLogRecPtr;
+			LWLock	   *content_lock;
+			CkptSortItem item;
+
+			if (ProcSignalBarrierPending)
+				ProcessProcSignalBarrier();
+
+			/* Check if we are done with this tablespace */
+			if (ts_stat->index + processed >= ts_end)
+				break;
+
+			item = CkptBufferIds[ts_stat->index + processed];
+
+			buf_id = item.buf_id;
+			Assert(buf_id != -1);
+
+			bufHdr = GetBufferDescriptor(buf_id);
+
+			/*
+			 * If this is the first block of the batch, then check if we need
+			 * to open a new relation. Open the relation now because we have
+			 * to determine the maximum IO size based on how many blocks
+			 * remain in the file.
+			 */
+			if (!BlockNumberIsValid(batch.start))
+			{
+				Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0);
+				batch.rlocator.spcOid = item.tsId;
+				batch.rlocator.dbOid = item.db_id;
+				batch.rlocator.relNumber = item.relNumber;
+				batch.forkno = item.forkNum;
+				batch.start = item.blockNum;
+				batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER);
+				limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start);
+				limit = Max(1, limit);
+				limit = Min(limit, max_batch_size);
+			}
+
+			/*
+			 * Once we hit blocks from the next relation or fork of the
+			 * relation, break out of the loop and issue the IO we've built up
+			 * so far. It is important that we don't increment processed
+			 * becasue we want to start the next IO with this item.
+			 */
+			if (item.db_id != batch.rlocator.dbOid)
+				break;
+
+			if (item.relNumber != batch.rlocator.relNumber)
+				break;
+
+			if (item.forkNum != batch.forkno)
+				break;
+
+			/*
+			 * It the next block is not contiguous, we can't include it in the
+			 * IO we will issue. Break out of the loop and issue what we have
+			 * so far. Do not count this item as processed -- otherwise we
+			 * will end up skipping it.
+			 */
+			if (item.blockNum != batch.start + batch.n)
+				break;
+
+			/*
+			 * We don't need to acquire the lock here, because we're only
+			 * looking at a single bit. It's possible that someone else writes
+			 * the buffer and clears the flag right after we check, but that
+			 * doesn't matter since StartBufferIO will then return false. If
+			 * the buffer doesn't need checkpointing, don't include it in the
+			 * batch we are building. We're done with the item, so count it as
+			 * processed and break out of the loop to issue the IO we have
+			 * built so far.
+			 */
+			if (!(pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED))
+			{
+				processed++;
+				break;
+			}
+
+			ReservePrivateRefCountEntry();
+			ResourceOwnerEnlarge(CurrentResourceOwner);
+
+			buf_state = LockBufHdr(bufHdr);
+
+			/*
+			 * If the buffer doesn't need eviction, we're done with the item,
+			 * so count it as processed and break out of the loop to issue the
+			 * IO so far.
+			 */
+			if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				processed++;
+				UnlockBufHdr(bufHdr, buf_state);
+				break;
+			}
+
+			PinBuffer_Locked(bufHdr);
+
+			/*
+			 * There is a race condition here: it's conceivable that between
+			 * the time we examine the buffer header for BM_CHECKPOINT_NEEDED
+			 * above and when we are now acquiring the lock that, someone else
+			 * not only wrote the buffer but replaced it with another page and
+			 * dirtied it.  In that improbable case, we will write the buffer
+			 * though we didn't need to.  It doesn't seem worth guarding
+			 * against this, though.
+			 */
+			content_lock = BufferDescriptorGetContentLock(bufHdr);
+
+			/*
+			 * We are willing to wait for the content lock on the first IO in
+			 * the batch. However, for subsequent IOs, waiting could lead to
+			 * deadlock. We have to eventually flush all eligible buffers,
+			 * though. So, if we fail to acquire the lock on a subsequent
+			 * buffer, we break out and issue the IO we've built up so far.
+			 * Then we come back and start a new IO with that buffer as the
+			 * starting buffer. As such, we must not count the item as
+			 * processed if we end up failing to acquire the content lock.
+			 */
+			if (batch.n == 0)
+				LWLockAcquire(content_lock, LW_SHARED);
+			else if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+			{
+				UnpinBuffer(bufHdr);
+				break;
+			}
+
+			/*
+			 * If the buffer doesn't need IO, count the item as processed,
+			 * release the buffer, and break out of the loop to issue the IO
+			 * we have built up so far.
+			 */
+			if (!StartBufferIO(bufHdr, false, true))
+			{
+				processed++;
+				LWLockRelease(content_lock);
+				UnpinBuffer(bufHdr);
+				break;
 			}
+
+			buf_state = LockBufHdr(bufHdr);
+			lsn = BufferGetLSN(bufHdr);
+			buf_state &= ~BM_JUST_DIRTIED;
+			UnlockBufHdr(bufHdr, buf_state);
+
+			/*
+			 * Keep track of the max LSN so that we can be sure to flush
+			 * enough WAL before flushing data from the buffers. See comment
+			 * in DoFlushBuffer() for more on why we don't consider the LSNs
+			 * of unlogged relations.
+			 */
+			if (buf_state & BM_PERMANENT && lsn > batch.max_lsn)
+				batch.max_lsn = lsn;
+
+			batch.bufdescs[batch.n++] = bufHdr;
+			processed++;
 		}
 
 		/*
 		 * Measure progress independent of actually having to flush the buffer
 		 * - otherwise writing become unbalanced.
 		 */
-		ts_stat->progress += ts_stat->progress_slice;
-		ts_stat->num_scanned++;
-		ts_stat->index++;
+		num_processed += processed;
+		ts_stat->progress += ts_stat->progress_slice * processed;
+		ts_stat->num_scanned += processed;
+		ts_stat->index += processed;
+
+		/*
+		 * If we built up an IO, issue it. There's a chance we didn't find any
+		 * items referencing buffers that needed flushing this time, but we
+		 * still want to check if we should update the heap if we examined and
+		 * processed the items.
+		 */
+		if (batch.n > 0)
+		{
+			FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+			CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context);
+
+			TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n);
+			PendingCheckpointerStats.buffers_written += batch.n;
+			num_written += batch.n;
+		}
 
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -4262,6 +4425,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 		DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * The maximum number of blocks that can be written out in a single batch by
+ * the checkpointer.
+ */
+static uint32
+checkpointer_max_batch_size(void)
+{
+	uint32		result;
+	uint32		pin_limit = GetPinLimit();
+
+	result = Max(pin_limit, 1);
+	result = Min(pin_limit, io_combine_limit);
+	result = Max(result, 1);
+	Assert(result < MAX_IO_COMBINE_LIMIT);
+	return result;
+}
+
 
 /*
  * Given a buffer descriptor, start, from a strategy ring, strategy, that
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 36dd4f8375b..d6970731ba9 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -68,7 +68,7 @@ provider postgresql {
 	probe buffer__checkpoint__sync__start();
 	probe buffer__checkpoint__done();
 	probe buffer__sync__start(int, int);
-	probe buffer__sync__written(int);
+	probe buffer__batch__sync__written(BlockNumber);
 	probe buffer__sync__done(int, int, int);
 
 	probe deadlock__found();
-- 
2.43.0

