Re: logical decoding and replication of sequences, take 2

Tomas Vondra <tomas.vondra@enterprisedb.com>

From: Tomas Vondra <tomas.vondra@enterprisedb.com>
To: Andres Freund <andres@anarazel.de>
Cc: Robert Haas <robertmhaas@gmail.com>, PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>, Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: 2023-01-11T21:46:26Z
Lists: pgsql-hackers

Commits

Same data as JSON: GET /api/v1/messages/:b64id/commits the thread's linked commits as JSON, with link sources. API reference →
  1. Migrate logical slots to the new node during an upgrade.

  2. Make test_decoding ddl.out shorter

  3. Fix snapshot handling in logicalmsg_decode

  4. doc: Adjust a few more references to "postmaster"

  5. Revert "Logical decoding of sequences"


On 1/11/23 21:12, Andres Freund wrote:
> Hi,
> 
> 
> Heikki, CCed you due to the point about 2c03216d8311 below.
> 
> 
> On 2023-01-10 19:32:12 +0100, Tomas Vondra wrote:
>> 0001 is a fix for the pre-existing issue in logicalmsg_decode,
>> attempting to build a snapshot before getting into a consistent state.
>> AFAICS this only affects assert-enabled builds and is otherwise
>> harmless, because we are not actually using the snapshot (apply gets a
>> valid snapshot from the transaction).
> 
> LGTM.
> 
> 
>> 0002 is a rebased version of the original approach, committed as
>> 0da92dc530 (and then reverted in 2c7ea57e56). This includes the same fix
>> as 0001 (for the sequence messages), the primary reason for the revert.
>>
>> The rebase was not quite straightforward, due to extensive changes in
>> how publications deal with tables/schemas, and so on. So this adopts
>> them, but other than that it behaves just like the original patch.
> 
> This is a huge diff:
>>  72 files changed, 4715 insertions(+), 612 deletions(-)
> 
> It'd be nice to split it to make review easier. Perhaps the sequence decoding
> support could be split from the whole publication rigamarole?
> 
> 
>> This does not include any changes to test_decoding and/or the built-in
>> replication - those will be committed in separate patches.
> 
> Looks like that's not the case anymore?
> 

Ah, right!  Now I realized I originally committed this in chunks, but
the revert was a single commit. And I just "reverted the revert" to
create this patch.

I'll definitely split this into smaller patches. This also explains the
obsolete commit message about test_decoding not being included, etc.

