0005-use-page-LSN-for-sequences-20230729.patch
text/x-patch
Filename: 0005-use-page-LSN-for-sequences-20230729.patch
Type: text/x-patch
Part: 4
Patch
Same data as JSON:
GET /api/v1/attachments/:id/patch
Returns the parsed metadata as JSON (format, series position, per-file stats), never the diff bytes.
API reference →
Format: format-patch
Series: patch 0005
Subject: use page LSN for sequences
| File | + | − |
|---|---|---|
| src/backend/commands/sequence.c | 82 | 10 |
| src/backend/replication/logical/tablesync.c | 28 | 50 |
| src/include/catalog/pg_proc.dat | 8 | 0 |
From 686f10d930bcbc06e0c8297068e455bd678b5234 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Wed, 26 Jul 2023 14:58:19 +0200
Subject: [PATCH 5/6] use page LSN for sequences
---
src/backend/commands/sequence.c | 92 ++++++++++++++++++---
src/backend/replication/logical/tablesync.c | 78 +++++++----------
src/include/catalog/pg_proc.dat | 8 ++
3 files changed, 118 insertions(+), 60 deletions(-)
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index cd04c0ad05..f5b7ef7171 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -44,6 +44,7 @@
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
#include "utils/resowner.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
@@ -101,7 +102,8 @@ static Relation lock_and_open_sequence(SeqTable seq);
static void create_seq_hashtable(void);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
static Form_pg_sequence_data read_seq_tuple(Relation rel,
- Buffer *buf, HeapTuple seqdatatuple);
+ Buffer *buf, HeapTuple seqdatatuple,
+ XLogRecPtr *lsn);
static void init_params(ParseState *pstate, List *options, bool for_identity,
bool isInit,
Form_pg_sequence seqform,
@@ -293,7 +295,7 @@ ResetSequence(Oid seq_relid)
* indeed a sequence.
*/
init_sequence(seq_relid, &elm, &seq_rel);
- (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+ (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple, NULL);
pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seq_relid));
if (!HeapTupleIsValid(pgstuple))
@@ -366,7 +368,7 @@ SetSequence_non_transactional(Oid seqrelid, int64 value)
init_sequence(seqrelid, &elm, &seqrel);
/* lock page' buffer and read tuple */
- seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+ seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(seqrel))
@@ -439,7 +441,7 @@ SetSequence_transactional(Oid seq_relid, int64 value)
init_sequence(seq_relid, &elm, &seqrel);
/* lock page' buffer and read tuple */
- seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+ seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
/* Copy the existing sequence tuple. */
tuple = heap_copytuple(&seqdatatuple);
@@ -670,7 +672,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
seqform = (Form_pg_sequence) GETSTRUCT(seqtuple);
/* lock page's buffer and read tuple into new sequence structure */
- (void) read_seq_tuple(seqrel, &buf, &datatuple);
+ (void) read_seq_tuple(seqrel, &buf, &datatuple, NULL);
/* copy the existing sequence data tuple, so it can be modified locally */
newdatatuple = heap_copytuple(&datatuple);
@@ -755,7 +757,7 @@ SequenceChangePersistence(Oid relid, char newrelpersistence)
GetCurrentTransactionId();
}
- (void) read_seq_tuple(seqrel, &buf, &seqdatatuple);
+ (void) read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
RelationSetNewRelfilenumber(seqrel, newrelpersistence);
fill_seq_with_data(seqrel, &seqdatatuple);
UnlockReleaseBuffer(buf);
@@ -884,7 +886,7 @@ nextval_internal(Oid relid, bool check_permissions)
ReleaseSysCache(pgstuple);
/* lock page' buffer and read tuple */
- seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+ seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
page = BufferGetPage(buf);
elm->increment = incby;
@@ -1207,7 +1209,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
PreventCommandIfParallelMode("setval()");
/* lock page' buffer and read tuple */
- seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+ seq = read_seq_tuple(seqrel, &buf, &seqdatatuple, NULL);
if ((next < minv) || (next > maxv))
ereport(ERROR,
@@ -1427,11 +1429,13 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
* *buf receives the reference to the pinned-and-ex-locked buffer
* *seqdatatuple receives the reference to the sequence tuple proper
* (this arg should point to a local variable of type HeapTupleData)
+ * *lsn receives LSN of the last sequence change (page LSN), optional
*
* Function's return value points to the data payload of the tuple
*/
static Form_pg_sequence_data
-read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
+read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple,
+ XLogRecPtr *lsn)
{
Page page;
ItemId lp;
@@ -1448,6 +1452,13 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
elog(ERROR, "bad magic number in sequence \"%s\": %08X",
RelationGetRelationName(rel), sm->magic);
+ /*
+ * If the caller requested it, set the page LSN. This allows deciding which
+ * sequence changes are before/after the returned sequence state.
+ */
+ if (lsn)
+ *lsn = PageGetLSN(page);
+
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsNormal(lp));
@@ -2043,7 +2054,7 @@ pg_sequence_last_value(PG_FUNCTION_ARGS)
errmsg("permission denied for sequence %s",
RelationGetRelationName(seqrel))));
- seq = read_seq_tuple(seqrel, &buf, &seqtuple);
+ seq = read_seq_tuple(seqrel, &buf, &seqtuple, NULL);
is_called = seq->is_called;
result = seq->last_value;
@@ -2057,6 +2068,67 @@ pg_sequence_last_value(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
+/*
+ * Return the current on-disk state of the sequence, including the page LSN.
+ *
+ * This is roughly equivalent to selecting the data from the sequence, except
+ * that it also returns the page LSN, i.e. the LSN of the last sequence change.
+ * The output columns must match the OUT arguments declared for this function
+ * in pg_proc.dat: page_lsn, last_value, log_cnt, is_called.
+ */
+Datum
+pg_sequence_state(PG_FUNCTION_ARGS)
+{
+#define PG_SEQUENCE_STATE_COLS	4
+	Oid			relid = PG_GETARG_OID(0);
+	SeqTable	elm;
+	Relation	seqrel;
+	Buffer		buf;
+	HeapTupleData seqtuple;
+	Form_pg_sequence_data seq;
+	int64		last_value;
+	int64		log_cnt;
+	bool		is_called;
+	XLogRecPtr	lsn;
+	TupleDesc	tupdesc;
+	HeapTuple	tuple;
+	Datum		result;
+	Datum		values[PG_SEQUENCE_STATE_COLS];
+	bool		nulls[PG_SEQUENCE_STATE_COLS] = {0};
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	/* open and lock sequence */
+	init_sequence(relid, &elm, &seqrel);
+
+	if (pg_class_aclcheck(elm->relid, GetUserId(),
+						  ACL_SELECT | ACL_USAGE) != ACLCHECK_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("permission denied for sequence %s",
+						RelationGetRelationName(seqrel))));
+
+	/* read the tuple and its page LSN while the buffer is locked */
+	seq = read_seq_tuple(seqrel, &buf, &seqtuple, &lsn);
+
+	is_called = seq->is_called;
+	last_value = seq->last_value;
+	log_cnt = seq->log_cnt;
+
+	UnlockReleaseBuffer(buf);
+	relation_close(seqrel, NoLock);
+
+	/* none of the output columns can be NULL, so nulls[] stays all-false */
+	values[0] = LSNGetDatum(lsn);
+	values[1] = Int64GetDatum(last_value);
+	values[2] = Int64GetDatum(log_cnt);
+	values[3] = BoolGetDatum(is_called);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+	PG_RETURN_DATUM(result);
+}
void
seq_redo(XLogReaderState *record)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 694ce70b7d..44e64ed1de 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1211,22 +1211,23 @@ copy_table(Relation rel)
}
/*
- * Fetch sequence data (current state) from the remote node.
+ * Fetch sequence data (last_value + log_cnt) and the sequence page LSN from
+ * the remote node, using pg_sequence_state() on the publisher.
*/
static int64
-fetch_sequence_data(char *nspname, char *relname)
+fetch_sequence_data(Oid remoteid, XLogRecPtr *lsn)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[1] = {INT8OID};
+ Oid tableRow[2] = {INT8OID, LSNOID};
int64 value = (Datum) 0;
initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT (last_value + log_cnt)\n"
- " FROM %s", quote_qualified_identifier(nspname, relname));
+ appendStringInfo(&cmd, "SELECT (last_value + log_cnt), page_lsn "
+ "FROM pg_sequence_state(%u)", remoteid);
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow);
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(tableRow), tableRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
@@ -1242,45 +1243,8 @@ fetch_sequence_data(char *nspname, char *relname)
value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
Assert(!isnull);
- }
-
- ExecDropSingleTupleTableSlot(slot);
-
- walrcv_clear_result(res);
-
- return value;
-}
-
-/*
- * Fetch remote insert LSN from the remote node.
- */
-static XLogRecPtr
-fetch_remote_lsn(void)
-{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[1] = {LSNOID};
- XLogRecPtr value = InvalidXLogRecPtr;
-
- initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT pg_current_wal_lsn()");
-
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, tableRow);
- pfree(cmd.data);
- if (res->status != WALRCV_OK_TUPLES)
- ereport(ERROR,
- (errmsg("could not receive current LSN from the publisher: %s",
- res->err)));
-
- /* Process the sequence. */
- slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
- while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
- {
- bool isnull;
-
- value = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+ *lsn = DatumGetLSN(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
}
@@ -1304,6 +1268,7 @@ copy_sequence(Relation rel)
List *qual = NIL;
StringInfoData cmd;
int64 sequence_value;
+ XLogRecPtr lsn = InvalidXLogRecPtr;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1334,15 +1299,15 @@ copy_sequence(Relation rel)
* Otherwise we might get duplicate values (on subscriber) if we failed
* over right after the sync.
*/
- sequence_value = fetch_sequence_data(lrel.nspname, lrel.relname);
+ sequence_value = fetch_sequence_data(lrel.remoteid, &lsn);
/* tablesync sets the sequences in non-transactional way */
SetSequence(RelationGetRelid(rel), false, sequence_value);
logicalrep_rel_close(relmapentry, NoLock);
- /* also fetch current remote LSN (after the data was selected) */
- return fetch_remote_lsn();
+ /* return the remote page LSN of the copied sequence state */
+ return lsn;
}
/*
@@ -1393,7 +1358,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UserContext ucxt;
bool must_use_password;
bool run_as_owner;
- XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+ XLogRecPtr sequence_lsn = InvalidXLogRecPtr;
/* Check the state of the table synchronization. */
StartTransactionCommand();
@@ -1612,8 +1577,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
/* Now do the initial sequence copy */
PushActiveSnapshot(GetTransactionSnapshot());
- remote_lsn = copy_sequence(rel);
+ sequence_lsn = copy_sequence(rel);
PopActiveSnapshot();
+
+ /*
+ * Sequences are not consistent (in the MVCC sense) with the replication
+ * slot, so the copy may have read a state more recent than
+ * origin_startpos. sequence_lsn is the page LSN, i.e. the LSN of the last
+ * change to the sequence, so it is the right position from which to
+ * start the catchup apply.
+ *
+ * It may precede the slot's position, though (if the sequence was not
+ * used between the slot creation and the copy), so make sure the start
+ * position never moves backwards.
+ */
+ *origin_startpos = Max(*origin_startpos, sequence_lsn);
}
else
{
@@ -1661,7 +1639,7 @@ copy_table_done:
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
- MyLogicalRepWorker->relstate_lsn = Max(*origin_startpos, remote_lsn);
+ MyLogicalRepWorker->relstate_lsn = *origin_startpos;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 00124946c0..dc150dfd1c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3326,6 +3326,14 @@
proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u',
prorettype => 'int8', proargtypes => 'regclass',
prosrc => 'pg_sequence_last_value' },
+{ oid => '4549',
+ descr => 'current on-disk sequence state',
+ proname => 'pg_sequence_state', provolatile => 'v', proparallel => 'u',
+ prorettype => 'record', proargtypes => 'regclass',
+ proallargtypes => '{regclass,pg_lsn,int8,int8,bool}',
+ proargmodes => '{i,o,o,o,o}',
+ proargnames => '{seq_oid,page_lsn,last_value,log_cnt,is_called}',
+ prosrc => 'pg_sequence_state' },
{ oid => '275', descr => 'return the next oid for a system table',
proname => 'pg_nextoid', provolatile => 'v', proparallel => 'u',
--
2.41.0