0001-injection_points-Switch-wait-wakeup-to-rely-on-atomi.patch
text/plain
Filename: 0001-injection_points-Switch-wait-wakeup-to-rely-on-atomi.patch
Type: text/plain
Part: 0
From 038b0f55dfe80f168fdc1b01b8cdadbf38fedfa2 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Thu, 28 May 2026 11:15:33 +0900
Subject: [PATCH] injection_points: Switch wait/wakeup to rely on atomics
This removes the dependency on condition variables, replacing the
condition-variable sleep by a polling loop on an atomic counter.  The
delay between checks grows exponentially, starting at 10us and doubling
until it passes 100ms.
---
.../injection_points/injection_points.c | 53 ++++++++++---------
1 file changed, 27 insertions(+), 26 deletions(-)
diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c
index ba282e3dcabf..9b8e1aaad0b0 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -23,11 +23,11 @@
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "nodes/value.h"
-#include "storage/condition_variable.h"
#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
+#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/injection_point.h"
@@ -59,13 +59,10 @@ typedef struct InjectionPointSharedState
slock_t lock;
/* Counters advancing when injection_points_wakeup() is called */
- uint32 wait_counts[INJ_MAX_WAIT];
+ pg_atomic_uint32 wait_counts[INJ_MAX_WAIT];
/* Names of injection points attached to wait counters */
char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
-
- /* Condition variable used for waits and wakeups */
- ConditionVariable wait_point;
} InjectionPointSharedState;
/* Pointer to shared-memory state. */
@@ -102,9 +99,9 @@ injection_point_init_state(void *ptr, void *arg)
InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
SpinLockInit(&state->lock);
- memset(state->wait_counts, 0, sizeof(state->wait_counts));
memset(state->name, 0, sizeof(state->name));
- ConditionVariableInit(&state->wait_point);
+ for (int i = 0; i < INJ_MAX_WAIT; i++)
+ pg_atomic_init_u32(&state->wait_counts[i], 0);
}
static void
@@ -222,7 +219,7 @@ injection_notice(const char *name, const void *private_data, void *arg)
elog(NOTICE, "notice triggered for injection point %s", name);
}
-/* Wait on a condition variable, awaken by injection_points_wakeup() */
+/* Wait until injection_points_wakeup() is called */
void
injection_wait(const char *name, const void *private_data, void *arg)
{
@@ -254,31 +251,37 @@ injection_wait(const char *name, const void *private_data, void *arg)
{
index = i;
strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
- old_wait_counts = inj_state->wait_counts[i];
+ old_wait_counts = pg_atomic_read_u32(&inj_state->wait_counts[i]);
break;
}
}
SpinLockRelease(&inj_state->lock);
if (index < 0)
- elog(ERROR, "could not find free slot for wait of injection point %s ",
+ elog(ERROR, "could not find free slot for wait of injection point %s",
name);
- /* And sleep.. */
- ConditionVariablePrepareToSleep(&inj_state->wait_point);
- for (;;)
+ /*
+ * Wait until the counter is bumped by injection_points_wakeup().
+ *
+ * This loop starts with a short delay for responsiveness, enlarged to
+ * ease the CPU workload in slower environments.
+ */
+#define INJ_WAIT_INITIAL_US 10 /* 10us */
+#define INJ_WAIT_MAX_US 100000 /* 100ms */
+ pgstat_report_wait_start(injection_wait_event);
{
- uint32 new_wait_counts;
+ int delay_us = INJ_WAIT_INITIAL_US;
- SpinLockAcquire(&inj_state->lock);
- new_wait_counts = inj_state->wait_counts[index];
- SpinLockRelease(&inj_state->lock);
-
- if (old_wait_counts != new_wait_counts)
- break;
- ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
+ while (pg_atomic_read_u32(&inj_state->wait_counts[index]) == old_wait_counts)
+ {
+ CHECK_FOR_INTERRUPTS();
+ pg_usleep(delay_us);
+ if (delay_us < INJ_WAIT_MAX_US)
+ delay_us *= 2;
+ }
}
- ConditionVariableCancelSleep();
+ pgstat_report_wait_end();
/* Remove this injection point from the waiters. */
SpinLockAcquire(&inj_state->lock);
@@ -443,7 +446,7 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
if (inj_state == NULL)
injection_init_shmem();
- /* First bump the wait counter for the injection point to wake up */
+ /* Find the injection point then bump its wait counter */
SpinLockAcquire(&inj_state->lock);
for (int i = 0; i < INJ_MAX_WAIT; i++)
{
@@ -458,11 +461,9 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
SpinLockRelease(&inj_state->lock);
elog(ERROR, "could not find injection point %s to wake up", name);
}
- inj_state->wait_counts[index]++;
SpinLockRelease(&inj_state->lock);
- /* And broadcast the change to the waiters */
- ConditionVariableBroadcast(&inj_state->wait_point);
+ pg_atomic_fetch_add_u32(&inj_state->wait_counts[index], 1);
PG_RETURN_VOID();
}
--
2.54.0