From 2762886b6cc1b0942ab842cf80748e6c248238c7 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 26 Jul 2023 14:58:19 +0200
Subject: [PATCH 5/5] use page LSN for sequences

---
 src/backend/commands/sequence.c             | 92 ++++++++++++++++++---
 src/backend/replication/logical/tablesync.c | 78 +++++++----------
 src/include/catalog/pg_proc.dat             |  8 ++
 3 files changed, 118 insertions(+), 60 deletions(-)

diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index cd04c0ad058..f5b7ef7171a 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -44,6 +44,7 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
 #include "utils/resowner.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -101,7 +102,8 @@ static Relation lock_and_open_sequence(SeqTable seq);
 static void create_seq_hashtable(void);
 static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
 static Form_pg_sequence_data read_seq_tuple(Relation rel,
-											Buffer *buf, HeapTuple seqdatatuple);
+											Buffer *buf, HeapTuple seqdatatuple,
+											XLogRecPtr *lsn);
 static void init_params(ParseState *pstate, List *options, bool for_identity,
 						bool isInit,
 						Form_pg_sequence seqform,
@@ -293,7 +295,7 @@ ResetSequence(Oid seq_relid)
 	 * indeed a sequence.
 	 */
 	init_sequence(seq_relid, &elm, &seq_rel);
-	(void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+	(void) read_seq_tuple(seq_rel, &buf, &seqdatatuple, NULL);
 
 	pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seq_relid));
 	if (!HeapTupleIsValid(pgstuple))
@@ -366,7 +368,7 @@ SetSequence_non_transactional(Oid seqrelid, int64 value)
 	init_sequence(seqrelid, &elm, &seqrel);
 
 	/* lock page' buffer and read tuple */
-	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
@@ -439,7 +441,7 @@ SetSequence_transactional(Oid seq_relid, int64 value)
 	init_sequence(seq_relid, &elm, &seqrel);
 
 	/* lock page' buffer and read tuple */
-	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
 
 	/* Copy the existing sequence tuple. */
 	tuple = heap_copytuple(&seqdatatuple);
@@ -670,7 +672,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 	seqform = (Form_pg_sequence) GETSTRUCT(seqtuple);
 
 	/* lock page's buffer and read tuple into new sequence structure */
-	(void) read_seq_tuple(seqrel, &buf, &datatuple);
+	(void) read_seq_tuple(seqrel, &buf, &datatuple, NULL);
 
 	/* copy the existing sequence data tuple, so it can be modified locally */
 	newdatatuple = heap_copytuple(&datatuple);
@@ -755,7 +757,7 @@ SequenceChangePersistence(Oid relid, char newrelpersistence)
 			GetCurrentTransactionId();
 	}
 
-	(void) read_seq_tuple(seqrel, &buf, &seqdatatuple);
+	(void) read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
 	RelationSetNewRelfilenumber(seqrel, newrelpersistence);
 	fill_seq_with_data(seqrel, &seqdatatuple);
 	UnlockReleaseBuffer(buf);
@@ -884,7 +886,7 @@ nextval_internal(Oid relid, bool check_permissions)
 	ReleaseSysCache(pgstuple);
 
 	/* lock page' buffer and read tuple */
-	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
 	page = BufferGetPage(buf);
 
 	elm->increment = incby;
@@ -1207,7 +1209,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
 	PreventCommandIfParallelMode("setval()");
 
 	/* lock page' buffer and read tuple */
