0005-Introduce-pss_barrierReceivedGeneration-20250610.patch

text/x-patch

Filename: 0005-Introduce-pss_barrierReceivedGeneration-20250610.patch
Type: text/x-patch
Part: 0
Message: Re: Changing shared_buffers without restart

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch 0005
Subject: Introduce pss_barrierReceivedGeneration
File+
src/backend/storage/ipc/procsignal.c 53 14
src/include/storage/procsignal.h 1 0
From fb470c019742f2e9eaa7666ab81a24f816066387 Mon Sep 17 00:00:00 2001
From: Dmitrii Dolgov <9erthalion6@gmail.com>
Date: Fri, 4 Apr 2025 21:46:14 +0200
Subject: [PATCH 05/17] Introduce pss_barrierReceivedGeneration

Currently WaitForProcSignalBarrier makes it possible to ensure that the
message sent via EmitProcSignalBarrier was processed by all ProcSignal
mechanism participants.

Add pss_barrierReceivedGeneration alongside pss_barrierGeneration. The
new field is updated when a process has received the message but has not
processed it yet. This makes it possible to support a new mode of
waiting, in which ProcSignal participants want to synchronize message
processing. To do that, a participant can wait via
WaitForProcSignalBarrierReceived when processing a message, effectively
making sure that all processes are going to start processing
ProcSignalBarrier simultaneously.
---
 src/backend/storage/ipc/procsignal.c | 67 ++++++++++++++++++++++------
 src/include/storage/procsignal.h     |  1 +
 2 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index a9bb540b55a..c6bec9be423 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -58,7 +58,10 @@
  * of it. For such use cases, we set a bit in pss_barrierCheckMask and then
  * increment the current "barrier generation"; when the new barrier generation
  * (or greater) appears in the pss_barrierGeneration flag of every process,
- * we know that the message has been received everywhere.
+ * we know that the message has been received and processed everywhere. If
+ * we only need to know that the message was received everywhere (e.g.
+ * receiving processes need to handle the message in a coordinated fashion),
+ * use pss_barrierReceivedGeneration in the same way.
  */
 typedef struct
 {
@@ -70,6 +73,7 @@ typedef struct
 
 	/* Barrier-related fields (not protected by pss_mutex) */
 	pg_atomic_uint64 pss_barrierGeneration;
+	pg_atomic_uint64 pss_barrierReceivedGeneration;
 	pg_atomic_uint32 pss_barrierCheckMask;
 	ConditionVariable pss_barrierCV;
 } ProcSignalSlot;
@@ -152,6 +156,8 @@ ProcSignalShmemInit(void)
 			slot->pss_cancel_key_len = 0;
 			MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags));
 			pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
+			pg_atomic_init_u64(&slot->pss_barrierReceivedGeneration,
+							   PG_UINT64_MAX);
 			pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0);
 			ConditionVariableInit(&slot->pss_barrierCV);
 		}
@@ -199,6 +205,8 @@ ProcSignalInit(const uint8 *cancel_key, int cancel_key_len)
 	barrier_generation =
 		pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
 	pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation);
+	pg_atomic_write_u64(&slot->pss_barrierReceivedGeneration,
+						barrier_generation);
 
 	if (cancel_key_len > 0)
 		memcpy(slot->pss_cancel_key, cancel_key, cancel_key_len);
@@ -263,6 +271,7 @@ CleanupProcSignalState(int status, Datum arg)
 	 * no barrier waits block on it.
 	 */
 	pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX);
+	pg_atomic_write_u64(&slot->pss_barrierReceivedGeneration, PG_UINT64_MAX);
 
 	SpinLockRelease(&slot->pss_mutex);
 
@@ -416,12 +425,8 @@ EmitProcSignalBarrier(ProcSignalBarrierType type)
 	return generation;
 }
 
