0003-optimize_listen_notify-v24.txt

text/plain

Filename: 0003-optimize_listen_notify-v24.txt
Type: text/plain
Part: 0
Message: Re: Optimize LISTEN/NOTIFY
From 9f2da84a9c58df155961481aa0802ffb95460811 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Fri, 7 Nov 2025 19:24:37 +0100
Subject: [PATCH 3/3] Add instrumentation for analyzing LISTEN/NOTIFY wakeup
 behavior

This commit adds a set of atomic counters and SQL-accessible functions
to help understand how SignalBackends and asyncQueueReadAllNotifications
interact under various workloads.  The instrumentation is intended only
for development and performance analysis and will not be included in the
final patch.

Specifically:

* Added eight pg_atomic_uint32 counters to AsyncQueueControl tracking
  wakeup categories: targeted signals, advancing and idle backends
  positioned behind or ahead of the queue head, direct advancements
  (avoided wakeups), and necessary vs. unnecessary wakeups.
* Incremented these counters in SignalBackends() and
  asyncQueueReadAllNotifications() to classify wakeup decisions.
* Added SQL functions pg_get_async_wakeup_stats() and
  pg_reset_async_wakeup_stats() for reading and resetting these counters
  during test runs.
* Modified asyncQueueProcessPageEntries() to report whether any
  notifications were of interest to the backend, allowing
  differentiation between necessary and unnecessary wakeups.

This is purely diagnostic code to help reason about backend wakeup
patterns and validate assumptions during optimization.  Apart from the
two diagnostic SQL functions and the cost of updating the counters, it
introduces no user-visible or behavioral changes, and it is not intended
for commit to the main tree.
---
 src/backend/commands/async.c    | 171 +++++++++++++++++++++++++++-----
 src/include/catalog/pg_proc.dat |  13 ++-
 2 files changed, 159 insertions(+), 25 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 9f7b8a3324a..2ec6b6b9e2b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -136,6 +136,7 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include "access/htup_details.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
@@ -332,6 +333,14 @@ typedef struct AsyncQueueControl
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	dsa_handle	channelHashDSA;
 	dshash_table_handle channelHashDSH;
+	pg_atomic_uint32 signaledTargeted;		/* listening to some of the channels; signal needed */
+	pg_atomic_uint32 advancingBehind;		/* advancing, position behind queue head after write */
+	pg_atomic_uint32 advancingAhead;		/* advancing, position ahead of queue head after write */
+	pg_atomic_uint32 idleBehind;			/* stationary at a position behind queue head before write */
+	pg_atomic_uint32 avoidedWakeups;		/* directly advanced to new head instead of signaled */
+	pg_atomic_uint32 alreadyAhead;			/* already caught up or ahead, no action needed */
+	pg_atomic_uint32 necessaryWakeups;		/* wakeups where at least one message was interesting */
+	pg_atomic_uint32 unnecessaryWakeups;	/* wakeups where we had no interest in any of the messages */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
@@ -518,7 +527,8 @@ static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
 										 char *page_buffer,
-										 Snapshot snapshot);
+										 Snapshot snapshot,
+										 bool *interested);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -684,6 +694,15 @@ AsyncShmemInit(void)
 		asyncQueueControl->lastQueueFillWarn = 0;
 		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
 		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+		pg_atomic_init_u32(&asyncQueueControl->signaledTargeted, 0);
+		pg_atomic_init_u32(&asyncQueueControl->advancingBehind, 0);
+		pg_atomic_init_u32(&asyncQueueControl->advancingAhead, 0);
+		pg_atomic_init_u32(&asyncQueueControl->idleBehind, 0);
+		pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+		pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
@@ -998,6 +1017,85 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 	SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * Returns a single row; the int8 columns, in order, are:
+ * - signaled_targeted: signaled because it listens on a notified channel
+ * - advancing_behind: advancing backend whose target position precedes the
+ *   new queue head; signaled
+ * - advancing_ahead: advancing backend whose target position is at or past
+ *   the new queue head; not signaled
+ * - idle_behind: non-advancing backend before the old queue head; signaled
+ * - avoided_wakeups: non-advancing backend within the range just written;
+ *   directly advanced to the new queue head instead of being signaled
+ * - already_ahead: non-advancing backend at/past the new queue head; no action
+ * - necessary_wakeups: queue reads that delivered at least one notification
+ * - unnecessary_wakeups: queue reads that delivered no notification
+ *
+ * The first six columns are counted in SignalBackends(); the position checks
+ * there skip backends that already have a wakeup pending.  The last two are
+ * counted in asyncQueueReadAllNotifications().  Each uint32 counter is read
+ * separately without a lock, so the row is not a consistent snapshot, and
+ * the counters wrap around after 2^32 increments.
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	/* Counters in the same order as the OUT columns in pg_proc.dat */
+	pg_atomic_uint32 *counters[] = {
+		&asyncQueueControl->signaledTargeted,
+		&asyncQueueControl->advancingBehind,
+		&asyncQueueControl->advancingAhead,
+		&asyncQueueControl->idleBehind,
+		&asyncQueueControl->avoidedWakeups,
+		&asyncQueueControl->alreadyAhead,
+		&asyncQueueControl->necessaryWakeups,
+		&asyncQueueControl->unnecessaryWakeups,
+	};
+	Datum		values[lengthof(counters)];
+	bool		nulls[lengthof(counters)] = {0};
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("function returning record called in context that cannot accept type record")));
+
+	/* Catch the C array and the pg_proc.dat OUT columns drifting apart */
+	Assert(tupdesc->natts == lengthof(counters));
+
+	/* Widen each uint32 counter to an int8 column; no column is ever NULL */
+	for (int i = 0; i < lengthof(counters); i++)
+		values[i] = Int64GetDatum((int64) pg_atomic_read_u32(counters[i]));
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/* SQL function: zero all NOTIFY wakeup counters (one atomic write each) */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	pg_atomic_uint32 *counters[] = {
+		&asyncQueueControl->signaledTargeted,
+		&asyncQueueControl->advancingBehind,
+		&asyncQueueControl->advancingAhead,
+		&asyncQueueControl->idleBehind,
+		&asyncQueueControl->avoidedWakeups,
+		&asyncQueueControl->alreadyAhead,
+		&asyncQueueControl->necessaryWakeups,
+		&asyncQueueControl->unnecessaryWakeups,
+	};
+
+	for (int i = 0; i < lengthof(counters); i++)
+		pg_atomic_write_u32(counters[i], 0);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Async_UnlistenOnExit
  *
