v4-0001-Introduce-new-type-of-logical-replication-message.patch
application/octet-stream
Filename: v4-0001-Introduce-new-type-of-logical-replication-message.patch
Type: application/octet-stream
Part: 0
Message:
RE: Parallel Apply
From 926b08dbbfedc199e101d407b27a5a57fd76b9c4 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 1 Dec 2025 10:37:27 +0900
Subject: [PATCH v4 1/4] Introduce new type of logical replication messages to
track dependencies
This patch introduces two logical replication messages,
LOGICAL_REP_MSG_INTERNAL_DEPENDENCY and LOGICAL_REP_MSG_INTERNAL_RELATION.
Apart from other messages, they are not sent by walsnders; the leader worker
sends to parallel workers based on the needs.
LOGICAL_REP_MSG_INTERNAL_DEPENDENCY ensures that dependent transactions are
committed in the correct order. It has a list of transaction IDs that parallel
workers must wait for. The message type would be generated when the leader
detects a dependency between the current and other transactions, or just before
the COMMIT message. The latter one is used to preserve the commit ordering
between the publisher and the subscriber.
LOGICAL_REP_MSG_INTERNAL_RELATION is used to synchronize the relation
information between the leader and parallel workers. It has a list of relations
that the leader already knows, and parallel workers also update the relmap in
response to the message. This type of message is generated when the leader
allocates a new parallel worker to the transaction, or when the publisher sends
additional RELATION messages.
Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
---
.../replication/logical/applyparallelworker.c | 16 ++++++
src/backend/replication/logical/proto.c | 4 ++
src/backend/replication/logical/worker.c | 49 +++++++++++++++++++
src/include/replication/logicalproto.h | 2 +
src/include/replication/worker_internal.h | 4 ++
5 files changed, 75 insertions(+)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index baa68c1ab6c..735a3e9acad 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1645,3 +1645,19 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
pa_free_worker(winfo);
}
+
+/*
+ * Wait for the given transaction to finish.
+ */
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+ elog(DEBUG1, "wait for depended xid %u", xid);
+
+ for (;;)
+ {
+ /* XXX wait until given transaction is finished */
+ }
+
+ elog(DEBUG1, "finish waiting for depended xid %u", xid);
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f0a913892b9..72dedee3a43 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -1253,6 +1253,10 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
+ case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+ return "INTERNAL DEPENDENCY";
+ case LOGICAL_REP_MSG_INTERNAL_RELATION:
+ return "INTERNAL RELATION";
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 93970c6af29..ebf8cd62552 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -629,6 +629,47 @@ static TransApplyAction get_transaction_apply_action(TransactionId xid,
static void replorigin_reset(int code, Datum arg);
+/*
+ * Handle internal dependency information.
+ *
+ * Wait for all transactions listed in the message to commit.
+ */
+static void
+apply_handle_internal_dependency(StringInfo s)
+{
+ int nxids = pq_getmsgint(s, 4);
+
+ for (int i = 0; i < nxids; i++)
+ {
+ TransactionId xid = pq_getmsgint(s, 4);
+
+ pa_wait_for_depended_transaction(xid);
+ }
+}
+
+/*
+ * Handle internal relation information.
+ *
+ * Update all relation details in the relation map cache.
+ */
+static void
+apply_handle_internal_relation(StringInfo s)
+{
+ int num_rels;
+
+ num_rels = pq_getmsgint(s, 4);
+
+ for (int i = 0; i < num_rels; i++)
+ {
+ LogicalRepRelation *rel = logicalrep_read_rel(s);
+
+ logicalrep_relmap_update(rel);
+
+ elog(DEBUG1, "parallel apply worker worker init relmap for %s",
+ rel->relname);
+ }
+}
+
/*
* Form the origin name for the subscription.
*
@@ -3868,6 +3909,14 @@ apply_dispatch(StringInfo s)
apply_handle_stream_prepare(s);
break;
+ case LOGICAL_REP_MSG_INTERNAL_RELATION:
+ apply_handle_internal_relation(s);
+ break;
+
+ case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+ apply_handle_internal_dependency(s);
+ break;
+
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..5d91e2a4287 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -75,6 +75,8 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
LOGICAL_REP_MSG_STREAM_ABORT = 'A',
LOGICAL_REP_MSG_STREAM_PREPARE = 'p',
+ LOGICAL_REP_MSG_INTERNAL_DEPENDENCY = 'd',
+ LOGICAL_REP_MSG_INTERNAL_RELATION = 'i',
} LogicalRepMsgType;
/*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..a3526eae578 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -359,6 +359,8 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
+extern void pa_wait_for_depended_transaction(TransactionId xid);
+
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTableSyncWorker(worker) ((worker)->in_use && \
@@ -366,6 +368,8 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
#define isSequenceSyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_SEQUENCESYNC)
+#define PARALLEL_APPLY_INTERNAL_MESSAGE 'i'
+
static inline bool
am_tablesync_worker(void)
{
--
2.47.3