From 926b08dbbfedc199e101d407b27a5a57fd76b9c4 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda
Date: Mon, 1 Dec 2025 10:37:27 +0900
Subject: [PATCH v4 1/4] Introduce new type of logical replication messages to
 track dependencies

This patch introduces two logical replication messages,
LOGICAL_REP_MSG_INTERNAL_DEPENDENCY and LOGICAL_REP_MSG_INTERNAL_RELATION.
Unlike other messages, they are not sent by walsenders; the leader worker
sends them to parallel workers as needed.

LOGICAL_REP_MSG_INTERNAL_DEPENDENCY ensures that dependent transactions are
committed in the correct order. It carries a list of transaction IDs that
parallel workers must wait for. This message is generated when the leader
detects a dependency between the current transaction and other transactions,
or just before the COMMIT message. The latter case is used to preserve the
commit ordering between the publisher and the subscriber.

LOGICAL_REP_MSG_INTERNAL_RELATION is used to synchronize the relation
information between the leader and parallel workers. It carries a list of
relations that the leader already knows, and parallel workers update their
relmap in response to the message. This type of message is generated when
the leader allocates a new parallel worker to the transaction, or when the
publisher sends additional RELATION messages.
Author: Hou Zhijie
Author: Hayato Kuroda
---
 .../replication/logical/applyparallelworker.c | 16 ++++++
 src/backend/replication/logical/proto.c       |  4 ++
 src/backend/replication/logical/worker.c      | 49 +++++++++++++++++++
 src/include/replication/logicalproto.h        |  2 +
 src/include/replication/worker_internal.h     |  4 ++
 5 files changed, 75 insertions(+)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index baa68c1ab6c..735a3e9acad 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1645,3 +1645,19 @@ pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 
 	pa_free_worker(winfo);
 }
+
+/*
+ * Wait for the given transaction to finish.
+ */
+void
+pa_wait_for_depended_transaction(TransactionId xid)
+{
+	elog(DEBUG1, "wait for depended xid %u", xid);
+
+	for (;;)
+	{
+		/* XXX wait until given transaction is finished */
+	}
+
+	elog(DEBUG1, "finish waiting for depended xid %u", xid);
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f0a913892b9..72dedee3a43 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -1253,6 +1253,10 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "STREAM ABORT";
 		case LOGICAL_REP_MSG_STREAM_PREPARE:
 			return "STREAM PREPARE";
+		case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+			return "INTERNAL DEPENDENCY";
+		case LOGICAL_REP_MSG_INTERNAL_RELATION:
+			return "INTERNAL RELATION";
 	}
 
 	/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 93970c6af29..ebf8cd62552 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -629,6 +629,47 @@ static TransApplyAction get_transaction_apply_action(TransactionId xid,
 
 static void replorigin_reset(int code, Datum arg);
 
+/*
+ * Handle internal dependency information.
+ *
+ * Wait for all transactions listed in the message to commit.
+ */
+static void
+apply_handle_internal_dependency(StringInfo s)
+{
+	int			nxids = pq_getmsgint(s, 4);
+
+	for (int i = 0; i < nxids; i++)
+	{
+		TransactionId xid = pq_getmsgint(s, 4);
+
+		pa_wait_for_depended_transaction(xid);
+	}
+}
+
+/*
+ * Handle internal relation information.
+ *
+ * Update all relation details in the relation map cache.
+ */
+static void
+apply_handle_internal_relation(StringInfo s)
+{
+	int			num_rels;
+
+	num_rels = pq_getmsgint(s, 4);
+
+	for (int i = 0; i < num_rels; i++)
+	{
+		LogicalRepRelation *rel = logicalrep_read_rel(s);
+
+		logicalrep_relmap_update(rel);
+
+		elog(DEBUG1, "parallel apply worker init relmap for %s",
+			 rel->relname);
+	}
+}
+
 /*
  * Form the origin name for the subscription.
  *
@@ -3868,6 +3909,14 @@ apply_dispatch(StringInfo s)
 			apply_handle_stream_prepare(s);
 			break;
 
+		case LOGICAL_REP_MSG_INTERNAL_RELATION:
+			apply_handle_internal_relation(s);
+			break;
+
+		case LOGICAL_REP_MSG_INTERNAL_DEPENDENCY:
+			apply_handle_internal_dependency(s);
+			break;
+
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..5d91e2a4287 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -75,6 +75,8 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
 	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
 	LOGICAL_REP_MSG_STREAM_PREPARE = 'p',
+	LOGICAL_REP_MSG_INTERNAL_DEPENDENCY = 'd',
+	LOGICAL_REP_MSG_INTERNAL_RELATION = 'i',
 } LogicalRepMsgType;
 
 /*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..a3526eae578 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -359,6 +359,8 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
+extern void pa_wait_for_depended_transaction(TransactionId xid);
+
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTableSyncWorker(worker) ((worker)->in_use && \
@@ -366,6 +368,8 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 #define isSequenceSyncWorker(worker) ((worker)->in_use && \
 									  (worker)->type == WORKERTYPE_SEQUENCESYNC)
 
+#define PARALLEL_APPLY_INTERNAL_MESSAGE	'i'
+
 static inline bool
 am_tablesync_worker(void)
 {
-- 
2.47.3