0005-Introduce-pss_barrierReceivedGeneration-20250616.patch
text/x-patch
Filename: 0005-Introduce-pss_barrierReceivedGeneration-20250616.patch
Type: text/x-patch
Part: 3
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0005
Subject: Introduce pss_barrierReceivedGeneration
| File | + | − |
|---|---|---|
| src/backend/storage/ipc/procsignal.c | 53 | 14 |
| src/include/storage/procsignal.h | 1 | 0 |
From fb470c019742f2e9eaa7666ab81a24f816066387 Mon Sep 17 00:00:00 2001
From: Dmitrii Dolgov <9erthalion6@gmail.com>
Date: Fri, 4 Apr 2025 21:46:14 +0200
Subject: [PATCH 05/17] Introduce pss_barrierReceivedGeneration
Currently WaitForProcSignalBarrier allows to make sure the message sent
via EmitProcSignalBarrier was processed by all ProcSignal mechanism
participants.
Add pss_barrierReceivedGeneration alongside with pss_barrierGeneration,
which will be updated when a process has received the message, but not
processed it yet. This makes it possible to support a new mode of
waiting, when ProcSignal participants want to synchronize message
processing. To do that, a participant can wait via
WaitForProcSignalBarrierReceived when processing a message, effectively
making sure that all processes are going to start processing
ProcSignalBarrier simultaneously.
---
src/backend/storage/ipc/procsignal.c | 67 ++++++++++++++++++++++------
src/include/storage/procsignal.h | 1 +
2 files changed, 54 insertions(+), 14 deletions(-)
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index a9bb540b55a..c6bec9be423 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -58,7 +58,10 @@
* of it. For such use cases, we set a bit in pss_barrierCheckMask and then
* increment the current "barrier generation"; when the new barrier generation
* (or greater) appears in the pss_barrierGeneration flag of every process,
- * we know that the message has been received everywhere.
+ * we know that the message has been received and processed everywhere. In case
+ * if we only need to know only that the message was received everywhere (e.g.
+ * receiving processes need to handle the message in a coordinated fashion)
+ * use pss_barrierReceivedGeneration in the same way.
*/
typedef struct
{
@@ -70,6 +73,7 @@ typedef struct
/* Barrier-related fields (not protected by pss_mutex) */
pg_atomic_uint64 pss_barrierGeneration;
+ pg_atomic_uint64 pss_barrierReceivedGeneration;
pg_atomic_uint32 pss_barrierCheckMask;
ConditionVariable pss_barrierCV;
} ProcSignalSlot;
@@ -152,6 +156,8 @@ ProcSignalShmemInit(void)
slot->pss_cancel_key_len = 0;
MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags));
pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
+ pg_atomic_init_u64(&slot->pss_barrierReceivedGeneration,
+ PG_UINT64_MAX);
pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0);
ConditionVariableInit(&slot->pss_barrierCV);
}
@@ -199,6 +205,8 @@ ProcSignalInit(const uint8 *cancel_key, int cancel_key_len)
barrier_generation =
pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation);
+ pg_atomic_write_u64(&slot->pss_barrierReceivedGeneration,
+ barrier_generation);
if (cancel_key_len > 0)
memcpy(slot->pss_cancel_key, cancel_key, cancel_key_len);
@@ -263,6 +271,7 @@ CleanupProcSignalState(int status, Datum arg)
* no barrier waits block on it.
*/
pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
+ pg_atomic_write_u64(&slot->pss_barrierReceivedGeneration, PG_UINT64_MAX);
SpinLockRelease(&slot->pss_mutex);
@@ -416,12 +425,8 @@ EmitProcSignalBarrier(ProcSignalBarrierType type)
return generation;
}
-/*
- * WaitForProcSignalBarrier - wait until it is guaranteed that all changes
- * requested by a specific call to EmitProcSignalBarrier() have taken effect.
- */
-void
-WaitForProcSignalBarrier(uint64 generation)
+static void
+WaitForProcSignalBarrierInternal(uint64 generation, bool receivedOnly)
{
Assert(generation <= pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration));
@@ -436,12 +441,17 @@ WaitForProcSignalBarrier(uint64 generation)
uint64 oldval;
/*
- * It's important that we check only pss_barrierGeneration here and
- * not pss_barrierCheckMask. Bits in pss_barrierCheckMask get cleared
- * before the barrier is actually absorbed, but pss_barrierGeneration
+ * It's important that we check only pss_barrierGeneration &
+ * pss_barrierGeneration here and not pss_barrierCheckMask. Bits in
+ * pss_barrierCheckMask get cleared before the barrier is actually
+ * absorbed, but pss_barrierGeneration & pss_barrierReceivedGeneration
* is updated only afterward.
*/
- oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+ if (receivedOnly)
+ oldval = pg_atomic_read_u64(&slot->pss_barrierReceivedGeneration);
+ else
+ oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+
while (oldval < generation)
{
if (ConditionVariableTimedSleep(&slot->pss_barrierCV,
@@ -450,7 +460,11 @@ WaitForProcSignalBarrier(uint64 generation)
ereport(LOG,
(errmsg("still waiting for backend with PID %d to accept ProcSignalBarrier",
(int) pg_atomic_read_u32(&slot->pss_pid))));
- oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+
+ if (receivedOnly)
+ oldval = pg_atomic_read_u64(&slot->pss_barrierReceivedGeneration);
+ else
+ oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
}
ConditionVariableCancelSleep();
}
@@ -464,12 +478,33 @@ WaitForProcSignalBarrier(uint64 generation)
* The caller is probably calling this function because it wants to read
* the shared state or perform further writes to shared state once all
* backends are known to have absorbed the barrier. However, the read of
- * pss_barrierGeneration was performed unlocked; insert a memory barrier
- * to separate it from whatever follows.
+ * pss_barrierGeneration & pss_barrierReceivedGeneration was performed
+ * unlocked; insert a memory barrier to separate it from whatever follows.
*/
pg_memory_barrier();
}
+/*
+ * WaitForProcSignalBarrier - wait until it is guaranteed that all changes
+ * requested by a specific call to EmitProcSignalBarrier() have taken effect.
+ */
+void
+WaitForProcSignalBarrier(uint64 generation)
+{
+ WaitForProcSignalBarrierInternal(generation, false);
+}
+
+/*
+ * WaitForProcSignalBarrierReceived - wait until it is guaranteed that all
+ * backends have observed the message sent by a specific call to
+ * EmitProcSignalBarrier().
+ */
+void
+WaitForProcSignalBarrierReceived(uint64 generation)
+{
+ WaitForProcSignalBarrierInternal(generation, true);
+}
+
/*
* Handle receipt of an interrupt indicating a global barrier event.
*
@@ -523,6 +558,10 @@ ProcessProcSignalBarrier(void)
if (local_gen == shared_gen)
return;
+ /* The message is observed, record that */
+ pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierReceivedGeneration,
+ shared_gen);
+
/*
* Get and clear the flags that are set for this backend. Note that
* pg_atomic_exchange_u32 is a full barrier, so we're guaranteed that the
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..2733bbb8c5b 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -79,6 +79,7 @@ extern void SendCancelRequest(int backendPID, const uint8 *cancel_key, int cance
extern uint64 EmitProcSignalBarrier(ProcSignalBarrierType type);
extern void WaitForProcSignalBarrier(uint64 generation);
+extern void WaitForProcSignalBarrierReceived(uint64 generation);
extern void ProcessProcSignalBarrier(void);
extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
--
2.34.1