-	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
 
 	if ((next < minv) || (next > maxv))
 		ereport(ERROR,
@@ -1427,11 +1429,13 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
  * *buf receives the reference to the pinned-and-ex-locked buffer
  * *seqdatatuple receives the reference to the sequence tuple proper
  *		(this arg should point to a local variable of type HeapTupleData)
+ * *lsn receives LSN of the last sequence change (page LSN), optional
  *
  * Function's return value points to the data payload of the tuple
  */
 static Form_pg_sequence_data
-read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
+read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple,
+			   XLogRecPtr *lsn)
 {
 	Page		page;
 	ItemId		lp;
@@ -1448,6 +1452,13 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
 		elog(ERROR, "bad magic number in sequence \"%s\": %08X",
 			 RelationGetRelationName(rel), sm->magic);
 
+	/*
+	 * If the caller requested it, return the page LSN. This allows deciding
+	 * which sequence changes are before/after the returned sequence state.
+	 */
+	if (lsn)
+		*lsn = PageGetLSN(page);
+
 	lp = PageGetItemId(page, FirstOffsetNumber);
 	Assert(ItemIdIsNormal(lp));
 
@@ -2043,7 +2054,7 @@ pg_sequence_last_value(PG_FUNCTION_ARGS)
 				 errmsg("permission denied for sequence %s",
 						RelationGetRelationName(seqrel))));
 
-	seq = read_seq_tuple(seqrel, &buf, &seqtuple);
+	seq = read_seq_tuple(seqrel, &buf, &seqtuple, NULL);
 
 	is_called = seq->is_called;
 	result = seq->last_value;
@@ -2057,6 +2068,67 @@ pg_sequence_last_value(PG_FUNCTION_ARGS)
 		PG_RETURN_NULL();
 }
 
+/*
+ * Return the current on-disk state of the sequence as one composite row:
+ * (page LSN, last_value, log_cnt, is_called), in that order.
+ *
+ * Roughly like selecting from the sequence, but also returns the page LSN.
+ */
+Datum
+pg_sequence_state(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	SeqTable	elm;
+	Relation	seqrel;
+	Buffer		buf;
+	HeapTupleData seqtuple;
+	Form_pg_sequence_data seq;
+	Datum		result;
+	/* sequence state, captured before the buffer is released */
+	int64		last_value;
+	int64		log_cnt;
+	bool		is_called;
+	XLogRecPtr	lsn;
+	/* pieces used to build the composite result */
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		values[4];
+	bool		nulls[4];
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* open and lock sequence */
+	init_sequence(relid, &elm, &seqrel);
+
+	if (pg_class_aclcheck(elm->relid, GetUserId(),
+						  ACL_SELECT | ACL_USAGE) != ACLCHECK_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("permission denied for sequence %s",
+						RelationGetRelationName(seqrel))));
+	/* the last argument makes read_seq_tuple() also hand back the page LSN */
+	seq = read_seq_tuple(seqrel, &buf, &seqtuple, &lsn);
+
+	is_called = seq->is_called;
+	last_value = seq->last_value;
+	log_cnt = seq->log_cnt;
+
+	UnlockReleaseBuffer(buf);
+	relation_close(seqrel, NoLock);
+	/* output order: page_lsn, last_value, log_cnt, is_called */
+	values[0] = LSNGetDatum(lsn);
+	values[1] = Int64GetDatum(last_value);
+	values[2] = Int64GetDatum(log_cnt);
+	values[3] = BoolGetDatum(is_called);
+
+	memset(nulls, 0, sizeof(nulls));
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	PG_RETURN_DATUM(result);
+}
 
 void
 seq_redo(XLogReaderState *record)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 694ce70b7d8..44e64ed1de6 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1211,22 +1211,23 @@ copy_table(Relation rel)
 }
 
 /*
- * Fetch sequence data (current state) from the remote node.
+ * Fetch sequence data (current state) from the remote node, including the
+ * page LSN.
  */
 static int64
-fetch_sequence_data(char *nspname, char *relname)
+fetch_sequence_data(Oid remoteid, XLogRecPtr *lsn)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[1] = {INT8OID};
+	Oid			tableRow[2] = {INT8OID, LSNOID};
 	int64		value = (Datum) 0;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT (last_value + log_cnt)\n"
-					   "  FROM %s", quote_qualified_identifier(nspname, relname));
+	appendStringInfo(&cmd, "SELECT (last_value + log_cnt), page_lsn "
+						   "FROM pg_sequence_state(%u)", remoteid);
 
-	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow);
+	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 2, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1242,45 +1243,8 @@ fetch_sequence_data(char *nspname, char *relname)
 
 		value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-	}
