fix_race_condition_2.diff
application/octet-stream
Filename: fix_race_condition_2.diff
Type: application/octet-stream
Part: 0
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
index 72f68ec58ef..a78c8d18be1 100644
--- a/src/backend/replication/logical/logicalctl.c
+++ b/src/backend/replication/logical/logicalctl.c
@@ -256,33 +256,20 @@ write_logical_decoding_status_update_record(bool status)
}
/*
- * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
- * the shared flags to revert the logical decoding activation process.
+ * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding.
+ *
+ * Rather than reverting xlog_logical_info directly, which could clobber a
+ * concurrent backend whose activation is in progress or already completed,
+ * ask the checkpointer to do it via the normal disable path; it resets the
+ * flag only when no valid logical slots exist. The flag may therefore stay
+ * on for a while after an abort, which is harmless. NOTE(review): relies on
+ * the request being honored while logical_decoding_enabled is still false.
 */
static void
abort_logical_decoding_activation(int code, Datum arg)
{
- Assert(MyReplicationSlot);
- Assert(!LogicalDecodingCtl->logical_decoding_enabled);
-
elog(DEBUG1, "aborting logical decoding activation process");
-
- /*
- * Abort the change to xlog_logical_info. We don't need to check
- * CheckLogicalSlotExists() as we're still holding a logical slot.
- */
- LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
- LogicalDecodingCtl->xlog_logical_info = false;
- LWLockRelease(LogicalDecodingControlLock);
-
- /*
- * Some processes might have already started logical info WAL logging, so
- * tell all running processes to update their caches. We don't need to
- * wait for all processes to disable xlog_logical_info locally as it's
- * always safe to write logical information to WAL records, even when not
- * strictly required.
- */
- EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+ RequestDisableLogicalDecoding();
}
/*
@@ -489,16 +476,19 @@ void
DisableLogicalDecoding(void)
{
bool in_recovery = RecoveryInProgress();
+ bool was_enabled;
LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
/*
* Check if we can disable logical decoding.
*
- * Skip CheckLogicalSlotExists() check during recovery because the
- * existing slots will be invalidated after disabling logical decoding.
+ * Nothing to do if both flags are already off (xlog_logical_info can be on
+ * alone, e.g. after an aborted activation) or if valid logical slots exist.
+ * In recovery, skip the slot check: slots are invalidated after disabling.
*/
- if (!LogicalDecodingCtl->logical_decoding_enabled ||
+ if ((!LogicalDecodingCtl->logical_decoding_enabled &&
+ !LogicalDecodingCtl->xlog_logical_info) ||
(!in_recovery && CheckLogicalSlotExists()))
{
LogicalDecodingCtl->pending_disable = false;
@@ -506,7 +496,7 @@ DisableLogicalDecoding(void)
return;
}
- START_CRIT_SECTION();
+ was_enabled = LogicalDecodingCtl->logical_decoding_enabled;
/*
* We need to disable logical decoding first and then disable logical
@@ -515,22 +505,27 @@ DisableLogicalDecoding(void)
*/
LogicalDecodingCtl->logical_decoding_enabled = false;
- /* Write the WAL to disable logical decoding on standbys too */
- if (!in_recovery)
- write_logical_decoding_status_update_record(false);
+ if (was_enabled)
+ {
+ START_CRIT_SECTION();
+
+ /* The state changed, so WAL-log it to disable it on standbys too */
+ if (!in_recovery)
+ write_logical_decoding_status_update_record(false);
+
+ END_CRIT_SECTION();
+ }
/* Now disable logical information WAL logging */
LogicalDecodingCtl->xlog_logical_info = false;
LogicalDecodingCtl->pending_disable = false;
- END_CRIT_SECTION();
+ LWLockRelease(LogicalDecodingControlLock);
- if (!in_recovery)
+ if (!in_recovery && was_enabled)
ereport(LOG,
errmsg("logical decoding is disabled because there are no valid logical replication slots"));
- LWLockRelease(LogicalDecodingControlLock);
-
/*
* Tell all running processes to reflect the xlog_logical_info update.
* Unlike when enabling logical decoding, we don't need to wait for all
diff --git a/src/test/recovery/t/051_effective_wal_level.pl b/src/test/recovery/t/051_effective_wal_level.pl
index c862073c34e..416acac9fac 100644
--- a/src/test/recovery/t/051_effective_wal_level.pl
+++ b/src/test/recovery/t/051_effective_wal_level.pl
@@ -410,8 +410,52 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled
# Verify that the backend aborted the activation process.
$primary->wait_for_log("aborting logical decoding activation process");
- test_wal_level($primary, "replica|replica",
- "the activation process aborted");
+ wait_for_logical_decoding_disabled($primary);
+ pass("the activation process aborted");
+
+ # Test concurrent activation processes where one of them is interrupted.
+ $psql_create_slot = $primary->background_psql('postgres');
+
+ # Start a psql session that stops in the middle of the activation
+ # process.
+ $psql_create_slot->query_until(
+ qr/create_slot_canceled/,
+ q(\echo create_slot_canceled
+select injection_points_set_local();
+select injection_points_attach('logical-decoding-activation', 'wait');
+select pg_create_logical_replication_slot('slot_canceled2', 'pgoutput');
+\q
+));
+ $primary->wait_for_event('client backend', 'logical-decoding-activation');
+ note("injection_point 'logical-decoding-activation' is reached");
+
+ # Another backend concurrently enables the logical decoding. Remember the
+ # log position first so that the wait below only matches new messages.
+ my $log_offset_concurrent = -s $primary->logfile;
+ $primary->safe_psql('postgres',
+ qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]);
+
+ # Concurrent slot creation should not be blocked. So wait until
+ # test_slot2 is created and logical decoding is enabled.
+ $primary->wait_for_log(
+ "logical decoding is enabled upon creating a new logical replication slot",
+ $log_offset_concurrent);
+ test_wal_level($primary, "replica|logical",
+ "the concurrent activation has done properly");
+
+ # Cancel the backend initiated by $psql_create_slot, aborting its activation
+ # process. The earlier cancellation already logged the same message.
+ $log_offset_concurrent = -s $primary->logfile;
+ $primary->safe_psql(
+ 'postgres',
+ qq[
+select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled2' and pid <> pg_backend_pid()
+]);
+ # Canceling the backend should not affect the concurrent slot creation.
+ $primary->wait_for_log("canceling statement due to user request",
+ $log_offset_concurrent);
+ test_wal_level($primary, "replica|logical",
+ "the concurrent activation interrupt is handled properly");
}
$primary->stop;