> 
>> +/*
>> + * Update the sequence state by modifying the existing sequence data row.
>> + *
>> + * This keeps the same relfilenode, so the behavior is non-transactional.
>> + */
>> +static void
>> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +	SeqTable	elm;
>> +	Relation	seqrel;
>> +	Buffer		buf;
>> +	HeapTupleData seqdatatuple;
>> +	Form_pg_sequence_data seq;
>> +
>> +	/* open and lock sequence */
>> +	init_sequence(seqrelid, &elm, &seqrel);
>> +
>> +	/* lock page' buffer and read tuple */
>> +	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> +	/* check the comment above nextval_internal()'s equivalent call. */
>> +	if (RelationNeedsWAL(seqrel))
>> +	{
>> +		GetTopTransactionId();
>> +
>> +		if (XLogLogicalInfoActive())
>> +			GetCurrentTransactionId();
>> +	}
>> +
>> +	/* ready to change the on-disk (or really, in-buffer) tuple */
>> +	START_CRIT_SECTION();
>> +
>> +	seq->last_value = last_value;
>> +	seq->is_called = is_called;
>> +	seq->log_cnt = log_cnt;
>> +
>> +	MarkBufferDirty(buf);
>> +
>> +	/* XLOG stuff */
>> +	if (RelationNeedsWAL(seqrel))
>> +	{
>> +		xl_seq_rec	xlrec;
>> +		XLogRecPtr	recptr;
>> +		Page		page = BufferGetPage(buf);
>> +
>> +		XLogBeginInsert();
>> +		XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
>> +
>> +		xlrec.locator = seqrel->rd_locator;
>> +		xlrec.created = false;
>> +
>> +		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
>> +		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
>> +
>> +		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
>> +
>> +		PageSetLSN(page, recptr);
>> +	}
>> +
>> +	END_CRIT_SECTION();
>> +
>> +	UnlockReleaseBuffer(buf);
>> +
>> +	/* Clear local cache so that we don't think we have cached numbers */
>> +	/* Note that we do not change the currval() state */
>> +	elm->cached = elm->last;
>> +
>> +	relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Update the sequence state by creating a new relfilenode.
>> + *
>> + * This creates a new relfilenode, to allow transactional behavior.
>> + */
>> +static void
>> +SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +	SeqTable	elm;
>> +	Relation	seqrel;
>> +	Buffer		buf;
>> +	HeapTupleData seqdatatuple;
>> +	Form_pg_sequence_data seq;
>> +	HeapTuple	tuple;
>> +
>> +	/* open and lock sequence */
>> +	init_sequence(seq_relid, &elm, &seqrel);
>> +
>> +	/* lock page' buffer and read tuple */
>> +	seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
>> +
>> +	/* Copy the existing sequence tuple. */
>> +	tuple = heap_copytuple(&seqdatatuple);
>> +
>> +	/* Now we're done with the old page */
>> +	UnlockReleaseBuffer(buf);
>> +
>> +	/*
>> +	 * Modify the copied tuple to update the sequence state (similar to what
>> +	 * ResetSequence does).
>> +	 */
>> +	seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
>> +	seq->last_value = last_value;
>> +	seq->is_called = is_called;
>> +	seq->log_cnt = log_cnt;
>> +
>> +	/*
>> +	 * Create a new storage file for the sequence - this is needed for the
>> +	 * transactional behavior.
>> +	 */
>> +	RelationSetNewRelfilenumber(seqrel, seqrel->rd_rel->relpersistence);
>> +
>> +	/*
>> +	 * Ensure sequence's relfrozenxid is at 0, since it won't contain any
>> +	 * unfrozen XIDs.  Same with relminmxid, since a sequence will never
>> +	 * contain multixacts.
>> +	 */
>> +	Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId);
>> +	Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId);
>> +
>> +	/*
>> +	 * Insert the modified tuple into the new storage file. This does all the
>> +	 * necessary WAL-logging etc.
>> +	 */
>> +	fill_seq_with_data(seqrel, tuple);
>> +
>> +	/* Clear local cache so that we don't think we have cached numbers */
>> +	/* Note that we do not change the currval() state */
>> +	elm->cached = elm->last;
>> +
>> +	relation_close(seqrel, NoLock);
>> +}
>> +
>> +/*
>> + * Set a sequence to a specified internal state.
>> + *
>> + * The change is made transactionally, so that on failure of the current
>> + * transaction, the sequence will be restored to its previous state.
>> + * We do that by creating a whole new relfilenode for the sequence; so this
>> + * works much like the rewriting forms of ALTER TABLE.
>> + *
>> + * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
>> + * which must not be released until end of transaction.  Caller is also
>> + * responsible for permissions checking.
>> + */
>> +void
>> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
>> +{
>> +	if (transactional)
>> +		SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
>> +	else
>> +		SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
>> +}
> 
> That's a lot of duplication with existing code. There's no explanation why
> SetSequence() as well as do_setval() exists.
> 

Thanks, I'll look into this.

> 
>>  /*
>>   * Initialize a sequence's relation with the specified tuple as content
>>   *
>> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
>>  
>>  	/* check the comment above nextval_internal()'s equivalent call. */
>>  	if (RelationNeedsWAL(rel))
>> +	{
>>  		GetTopTransactionId();
>>  
>> +		if (XLogLogicalInfoActive())
>> +			GetCurrentTransactionId();
>> +	}
> 
> Is it actually possible to reach this without an xid already having been
> assigned for the current xact?
> 

I believe it is. That's probably how I found this change is needed,
actually.

