v10-0004-Write-combining-for-BAS_BULKWRITE.patch

text/x-patch

Filename: v10-0004-Write-combining-for-BAS_BULKWRITE.patch
Type: text/x-patch
Part: 3
Message: Re: Checkpointer write combining

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch returns the parsed metadata as JSON (format, series position, per-file stats), never the diff bytes. API reference →
Format: format-patch
Series: patch v10-0004
Subject: Write combining for BAS_BULKWRITE
File+
src/backend/storage/buffer/bufmgr.c 204 12
src/backend/storage/buffer/freelist.c 23 0
src/backend/storage/page/bufpage.c 20 0
src/backend/utils/probes.d 2 0
src/include/storage/buf_internals.h 32 0
src/include/storage/bufpage.h 2 0
src/tools/pgindent/typedefs.list 1 0
From eb89dfc1488a0410da06cbc5d387c41623b7bc1b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 15 Oct 2025 13:42:47 -0400
Subject: [PATCH v10 4/7] Write combining for BAS_BULKWRITE

Implement write combining for users of the bulkwrite buffer access
strategy (e.g. COPY FROM). When the buffer access strategy needs to
clean a buffer for reuse, it already opportunistically flushes some
other buffers. Now, combine any contiguous blocks from the same relation
into larger writes and issue them with smgrwritev().

The performance benefit for COPY FROM is mostly noticeable for multiple
concurrent COPY FROMs because a single COPY FROM is either CPU bound or
bound by WAL writes.

The infrastructure for flushing larger batches of IOs will be reused by
checkpointer and other processes doing writes of dirty data.

XXX: Because this sets checksums in place for whole batches, it is not
committable until additional infrastructure is in place.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_bcWRvRwZUop_d9vzF9nHAiT%2B-uPzkJ%3DS3ShZ1GqeAYOw%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 216 ++++++++++++++++++++++++--
 src/backend/storage/buffer/freelist.c |  23 +++
 src/backend/storage/page/bufpage.c    |  20 +++
 src/backend/utils/probes.d            |   2 +
 src/include/storage/buf_internals.h   |  32 ++++
 src/include/storage/bufpage.h         |   2 +
 src/tools/pgindent/typedefs.list      |   1 +
 7 files changed, 284 insertions(+), 12 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 57a3eae865e..8a3a5a04d91 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -537,7 +537,11 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 static BufferDesc *NextStrategyBufToFlush(BufferAccessStrategy strategy,
 										  Buffer sweep_end,
 										  XLogRecPtr *lsn, int *sweep_cursor);
-
+static void FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end,
+							   BufferDesc *batch_start,
+							   uint32 max_batch_size,
+							   BufWriteBatch *batch,
+							   int *sweep_cursor);
 static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
 static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
 												   RelFileLocator *rlocator, bool skip_pinned,
@@ -4310,10 +4314,90 @@ BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
 }
 
 
