/*
 * latch_test.c
 * text/x-patch
 * Filename: latch_test.c
 * Type: text/x-patch
 * Part: 0
 */
/*
* Simple test program to exercise SetLatch/WaitLatch.
*
* This just passes a "token" flag around among some number of worker
* processes.
*
* To use: compile this into a .so, then execute
*
* CREATE FUNCTION start_worker() RETURNS void AS '/path/to/.so' LANGUAGE c;
*
* Then, for each of up to MAX_WORKERS sessions, execute
*
* psql -c "SELECT start_worker()" dbname &
*
* About one session per physical CPU should be good.
*
* Watch the postmaster log for a while to confirm nobody drops the ball.
* If anyone does, all the processes will exit with a timeout failure
* after about a minute. If you get bored, "pg_ctl stop -m immediate"
* is the easiest way out.
*
* Be sure to restart the postmaster between attempts, since there's no
* logic for re-initializing a workspace that's already present.
*
* Note: this will NOT work with the Windows implementation of latches, since
* we are cheesy about how we initialize the shared memory. Doesn't matter
* for now, since Windows doesn't run on any machines with weak memory
* ordering anyway.
*/
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/shmem.h"
PG_MODULE_MAGIC;
/*
 * Number of entries in the flags[] and latches[] arrays of MySharedSpace;
 * start_worker() raises an error for any caller beyond this many.
 */
#define MAX_WORKERS 16
/*
 * Constant added to the cycle counter when start_worker() sizes its busy
 * loop, so the loop is already long on the very first cycle.
 */
#define BIAS 500000 /* to reduce time-to-failure */
/*
* Arrays have padding to try to ensure active values are in different cache
* lines
*/
/*
 * One token slot per worker.
 *
 * "flag" is nonzero while the worker that owns this slot holds the token.
 * "junk" is a scratch counter that start_worker()'s busy loop increments
 * purely to generate write traffic on the slot.  The pad members are never
 * accessed; they only take up space.
 */
typedef struct
{
int flag;
int pad2[4];
int junk;
char pad[333];
} TokStruct;
/*
 * One latch per worker, followed by unused padding bytes so that consecutive
 * entries of the latches[] array are spaced apart in memory.
 */
typedef struct
{
Latch latch;
char pad[333];
} LatchStruct;
/*
 * Layout of the shared-memory area that start_worker() creates or attaches
 * to under the name "Latch Testing".
 *
 * num_workers counts the workers that have joined so far; worker N uses
 * flags[N] and latches[N].
 */
typedef struct
{
int num_workers;
TokStruct flags[MAX_WORKERS];
LatchStruct latches[MAX_WORKERS];
} MySharedSpace;
/*
 * Running total of the flag values read by start_worker()'s busy loop.
 * Nothing in the code shown here reads it back; presumably it exists (and is
 * volatile) only to give those reads an observable effect -- TODO confirm.
 */
volatile long gsum = 0;
/* Explicit prototype for the SQL-callable function defined below. */
Datum start_worker(PG_FUNCTION_ARGS);
/* Emit the fmgr "version 1" function-info record for start_worker. */
PG_FUNCTION_INFO_V1(start_worker);
/*
 * start_worker
 *
 * SQL-callable entry point (declared RETURNS void).  Attaches to the shared
 * "Latch Testing" workspace, creating and initializing it if this is the
 * first caller, claims the next free worker slot, and then loops forever:
 * whenever this worker holds the token it clears its own flag, sets the next
 * worker's flag and latch, and burns some CPU before waiting on its own
 * latch again.
 *
 * Never returns normally.  Raises ERROR if all MAX_WORKERS slots are already
 * taken, or if our latch is not set within the wait timeout (meaning that
 * somebody dropped the token).
 */
