/*
 * race.c
 *
 * text/plain
 *
 * Filename: race.c
 * Type: text/plain
 * Part: 0
 * Message: Re: Non-reproducible AIO failure
 */
#include <assert.h>
#include <pthread.h>


/*
 * Stand-alone thread fences built on the GCC/Clang __atomic builtins.
 *
 * read_barrier() issues an acquire fence and write_barrier() a release
 * fence.  Both are fences only: the loads and stores around them in this file
 * are plain (non-atomic) accesses.
 */
#define read_barrier()		__atomic_thread_fence(__ATOMIC_ACQUIRE)
#define write_barrier()		__atomic_thread_fence(__ATOMIC_RELEASE)

/*
 * Kind of IO operation recorded in a handle's "op" member.
 */
enum PgAioOp
{
	PGAIO_OP_INVALID = 0,	/* deliberately zero, so zeroed memory reads as invalid */
	PGAIO_OP_READV,			/* == 1 */
};

/*
 * Identifies what an IO operates on; stored in a handle's "target" member.
 *
 * Declared with typedef so that PgAioTargetID names the enum type.  Without
 * the typedef keyword the trailing identifier would instead define a global
 * object of this enum type.
 */
typedef enum PgAioTargetID
{
	/* intentionally the zero value, to help catch zeroed memory etc */
	PGAIO_TID_INVALID = 0,
	PGAIO_TID_SMGR,
} PgAioTargetID;

/*
 * Lifecycle states of an IO handle, in the order a handle normally moves
 * through them.  Enumerators are numbered consecutively starting at 0.
 *
 * Declared with typedef so that PgAioHandleState names the enum type.
 * Without the typedef keyword the trailing identifier would instead define a
 * global object of this enum type.
 */
typedef enum PgAioHandleState
{
	/* not in use */
	PGAIO_HS_IDLE = 0,

	/*
	 * Returned by pgaio_io_acquire(). The next state is either DEFINED (if
	 * pgaio_io_start_*() is called), or IDLE (if pgaio_io_release() is
	 * called).
	 */
	PGAIO_HS_HANDED_OUT,

	/*
	 * pgaio_io_start_*() has been called, but IO is not yet staged. At this
	 * point the handle has all the information for the IO to be executed.
	 */
	PGAIO_HS_DEFINED,

	/*
	 * stage() callbacks have been called, handle ready to be submitted for
	 * execution. Unless in batchmode (c.f. pgaio_enter_batchmode()), the IO
	 * will be submitted immediately after.
	 */
	PGAIO_HS_STAGED,

	/* IO has been submitted to the IO method for execution */
	PGAIO_HS_SUBMITTED,

	/* IO finished, but result has not yet been processed */
	PGAIO_HS_COMPLETED_IO,

	/*
	 * IO completed, shared completion has been called.
	 *
	 * If the IO completion occurs in the issuing backend, local callbacks
	 * will immediately be called. Otherwise the handle stays in
	 * COMPLETED_SHARED until the issuing backend waits for the completion of
	 * the IO.
	 */
	PGAIO_HS_COMPLETED_SHARED,

	/*
	 * IO completed, local completion has been called.
	 *
	 * After this the handle will be made reusable and go into IDLE state.
	 */
	PGAIO_HS_COMPLETED_LOCAL,
} PgAioHandleState;


/*
 * Minimal IO handle shared between main() and io_thread() in this program.
 *
 * All three members are 8-bit bit-fields of plain int, declared back to
 * back.  In C11 a run of adjacent non-zero-width bit-fields counts as a
 * single memory location, so a store to one member may be compiled as a
 * read-modify-write of the storage unit that also holds the other two.
 * NOTE(review): presumably this packing is exactly what the reproducer is
 * meant to exercise -- confirm before changing it to separate byte-sized
 * members.
 *
 * Whether a plain int bit-field is signed is implementation-defined; the
 * values stored here (enumerators in the range 0..7) fit either way.
 */
typedef struct PgAioHandle
{
	/*
	 * One of the PGAIO_HS_* values.  All state updates should go through
	 * update_state() in this file (the original comment names
	 * pgaio_io_update_state()).
	 */
	int state:8;

	/* what are we operating on: one of the PGAIO_TID_* values */
	int target:8;

	/* which IO operation: one of the PGAIO_OP_* values */
	int op:8;
} PgAioHandle;