-
-	ExecDropSingleTupleTableSlot(slot);
-
-	walrcv_clear_result(res);
-
-	return value;
-}
-
-/*
- * Fetch remote insert LSN from the remote node.
- */
-static XLogRecPtr
-fetch_remote_lsn(void)
-{
-	WalRcvExecResult *res;
-	StringInfoData cmd;
-	TupleTableSlot *slot;
-	Oid			tableRow[1] = {LSNOID};
-	XLogRecPtr	value = InvalidXLogRecPtr;
-
-	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT pg_current_wal_lsn()");
-
-	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow);
-	pfree(cmd.data);
 
-	if (res->status != WALRCV_OK_TUPLES)
-		ereport(ERROR,
-				(errmsg("could not receive current LSN from the publisher: %s",
-						res->err)));
-
-	/* Process the sequence. */
-	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
-	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
-	{
-		bool		isnull;
-
-		value = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+		*lsn = DatumGetLSN(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
 	}
 
@@ -1304,6 +1268,7 @@ copy_sequence(Relation rel)
 	List	   *qual = NIL;
 	StringInfoData cmd;
 	int64		sequence_value;
+	XLogRecPtr	lsn = InvalidXLogRecPtr;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1334,15 +1299,15 @@ copy_sequence(Relation rel)
 	 * Otherwise we might get duplicate values (on subscriber) if we failed
 	 * over right after the sync.
 	 */
-	sequence_value = fetch_sequence_data(lrel.nspname, lrel.relname);
+	sequence_value = fetch_sequence_data(lrel.remoteid, &lsn);
 
 	/* tablesync sets the sequences in non-transactional way */
 	SetSequence(RelationGetRelid(rel), false, sequence_value);
 
 	logicalrep_rel_close(relmapentry, NoLock);
 
-	/* also fetch current remote LSN (after the data was selected) */
-	return fetch_remote_lsn();
+	/* return the page LSN of the sequence state we just copied */
+	return lsn;
 }
 
 /*
@@ -1393,7 +1358,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UserContext ucxt;
 	bool		must_use_password;
 	bool		run_as_owner;
-	XLogRecPtr	remote_lsn = InvalidXLogRecPtr;
+	XLogRecPtr	sequence_lsn = InvalidXLogRecPtr;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -1612,8 +1577,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	{
 		/* Now do the initial sequence copy */
 		PushActiveSnapshot(GetTransactionSnapshot());
-		remote_lsn = copy_sequence(rel);
+		sequence_lsn = copy_sequence(rel);
 		PopActiveSnapshot();
+
+		/*
+		 * Sequences are not consistent (in the MVCC sense) with respect to the
+		 * replication slot, so the copy might have read a more recent state
+		 * than origin_startpos. The sequence_lsn comes from page LSN (which is
+		 * LSN of the last sequence change), so that's the right position where
+		 * to start with the catchup apply.
+		 *
+		 * It might be before the slot, though (if the sequence was not used
+		 * between the slot creation and the copy), so make sure the position
+		 * does not move backwards.
+		 */
+		*origin_startpos = Max(*origin_startpos, sequence_lsn);
 	}
 	else
 	{
@@ -1661,7 +1639,7 @@ copy_table_done:
 	 */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
-	MyLogicalRepWorker->relstate_lsn = Max(*origin_startpos, remote_lsn);
+	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
 	/*
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 00124946c02..dc150dfd1cb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3326,6 +3326,14 @@
   proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u',
   prorettype => 'int8', proargtypes => 'regclass',
   prosrc => 'pg_sequence_last_value' },
+{ oid => '4549',
+  descr => 'current on-disk sequence state',
+  proname => 'pg_sequence_state', provolatile => 'v', proparallel => 'u',
+  prorettype => 'record', proargtypes => 'regclass',
+  proallargtypes => '{regclass,pg_lsn,int8,int8,bool}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => '{seq_oid,page_lsn,last_value,log_cnt,is_called}',
+  prosrc => 'pg_sequence_state' },
 
 { oid => '275', descr => 'return the next oid for a system table',
   proname => 'pg_nextoid', provolatile => 'v', proparallel => 'u',
-- 
2.41.0

