/*
 * Simple test program to exercise SetLatch/WaitLatch.
 *
 * This just passes a "token" flag around among some number of worker
 * processes.
 *
 * To use: compile this into a .so, then execute
 *
 * CREATE FUNCTION start_worker() RETURNS void AS '/path/to/.so' LANGUAGE c;
 *
 * Then, for each of up to MAX_WORKERS sessions, execute
 *
 * psql -c "SELECT start_worker()" dbname &
 *
 * About one session per physical CPU should be good.
 *
 * Watch the postmaster log for a while to confirm nobody drops the ball.
 * If anyone does, all the processes will exit with a timeout failure
 * after about a minute.  If you get bored, "pg_ctl stop -m immediate"
 * is the easiest way out.
 *
 * Be sure to restart the postmaster between attempts, since there's no
 * logic for re-initializing a workspace that's already present.
 *
 * Note: this will NOT work with the Windows implementation of latches, since
 * we are cheesy about how we initialize the shared memory.  Doesn't matter
 * for now, since Windows doesn't run on any machines with weak memory
 * ordering anyway.
 */

#include "postgres.h"

#include "fmgr.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/shmem.h"

/* Standard marker identifying this file as a PostgreSQL loadable module */
PG_MODULE_MAGIC;


/*
 * Number of slots in the per-worker arrays kept in shared memory.
 * start_worker() raises an error once this many workers have registered.
 */
#define MAX_WORKERS 16

/*
 * Constant added to the cycle counter when start_worker() computes the
 * length of its busy-loop delay, so the delay starts out large instead of
 * growing from zero.
 */
#define BIAS 500000				/* to reduce time-to-failure */


/*
 * Per-worker token slot.
 *
 * Arrays have padding to try to ensure active values are in different cache
 * lines
 */
typedef struct
{
	/* Nonzero while the worker owning this slot holds the token */
	int			flag;
	/* Unused spacer between flag and junk */
	int			pad2[4];
	/* Incremented in start_worker()'s busy loop purely to generate writes */
	int			junk;
	/* Unused trailing padding that spaces out consecutive array elements */
	char		pad[333];
} TokStruct;

/*
 * Per-worker latch slot: one shared latch followed by padding that spaces
 * out consecutive array elements.
 */
typedef struct
{
	/* Latch the owning worker waits on; set by whoever passes it the token */
	Latch		latch;
	/* Unused trailing padding */
	char		pad[333];
} LatchStruct;

/*
 * Layout of the shared-memory area used by all workers.  start_worker()
 * obtains it through ShmemInitStruct() under the name "Latch Testing", and
 * the worker that creates it zero-fills it and initializes every latch.
 */
typedef struct
{
	/*
	 * Count of workers registered so far; a new worker takes the current
	 * value as its own worker number and then increments it.
	 */
	int			num_workers;

	/* Token flags, indexed by worker number */
	TokStruct	flags[MAX_WORKERS];

	/* Latches, indexed by worker number */
	LatchStruct	latches[MAX_WORKERS];
} MySharedSpace;

/*
 * Running sum of the flag values read in start_worker()'s busy loop.  The
 * value is never examined; presumably it exists only to give those reads a
 * visible use -- NOTE(review): confirm intent.
 */
volatile long	gsum = 0;

/* Prototype for the SQL-callable entry point defined below */
Datum		start_worker(PG_FUNCTION_ARGS);

/* Register start_worker as a version-1 calling-convention function */
PG_FUNCTION_INFO_V1(start_worker);

/*
 * start_worker
 *
 * SQL-callable entry point (intended to be declared RETURNS void).  Each
 * call turns the calling backend into one worker of a token-passing ring:
 *
 *	1. Attach to (or create and initialize) the shared workspace.
 *	2. Claim the next free worker number and take ownership of that slot's
 *	   latch.
 *	3. Loop forever: reset own latch; if holding the token, clear it, hand
 *	   it to the next worker and set that worker's latch; burn some CPU
 *	   while hammering the token structs; then wait on own latch.
 *
 * The function never returns normally.  It leaves the loop only by raising
 * an error ("too many workers", or a WaitLatch timeout after one minute
 * without being woken) or through an interrupt serviced by
 * CHECK_FOR_INTERRUPTS().
 *
 * The order of the shared-memory accesses inside the loop is the whole point
 * of the test, so do not rearrange those statements.
 */
