From 1e1f21ac0f86c1efc64acec36f7ee249e5c576af Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 23 Mar 2023 22:19:33 +0100
Subject: [PATCH 4/5] add interlock with ALTER SEQUENCE

---
 src/backend/commands/sequence.c             | 19 +++++++++++
 src/backend/replication/logical/tablesync.c | 36 +++++++++++++++++++++
 src/include/catalog/pg_proc.dat             |  4 +++
 3 files changed, 59 insertions(+)

diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 3311ad034f7..3b36c346e03 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -2029,6 +2029,25 @@ pg_sequence_last_value(PG_FUNCTION_ARGS)
 		PG_RETURN_NULL();
 }
 
+/*
+ * pg_sequence_lock_for_sync
+ *		Open the given relation with RowExclusiveLock, raise an error unless
+ *		it is a sequence, and return without releasing the lock.
+ */
+Datum
+pg_sequence_lock_for_sync(PG_FUNCTION_ARGS)
+{
+	Relation	rel = relation_open(PG_GETARG_OID(0), RowExclusiveLock);
+
+	if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("\"%s\" is not a sequence",
+						RelationGetRelationName(rel))));
+
+	relation_close(rel, NoLock);	/* drop the reference, not the lock */
+	PG_RETURN_VOID();
+}
 
 void
 seq_redo(XLogReaderState *record)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 78a280dda01..df220ba6629 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1506,6 +1506,42 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					   slotname, false /* permanent */ , false /* two_phase */ ,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
+	/*
+	 * If we're syncing a sequence, lock it on the source to prevent concurrent
+	 * ALTER SEQUENCE changes that might be written to WAL before the slot gets
+	 * created (so not replicated), but invisible to the copy.
+	 *
+	 * XXX Has to happen after creating the slot, because it also installs a
+	 * snapshot and so there must not be any queries before it.
+	 *
+	 * XXX Does this need a version check? Probably not, because for older
+	 * versions we don't replicate sequences.
+	 */
+	if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+	{
+		StringInfoData	cmd;
+		Oid				lockRow[] = {VOIDOID};
+		char		   *seqname;
+
+		/*
+		 * Send the name as an escaped literal, as it may contain quotes. XXX
+		 * maybe use relation/namespace names from fetch_remote_table_info?
+		 */
+		seqname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(rel)),
+											 RelationGetRelationName(rel));
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd, "SELECT pg_catalog.pg_sequence_lock_for_sync(%s)",
+						 quote_literal_cstr(seqname));
+		elog(LOG, "locking: %s", cmd.data);
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, lockRow);
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("sequence copy failed to lock on publisher: %s",
+							res->err)));
+		walrcv_clear_result(res);
+	}
+
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
 	 * copy is to avoid doing the copy again due to any error in setting up
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b3843467205..dbc9804040c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11982,4 +11982,8 @@
   proname => 'any_value_transfn', prorettype => 'anyelement',
   proargtypes => 'anyelement anyelement', prosrc => 'any_value_transfn' },
 
+{ oid => '8003', descr => 'lock sequence for logical replication sync',
+  proname => 'pg_sequence_lock_for_sync', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'regclass', prosrc => 'pg_sequence_lock_for_sync' },
+
 ]
-- 
2.39.2