+
+/*
+ * Given a starting buffer descriptor from a strategy ring that supports eager
+ * flushing, find additional buffers from the ring that can be combined into a
+ * single write batch with the starting buffer.
+ *
+ * max_batch_size is the caller's precomputed upper bound on blocks per write,
+ * passed in so it need not be recalculated for every batch. It is clamped
+ * further here by smgrmaxcombine() for batch_start's block (how much of the
+ * file remains) and by the number of additional pins we may take
+ * (GetAdditionalPinLimit()).
+ *
+ * batch is an output parameter. On return bufdescs[0..n-1] hold consecutive
+ * blocks of one relation fork, beginning with batch_start.
+ *
+ * batch_start must arrive pinned and content locked; each buffer added after
+ * it is pinned and content locked here. The caller then issues the IO.
+ */
+static void
+FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end,
+				   BufferDesc *batch_start,
+				   uint32 max_batch_size,
+				   BufWriteBatch *batch,
+				   int *sweep_cursor)
+{
+	BlockNumber limit;
+
+	Assert(batch_start);
+	batch->bufdescs[0] = batch_start;
+
+	LockBufHdr(batch_start);
+	batch->max_lsn = BufferGetLSN(batch_start);
+	UnlockBufHdr(batch_start);
+
+	batch->start = batch->bufdescs[0]->tag.blockNum;
+	Assert(BlockNumberIsValid(batch->start));
+	batch->n = 1;
+	batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag);
+	batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag);
+	batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+	limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start);
+	limit = Min(max_batch_size, limit);
+	limit = Min(GetAdditionalPinLimit(), limit);
+
+	/*
+	 * limit counts batch_start itself. If it is at most one (no more pins
+	 * allowed, or nothing else can be combined at this block), there is
+	 * nothing to add and the batch consists of batch_start alone.
+	 */
+	if (limit <= 1)
+		return;
+
+	/* Now assemble a run of blocks to write out. */
+	for (; batch->n < limit; batch->n++)
+	{
+		Buffer		bufnum;
+
+		if ((bufnum =
+			 StrategyNextBuffer(strategy, sweep_cursor)) == sweep_end)
+			break;
+
+		/*
+		 * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+		 * buffers in the ring will be invalid.
+		 */
+		if (!BufferIsValid(bufnum))
+			break;
+
+		/* Stop when we encounter a buffer that will break the run */
+		if ((batch->bufdescs[batch->n] =
+			 PrepareOrRejectEagerFlushBuffer(bufnum,
+											 batch->start + batch->n,
+											 &batch->rlocator,
+											 true,
+											 &batch->max_lsn)) == NULL)
+			break;
+	}
+}
+
 /*
  * Returns the buffer descriptor of the buffer containing the next block we
  * should eagerly flush or NULL when there are no further buffers to consider
- * writing out.
+ * writing out. This will be the start of a new batch of buffers to write out.
  */
 static BufferDesc *
 NextStrategyBufToFlush(BufferAccessStrategy strategy,
@@ -4360,7 +4444,6 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
 {
 	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
 	LWLock	   *content_lock;
-	bool		first_buffer = true;
 
 	/* Set up this victim buffer to be flushed */
 	if (!PrepareFlushBuffer(bufdesc, &max_lsn))
@@ -4370,19 +4453,22 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
 	{
 		Buffer		sweep_end = BufferDescriptorGetBuffer(bufdesc);
 		int			cursor = StrategyGetCurrentIndex(strategy);
+		uint32		max_batch_size = StrategyMaxWriteBatchSize(strategy);
+
+		/* Pin our victim again so it stays ours even after batch released */
+		ReservePrivateRefCountEntry();
+		ResourceOwnerEnlarge(CurrentResourceOwner);
+		IncrBufferRefCount(BufferDescriptorGetBuffer(bufdesc));
 
 		/* Clean victim buffer and find more to flush opportunistically */
 		do
 		{
-			DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
-			content_lock = BufferDescriptorGetContentLock(bufdesc);
-			LWLockRelease(content_lock);
-			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-										  &bufdesc->tag);
-			/* We leave the first buffer pinned for the caller */
-			if (!first_buffer)
-				UnpinBuffer(bufdesc);
-			first_buffer = false;
+			BufWriteBatch batch;
+
+			FindFlushAdjacents(strategy, sweep_end, bufdesc, max_batch_size,
+							   &batch, &cursor);
+			FlushBufferBatch(&batch, io_context);
+			CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext);
 		} while ((bufdesc = NextStrategyBufToFlush(strategy, sweep_end,
 												   &max_lsn, &cursor)) != NULL);
 	}
@@ -4526,6 +4612,70 @@ except_unpin_buffer:
 	return NULL;
 }
 
