0004-update-replorigin_session_origin_lsn-20230816.patch

text/x-patch

Filename: 0004-update-replorigin_session_origin_lsn-20230816.patch
Type: text/x-patch
Part: 3
Message: Re: logical decoding and replication of sequences, take 2

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch 0004
Subject: update replorigin_session_origin_lsn
File+
src/backend/replication/logical/proto.c 2 2
src/backend/replication/logical/worker.c 11 0
src/include/replication/logicalproto.h 1 0
From 04095f4e516e15414252949c3cab53bd26225529 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 16 Aug 2023 16:05:42 +0200
Subject: [PATCH 4/4] update replorigin_session_origin_lsn

---
 src/backend/replication/logical/proto.c  |  4 ++--
 src/backend/replication/logical/worker.c | 11 +++++++++++
 src/include/replication/logicalproto.h   |  1 +
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 5a537c6db9f..78a04613ba5 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -698,9 +698,9 @@ logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
 {
 	LogicalRepRelId relid;
 
-	/* XXX skipping flags and lsn */
+	/* XXX skipping flags */
 	pq_getmsgint(in, 1);
-	pq_getmsgint64(in);
+	seqdata->lsn = pq_getmsgint64(in);
 
 	/* read the relation id */
 	relid = pq_getmsgint(in, 4);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index dc9e3989ca6..2e6cb565f49 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1506,7 +1506,18 @@ apply_handle_sequence(StringInfo s)
 	 * remote transaction, i.e. for non-transactional sequence updates.)
 	 */
 	if (!in_remote_transaction)
+	{
 		CommitTransactionCommand();
+
+		/*
+		 * Update origin state so we don't try applying this sequence
+		 * change in case of crash.
+		 *
+		 * XXX We don't have replorigin_session_origin_timestamp, but we
+		 * can just leave that set to 0.
+		 */
+		replorigin_session_origin_lsn = seq.lsn;
+	}
 }
 
 /*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 546b3e36839..d1f5bab658a 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -128,6 +128,7 @@ typedef struct LogicalRepTyp
 /* Sequence info */
 typedef struct LogicalRepSequence
 {
+	XLogRecPtr	lsn;			/* LSN of the sequence change */
 	Oid			remoteid;		/* unique id of the remote sequence */
 	char	   *nspname;		/* schema name of remote sequence */
 	char	   *seqname;		/* name of the remote sequence */
-- 
2.41.0