0006-replace-created-flag-with-XLOG_SMGR_CREATE-20230719b.patch
text/x-patch
Filename: 0006-replace-created-flag-with-XLOG_SMGR_CREATE-20230719b.patch
Type: text/x-patch
Part: 5
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0006
Subject: replace created flag with XLOG_SMGR_CREATE
| File | + | − |
|---|---|---|
| src/backend/access/rmgrdesc/seqdesc.c | 2 | 2 |
| src/backend/commands/sequence.c | 22 | 4 |
| src/backend/replication/logical/decode.c | 70 | 7 |
| src/backend/replication/logical/reorderbuffer.c | 46 | 44 |
| src/include/access/rmgrlist.h | 1 | 1 |
| src/include/commands/sequence.h | 0 | 1 |
| src/include/replication/decode.h | 1 | 0 |
| src/include/replication/reorderbuffer.h | 5 | 3 |
| src/test/subscription/t/034_sequences.pl | 6 | 6 |
From a2e483f248af4b74c90930ec610e47d64d11f0d5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 19 Jul 2023 12:39:58 +0200
Subject: [PATCH 6/6] replace created flag with XLOG_SMGR_CREATE
---
src/backend/access/rmgrdesc/seqdesc.c | 4 +-
src/backend/commands/sequence.c | 26 +++++-
src/backend/replication/logical/decode.c | 77 ++++++++++++++--
.../replication/logical/reorderbuffer.c | 90 ++++++++++---------
src/include/access/rmgrlist.h | 2 +-
src/include/commands/sequence.h | 1 -
src/include/replication/decode.h | 1 +
src/include/replication/reorderbuffer.h | 8 +-
src/test/subscription/t/034_sequences.pl | 12 +--
9 files changed, 153 insertions(+), 68 deletions(-)
diff --git a/src/backend/access/rmgrdesc/seqdesc.c b/src/backend/access/rmgrdesc/seqdesc.c
index 3dabc252d8..ba60544085 100644
--- a/src/backend/access/rmgrdesc/seqdesc.c
+++ b/src/backend/access/rmgrdesc/seqdesc.c
@@ -25,9 +25,9 @@ seq_desc(StringInfo buf, XLogReaderState *record)
xl_seq_rec *xlrec = (xl_seq_rec *) rec;
if (info == XLOG_SEQ_LOG)
- appendStringInfo(buf, "rel %u/%u/%u created %d",
+ appendStringInfo(buf, "rel %u/%u/%u",
xlrec->locator.spcOid, xlrec->locator.dbOid,
- xlrec->locator.relNumber, xlrec->created);
+ xlrec->locator.relNumber);
}
const char *
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 035ff1fdae..70bfcf0802 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -217,6 +217,10 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
stmt->tablespacename = NULL;
stmt->if_not_exists = seq->if_not_exists;
+ /* make sure the relfilenode creation is associated with the XID */
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+
address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL);
seqoid = address.objectId;
Assert(seqoid != InvalidOid);
@@ -315,6 +319,10 @@ ResetSequence(Oid seq_relid)
seq->is_called = false;
seq->log_cnt = 0;
+ /* make sure the relfilenode creation is associated with the XID */
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+
/*
* Create a new storage file for the sequence.
*/
@@ -389,7 +397,6 @@ SetSequence_non_transactional(Oid seqrelid, int64 value)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.locator = seqrel->rd_locator;
- xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -446,6 +453,10 @@ SetSequence_transactional(Oid seq_relid, int64 value)
seq->is_called = false;
seq->log_cnt = 0;
+ /* make sure the relfilenode creation is associated with the XID */
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+
/*
* Create a new storage file for the sequence - this is needed for the
* transactional behavior.
@@ -591,7 +602,6 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.locator = rel->rd_locator;
- xlrec.created = true;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) tuple->t_data, tuple->t_len);
@@ -678,6 +688,10 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
if (RelationNeedsWAL(seqrel))
GetTopTransactionId();
+ /* make sure the relfilenode creation is associated with the XID */
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+
/*
* Create a new storage file for the sequence, making the state
* changes transactional.
@@ -727,8 +741,14 @@ SequenceChangePersistence(Oid relid, char newrelpersistence)
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ /* make sure the relfilenode creation is associated with the XID */
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+ }
+
(void) read_seq_tuple(seqrel, &buf, &seqdatatuple);
RelationSetNewRelfilenumber(seqrel, newrelpersistence);
fill_seq_with_data(seqrel, &seqdatatuple);
@@ -1034,7 +1054,6 @@ nextval_internal(Oid relid, bool check_permissions)
seq->log_cnt = 0;
xlrec.locator = seqrel->rd_locator;
- xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
@@ -1233,7 +1252,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.locator = seqrel->rd_locator;
- xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index bbaffceb17..38b7e620dc 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
#include "access/xlogrecord.h"
#include "access/xlogutils.h"
#include "catalog/pg_control.h"
+#include "catalog/storage_xlog.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/message.h"
@@ -1351,7 +1352,6 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Size datalen = 0;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
- xl_seq_rec *xlrec;
Snapshot snapshot = NULL;
RepOriginId origin_id = XLogRecGetOrigin(r);
bool transactional;
@@ -1398,9 +1398,6 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
datalen = XLogRecGetDataLen(r);
tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
- /* extract the WAL record, with "created" flag */
- xlrec = (xl_seq_rec *) XLogRecGetData(r);
-
/* the sequence should not have changed without data */
if(!datalen || !tupledata)
elog(ERROR, "sequence decode missing tuple data");
@@ -1417,8 +1414,7 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* the plugin right away.
*/
transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
- target_locator,
- xlrec->created);
+ target_locator);
/* Skip the change if already processed (per the snapshot). */
if (transactional &&
@@ -1435,5 +1431,72 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
origin_id, target_locator, transactional,
- xlrec->created, tuplebuf);
+ tuplebuf);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+void
+smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ xl_smgr_create *xlrec;
+
+ /*
+ * Bail out when not decoding sequences, which is currently the only case
+ * when we need to know about relfilenodes created in a transaction.
+ */
+ if (!ctx->sequences)
+ return;
+
+ /* Also, we only care about XLOG_SMGR_CREATE. */
+ if (info != XLOG_SMGR_CREATE)
+ return;
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ /* only interested in our database */
+ xlrec = (xl_smgr_create *) XLogRecGetData(r);
+ if (xlrec->rlocator.dbOid != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ ReorderBufferAddRelFileLocator(ctx->reorder, xid, xlrec->rlocator);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b2e03a7183..d0bcc5a2f8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -961,13 +961,10 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
*/
bool
ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
- RelFileLocator rlocator, bool created)
+ RelFileLocator rlocator)
{
bool found = false;
- if (created)
- return true;
-
hash_search(rb->sequences,
(void *) &rlocator,
HASH_FIND,
@@ -1015,7 +1012,7 @@ ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
void
ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
- RelFileLocator rlocator, bool transactional, bool created,
+ RelFileLocator rlocator, bool transactional,
ReorderBufferTupleBuf *tuplebuf)
{
/*
@@ -1033,42 +1030,8 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
{
MemoryContext oldcontext;
ReorderBufferChange *change;
-
- /* lookup sequence by relfilelocator */
- ReorderBufferSequenceEnt *ent;
- bool found;
-
- /* transactional changes require a transaction */
- Assert(xid != InvalidTransactionId);
-
- /*
- * No explicit snapshot for transactional changes - we'll use a
- * snapshot derived later during apply (if not skipped).
- */
- Assert(!snapshot);
-
- /* search the lookup table (we ignore the return value, found is enough) */
- ent = hash_search(rb->sequences,
- (void *) &rlocator,
- created ? HASH_ENTER : HASH_FIND,
- &found);
-
- /*
- * If this is the "create" increment, we must not have found any
- * pre-existing entry in the hash table (i.e. there must not be
- * any conflicting sequence).
- */
- Assert(!(created && found));
-
- /* But we must have either created or found an existing entry. */
- Assert(created || found);
-
- /*
- * When creating the sequence, remember the XID of the transaction
- * that created id.
- */
- if (created)
- ent->xid = xid;
+ ReorderBufferSequenceEnt *ent;
+ bool found;
/* allocate and queue the transactional sequence change */
oldcontext = MemoryContextSwitchTo(rb->context);
@@ -1082,6 +1045,17 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
change->data.sequence.tuple = tuplebuf;
+ /*
+ * Lookup XID of the transaction where to queue the change (the one
+ * that did the ALTER SEQUENCE etc.)
+ */
+ ent = hash_search(rb->sequences,
+ (void *) &rlocator,
+ HASH_FIND,
+ &found);
+
+ Assert(found);
+
/* add it to the same subxact that created the sequence */
ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
@@ -1102,9 +1076,6 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
Assert(snapshot_now);
#ifdef USE_ASSERT_CHECKING
- /* All "creates" have to be handled as transactional. */
- Assert(!created);
-
/* Make sure the sequence is not in the hash table. */
{
bool found;
@@ -1201,6 +1172,37 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
}
}
+/*
+ * ReorderBufferAddRelFileLocator
+ * Add newly created relfilenode to the global hash table.
+ */
+void
+ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+ RelFileLocator rlocator)
+{
+ /* lookup sequence by relfilelocator */
+ ReorderBufferSequenceEnt *ent;
+ bool found;
+
+ /* sequence changes require a transaction */
+ if (xid == InvalidTransactionId)
+ return;
+
+ /* search the lookup table */
+ ent = hash_search(rb->sequences,
+ (void *) &rlocator,
+ HASH_ENTER,
+ &found);
+
+ /*
+ * We've just decoded creation of the relfilenode, so if we found it in
+ * the hash table, something is wrong.
+ */
+ Assert(!found);
+
+ ent->xid = xid;
+}
+
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 683fe9e930..afd1d35221 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -27,7 +27,7 @@
/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, smgr_decode)
PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 1cb2cd16f7..fe3c813d99 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -48,7 +48,6 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data;
typedef struct xl_seq_rec
{
RelFileLocator locator;
- bool created; /* created a new relfilenode (CREATE/ALTER) */
/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
} xl_seq_rec;
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 5728bb46e9..e077a2b3c7 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -28,6 +28,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index c0e35dc24f..1f3a39c311 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -730,7 +730,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
Size message_size, const char *message);
extern void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
- RelFileLocator locator, bool transactional, bool created,
+ RelFileLocator locator, bool transactional,
ReorderBufferTupleBuf *tuplebuf);
extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
@@ -786,7 +786,9 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
extern void StartupReorderBuffer(void);
-bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
- RelFileLocator locator, bool created);
+extern void ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+ RelFileLocator rlocator);
+extern bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileLocator locator);
#endif
diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl
index b8cac5fbfa..d94ce2a9aa 100644
--- a/src/test/subscription/t/034_sequences.pl
+++ b/src/test/subscription/t/034_sequences.pl
@@ -110,6 +110,12 @@ $node_publisher->safe_psql(
ROLLBACK;
));
+# Refresh publication after sequence is added to publication
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION
+));
+
$node_publisher->wait_for_catchup('seq_sub');
# Check the data on subscriber
@@ -134,12 +140,6 @@ $node_publisher->safe_psql(
COMMIT;
));
-# Refresh publication after sequence is added to publication
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION
-));
-
$node_publisher->wait_for_catchup('seq_sub');
# Wait for sync of the second sequence we just added to finish
--
2.41.0