race.c
text/plain
#include <assert.h>
#include <pthread.h>
#define read_barrier() __atomic_thread_fence(__ATOMIC_ACQUIRE)
#define write_barrier() __atomic_thread_fence(__ATOMIC_RELEASE)
enum PgAioOp
{
/* intentionally the zero value, to help catch zeroed memory etc */
PGAIO_OP_INVALID = 0,
PGAIO_OP_READV = 1,
};
enum PgAioTargetID
{
/* intentionally the zero value, to help catch zeroed memory etc */
PGAIO_TID_INVALID = 0,
PGAIO_TID_SMGR,
} PgAioTargetID;
enum PgAioHandleState
{
/* not in use */
PGAIO_HS_IDLE = 0,
/*
* Returned by pgaio_io_acquire(). The next state is either DEFINED (if
* pgaio_io_start_*() is called), or IDLE (if pgaio_io_release() is
* called).
*/
PGAIO_HS_HANDED_OUT,
/*
* pgaio_io_start_*() has been called, but IO is not yet staged. At this
* point the handle has all the information for the IO to be executed.
*/
PGAIO_HS_DEFINED,
/*
* stage() callbacks have been called, handle ready to be submitted for
* execution. Unless in batchmode (see c.f. pgaio_enter_batchmode()), the
* IO will be submitted immediately after.
*/
PGAIO_HS_STAGED,
/* IO has been submitted to the IO method for execution */
PGAIO_HS_SUBMITTED,
/* IO finished, but result has not yet been processed */
PGAIO_HS_COMPLETED_IO,
/*
* IO completed, shared completion has been called.
*
* If the IO completion occurs in the issuing backend, local callbacks
* will immediately be called. Otherwise the handle stays in
* COMPLETED_SHARED until the issuing backend waits for the completion of
* the IO.
*/
PGAIO_HS_COMPLETED_SHARED,
/*
* IO completed, local completion has been called.
*
* After this the handle will be made reusable and go into IDLE state.
*/
PGAIO_HS_COMPLETED_LOCAL,
} PgAioHandleState;
typedef struct PgAioHandle
{
/* all state updates should go through pgaio_io_update_state() */
int state:8;
/* what are we operating on */
int target:8;
/* which IO operation */
int op:8;
} PgAioHandle;
void update_state(PgAioHandle* ioh, int new_state)
{
write_barrier();
ioh->state = new_state;
}
void reclaim(PgAioHandle* ioh)
{
assert(ioh->state != PGAIO_HS_IDLE);
if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
{
// read_barrier();
update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
}
update_state(ioh, PGAIO_HS_IDLE);
ioh->op = PGAIO_OP_INVALID;
ioh->target = PGAIO_TID_INVALID;
}
void wait_for_free(PgAioHandle* ioh)
{
while (1)
{
if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
{
read_barrier();
reclaim(ioh);
return;
}
}
}
void* io_thread(void* arg)
{
PgAioHandle* ioh = (PgAioHandle*)arg;
while (1)
{
while (ioh->state != PGAIO_HS_SUBMITTED);
read_barrier();
assert(ioh->op != PGAIO_OP_INVALID);
update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
}
}
int main()
{
PgAioHandle ioh = {PGAIO_HS_IDLE, PGAIO_TID_INVALID, PGAIO_OP_INVALID};
pthread_t thread;
pthread_create(&thread, 0, &io_thread, &ioh);
while (1) {
assert(ioh.op == PGAIO_OP_INVALID);
assert(ioh.state == PGAIO_HS_IDLE);
ioh.op = PGAIO_OP_READV;
update_state(&ioh, PGAIO_HS_SUBMITTED);
wait_for_free(&ioh);
}
}