Re: logical decoding and replication of sequences, take 2

From: Tomas Vondra <tomas.vondra@enterprisedb.com>
To: Andres Freund <andres@anarazel.de>
Cc: Robert Haas <robertmhaas@gmail.com>, PostgreSQL Hackers <pgsql-hackers@lists.postgresql.org>, Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: 2023-01-15T23:18:54Z
Lists: pgsql-hackers

cfbot didn't like the rebased / split patch, and after looking at it I
believe the cause is a bug in parallel apply of large transactions
(216a784829), which seems to have changed the interpretation of
in_remote_transaction and in_streamed_transaction. I've reported the
issue on that thread [1], but here's a version with a temporary
workaround so that we can continue reviewing it.

regards

[1]
https://www.postgresql.org/message-id/984ff689-adde-9977-affe-cd6029e850be%40enterprisedb.com

On 1/15/23 00:39, Tomas Vondra wrote:
> Hi,
> 
> here's a slightly updated version - the main change is splitting the
> patch into multiple parts, along the lines of the original patch
> reverted in 2c7ea57e56ca5f668c32d4266e0a3e45b455bef5:
> 
> - basic sequence decoding infrastructure
> - support in test_decoding
> - support in built-in logical replication
> 
> The revert mentions a couple of additional parts, but those were mostly
> fixes / improvements, and those are now merged into the three parts.
> 
> 
> On 1/11/23 22:46, Tomas Vondra wrote:
>>
>>> ...
>>>
>>>> +/*
>>>> + * Update the sequence state by modifying the existing sequence data row.
>>>> + *
>>>> + * This keeps the same relfilenode, so the behavior is non-transactional.
>>>> + */
>>>> +static void
>>>> +SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
>>>> +{
>>>> ...
>>>>
>>>> +void
>>>> +SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
>>>> +{
>>>> +	if (transactional)
>>>> +		SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
>>>> +	else
>>>> +		SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
>>>> +}
>>>
>>> That's a lot of duplication with existing code. There's no explanation why
>>> SetSequence() as well as do_setval() exists.
>>>
>>
>> Thanks, I'll look into this.
>>
> 
> I haven't done anything about this yet. The functions do similar
> things, but there are also a fair number of differences, so I haven't
> found a good way to merge them yet.
> 
>>>
>>>>  /*
>>>>   * Initialize a sequence's relation with the specified tuple as content
>>>>   *
>>>> @@ -406,8 +560,13 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
>>>>  
>>>>  	/* check the comment above nextval_internal()'s equivalent call. */
>>>>  	if (RelationNeedsWAL(rel))
>>>> +	{
>>>>  		GetTopTransactionId();
>>>>  
>>>> +		if (XLogLogicalInfoActive())
>>>> +			GetCurrentTransactionId();
>>>> +	}
>>>
>>> Is it actually possible to reach this without an xid already having been
>>> assigned for the current xact?
>>>
>>
>> I believe it is. That's probably how I found this change is needed,
>> actually.
>>
> 
> I've added a comment explaining why this is needed. I don't think it's
> worth trying to optimize this, because in plausible workloads we'd just
> delay the work a little bit.
> 
>>>
>>>
>>>> @@ -806,10 +966,28 @@ nextval_internal(Oid relid, bool check_permissions)
>>>>  	 * It's sufficient to ensure the toplevel transaction has an xid, no need
>>>>  	 * to assign xids subxacts, that'll already trigger an appropriate wait.
>>>>  	 * (Have to do that here, so we're outside the critical section)
>>>> +	 *
>>>> +	 * We have to ensure we have a proper XID, which will be included in
>>>> +	 * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
>>>> +	 * in a subxact (without any preceding changes) would get XID 0, and it
>>>> +	 * would then be impossible to decide which top xact it belongs to.
>>>> +	 * It'd also trigger assert in DecodeSequence. We only do that with
>>>> +	 * wal_level=logical, though.
>>>> +	 *
>>>> +	 * XXX This might seem unnecessary, because if there's no XID the xact
>>>> +	 * couldn't have done anything important yet, e.g. it could not have
>>>> +	 * created a sequence. But that's incorrect, because of subxacts. The
>>>> +	 * current subtransaction might not have done anything yet (thus no XID),
>>>> +	 * but an earlier one might have created the sequence.
>>>>  	 */
>>>
>>> What about restricting this to the case you're mentioning,
>>> i.e. subtransactions?
>>>
>>
>> That might work, but I need to think about it a bit.
>>
>> I don't think it'd save us much, though. I mean, the vast majority of
>> transactions (and subtransactions) calling nextval() will then do
>> something else which requires an XID. This just moves the XID
>> assignment a bit earlier, that's all.
>>
> 
> After thinking about this a bit more, I don't think the optimization is
> worth it, for the reasons explained above.
> 
>>>
>>>> +/*
>>>> + * Handle sequence decode
>>>> + *
>>>> + * Decoding sequences is a bit tricky, because while most sequence actions
>>>> + * are non-transactional (not subject to rollback), some need to be handled
>>>> + * as transactional.
>>>> + *
>>>> + * By default, a sequence increment is non-transactional - we must not queue
>>>> + * it in a transaction as other changes, because the transaction might get
>>>> + * rolled back and we'd discard the increment. The downstream would not be
>>>> + * notified about the increment, which is wrong.
>>>> + *
>>>> + * On the other hand, the sequence may be created in a transaction. In this
>>>> + * case we *should* queue the change as other changes in the transaction,
>>>> + * because we don't want to send the increments for unknown sequence to the
>>>> + * plugin - it might get confused about which sequence it's related to etc.
>>>> + */
>>>> +void
>>>> +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>>>> +{
>>>
>>>> +	/* extract the WAL record, with "created" flag */
>>>> +	xlrec = (xl_seq_rec *) XLogRecGetData(r);
>>>> +
>>>> +	/* XXX how could we have sequence change without data? */
>>>> +	if(!datalen || !tupledata)
>>>> +		return;
>>>
>>> Yea, I think we should error out here instead, something has gone quite wrong
>>> if this happens.
>>>
>>
>> OK
>>
> 
> Done.
> 
>>>
>>>> +	tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
>>>> +	DecodeSeqTuple(tupledata, datalen, tuplebuf);
>>>> +
>>>> +	/*
>>>> +	 * Should we handle the sequence increment as transactional or not?
>>>> +	 *
>>>> +	 * If the sequence was created in a still-running transaction, treat
>>>> +	 * it as transactional and queue the increments. Otherwise it needs
>>>> +	 * to be treated as non-transactional, in which case we send it to
>>>> +	 * the plugin right away.
>>>> +	 */
>>>> +	transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
>>>> +														 target_locator,
>>>> +														 xlrec->created);
>>>
>>> Why re-create this information during decoding, when we basically already have
>>> it available on the primary? I think we already pay the price for that
>>> tracking, which we e.g. use for doing a non-transactional truncate:
>>>
>>> 		/*
>>> 		 * Normally, we need a transaction-safe truncation here.  However, if
>>> 		 * the table was either created in the current (sub)transaction or has
>>> 		 * a new relfilenumber in the current (sub)transaction, then we can
>>> 		 * just truncate it in-place, because a rollback would cause the whole
>>> 		 * table or the current physical file to be thrown away anyway.
>>> 		 */
>>> 		if (rel->rd_createSubid == mySubid ||
>>> 			rel->rd_newRelfilelocatorSubid == mySubid)
>>> 		{
>>> 			/* Immediate, non-rollbackable truncation is OK */
>>> 			heap_truncate_one_rel(rel);
>>> 		}
>>>
>>> Afaict we could do something similar for sequences, except that I think we
>>> would just check if the sequence was created in the current transaction
>>> (i.e. any of the fields are set).
>>>
>>
>> Hmm, good point.
>>
> 
> But the rd_createSubid/rd_newRelfilelocatorSubid fields are available
> only in the original transaction, not during decoding. So we'd have to
> do this check there and add the result to the WAL record. Is that what
> you had in mind?
> 
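
For illustration only, here is a minimal sketch of what doing the check
on the primary could look like. It is not taken from the patch:
MockRelation, MockSeqRec, sequence_change_is_transactional() and
build_seq_record() are hypothetical names, and the types are simplified
stand-ins for the real relcache and WAL structures. The idea is just that
the writer evaluates the two Subid fields and stores the answer in the
record, so decoding can read the flag instead of recomputing it.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for PostgreSQL types, for illustration only. */
typedef uint32_t SubTransactionId;
#define InvalidSubTransactionId ((SubTransactionId) 0)

/* Hypothetical cut-down view of the relcache fields discussed above. */
typedef struct MockRelation
{
	/* valid if the relation was created in the current transaction */
	SubTransactionId rd_createSubid;
	/* valid if the relation got a new relfilenode in the current transaction */
	SubTransactionId rd_newRelfilelocatorSubid;
} MockRelation;

/* Hypothetical WAL record payload carrying the precomputed flag. */
typedef struct MockSeqRec
{
	bool		transactional;
} MockSeqRec;

/*
 * Treat the change as transactional if either field is set, i.e. the
 * sequence (or its current relfilenode) was created in this transaction
 * and would disappear on rollback.
 */
static bool
sequence_change_is_transactional(const MockRelation *rel)
{
	return rel->rd_createSubid != InvalidSubTransactionId ||
		rel->rd_newRelfilelocatorSubid != InvalidSubTransactionId;
}

/* Evaluate the check at WAL-write time and store the result in the record. */
static MockSeqRec
build_seq_record(const MockRelation *rel)
{
	MockSeqRec	rec;

	rec.transactional = sequence_change_is_transactional(rel);
	return rec;
}
```

With something along these lines, the decoding side would only look at
the flag in the record: queue the change in the transaction when it is
set, and pass it to the plugin right away otherwise.
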
>>>
>>>> +/*
>>>> + * A transactional sequence increment is queued to be processed upon commit
>>>> + * and a non-transactional increment gets processed immediately.
>>>> + *
>>>> + * A sequence update may be both transactional and non-transactional. When
>>>> + * created in a running transaction, treat it as transactional and queue
>>>> + * the change in it. Otherwise treat it as non-transactional, so that we
>>>> + * don't forget the increment in case of a rollback.
>>>> + */
>>>> +void
>>>> +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
>>>> +						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
>>>> +						   RelFileLocator rlocator, bool transactional, bool created,
>>>> +						   ReorderBufferTupleBuf *tuplebuf)
>>>
>>>
>>>> +		/*
>>>> +		 * Decoding needs access to syscaches et al., which in turn use
>>>> +		 * heavyweight locks and such. Thus we need to have enough state around to
>>>> +		 * keep track of those.  The easiest way is to simply use a transaction
>>>> +		 * internally.  That also allows us to easily enforce that nothing writes
>>>> +		 * to the database by checking for xid assignments.
>>>> +		 *
>>>> +		 * When we're called via the SQL SRF there's already a transaction
>>>> +		 * started, so start an explicit subtransaction there.
>>>> +		 */
>>>> +		using_subtxn = IsTransactionOrTransactionBlock();
>>>
>>> This duplicates a lot of the code from ReorderBufferProcessTXN(). But only
>>> does so partially. It's hard to tell whether some of the differences are
>>> intentional. Could we de-duplicate that code with ReorderBufferProcessTXN()?
>>>
>>> Maybe something like
>>>
>>> void
>>> ReorderBufferSetupXactEnv(ReorderBufferXactEnv *, bool process_invals);
>>>
>>> void
>>> ReorderBufferTeardownXactEnv(ReorderBufferXactEnv *, bool is_error);
>>>
>>
>> Thanks for the suggestion, I'll definitely consider that in the next
>> version of the patch.
> 
> I did look at the code a bit, but I'm not sure there really is a lot of
> duplicated code - yes, we start/abort the (sub)transaction, set up and
> tear down the snapshot, etc. Or what else would you put into the two new
> functions?
> 
> 
> regards
> 

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company