From 80f690258b51c3fe53ba80cc985aa00a83a9bedf Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 16 Mar 2023 16:16:05 +0100
Subject: [PATCH 5/6] fixup syncing/refresh sequences

---
 src/backend/replication/logical/proto.c     | 19 ++++---
 src/backend/replication/logical/worker.c    | 63 ++++++++++++++-------
 src/backend/replication/pgoutput/pgoutput.c | 22 ++++---
 src/include/replication/logicalproto.h      |  2 +-
 4 files changed, 70 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 2b9e4b59fe1..58e5128e8a3 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -672,7 +672,6 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
 						  int64 last_value, int64 log_cnt, bool is_called)
 {
 	uint8		flags = 0;
-	char	   *relname;
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
 
@@ -683,10 +682,10 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
 	pq_sendint8(out, flags);
 	pq_sendint64(out, lsn);
 
-	logicalrep_write_namespace(out, RelationGetNamespace(rel));
-	relname = RelationGetRelationName(rel);
-	pq_sendstring(out, relname);
+	/* use Oid as relation identifier */
+	pq_sendint32(out, RelationGetRelid(rel));
 
+	/* write sequence info */
 	pq_sendint8(out, transactional);
 	pq_sendint64(out, last_value);
 	pq_sendint64(out, log_cnt);
@@ -696,21 +695,25 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
 /*
  * Read SEQUENCE from the stream.
  */
-void
+LogicalRepRelId
 logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
 {
+	LogicalRepRelId relid;
+
 	/* XXX skipping flags and lsn */
 	pq_getmsgint(in, 1);
 	pq_getmsgint64(in);
 
-	/* Read relation name from stream */
-	seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
-	seqdata->seqname = pstrdup(pq_getmsgstring(in));
+	/* read the relation id */
+	relid = pq_getmsgint(in, 4);
 
+	/* info about the sequence */
 	seqdata->transactional = pq_getmsgint(in, 1);
 	seqdata->last_value = pq_getmsgint64(in);
 	seqdata->log_cnt = pq_getmsgint64(in);
 	seqdata->is_called = pq_getmsgint(in, 1);
+
+	return relid;
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a5be5ce044d..b21dc855541 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1447,45 +1447,70 @@ static void
 apply_handle_sequence(StringInfo s)
 {
 	LogicalRepSequence	seq;
+	LogicalRepRelMapEntry *rel;
 	Oid					relid;
+	bool				already_in_transaction PG_USED_FOR_ASSERTS_ONLY;
 
-	if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
 		return;
 
-	logicalrep_read_sequence(s, &seq);
-
 	/*
-	 * Non-transactional sequence updates should not be part of a remote
-	 * transaction. There should not be any running transaction.
+	 * Remember if we're already in a transaction (begin_replication_step
+	 * starts a transaction, so we can't use IsTransactionState() after
+	 * that point).
 	 */
-	Assert((!seq.transactional) || in_remote_transaction);
-	Assert(!(!seq.transactional && in_remote_transaction));
-	Assert(!(!seq.transactional && IsTransactionState()));
+	already_in_transaction = IsTransactionState();
 
 	/*
 	 * Make sure we're in a transaction (needed by SetSequence). For
 	 * non-transactional updates we're guaranteed to start a new one,
 	 * and we'll commit it at the end.
 	 */
-	if (!IsTransactionState())
-	{
-		StartTransactionCommand();
-		maybe_reread_subscription();
-	}
+	begin_replication_step();
 
-	relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
-										  seq.seqname, -1),
-							 RowExclusiveLock, false);
+	relid = logicalrep_read_sequence(s, &seq);
+
+	/*
+	 * Non-transactional sequence updates should not be part of a remote
+	 * transaction. There should not be any running transaction.
+	 */
+	Assert((!seq.transactional) || in_remote_transaction);
+	Assert(!(!seq.transactional && in_remote_transaction));
+	Assert(!(!seq.transactional && already_in_transaction));
 
 	/* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
-	LockRelationOid(relid, AccessExclusiveLock);
+	rel = logicalrep_rel_open(relid, AccessExclusiveLock);
+	if (!should_apply_changes_for_rel(rel))
+	{
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, AccessExclusiveLock);
+		end_replication_step();
+
+		/*
+		 * Commit the per-stream transaction (we only do this when not in
+		 * remote transaction, i.e. for non-transactional sequence updates.)
+		 */
+		if (!in_remote_transaction)
+			CommitTransactionCommand();
+
+		return;
+	}
 
 	/* apply the sequence change */
-	SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+	SetSequence(rel->localreloid, seq.transactional,
+				seq.last_value, seq.log_cnt, seq.is_called);
+
+	logicalrep_rel_close(rel, NoLock);
+
+	end_replication_step();
 
 	/*
 	 * Commit the per-stream transaction (we only do this when not in
-	 * remote transaction, i.e. for non-transactional sequence updates.
+	 * remote transaction, i.e. for non-transactional sequence updates.)
 	 */
 	if (!in_remote_transaction)
 		CommitTransactionCommand();
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1b4331e2c5a..e024aad6761 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -695,7 +695,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
  */
 static void
 maybe_send_schema(LogicalDecodingContext *ctx,
-				  ReorderBufferChange *change,
+				  ReorderBufferTXN *txn,
 				  Relation relation, RelationSyncEntry *relentry)
 {
 	bool		schema_sent;
@@ -711,10 +711,10 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	 * the write methods will not include it.
 	 */
 	if (in_streaming)
-		xid = change->txn->xid;
+		xid = txn->xid;
 
-	if (change->txn->toptxn)
-		topxid = change->txn->toptxn->xid;
+	if (txn->toptxn)
+		topxid = txn->toptxn->xid;
 	else
 		topxid = xid;
 
@@ -1512,7 +1512,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			 * Schema should be sent using the original relation because it
 			 * also sends the ancestor's relation.
 			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+			maybe_send_schema(ctx, change->txn, relation, relentry);
 
 			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
@@ -1562,7 +1562,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			if (txndata && !txndata->sent_begin_txn)
 				pgoutput_send_begin(ctx, txn);
 
-			maybe_send_schema(ctx, change, relation, relentry);
+			maybe_send_schema(ctx, change->txn, relation, relentry);
 
 			OutputPluginPrepareWrite(ctx, true);
 
@@ -1627,7 +1627,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (txndata && !txndata->sent_begin_txn)
 					pgoutput_send_begin(ctx, txn);
 
-				maybe_send_schema(ctx, change, relation, relentry);
+				maybe_send_schema(ctx, change->txn, relation, relentry);
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_delete(ctx->out, xid, targetrel,
@@ -1702,7 +1702,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		if (txndata && !txndata->sent_begin_txn)
 			pgoutput_send_begin(ctx, txn);
 
-		maybe_send_schema(ctx, change, relation, relentry);
+		maybe_send_schema(ctx, change->txn, relation, relentry);
 	}
 
 	if (nrelids > 0)
@@ -1808,6 +1808,12 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 			pgoutput_send_begin(ctx, txn);
 	}
 
+	/*
+	 * Send the schema first: the sequence message identifies the relation
+	 * only by its Oid, so the subscriber needs the relation mapping.
+	 */
+	maybe_send_schema(ctx, txn, relation, relentry);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_sequence(ctx->out,
 							  relation,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a5cd04f8511..6d82d5b6cc3 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -264,7 +264,7 @@ extern void logicalrep_write_sequence(StringInfo out, Relation rel,
 									  bool transactional,
 									  int64 last_value, int64 log_cnt,
 									  bool is_called);
-extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
+extern LogicalRepRelId logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
-- 
2.39.2

