0005-fixup-syncing-refresh-sequences-20230316.patch
text/x-patch
Filename: 0005-fixup-syncing-refresh-sequences-20230316.patch
Type: text/x-patch
Part: 4
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
the parsed metadata as JSON — format, series position, per-file stats; never the diff bytes.
API reference →
Format: format-patch
Series: patch 0005
Subject: fixup syncing/refresh sequences
| File | + | − |
|---|---|---|
| src/backend/replication/logical/proto.c | 11 | 8 |
| src/backend/replication/logical/worker.c | 44 | 19 |
| src/backend/replication/pgoutput/pgoutput.c | 14 | 8 |
| src/include/replication/logicalproto.h | 1 | 1 |
From 80f690258b51c3fe53ba80cc985aa00a83a9bedf Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Thu, 16 Mar 2023 16:16:05 +0100
Subject: [PATCH 5/6] fixup syncing/refresh sequences
---
src/backend/replication/logical/proto.c | 19 ++++---
src/backend/replication/logical/worker.c | 63 ++++++++++++++-------
src/backend/replication/pgoutput/pgoutput.c | 22 ++++---
src/include/replication/logicalproto.h | 2 +-
4 files changed, 70 insertions(+), 36 deletions(-)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 2b9e4b59fe1..58e5128e8a3 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -672,7 +672,6 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
int64 last_value, int64 log_cnt, bool is_called)
{
uint8 flags = 0;
- char *relname;
pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
@@ -683,10 +682,10 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
pq_sendint8(out, flags);
pq_sendint64(out, lsn);
- logicalrep_write_namespace(out, RelationGetNamespace(rel));
- relname = RelationGetRelationName(rel);
- pq_sendstring(out, relname);
+ /* use Oid as relation identifier */
+ pq_sendint32(out, RelationGetRelid(rel));
+ /* write sequence info */
pq_sendint8(out, transactional);
pq_sendint64(out, last_value);
pq_sendint64(out, log_cnt);
@@ -696,21 +695,25 @@ logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
/*
* Read SEQUENCE from the stream.
*/
-void
+LogicalRepRelId
logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
{
+ LogicalRepRelId relid;
+
/* XXX skipping flags and lsn */
pq_getmsgint(in, 1);
pq_getmsgint64(in);
- /* Read relation name from stream */
- seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
- seqdata->seqname = pstrdup(pq_getmsgstring(in));
+ /* read the relation id */
+ relid = pq_getmsgint(in, 4);
+ /* info about the sequence */
seqdata->transactional = pq_getmsgint(in, 1);
seqdata->last_value = pq_getmsgint64(in);
seqdata->log_cnt = pq_getmsgint64(in);
seqdata->is_called = pq_getmsgint(in, 1);
+
+ return relid;
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a5be5ce044d..b21dc855541 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1447,45 +1447,70 @@ static void
apply_handle_sequence(StringInfo s)
{
LogicalRepSequence seq;
+ LogicalRepRelMapEntry *rel;
Oid relid;
+ bool already_in_transaction PG_USED_FOR_ASSERTS_ONLY;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
return;
- logicalrep_read_sequence(s, &seq);
-
/*
- * Non-transactional sequence updates should not be part of a remote
- * transaction. There should not be any running transaction.
+ * Remember if we're already in a transaction: begin_replication_step()
+ * starts one, so IsTransactionState() can't be used to check this after
+ * that point.
*/
- Assert((!seq.transactional) || in_remote_transaction);
- Assert(!(!seq.transactional && in_remote_transaction));
- Assert(!(!seq.transactional && IsTransactionState()));
+ already_in_transaction = IsTransactionState();
/*
* Make sure we're in a transaction (needed by SetSequence). For
* non-transactional updates we're guaranteed to start a new one,
* and we'll commit it at the end.
*/
- if (!IsTransactionState())
- {
- StartTransactionCommand();
- maybe_reread_subscription();
- }
+ begin_replication_step();
- relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
- seq.seqname, -1),
- RowExclusiveLock, false);
+ relid = logicalrep_read_sequence(s, &seq);
+
+ /*
+ * Non-transactional sequence updates should not be part of a remote
+ * transaction. There should not be any running transaction.
+ */
+ Assert((!seq.transactional) || in_remote_transaction);
+ Assert(!(!seq.transactional && in_remote_transaction));
+ Assert(!(!seq.transactional && already_in_transaction));
/* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
- LockRelationOid(relid, AccessExclusiveLock);
+ rel = logicalrep_rel_open(relid, AccessExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, AccessExclusiveLock);
+ end_replication_step();
+
+ /*
+ * Commit the per-stream transaction (we only do this when not in
+ * remote transaction, i.e. for non-transactional sequence updates.)
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();
+
+ return;
+ }
/* apply the sequence change */
- SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+ SetSequence(rel->localreloid, seq.transactional,
+ seq.last_value, seq.log_cnt, seq.is_called);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ end_replication_step();
/*
* Commit the per-stream transaction (we only do this when not in
- * remote transaction, i.e. for non-transactional sequence updates.
+ * remote transaction, i.e. for non-transactional sequence updates.)
*/
if (!in_remote_transaction)
CommitTransactionCommand();
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1b4331e2c5a..e024aad6761 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -695,7 +695,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
- ReorderBufferChange *change,
+ ReorderBufferTXN *txn,
Relation relation, RelationSyncEntry *relentry)
{
bool schema_sent;
@@ -711,10 +711,10 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* the write methods will not include it.
*/
if (in_streaming)
- xid = change->txn->xid;
+ xid = txn->xid;
- if (change->txn->toptxn)
- topxid = change->txn->toptxn->xid;
+ if (txn->toptxn)
+ topxid = txn->toptxn->xid;
else
topxid = xid;
@@ -1512,7 +1512,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Schema should be sent using the original relation because it
* also sends the ancestor's relation.
*/
- maybe_send_schema(ctx, change, relation, relentry);
+ maybe_send_schema(ctx, change->txn, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
@@ -1562,7 +1562,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
- maybe_send_schema(ctx, change, relation, relentry);
+ maybe_send_schema(ctx, change->txn, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
@@ -1627,7 +1627,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
- maybe_send_schema(ctx, change, relation, relentry);
+ maybe_send_schema(ctx, change->txn, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, xid, targetrel,
@@ -1702,7 +1702,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (txndata && !txndata->sent_begin_txn)
pgoutput_send_begin(ctx, txn);
- maybe_send_schema(ctx, change, relation, relentry);
+ maybe_send_schema(ctx, change->txn, relation, relentry);
}
if (nrelids > 0)
@@ -1808,6 +1808,12 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
pgoutput_send_begin(ctx, txn);
}
+ /*
+ * Send the schema first: the SEQUENCE message identifies the sequence only
+ * by its Oid, so the subscriber needs the relation mapping to resolve it.
+ */
+ maybe_send_schema(ctx, txn, relation, relentry);
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_sequence(ctx->out,
relation,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a5cd04f8511..6d82d5b6cc3 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -264,7 +264,7 @@ extern void logicalrep_write_sequence(StringInfo out, Relation rel,
bool transactional,
int64 last_value, int64 log_cnt,
bool is_called);
-extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
+extern LogicalRepRelId logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel, Bitmapset *columns);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
--
2.39.2