fix_race_condition.patch

application/x-patch

Filename: fix_race_condition.patch
Type: application/x-patch
Part: 0
Message: Re: Fix race during concurrent logical decoding activation
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
index 72f68ec58ef..c637e9f6358 100644
--- a/src/backend/replication/logical/logicalctl.c
+++ b/src/backend/replication/logical/logicalctl.c
@@ -256,33 +256,20 @@ write_logical_decoding_status_update_record(bool status)
 }
 
 /*
- * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
- * the shared flags to revert the logical decoding activation process.
+ * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding.
+ *
+ * Rather than resetting xlog_logical_info here, which could undo another
+ * backend's concurrent activation, ask the checkpointer to handle it via
+ * DisableLogicalDecoding(). That resets xlog_logical_info only when no
+ * logical slot exists, so any other in-progress activation is protected.
+ * NOTE(review): our own slot still exists at this point; confirm that
+ * dropping it re-requests the disable, or the flag may stay set.
  */
 static void
 abort_logical_decoding_activation(int code, Datum arg)
 {
-	Assert(MyReplicationSlot);
-	Assert(!LogicalDecodingCtl->logical_decoding_enabled);
-
 	elog(DEBUG1, "aborting logical decoding activation process");
-
-	/*
-	 * Abort the change to xlog_logical_info. We don't need to check
-	 * CheckLogicalSlotExists() as we're still holding a logical slot.
-	 */
-	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
-	LogicalDecodingCtl->xlog_logical_info = false;
-	LWLockRelease(LogicalDecodingControlLock);
-
-	/*
-	 * Some processes might have already started logical info WAL logging, so
-	 * tell all running processes to update their caches. We don't need to
-	 * wait for all processes to disable xlog_logical_info locally as it's
-	 * always safe to write logical information to WAL records, even when not
-	 * strictly required.
-	 */
-	EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+	RequestDisableLogicalDecoding();
 }
 
 /*
@@ -489,16 +476,19 @@ void
 DisableLogicalDecoding(void)
 {
 	bool		in_recovery = RecoveryInProgress();
+	bool		was_enabled;
 
 	LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
 
 	/*
 	 * Check if we can disable logical decoding.
 	 *
-	 * Skip CheckLogicalSlotExists() check during recovery because the
-	 * existing slots will be invalidated after disabling logical decoding.
+	 * Nothing to do if both flags are already off, or if valid logical slots
+	 * still exist. The slot check is skipped during recovery because the
+	 * existing slots will be invalidated after disabling logical decoding.
 	 */
-	if (!LogicalDecodingCtl->logical_decoding_enabled ||
+	if ((!LogicalDecodingCtl->logical_decoding_enabled &&
+		 !LogicalDecodingCtl->xlog_logical_info) ||
 		(!in_recovery && CheckLogicalSlotExists()))
 	{
 		LogicalDecodingCtl->pending_disable = false;
@@ -506,26 +496,41 @@ DisableLogicalDecoding(void)
 		return;
 	}
 
-	START_CRIT_SECTION();
+	was_enabled = LogicalDecodingCtl->logical_decoding_enabled;
 
-	/*
-	 * We need to disable logical decoding first and then disable logical
-	 * information WAL logging in order to ensure that no logical decoding
-	 * processes WAL records with insufficient information.
-	 */
-	LogicalDecodingCtl->logical_decoding_enabled = false;
+	if (was_enabled)
+	{
+		START_CRIT_SECTION();
 
-	/* Write the WAL to disable logical decoding on standbys too */
-	if (!in_recovery)
-		write_logical_decoding_status_update_record(false);
+		/*
+		 * We need to disable logical decoding first and then disable logical
+		 * information WAL logging in order to ensure that no logical decoding
+		 * processes WAL records with insufficient information.
+		 */
+		LogicalDecodingCtl->logical_decoding_enabled = false;
 
-	/* Now disable logical information WAL logging */
-	LogicalDecodingCtl->xlog_logical_info = false;
-	LogicalDecodingCtl->pending_disable = false;
+		/* Write the WAL to disable logical decoding on standbys too */
+		if (!in_recovery)
+			write_logical_decoding_status_update_record(false);
 
-	END_CRIT_SECTION();
+		/* Now disable logical information WAL logging */
+		LogicalDecodingCtl->xlog_logical_info = false;
+		LogicalDecodingCtl->pending_disable = false;
 
-	if (!in_recovery)
+		END_CRIT_SECTION();
+	}
+	else
+	{
+		/*
+		 * Logical decoding is disabled but xlog_logical_info is still true,
+		 * e.g. because an activation was interrupted. Reset the flag and
+		 * clear the pending request, as the other paths above do.
+		 */
+		LogicalDecodingCtl->xlog_logical_info = false;
+		LogicalDecodingCtl->pending_disable = false;
+	}
+
+	if (!in_recovery && was_enabled)
 		ereport(LOG,
 				errmsg("logical decoding is disabled because there are no valid logical replication slots"));
 
diff --git a/src/test/recovery/t/051_effective_wal_level.pl b/src/test/recovery/t/051_effective_wal_level.pl
index c862073c34e..8e01ef4e45e 100644
--- a/src/test/recovery/t/051_effective_wal_level.pl
+++ b/src/test/recovery/t/051_effective_wal_level.pl
@@ -410,8 +410,39 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled
 
 	# Verify that the backend aborted the activation process.
 	$primary->wait_for_log("aborting logical decoding activation process");
-	test_wal_level($primary, "replica|replica",
-		"the activation process aborted");
+	wait_for_logical_decoding_disabled($primary);
+	test_wal_level($primary, "replica|replica", "the activation process aborted");
+
+	# Test that interrupting one of two concurrent activations is handled.
+	$psql_create_slot = $primary->background_psql('postgres');
+
+	# Start a psql session and stop it in the middle of the activation
+	# process.
+	$psql_create_slot->query_until(
+		qr/create_slot_canceled/,
+		q(\echo create_slot_canceled
+select injection_points_set_local();
+select injection_points_attach('logical-decoding-activation', 'wait');
+select pg_create_logical_replication_slot('slot_canceled2', 'pgoutput');
+\q
+));
+	$primary->wait_for_event('client backend', 'logical-decoding-activation');
+	note("injection_point 'logical-decoding-activation' is reached");
+
+	# Another backend concurrently enables logical decoding.
+	$primary->safe_psql('postgres',
+						qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]);
+
+	# Cancel the backend started by $psql_create_slot, aborting its
+	# activation process, and wait until the abort has actually happened.
+	my $log_offset = -s $primary->logfile;
+	$primary->safe_psql('postgres',
+		qq[select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled2' and pid <> pg_backend_pid()]);
+	$primary->wait_for_log("aborting logical decoding activation process",
+		$log_offset);
+
+	test_wal_level($primary, "replica|logical",
+				   "the concurrent activation interrupt is handled properly");
 }
 
 $primary->stop;