Datum
start_worker(PG_FUNCTION_ARGS)
{
    /*
     * How long to wait for the token before concluding it was lost.  The
     * three-argument WaitLatch() used below takes its timeout in
     * milliseconds, so this is the "about a minute" promised in the file
     * header.
     */
    const long  wait_timeout_ms = 60 * 1000L;

    volatile MySharedSpace *workspace;
    bool        found;
    int         i;
    int         my_number;
    long        counter = 0;
    unsigned short xseed[3];

    /* Set up non-interlocked RNG */
    xseed[0] = xseed[1] = xseed[2] = 42;

    /* Create or access the shared data */
    workspace = (MySharedSpace *)
        ShmemInitStruct("Latch Testing", sizeof(MySharedSpace), &found);

    if (!found)
    {
        /* I'm the first, initialize */
        memset((void *) workspace, 0, sizeof(MySharedSpace));
        for (i = 0; i < MAX_WORKERS; i++)
            InitSharedLatch(&(workspace->latches[i].latch));
        /* Give the token to the first worker (ie, me) */
        workspace->flags[0].flag = 1;
    }

    /*
     * Add self to the arrays.  This assumes no two would-be workers do it
     * concurrently, which should be OK if they're started manually.  Note
     * that we do NOT initialize flags[my_number], as that was set up
     * correctly during the workspace creation above.
     */
    my_number = workspace->num_workers;
    if (my_number >= MAX_WORKERS)
        elog(ERROR, "too many workers");
    /* Own the latch before advertising the slot via num_workers */
    OwnLatch(&(workspace->latches[my_number].latch));
    workspace->num_workers++;

    /*
     * And play ball ...
     */
    for (;;)
    {
        /*
         * Reset the latch before examining the flag, so that a SetLatch
         * arriving after the check below is not lost.
         */
        ResetLatch(&(workspace->latches[my_number].latch));

        /* If I have the token ... */
        if (workspace->flags[my_number].flag)
        {
            int         next_worker;
            int         cnt;

            /* delete it ... */
            workspace->flags[my_number].flag = 0;

            /*
             * ... and pass it on to the next guy.  Note we might fetch a
             * stale value of num_workers here, but that's okay, it'll just
             * mean the recently-added guy doesn't get the token during this
             * cycle.
             */
            next_worker = my_number + 1;
            if (next_worker >= workspace->num_workers)
                next_worker = 0;
            workspace->flags[next_worker].flag = 1;
            SetLatch(&(workspace->latches[next_worker].latch));

            /* Report that we're still alive every so often */
            counter++;
            if (counter % (256*1024) == 0)
                elog(LOG, "worker %d still alive after %ld cycles",
                     my_number, counter);

            CHECK_FOR_INTERRUPTS();

            /*
             * Compute for a little while, hoping our latch will be set
             * again before we wait.  The delay is random with a range that
             * grows slowly as we run.
             *
             * While we're at it, create a lot of r/w traffic on the
             * TokStructs to try to saturate their cache lines.  This is
             * needed to cause other CPUs to not see flag updates promptly
             * compared to latch updates.
             */
            cnt = (counter + BIAS) / 64 +
                (int) (pg_erand48(xseed) * ((counter + BIAS) / 64));
            for (i = 0; i < cnt; i++)
            {
                int         k = i % MAX_WORKERS;

                gsum += workspace->flags[k].flag;
                workspace->flags[k].junk++;
            }
        }

        /*
         * Sleep until somebody sets our latch.  If the timeout expires
         * without the latch being set, the token has been lost: report the
         * state we see and bail out.  is_set is cast to int so the argument
         * matches its %d conversion regardless of how sig_atomic_t is
         * defined.
         */
        if (!(WaitLatch(&(workspace->latches[my_number].latch),
                        WL_LATCH_SET | WL_TIMEOUT, wait_timeout_ms) & WL_LATCH_SET))
            elog(ERROR, "WaitLatch timed out in worker %d after %ld cycles, with flag %d latch %d",
                 my_number, counter,
                 workspace->flags[my_number].flag,
                 (int) workspace->latches[my_number].latch.is_set);
    }

    /* Not reached: the loop above has no normal exit. */
    PG_RETURN_VOID();
}