@@ -2016,6 +2114,7 @@ SignalBackends(void)
 
 			Assert(pid != InvalidPid);
 
+			pg_atomic_fetch_add_u32(&asyncQueueControl->signaledTargeted, 1);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
 			pids[count] = pid;
 			procnos[count] = i;
@@ -2037,6 +2136,7 @@ SignalBackends(void)
 		{
 			QueuePosition pos;
 			int32		pid;
+			bool		need_signal = false;
 
 			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
@@ -2044,21 +2144,39 @@ SignalBackends(void)
 			pos = QUEUE_BACKEND_POS(i);
 			pid = QUEUE_BACKEND_PID(i);
 
-			/*
-			 * We need to signal advancing listening backends that would get
-			 * stuck at a position before the new queue head. We also need to
-			 * signal listening backends that are idle at a position before
-			 * the old queue head since they could be interested in the
-			 * messages in-between.
-			 *
-			 * Listening backends that are not advancing and are stationary at
-			 * a position somewhere in the range we just wrote, can safely be
-			 * direct advanced to the new queue head, since we know that they
-			 * are not interested in our messages.
-			 */
-			if (QUEUE_BACKEND_IS_ADVANCING(i) ?
-				QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) :
-				QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+			if (QUEUE_BACKEND_IS_ADVANCING(i))
+			{
+				if (QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite))
+				{
+					need_signal = true;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->advancingBehind, 1);
+				}
+				else
+				{
+					pg_atomic_fetch_add_u32(&asyncQueueControl->advancingAhead, 1);
+				}
+			}
+			else
+			{
+				if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+				{
+					need_signal = true;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->idleBehind, 1);
+				}
+				else if (QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
+				{
+					QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);	
+				}
+				else
+				{
+					Assert(QUEUE_POS_EQUAL(pos, queueHeadAfterWrite) ||
+						   QUEUE_POS_PRECEDES(queueHeadAfterWrite, pos));
+					pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
+				}
+			}
+
+			if (need_signal)
 			{
 				Assert(pid != InvalidPid);
 
@@ -2067,13 +2185,7 @@ SignalBackends(void)
 				procnos[count] = i;
 				count++;
 			}
-			else if (!QUEUE_BACKEND_IS_ADVANCING(i) &&
-					 QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
-			{
-				Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite));
 
-				QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			}
 		}
 	}
 	LWLockRelease(NotifyQueueLock);
@@ -2302,6 +2414,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		interested = false;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2438,7 +2551,8 @@ asyncQueueReadAllNotifications(void)
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
 													   page_buffer.buf,
-													   snapshot);
+													   snapshot,
+													   &interested);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -2452,6 +2566,11 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
+	if (interested)
+		pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+	else
+		pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
@@ -2476,7 +2595,8 @@ static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
 							 char *page_buffer,
-							 Snapshot snapshot)
+							 Snapshot snapshot,
+							 bool *interested)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2537,6 +2657,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					char	   *payload = qe->data + strlen(channel) + 1;
 
 					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+					/* Note that we were interested in at least one message */
+					*interested = true;
 				}
 			}
 			else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..b259bccfa4b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
   proname => 'pg_notification_queue_usage', provolatile => 'v',
   proparallel => 'r', prorettype => 'float8', proargtypes => '',
   prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315',
+  descr => 'get statistics about NOTIFY wakeups',
+  proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o,o}',
+  proargnames => '{signaled_targeted,advancing_behind,advancing_ahead,idle_behind,avoided_wakeups,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+  prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316',
+  descr => 'reset statistics about NOTIFY wakeups',
+  proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_reset_async_wakeup_stats' },
 # shared memory usage
 { oid => '5052', descr => 'allocations from the main shared memory segment',
   proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',
-- 
2.50.1