+/*
+ * Write out a prepared batch of pinned, content-locked buffers as one vector.
+ */
+void
+FlushBufferBatch(BufWriteBatch *batch,
+				 IOContext io_context)
+{
+	BlockNumber blknums[MAX_IO_COMBINE_LIMIT];
+	Block		blocks[MAX_IO_COMBINE_LIMIT];
+	instr_time	io_start;
+	ErrorContextCallback errcallback =
+	{
+		.callback = shared_buffer_write_error_callback,
+		.arg = batch->bufdescs[0],	/* identify the batch by its first block */
+		.previous = error_context_stack,
+	};
+
+	error_context_stack = &errcallback;
+
+	if (XLogRecPtrIsValid(batch->max_lsn))
+		XLogFlush(batch->max_lsn);
+	if (batch->reln == NULL)
+		batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+#ifdef USE_ASSERT_CHECKING
+	for (uint32 i = 0; i < batch->n; i++)
+	{
+		XLogRecPtr	lsn;
+
+		Assert(!BufferNeedsWALFlush(batch->bufdescs[i], &lsn));
+	}
+#endif
+
+	TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno,
+											  batch->reln->smgr_rlocator.locator.spcOid,
+											  batch->reln->smgr_rlocator.locator.dbOid,
+											  batch->reln->smgr_rlocator.locator.relNumber,
+											  batch->reln->smgr_rlocator.backend,
+											  batch->n);
+
+	/*
+	 * XXX: All blocks should be copied and then checksummed but doing so
+	 * takes a lot of extra memory and a future patch will eliminate this
+	 * requirement.
+	 */
+	for (BlockNumber i = 0; i < batch->n; i++)
+	{
+		blknums[i] = batch->start + i;
+		blocks[i] = BufHdrGetBlock(batch->bufdescs[i]);
+	}
+
+	PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n);
+
+	io_start = pgstat_prepare_io_time(track_io_timing);
+	smgrwritev(batch->reln, batch->forkno,
+			   batch->start, (const void **) blocks, batch->n, false);
+
+	/* The byte count covers the whole vector, not a single block */
+	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE,
+							io_start, batch->n, (uint64) batch->n * BLCKSZ);
+
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Prepare the buffer with bufdesc for writing. Returns true if the buffer
  * acutally needs writing and false otherwise. lsn returns the buffer's LSN if
@@ -4687,6 +4837,48 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 	LWLockRelease(BufferDescriptorGetContentLock(buf));
 }
 
+/*
+ * Given a batch whose buffers were all pinned, content locked and flushed,
+ * end the IO on each buffer, release its content lock, schedule writeback
+ * for it in wb_context and finally unpin it. Writeback must be scheduled
+ * before unpinning: once our pin is gone the buffer's tag may change.
+ */
+void
+CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context,
+					 WritebackContext *wb_context)
+{
+	ErrorContextCallback errcallback =
+	{
+		.callback = shared_buffer_write_error_callback,
+		.previous = error_context_stack,
+	};
+
+	error_context_stack = &errcallback;
+	pgBufferUsage.shared_blks_written += batch->n;
+
+	for (uint32 i = 0; i < batch->n; i++)
+	{
+		Buffer		buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]);
+
+		errcallback.arg = batch->bufdescs[i];
+
+		/* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. */
+		TerminateBufferIO(batch->bufdescs[i], true, 0, true, false);
+		LWLockRelease(BufferDescriptorGetContentLock(batch->bufdescs[i]));
+		ScheduleBufferTagForWriteback(wb_context, io_context,
+									  &batch->bufdescs[i]->tag);
+		ReleaseBuffer(buffer);
+	}
+
+	TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno,
+											 batch->reln->smgr_rlocator.locator.spcOid,
+											 batch->reln->smgr_rlocator.locator.dbOid,
+											 batch->reln->smgr_rlocator.locator.relNumber,
+											 batch->reln->smgr_rlocator.backend,
+											 batch->n, batch->start);
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * RelationGetNumberOfBlocksInFork
  *		Determines the current number of pages in the specified relation fork.
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 4a3009d190c..189274fc0c0 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -776,6 +776,29 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 	return NULL;
 }
 
