v6-0004-support-2PC.patch
application/octet-stream
Filename: v6-0004-support-2PC.patch
Type: application/octet-stream
Part: 3
Message:
RE: Parallel Apply
From 7db41cfc2b7690a00ece9d0baa3244a6772b2866 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 2 Dec 2025 13:01:26 +0900
Subject: [PATCH v6 4/7] support 2PC
This patch allows prepared transactions to be applied in parallel. A parallel
apply worker is assigned to the transaction when BEGIN_PREPARE is received; this
step and the dependency-waiting mechanism are the same as for a normal
transaction. The parallel worker can be freed once it has handled the PREPARE
message. The prepared transaction can be removed from parallelized_txns at that
time; subsequent transactions wait until then.
The leader apply worker itself handles COMMIT PREPARED and ROLLBACK PREPARED.
Because the leader applies these messages, they are serialized automatically, so
the transaction is not added to parallelized_txns.
---
src/backend/replication/logical/worker.c | 230 +++++++++++++++---
src/test/subscription/t/050_parallel_apply.pl | 57 +++++
2 files changed, 259 insertions(+), 28 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3832481647e..ab757e3fac9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2116,6 +2116,11 @@ static void
apply_handle_begin_prepare(StringInfo s)
{
LogicalRepPreparedTxnData begin_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
/* Tablesync should never receive prepare. */
if (am_tablesync_worker())
@@ -2127,12 +2132,61 @@ apply_handle_begin_prepare(StringInfo s)
Assert(!TransactionIdIsValid(stream_xid));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
+
+ remote_xid = begin_data.xid;
+
+ set_apply_error_context_xact(remote_xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
maybe_start_skipping_changes(begin_data.prepare_lsn);
+ pa_allocate_worker(remote_xid, false);
+
+ apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+ elog(DEBUG1, "new remote_xid %u", remote_xid);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_APPLY:
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ pa_set_stream_apply_worker(winfo);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+/* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_write_change(LOGICAL_REP_MSG_BEGIN_PREPARE, &original_msg);
+
+ /* Cache the parallel apply worker for this transaction. */
+ pa_set_stream_apply_worker(winfo);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+ /* Hold the lock until the end of the transaction. */
+ pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
+ break;
+
+ default:
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+ break;
+ }
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -2182,6 +2236,11 @@ static void
apply_handle_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
logicalrep_read_prepare(s, &prepare_data);
@@ -2192,36 +2251,136 @@ apply_handle_prepare(StringInfo s)
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
- /*
- * Unlike commit, here, we always prepare the transaction even though no
- * change has happened in this transaction or all changes are skipped. It
- * is done this way because at commit prepared time, we won't know whether
- * we have skipped preparing a transaction because of those reasons.
- *
- * XXX, We can optimize such that at commit prepared time, we first check
- * whether we have prepared the transaction or not but that doesn't seem
- * worthwhile because such cases shouldn't be common.
- */
- begin_replication_step();
+ apply_action = get_transaction_apply_action(remote_xid, &winfo);
- apply_handle_prepare_internal(&prepare_data);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_APPLY:
+ /*
+ * Unlike commit, here, we always prepare the transaction even
+ * though no change has happened in this transaction or all changes
+ * are skipped. It is done this way because at commit prepared
+ * time, we won't know whether we have skipped preparing a
+ * transaction because of those reasons.
+ *
+ * XXX, We can optimize such that at commit prepared time, we first
+ * check whether we have prepared the transaction or not but that
+ * doesn't seem worthwhile because such cases shouldn't be common.
+ */
+ begin_replication_step();
- end_replication_step();
- CommitTransactionCommand();
- pgstat_report_stat(false);
+ /* Wait until the last transaction finishes */
+ if (TransactionIdIsValid(last_remote_xid))
+ pa_wait_for_depended_transaction(last_remote_xid);
- /*
- * It is okay not to set the local_end LSN for the prepare because we
- * always flush the prepare record. So, we can send the acknowledgment of
- * the remote_end LSN as soon as prepare is finished.
- *
- * XXX For the sake of consistency with commit, we could have set it with
- * the LSN of prepare but as of now we don't track that value similar to
- * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
- * it.
- */
- store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
- InvalidTransactionId);
+ apply_handle_prepare_internal(&prepare_data);
+
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ /*
+ * It is okay not to set the local_end LSN for the prepare because
+ * we always flush the prepare record. So, we can send the
+ * acknowledgment of the remote_end LSN as soon as prepare is
+ * finished.
+ *
+ * XXX For the sake of consistency with commit, we could have set
+ * it with the LSN of prepare but as of now we don't track that
+ * value similar to XactLastCommitEnd, and adding it for this
+ * purpose doesn't seem worth
+ * it.
+ */
+ store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+ InvalidTransactionId);
+
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * Mark this transaction as parallelized. This ensures that
+ * upcoming transactions wait until this transaction is prepared.
+ */
+ pa_add_parallelized_transaction(remote_xid);
+
+ /*
+ * Build a dependency between this transaction and the last
+ * committed transaction to preserve the commit order. If that
+ * succeeds, try to send the PREPARE message.
+ */
+ if (build_dependency_with_last_committed_txn(winfo) &&
+ pa_send_data(winfo, s->len, s->data))
+ {
+ /* Finish processing the transaction. */
+ pa_xact_finish(winfo, prepare_data.end_lsn);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+/* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_PREPARE,
+ &original_msg);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+ /* Finish processing the transaction. */
+ pa_xact_finish(winfo, prepare_data.end_lsn);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before committing.
+ */
+ if (stream_fd)
+ stream_close_file();
+
+ begin_replication_step();
+
+ INJECTION_POINT("parallel-worker-before-prepare", NULL);
+
+ /* Mark the transaction as prepared. */
+ apply_handle_prepare_internal(&prepare_data);
+
+ end_replication_step();
+
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+ InvalidTransactionId);
+
+ /*
+ * It is okay not to set the local_end LSN for the prepare because
+ * we always flush the prepare record. See apply_handle_prepare.
+ */
+ MyParallelShared->last_commit_end = InvalidXLogRecPtr;
+ pa_commit_transaction();
+
+ pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+
+ pa_reset_subtrans();
+ break;
+
+ default:
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+ break;
+ }
+
+ /* Cache the remote_xid */
+ last_remote_xid = remote_xid;
+
+ remote_xid = InvalidTransactionId;
in_remote_transaction = false;
@@ -2269,6 +2428,9 @@ apply_handle_commit_prepared(StringInfo s)
/* There is no transaction when COMMIT PREPARED is called */
begin_replication_step();
+ if (TransactionIdIsValid(last_remote_xid))
+ pa_wait_for_depended_transaction(last_remote_xid);
+
/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
@@ -2281,6 +2443,14 @@ apply_handle_commit_prepared(StringInfo s)
CommitTransactionCommand();
pgstat_report_stat(false);
+ /*
+ * No need to update last_remote_xid because leader worker applied the
+ * message thus upcoming transaction preserves the order automatically.
+ * Let's set the xid to an invalid value to skip sending the
+ * INTERNAL_DEPENDENCY message.
+ */
+ last_remote_xid = InvalidTransactionId;
+
store_flush_position(prepare_data.end_lsn, XactLastCommitEnd,
InvalidTransactionId);
in_remote_transaction = false;
@@ -2337,6 +2507,10 @@ apply_handle_rollback_prepared(StringInfo s)
/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
begin_replication_step();
+
+ if (TransactionIdIsValid(last_remote_xid))
+ pa_wait_for_depended_transaction(last_remote_xid);
+
FinishPreparedTransaction(gid, false);
end_replication_step();
CommitTransactionCommand();
diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl
index 69cf48cb7ac..57bcfde513e 100644
--- a/src/test/subscription/t/050_parallel_apply.pl
+++ b/src/test/subscription/t/050_parallel_apply.pl
@@ -17,6 +17,8 @@ if ($ENV{enable_injection_points} ne 'yes')
# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ "max_prepared_transactions = 10");
$node_publisher->start;
# Insert initial data
@@ -35,6 +37,8 @@ $node_subscriber->init;
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->append_conf('postgresql.conf',
"max_logical_replication_workers = 10");
+$node_subscriber->append_conf('postgresql.conf',
+ "max_prepared_transactions = 10");
$node_subscriber->start;
# Check if the extension injection_points is available, as it may be
@@ -127,4 +131,57 @@ $result =
"SELECT count(1) FROM regress_tab WHERE value = 'updated'");
is ($result, 5, 'updates are also replicated to subscriber');
+# Ensure a PREPAREd transaction is applied by a parallel apply worker
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_sub DISABLE;");
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_sub SET (two_phase = on);");
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_sub ENABLE;");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(1) FROM pg_stat_activity WHERE backend_type = 'logical replication parallel worker'");
+is($result, '0', "no parallel apply workers exist after restart");
+
+# Attach an injection point so the parallel worker waits before the prepare
+$node_subscriber->safe_psql('postgres',
+ "SELECT injection_points_attach('parallel-worker-before-prepare','wait');"
+);
+
+# PREPARE a transaction on publisher. It would be handled by a parallel apply
+# worker.
+$node_publisher->safe_psql('postgres', qq[
+ BEGIN;
+ INSERT INTO regress_tab VALUES (generate_series(51, 60), 'prepare');
+ PREPARE TRANSACTION 'regress_prepare';
+]);
+
+# Wait until the parallel worker enters the injection point.
+$node_subscriber->wait_for_event('logical replication parallel worker',
+ 'parallel-worker-before-prepare');
+
+$offset = -s $node_subscriber->logfile;
+
+# Insert tuples on publisher again. This transaction waits for the prepared
+# transaction
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO regress_tab VALUES (generate_series(61, 70), 'test');");
+
+# Verify the wait; wait_for_log returns an offset, so re-read the log and capture the xid in list context
+$node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset);
+($xid) = PostgreSQL::Test::Utils::slurp_file($node_subscriber->logfile, $offset) =~ /wait for depended xid ([1-9][0-9]+)/;
+
+# Wakeup the parallel worker
+$node_subscriber->safe_psql('postgres', qq[
+ SELECT injection_points_detach('parallel-worker-before-prepare');
+ SELECT injection_points_wakeup('parallel-worker-before-prepare');
+]);
+
+$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset);
+
+# COMMIT the prepared transaction. It is always handled by the leader
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'regress_prepare';");
+$node_publisher->wait_for_catchup('regress_sub');
+
done_testing();
--
2.47.3