0006-replace-created-flag-with-XLOG_SMGR_CREATE-20230719b.patch

text/x-patch

Filename: 0006-replace-created-flag-with-XLOG_SMGR_CREATE-20230719b.patch
Type: text/x-patch
Part: 5
Message: Re: logical decoding and replication of sequences, take 2

Patch

Same data as JSON: GET /api/v1/attachments/:id/patch returns the parsed metadata as JSON (format, series position, per-file stats), never the diff bytes. API reference →
Format: format-patch
Series: patch 0006
Subject: replace created flag with XLOG_SMGR_CREATE
File + -
src/backend/access/rmgrdesc/seqdesc.c 2 2
src/backend/commands/sequence.c 22 4
src/backend/replication/logical/decode.c 70 7
src/backend/replication/logical/reorderbuffer.c 46 44
src/include/access/rmgrlist.h 1 1
src/include/commands/sequence.h 0 1
src/include/replication/decode.h 1 0
src/include/replication/reorderbuffer.h 5 3
src/test/subscription/t/034_sequences.pl 6 6
From a2e483f248af4b74c90930ec610e47d64d11f0d5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 19 Jul 2023 12:39:58 +0200
Subject: [PATCH 6/6] replace created flag with XLOG_SMGR_CREATE

---
 src/backend/access/rmgrdesc/seqdesc.c         |  4 +-
 src/backend/commands/sequence.c               | 26 +++++-
 src/backend/replication/logical/decode.c      | 77 ++++++++++++++--
 .../replication/logical/reorderbuffer.c       | 90 ++++++++++---------
 src/include/access/rmgrlist.h                 |  2 +-
 src/include/commands/sequence.h               |  1 -
 src/include/replication/decode.h              |  1 +
 src/include/replication/reorderbuffer.h       |  8 +-
 src/test/subscription/t/034_sequences.pl      | 12 +--
 9 files changed, 153 insertions(+), 68 deletions(-)

diff --git a/src/backend/access/rmgrdesc/seqdesc.c b/src/backend/access/rmgrdesc/seqdesc.c
index 3dabc252d8..ba60544085 100644
--- a/src/backend/access/rmgrdesc/seqdesc.c
+++ b/src/backend/access/rmgrdesc/seqdesc.c
@@ -25,9 +25,9 @@ seq_desc(StringInfo buf, XLogReaderState *record)
 	xl_seq_rec *xlrec = (xl_seq_rec *) rec;
 
 	if (info == XLOG_SEQ_LOG)
-		appendStringInfo(buf, "rel %u/%u/%u created %d",
+		appendStringInfo(buf, "rel %u/%u/%u",
 						 xlrec->locator.spcOid, xlrec->locator.dbOid,
-						 xlrec->locator.relNumber, xlrec->created);
+						 xlrec->locator.relNumber);
 }
 
 const char *
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 035ff1fdae..70bfcf0802 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -217,6 +217,10 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
 	stmt->tablespacename = NULL;
 	stmt->if_not_exists = seq->if_not_exists;
 
+	/* make sure the relfilenode creation is associated with the XID */
+	if (XLogLogicalInfoActive())
+		GetCurrentTransactionId();
+
 	address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL);
 	seqoid = address.objectId;
 	Assert(seqoid != InvalidOid);
@@ -315,6 +319,10 @@ ResetSequence(Oid seq_relid)
 	seq->is_called = false;
 	seq->log_cnt = 0;
 
+	/* make sure the relfilenode creation is associated with the XID */
+	if (XLogLogicalInfoActive())
+		GetCurrentTransactionId();
+
 	/*
 	 * Create a new storage file for the sequence.
 	 */
@@ -389,7 +397,6 @@ SetSequence_non_transactional(Oid seqrelid, int64 value)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.locator = seqrel->rd_locator;
-		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -446,6 +453,10 @@ SetSequence_transactional(Oid seq_relid, int64 value)
 	seq->is_called = false;
 	seq->log_cnt = 0;
 
+	/* make sure the relfilenode creation is associated with the XID */
+	if (XLogLogicalInfoActive())
+		GetCurrentTransactionId();
+
 	/*
 	 * Create a new storage file for the sequence - this is needed for the
 	 * transactional behavior.
@@ -591,7 +602,6 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.locator = rel->rd_locator;
-		xlrec.created = true;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -678,6 +688,10 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
 		if (RelationNeedsWAL(seqrel))
 			GetTopTransactionId();
 
+		/* make sure the relfilenode creation is associated with the XID */
+		if (XLogLogicalInfoActive())
+			GetCurrentTransactionId();
+
 		/*
 		 * Create a new storage file for the sequence, making the state
 		 * changes transactional.
@@ -727,8 +741,14 @@ SequenceChangePersistence(Oid relid, char newrelpersistence)
 
 	/* check the comment above nextval_internal()'s equivalent call. */
 	if (RelationNeedsWAL(seqrel))
 		GetTopTransactionId();
 