> 
> 
>> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions)
>>  	 * It's sufficient to ensure the toplevel transaction has an xid, no need
>>  	 * to assign xids subxacts, that'll already trigger an appropriate wait.
>>  	 * (Have to do that here, so we're outside the critical section)
>> +	 *
>> +	 * We have to ensure we have a proper XID, which will be included in
>> +	 * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
>> +	 * in a subxact (without any preceding changes) would get XID 0, and it
>> +	 * would then be impossible to decide which top xact it belongs to.
>> +	 * It'd also trigger assert in DecodeSequence. We only do that with
>> +	 * wal_level=logical, though.
>> +	 *
>> +	 * XXX This might seem unnecessary, because if there's no XID the xact
>> +	 * couldn't have done anything important yet, e.g. it could not have
>> +	 * created a sequence. But that's incorrect, because of subxacts. The
>> +	 * current subtransaction might not have done anything yet (thus no XID),
>> +	 * but an earlier one might have created the sequence.
>>  	 */
> 
> What about restricting this to the case you're mentioning,
> i.e. subtransactions?
> 

That might work, but I need to think about it a bit.

I don't think it'd save us much, though. I mean, vast majority of
transactions (and subtransactions) calling nextval() will then do
something else which requires a XID. This just moves the XID a bit,
that's all.

> 
>> @@ -845,6 +1023,7 @@ nextval_internal(Oid relid, bool check_permissions)
>>  		seq->log_cnt = 0;
>>  
>>  		xlrec.locator = seqrel->rd_locator;
> 
> I realize this isn't from this patch, but:
> 
> Why do we include the locator in the record? We already have it via
> XLogRegisterBuffer(), no? And afaict we don't even use it, as we read the page
> via XLogInitBufferForRedo() during recovery.
> 
> Kinda looks like an oversight in 2c03216d8311
> 

I don't know, it's what the code did.

> 
> 
> 
>> +/*
>> + * Handle sequence decode
>> + *
>> + * Decoding sequences is a bit tricky, because while most sequence actions
>> + * are non-transactional (not subject to rollback), some need to be handled
>> + * as transactional.
>> + *
>> + * By default, a sequence increment is non-transactional - we must not queue
>> + * it in a transaction as other changes, because the transaction might get
>> + * rolled back and we'd discard the increment. The downstream would not be
>> + * notified about the increment, which is wrong.
>> + *
>> + * On the other hand, the sequence may be created in a transaction. In this
>> + * case we *should* queue the change as other changes in the transaction,
>> + * because we don't want to send the increments for unknown sequence to the
>> + * plugin - it might get confused about which sequence it's related to etc.
>> + */
>> +void
>> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
> 
>> +	/* extract the WAL record, with "created" flag */
>> +	xlrec = (xl_seq_rec *) XLogRecGetData(r);
>> +
>> +	/* XXX how could we have sequence change without data? */
>> +	if(!datalen || !tupledata)
>> +		return;
> 
> Yea, I think we should error out here instead, something has gone quite wrong
> if this happens.
> 

OK

> 
>> +	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
>> +	DecodeSeqTuple(tupledata, datalen, tuplebuf);
>> +
>> +	/*
>> +	 * Should we handle the sequence increment as transactional or not?
>> +	 *
>> +	 * If the sequence was created in a still-running transaction, treat
>> +	 * it as transactional and queue the increments. Otherwise it needs
>> +	 * to be treated as non-transactional, in which case we send it to
>> +	 * the plugin right away.
>> +	 */
>> +	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
>> +														 target_locator,
>> +														 xlrec->created);
> 
> Why re-create this information during decoding, when we basically already have
> it available on the primary? I think we already pay the price for that
> tracking, which we e.g. use for doing a non-transactional truncate:
> 
> 		/*
> 		 * Normally, we need a transaction-safe truncation here.  However, if
> 		 * the table was either created in the current (sub)transaction or has
> 		 * a new relfilenumber in the current (sub)transaction, then we can
> 		 * just truncate it in-place, because a rollback would cause the whole
> 		 * table or the current physical file to be thrown away anyway.
> 		 */
> 		if (rel->rd_createSubid == mySubid ||
> 			rel->rd_newRelfilelocatorSubid == mySubid)
> 		{
> 			/* Immediate, non-rollbackable truncation is OK */
> 			heap_truncate_one_rel(rel);
> 		}
> 
> Afaict we could do something similar for sequences, except that I think we
> would just check if the sequence was created in the current transaction
> (i.e. any of the fields are set).
> 

Hmm, good point.

> 
>> +/*
>> + * A transactional sequence increment is queued to be processed upon commit
>> + * and a non-transactional increment gets processed immediately.
>> + *
>> + * A sequence update may be both transactional and non-transactional. When
>> + * created in a running transaction, treat it as transactional and queue
>> + * the change in it. Otherwise treat it as non-transactional, so that we
>> + * don't forget the increment in case of a rollback.
>> + */
>> +void
>> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
>> +						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
>> +						   RelFileLocator rlocator, bool transactional, bool created,
>> +						   ReorderBufferTupleBuf *tuplebuf)
> 
> 
>> +		/*
>> +		 * Decoding needs access to syscaches et al., which in turn use
>> +		 * heavyweight locks and such. Thus we need to have enough state around to
>> +		 * keep track of those.  The easiest way is to simply use a transaction
>> +		 * internally.  That also allows us to easily enforce that nothing writes
>> +		 * to the database by checking for xid assignments.
>> +		 *
>> +		 * When we're called via the SQL SRF there's already a transaction
>> +		 * started, so start an explicit subtransaction there.
>> +		 */
>> +		using_subtxn = IsTransactionOrTransactionBlock();
> 
> This duplicates a lot of the code from ReorderBufferProcessTXN(). But only
> does so partially. It's hard to tell whether some of the differences are
> intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()?
> 
> Maybe something like
> 
> void
> ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals);
> 
> void
> ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error);
> 

Thanks for the suggestion, I'll definitely consider that in the next
version of the patch.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company