v6_2-0005-support-2PC.patch

application/octet-stream

Filename: v6_2-0005-support-2PC.patch
Type: application/octet-stream
Part: 4
Message: RE: Parallel Apply
From 7db41cfc2b7690a00ece9d0baa3244a6772b2866 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 2 Dec 2025 13:01:26 +0900
Subject: [PATCH v6_2 5/8] support 2PC

This patch allows transactions that are prepared on the publisher (two-phase
commit) to be applied in parallel. A parallel apply worker is assigned to the
transaction when BEGIN_PREPARE is received; this assignment and the
dependency-waiting mechanism are the same as for a normal transaction.

A parallel apply worker can be freed once it has handled the PREPARE message.
The prepared transaction can be removed from parallelized_txns at that point;
subsequent transactions wait until then.

COMMIT PREPARED and ROLLBACK PREPARED are applied by the leader apply worker.
Because the leader applies them serially, they are ordered automatically, so
the transaction is not added to parallelized_txns.
---
 src/backend/replication/logical/worker.c      | 230 +++++++++++++++---
 src/test/subscription/t/050_parallel_apply.pl |  57 +++++
 2 files changed, 259 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3832481647e..ab757e3fac9 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2116,6 +2116,11 @@ static void
 apply_handle_begin_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData begin_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
+
+	/* Save the message before it is consumed. */
+	StringInfoData original_msg = *s;
 
 	/* Tablesync should never receive prepare. */
 	if (am_tablesync_worker())
