From 961da3bf64b6d91949db83b9de72dcd72df9aa7e Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Mon, 27 Nov 2023 18:48:45 +0100
Subject: [PATCH v20231128 8/8] log XID instead of a boolean flag

---
 src/backend/access/rmgrdesc/seqdesc.c         |  4 ++--
 src/backend/access/transam/xact.c             | 20 ++++++++++++++++++
 src/backend/commands/sequence.c               | 21 ++++++++++++-------
 src/backend/replication/logical/decode.c      | 11 +++++++++-
 .../replication/logical/reorderbuffer.c       |  5 +++++
 src/include/access/xact.h                     |  1 +
 src/include/commands/sequence.h               |  2 +-
 7 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/rmgrdesc/seqdesc.c b/src/backend/access/rmgrdesc/seqdesc.c
index ba60544085e..296fa5d9169 100644
--- a/src/backend/access/rmgrdesc/seqdesc.c
+++ b/src/backend/access/rmgrdesc/seqdesc.c
@@ -25,9 +25,9 @@ seq_desc(StringInfo buf, XLogReaderState *record)
 	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
 
 	if (info == XLOG_SEQ_LOG)
-		appendStringInfo(buf, "rel %u/%u/%u",
+		appendStringInfo(buf, "rel %u/%u/%u xid %u",
 						 xlrec->locator.spcOid, xlrec->locator.dbOid,
-						 xlrec->locator.relNumber);
+						 xlrec->locator.relNumber, xlrec->xid);
 }
 
 const char *
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 536edb3792f..ba0522d14b2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -805,6 +805,26 @@ SubTransactionIsActive(SubTransactionId subxid)
 	return false;
 }
 
+/*
+ *	SubTransactionIsActive
+ *
+ * Test if the specified subxact ID is still active.  Note caller is
+ * responsible for checking whether this ID is relevant to the current xact.
+ */
+TransactionId
+SubTransactionGetXid(SubTransactionId subxid)
+{
+	TransactionState s;
+
+	for (s = CurrentTransactionState; s != NULL; s = s->parent)
+	{
+		if (s->state == TRANS_ABORT)
+			continue;
+		if (s->subTransactionId == subxid)
+			return XidFromFullTransactionId(s->fullTransactionId);
+	}
+	return InvalidTransactionId;
+}
 
 /*
  *	GetCurrentCommandId
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 9da9c8270d9..f3e1a7a462c 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -112,7 +112,7 @@ static void init_params(ParseState *pstate, List *options, bool for_identity,
 						List **owned_by);
 static void do_setval(Oid relid, int64 next, bool iscalled);
 static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
-static inline bool is_sequence_transactional(Relation seqrel);
+static inline TransactionId sequence_xid(Relation seqrel);
 
 
 /*
@@ -256,11 +256,16 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 	return address;
 }
 
-static inline bool
-is_sequence_transactional(Relation seqrel)
+static inline TransactionId
+sequence_xid(Relation seqrel)
 {
-	return (seqrel->rd_newRelfilelocatorSubid != InvalidSubTransactionId) ||
-		   (seqrel->rd_createSubid != InvalidSubTransactionId);
+	if (seqrel->rd_newRelfilelocatorSubid != InvalidSubTransactionId)
+		return SubTransactionGetXid(seqrel->rd_newRelfilelocatorSubid);
+
+	if (seqrel->rd_createSubid != InvalidSubTransactionId)
+		return SubTransactionGetXid(seqrel->rd_createSubid);
+
+	return InvalidTransactionId;
 }
 
 /*
@@ -609,7 +614,7 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.locator = rel->rd_locator;
-		xlrec.is_transactional = is_sequence_transactional(rel);
+		xlrec.xid = sequence_xid(rel);
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -1074,7 +1079,7 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.locator = seqrel->rd_locator;
-		xlrec.is_transactional = is_sequence_transactional(seqrel);
+		xlrec.xid = sequence_xid(seqrel);
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -1276,7 +1281,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.locator = seqrel->rd_locator;
-		xlrec.is_transactional = is_sequence_transactional(seqrel);
+		xlrec.xid = sequence_xid(seqrel);
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c58b5b45f29..ff2f3f4e1aa 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1390,6 +1390,7 @@ seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	Snapshot	snapshot = NULL;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
 	bool		transactional;
+	TransactionId	sequence_xid;
 	xl_seq_rec *xlrec;
 
 	/* ignore sequences when the plugin does not have the callbacks */
@@ -1436,7 +1437,8 @@ seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * XXX: can xlrec be combined with tupledata?
 	 */
 	xlrec = (xl_seq_rec *) XLogRecGetData(r);
-	transactional = xlrec->is_transactional;
+	sequence_xid = xlrec->xid;
+	transactional = TransactionIdIsValid(sequence_xid);
 
 	/* Skip the change if already processed (per the snapshot). */
 	if (transactional &&
@@ -1478,6 +1480,13 @@ seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (!transactional)
 		snapshot = SnapBuildGetOrBuildSnapshot(builder);
 
+	/*
+	 * FIXME can we override the xid like this? or should we pass both the
+	 * original XID and the XID we recorded.
+	 */
+	if (transactional)
+		xid = sequence_xid;
+
 	/* Queue the change (or send immediately if not transactional). */
 	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
 							   origin_id, target_locator, transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4121269a097..82fbfce8031 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1016,6 +1016,11 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 		/* non-transactional changes require a valid snapshot */
 		Assert(snapshot_now);
 
+		/*
+		 * FIXME can this actually be InvalidTransactionId? We get txn=NULL
+		 * and then rb->sequence() fails in pgoutput_sequence(), as it tries
+		 * to do maybe_send_schema().
+		 */
 		if (xid != InvalidTransactionId)
 			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index cb90f227ceb..fdb38c598bc 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -444,6 +444,7 @@ extern FullTransactionId GetCurrentFullTransactionId(void);
 extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
+extern TransactionId SubTransactionGetXid(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
 extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts);
 extern TimestampTz GetCurrentTransactionStartTimestamp(void);
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index b863f9cda36..11dfe57f04b 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,7 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
 typedef struct xl_seq_rec
 {
 	RelFileLocator locator;
-	bool		   is_transactional;
+	TransactionId  xid;	/* valid XID, if transactional */
 	/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
 } xl_seq_rec;
 
-- 
2.41.0

