0004-Catchup-up-to-a-LSN-after-copy-of-the-seque-20230729.patch

text/x-patch

Filename: 0004-Catchup-up-to-a-LSN-after-copy-of-the-seque-20230729.patch
Type: text/x-patch
Part: 3
Message: Re: logical decoding and replication of sequences, take 2

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes. API reference →
Format: format-patch
Series: patch 0004
Subject: Catchup up to a LSN after copy of the sequence
File+
src/backend/replication/logical/tablesync.c 48 3
From ef452c842d08ede3bf237e828cb0e7903b45a061 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Tue, 25 Jul 2023 16:22:39 +0200
Subject: [PATCH 4/6] Catchup up to a LSN after copy of the sequence

---
 src/backend/replication/logical/tablesync.c | 51 +++++++++++++++++++--
 1 file changed, 48 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index cbd1c6426c..694ce70b7d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -118,6 +118,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -1250,12 +1251,52 @@ fetch_sequence_data(char *nspname, char *relname)
 	return value;
 }
 
+/*
+ * Fetch remote insert LSN from the remote node.
+ */
+static XLogRecPtr
+fetch_remote_lsn(void)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[1] = {LSNOID};
+	XLogRecPtr	value = InvalidXLogRecPtr;
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT pg_current_wal_lsn()");
+
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive current LSN from the publisher: %s",
+						res->err)));
+
+	/* Process the sequence. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		bool		isnull;
+
+		value = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	return value;
+}
+
 /*
  * Copy existing data of a sequence from publisher.
  *
  * Caller is responsible for locking the local relation.
  */
-static void
+static XLogRecPtr
 copy_sequence(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
@@ -1299,6 +1340,9 @@ copy_sequence(Relation rel)
 	SetSequence(RelationGetRelid(rel), false, sequence_value);
 
 	logicalrep_rel_close(relmapentry, NoLock);
+
+	/* also fetch current remote LSN (after the data was selected) */
+	return fetch_remote_lsn();
 }
 
 /*
@@ -1349,6 +1393,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UserContext ucxt;
 	bool		must_use_password;
 	bool		run_as_owner;
+	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -1567,7 +1612,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	{
 		/* Now do the initial sequence copy */
 		PushActiveSnapshot(GetTransactionSnapshot());
-		copy_sequence(rel);
+		remote_lsn = copy_sequence(rel);
 		PopActiveSnapshot();
 	}
 	else
@@ -1616,7 +1661,7 @@ copy_table_done:
 	 */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
-	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+	MyLogicalRepWorker->relstate_lsn = Max(*origin_startpos, remote_lsn);
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
 	/*
-- 
2.41.0