/*
 * Store new_state into ioh->state, issuing a release fence first.
 *
 * The fence comes before the store so that writes the caller made earlier
 * (for example main() setting ioh->op) are ordered ahead of the new state
 * value.  The store itself is a plain bit-field assignment, not an atomic
 * operation.
 */
void update_state(PgAioHandle* ioh, int new_state)
{
	write_barrier();
	ioh->state = new_state;
}

/*
 * Return a handle to PGAIO_HS_IDLE and reset its op and target members.
 *
 * Asserts that the handle is not already idle.  A handle found in
 * COMPLETED_SHARED is first moved to COMPLETED_LOCAL, then to IDLE.
 *
 * Note the order: op and target are cleared only after IDLE has been stored
 * into state.
 */
void reclaim(PgAioHandle* ioh)
{
	assert(ioh->state != PGAIO_HS_IDLE);
	if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
	{
		/*
		 * Acquire fence left disabled here; the only caller in this file,
		 * wait_for_free(), issues read_barrier() itself before calling in.
		 * NOTE(review): presumably kept as an experiment toggle -- confirm.
		 */
		// read_barrier();
		update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
	}
	update_state(ioh, PGAIO_HS_IDLE);
	/* plain bit-field stores, performed after the IDLE state is written */
	ioh->op = PGAIO_OP_INVALID;
	ioh->target = PGAIO_TID_INVALID;
}


/*
 * Busy-wait until the handle reaches COMPLETED_SHARED, then reclaim it.
 *
 * The loop has no sleep, yield or timeout; it returns only after observing
 * PGAIO_HS_COMPLETED_SHARED.  The state is read with a plain (non-atomic,
 * non-volatile) load.
 * NOTE(review): nothing in the loop body forces the load to be repeated, so
 * an optimizing compiler may be entitled to hoist it -- confirm the build
 * flags this is meant to be run with.
 */
void wait_for_free(PgAioHandle* ioh)
{
	while (1)
	{
		if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
		{
			/* acquire fence after seeing the completed state, before reclaim */
			read_barrier();
			reclaim(ioh);
			return;
		}
	}
}

/*
 * Thread entry point playing the role of the IO executor.
 *
 * arg is the PgAioHandle passed to pthread_create() by main().  The function
 * loops forever and never returns: it spins until the handle's state is
 * SUBMITTED, issues an acquire fence, asserts that an operation has been set
 * on the handle, and then marks it COMPLETED_SHARED.
 */
void* io_thread(void* arg)
{
	PgAioHandle* ioh = (PgAioHandle*)arg;
	while (1)
	{
		/* empty-bodied spin on a plain load of the state bit-field */
		while (ioh->state != PGAIO_HS_SUBMITTED);
		read_barrier();
		/* main() stores op before publishing SUBMITTED; this checks it is visible */
		assert(ioh->op != PGAIO_OP_INVALID);
		update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
	}
}


/*
 * Driver: repeatedly submits one IO on a single stack-allocated handle and
 * waits for io_thread() to complete it.
 *
 * Each iteration asserts the handle is fully reset (op invalid, state idle),
 * sets op, publishes SUBMITTED through update_state(), and then spins in
 * wait_for_free() until the handle has been completed and reclaimed.  The
 * loop never exits, so the program ends only when an assertion fails or it
 * is killed.  The return value of pthread_create() is not checked and the
 * thread is never joined.
 */
int main()
{
	/* initializer order follows the struct: state, target, op */
	PgAioHandle ioh = {PGAIO_HS_IDLE, PGAIO_TID_INVALID, PGAIO_OP_INVALID};
	pthread_t thread;
	pthread_create(&thread, 0, &io_thread, &ioh);
	while (1) {
		assert(ioh.op == PGAIO_OP_INVALID);
		assert(ioh.state == PGAIO_HS_IDLE);
		/* plain store to op, followed by release fence + state store */
		ioh.op = PGAIO_OP_READV;
		update_state(&ioh, PGAIO_HS_SUBMITTED);
		wait_for_free(&ioh);
	}
}