pg_get_async_wakeup_stats-patch.txt

text/plain

Filename: pg_get_async_wakeup_stats-patch.txt
Type: text/plain
Part: 1
Message: Re: Optimize LISTEN/NOTIFY
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dac12f8124..7e8e0b14f42 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,6 +137,7 @@
 #include <signal.h>
 #include <string.h>
 
+#include "access/htup_details.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
@@ -332,6 +333,13 @@ typedef struct AsyncQueueControl
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	dsa_handle	channelHashDSA;
 	dshash_table_handle channelHashDSH;
+	pg_atomic_uint32 signaledNeeded;	/* listening to some of the channels; signal needed */
+	pg_atomic_uint32 avoidedWakeups;	/* directly advanced */
+	pg_atomic_uint32 alreadyAdvancing;	/* already advancing its position */
+	pg_atomic_uint32 signaledUncertain;	/* signaled due to uncertain need */
+	pg_atomic_uint32 alreadyAhead;	/* already ahead, no action needed */
+	pg_atomic_uint32 necessaryWakeups;		/* wakeups where at least one message was interesting */
+	pg_atomic_uint32 unnecessaryWakeups;	/* wakeups where no messages were interesting */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
@@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
 										 char *page_buffer,
-										 Snapshot snapshot);
+										 Snapshot snapshot,
+										 bool *interested);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -683,6 +692,13 @@ AsyncShmemInit(void)
 		asyncQueueControl->lastQueueFillWarn = 0;
 		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
 		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+		pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0);
+		pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0);
+		pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0);
+		pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+		pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
 
 		for (int i = 0; i < MaxBackends; i++)
 		{
@@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 	SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * Returns a single row.  The columns, in order, must match the OUT
+ * parameters declared for this function in pg_proc.dat:
+ * - signaled_needed: listening to some of the channels; signal needed
+ * - avoided_wakeups: directly advanced, avoiding a wakeup
+ * - already_advancing: already advancing its position
+ * - signaled_uncertain: signaled due to uncertain need
+ * - already_ahead: already ahead, no action needed
+ * - necessary_wakeups: wakeups where at least one message was interesting
+ * - unnecessary_wakeups: wakeups where no messages were interesting
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+#define PG_GET_ASYNC_WAKEUP_STATS_COLS	7
+	TupleDesc	tupdesc;
+	Datum		values[PG_GET_ASYNC_WAKEUP_STATS_COLS] = {0};
+	bool		nulls[PG_GET_ASYNC_WAKEUP_STATS_COLS] = {0};
+	HeapTuple	tuple;
+	uint32		signaled_needed;
+	uint32		avoided_wakeups;
+	uint32		already_advancing;
+	uint32		signaled_uncertain;
+	uint32		already_ahead;
+	uint32		necessary_wakeups;
+	uint32		unnecessary_wakeups;
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* Read each counter separately; the row is not an atomic snapshot */
+	signaled_needed = pg_atomic_read_u32(&asyncQueueControl->signaledNeeded);
+	avoided_wakeups = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups);
+	already_advancing = pg_atomic_read_u32(&asyncQueueControl->alreadyAdvancing);
+	signaled_uncertain = pg_atomic_read_u32(&asyncQueueControl->signaledUncertain);
+	already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead);
+	necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups);
+	unnecessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups);
+
+	/* Widen each uint32 to int8 so its full range fits in the result */
+	values[0] = Int64GetDatum((int64) signaled_needed);
+	values[1] = Int64GetDatum((int64) avoided_wakeups);
+	values[2] = Int64GetDatum((int64) already_advancing);
+	values[3] = Int64GetDatum((int64) signaled_uncertain);
+	values[4] = Int64GetDatum((int64) already_ahead);
+	values[5] = Int64GetDatum((int64) necessary_wakeups);
+	values[6] = Int64GetDatum((int64) unnecessary_wakeups);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: zero every counter reported by pg_get_async_wakeup_stats().
+ * The counters are cleared one at a time, not as a single atomic operation.
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	AsyncQueueControl *ctl = asyncQueueControl;
+
+	pg_atomic_write_u32(&ctl->signaledNeeded, 0);
+	pg_atomic_write_u32(&ctl->avoidedWakeups, 0);
+	pg_atomic_write_u32(&ctl->alreadyAdvancing, 0);
+	pg_atomic_write_u32(&ctl->signaledUncertain, 0);
+	pg_atomic_write_u32(&ctl->alreadyAhead, 0);
+	pg_atomic_write_u32(&ctl->necessaryWakeups, 0);
+	pg_atomic_write_u32(&ctl->unnecessaryWakeups, 0);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Async_UnlistenOnExit
  *
@@ -2014,6 +2105,7 @@ SignalBackends(void)
 
 			Assert(pid != InvalidPid);
 
+			pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
 			pids[count] = pid;
 			procnos[count] = i;
@@ -2049,7 +2141,14 @@ SignalBackends(void)
 				 * currently advancing its position.
 				 */
 				if (!QUEUE_BACKEND_ADVANCING_POS(i))
+				{
 					QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+				}
+				else
+				{
+					pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1);
+				}
 			}
 			else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
 			{
@@ -2060,6 +2159,7 @@ SignalBackends(void)
 				 */
 				Assert(pid != InvalidPid);
 
+				pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1);
 				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
 				pids[count] = pid;
 				procnos[count] = i;
@@ -2071,6 +2171,7 @@ SignalBackends(void)
 				 * The backend is already ahead of the notifications we wrote.
 				 * No need to do anything.
 				 */
+				pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
 				Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos));
 			}
 		}
@@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		interested = false;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void)
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
 													   page_buffer.buf,
-													   snapshot);
+													   snapshot,
+													   &interested);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
+	if (interested)
+		pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+	else
+		pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
@@ -2474,7 +2582,8 @@ static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
 							 char *page_buffer,
-							 Snapshot snapshot)
+							 Snapshot snapshot,
+							 bool *interested)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					char	   *payload = qe->data + strlen(channel) + 1;
 
 					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+					/* Remember that at least one message was of interest */
+					*interested = true;
 				}
 			}
 			else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..0bbd7db39c7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
   proname => 'pg_notification_queue_usage', provolatile => 'v',
   proparallel => 'r', prorettype => 'float8', proargtypes => '',
   prosrc => 'pg_notification_queue_usage' },
+{ oid => '9315', descr => 'get statistics about NOTIFY wakeups',
+  proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+  prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316', descr => 'reset statistics about NOTIFY wakeups',
+  proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_reset_async_wakeup_stats' },
 
 # shared memory usage
 { oid => '5052', descr => 'allocations from the main shared memory segment',
   proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',