+	/*
+	 * Associate the relfilenode creation with the XID, even for SET LOGGED.
+	 */
+	if (XLogLogicalInfoActive())
+		GetCurrentTransactionId();
+
 	(void) read_seq_tuple(seqrel, &buf, &seqdatatuple);
 	RelationSetNewRelfilenumber(seqrel, newrelpersistence);
 	fill_seq_with_data(seqrel, &seqdatatuple);
@@ -1034,7 +1054,6 @@ nextval_internal(Oid relid, bool check_permissions)
 		seq->log_cnt = 0;
 
 		xlrec.locator = seqrel->rd_locator;
-		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -1233,7 +1252,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
 		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
 
 		xlrec.locator = seqrel->rd_locator;
-		xlrec.created = false;
 
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bbaffceb17..38b7e620dc 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
 #include "access/xlogrecord.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "catalog/storage_xlog.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
@@ -1351,7 +1352,6 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	Size		datalen = 0;
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
-	xl_seq_rec *xlrec;
 	Snapshot	snapshot = NULL;
 	RepOriginId origin_id = XLogRecGetOrigin(r);
 	bool		transactional;
@@ -1398,9 +1398,6 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	datalen = XLogRecGetDataLen(r);
 	tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
 
-	/* extract the WAL record, with "created" flag */
-	xlrec = (xl_seq_rec *) XLogRecGetData(r);
-
 	/* the sequence should not have changed without data */
 	if(!datalen || !tupledata)
 		elog(ERROR, "sequence decode missing tuple data");
@@ -1417,8 +1414,7 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * the plugin right away.
 	 */
 	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
