0001-Introduce-read_stream_-pause-resume-yield.patch
text/x-patch
Filename: 0001-Introduce-read_stream_-pause-resume-yield.patch
Type: text/x-patch
Part: 0
From ff6da44df5418d73ad9ec1911c87b66897f3b086 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Introduce read_stream_{pause,resume,yield}().
---
src/backend/storage/aio/read_stream.c | 50 ++++++++++++++++++++++++++-
src/include/storage/read_stream.h | 3 ++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..964e1aa281c 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,11 +100,13 @@ struct ReadStream
int16 pinned_buffers;
int16 distance;
int16 initialized_buffers;
+ int16 resume_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
bool advice_enabled;
bool temporary;
+ bool yielded;
/*
* One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -879,7 +881,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
/* End of stream reached? */
if (stream->distance == 0)
- return InvalidBuffer;
+ {
+ if (!stream->yielded)
+ return InvalidBuffer;
+
+ /* The callback yielded. Resume. */
+ stream->yielded = false;
+ read_stream_resume(stream);
+ Assert(stream->distance != 0);
+ }
/*
* The usual order of operations is that we look ahead at the bottom
@@ -1034,6 +1044,44 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
return read_stream_get_block(stream, NULL);
}
+/*
+ * Temporarily stop consuming block numbers from the block number callback. If
+ * called inside the block number callback, its return value should be
+ * returned by the callback.
+ */
+BlockNumber
+read_stream_pause(ReadStream *stream)
+{
+ stream->resume_distance = stream->distance;
+ stream->distance = 0;
+ return InvalidBlockNumber;
+}
+
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+ stream->distance = stream->resume_distance;
+}
+
+/*
+ * Called from inside a block number callback, to return control to the caller
+ * of read_stream_next_buffer() without looking further ahead. Its return
+ * value should be returned by the callback. This is equivalent to pausing and
+ * resuming automatically at the next call to read_stream_next_buffer().
+ */
+BlockNumber
+read_stream_yield(ReadStream *stream)
+{
+ read_stream_pause(stream);
+ stream->yielded = true;
+ return InvalidBlockNumber;
+}
+
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..8ac53d2902d 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,9 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size);
+extern BlockNumber read_stream_pause(ReadStream *stream);
+extern void read_stream_resume(ReadStream *stream);
+extern BlockNumber read_stream_yield(ReadStream *stream);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);
--
2.51.2