From bb374ecb991cecf1376af084e9ed95d7481ab7c0 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 2 Dec 2025 13:01:26 +0900 Subject: [PATCH v5 6/8] support 2PC This patch allows prepared transactions to be applied in parallel. Parallel apply workers are assigned to a transaction when BEGIN_PREPARE is received. This assignment and the dependency-waiting mechanism work the same way as for a normal transaction. A parallel worker can be freed after it handles the PREPARE message. The prepared transaction can be removed from parallelized_txns at that time; upcoming transactions wait until then. COMMIT PREPARED and ROLLBACK PREPARED are applied by the leader apply worker itself. Since the leader applies them in order, they are serialized automatically and need not be added to parallelized_txns. --- src/backend/replication/logical/worker.c | 230 +++++++++++++++--- src/test/subscription/t/050_parallel_apply.pl | 57 +++++ 2 files changed, 259 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5808cd11c15..b6d3d43e8c0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2116,6 +2116,11 @@ static void apply_handle_begin_prepare(StringInfo s) { LogicalRepPreparedTxnData begin_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; /* Tablesync should never receive prepare.
*/ if (am_tablesync_worker()) @@ -2127,12 +2132,61 @@ apply_handle_begin_prepare(StringInfo s) Assert(!TransactionIdIsValid(stream_xid)); logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); + + remote_xid = begin_data.xid; + + set_apply_error_context_xact(remote_xid, begin_data.prepare_lsn); remote_final_lsn = begin_data.prepare_lsn; maybe_start_skipping_changes(begin_data.prepare_lsn); + pa_allocate_worker(remote_xid, false); + + apply_action = get_transaction_apply_action(remote_xid, &winfo); + + elog(DEBUG1, "new remote_xid %u", remote_xid); + switch (apply_action) + { + case TRANS_LEADER_APPLY: + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + pa_set_stream_apply_worker(winfo); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * BEGIN_PREPARE message to the parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + +/* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_write_change(LOGICAL_REP_MSG_BEGIN_PREPARE, &original_msg); + + /* Cache the parallel apply worker for this transaction. */ + pa_set_stream_apply_worker(winfo); + break; + + case TRANS_PARALLEL_APPLY: + /* Hold the transaction lock until the transaction is prepared. */ + pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock); + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -2182,6 +2236,11 @@ static void apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed.
*/ + StringInfoData original_msg = *s; logicalrep_read_prepare(s, &prepare_data); @@ -2192,36 +2251,136 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(prepare_data.prepare_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction or all changes are skipped. It - * is done this way because at commit prepared time, we won't know whether - * we have skipped preparing a transaction because of those reasons. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. - */ - begin_replication_step(); + apply_action = get_transaction_apply_action(remote_xid, &winfo); - apply_handle_prepare_internal(&prepare_data); + switch (apply_action) + { + case TRANS_LEADER_APPLY: + /* + * Unlike commit, here, we always prepare the transaction even + * though no change has happened in this transaction or all changes + * are skipped. It is done this way because at commit prepared + * time, we won't know whether we have skipped preparing a + * transaction because of those reasons. + * + * XXX, We can optimize such that at commit prepared time, we first + * check whether we have prepared the transaction or not but that + * doesn't seem worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + /* Wait for the previous remote transaction to finish to keep the order. */ + if (TransactionIdIsValid(last_remote_xid)) + pa_wait_for_depended_transaction(last_remote_xid); - /* - * It is okay not to set the local_end LSN for the prepare because we - * always flush the prepare record. So, we can send the acknowledgment of - * the remote_end LSN as soon as prepare is finished.
- * - * XXX For the sake of consistency with commit, we could have set it with - * the LSN of prepare but as of now we don't track that value similar to - * XactLastCommitEnd, and adding it for this purpose doesn't seems worth - * it. - */ - store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, - InvalidTransactionId); + apply_handle_prepare_internal(&prepare_data); + + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + /* + * It is okay not to set the local_end LSN for the prepare because + * we always flush the prepare record. So, we can send the + * acknowledgment of the remote_end LSN as soon as prepare is + * finished. + * + * XXX For the sake of consistency with commit, we could have set + * it with the LSN of prepare but as of now we don't track that + * value similar to XactLastCommitEnd, and adding it for this + * purpose doesn't seem worth + * it. + */ + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); + + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * Mark this transaction as parallelized. This ensures that + * upcoming transactions wait until this transaction is prepared. + */ + pa_add_parallelized_transaction(remote_xid); + + /* + * Build a dependency between this transaction and the last + * committed transaction to preserve the commit order. If that + * succeeds, try to send the PREPARE message. + */ + if (build_dependency_with_last_committed_txn(winfo) && + pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the transaction. */ + pa_xact_finish(winfo, prepare_data.end_lsn); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * PREPARE message to the parallel apply worker.
*/ + pa_switch_to_partial_serialize(winfo, true); +/* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_PREPARE, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the transaction. */ + pa_xact_finish(winfo, prepare_data.end_lsn); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before preparing. + */ + if (stream_fd) + stream_close_file(); + + begin_replication_step(); + + INJECTION_POINT("parallel-worker-before-prepare", NULL); + + /* Mark the transaction as prepared. */ + apply_handle_prepare_internal(&prepare_data); + + end_replication_step(); + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr, + InvalidTransactionId); + + /* + * It is okay not to set the local_end LSN for the prepare because + * we always flush the prepare record. See apply_handle_prepare. + */ + MyParallelShared->last_commit_end = InvalidXLogRecPtr; + pa_commit_transaction(); + + pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock); + + pa_reset_subtrans(); + break; + + default: + elog(ERROR, "unexpected apply action: %d", (int) apply_action); + break; + } + + /* Remember this xid so that subsequent transactions can wait for it. */ + last_remote_xid = remote_xid; + + remote_xid = InvalidTransactionId; in_remote_transaction = false; @@ -2269,6 +2428,9 @@ apply_handle_commit_prepared(StringInfo s) /* There is no transaction when COMMIT PREPARED is called */ begin_replication_step(); + if (TransactionIdIsValid(last_remote_xid)) + pa_wait_for_depended_transaction(last_remote_xid); + /* * Update origin state so we can restart streaming from correct position * in case of crash.
@@ -2281,6 +2443,14 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); + /* + * No need to remember this xid in last_remote_xid: the leader applied + * the message itself, so upcoming transactions are ordered after it + * automatically. Reset it to an invalid value so that the next + * transaction skips sending the INTERNAL_DEPENDENCY message. + */ + last_remote_xid = InvalidTransactionId; + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd, InvalidTransactionId); in_remote_transaction = false; @@ -2337,6 +2507,10 @@ apply_handle_rollback_prepared(StringInfo s) /* There is no transaction when ABORT/ROLLBACK PREPARED is called */ begin_replication_step(); + + if (TransactionIdIsValid(last_remote_xid)) + pa_wait_for_depended_transaction(last_remote_xid); + FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 69cf48cb7ac..57bcfde513e 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -17,6 +17,8 @@ if ($ENV{enable_injection_points} ne 'yes') # Initialize publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + "max_prepared_transactions = 10"); $node_publisher->start; # Insert initial data @@ -35,6 +37,8 @@ $node_subscriber->init; $node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); $node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10"); +$node_subscriber->append_conf('postgresql.conf', + "max_prepared_transactions = 10"); $node_subscriber->start; # Check if the extension injection_points is available, as it may be @@ -127,4 +131,57 @@ $result = "SELECT count(1) FROM regress_tab WHERE value = 'updated'"); is ($result, 5, 'updates are also
replicated to subscriber'); +# Ensure that PREPAREd transactions are also applied in parallel + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub DISABLE;"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub SET (two_phase = on);"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub ENABLE;"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_activity WHERE backend_type = 'logical replication parallel worker'"); +is($result, '0', "no parallel apply workers exist after restart"); + +# Attach an injection point so that parallel workers wait before preparing +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-prepare','wait');" +); + +# PREPARE a transaction on the publisher. It will be handled by a parallel +# apply worker. +$node_publisher->safe_psql('postgres', qq[ + BEGIN; + INSERT INTO regress_tab VALUES (generate_series(51, 60), 'prepare'); + PREPARE TRANSACTION 'regress_prepare'; +]); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-prepare'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again.
This transaction must wait for the prepared +# transaction on the subscriber. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (generate_series(61, 70), 'test');"); + +# Verify the parallel worker waits for the transaction. wait_for_log() returns the log size, not its text, so re-read the log to capture the xid +$node_subscriber->wait_for_log(qr/wait for depended xid [1-9][0-9]+/, $offset); +(($xid) = slurp_file($node_subscriber->logfile, $offset) =~ /wait for depended xid ([1-9][0-9]+)/) or die "depended xid not found in server log"; + +# Wake up the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-prepare'); + SELECT injection_points_wakeup('parallel-worker-before-prepare'); +]); + +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid\b/, $offset); + +# COMMIT the prepared transaction. It is always handled by the leader. +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'regress_prepare';"); +$node_publisher->wait_for_catchup('regress_sub'); +is($node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab WHERE value = 'prepare'"), 10, 'prepared transaction is replicated after COMMIT PREPARED'); done_testing(); -- 2.47.3