-														 target_locator,
-														 xlrec->created);
+														 target_locator);
 
 	/* Skip the change if already processed (per the snapshot). */
 	if (transactional &&
@@ -1435,5 +1431,72 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
 							   origin_id, target_locator, transactional,
-							   xlrec->created, tuplebuf);
+							   tuplebuf);
+}
+
+/*
+ * Handle rmgr SMGR_ID records for LogicalDecodingProcessRecord().
+ *
+ * The only record we care about is XLOG_SMGR_CREATE, and only when decoding
+ * sequences. Most sequence changes are non-transactional (not subject to
+ * rollback), so they must not be queued in a transaction that might abort.
+ * But when a transaction creates a new relfilenode for a sequence (CREATE
+ * SEQUENCE, ALTER SEQUENCE, ...), changes to that relfilenode have to be
+ * queued in that transaction - otherwise the plugin would get increments
+ * for a relfilenode it does not know about yet.
+ *
+ * So we remember relfilenodes created by each (sub)transaction, and
+ * sequence_decode() uses ReorderBufferSequenceIsTransactional() to decide
+ * how to handle each sequence change.
+ *
+ * The record does not tell us the relation kind, so relfilenodes of all
+ * relations are tracked, not just sequences. Creations logged without an
+ * XID are ignored by ReorderBufferAddRelFileLocator(), which is why
+ * sequence.c assigns an XID before creating a sequence relfilenode when
+ * wal_level=logical.
+ */
+void
+smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	TransactionId xid = XLogRecGetXid(r);
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	xl_smgr_create *xlrec;
+
+	/*
+	 * Process the XID before any early return, so that the reorder buffer
+	 * learns about the transaction even when we ignore the record itself.
+	 */
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+	/*
+	 * Bail out when not decoding sequences, which is currently the only case
+	 * when we need to know about relfilenodes created in a transaction.
+	 */
+	if (!ctx->sequences)
+		return;
+
+	/* Also, we only care about XLOG_SMGR_CREATE. */
+	if (info != XLOG_SMGR_CREATE)
+		return;
+
+	/*
+	 * If we don't have snapshot or we are just fast-forwarding, there is no
+	 * point in tracking new relfilenodes.
+	 */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+		ctx->fast_forward)
+		return;
+
+	/* only interested in our database */
+	xlrec = (xl_smgr_create *) XLogRecGetData(r);
+	if (xlrec->rlocator.dbOid != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	ReorderBufferAddRelFileLocator(ctx->reorder, xid, xlrec->rlocator);
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b2e03a7183..d0bcc5a2f8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -961,13 +961,10 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
  */
 bool
 ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
-									 RelFileLocator rlocator, bool created)
+									 RelFileLocator rlocator)
 {
 	bool	found = false;
 
-	if (created)
-		return true;
-
 	hash_search(rb->sequences,
 				(void *) &rlocator,
 				HASH_FIND,
@@ -1015,7 +1012,7 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
 void
 ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-						   RelFileLocator rlocator, bool transactional, bool created,
+						   RelFileLocator rlocator, bool transactional,
 						   ReorderBufferTupleBuf *tuplebuf)
 {
 	/*
@@ -1033,42 +1030,8 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 	{
 		MemoryContext oldcontext;
 		ReorderBufferChange *change;
-
-		/* lookup sequence by relfilelocator */
-		ReorderBufferSequenceEnt   *ent;
-		bool						found;
-
-		/* transactional changes require a transaction */
-		Assert(xid != InvalidTransactionId);
-
-		/*
-		 * No explicit snapshot for transactional changes - we'll use a
-		 * snapshot derived later during apply (if not skipped).
-		 */
-		Assert(!snapshot);
-
-		/* search the lookup table (we ignore the return value, found is enough) */
-		ent = hash_search(rb->sequences,
-						  (void *) &rlocator,
-						  created ? HASH_ENTER : HASH_FIND,
-						  &found);
-
-		/*
-		 * If this is the "create" increment, we must not have found any
-		 * pre-existing entry in the hash table (i.e. there must not be
-		 * any conflicting sequence).
-		 */
-		Assert(!(created && found));
-
-		/* But we must have either created or found an existing entry. */
-		Assert(created || found);
-
-		/*
-		 * When creating the sequence, remember the XID of the transaction
-		 * that created id.
-		 */
-		if (created)
-			ent->xid = xid;
+		ReorderBufferSequenceEnt *ent;
+		bool	found;
 
 		/* allocate and queue the transactional sequence change */
 		oldcontext = MemoryContextSwitchTo(rb->context);
@@ -1082,6 +1045,17 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 
 		change->data.sequence.tuple = tuplebuf;
 
+		/*
+		 * Look up the (sub)transaction that created this relfilenode (e.g.
+		 * by CREATE/ALTER SEQUENCE) - the change is queued into it.
+		 */
+		ent = hash_search(rb->sequences,
+						  (void *) &rlocator,
+						  HASH_FIND,
+						  &found);
+
+		Assert(found);
+
 		/* add it to the same subxact that created the sequence */
 		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
 
@@ -1102,9 +1076,6 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 		Assert(snapshot_now);
 
 #ifdef USE_ASSERT_CHECKING
-		/* All "creates" have to be handled as transactional. */
-		Assert(!created);
-
 		/* Make sure the sequence is not in the hash table. */
 		{
 			bool	found;
@@ -1201,6 +1172,37 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * ReorderBufferAddRelFileLocator
+ *		Remember the (sub)transaction that created a relfilenode.
+ *
+ * Creations logged without an XID can't be tied to a transaction; skip them.
+ */
+void
+ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+							   RelFileLocator rlocator)
+{
+	ReorderBufferSequenceEnt   *ent;
+	bool						found;
+
+	if (xid == InvalidTransactionId)
+		return;
+
+	ent = hash_search(rb->sequences,
+					  (void *) &rlocator,
+					  HASH_ENTER,
+					  &found);
+
+	/*
+	 * XLOG_SMGR_CREATE is logged per fork, so we may see the same relfilenode
+	 * more than once (e.g. main, FSM and VM forks copied by SET TABLESPACE),
+	 * but always from the same transaction.
+	 */
+	Assert(!found || ent->xid == xid);
+
+	ent->xid = xid;
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 683fe9e930..afd1d35221 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -27,7 +27,7 @@
 /* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
 PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
 PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, smgr_decode)
 PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 1cb2cd16f7..fe3c813d99 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,7 +48,6 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
 typedef struct xl_seq_rec
 {
 	RelFileLocator locator;
-	bool		created;	/* created a new relfilenode (CREATE/ALTER) */
 	/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
 } xl_seq_rec;
 
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 5728bb46e9..e077a2b3c7 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -28,6 +28,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index c0e35dc24f..1f3a39c311 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -730,7 +730,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Size message_size, const char *message);
 extern void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
 									   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
-									   RelFileLocator locator, bool transactional, bool created,
+									   RelFileLocator locator, bool transactional,
 									   ReorderBufferTupleBuf *tuplebuf);
 extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
@@ -786,7 +786,9 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
-bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
-												 RelFileLocator locator, bool created);
+extern void ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+										   RelFileLocator rlocator);
+extern bool	ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+												 RelFileLocator locator);
 
 #endif
diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl
index b8cac5fbfa..d94ce2a9aa 100644
--- a/src/test/subscription/t/034_sequences.pl
+++ b/src/test/subscription/t/034_sequences.pl
@@ -110,6 +110,12 @@ $node_publisher->safe_psql(
 	ROLLBACK;
 ));
 
+# Refresh publication after sequence is added to publication
+$result = $node_subscriber->safe_psql(
+	'postgres', qq(
+	ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION
+));
+
 $node_publisher->wait_for_catchup('seq_sub');
 
 # Check the data on subscriber
@@ -134,12 +140,6 @@ $node_publisher->safe_psql(
 	COMMIT;
 ));
 
-# Refresh publication after sequence is added to publication
-$result = $node_subscriber->safe_psql(
-	'postgres', qq(
-	ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION
-));
-
 $node_publisher->wait_for_catchup('seq_sub');
 
 # Wait for sync of the second sequence we just added to finish
-- 
2.41.0