@@ -2127,12 +2132,61 @@ apply_handle_begin_prepare(StringInfo s)
 	Assert(!TransactionIdIsValid(stream_xid));
 
 	logicalrep_read_begin_prepare(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
+
+	remote_xid = begin_data.xid;
+
+	set_apply_error_context_xact(remote_xid, begin_data.prepare_lsn);
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
 	maybe_start_skipping_changes(begin_data.prepare_lsn);
 
+	pa_allocate_worker(remote_xid, false);
+
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	elog(DEBUG1, "new remote_xid %u", remote_xid);
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+
+			if (pa_send_data(winfo, s->len, s->data))
+			{
+				pa_set_stream_apply_worker(winfo);
+				break;
+			}
+
+			/*
+			 * Switch to serialize mode when we are not able to send the
+			 * change to parallel apply worker.
+			 */
+			pa_switch_to_partial_serialize(winfo, true);
+
+/* fall through */
+		case TRANS_LEADER_PARTIAL_SERIALIZE:
+			Assert(winfo);
+
+			stream_write_change(LOGICAL_REP_MSG_BEGIN_PREPARE, &original_msg);
+
+			/* Cache the parallel apply worker for this transaction. */
+			pa_set_stream_apply_worker(winfo);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+			/* Hold the lock until the end of the transaction. */
+			pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -2182,6 +2236,11 @@ static void
 apply_handle_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
+
+	/* Save the message before it is consumed. */
+	StringInfoData original_msg = *s;
 
 	logicalrep_read_prepare(s, &prepare_data);
 
@@ -2192,36 +2251,136 @@ apply_handle_prepare(StringInfo s)
 								 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	/*
-	 * Unlike commit, here, we always prepare the transaction even though no
-	 * change has happened in this transaction or all changes are skipped. It
-	 * is done this way because at commit prepared time, we won't know whether
-	 * we have skipped preparing a transaction because of those reasons.
-	 *
-	 * XXX, We can optimize such that at commit prepared time, we first check
-	 * whether we have prepared the transaction or not but that doesn't seem
-	 * worthwhile because such cases shouldn't be common.
-	 */
-	begin_replication_step();
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
 
-	apply_handle_prepare_internal(&prepare_data);
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			/*
+			 * Unlike commit, here, we always prepare the transaction even
+			 * though no change has happened in this transaction or all changes
+			 * are skipped. It is done this way because at commit prepared
+			 * time, we won't know whether we have skipped preparing a
+			 * transaction because of those reasons.
+			 *
+			 * XXX, We can optimize such that at commit prepared time, we first
+			 * check whether we have prepared the transaction or not but that
+			 * doesn't seem worthwhile because such cases shouldn't be common.
+			 */
+			begin_replication_step();
 
-	end_replication_step();
-	CommitTransactionCommand();
-	pgstat_report_stat(false);
+			/* Wait until the last transaction finishes */
+			if (TransactionIdIsValid(last_remote_xid))
+				pa_wait_for_depended_transaction(last_remote_xid);
 
-	/*
-	 * It is okay not to set the local_end LSN for the prepare because we
-	 * always flush the prepare record. So, we can send the acknowledgment of
-	 * the remote_end LSN as soon as prepare is finished.
-	 *
-	 * XXX For the sake of consistency with commit, we could have set it with
-	 * the LSN of prepare but as of now we don't track that value similar to
-	 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
-	 * it.
-	 */
-	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
-						 InvalidTransactionId);
+			apply_handle_prepare_internal(&prepare_data);
+
+			end_replication_step();
+			CommitTransactionCommand();
+			pgstat_report_stat(false);
+
+			/*
+			 * It is okay not to set the local_end LSN for the prepare because
+			 * we always flush the prepare record. So, we can send the
+			 * acknowledgment of the remote_end LSN as soon as prepare is
+			 * finished.
+			 *
+			 * XXX For the sake of consistency with commit, we
+			 * could have set it with the LSN of prepare, but as
+			 * of now we don't track that value similar to
+			 * XactLastCommitEnd, and adding it for this
+			 * purpose doesn't seem worth it.
+			 */
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+								 InvalidTransactionId);
+
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+
+			/*
+			 * Mark this transaction as parallelized. This ensures that
+			 * upcoming transactions wait until this transaction is prepared.
+			 */
+			pa_add_parallelized_transaction(remote_xid);
+
+			/*
+			 * Build a dependency between this transaction and the last
+			 * committed transaction to preserve the commit order. If that
+			 * succeeds, try to send the PREPARE message.
+			 */
+			if (build_dependency_with_last_committed_txn(winfo) &&
+				pa_send_data(winfo, s->len, s->data))
+			{
+				/* Finish processing the transaction. */
+				pa_xact_finish(winfo, prepare_data.end_lsn);
+				break;
+			}
+
+			/*
+			 * Switch to serialize mode when we are not able to send the
+			 * change to parallel apply worker.
+			 */
+			pa_switch_to_partial_serialize(winfo, true);
+/* fall through */
+		case TRANS_LEADER_PARTIAL_SERIALIZE:
+			Assert(winfo);
+
+			stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_PREPARE,
+										 &original_msg);
+
+			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+			/* Finish processing the transaction. */
+			pa_xact_finish(winfo, prepare_data.end_lsn);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+
+			/*
+			 * If the parallel apply worker is applying spooled messages then
+			 * close the file before committing.
+			 */
+			if (stream_fd)
+				stream_close_file();
+
+			begin_replication_step();
+
+			INJECTION_POINT("parallel-worker-before-prepare", NULL);
+
+			/* Mark the transaction as prepared. */
+			apply_handle_prepare_internal(&prepare_data);
+
+			end_replication_step();
+
+			CommitTransactionCommand();
+			pgstat_report_stat(false);
+
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+								 InvalidTransactionId);
+
+			/*
+			 * It is okay not to set the local_end LSN for the prepare because
+			 * we always flush the prepare record. See the TRANS_LEADER_APPLY case.
+			 */
+			MyParallelShared->last_commit_end = InvalidXLogRecPtr;
+			pa_commit_transaction();
+
+			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+
+			pa_reset_subtrans();
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
+	/* Cache the remote_xid */
+	last_remote_xid = remote_xid;
+
+	remote_xid = InvalidTransactionId;
 
 	in_remote_transaction = false;
 
@@ -2269,6 +2428,9 @@ apply_handle_commit_prepared(StringInfo s)
 	/* There is no transaction when COMMIT PREPARED is called */
 	begin_replication_step();
 
