0004-Catchup-up-to-a-LSN-after-copy-of-the-seque-20230729.patch
text/x-patch
Filename: 0004-Catchup-up-to-a-LSN-after-copy-of-the-seque-20230729.patch
Type: text/x-patch
Part: 3
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0004
Subject: Catchup up to a LSN after copy of the sequence
| File | + | − |
|---|---|---|
| src/backend/replication/logical/tablesync.c | 48 | 3 |
From ef452c842d08ede3bf237e828cb0e7903b45a061 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Tue, 25 Jul 2023 16:22:39 +0200
Subject: [PATCH 4/6] Catchup up to a LSN after copy of the sequence
---
src/backend/replication/logical/tablesync.c | 51 +++++++++++++++++++--
1 file changed, 48 insertions(+), 3 deletions(-)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index cbd1c6426c..694ce70b7d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -118,6 +118,7 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -1250,12 +1251,52 @@ fetch_sequence_data(char *nspname, char *relname)
return value;
}
+/*
+ * Fetch remote insert LSN from the remote node.
+ */
+static XLogRecPtr
+fetch_remote_lsn(void)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[1] = {LSNOID};
+ XLogRecPtr value = InvalidXLogRecPtr;
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT pg_current_wal_lsn()");
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive current LSN from the publisher: %s",
+ res->err)));
+
+ /* Process the sequence. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ bool isnull;
+
+ value = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+
+ return value;
+}
+
/*
* Copy existing data of a sequence from publisher.
*
* Caller is responsible for locking the local relation.
*/
-static void
+static XLogRecPtr
copy_sequence(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
@@ -1299,6 +1340,9 @@ copy_sequence(Relation rel)
SetSequence(RelationGetRelid(rel), false, sequence_value);
logicalrep_rel_close(relmapentry, NoLock);
+
+ /* also fetch current remote LSN (after the data was selected) */
+ return fetch_remote_lsn();
}
/*
@@ -1349,6 +1393,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UserContext ucxt;
bool must_use_password;
bool run_as_owner;
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
/* Check the state of the table synchronization. */
StartTransactionCommand();
@@ -1567,7 +1612,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
/* Now do the initial sequence copy */
PushActiveSnapshot(GetTransactionSnapshot());
- copy_sequence(rel);
+ remote_lsn = copy_sequence(rel);
PopActiveSnapshot();
}
else
@@ -1616,7 +1661,7 @@ copy_table_done:
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
- MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+ MyLogicalRepWorker->relstate_lsn = Max(*origin_startpos, remote_lsn);
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
--
2.41.0