0003-async-avoid-pallocs-in-critical-section-v3.patch
application/octet-stream
Filename: 0003-async-avoid-pallocs-in-critical-section-v3.patch
Type: application/octet-stream
Part: 2
From 0ab676629c405a030603274872788c11e7508d59 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Tue, 25 Nov 2025 15:42:58 +0100
Subject: [PATCH 3/3] Execute LISTEN/UNLISTEN during PreCommit
---
src/backend/commands/async.c | 239 +++++++++++++++--------------------
1 file changed, 100 insertions(+), 139 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ea6ea9e09a8..36b83896363 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -438,10 +438,9 @@ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static void Exec_Listen(const char *channel);
+static void Exec_Unlisten(const char *channel);
+static void Exec_UnlistenAll(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
@@ -848,7 +847,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
static void
Async_UnlistenOnExit(int code, Datum arg)
{
- Exec_UnlistenAllCommit();
+ Exec_UnlistenAll();
asyncQueueUnregister();
}
@@ -877,7 +876,7 @@ AtPrepare_Notify(void)
* If there are pending LISTEN actions, make sure we are listed in the
* shared-memory listener array. This must happen before commit to
* ensure we don't miss any notifies from transactions that commit
- * just after ours.
+ * just after ours. This function also updates listenChannels.
*
* If there are outbound notify requests in the pendingNotifies list,
* add them to the global queue. We do that before commit so that
@@ -894,7 +893,7 @@ PreCommit_Notify(void)
if (Trace_notify)
elog(DEBUG1, "PreCommit_Notify");
- /* Preflight for any pending listen/unlisten actions */
+ /* Perform any pending listen/unlisten actions */
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
@@ -904,18 +903,22 @@ PreCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenPreCommit();
+ Exec_Listen(actrec->channel);
break;
case LISTEN_UNLISTEN:
- /* there is no Exec_UnlistenPreCommit() */
+ Exec_Unlisten(actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
- /* there is no Exec_UnlistenAllPreCommit() */
+ Exec_UnlistenAll();
break;
}
}
}
+ /* If no longer listening to anything, get out of listener array */
+ if (amRegisteredListener && listenChannels == NIL)
+ asyncQueueUnregister();
+
/* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
@@ -1005,18 +1008,16 @@ PreCommit_Notify(void)
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
- *
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
* Also, if we filled enough queue pages with new notifies, try to
* advance the queue tail pointer.
+ *
+ * Finally, clear transaction-local state.
*/
void
AtCommit_Notify(void)
{
- ListCell *p;
-
/*
* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
* return as soon as possible
@@ -1027,32 +1028,6 @@ AtCommit_Notify(void)
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
- /* Perform any pending listen/unlisten actions */
- if (pendingActions != NULL)
- {
- foreach(p, pendingActions->actions)
- {
- ListenAction *actrec = (ListenAction *) lfirst(p);
-
- switch (actrec->action)
- {
- case LISTEN_LISTEN:
- Exec_ListenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN:
- Exec_UnlistenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN_ALL:
- Exec_UnlistenAllCommit();
- break;
- }
- }
- }
-
- /* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
- asyncQueueUnregister();
-
/*
* Send signals to listening backends. We need do this only if there are
* pending notifies, which were previously added to the shared queue by
@@ -1066,109 +1041,100 @@ AtCommit_Notify(void)
}
/*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_Listen --- subroutine for PreCommit_Notify
*
- * This function must make sure we are ready to catch any incoming messages.
+ * This function must make sure we are ready to catch any incoming messages,
+ * and adds the channel to the list of channels we are listening on.
*/
static void
-Exec_ListenPreCommit(void)
+Exec_Listen(const char *channel)
{
QueuePosition head;
QueuePosition max;
ProcNumber prevListener;
+ MemoryContext oldcontext;
+
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_Listen(%s,%d)", channel, MyProcPid);
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
*/
- if (amRegisteredListener)
- return;
-
- if (Trace_notify)
- elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
-
- /*
- * Before registering, make sure we will unlisten before dying. (Note:
- * this action does not get undone if we abort later.)
- */
- if (!unlistenExitRegistered)
+ if (!amRegisteredListener)
{
- before_shmem_exit(Async_UnlistenOnExit, 0);
- unlistenExitRegistered = true;
- }
+ /*
+ * Before registering, make sure we will unlisten before dying. (Note:
+ * this action does not get undone if we abort later.)
+ */
+ if (!unlistenExitRegistered)
+ {
+ before_shmem_exit(Async_UnlistenOnExit, 0);
+ unlistenExitRegistered = true;
+ }
- /*
- * This is our first LISTEN, so establish our pointer.
- *
- * We set our pointer to the global tail pointer and then move it forward
- * over already-committed notifications. This ensures we cannot miss any
- * not-yet-committed notifications. We might get a few more but that
- * doesn't hurt.
- *
- * In some scenarios there might be a lot of committed notifications that
- * have not yet been pruned away (because some backend is being lazy about
- * reading them). To reduce our startup time, we can look at other
- * backends and adopt the maximum "pos" pointer of any backend that's in
- * our database; any notifications it's already advanced over are surely
- * committed and need not be re-examined by us. (We must consider only
- * backends connected to our DB, because others will not have bothered to
- * check committed-ness of notifications in our DB.)
- *
- * We need exclusive lock here so we can look at other backends' entries
- * and manipulate the list links.
- */
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- head = QUEUE_HEAD;
- max = QUEUE_TAIL;
- prevListener = INVALID_PROC_NUMBER;
- for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
- {
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
- max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
- /* Also find last listening backend before this one */
- if (i < MyProcNumber)
- prevListener = i;
- }
- QUEUE_BACKEND_POS(MyProcNumber) = max;
- QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
- QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
- /* Insert backend into list of listeners at correct position */
- if (prevListener != INVALID_PROC_NUMBER)
- {
- QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
- QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
- }
- else
- {
- QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
- QUEUE_FIRST_LISTENER = MyProcNumber;
- }
- LWLockRelease(NotifyQueueLock);
+ /*
+ * This is our first LISTEN, so establish our pointer.
+ *
+ * We set our pointer to the global tail pointer and then move it forward
+ * over already-committed notifications. This ensures we cannot miss any
+ * not-yet-committed notifications. We might get a few more but that
+ * doesn't hurt.
+ *
+ * In some scenarios there might be a lot of committed notifications that
+ * have not yet been pruned away (because some backend is being lazy about
+ * reading them). To reduce our startup time, we can look at other
+ * backends and adopt the maximum "pos" pointer of any backend that's in
+ * our database; any notifications it's already advanced over are surely
+ * committed and need not be re-examined by us. (We must consider only
+ * backends connected to our DB, because others will not have bothered to
+ * check committed-ness of notifications in our DB.)
+ *
+ * We need exclusive lock here so we can look at other backends' entries
+ * and manipulate the list links.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ head = QUEUE_HEAD;
+ max = QUEUE_TAIL;
+ prevListener = INVALID_PROC_NUMBER;
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ {
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ /* Also find last listening backend before this one */
+ if (i < MyProcNumber)
+ prevListener = i;
+ }
+ QUEUE_BACKEND_POS(MyProcNumber) = max;
+ QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
+ QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+ /* Insert backend into list of listeners at correct position */
+ if (prevListener != INVALID_PROC_NUMBER)
+ {
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
+ QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
+ }
+ else
+ {
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
+ QUEUE_FIRST_LISTENER = MyProcNumber;
+ }
+ LWLockRelease(NotifyQueueLock);
- /* Now we are listed in the global array, so remember we're listening */
- amRegisteredListener = true;
+ /* Now we are listed in the global array, so remember we're listening */
+ amRegisteredListener = true;
- /*
- * Try to move our pointer forward as far as possible. This will skip
- * over already-committed notifications, which we want to do because they
- * might be quite stale. Note that we are not yet listening on anything,
- * so we won't deliver such notifications to our frontend. Also, although
- * our transaction might have executed NOTIFY, those message(s) aren't
- * queued yet so we won't skip them here.
- */
- if (!QUEUE_POS_EQUAL(max, head))
- asyncQueueReadAllNotifications();
-}
-
-/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
- *
- * Add the channel to the list of channels we are listening on.
- */
-static void
-Exec_ListenCommit(const char *channel)
-{
- MemoryContext oldcontext;
+ /*
+ * Try to move our pointer forward as far as possible. This will skip
+ * over already-committed notifications, which we want to do because they
+ * might be quite stale. Note that we are not yet listening on anything,
+ * so we won't deliver such notifications to our frontend. Also, although
+ * our transaction might have executed NOTIFY, those message(s) aren't
+ * queued yet so we won't skip them here.
+ */
+ if (!QUEUE_POS_EQUAL(max, head))
+ asyncQueueReadAllNotifications();
+ }
/* Do nothing if we are already listening on this channel */
if (IsListeningOn(channel))
@@ -1176,11 +1142,6 @@ Exec_ListenCommit(const char *channel)
/*
* Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
@@ -1188,17 +1149,17 @@ Exec_ListenCommit(const char *channel)
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_Unlisten --- subroutine for PreCommit_Notify
*
* Remove the specified channel name from listenChannels.
*/
static void
-Exec_UnlistenCommit(const char *channel)
+Exec_Unlisten(const char *channel)
{
ListCell *q;
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Exec_Unlisten(%s,%d)", channel, MyProcPid);
foreach(q, listenChannels)
{
@@ -1219,15 +1180,15 @@ Exec_UnlistenCommit(const char *channel)
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAll --- subroutine for PreCommit_Notify
*
* Unlisten on all channels for this backend.
*/
static void
-Exec_UnlistenAllCommit(void)
+Exec_UnlistenAll(void)
{
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ elog(DEBUG1, "Exec_UnlistenAll(%d)", MyProcPid);
list_free_deep(listenChannels);
listenChannels = NIL;
@@ -1927,7 +1888,7 @@ asyncQueueReadAllNotifications(void)
*
* What we do guarantee is that we'll see all notifications from
* transactions committing after the snapshot we take here.
- * Exec_ListenPreCommit has already added us to the listener array,
+ * Exec_Listen has already added us to the listener array,
* so no not-yet-committed messages can be removed from the queue
* before we see them.
*----------
--
2.50.1