+	if (TransactionIdIsValid(last_remote_xid))
+		pa_wait_for_depended_transaction(last_remote_xid);
+
 	/*
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
@@ -2281,6 +2443,14 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
+	/*
+	 * There is no need to record this transaction in last_remote_xid:
+	 * the leader applied it itself, so upcoming transactions preserve the
+	 * order automatically. Reset the xid to an invalid value to skip
+	 * sending the INTERNAL_DEPENDENCY message.
+	 */
+	last_remote_xid = InvalidTransactionId;
+
 	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd,
 						 InvalidTransactionId);
 	in_remote_transaction = false;
@@ -2337,6 +2507,10 @@ apply_handle_rollback_prepared(StringInfo s)
 
 		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
 		begin_replication_step();
+
+		if (TransactionIdIsValid(last_remote_xid))
+			pa_wait_for_depended_transaction(last_remote_xid);
+
 		FinishPreparedTransaction(gid, false);
 		end_replication_step();
 		CommitTransactionCommand();
diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl
index 69cf48cb7ac..57bcfde513e 100644
--- a/src/test/subscription/t/050_parallel_apply.pl
+++ b/src/test/subscription/t/050_parallel_apply.pl
@@ -17,6 +17,8 @@ if ($ENV{enable_injection_points} ne 'yes')
 # Initialize publisher node
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+    "max_prepared_transactions = 10");
 $node_publisher->start;
 
 # Insert initial data
@@ -35,6 +37,8 @@ $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
 $node_subscriber->append_conf('postgresql.conf',
 	"max_logical_replication_workers = 10");
+$node_subscriber->append_conf('postgresql.conf',
+    "max_prepared_transactions = 10");
 $node_subscriber->start;
 
 # Check if the extension injection_points is available, as it may be
@@ -127,4 +131,57 @@ $result =
     "SELECT count(1) FROM regress_tab WHERE value = 'updated'");
 is ($result, 5, 'updates are also replicated to subscriber');
 
+# Ensure that PREPARE TRANSACTION is also applied by parallel apply workers.
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION regress_sub DISABLE;");
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION regress_sub SET (two_phase = on);");
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION regress_sub ENABLE;");
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(1) FROM pg_stat_activity WHERE backend_type = 'logical replication parallel worker'");
+is($result, '0', "no parallel apply workers exist after restart");
+
+# Attach an injection point so that parallel workers wait before preparing.
+$node_subscriber->safe_psql('postgres',
+	"SELECT injection_points_attach('parallel-worker-before-prepare','wait');");
+
+# PREPARE a transaction on publisher. It would be handled by a parallel apply
+# worker.
+$node_publisher->safe_psql('postgres', qq[
+    BEGIN;
+    INSERT INTO regress_tab VALUES (generate_series(51, 60), 'prepare');
+    PREPARE TRANSACTION 'regress_prepare';
+]);
+
+# Wait until the parallel worker enters the injection point.
+$node_subscriber->wait_for_event('logical replication parallel worker',
+	'parallel-worker-before-prepare');
+
+$offset = -s $node_subscriber->logfile;
+
+# Insert tuples on publisher again. This transaction waits for the prepared one.
+$node_publisher->safe_psql('postgres',
+    "INSERT INTO regress_tab VALUES (generate_series(61, 70), 'test');");
+
+# Verify a worker waits; wait_for_log() returns an offset, so read the log text.
+$node_subscriber->wait_for_log(qr/wait for depended xid [1-9][0-9]+/, $offset);
+($xid) = slurp_file($node_subscriber->logfile, $offset) =~ /wait for depended xid ([1-9][0-9]+)/;
+
+# Wake up the parallel worker.
+$node_subscriber->safe_psql('postgres', qq[
+    SELECT injection_points_detach('parallel-worker-before-prepare');
+    SELECT injection_points_wakeup('parallel-worker-before-prepare');
+]);
+
+$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid\b/, $offset);
+
+# COMMIT the prepared transaction. It is always handled by the leader.
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'regress_prepare';");
+$node_publisher->wait_for_catchup('regress_sub');
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(1) FROM regress_tab WHERE value = 'prepare'");
+is($result, '10', 'prepared transaction is applied after COMMIT PREPARED');
+
 done_testing();
-- 
2.47.3