v1-0002-Add-some-simple-statistics.txt
text/plain
Filename: v1-0002-Add-some-simple-statistics.txt
Type: text/plain
Part: 0
Message:
Re: Parallel Apply
From 00c05e510015fd72e9f1ede34868e0f691ded299 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Thu, 14 Aug 2025 18:38:08 +0800
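Subject: [PATCH v1] Add some simple statistics

Keep three process-local counters in worker.c describing how each
transaction was handled in apply_handle_commit():

  parallelized_nxact   - TRANS_LEADER_SEND_TO_PARALLEL, and
                         handle_dependency_on_change() did not flag the
                         transaction as dependent
  dependent_nxact      - TRANS_LEADER_SEND_TO_PARALLEL, and the
                         transaction was flagged as dependent
  leader_applied_nxact - TRANS_LEADER_APPLY

The counters are never reset and are written out with elog(LOG) from
send_feedback() each time it proceeds past its early return.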
---
src/backend/replication/logical/worker.c | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 11726b691fa..ff550900c2e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -507,6 +507,12 @@ static BufFile *stream_fd = NULL;
*/
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+static uint64 parallelized_nxact = 0; /* commits on the TRANS_LEADER_SEND_TO_PARALLEL path with dependent_xact unset */
+static uint64 dependent_nxact = 0; /* commits on the TRANS_LEADER_SEND_TO_PARALLEL path with dependent_xact set */
+static uint64 leader_applied_nxact = 0; /* commits on the TRANS_LEADER_APPLY path */
+
+static bool dependent_xact = false; /* set in handle_dependency_on_change(); cleared in apply_handle_begin() and at the end of apply_handle_commit() */
+
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -1138,6 +1144,8 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s,
if (!depends_on_xids)
return;
+ dependent_xact = true;
+
/*
* Notify the transactions that they are dependent on the current
* transaction.
@@ -1831,6 +1839,8 @@ apply_handle_begin(StringInfo s)
/* There must not be an active streaming transaction. */
Assert(!TransactionIdIsValid(stream_xid));
+ dependent_xact = false;
+
logicalrep_read_begin(s, &begin_data);
remote_xid = begin_data.xid;
@@ -1903,11 +1913,17 @@ apply_handle_commit(StringInfo s)
{
case TRANS_LEADER_APPLY:
apply_handle_commit_internal(&commit_data);
+ leader_applied_nxact++;
break;
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);
+ if (dependent_xact)
+ dependent_nxact++;
+ else
+ parallelized_nxact++;
+
if (pa_send_data(winfo, s->len, s->data))
{
/* Finish processing the transaction. */
@@ -1967,6 +1983,8 @@ apply_handle_commit(StringInfo s)
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
+
+ dependent_xact = false;
}
/*
@@ -5058,6 +5076,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
return;
send_time = now;
+ elog(LOG, "parallelized_nxact: " UINT64_FORMAT " dependent_nxact: " UINT64_FORMAT " leader_applied_nxact: " UINT64_FORMAT,
+ parallelized_nxact, dependent_nxact, leader_applied_nxact);
+
if (!reply_message)
{
MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
--
2.50.1.windows.1