+
+/*
+ * Return the largest number of buffers that may be combined into one write
+ * for this strategy: io_combine_limit (capped at MAX_IO_COMBINE_LIMIT),
+ * further bounded by the strategy's pin limit and by GetPinLimit().
+ */
+uint32
+StrategyMaxWriteBatchSize(BufferAccessStrategy strategy)
+{
+	uint32		result = Min(io_combine_limit, MAX_IO_COMBINE_LIMIT);
+	int			ring_pin_limit = GetAccessStrategyPinLimit(strategy);
+	uint32		global_pin_limit = GetPinLimit();
+
+	/* Take the tightest of the IO size and pin limits */
+	result = Min(ring_pin_limit, result);
+	result = Min(global_pin_limit, result);
+
+	/* Always allow a batch of at least one buffer so we make progress */
+	result = Max(1, result);
+
+	return result;
+}
+
 /*
  * AddBufferToRing -- add a buffer to the buffer ring
  *
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index aac6e695954..7c2ec99f939 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno)
 
 	((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno);
 }
+
+/*
+ * Set the checksum of each of the length pages in place, where pages[i]
+ * holds block blknos[i]. Pages for which PageIsNew() is true are skipped,
+ * and nothing is done at all when data checksums are disabled.
+ */
+void
+PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length)
+{
+	if (!DataChecksumsEnabled())
+		return;					/* checksums disabled: nothing to compute */
+
+	for (uint32 idx = 0; idx < length; idx++)
+	{
+		Page		pg = pages[idx];
+
+		if (!PageIsNew(pg))
+			((PageHeader) pg)->pd_checksum = pg_checksum_page(pg, blknos[idx]);
+	}
+}
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index e9e413477ba..36dd4f8375b 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -61,6 +61,8 @@ provider postgresql {
 	probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid);
 	probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
 	probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
+	probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
+	probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
 
 	probe buffer__checkpoint__start(int);
 	probe buffer__checkpoint__sync__start();
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index c07e309a288..ab502c4f825 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -483,6 +483,34 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
 	ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc);
 }
 
+/*
+ * A run of buffers to be written with one combined IO. bufdescs[0..n-1] hold
+ * consecutive blocks start..start+n-1 of the same fork of the same relation.
+ * reln may be left NULL; FlushBufferBatch() then opens it from rlocator.
+ */
+typedef struct BufWriteBatch
+{
+	RelFileLocator rlocator;
+	ForkNumber	forkno;
+	SMgrRelation reln;
+
+	/*
+	 * The BlockNumber of the first block in the run of contiguous blocks to
+	 * be written out as a single IO.
+	 */
+	BlockNumber start;
+
+	/*
+	 * While assembling the buffers, we keep track of the maximum LSN so that
+	 * we can flush WAL through this LSN before flushing the buffers.
+	 */
+	XLogRecPtr	max_lsn;
+
+	/* Number of valid entries in bufdescs (at most MAX_IO_COMBINE_LIMIT) */
+	uint32		n;
+	BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT];
+} BufWriteBatch;
+
 /*
  * Internal buffer management routines
  */
@@ -496,6 +524,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 										  IOContext io_context, BufferTag *tag);
+extern void FlushBufferBatch(BufWriteBatch *batch, IOContext io_context);
 
 extern void TrackNewBufferPin(Buffer buf);
 
@@ -507,9 +536,12 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 /* freelist.c */
 extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern uint32 StrategyMaxWriteBatchSize(BufferAccessStrategy strategy);
 extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy,
 								 int *cursor);
 extern int	StrategyGetCurrentIndex(BufferAccessStrategy strategy);
+extern void CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context,
+								 WritebackContext *wb_context);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
 									 uint32 *buf_state, bool *from_ring);
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index abc2cf2a020..29a400a71eb 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -506,5 +506,7 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									const void *newtup, Size newsize);
 extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
 extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern void PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos,
+										uint32 length);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 57f2a9ccdc5..7ad60afe702 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -350,6 +350,7 @@ BufferManagerRelation
 BufferStrategyControl
 BufferTag
 BufferUsage
+BufWriteBatch
 BuildAccumulator
 BuiltinScript
 BulkInsertState
-- 
2.43.0