0003-async-avoid-pallocs-in-critical-section-v2.patch
application/octet-stream
Filename: 0003-async-avoid-pallocs-in-critical-section-v2.patch
Type: application/octet-stream
Part: 1
From 3d9c474bc60dded52df5673724f9e793b58b1bcd Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 10:11:30 +0100
Subject: [PATCH 3/3] Convert listenChannels to hash table
---
src/backend/commands/async.c | 230 ++++++++++++++++++++++---------
src/tools/pgindent/typedefs.list | 1 +
2 files changed, 168 insertions(+), 63 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index d5676d43a30..ebc06ba771d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -68,7 +68,7 @@
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
- * make any actual updates to the effective listen state (listenChannels).
+ * make any actual updates to the effective listen state (listenChannelsHash).
* Then we signal any backends that may be interested in our messages
* (including our own backend, if listening). This is done by
* SignalBackends(), which scans the list of listening backends and sends a
@@ -313,16 +313,25 @@ static SlruCtlData NotifyCtlData;
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
- * allocated in TopMemoryContext.
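+ * listenChannelsHash identifies the channels we are listening to (or will be
+ * after the current transaction commits). Entries with active=true are
+ * committed listens. Entries with active=false exist only transiently: they
+ * were either pre-allocated by Exec_ListenPreCommit for a pending LISTEN, or
+ * deactivated by UNLISTEN processing at commit. Any entry still inactive is
+ * removed by CleanupInactiveListenChannels() at commit, abort or backend exit.
+ *
+ * The hash table is created without HASH_CONTEXT, so it lives in its own
+ * memory context underneath TopMemoryContext and survives transactions.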
*/
-static List *listenChannels = NIL; /* list of C strings */
+typedef struct ListenChannelEntry
+{
+ char channel[NAMEDATALEN]; /* hash key (channel name) - must be first */
+ bool active; /* true if committed listen; false if pending or unlistened */
+} ListenChannelEntry;
+
+static HTAB *listenChannelsHash = NULL;
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't change which channels are active in listenChannelsHash until we
+ * reach transaction commit (Exec_ListenPreCommit only adds inactive entries).
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
@@ -438,7 +447,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
+static void initListenChannelsHash(void);
+static void CleanupInactiveListenChannels(void);
+static void Exec_ListenPreCommit(const char *channel);
static void Exec_ListenCommit(const char *channel);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
@@ -504,6 +515,60 @@ initSignalArrays(void)
MemoryContextSwitchTo(oldcontext);
}
+/*
+ * initListenChannelsHash
+ *		Create the listen-channel hash table on first use; no-op afterwards.
+ */
+static void
+initListenChannelsHash(void)
+{
+	HASHCTL		ctl;
+
+	if (listenChannelsHash == NULL)
+	{
+		/* keys are NUL-terminated channel names, hence HASH_STRINGS */
+		memset(&ctl, 0, sizeof(ctl));
+		ctl.keysize = NAMEDATALEN;
+		ctl.entrysize = sizeof(ListenChannelEntry);
+		listenChannelsHash = hash_create("Listen Channels", 64, &ctl,
+										 HASH_ELEM | HASH_STRINGS);
+	}
+}
+
+/*
+ * CleanupInactiveListenChannels
+ *		Drop every entry whose active flag is false, and free the whole
+ *		table once no entries remain.
+ */
+static void
+CleanupInactiveListenChannels(void)
+{
+	HASH_SEQ_STATUS scan;
+	ListenChannelEntry *chan;
+
+	if (listenChannelsHash == NULL)
+		return;
+
+	hash_seq_init(&scan, listenChannelsHash);
+	while ((chan = hash_seq_search(&scan)) != NULL)
+	{
+		if (chan->active)
+			continue;
+
+		/* dynahash permits removing the element the scan just returned */
+		if (hash_search(listenChannelsHash, chan->channel,
+						HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+
+	/* A NULL pointer is how callers recognize "not listening on anything" */
+	if (hash_get_num_entries(listenChannelsHash) != 0)
+		return;
+
+	hash_destroy(listenChannelsHash);
+	listenChannelsHash = NULL;
+}
+
/*
* Report space needed for our shared memory area
*/
@@ -709,7 +774,7 @@ Async_Notify(const char *channel, const char *payload)
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
- * Actual update of the listenChannels list happens during transaction
+ * Actual update of the listenChannelsHash happens during transaction
* commit.
*/
static void
@@ -809,30 +874,64 @@ Async_UnlistenAll(void)
* SQL function: return a set of the channel names this backend is actively
* listening to.
*
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: we copy the channel names out of listenChannelsHash during the first
+ * call rather than keeping a hash_seq_search() scan open across calls.  The
+ * executor need not run a value-per-call SRF to completion (LIMIT, or a
+ * cursor closed early), and an abandoned scan would be reported as leaked at
+ * end of transaction.  The copy stays accurate because the set cannot
* change within a transaction.
*/
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
+	char	  **channels;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * Copy the names of all active channels into multi-call memory.  The
+		 * scan runs to completion right here, so nothing is left registered
+		 * with dynahash when we return.  max_calls counts the copied names.
+		 */
+		if (listenChannelsHash != NULL)
+		{
+			MemoryContext oldcontext;
+			HASH_SEQ_STATUS status;
+			ListenChannelEntry *entry;
+
+			oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+			channels = palloc(hash_get_num_entries(listenChannelsHash) *
+							  sizeof(char *));
+			hash_seq_init(&status, listenChannelsHash);
+			while ((entry = hash_seq_search(&status)) != NULL)
+			{
+				/*
+				 * Inactive entries are cleaned up at commit/abort, so none
+				 * should be visible while SQL can run; skip any regardless.
+				 */
+				if (entry->active)
+					channels[funcctx->max_calls++] = pstrdup(entry->channel);
+			}
+			funcctx->user_fctx = channels;
+			MemoryContextSwitchTo(oldcontext);
+		}
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
+	channels = funcctx->user_fctx;
- if (funcctx->call_cntr < list_length(listenChannels))
+	if (funcctx->call_cntr < funcctx->max_calls)
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
+		char	   *channel = channels[funcctx->call_cntr];
SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
}
SRF_RETURN_DONE(funcctx);
@@ -849,6 +936,7 @@ static void
Async_UnlistenOnExit(int code, Datum arg)
{
Exec_UnlistenAllCommit();
+ CleanupInactiveListenChannels();
asyncQueueUnregister();
}
@@ -904,7 +992,7 @@ PreCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenPreCommit();
+ Exec_ListenPreCommit(actrec->channel);
break;
case LISTEN_UNLISTEN:
/* there is no Exec_UnlistenPreCommit() */
@@ -1005,7 +1093,7 @@ PreCommit_Notify(void)
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
+ * Update listenChannelsHash and clear transaction-local state.
*
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
@@ -1049,8 +1137,11 @@ AtCommit_Notify(void)
}
}
+ /* Clean up inactive entries from the hash table */
+ CleanupInactiveListenChannels();
+
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && listenChannelsHash == NULL)
asyncQueueUnregister();
/*
@@ -1071,11 +1162,22 @@ AtCommit_Notify(void)
* This function must make sure we are ready to catch any incoming messages.
*/
static void
-Exec_ListenPreCommit(void)
+Exec_ListenPreCommit(const char *channel)
{
QueuePosition head;
QueuePosition max;
ProcNumber prevListener;
+ ListenChannelEntry *entry;
+ bool found;
+
+ /*
+ * Pre-allocate the hash table entry for this channel. This happens before
+ * commit, so an OOM error here is safe (causes transaction abort).
+ */
+ initListenChannelsHash();
+ entry = hash_search(listenChannelsHash, channel, HASH_ENTER, &found);
+ if (!found)
+ entry->active = false;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -1085,7 +1187,7 @@ Exec_ListenPreCommit(void)
return;
if (Trace_notify)
- elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
+ elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid);
/*
* Before registering, make sure we will unlisten before dying. (Note:
@@ -1163,54 +1265,48 @@ Exec_ListenPreCommit(void)
/*
* Exec_ListenCommit --- subroutine for AtCommit_Notify
*
- * Add the channel to the list of channels we are listening on.
+ * Flip the channel's pre-allocated hash entry to active.
*/
static void
Exec_ListenCommit(const char *channel)
{
- MemoryContext oldcontext;
+	ListenChannelEntry *slot;
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
- return;
+	if (Trace_notify)
+		elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
/*
- * Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
+	 * Exec_ListenPreCommit already inserted an entry for this channel, so a
+	 * plain lookup suffices and no hash entry has to be allocated here.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
- MemoryContextSwitchTo(oldcontext);
+	Assert(listenChannelsHash != NULL);
+	slot = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+	Assert(slot != NULL);
+
+	slot->active = true;
}
/*
* Exec_UnlistenCommit --- subroutine for AtCommit_Notify
*
- * Remove the specified channel name from listenChannels.
+ * Mark the specified channel as inactive. We unset active rather than
+ * removing the entry, to avoid needing to allocate memory if a subsequent
+ * LISTEN in the same transaction re-adds it.
*/
static void
Exec_UnlistenCommit(const char *channel)
{
- ListCell *q;
+ ListenChannelEntry *entry;
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
- foreach(q, listenChannels)
- {
- char *lchan = (char *) lfirst(q);
+ if (listenChannelsHash == NULL)
+ return;
- if (strcmp(lchan, channel) == 0)
- {
- listenChannels = foreach_delete_current(listenChannels, q);
- pfree(lchan);
- break;
- }
- }
+ entry = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
+ if (entry != NULL)
+ entry->active = false;
/*
* We do not complain about unlistening something not being listened;
@@ -1226,34 +1322,35 @@ Exec_UnlistenCommit(const char *channel)
static void
Exec_UnlistenAllCommit(void)
{
+	ListenChannelEntry *chan;
+	HASH_SEQ_STATUS scan;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
- list_free_deep(listenChannels);
- listenChannels = NIL;
+	if (listenChannelsHash != NULL) /* no table means nothing to deactivate */
+	{
+		hash_seq_init(&scan, listenChannelsHash);
+		while ((chan = hash_seq_search(&scan)) != NULL)
+			chan->active = false;
+	}
}
/*
* Test whether we are actively listening on the given channel name.
*
* Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it. In practice the list is likely to be
- * fairly short, though.
*/
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
+	ListenChannelEntry *slot = NULL;
- foreach(p, listenChannels)
- {
- char *lchan = (char *) lfirst(p);
+	if (listenChannelsHash != NULL)
+		slot = hash_search(listenChannelsHash, channel, HASH_FIND, NULL);
- if (strcmp(lchan, channel) == 0)
- return true;
- }
- return false;
+	/* an entry that exists but is not active does not count as listening */
+	return (slot != NULL && slot->active);
}
/*
@@ -1263,7 +1360,7 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- Assert(listenChannels == NIL); /* else caller error */
+ Assert(listenChannelsHash == NULL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
@@ -1695,12 +1792,19 @@ SignalBackends(void)
void
AtAbort_Notify(void)
{
+ /*
+ * Clean up any pre-allocated hash entries that weren't committed. These
+ * have active=false and would have been set to active=true in
+ * Exec_ListenCommit if the transaction had committed.
+ */
+ CleanupInactiveListenChannels();
+
/*
* If we LISTEN but then roll back the transaction after PreCommit_Notify,
- * we have registered as a listener but have not made any entry in
- * listenChannels. In that case, deregister again.
+ * we have registered as a listener but have not made any active entry in
+ * listenChannelsHash. In that case, deregister again.
*/
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && listenChannelsHash == NULL)
asyncQueueUnregister();
/* And clean up */
@@ -2080,7 +2184,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
* over it on the first LISTEN in a session, and not get stuck on
* it indefinitely.
*/
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL)
continue;
if (TransactionIdDidCommit(qe->xid))
@@ -2332,7 +2436,7 @@ ProcessIncomingNotify(bool flush)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL)
return;
if (Trace_notify)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c751c25a04d..46ea8f2ff8e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1564,6 +1564,7 @@ ListDictionary
ListParsedLex
ListenAction
ListenActionKind
+ListenChannelEntry
ListenStmt
LoInfo
LoadStmt
--
2.50.1