Datum
start_worker(PG_FUNCTION_ARGS)
{
	volatile MySharedSpace *workspace;
	bool		found;
	int			i;
	int			my_number;
	long		counter = 0;
	unsigned short xseed[3];

	/*
	 * WaitLatch() takes its timeout in milliseconds, so one minute is
	 * 60 * 1000.  (A value of 60000000 is a minute only if the unit is
	 * microseconds; in milliseconds it is more than sixteen hours, which
	 * would make a dropped token look like a hang rather than a failure.)
	 */
	const long	wait_timeout_ms = 60 * 1000L;

	/* Set up non-interlocked RNG */
	xseed[0] = xseed[1] = xseed[2] = 42;

	/* Create or access the shared data */
	workspace = (MySharedSpace *)
		ShmemInitStruct("Latch Testing", sizeof(MySharedSpace), &found);

	if (!found)
	{
		/* I'm the first, initialize */
		memset((void *) workspace, 0, sizeof(MySharedSpace));

		for (i = 0; i < MAX_WORKERS; i++)
			InitSharedLatch(&(workspace->latches[i].latch));

		/* Give the token to the first worker (ie, me) */
		workspace->flags[0].flag = 1;
	}

	/*
	 * Add self to the arrays.  This assumes no two would-be workers do it
	 * concurrently, which should be OK if they're started manually.  Note
	 * that we do NOT initialize flags[my_number], as that was set up
	 * correctly during the workspace creation above.
	 */
	my_number = workspace->num_workers;
	if (my_number >= MAX_WORKERS)
		elog(ERROR, "too many workers");

	/* Own the latch before advertising ourselves via num_workers */
	OwnLatch(&(workspace->latches[my_number].latch));
	workspace->num_workers++;

	/*
	 * And play ball ...
	 */
	for (;;)
	{
		/* Reset first, then test the flag, so a wakeup cannot be lost */
		ResetLatch(&(workspace->latches[my_number].latch));

		/* If I have the token ... */
		if (workspace->flags[my_number].flag)
		{
			int		next_worker;
			int		cnt;

			/* delete it ... */
			workspace->flags[my_number].flag = 0;

			/*
			 * ... and pass it on to the next guy.  Note we might fetch a
			 * stale value of num_workers here, but that's okay, it'll just
			 * mean the recently-added guy doesn't get the token during this
			 * cycle.
			 */
			next_worker = my_number + 1;
			if (next_worker >= workspace->num_workers)
				next_worker = 0;

			/* Publish the token before waking its new holder */
			workspace->flags[next_worker].flag = 1;

			SetLatch(&(workspace->latches[next_worker].latch));

			/* Report that we're still alive every so often */
			counter++;
			if (counter % (256*1024) == 0)
				elog(LOG, "worker %d still alive after %ld cycles",
					 my_number, counter);

			/* Allow query cancel / shutdown to get us out of the loop */
			CHECK_FOR_INTERRUPTS();

			/*
			 * Compute for a little while, hoping our latch will be set
			 * again before we wait.  The delay is random with a range that
			 * grows slowly as we run.
			 *
			 * While we're at it, create a lot of r/w traffic on the
			 * TokStructs to try to saturate their cache lines.  This is
			 * needed to cause other CPUs to not see flag updates promptly
			 * compared to latch updates.
			 */
			cnt = (counter + BIAS) / 64 +
				(int) (pg_erand48(xseed) * ((counter + BIAS) / 64));

			for (i = 0; i < cnt; i++)
			{
				int		k = i % MAX_WORKERS;

				gsum += workspace->flags[k].flag;
				workspace->flags[k].junk++;
			}
		}

		/*
		 * Sleep until somebody sets our latch.  If a full minute passes
		 * without that happening, assume the token was dropped and fail,
		 * reporting the flag and latch state we can see at that moment.
		 */
		if (!(WaitLatch(&(workspace->latches[my_number].latch),
						WL_LATCH_SET | WL_TIMEOUT, wait_timeout_ms) & WL_LATCH_SET))
			elog(ERROR, "WaitLatch timed out in worker %d after %ld cycles, with flag %d latch %d",
				 my_number, counter,
				 workspace->flags[my_number].flag,
				 (int) workspace->latches[my_number].latch.is_set);
	}

	/* Not reached: the loop above is left only by an error or interrupt */
	PG_RETURN_VOID();
}
