pg_get_async_wakeup_stats-patch.txt
text/plain
Filename: pg_get_async_wakeup_stats-patch.txt
Type: text/plain
Part: 1
Message:
Re: Optimize LISTEN/NOTIFY
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dac12f8124..7e8e0b14f42 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,6 +137,7 @@
#include <signal.h>
#include <string.h>
+#include "access/htup_details.h"
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
@@ -332,6 +333,13 @@ typedef struct AsyncQueueControl
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
dsa_handle channelHashDSA;
dshash_table_handle channelHashDSH;
+ pg_atomic_uint32 signaledNeeded; /* listening to some of the channels; signal needed */
+ pg_atomic_uint32 avoidedWakeups; /* directly advanced */
+ pg_atomic_uint32 alreadyAdvancing; /* already advancing its position */
+ pg_atomic_uint32 signaledUncertain; /* signaled due to uncertain need */
+ pg_atomic_uint32 alreadyAhead; /* already ahead, no action needed */
+ pg_atomic_uint32 necessaryWakeups; /* wakeups where at least one message was interesting */
+ pg_atomic_uint32 unnecessaryWakeups; /* wakeups where no messages were interesting */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
@@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
- Snapshot snapshot);
+ Snapshot snapshot,
+ bool *interested);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static bool AsyncExistsPendingNotify(Notification *n);
@@ -683,6 +692,13 @@ AsyncShmemInit(void)
asyncQueueControl->lastQueueFillWarn = 0;
asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+ pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0);
+ pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+ pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0);
+ pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0);
+ pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+ pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+ pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
for (int i = 0; i < MaxBackends; i++)
{
@@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * Returns a single row whose columns, in order, are:
+ *	- signaled_needed: signaled; listening to some of the channels
+ *	- avoided_wakeups: queue position directly advanced
+ *	- already_advancing: already advancing its position
+ *	- signaled_uncertain: signaled due to uncertain need
+ *	- already_ahead: already ahead, no action needed
+ *	- necessary_wakeups: wakeups where at least one message was interesting
+ *	- unnecessary_wakeups: wakeups where no messages were interesting
+ *
+ * The column names and order must match the OUT parameters declared for
+ * this function in pg_proc.dat.  Each counter is a separate 32-bit atomic
+ * that is read on its own, so the row is not a consistent snapshot, and a
+ * counter wraps around to zero after reaching 2^32 - 1.
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+#define ASYNC_WAKEUP_STATS_COLS	7
+	AsyncQueueControl *ctl = asyncQueueControl;
+	TupleDesc	tupdesc;
+	Datum		values[ASYNC_WAKEUP_STATS_COLS];
+	bool		nulls[ASYNC_WAKEUP_STATS_COLS] = {0};
+	HeapTuple	tuple;
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("function returning record called in context that cannot accept type record")));
+
+	/*
+	 * values[] and nulls[] hold exactly ASYNC_WAKEUP_STATS_COLS entries, so
+	 * the result type must not have a different number of columns.
+	 */
+	Assert(tupdesc->natts == ASYNC_WAKEUP_STATS_COLS);
+
+	/*
+	 * Read each counter and widen it to int8.  Every uint32 value fits in an
+	 * int64, so the casts below are lossless.
+	 */
+	values[0] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->signaledNeeded));
+	values[1] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->avoidedWakeups));
+	values[2] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->alreadyAdvancing));
+	values[3] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->signaledUncertain));
+	values[4] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->alreadyAhead));
+	values[5] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->necessaryWakeups));
+	values[6] = Int64GetDatum((int64) pg_atomic_read_u32(&ctl->unnecessaryWakeups));
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: reset NOTIFY wakeup statistics (all counters become zero)
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	pg_atomic_uint32 *const counters[] = {
+		&asyncQueueControl->signaledNeeded, &asyncQueueControl->avoidedWakeups,
+		&asyncQueueControl->alreadyAdvancing, &asyncQueueControl->signaledUncertain,
+		&asyncQueueControl->alreadyAhead, &asyncQueueControl->necessaryWakeups,
+		&asyncQueueControl->unnecessaryWakeups
+	};
+
+	/* Each counter is zeroed with its own atomic write, in the order listed */
+	for (int i = 0; i < lengthof(counters); i++)
+		pg_atomic_write_u32(counters[i], 0);
+
+	PG_RETURN_VOID();
+}
+
/*
* Async_UnlistenOnExit
*
@@ -2014,6 +2105,7 @@ SignalBackends(void)
Assert(pid != InvalidPid);
+ pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1);
QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
pids[count] = pid;
procnos[count] = i;
@@ -2049,7 +2141,14 @@ SignalBackends(void)
* currently advancing its position.
*/
if (!QUEUE_BACKEND_ADVANCING_POS(i))
+ {
QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+ pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+ }
+ else
+ {
+ pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1);
+ }
}
else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
{
@@ -2060,6 +2159,7 @@ SignalBackends(void)
*/
Assert(pid != InvalidPid);
+ pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1);
QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
pids[count] = pid;
procnos[count] = i;
@@ -2071,6 +2171,7 @@ SignalBackends(void)
* The backend is already ahead of the notifications we wrote.
* No need to do anything.
*/
+ pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos));
}
}
@@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
+ bool interested = false;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void)
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
page_buffer.buf,
- snapshot);
+ snapshot,
+ &interested);
} while (!reachedStop);
}
PG_FINALLY();
@@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void)
}
PG_END_TRY();
+ if (interested)
+ pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+ else
+ pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}
@@ -2474,7 +2582,8 @@ static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
- Snapshot snapshot)
+ Snapshot snapshot,
+ bool *interested)
{
bool reachedStop = false;
bool reachedEndOfPage;
@@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
char *payload = qe->data + strlen(channel) + 1;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+ /* Mark that we were interested in at least one message */
+ *interested = true;
}
}
else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..0bbd7db39c7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
proname => 'pg_notification_queue_usage', provolatile => 'v',
proparallel => 'r', prorettype => 'float8', proargtypes => '',
prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315', descr => 'get statistics about NOTIFY wakeups',
+  proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+  prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316', descr => 'reset statistics about NOTIFY wakeups',
+  proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_reset_async_wakeup_stats' },
+
# shared memory usage
{ oid => '5052', descr => 'allocations from the main shared memory segment',
proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',