Thread

  1. Re: Optimize LISTEN/NOTIFY

    Joel Jacobson <joel@compiler.org> — 2025-11-14T16:01:59Z

    On Thu, Nov 13, 2025, at 08:13, Joel Jacobson wrote:
    > Attached, please find a new version rebased on top of the bug fix
    > patches that just got committed in 0bdc777, 797e9ea, 8eeb4a0, and
    > 1b46990.
    
    To help reviewers, here is a new write-up of the patch:
    
    PROBLEM
    =======
    
    The current implementation has no central knowledge of which backend
    listens on which channel. When a backend commits a transaction that
    issued NOTIFY, SignalBackends() iterates over all registered listeners
    in the same database and sends each one a PROCSIG_NOTIFY_INTERRUPT
    signal, regardless of whether they are listening on the notified
    channel.
    
    This behavior is fine when all listeners are on the same channel, but
    when many backends are listening on different channels, each NOTIFY
    triggers unnecessary wakeups and context switches. As the number of idle
    listeners grows, these wakeups often become the bottleneck.
    
    Benchmarks show throughput dropping from ~9,000 TPS with few listeners
    to ~200 TPS with 1,000 idle listeners on unrelated channels - a 45x
    slowdown purely from waking backends that have no notifications to
    process.
    
    
    SOLUTION OVERVIEW
    =================
    
    This patch introduces two optimizations:
    
    1. Targeted Signaling
       A lazily-created dynamic shared hash table (dshash), allocated in a
       dynamic shared memory area (DSA), maps (database OID, channel name)
       to arrays of listening backends (ProcNumbers). This allows the
       notifier to signal only those backends actually listening on the
       channels being notified.
    
    2. Direct Advancement
       Even with targeted signaling, idle backends might still need to be
       woken to advance their queue read positions past notifications they
       don't care about. This patch avoids those unnecessary wakeups by
       directly advancing the queue positions of idle backends that are not
       listening on the channels being notified.
    
       This is possible because all NOTIFY writers are serialized by a
       heavyweight lock, allowing the notifier to identify precisely which
       queue entries belong to the current transaction. The notifier can
       then determine which idle backends are positioned within that range
       and safely advance their positions without waking them, since we know
       from the shared channel hash that they are not listening on any of
       the notified channels.
    
    
    IMPLEMENTATION DETAILS
    ======================
    
    Shared Channel Hash
    -------------------
    
    The patch adds a dshash table that maps (dboid, channel) keys to
    ChannelEntry structures.
    
    The listenersArray starts with capacity for 4 listeners and doubles when
    full. Memory is allocated from a DSA area and freed when a channel has
    zero listeners.
    
    The table is created lazily on the first LISTEN command. The DSA handle
    and dshash handle are stored in AsyncQueueControl for other backends to
    attach.
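    
    As a rough illustration of the growth and cleanup policy described
    above, here is a minimal standalone sketch. It is not the patch's code:
    ListenerArray, listener_add and listener_remove are hypothetical names,
    and malloc/realloc/free stand in for the DSA allocation the patch uses.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

typedef int ProcNumber;         /* stand-in for PostgreSQL's ProcNumber */

#define INITIAL_LISTENERS 4     /* initial capacity stated in the write-up */

/* Hypothetical per-channel listener storage (DSA-allocated in the patch). */
typedef struct ListenerArray
{
    ProcNumber *procs;          /* malloc'd here for illustration only */
    int         count;          /* listeners currently stored */
    int         capacity;       /* allocated slots */
} ListenerArray;

/* Append a listener, doubling the array when it is full. */
static bool
listener_add(ListenerArray *la, ProcNumber proc)
{
    if (la->count == la->capacity)
    {
        int         newcap = (la->capacity == 0) ? INITIAL_LISTENERS
                                                 : la->capacity * 2;
        ProcNumber *p = realloc(la->procs, newcap * sizeof(ProcNumber));

        if (p == NULL)
            return false;
        la->procs = p;
        la->capacity = newcap;
    }
    la->procs[la->count++] = proc;
    return true;
}

/*
 * Remove a listener, closing the gap with memmove.  Returns true when the
 * channel is left with zero listeners, in which case the storage is freed
 * (the point at which the patch would also delete the hash entry).
 */
static bool
listener_remove(ListenerArray *la, ProcNumber proc)
{
    for (int i = 0; i < la->count; i++)
    {
        if (la->procs[i] == proc)
        {
            memmove(&la->procs[i], &la->procs[i + 1],
                    (la->count - i - 1) * sizeof(ProcNumber));
            la->count--;
            break;
        }
    }
    if (la->count == 0)
    {
        free(la->procs);
        la->procs = NULL;
        la->capacity = 0;
        return true;
    }
    return false;
}
```

    With this policy, adding a fifth listener grows the array from 4 to 8
    slots, and removing the last listener releases the memory.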
    
    
    Dual Data Structures
    --------------------
    
    The implementation maintains two complementary data structures:
    
    1. Shared channelHash: Used during commit to determine which backends
       need to be signaled. Updated during Exec_ListenCommit/UnlistenCommit/
       UnlistenAllCommit.
    
    2. Local listenChannelsHash: Changed from a List to an HTAB for fast
       lookups, used by IsListeningOn().
    
    This separation avoids contention on the shared hash during the frequent
    IsListeningOn() checks that occur for every notification read from the
    queue.
    
    
    Direct Advancement Algorithm
    ----------------------------
    
    In PreCommit_Notify(), while holding the heavyweight lock on "database
    0" that serializes all NOTIFY writers:
    
    1. Before writing the first notification, capture queueHeadBeforeWrite
    2. Write all notifications for the transaction to the queue
    3. After writing the last notification, capture queueHeadAfterWrite
    
    Because the heavyweight lock serializes all writers, the range
    [queueHeadBeforeWrite, queueHeadAfterWrite) contains only notifications
    written by this commit; no other backend could have inserted entries in
    this range.
    
    SignalBackends() then processes each backend:
    
      - If the backend has wakeupPending: skip (already signaled)
    
      - If the backend is advancing (reading the queue):
          If advancingPos < queueHeadAfterWrite: signal it
          (it will get stuck before our new entries without a signal)
    
      - If the backend is idle:
          If pos < queueHeadBeforeWrite: signal it
          (it might be interested in older messages)
    
          If pos >= queueHeadBeforeWrite AND pos < queueHeadAfterWrite:
          Direct advance pos to queueHeadAfterWrite
          (skip our messages entirely, no signal needed)
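    
    The per-backend decision above can be sketched as a small standalone
    function. This is a simplified model, not the patch's code: positions
    are plain integers rather than (page, offset) pairs, the names
    BackendStatus, Action and decide_for_non_listener are hypothetical, it
    only covers backends that are not listening on any notified channel,
    and setting wakeupPending at signal time is my assumption based on the
    field's description. Locking is omitted.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplification: the real queue position is a (page, offset) pair. */
typedef uint64_t QueuePos;

typedef enum
{
    ACTION_NONE,                /* leave the backend alone */
    ACTION_SIGNAL,              /* wake the backend */
    ACTION_ADVANCE              /* move its position, no wakeup */
} Action;

/* Hypothetical subset of the per-backend shared state. */
typedef struct BackendStatus
{
    QueuePos    pos;            /* current read position */
    QueuePos    advancingPos;   /* target while isAdvancing is set */
    bool        isAdvancing;    /* backend is reading the queue */
    bool        wakeupPending;  /* signal sent but not yet processed */
} BackendStatus;

/*
 * Decide what to do with a backend that is NOT listening on any of the
 * notified channels.  [headBefore, headAfter) is the range written by the
 * committing transaction.
 */
static Action
decide_for_non_listener(BackendStatus *b, QueuePos headBefore,
                        QueuePos headAfter)
{
    if (b->wakeupPending)
        return ACTION_NONE;     /* already signaled */

    if (b->isAdvancing)
    {
        /* It would stop before our new entries, so it needs a signal. */
        if (b->advancingPos < headAfter)
        {
            b->wakeupPending = true;    /* assumption: set when signaling */
            return ACTION_SIGNAL;
        }
        return ACTION_NONE;
    }

    /* Idle backend behind our range: older messages may interest it. */
    if (b->pos < headBefore)
    {
        b->wakeupPending = true;        /* assumption: set when signaling */
        return ACTION_SIGNAL;
    }

    /* Idle backend inside our range: skip our entries without a wakeup. */
    if (b->pos < headAfter)
    {
        b->pos = headAfter;
        return ACTION_ADVANCE;
    }

    return ACTION_NONE;         /* already at or past our entries */
}
```

    For example, with headBefore = 10 and headAfter = 20, an idle backend
    at position 12 is moved to 20 without a signal, while one at position 5
    is signaled.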
    
    
    New QueueBackendStatus Fields
    -----------------------------
    
    Each backend's entry in AsyncQueueControl now includes:
    
      wakeupPending:  signal sent but not yet processed
    
      isAdvancing:    backend is advancing its position
    
      advancingPos:   target position backend is advancing to
    
    These fields ensure correct interaction between direct advancement and
    backends that are concurrently processing their queue.
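    
    To make the reader side of this protocol concrete, here is a minimal
    sketch of how a backend might publish these fields while it reads the
    queue. It is an illustration only: reader_begin and reader_finish are
    hypothetical names, positions are plain integers, locking is omitted,
    and clearing wakeupPending at the start of the read is my assumption.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplification: the real queue position is a (page, offset) pair. */
typedef uint64_t QueuePos;

/* Hypothetical subset of the per-backend shared state. */
typedef struct BackendStatus
{
    QueuePos    pos;            /* current read position */
    QueuePos    advancingPos;   /* target position while advancing */
    bool        isAdvancing;    /* backend is advancing its position */
    bool        wakeupPending;  /* signal sent but not yet processed */
} BackendStatus;

/* Called before reading: announce how far this backend intends to read. */
static void
reader_begin(BackendStatus *b, QueuePos head)
{
    b->wakeupPending = false;   /* assumption: the signal is now handled */
    b->isAdvancing = true;
    b->advancingPos = head;
}

/* Called after reading: publish the new position and clear the flag. */
static void
reader_finish(BackendStatus *b)
{
    b->pos = b->advancingPos;
    b->isAdvancing = false;
}
```

    While isAdvancing is set, a notifier can compare advancingPos with its
    own queueHeadAfterWrite instead of touching pos, which the reader is
    about to overwrite.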
    
    
    Transaction-Local State
    -----------------------
    
    PreCommit_Notify() builds a list of unique channels
    (pendingNotifyChannels) from the transaction's notifications. This list
    is used by SignalBackends() to look up listeners in the shared hash
    efficiently, avoiding duplicate lookups when multiple notifications are
    sent to the same channel.
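    
    The deduplication step can be sketched as follows. This is a standalone
    illustration with hypothetical names (ChannelList,
    channel_list_add_unique, collect_unique_channels) and a fixed-size
    array with a linear scan; the patch itself builds a list and, per the
    function notes below, uses a hash table for larger notification counts.

```c
#include <stdbool.h>
#include <string.h>

#define MAX_CHANNELS 64         /* arbitrary bound for this sketch */

/* Hypothetical stand-in for the pendingNotifyChannels list. */
typedef struct ChannelList
{
    const char *names[MAX_CHANNELS];
    int         count;
} ChannelList;

/*
 * Add a channel name unless it is already present.  Returns true if the
 * name was added; false if it was a duplicate or the sketch's fixed-size
 * array is full.
 */
static bool
channel_list_add_unique(ChannelList *cl, const char *channel)
{
    for (int i = 0; i < cl->count; i++)
    {
        if (strcmp(cl->names[i], channel) == 0)
            return false;
    }
    if (cl->count >= MAX_CHANNELS)
        return false;
    cl->names[cl->count++] = channel;
    return true;
}

/* Collect the distinct channels of n notifications, in first-seen order. */
static void
collect_unique_channels(ChannelList *cl, const char *const *notified, int n)
{
    cl->count = 0;
    for (int i = 0; i < n; i++)
        channel_list_add_unique(cl, notified[i]);
}
```

    A transaction that notifies channels a, b, a, c, b would end up with
    three entries, so the shared hash is consulted three times rather than
    five.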
    
    Functions Modified
    ------------------
    
    AsyncShmemInit
      Initialize channelHashDSA/DSH handles (InvalidHandle) and new
      per-backend fields: wakeupPending, isAdvancing, advancingPos.
    
    Async_Notify
      Initialize channelHashtab.
    
    pg_listening_channels
      Rewritten to iterate over listenChannelsHash using HASH_SEQ_STATUS
      instead of traversing the old listenChannels list.
    
    PreCommit_Notify
      Build pendingNotifyChannels list of unique channels from transaction's
      notifications. Capture queueHeadBeforeWrite before writing first
      notification and queueHeadAfterWrite after each write to enable direct
      advancement optimization.
    
    AtCommit_Notify
      Check hash table entry count instead of list emptiness when deciding
      whether to unregister from listener array.
    
    Exec_ListenCommit
      Complete rewrite to maintain both local listenChannelsHash and shared
      channelHash. Insert backend's ProcNumber into DSA-allocated listeners
      array, growing array (doubling strategy) when full.
    
    Exec_UnlistenCommit
      Remove from both local and shared hashes. Compact listeners array with
      memmove, free DSA memory and delete hash entry when last listener
      removed.
    
    Exec_UnlistenAllCommit
      Iterate shared channelHash with dshash_seq_*, remove this backend from
      all channel entries in current database, clean up DSA memory and
      delete entries when empty.
    
    IsListeningOn
      Simplified to single hash_search() call on listenChannelsHash.
    
    asyncQueueUnregister
      Clear QUEUE_BACKEND_WAKEUP_PENDING flag and update assertion to check
      hash table instead of list.
    
    SignalBackends
      Rewrite to use targeted signaling instead of broadcast. Iterate
      pendingNotifyChannels, look up listeners per channel in shared
      channelHash. Implement direct advancement: advance idle backends
      positioned in [queueHeadBeforeWrite, queueHeadAfterWrite) without
      signaling. Use wakeupPending flag to prevent duplicate signals and
      respect isAdvancing flag to avoid interfering with concurrent position
      updates.
    
    AtAbort_Notify
      Use listenChannelsHash instead of listenChannels.
    
    asyncQueueReadAllNotifications
      Set isAdvancing flag and advancingPos before reading, clear
      isAdvancing after advancing position.
    
    asyncQueueProcessPageEntries
      Use listenChannelsHash instead of listenChannels.
    
    ProcessIncomingNotify
      Use listenChannelsHash instead of listenChannels.
    
    AddEventToPendingNotifies
      Build channelHashtab when notification count exceeds
      MIN_HASHABLE_NOTIFIES, enabling efficient extraction of unique channel
      names in PreCommit_Notify.
    
    ClearPendingActionsAndNotifies
      Also free pendingNotifyChannels.
    
    Functions Added
    ---------------
    
    asyncQueuePagePrecedes
      Inline function returning true if page p precedes page q (p < q).
    
    channelHashFunc
      Hash function for ChannelHashKey, combining hash of dboid and channel
      name using XOR. Required callback for dshash operations.
    
    initChannelHash
      Lazy initialization of shared dshash table mapping (dboid, channel) to
      listener arrays. First caller creates DSA area and dshash, stores
      handles in asyncQueueControl; subsequent callers attach using stored
      handles.
    
    initListenChannelsHash
      Lazy initialization of backend-local hash table (listenChannelsHash)
      for faster IsListeningOn() checks.
    
    ChannelHashPrepareKey
      Inline helper to construct ChannelHashKey.
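    
    For readers unfamiliar with composite dshash keys, here is a minimal
    sketch of what ChannelHashPrepareKey and channelHashFunc do
    conceptually. It is not the patch's code: ChannelKey,
    channel_key_prepare and channel_key_hash are hypothetical names, the
    struct layout is assumed, and FNV-1a stands in for PostgreSQL's own
    hash functions.

```c
#include <stdint.h>
#include <string.h>

#define NAMEDATALEN 64          /* PostgreSQL's default name length limit */

typedef uint32_t Oid;           /* stand-in for PostgreSQL's Oid */

/* Hypothetical layout of the (dboid, channel) key. */
typedef struct ChannelKey
{
    Oid         dboid;
    char        channel[NAMEDATALEN];
} ChannelKey;

/*
 * Build a key.  Zeroing the whole struct first keeps padding and the
 * unused tail of the name deterministic, so keys can be compared bytewise.
 */
static void
channel_key_prepare(ChannelKey *key, Oid dboid, const char *channel)
{
    memset(key, 0, sizeof(*key));
    key->dboid = dboid;
    strncpy(key->channel, channel, NAMEDATALEN - 1);
}

/* 32-bit FNV-1a, used here only as a simple stand-in hash. */
static uint32_t
fnv1a(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t    h = 2166136261u;

    for (size_t i = 0; i < len; i++)
    {
        h ^= p[i];
        h *= 16777619u;
    }
    return h;
}

/* Combine the hash of the database OID and of the channel name with XOR. */
static uint32_t
channel_key_hash(const ChannelKey *key)
{
    uint32_t    h_db = fnv1a(&key->dboid, sizeof(key->dboid));
    uint32_t    h_ch = fnv1a(key->channel, strlen(key->channel));

    return h_db ^ h_ch;
}
```

    Two keys prepared from the same (dboid, channel) pair are bytewise
    identical and hash to the same value, which is what a dshash lookup
    relies on.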
    
    TESTING
    =======
    
    The patch adds comprehensive isolation tests covering:
    
    1. Subtransaction handling:
       - LISTEN in subtransaction with SAVEPOINT/RELEASE
       - LISTEN merge path (both outer and inner transactions)
       - ROLLBACK TO SAVEPOINT discarding pending actions
    
    2. Notification deduplication:
       - Hash table duplicate detection with 17 notifications + 1 duplicate
    
    3. Listener array growth:
       - Multiple listeners triggering ChannelEntry array expansion
    
    4. Cross-session delivery:
       - Notifications from non-listening backend to listener in another
       session
    
    Total test sessions expanded from 3 to 7 to cover these scenarios.
    
    /Joel