lwlock-v1.patch
application/octet-stream
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 32cb819..dae33c6 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -27,6 +27,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
+#include "storage/atomic.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/spin.h"
@@ -36,18 +37,58 @@
extern slock_t *ShmemLock;
+/*
+ * If compare-and-swap is not available, the mutex protects the entire LWLock.
+ * But when compare-and-swap is available, the state is manipulated using
+ * cas_int32() and only the remaining members are protected by the mutex.
+ */
typedef struct LWLock
{
- slock_t mutex; /* Protects LWLock and queue of PGPROCs */
+ slock_t mutex; /* See notes above */
+ int32 state; /* Lock state, encoded with the LWSTATE_* macros */
bool releaseOK; /* T if ok to release waiters */
- char exclusive; /* # of exclusive holders (0 or 1) */
- int shared; /* # of shared holders (0..MaxBackends) */
PGPROC *head; /* head of list of waiting PGPROCs */
PGPROC *tail; /* tail of list of waiting PGPROCs */
/* tail is undefined when head is NULL */
} LWLock;
/*
+ * A lightweight lock can be unlocked, exclusively locked (by exactly one
+ * locker), or share-locked (by one or more lockers). We maintain the state of
+ * the lock in a single integer variable so that it can be atomically
+ * updated using compare-and-swap, where available.
+ */
+#define LWSTATE_UNLOCKED 0 /* no holders */
+#define LWSTATE_EXCLUSIVE (-2) /* one exclusive holder */
+#define LWSTATE_SHARED 2 /* added once per shared holder */
+#define LWSTATE_IS_EXCLUSIVE(n) ((n) < 0)
+#define LWSTATE_IS_SHARED(n) ((n) > 1)
+#ifdef HAVE_COMPARE_AND_SWAP
+#define LWSTATE_IS_UNLOCKED(n) (((n) & ~1) == 0) /* ignores the low bit */
+#else
+#define LWSTATE_IS_UNLOCKED(n) ((n) == 0)
+#endif
+
+#ifdef HAVE_COMPARE_AND_SWAP
+/*
+ * When HAVE_COMPARE_AND_SWAP is set, we use the low bit of the state to
+ * track whether waiters are present. This allows the lwlock to be released
+ * without taking the mutex, unless there are actually waiters that need to
+ * be woken up.
+ */
+#define LWSTATE_WAITERS 1
+#define LWSTATE_HAS_WAITERS(n) ((n) & 1)
+/* Last hold plus waiters bit: -1 = EXCLUSIVE + WAITERS, 3 = SHARED + WAITERS. */
+#define LWSTATE_NEEDS_WAKEUP(n) ((n) == -1 || (n) == 3)
+
+static bool LWLockCASAcquire(volatile LWLock *lock, LWLockMode mode,
+ int wait_ok);
+static bool LWLockCASRelease(volatile LWLock *lock, LWLockMode mode,
+ int wake_ok);
+static void LWLockCASClearWaitMark(volatile LWLock *lock);
+#endif
+
+/*
* All the LWLock structs are allocated as an array in shared memory.
* (LWLockIds are indexes into the array.) We force the array stride to
* be a power of 2, which saves a few cycles in indexing, but more
@@ -85,6 +126,9 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
static int num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+#ifdef HAVE_COMPARE_AND_SWAP
+static LWLockMode held_lwlock_modes[MAX_SIMUL_LWLOCKS];
+#endif
static int lock_addin_request = 0;
static bool lock_addin_request_allowed = true;
@@ -260,8 +304,7 @@ CreateLWLocks(void)
{
SpinLockInit(&lock->lock.mutex);
lock->lock.releaseOK = true;
- lock->lock.exclusive = 0;
- lock->lock.shared = 0;
+ lock->lock.state = LWSTATE_UNLOCKED;
lock->lock.head = NULL;
lock->lock.tail = NULL;
}
@@ -360,6 +403,15 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
*/
HOLD_INTERRUPTS();
+#ifdef HAVE_COMPARE_AND_SWAP
+ /*
+ * If we have compare-and-swap, we can try to grab the lock using an atomic
+ * operation, without actually taking the mutex lock.
+ */
+ if (LWLockCASAcquire(lock, mode, false))
+ goto lock_acquired;
+#endif
+
/*
* Loop here to try to acquire lock after each time we are signaled by
* LWLockRelease.
@@ -378,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
*/
for (;;)
{
- bool mustwait;
+ bool acquired;
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&lock->mutex);
@@ -387,30 +439,36 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
if (retry)
lock->releaseOK = true;
+#ifdef HAVE_COMPARE_AND_SWAP
+ /* Either acquire the lock, or mark it as having waiters. */
+ acquired = LWLockCASAcquire(lock, mode, true);
+#else
+
/* If I can get the lock, do so quickly. */
if (mode == LW_EXCLUSIVE)
{
- if (lock->exclusive == 0 && lock->shared == 0)
+ if (LWSTATE_IS_UNLOCKED(lock->state))
{
- lock->exclusive++;
- mustwait = false;
+ lock->state = LWSTATE_EXCLUSIVE;
+ acquired = true;
}
else
- mustwait = true;
+ acquired = false;
}
else
{
- if (lock->exclusive == 0)
+ if (!LWSTATE_IS_EXCLUSIVE(lock->state))
{
- lock->shared++;
- mustwait = false;
+ lock->state += LWSTATE_SHARED;
+ acquired = true;
}
else
- mustwait = true;
+ acquired = false;
}
+#endif
- if (!mustwait)
- break; /* got the lock */
+ if (acquired)
+ break; /* got the lock */
/*
* Add myself to wait queue.
@@ -474,9 +532,17 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
+#ifdef HAVE_COMPARE_AND_SWAP
+ /* The mutex-free fast-path jumps here if it acquires the lock. */
+lock_acquired:
+#endif
+
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
/* Add lock to list of locks held by this backend */
+#ifdef HAVE_COMPARE_AND_SWAP
+ held_lwlock_modes[num_held_lwlocks] = mode;
+#endif
held_lwlocks[num_held_lwlocks++] = lockid;
/*
@@ -497,7 +563,7 @@ bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
volatile LWLock *lock = &(LWLockArray[lockid].lock);
- bool mustwait;
+ bool acquired;
PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
@@ -512,35 +578,50 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
*/
HOLD_INTERRUPTS();
- /* Acquire mutex. Time spent holding mutex should be short! */
+#ifdef HAVE_COMPARE_AND_SWAP
+
+ /*
+ * If we're using compare-and-swap, LWLockCASAcquire() does all the heavy
+ * lifting on our behalf.
+ */
+ acquired = LWLockCASAcquire(lock, mode, false);
+
+#else
+
+ /*
+ * We don't have compare-and-swap, and must acquire the mutex to check
+ * and update the lock state. Time spent holding mutex should be short!
+ */
SpinLockAcquire(&lock->mutex);
/* If I can get the lock, do so quickly. */
if (mode == LW_EXCLUSIVE)
{
- if (lock->exclusive == 0 && lock->shared == 0)
+ if (LWSTATE_IS_UNLOCKED(lock->state))
{
- lock->exclusive++;
- mustwait = false;
+ lock->state = LWSTATE_EXCLUSIVE;
+ acquired = true;
}
else
- mustwait = true;
+ acquired = false;
}
else
{
- if (lock->exclusive == 0)
+ if (!LWSTATE_IS_EXCLUSIVE(lock->state))
{
- lock->shared++;
- mustwait = false;
+ lock->state += LWSTATE_SHARED;
+ acquired = true;
}
else
- mustwait = true;
+ acquired = false;
}
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
- if (mustwait)
+#endif
+
+ if (!acquired)
{
/* Failed to get lock, so release interrupt holdoff */
RESUME_INTERRUPTS();
@@ -550,11 +631,14 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
else
{
/* Add lock to list of locks held by this backend */
+#ifdef HAVE_COMPARE_AND_SWAP
+ held_lwlock_modes[num_held_lwlocks] = mode;
+#endif
held_lwlocks[num_held_lwlocks++] = lockid;
TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
}
- return !mustwait;
+ return acquired;
}
/*
@@ -567,6 +651,10 @@ LWLockRelease(LWLockId lockid)
PGPROC *head;
PGPROC *proc;
int i;
+ bool wakeup;
+#ifdef HAVE_COMPARE_AND_SWAP
+ LWLockMode mode;
+#endif
PRINT_LWDEBUG("LWLockRelease", lockid, lock);
@@ -581,21 +669,47 @@ LWLockRelease(LWLockId lockid)
}
if (i < 0)
elog(ERROR, "lock %d is not held", (int) lockid);
+#ifdef HAVE_COMPARE_AND_SWAP
+ mode = held_lwlock_modes[i];
+#endif
num_held_lwlocks--;
for (; i < num_held_lwlocks; i++)
+ {
held_lwlocks[i] = held_lwlocks[i + 1];
+#ifdef HAVE_COMPARE_AND_SWAP
+ held_lwlock_modes[i] = held_lwlock_modes[i + 1];
+#endif
+ }
+
+#ifdef HAVE_COMPARE_AND_SWAP
+ /*
+ * If no waiters need to be awoken, we can drop the LWLock without needing
+ * to acquire the spinlock.
+ */
+ if (!LWLockCASRelease(lock, mode, false))
+ {
+ head = NULL;
+ goto fast_exit;
+ }
+#endif
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&lock->mutex);
+#ifdef HAVE_COMPARE_AND_SWAP
+ /* LWLockCASRelease does all the hard work. */
+ wakeup = LWLockCASRelease(lock, mode, true);
+#else
/* Release my hold on lock */
- if (lock->exclusive > 0)
- lock->exclusive--;
+ if (LWSTATE_IS_EXCLUSIVE(lock->state))
+ lock->state = LWSTATE_UNLOCKED;
else
{
- Assert(lock->shared > 0);
- lock->shared--;
+ Assert(LWSTATE_IS_SHARED(lock->state));
+ lock->state -= LWSTATE_SHARED;
}
+ wakeup = LWSTATE_IS_UNLOCKED(lock->state);
+#endif
/*
* See if I need to awaken any waiters. If I released a non-last shared
@@ -606,7 +720,7 @@ LWLockRelease(LWLockId lockid)
head = lock->head;
if (head != NULL)
{
- if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+ if (wakeup && lock->releaseOK)
{
/*
* Remove the to-be-awakened PGPROCs from the queue. If the front
@@ -625,6 +739,14 @@ LWLockRelease(LWLockId lockid)
proc->lwWaitLink = NULL;
/* prevent additional wakeups until retryer gets to run */
lock->releaseOK = false;
+
+#ifdef HAVE_COMPARE_AND_SWAP
+ /*
+ * Clear the waiters bit if we removed all of the waiters.
+ */
+ if (lock->head == NULL)
+ LWLockCASClearWaitMark(lock);
+#endif
}
else
{
@@ -636,6 +758,9 @@ LWLockRelease(LWLockId lockid)
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
+#ifdef HAVE_COMPARE_AND_SWAP
+fast_exit:
+#endif
TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
/*
@@ -697,3 +822,131 @@ LWLockHeldByMe(LWLockId lockid)
}
return false;
}
+
+#ifdef HAVE_COMPARE_AND_SWAP
+
+/*
+ * LWLockCASAcquire - attempt to acquire an LWLock using compare-and-swap
+ *
+ * Returns true if the LWLock has been acquired in the requested mode, false
+ * if not. If wait_ok is true and the lock cannot be acquired, the waiters
+ * bit is set instead (unless it is already set) and false is returned. The
+ * function never sleeps; it only retries when a compare-and-swap loses.
+ */
+static bool
+LWLockCASAcquire(volatile LWLock *lock, LWLockMode mode, int wait_ok)
+{
+ int current_guess = LWSTATE_UNLOCKED; /* optimistic first guess */
+
+ while (1)
+ {
+ int delta;
+ int result;
+
+ /* Pick delta: take the lock, set the waiters bit, or give up. */
+ if (mode == LW_EXCLUSIVE)
+ {
+ if (LWSTATE_IS_UNLOCKED(current_guess))
+ delta = LWSTATE_EXCLUSIVE;
+ else if (LWSTATE_HAS_WAITERS(current_guess) || !wait_ok)
+ return false;
+ else
+ delta = LWSTATE_WAITERS;
+ }
+ else
+ {
+ if (!LWSTATE_IS_EXCLUSIVE(current_guess)) /* waiters bit ignored */
+ delta = LWSTATE_SHARED;
+ else if (LWSTATE_HAS_WAITERS(current_guess) || !wait_ok)
+ return false;
+ else
+ delta = LWSTATE_WAITERS;
+ }
+
+ /* Attempt to add delta; cas_int32 returns the value it found. */
+ result = cas_int32(&lock->state, current_guess, current_guess + delta);
+
+ /* If it worked, we're done; merely setting the bit is not success. */
+ if (current_guess == result)
+ return (delta != LWSTATE_WAITERS);
+
+ /* We are unlucky and must retry, starting from the observed value. */
+ current_guess = result;
+ }
+}
+
+/*
+ * LWLockCASRelease - release a hold of the given mode using compare-and-swap
+ *
+ * If wake_ok is true, the lock is released always; if false, it's released
+ * only when no wakeups are required. The return value is true if wakeups
+ * are required (with wake_ok false, nothing was released) and false otherwise.
+ */
+static bool
+LWLockCASRelease(volatile LWLock *lock, LWLockMode mode, int wake_ok)
+{
+ int current_guess;
+ int delta;
+
+ if (mode == LW_EXCLUSIVE)
+ {
+ current_guess = LWSTATE_EXCLUSIVE; /* first guess: no waiters bit */
+ delta = LWSTATE_EXCLUSIVE;
+ }
+ else
+ {
+ current_guess = LWSTATE_SHARED; /* first guess: sole holder, no waiters */
+ delta = LWSTATE_SHARED;
+ }
+
+ while (1)
+ {
+ int result;
+
+ /* Attempt to subtract delta. */
+ result = cas_int32(&lock->state, current_guess, current_guess - delta);
+ if (current_guess == result) /* swap succeeded; result is the old state */
+ return LWSTATE_NEEDS_WAKEUP(result);
+
+ /* Don't drop the last hold here if waiters would need to be woken. */
+ if (!wake_ok && LWSTATE_NEEDS_WAKEUP(result))
+ return true;
+
+#ifdef USE_ASSERT_CHECKING
+ if (mode == LW_EXCLUSIVE)
+ Assert(LWSTATE_IS_EXCLUSIVE(result));
+ else
+ Assert(LWSTATE_IS_SHARED(result));
+#endif
+
+ /* We are unlucky and must retry, starting from the observed value. */
+ current_guess = result;
+ }
+}
+
+/*
+ * LWLockCASClearWaitMark - remove waiters mark from lock state
+ */
+static void
+LWLockCASClearWaitMark(volatile LWLock *lock)
+{
+ int current_guess = LWSTATE_UNLOCKED + LWSTATE_WAITERS; /* first guess */
+
+ while (1)
+ {
+ int result;
+
+ /* Attempt to clear the waiters bit, leaving the rest of the state. */
+ result = cas_int32(&lock->state, current_guess,
+ current_guess - LWSTATE_WAITERS);
+ if (current_guess == result)
+ return;
+
+ Assert(LWSTATE_HAS_WAITERS(result));
+
+ /* We are unlucky and must retry, starting from the observed value. */
+ current_guess = result;
+ }
+}
+
+#endif
diff --git a/src/include/storage/atomic.h b/src/include/storage/atomic.h
new file mode 100644
index 0000000..a1f909f
--- /dev/null
+++ b/src/include/storage/atomic.h
@@ -0,0 +1,65 @@
+/*-------------------------------------------------------------------------
+ *
+ * atomic.h
+ * Hardware-dependent atomic primitives.
+ *
+ * NOTE: No code must rely absolutely on the availability of the
+ * primitives in this file. On architectures where we don't have a
+ * working implementation, or where they are not supported, use
+ * spinlocks or some other mechanism.
+ *
+ * CAUTION: Be sure that any operation defined here represents a sequence
+ * point - ie, loads and stores of other values must not be moved across
+ * a lock or unlock. In most cases it suffices to make the operation be
+ * done through a "volatile" pointer.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/atomic.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ATOMIC_H
+#define ATOMIC_H
+
+#if defined(__GNUC__) || defined(__INTEL_COMPILER)
+
+/* 32-bit i386, AMD Opteron, Intel EM64T */
+#if defined(__i386__) || defined(__x86_64__)
+
+#define HAVE_COMPARE_AND_SWAP 1
+
+/*
+ * We might someday want to have more than one CAS variant (e.g. CAS is
+ * commonly used with pointers to build lock-free data structures), but int32
+ * is sufficient for now.
+ */
+static __inline__ int32
+cas_int32(volatile int32 *var, int32 old, int32 new)
+{
+ /*
+ * cmpxchg compares EAX to its second operand (which must be a memory
+ * location). If they are equal, it writes its first operand to that
+ * address; otherwise, it writes the original value back to that address.
+ * In either case, the original value of the memory location is left in
+ * EAX. We therefore force GCC to allocate old in EAX using the "+a"
+ * constraint, which already makes it both an input and an output. We mark
+ * memory and the condition codes as clobbered. The former is not true but
+ * prevents GCC from reordering other instructions around our atomic
+ * primitive; the latter is correct (ZF is set if old was equal to *var).
+ */
+ __asm__ __volatile__(
+ " lock \n"
+ " cmpxchg %2,%1 \n"
+: "+a" (old), "+m" (*var)
+: "q" (new)
+: "memory", "cc");
+ return old; /* value found in *var; equals the caller's old on success */
+}
+
+#endif /* defined(__i386__) || defined(__x86_64__) */
+
+#endif /* defined(__GNUC__) || defined(__INTEL_COMPILER) */
+
+#endif /* ATOMIC_H */