Thread

  1. Re: Parallel Apply

    Konstantin Knizhnik <knizhnik@garret.ru> — 2025-08-12T15:52:40Z

    On 11.08.2025 7:45 AM, Amit Kapila wrote:
    
    Hi,
    
    Background and Motivation
    -------------------------------------
    In high-throughput systems, where hundreds of sessions generate data
    on the publisher, the subscriber's apply process often becomes a
    bottleneck due to the single apply worker model. While users can
    mitigate this by creating multiple publication-subscription pairs,
    this approach has scalability and usability limitations.
    
    Currently, PostgreSQL supports parallel apply only for large streaming
    transactions (streaming=parallel). This proposal aims to extend
    parallelism to non-streaming transactions, thereby improving
    replication performance in workloads dominated by smaller, frequent
    transactions.
    
    Design Overview
    ------------------------
    To safely parallelize non-streaming transactions, we must ensure that
    transaction dependencies are respected to avoid failures and
    deadlocks. Consider the following scenarios to understand it better:
    (a) Transaction failures: Say, if we insert a row in the first
    transaction and update it in the second transaction on the publisher,
    then allowing the subscriber to apply both in parallel can lead to
    failure in the update; (b) Deadlocks - allowing transactions that
    update the same set of rows in a table in the opposite order in
    parallel can lead to deadlocks.
    
    The core idea is that the leader apply worker:
    a. Identifies dependencies between transactions.
    b. Coordinates parallel workers to apply independent transactions
       concurrently.
    c. Ensures correct ordering for dependent transactions.
    
    Dependency Detection
    --------------------------------
    1. Basic Dependency Tracking: Maintain a hash table keyed by
    (RelationId, ReplicaIdentity) with the value as the transaction XID.
    Before dispatching a change to a parallel worker, the leader checks
    for existing entries: (a) If no match: add the entry and proceed; (b)
    If match: instruct the worker to wait until the dependent transaction
    completes.
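    The lookup-then-register step in item 1 can be modelled in a few lines.
    This is a minimal Python sketch, not the C implementation the proposal
    has in mind: the hash table is a plain dict, `relid` stands for the
    table OID, `ri` for the replica-identity key values, and the name
    `check_and_register` is hypothetical. It also assumes that, on a match,
    the entry is re-pointed at the new transaction so that later changes
    wait for the most recent writer; removing entries when a transaction
    commits is left out.

```python
from typing import Dict, Optional, Tuple

# (RelationId, replica-identity key values) -> XID of the last writer
dep_table: Dict[Tuple[int, tuple], int] = {}


def check_and_register(xid: int, relid: int, ri: tuple) -> Optional[int]:
    """Return the XID this change must wait for, or None if independent."""
    key = (relid, ri)
    prev = dep_table.get(key)
    # Assumption: the entry always points at the most recent writer, so a
    # later change to the same row waits for this transaction.
    dep_table[key] = xid
    if prev is not None and prev != xid:
        return prev  # match: the worker must wait for `prev` to complete
    return None      # no match (or same transaction): proceed


print(check_and_register(501, 16384, (1,)))  # prints None
print(check_and_register(502, 16384, (1,)))  # prints 501
```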
    
    2. Unique Keys
    In addition to RI, track unique keys to detect conflicts. Example:
    CREATE TABLE tab1(a INT PRIMARY KEY, b INT UNIQUE);
    Transactions on publisher:
    Txn1: INSERT (1,1)
    Txn2: INSERT (2,2)
    Txn3: DELETE (2,2)
    Txn4: UPDATE (1,1) → (1,2)
    
    If Txn4 is applied before Txn2 and Txn3, the apply will fail due to a
    unique constraint violation. To prevent this, track both RI and unique
    keys in the hash table and compare the keys of both old and new tuples
    to detect dependencies: the old tuple's RI, and the new tuple's unique
    keys and RI (the new tuple's RI is needed to detect a prior insertion
    with the same key), must be compared with the existing hash table
    entries to identify transaction dependencies.
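    The key comparison for unique keys can be sketched the same way,
    replaying Txn1..Txn4 from the example above. Assumptions not taken from
    the proposal: rows are dicts of column values, hash-table keys are
    (relid, columns, values) triples, `dependencies` is a hypothetical
    helper, the OID 16384 is made up, and the leader can see the old
    tuple's unique-key values so that a DELETE also registers them.

```python
from typing import Dict, List, Optional, Set, Tuple

Row = Dict[str, int]
Key = Tuple[int, Tuple[str, ...], Tuple[int, ...]]  # (relid, columns, values)
dep_table: Dict[Key, int] = {}


def key_of(relid: int, cols: Tuple[str, ...], row: Row) -> Key:
    return (relid, cols, tuple(row[c] for c in cols))


def dependencies(xid: int, relid: int, ri: Tuple[str, ...],
                 uniques: List[Tuple[str, ...]],
                 old: Optional[Row], new: Optional[Row]) -> Set[int]:
    probes: List[Key] = []
    if old is not None:
        probes.append(key_of(relid, ri, old))               # old tuple: RI
    if new is not None:
        probes.append(key_of(relid, ri, new))               # new tuple: RI
        probes += [key_of(relid, u, new) for u in uniques]  # and unique keys
    deps = {dep_table[k] for k in probes if k in dep_table and dep_table[k] != xid}
    # Register every key this change touches. Assumption: the old tuple's
    # unique-key values are available, so a DELETE also registers the
    # unique key it frees.
    for row in (old, new):
        if row is not None:
            for cols in [ri, *uniques]:
                dep_table[key_of(relid, cols, row)] = xid
    return deps


TAB1 = 16384  # hypothetical OID of tab1(a PRIMARY KEY, b UNIQUE)
RI, UNIQ = ("a",), [("b",)]
print(dependencies(1, TAB1, RI, UNIQ, None, {"a": 1, "b": 1}))              # set()
print(dependencies(2, TAB1, RI, UNIQ, None, {"a": 2, "b": 2}))              # set()
print(dependencies(3, TAB1, RI, UNIQ, {"a": 2, "b": 2}, None))              # {2}
print(dependencies(4, TAB1, RI, UNIQ, {"a": 1, "b": 1}, {"a": 1, "b": 2}))  # {1, 3}
```

    Without the old-tuple assumption, the entry for b = 2 would still point
    at Txn2, and Txn4 would not be seen to depend on Txn3.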
    
    3. Foreign Keys
    Consider FK constraints between tables. Example:
    
    CREATE TABLE owner(user_id INT PRIMARY KEY);
    CREATE TABLE car(car_name TEXT, user_id INT REFERENCES owner);
    
    Transactions:
    Txn1: INSERT INTO owner VALUES (1)
    Txn2: INSERT INTO car VALUES ('bz', 1)
    
    Applying Txn2 before Txn1 will fail. To avoid this, check if FK values
    in new tuples match any RI or unique key in the hash table. If
    matched, treat the transaction as dependent.
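    The FK check in item 3 amounts to probing the hash table with the
    referenced table's key built from the new tuple's FK columns. In this
    sketch the FK metadata list, the OIDs, and `fk_dependencies` are
    hypothetical, and Txn1's insert into owner is assumed to have already
    registered its primary key in the table.

```python
from typing import Dict, List, Set, Tuple

Key = Tuple[int, Tuple[str, ...], tuple]  # (relid, columns, values)
# Hypothetical FK metadata:
# (referencing relid, FK columns, referenced relid, referenced key columns)
ForeignKey = Tuple[int, Tuple[str, ...], int, Tuple[str, ...]]

OWNER, CAR = 16384, 16390  # hypothetical OIDs
FKS: List[ForeignKey] = [(CAR, ("user_id",), OWNER, ("user_id",))]

# Assumed state after Txn1 (INSERT INTO owner VALUES (1)) was dispatched.
dep_table: Dict[Key, int] = {(OWNER, ("user_id",), (1,)): 1}


def fk_dependencies(xid: int, relid: int, new: Dict[str, object],
                    fks: List[ForeignKey]) -> Set[int]:
    """XIDs whose RI/unique-key entries match the new tuple's FK values."""
    deps: Set[int] = set()
    for src_rel, fk_cols, ref_rel, ref_cols in fks:
        if src_rel != relid:
            continue
        probe = (ref_rel, ref_cols, tuple(new[c] for c in fk_cols))
        writer = dep_table.get(probe)
        if writer is not None and writer != xid:
            deps.add(writer)
    return deps


print(fk_dependencies(2, CAR, {"car_name": "bz", "user_id": 1}, FKS))  # {1}
```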
    
    4. Triggers and Constraints
    For the initial version, exclude tables with user-defined triggers or
    constraints from parallel apply due to complexity in dependency
    detection. We may need some parallel-apply-safe marking to allow this.
    
    Replication Progress Tracking
    -----------------------------------------
    Parallel apply introduces out-of-order commit application,
    complicating replication progress tracking. To handle restarts and
    ensure consistency:
    
    Track Three Key Metrics:
    lowest_remote_lsn: Starting point for applying transactions.
    highest_remote_lsn: Highest LSN that has been applied.
    list_remote_lsn: List of commit LSNs applied between the lowest and highest.
    
    Mechanism:
    Store these in ReplicationState: lowest_remote_lsn,
    highest_remote_lsn, list_remote_lsn. Flush these to disk during
    checkpoints similar to CheckPointReplicationOrigin.
    
    After a restart, start from lowest_remote_lsn and, for each transaction,
    skip it if its commit LSN is in list_remote_lsn; otherwise, apply it.
    Once the commit LSN exceeds highest_remote_lsn, apply without checking
    the list.
    
    During apply, the leader maintains list_in_progress_xacts in
    increasing commit order. On commit, update highest_remote_lsn. If the
    committing transaction is the first entry of list_in_progress_xacts,
    update lowest_remote_lsn to its commit LSN; otherwise, add its commit
    LSN to list_remote_lsn. After the commit, also remove the transaction
    from list_in_progress_xacts. Whenever lowest_remote_lsn is updated,
    entries below it must be removed from list_remote_lsn.
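    The commit-time bookkeeping can be sketched as a small state object and
    replayed against Steps 1-3 of the worked example that follows. The names
    `ReplicationProgress`, `dispatch` and `on_commit` are hypothetical;
    list_in_progress_xacts is stored as (xid, commit LSN) pairs;
    highest_remote_lsn is assumed to be a running maximum (in Step 3 it
    stays at 1300); and both updates happen in one place, whereas the
    proposal splits them between RecordTransactionCommit and
    apply_handle_commit.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ReplicationProgress:
    """Hypothetical model of the proposed ReplicationState fields."""
    lowest_remote_lsn: int = 0
    highest_remote_lsn: int = 0
    list_remote_lsn: List[int] = field(default_factory=list)
    # list_in_progress_xacts, stored here as (xid, commit_lsn) in commit order
    in_progress: List[Tuple[int, int]] = field(default_factory=list)

    def dispatch(self, xid: int, commit_lsn: int) -> None:
        self.in_progress.append((xid, commit_lsn))

    def on_commit(self, xid: int, commit_lsn: int) -> None:
        # Assumed to be a running maximum (Step 3 leaves it at 1300).
        self.highest_remote_lsn = max(self.highest_remote_lsn, commit_lsn)
        if self.in_progress[0][0] == xid:
            # Oldest in-progress transaction committed: advance the lower
            # bound and drop list entries below it.
            self.lowest_remote_lsn = commit_lsn
            self.list_remote_lsn = [lsn for lsn in self.list_remote_lsn
                                    if lsn >= commit_lsn]
        else:
            # Committed out of order: remember it so a restart can skip it.
            self.list_remote_lsn.append(commit_lsn)
        self.in_progress.remove((xid, commit_lsn))


p = ReplicationProgress()
for xid, lsn in [(501, 1000), (502, 1100), (503, 1200), (504, 1300)]:
    p.dispatch(xid, lsn)
p.on_commit(503, 1200)  # Step 1
p.on_commit(504, 1300)  # Step 2
p.on_commit(501, 1000)  # Step 3
print(p.lowest_remote_lsn, p.list_remote_lsn, p.highest_remote_lsn, p.in_progress)
# prints: 1000 [1200, 1300] 1300 [(502, 1100)]
```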
    
    To illustrate how this mechanism works, consider the following four
    transactions:
    
    Transaction ID   Commit LSN
    501              1000
    502              1100
    503              1200
    504              1300
    
    Assume:
    Transactions 501 and 502 take longer to apply whereas transactions 503
    and 504 finish earlier. Parallel apply workers are assigned as
    follows:
    pa-1 → 501
    pa-2 → 502
    pa-3 → 503
    pa-4 → 504
    
    Initial state: list_in_progress_xacts = [501, 502, 503, 504]
    
    Step 1: Transaction 503 commits first and in RecordTransactionCommit,
    it updates highest_remote_lsn to 1200. In apply_handle_commit, since
    503 is not the first in list_in_progress_xacts, add 1200 to
    list_remote_lsn. Remove 503 from list_in_progress_xacts.
    Step 2: Transaction 504 commits. Update highest_remote_lsn to 1300.
    Add 1300 to list_remote_lsn. Remove 504 from list_in_progress_xacts.
    ReplicationState now:
    lowest_remote_lsn = 0
    list_remote_lsn = [1200, 1300]
    highest_remote_lsn = 1300
    list_in_progress_xacts = [501, 502]
    
    Step 3: Transaction 501 commits. Since 501 is now the first in
    list_in_progress_xacts, update lowest_remote_lsn to 1000. Remove 501
    from list_in_progress_xacts. Clean up list_remote_lsn to remove
    entries < lowest_remote_lsn (none in this case).
    ReplicationState now:
    lowest_remote_lsn = 1000
    list_remote_lsn = [1200, 1300]
    highest_remote_lsn = 1300
    list_in_progress_xacts = [502]
    
    Step 4: System crash and restart
    Upon restart, start replication from lowest_remote_lsn = 1000. The first
    transaction encountered is 502 with commit LSN 1100; since it is not
    present in list_remote_lsn, apply it. As the commit LSNs of transactions
    503 and 504 [1200, 1300] are present in list_remote_lsn, skip them.
    Note that the comparison uses each transaction's end_lsn/commit_lsn,
    which the apply worker receives along with the transaction's first
    command, BEGIN. This ensures correctness and avoids duplicate
    application of already committed transactions.
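    The restart rule can be expressed as a filter over the commit LSNs the
    leader sees after reconnecting. This sketch uses the state left by
    Step 3; `transactions_to_apply` is a hypothetical helper, and the LSN
    1400 in the usage line is an invented later transaction, added only to
    exercise the "beyond highest_remote_lsn" branch.

```python
from typing import Iterable, List


def transactions_to_apply(commit_lsns: Iterable[int], lowest_remote_lsn: int,
                          highest_remote_lsn: int,
                          list_remote_lsn: List[int]) -> List[int]:
    """Hypothetical restart filter: commit LSNs that still need applying."""
    already_applied = set(list_remote_lsn)
    to_apply = []
    for lsn in commit_lsns:
        if lsn <= lowest_remote_lsn:
            continue              # at or below the lower bound: already applied
        if lsn > highest_remote_lsn:
            to_apply.append(lsn)  # past everything applied: no list lookup
        elif lsn not in already_applied:
            to_apply.append(lsn)  # a gap left by a slow worker (e.g. 502)
        # else: committed out of order before the crash, skip it
    return to_apply


# State after Step 3; 1400 stands for a hypothetical later transaction.
print(transactions_to_apply([1100, 1200, 1300, 1400], 1000, 1300, [1200, 1300]))
# prints [1100, 1400]
```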
    
    Now, some users may want to parallelize transactions but still
    maintain commit order, because they don't explicitly declare FKs or
    PKs on their columns and instead maintain integrity in the
    application. Since we cannot detect transaction dependencies in such
    cases, it would be better to make out-of-order commits optional.
    
    Thoughts?
    
    
    
    Hi,
    This is similar to what I had in mind when I started my experiments
    with LR apply speed improvements. I think that maintaining a full
    (RelationId, ReplicaIdentity) hash may be too expensive: there can be
    hundreds of active transactions updating millions of rows.
    I thought about something like a bloom filter, but frankly speaking I
    didn't get far in thinking through all the implementation details.
    Your proposal is much more concrete.
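    A rough illustration of the bloom-filter variant: keep one fixed-size
    filter per in-flight transaction and test each incoming change against
    them. A hit may be a false positive (an unnecessary wait), but a real
    conflict is never missed. This is an illustrative Python sketch, not a
    design from this thread; the filter size, the number of hash functions,
    the SHA-256-based hashing and the OID 16384 are arbitrary assumptions.

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter over (relid, key) pairs."""

    def __init__(self, nbits: int = 1024, nhashes: int = 3) -> None:
        self.nbits = nbits
        self.nhashes = nhashes
        self.bits = 0  # a Python int used as the bit array

    def _positions(self, relid: int, key: tuple):
        data = f"{relid}:{key!r}".encode()
        for i in range(self.nhashes):
            # Derive independent hash functions by prefixing a seed byte.
            digest = hashlib.sha256(bytes([i]) + data).digest()
            yield int.from_bytes(digest[:8], "little") % self.nbits

    def add(self, relid: int, key: tuple) -> None:
        for pos in self._positions(relid, key):
            self.bits |= 1 << pos

    def might_contain(self, relid: int, key: tuple) -> bool:
        # False means "definitely not touched"; True may be a false positive.
        return all((self.bits >> pos) & 1 for pos in self._positions(relid, key))


# One filter per in-flight transaction (hypothetical bookkeeping).
in_flight = {501: BloomFilter(), 502: BloomFilter()}
in_flight[501].add(16384, (1,))


def possible_dependencies(relid: int, key: tuple) -> list:
    """XIDs that may have touched (relid, key); may include false positives."""
    return [xid for xid, bf in in_flight.items() if bf.might_contain(relid, key)]


print(possible_dependencies(16384, (1,)))  # prints [501]
```

    Unlike the exact hash table, memory per transaction is fixed, at the
    cost of checking every in-flight filter for each change.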
    
    But I decided to implement a first approach with prefetch, which is much
    simpler, similar to the prefetching currently used for physical
    replication, and still provides quite a significant improvement:
    https://www.postgresql.org/message-id/flat/84ed36b8-7d06-4945-9a6b-3826b3f999a6%40garret.ru#70b45c44814c248d3d519a762f528753
    
    There is one thing I do not completely understand about your proposal:
    do you assume that the LR walsender at the publisher will use the
    reorder buffer to "serialize" transactions,
    or do you assume that streaming mode will be used (it is now possible to
    enforce parallel apply of short transactions using
    `debug_logical_replication_streaming`)?
    
    It seems pointless to spend time and memory serializing transactions at
    the publisher if we want to apply them in parallel at the subscriber
    anyway.
    But then there is another problem: at the publisher there can be
    hundreds of concurrent active transactions (limited only by
    `max_connections`) whose records are intermixed in WAL.
    If we try to apply them concurrently at the subscriber, we need a
    corresponding number of parallel apply workers. But usually the number
    of such workers is less than 10 (and the default is 2).
    So it looks like we need to serialize transactions at the subscriber
    side.
    
    Assume that there are 100 concurrent transactions T1..T100, i.e. before
    the first COMMIT record there are intermixed records of 100
    transactions, and there are just two parallel apply workers, W1 and W2.
    The main LR apply worker will send the T1 record to W1, the T2 record to
    W2, and then there are no more vacant workers.
    It must either spawn additional workers, which is not always possible
    because the total number of background workers is limited, or serialize
    all other transactions in memory or on disk until it reaches the COMMIT
    of T1 or T2.
    I am afraid that such serialization will eliminate any advantages of
    parallel apply.
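    The scenario can be made concrete with a toy simulation of the leader
    dispatching WAL-ordered records to a fixed pool of workers. The function
    `simulate_dispatch` is hypothetical and deliberately simplified: it never
    hands a buffered transaction to a worker that becomes free later, and
    only reports which transactions had to be buffered when first seen.

```python
from typing import Dict, List, Set, Tuple


def simulate_dispatch(records: List[Tuple[int, str]], n_workers: int) -> Set[int]:
    """Toy model of the leader apply worker (hypothetical, for illustration).

    `records` is the WAL-ordered stream of (xid, kind) with kind in
    {"change", "commit"}. A transaction gets a worker when its first record
    arrives; if none is free, it is buffered (spilled) instead.
    """
    assigned: Dict[int, int] = {}  # xid -> worker id
    free = list(range(n_workers))
    buffered: Set[int] = set()
    for xid, kind in records:
        if xid not in assigned and xid not in buffered:
            if free:
                assigned[xid] = free.pop()
            else:
                buffered.add(xid)
        if kind == "commit" and xid in assigned:
            free.append(assigned.pop(xid))  # worker becomes vacant again
    return buffered


# T1..T100 each write once before the first COMMIT record appears.
wal = [(x, "change") for x in range(1, 101)] + [(x, "commit") for x in range(1, 101)]
print(len(simulate_dispatch(wal, n_workers=2)))  # prints 98
```

    With two workers, 98 of the 100 transactions must be buffered before
    any COMMIT arrives; with 100 workers, none would be.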
    
    Certainly, if we reorder transactions at the publisher side, there is
    no such problem. The subscriber receives all records for T1, then all
    records for T2, ... If there are no more vacant workers, it can just
    wait until any of these transactions is completed. But I am afraid
    that in this case the reorder buffer at the publisher will be a
    bottleneck.