-/*
- * WaitForProcSignalBarrier - wait until it is guaranteed that all changes
- * requested by a specific call to EmitProcSignalBarrier() have taken effect.
- */
-void
-WaitForProcSignalBarrier(uint64 generation)
+static void
+WaitForProcSignalBarrierInternal(uint64 generation, bool receivedOnly)
 {
 	Assert(generation <= pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration));
 
@@ -436,12 +441,17 @@ WaitForProcSignalBarrier(uint64 generation)
 		uint64		oldval;
 
 		/*
-		 * It's important that we check only pss_barrierGeneration here and
-		 * not pss_barrierCheckMask. Bits in pss_barrierCheckMask get cleared
-		 * before the barrier is actually absorbed, but pss_barrierGeneration
+		 * It's important that we check only pss_barrierGeneration (or
+		 * pss_barrierReceivedGeneration, when receivedOnly) here and not
+		 * pss_barrierCheckMask. Bits in pss_barrierCheckMask get cleared
+		 * before the barrier is actually absorbed, but pss_barrierGeneration
 		 * is updated only afterward.
 		 */
-		oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+		if (receivedOnly)
+			oldval = pg_atomic_read_u64(&slot->pss_barrierReceivedGeneration);
+		else
+			oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+
 		while (oldval < generation)
 		{
 			if (ConditionVariableTimedSleep(&slot->pss_barrierCV,
@@ -450,7 +460,11 @@ WaitForProcSignalBarrier(uint64 generation)
 				ereport(LOG,
 						(errmsg("still waiting for backend with PID %d to accept ProcSignalBarrier",
 								(int) pg_atomic_read_u32(&slot->pss_pid))));
-			oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
+
+			if (receivedOnly)
+				oldval = pg_atomic_read_u64(&slot->pss_barrierReceivedGeneration);
+			else
+				oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration);
 		}
 		ConditionVariableCancelSleep();
 	}
@@ -464,12 +478,33 @@ WaitForProcSignalBarrier(uint64 generation)
 	 * The caller is probably calling this function because it wants to read
 	 * the shared state or perform further writes to shared state once all
 	 * backends are known to have absorbed the barrier. However, the read of
-	 * pss_barrierGeneration was performed unlocked; insert a memory barrier
-	 * to separate it from whatever follows.
+	 * pss_barrierGeneration or pss_barrierReceivedGeneration was performed
+	 * unlocked; insert a memory barrier to separate it from whatever follows.
 	 */
 	pg_memory_barrier();
 }
 
+/*
+ * WaitForProcSignalBarrier - wait until it is guaranteed that all changes
+ * requested by a specific call to EmitProcSignalBarrier() have taken effect.
+ */
+void
+WaitForProcSignalBarrier(uint64 generation)
+{
+	WaitForProcSignalBarrierInternal(generation, false);
+}
+
+/*
+ * WaitForProcSignalBarrierReceived - wait until it is guaranteed that all
+ * backends have observed the message sent by a specific call to
+ * EmitProcSignalBarrier().
+ */
+void
+WaitForProcSignalBarrierReceived(uint64 generation)
+{
+	WaitForProcSignalBarrierInternal(generation, true);
+}
+
 /*
  * Handle receipt of an interrupt indicating a global barrier event.
  *
@@ -523,6 +558,10 @@ ProcessProcSignalBarrier(void)
 	if (local_gen == shared_gen)
 		return;
 
+	/* The message is observed, record that */
+	pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierReceivedGeneration,
+						shared_gen);
+
 	/*
 	 * Get and clear the flags that are set for this backend. Note that
 	 * pg_atomic_exchange_u32 is a full barrier, so we're guaranteed that the
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..2733bbb8c5b 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -79,6 +79,7 @@ extern void SendCancelRequest(int backendPID, const uint8 *cancel_key, int cance
 
 extern uint64 EmitProcSignalBarrier(ProcSignalBarrierType type);
 extern void WaitForProcSignalBarrier(uint64 generation);
+extern void WaitForProcSignalBarrierReceived(uint64 generation);
 extern void ProcessProcSignalBarrier(void);
 
 extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
-- 
2.34.1