0001-injection_points-Switch-wait-wakeup-to-rely-on-atomi.patch

text/plain

Filename: 0001-injection_points-Switch-wait-wakeup-to-rely-on-atomi.patch
Type: text/plain
Part: 0
Message: injection_points: Switch wait/wakeup to use atomics rather than latches
From 038b0f55dfe80f168fdc1b01b8cdadbf38fedfa2 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 28 May 2026 11:15:33 +0900
Subject: [PATCH] injection_points: Switch wait/wakeup to rely on atomics

This removes the dependency based on counters and environment variables,
replacing the waiting loop by a wait on an atomic counter, whose check
increases over time in an exponential manner (starts at 10us, up to
100ms).
---
 .../injection_points/injection_points.c       | 53 ++++++++++---------
 1 file changed, 27 insertions(+), 26 deletions(-)

diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c
index ba282e3dcabf..9b8e1aaad0b0 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -23,11 +23,11 @@
 #include "miscadmin.h"
 #include "nodes/pg_list.h"
 #include "nodes/value.h"
-#include "storage/condition_variable.h"
 #include "storage/dsm_registry.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/injection_point.h"
@@ -59,13 +59,10 @@ typedef struct InjectionPointSharedState
 	slock_t		lock;
 
 	/* Counters advancing when injection_points_wakeup() is called */
-	uint32		wait_counts[INJ_MAX_WAIT];
+	pg_atomic_uint32 wait_counts[INJ_MAX_WAIT];
 
 	/* Names of injection points attached to wait counters */
 	char		name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
-
-	/* Condition variable used for waits and wakeups */
-	ConditionVariable wait_point;
 } InjectionPointSharedState;
 
 /* Pointer to shared-memory state. */
@@ -102,9 +99,9 @@ injection_point_init_state(void *ptr, void *arg)
 	InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
 
 	SpinLockInit(&state->lock);
-	memset(state->wait_counts, 0, sizeof(state->wait_counts));
 	memset(state->name, 0, sizeof(state->name));
-	ConditionVariableInit(&state->wait_point);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+		pg_atomic_init_u32(&state->wait_counts[i], 0);
 }
 
 static void
@@ -222,7 +219,7 @@ injection_notice(const char *name, const void *private_data, void *arg)
 		elog(NOTICE, "notice triggered for injection point %s", name);
 }
 
-/* Wait on a condition variable, awaken by injection_points_wakeup() */
+/* Wait until injection_points_wakeup() is called */
 void
 injection_wait(const char *name, const void *private_data, void *arg)
 {
@@ -254,31 +251,37 @@ injection_wait(const char *name, const void *private_data, void *arg)
 		{
 			index = i;
 			strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
-			old_wait_counts = inj_state->wait_counts[i];
+			old_wait_counts = pg_atomic_read_u32(&inj_state->wait_counts[i]);
 			break;
 		}
 	}
 	SpinLockRelease(&inj_state->lock);
 
 	if (index < 0)
-		elog(ERROR, "could not find free slot for wait of injection point %s ",
+		elog(ERROR, "could not find free slot for wait of injection point %s",
 			 name);
 
-	/* And sleep.. */
-	ConditionVariablePrepareToSleep(&inj_state->wait_point);
-	for (;;)
+	/*
+	 * Wait until the counter is bumped by injection_points_wakeup().
+	 *
+	 * This loop starts with a short delay for responsiveness, enlarged to
+	 * ease the CPU workload in slower environments.
+	 */
+#define INJ_WAIT_INITIAL_US		10	/* 10us */
+#define INJ_WAIT_MAX_US			100000	/* 100ms */
+	pgstat_report_wait_start(injection_wait_event);
 	{
-		uint32		new_wait_counts;
+		int			delay_us = INJ_WAIT_INITIAL_US;
 
-		SpinLockAcquire(&inj_state->lock);
-		new_wait_counts = inj_state->wait_counts[index];
-		SpinLockRelease(&inj_state->lock);
-
-		if (old_wait_counts != new_wait_counts)
-			break;
-		ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
+		while (pg_atomic_read_u32(&inj_state->wait_counts[index]) == old_wait_counts)
+		{
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(delay_us);
+			if (delay_us < INJ_WAIT_MAX_US)
+				delay_us *= 2;
+		}
 	}
-	ConditionVariableCancelSleep();
+	pgstat_report_wait_end();
 
 	/* Remove this injection point from the waiters. */
 	SpinLockAcquire(&inj_state->lock);
@@ -443,7 +446,7 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
 	if (inj_state == NULL)
 		injection_init_shmem();
 
-	/* First bump the wait counter for the injection point to wake up */
+	/* Find the injection point then bump its wait counter */
 	SpinLockAcquire(&inj_state->lock);
 	for (int i = 0; i < INJ_MAX_WAIT; i++)
 	{
@@ -458,11 +461,9 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
 		SpinLockRelease(&inj_state->lock);
 		elog(ERROR, "could not find injection point %s to wake up", name);
 	}
-	inj_state->wait_counts[index]++;
 	SpinLockRelease(&inj_state->lock);
 
-	/* And broadcast the change to the waiters */
-	ConditionVariableBroadcast(&inj_state->wait_point);
+	pg_atomic_fetch_add_u32(&inj_state->wait_counts[index], 1);
 	PG_RETURN_VOID();
 }
 
-- 
2.54.0