pg_lwlock_part.patch

application/octet-stream

Filename: pg_lwlock_part.patch
Type: application/octet-stream
Part: 0
Message: Re: spinlock contention
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 0fe7ce4..9cf6513 100644
*** a/src/backend/storage/lmgr/lwlock.c
--- b/src/backend/storage/lmgr/lwlock.c
***************
*** 31,36 ****
--- 31,38 ----
  #include "storage/predicate.h"
  #include "storage/proc.h"
  #include "storage/spin.h"
+ #include "storage/backendid.h"
+ #include "storage/atomic.h"
  
  
  /* We use the ShmemLock spinlock to protect LWLockAssign */
*************** typedef struct LWLock
*** 42,48 ****
--- 44,52 ----
  	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
  	bool		releaseOK;		/* T if ok to release waiters */
  	char		exclusive;		/* # of exclusive holders (0 or 1) */
+ #if LWLOCK_LOCK_PARTS == 1
  	int			shared;			/* # of shared holders (0..MaxBackends) */
+ #endif
  	PGPROC	   *head;			/* head of list of waiting PGPROCs */
  	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
  	/* tail is undefined when head is NULL */
*************** NON_EXEC_STATIC LWLockPadded *LWLockArra
*** 77,82 ****
--- 81,275 ----
  
  
  /*
+  * Partitioned shared counter case
+  */
+ #if LWLOCK_LOCK_PARTS > 1
+ 
+ /*
+  * Size of one LWLock shared counter partition in bytes.
+  * Chosen to fill exactly one cache line on common architectures.
+  */
+ #define LWLOCK_PART_SIZE 64
+ 
+ /*
+  * Number of locks per partition.
+  * If we don't have an atomic increment instruction, we fall back to using a
+  * per-partition mutex
+  */
+ #ifdef HAVE_ATOMIC_FETCH_ADD_FENCE_32
+ 
+ #define LWLOCK_PART_LOCKS (LWLOCK_PART_SIZE / sizeof(int))
+ 
+ #else /* !HAVE_ATOMIC_FETCH_ADD_FENCE_32 */
+ 
+ #define LWLOCK_PART_LOCKS ((LWLOCK_PART_SIZE - sizeof(slock_t)) / sizeof(int))
+ 
+ #endif /* HAVE_ATOMIC_FETCH_ADD_FENCE_32 */
+ 
+ /* Number of LWLockPart structs needed for nlocks locks (assumes nlocks >= 1). */
+ #define LWLOCK_PARTS(nlocks) ( \
+ 	(1 + ((nlocks) - 1) / LWLOCK_PART_LOCKS) * LWLOCK_LOCK_PARTS \
+ )
+ 
+ /*
+  * Shared-counter partition for LWLocks. Each backend increments the
+  * shared counter in *its* partition when acquiring a LWLock in shared
+  * mode. We limit the size of one partition to the size of a cache line,
+  * and therefore create LWLOCK_LOCK_PARTS * (<NLocks> / LWLOCK_PART_LOCKS)
+  * partitions, not only LWLOCK_LOCK_PARTS partitions. The shared-partition
+  * array can thus be thought of as a two-dimensional array, indexed by
+  * (BackendId % LWLOCK_LOCK_PARTS) and (lockid / LWLOCK_PART_LOCKS).
+  *
+  * An int is quite excessive here, but making it any smaller would require
+  * an overflow check in the shared lock acquisition fast path. That in turn
+  * would require us to use a compare-and-exchange instead of an atomic
+  * increment there. So overall, using an int seems beneficial, even if it
+  * means we'll waste a bit of processor cache.
+  */
+ typedef struct LWLockPart
+ {
+ #ifndef HAVE_ATOMIC_FETCH_ADD_FENCE_32
+ 	slock_t		mutex;			/* serializes counter updates in xadd_atomic() */
+ #endif
+ 	int			shared[LWLOCK_PART_LOCKS];	/* slot = lockid % LWLOCK_PART_LOCKS */
+ } LWLockPart;
+ 
+ NON_EXEC_STATIC LWLockPart *LWLockPartArray = NULL;
+ 
+ /*
+  * The resulting layout of LWLockPartArray is
+  *
+  * <Lcks 0                 .. LWLOCK_PART_LOCKS-1,   Part 0>
+  * ...
+  * <Lcks 0                 .. LWLOCK_PART_LOCKS-1,   Part LWLOCK_LOCK_PARTS-1>
+  * <Lcks LWLOCK_PART_LOCKS .. 2*LWLOCK_PART_LOCKS-1, Part 0>
+  * ...
+  * <Lcks LWLOCK_PART_LOCKS .. 2*LWLOCK_PART_LOCKS-1, Part LWLOCK_LOCK_PARTS-1>
+  * ...
+  * where each line <Lcks N .. M, Part P> holds shared counter partition P for
+  * locks N to M, i.e. (M-N+1) individual counters. backendid is reduced as an
+  * unsigned value so a negative id (presumably InvalidBackendId) stays in range.
+  */
+ #define LWLOCK_PART(lock, lockid, backendid) ( \
+ 	LWLockPartArray + ( \
+ 		((lockid) / LWLOCK_PART_LOCKS) * LWLOCK_LOCK_PARTS + \
+ 		(((unsigned int) (backendid)) % LWLOCK_LOCK_PARTS) \
+ 	))
+ 
+ #ifdef HAVE_ATOMIC_FETCH_ADD_FENCE_32
+ 
+ #define LWLOCK_PART_SHARED_OPS_ATOMIC
+ 
+ #define LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, partid) \
+ 	AtomicFetchAddFence32((part)->shared + \
+ 	                      ((lockid) % LWLOCK_PART_LOCKS), \
+ 	                      1)	/* yields the pre-increment value */
+ 
+ #define LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, partid) \
+ 	AtomicFetchAddFence32((part)->shared + \
+ 	                      ((lockid) % LWLOCK_PART_LOCKS), \
+ 	                      -1)	/* yields the pre-decrement value */
+ 
+ /* Taken care of by AtomicFetchAddFence32() */
+ #define LWLOCK_PART_SHARED_FENCE()
+ 
+ #else /* !HAVE_ATOMIC_FETCH_ADD_FENCE_32 */
+ 
+ #define LWLOCK_PART_SHARED_OPS_ATOMIC
+ 
+ #define LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, partid) \
+ 	xadd_atomic(&((part)->mutex), \
+ 	            (part)->shared + ((lockid) % LWLOCK_PART_LOCKS), \
+ 	            1)	/* yields the pre-increment value */
+ 
+ #define LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, partid) \
+ 	xadd_atomic(&((part)->mutex), \
+ 	            (part)->shared + ((lockid) % LWLOCK_PART_LOCKS), \
+ 	            -1)	/* yields the pre-decrement value */
+ 	
+ /* *Not* taken care of by xadd_atomic() */
+ #define LWLOCK_PART_SHARED_FENCE() fence()
+ 
+ static inline int
+ xadd_atomic(volatile slock_t *mutex, volatile int *ptr, int delta)
+ {
+ 	int			old_value;
+ 
+ 	/* Emulated fetch-and-add: the read-modify-write of *ptr is done
+ 	 * while holding the given spinlock. Returns the old value.
+ 	 */
+ 	SpinLockAcquire(mutex);
+ 	old_value = *ptr;
+ 	*ptr = old_value + delta;
+ 	SpinLockRelease(mutex);
+ 
+ 	return old_value;
+ }
+ 
+ inline static void fence(void)
+ {
+ 	slock_t		dummy;			/* private lock, only touched by this call */
+ 
+ 	/* For lack of a better method: presumably relies on spinlock ops acting as barriers */
+ 	SpinLockInit(&dummy);
+ 	SpinLockAcquire(&dummy);
+ 	SpinLockRelease(&dummy);
+ }
+ 
+ #endif /* HAVE_ATOMIC_FETCH_ADD_FENCE_32 */
+ 
+ #define LWLOCK_IS_SHARED(result, lock, lockid) do { /* true iff any partition counter != 0 */ \
+ 		int _part_no; \
+ 		(result) = false; \
+ 		for (_part_no = 0; _part_no < LWLOCK_LOCK_PARTS; ++_part_no) { \
+ 			volatile LWLockPart *_part = LWLOCK_PART(lock, lockid, _part_no); \
+ 			if (_part->shared[(lockid) % LWLOCK_PART_LOCKS] == 0) \
+ 				continue; \
+ 			(result) = true; \
+ 			break; \
+ 		} \
+ 	} while(0)
+ 
+ 
+ #else /* LWLOCK_LOCK_PARTS == 1 */
+ 
+ /*
+  * Unpartitioned shared counter case
+  */
+ 
+ #ifdef HAVE_ATOMIC_FETCH_ADD_FENCE_32
+ 
+ #define LWLOCK_PART_SHARED_OPS_ATOMIC
+ 
+ #define LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, partid) \
+ 	AtomicFetchAddFence32(&(lock)->shared, \
+ 	                      1)
+ 
+ #define LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, partid) \
+ 	AtomicFetchAddFence32(&(lock)->shared, \
+ 	                      -1)
+ 
+ /* Taken care of by AtomicFetchAddFence32() */
+ #define LWLOCK_PART_SHARED_FENCE()
+ 
+ #else /* !HAVE_ATOMIC_FETCH_ADD_FENCE_32 */
+ 
+ /* We could of course provide that if we used the lock's mutex to make
+  * the action atomic. But then we'd take the lock's mutex twice during
+  * LWLock Acquisition. So instead, we special-case the situation of
+  * no atomic xadd and one shared counter partition in the implementations
+  * of lock acquisition and release
+  */
+ #undef LWLOCK_PART_SHARED_OPS_ATOMIC
+ 
+ #endif /* HAVE_ATOMIC_FETCH_ADD_FENCE_32 */
+ 
+ #define LWLOCK_IS_SHARED(result, lock, lockid) \
+ 		result = ((lock)->shared != 0)
+ 
+ #endif /* LWLOCK_LOCK_PARTS == 1 */
+ 
+ /*
   * We use this structure to keep track of locked LWLocks for release
   * during error recovery.  The maximum size could be determined at runtime
   * if necessary, but it seems unlikely that more than a few locks could
*************** NON_EXEC_STATIC LWLockPadded *LWLockArra
*** 86,91 ****
--- 279,285 ----
  
  static int	num_held_lwlocks = 0;
  static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+ static LWLockMode held_lwlocks_mode[MAX_SIMUL_LWLOCKS];
  
  static int	lock_addin_request = 0;
  static bool lock_addin_request_allowed = true;
*************** LWLockShmemSize(void)
*** 225,233 ****
  
  	/* Space for the LWLock array. */
  	size = mul_size(numLocks, sizeof(LWLockPadded));
! 
! 	/* Space for dynamic allocation counter, plus room for alignment. */
  	size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
  
  	return size;
  }
--- 419,438 ----
  
  	/* Space for the LWLock array. */
  	size = mul_size(numLocks, sizeof(LWLockPadded));
! 	
! 	/* Space for dynamic allocation counter,
! 	 * plus room for alignment of LWLockArray.
! 	 */
  	size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
+ 	
+ #if LWLOCK_LOCK_PARTS > 1
+ 	/* Space for the LWLockPart array (only exists when counters are partitioned) */
+ 	size = add_size(size, mul_size(LWLOCK_PARTS(numLocks),
+ 	                               sizeof(LWLockPart)));
+ 	
+ 	/* Room for aligning LWLockPartArray to a partition-size boundary */
+ 	size = add_size(size, sizeof(LWLockPart));
+ #endif
  
  	return size;
  }
*************** CreateLWLocks(void)
*** 242,251 ****
--- 447,462 ----
  	int			numLocks = NumLWLocks();
  	Size		spaceLocks = LWLockShmemSize();
  	LWLockPadded *lock;
+ #if LWLOCK_LOCK_PARTS > 1
+ 	LWLockPart *part;			/* cursor over LWLockPartArray during init */
+ #endif
  	int		   *LWLockCounter;
  	char	   *ptr;
  	int			id;
  
+ #if LWLOCK_LOCK_PARTS > 1
+ 	Assert(sizeof(LWLockPart) == LWLOCK_PART_SIZE);	/* LWLOCK_PART_LOCKS sanity check */
+ #endif
  	/* Allocate space */
  	ptr = (char *) ShmemAlloc(spaceLocks);
  
*************** CreateLWLocks(void)
*** 256,275 ****
--- 467,507 ----
  	ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
  
  	LWLockArray = (LWLockPadded *) ptr;
+ 	ptr += sizeof(LWLockPadded) * numLocks;
+ 	
+ #if LWLOCK_LOCK_PARTS > 1
+ 	/* Ensure desired alignment of LWLockPart array */
+ 	ptr += LWLOCK_PART_SIZE - ((uintptr_t) ptr) % LWLOCK_PART_SIZE;
+ 	
+ 	LWLockPartArray = (LWLockPart *) ptr;
+ 	ptr += sizeof(LWLockPart) * LWLOCK_PARTS(numLocks);
+ #endif
  
  	/*
  	 * Initialize all LWLocks to "unlocked" state
  	 */
+ 
  	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
  	{
  		SpinLockInit(&lock->lock.mutex);
  		lock->lock.releaseOK = true;
  		lock->lock.exclusive = 0;
+ #if LWLOCK_LOCK_PARTS == 1
  		lock->lock.shared = 0;
+ #endif
  		lock->lock.head = NULL;
  		lock->lock.tail = NULL;
  	}
  
+ #if LWLOCK_LOCK_PARTS > 1
+ 	for(id = 0, part = LWLockPartArray; id < LWLOCK_PARTS(numLocks); id++, part++) {
+ #ifndef HAVE_ATOMIC_FETCH_ADD_FENCE_32
+ 		SpinLockInit(&part->mutex);	/* field exists only without atomic add */
+ #endif
+ 		memset((char *) part->shared, 0, sizeof(int) * LWLOCK_PART_LOCKS);	/* no holders */
+ 	}
+ #endif
+ 
  	/*
  	 * Initialize the dynamic-allocation counter, which is stored just before
  	 * the first LWLock.
*************** void
*** 320,325 ****
--- 552,560 ----
  LWLockAcquire(LWLockId lockid, LWLockMode mode)
  {
  	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ #if LWLOCK_LOCK_PARTS > 1
+ 	volatile LWLockPart *part = LWLOCK_PART(lock, lockid, MyBackendId);
+ #endif
  	PGPROC	   *proc = MyProc;
  	bool		retry = false;
  	int			extraWaits = 0;
*************** LWLockAcquire(LWLockId lockid, LWLockMod
*** 356,362 ****
  	/* Ensure we will have room to remember the lock */
  	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
  		elog(ERROR, "too many LWLocks taken");
! 
  	/*
  	 * Lock out cancel/die interrupts until we exit the code section protected
  	 * by the LWLock.  This ensures that interrupts will not interfere with
--- 591,597 ----
  	/* Ensure we will have room to remember the lock */
  	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
  		elog(ERROR, "too many LWLocks taken");
! 		
  	/*
  	 * Lock out cancel/die interrupts until we exit the code section protected
  	 * by the LWLock.  This ensures that interrupts will not interfere with
*************** LWLockAcquire(LWLockId lockid, LWLockMod
*** 383,423 ****
  	for (;;)
  	{
  		bool		mustwait;
! 
! 		/* Acquire mutex.  Time spent holding mutex should be short! */
! 		SpinLockAcquire(&lock->mutex);
! 
! 		/* If retrying, allow LWLockRelease to release waiters again */
! 		if (retry)
! 			lock->releaseOK = true;
! 
! 		/* If I can get the lock, do so quickly. */
! 		if (mode == LW_EXCLUSIVE)
  		{
! 			if (lock->exclusive == 0 && lock->shared == 0)
  			{
! 				lock->exclusive++;
  				mustwait = false;
  			}
  			else
  				mustwait = true;
  		}
  		else
  		{
  			if (lock->exclusive == 0)
  			{
! 				lock->shared++;
! 				mustwait = false;
  			}
  			else
  				mustwait = true;
  		}
! 
  		if (!mustwait)
  			break;				/* got the lock */
! 
  		/*
! 		 * Add myself to wait queue.
  		 *
  		 * If we don't have a PGPROC structure, there's no way to wait. This
  		 * should never occur, since MyProc should only be null during shared
--- 618,759 ----
  	for (;;)
  	{
  		bool		mustwait;
! 		
! 		if (mode == LW_SHARED)
  		{
! #ifdef		LWLOCK_PART_SHARED_OPS_ATOMIC
! 			/* Increment shared counter partition. If there's no contention,
! 			 * this is sufficient to take the lock
! 			 */
! 			LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, MyBackendId);
! 			LWLOCK_PART_SHARED_FENCE();
! 			
! 			/* A concurrent exclusive locking attempt goes through the
! 			 * following steps
! 			 *   1) Acquire mutex
! 			 *   2) Check shared counter partitions for readers.
! 			 *   3a) If found add proc to wait queue, block, restart at (1)
! 			 *   3b) If not found, set exclusive flag, continue with (4)
! 			 *   4) Enter protected section
! 			 * The fence after the atomic add above ensures that no further
! 			 * such attempt can proceed to (3b) or beyond. There may be
! 			 * pre-existing exclusive locking attempts at step (3b) or beyond,
! 			 * but we can recognize those by either the mutex being taken, or
! 			 * the exclusive flag being set. Conversely, if we see neither, we
! 			 * may proceed and enter the protected section.
! 			 *
! 			 * FIXME: This doesn't work if slock_t is a struct or doesn't
! 			 * use 0 for state "unlocked".
! 			 */
! 
! 			if ((lock->mutex == 0) && (lock->exclusive == 0)) {
! 				/* If retrying, allow LWLockRelease to release waiters again.
! 				 * Usually this happens after we acquired the mutex, but if
! 				 * we skip that, we still need to set releaseOK.
! 				 *
! 				 * Acquiring the mutex here is not really an option - if many
! 				 * readers are awoken simultaneously by an exclusive unlock,
! 				 * that would be a source of considerable contention.
! 				 *
! 				 * Fortunately, this is safe even without the mutex. First,
! 				 * there actually cannot be any non-fast path unlocking
! 				 * attempt in progress, because we'd then either still see
! 				 * the exclusive flag set or the mutex being taken. And
! 				 * even if there was, and such an attempt cleared the flag
! 				 * immediately after we set it, it'd also wake up some waiter
! 				 * who'd then re-set the flag.
! 				 *
! 				 * The only reason to do this here, and not directly
! 				 * after returning from PGSemaphoreLock(), is that it seems
! 				 * beneficial to make SpinLockAcquire() the first thing to
! 				 * touch the lock if possible, in case we acquire the spin
! 				 * lock at all. That way, the cache line doesn't go through
! 				 * a possible shared state, but instead directly to exclusive.
! 				 * On Opterons at least, there seems to be a difference, c.f.
! 				 * the comment above tas() for x86_64 in s_lock.h
! 				 */
! 				if (retry && !lock->releaseOK)
! 					lock->releaseOK = true;
! 					
! 				goto lock_acquired;
! 			}
! 				
! 			/* At this point, we don't know if the concurrent exclusive locker
! 			 * has proceeded to (3b) or blocked. We must take the mutex and
! 			 * re-check
! 			 */
! #endif /* LWLOCK_PART_SHARED_OPS_ATOMIC */
! 			
! 			/* Acquire mutex.  Time spent holding mutex should be short! */
! 			SpinLockAcquire(&lock->mutex);
! 			
! 			if (lock->exclusive == 0)
  			{
! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
! 				/* Already incremented the shared counter partition above */
! #else
! 				lock->shared++;
! #endif
  				mustwait = false;
  			}
  			else
+ 			{
+ #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
+ 				/* Must undo shared counter partition increment. Note that
+ 				 * we *need* to do that while holding the mutex. Otherwise,
+ 				 * the exclusive lock could be released and attempted to be
+ 				 * re-acquired before we undo the increment. That attempt
+ 				 * would then block, even though there'd be no lock holder
+ 				 * left
+ 				 */
+ 				LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, MyBackendId);
+ #endif
  				mustwait = true;
+ 			}
  		}
  		else
  		{
+ 			/* Step (1). Acquire mutex. Time spent holding mutex should be
+ 			 *                          short!
+ 			 */
+ 			SpinLockAcquire(&lock->mutex);
+ 			
  			if (lock->exclusive == 0)
  			{
! 				/* Step (2). Check for shared lockers. This surely happens
! 				 * after (1), otherwise SpinLockAcquire() is broken. Lock
! 				 * acquire semantics demand that no load must be re-ordered
! 				 * from after a lock acquisition to before, for obvious
! 				 * reasons.
! 				 */
! 
! 				LWLOCK_IS_SHARED(mustwait, lock, lockid);
! 				
! 				if (!mustwait) {
! 					/* Step (3b). Set exclusive flag. This surely happens
! 					 * after (2) because it depends on the result of (2),
! 					 * no matter how much reordering is going on here.
! 					 */
! 					lock->exclusive++;
! 				}
  			}
  			else
  				mustwait = true;
  		}
! 		
! 		/* If retrying, allow LWLockRelease to release waiters again.
! 		 * This is also done separately in the LW_SHARED early exit case
! 		 * above, where, in contrast to here, we don't hold the mutex.
! 		 * See the comment there for why that is safe.
! 		 */
! 		if (retry)
! 			lock->releaseOK = true;
! 		
  		if (!mustwait)
  			break;				/* got the lock */
! 			
  		/*
! 		 * Step (3a). Add myself to wait queue.
  		 *
  		 * If we don't have a PGPROC structure, there's no way to wait. This
  		 * should never occur, since MyProc should only be null during shared
*************** LWLockAcquire(LWLockId lockid, LWLockMod
*** 477,487 ****
  
  	/* We are done updating shared state of the lock itself. */
  	SpinLockRelease(&lock->mutex);
  
  	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
  
  	/* Add lock to list of locks held by this backend */
! 	held_lwlocks[num_held_lwlocks++] = lockid;
  
  	/*
  	 * Fix the process wait semaphore's count for any absorbed wakeups.
--- 813,835 ----
  
  	/* We are done updating shared state of the lock itself. */
  	SpinLockRelease(&lock->mutex);
+ 	
+ 	/* Step 4. Enter protected section. This surely happens after (3),
+ 	 * this time because lock release semantics demand that no store
+ 	 * must be moved from before a lock release to after the release,
+ 	 * again for obvious reasons
+ 	 */
+ 
+ #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
+ lock_acquired:
+ #endif
  
  	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
  
  	/* Add lock to list of locks held by this backend */
! 	held_lwlocks[num_held_lwlocks] = lockid;
! 	held_lwlocks_mode[num_held_lwlocks] = mode;
! 	++num_held_lwlocks;
  
  	/*
  	 * Fix the process wait semaphore's count for any absorbed wakeups.
*************** bool
*** 501,506 ****
--- 849,857 ----
  LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
  {
  	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+ #if LWLOCK_LOCK_PARTS > 1
+ 	volatile LWLockPart *part = LWLOCK_PART(lock, lockid, MyBackendId);
+ #endif
  	bool		mustwait;
  
  	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
*************** LWLockConditionalAcquire(LWLockId lockid
*** 516,541 ****
  	 */
  	HOLD_INTERRUPTS();
  
! 	/* Acquire mutex.  Time spent holding mutex should be short! */
! 	SpinLockAcquire(&lock->mutex);
! 
! 	/* If I can get the lock, do so quickly. */
! 	if (mode == LW_EXCLUSIVE)
  	{
! 		if (lock->exclusive == 0 && lock->shared == 0)
  		{
! 			lock->exclusive++;
  			mustwait = false;
  		}
  		else
  			mustwait = true;
  	}
  	else
  	{
  		if (lock->exclusive == 0)
  		{
! 			lock->shared++;
! 			mustwait = false;
  		}
  		else
  			mustwait = true;
--- 867,960 ----
  	 */
  	HOLD_INTERRUPTS();
  
! 	if (mode == LW_SHARED)
  	{
! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
! 		/* Increment shared counter partition. If there's no contention,
! 		 * this is sufficient to take the lock
! 		 */
! 		LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, MyBackendId);
! 		LWLOCK_PART_SHARED_FENCE();
! 
! 		/* A concurrent exclusive locking attempt goes through the
! 		 * following steps
! 		 *   1) Acquire mutex
! 		 *   2) Check shared counter partitions for readers.
! 		 *   3a) If found add proc to wait queue, block, restart at (1)
! 		 *   3b) If not found, set exclusive flag, continue with (4)
! 		 *   4) Enter protected section
! 		 * The fence after the atomic add above ensures that no further
! 		 * such attempt can proceed to (3b) or beyond. There may be
! 		 * pre-existing exclusive locking attempts at step (3b) or beyond,
! 		 * but we can recognize those by either the mutex being taken, or
! 		 * the exclusive flag being set. Conversely, if we see neither, we
! 		 * may proceed and enter the protected section.
! 		 *
! 		 * FIXME: This doesn't work if slock_t is a struct or doesn't
! 		 * use 0 for state "unlocked".
! 		 */
! 
! 		if ((lock->mutex == 0) && (lock->exclusive == 0))
! 			goto lock_acquired;
! 
! 		/* At this point, we don't know if the concurrent exclusive locker
! 		 * has proceeded to (3b) or blocked. We must take the mutex and
! 		 * re-check
! 		 */
! #endif /* LWLOCK_PART_SHARED_OPS_ATOMIC */
! 
! 		/* Acquire mutex.  Time spent holding mutex should be short! */
! 		SpinLockAcquire(&lock->mutex);
! 
! 		if (lock->exclusive == 0)
  		{
! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
! 			/* Already incremented the shared counter partition above */
! #else
! 			lock->shared++;
! #endif
  			mustwait = false;
  		}
  		else
+ 		{
+ #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
+ 			/* Must undo shared counter partition increment. Note that
+ 			 * we *need* to do that while holding the mutex. Otherwise,
+ 			 * the exclusive lock could be released and attempted to be
+ 			 * re-acquired before we undo the increment. That attempt
+ 			 * would then block, even though there'd be no lock holder
+ 			 * left
+ 			 */
+ 			LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, MyBackendId);
+ #endif
  			mustwait = true;
+ 		}
  	}
  	else
  	{
+ 		/* Step (1). Acquire mutex. Time spent holding mutex should be
+ 		 *                          short!
+ 		 */
+ 		SpinLockAcquire(&lock->mutex);
+ 
  		if (lock->exclusive == 0)
  		{
! 			/* Step (2). Check for shared lockers. This surely happens
! 			 * after (1), otherwise SpinLockAcquire() is broken. Lock
! 			 * acquire semantics demand that no load must be re-ordered
! 			 * from after a lock acquisition to before, for obvious
! 			 * reasons.
! 			 */
! 
! 			LWLOCK_IS_SHARED(mustwait, lock, lockid);
! 
! 			if (!mustwait) {
! 				/* Step (3b). Set exclusive flag. This surely happens
! 				 * after (2) because it depends on the result of (2),
! 				 * no matter how much reordering is going on here.
! 				 */
! 				lock->exclusive++;
! 			}
  		}
  		else
  			mustwait = true;
*************** LWLockConditionalAcquire(LWLockId lockid
*** 550,564 ****
  		RESUME_INTERRUPTS();
  		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
  		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
! 	}
! 	else
! 	{
! 		/* Add lock to list of locks held by this backend */
! 		held_lwlocks[num_held_lwlocks++] = lockid;
! 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
  	}
  
! 	return !mustwait;
  }
  
  /*
--- 969,990 ----
  		RESUME_INTERRUPTS();
  		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
  		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
! 		
! 		return false;
  	}
  
! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
! lock_acquired:
! #endif
! 
! 	TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
! 
! 	/* Add lock to list of locks held by this backend */
! 	held_lwlocks[num_held_lwlocks] = lockid;
! 	held_lwlocks_mode[num_held_lwlocks] = mode;
! 	++num_held_lwlocks;
! 
! 	return true;
  }
  
  /*
*************** void
*** 568,575 ****
  LWLockRelease(LWLockId lockid)
  {
  	volatile LWLock *lock = &(LWLockArray[lockid].lock);
! 	PGPROC	   *head;
  	PGPROC	   *proc;
  	int			i;
  
  	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
--- 994,1005 ----
  LWLockRelease(LWLockId lockid)
  {
  	volatile LWLock *lock = &(LWLockArray[lockid].lock);
! #if LWLOCK_LOCK_PARTS > 1
! 	volatile LWLockPart *part = LWLOCK_PART(lock, lockid, MyBackendId);
! #endif
! 	PGPROC	   *head = NULL;
  	PGPROC	   *proc;
+ 	LWLockMode	mode;
  	int			i;
  
  	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
*************** LWLockRelease(LWLockId lockid)
*** 578,583 ****
--- 1008,1014 ----
  	 * Remove lock from list of locks held.  Usually, but not always, it will
  	 * be the latest-acquired lock; so search array backwards.
  	 */
+ 	 
  	for (i = num_held_lwlocks; --i >= 0;)
  	{
  		if (lockid == held_lwlocks[i])
*************** LWLockRelease(LWLockId lockid)
*** 585,604 ****
  	}
  	if (i < 0)
  		elog(ERROR, "lock %d is not held", (int) lockid);
  	num_held_lwlocks--;
! 	for (; i < num_held_lwlocks; i++)
  		held_lwlocks[i] = held_lwlocks[i + 1];
  
! 	/* Acquire mutex.  Time spent holding mutex should be short! */
! 	SpinLockAcquire(&lock->mutex);
! 
! 	/* Release my hold on lock */
! 	if (lock->exclusive > 0)
! 		lock->exclusive--;
! 	else
! 	{
! 		Assert(lock->shared > 0);
  		lock->shared--;
  	}
  
  	/*
--- 1016,1105 ----
  	}
  	if (i < 0)
  		elog(ERROR, "lock %d is not held", (int) lockid);
+ 	
+ 	mode = held_lwlocks_mode[i];
+ 	
  	num_held_lwlocks--;
! 	for (; i < num_held_lwlocks; i++) {
  		held_lwlocks[i] = held_lwlocks[i + 1];
+ 		held_lwlocks_mode[i] = held_lwlocks_mode[i + 1];
+ 	}
+ 		
+ 	if (mode == LW_SHARED) {
+ #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
+ 		int shared_pre;
  
! 		/* Release my hold on lock */
! 		Assert(lock->exclusive == 0);
! 		shared_pre = LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, MyBackendId);
! 		Assert(shared_pre > 0);
! 		
! 		/* If the count didn't drop to zero (i.e., there are more lockers
! 		 * using the same shared counter partition), we can leave waking
! 		 * up blocked exclusive locking attempts to them. Note that there
! 		 * may also be shared lockers using a *different* partition, so
! 		 * we're not necessarily the last shared locker, even if we continue.
! 		 * Still, it's an easy optimization, so we go for it.
! 		 */
! 		if (shared_pre > 1)
! 			goto lock_released;
! 			
! 		LWLOCK_PART_SHARED_FENCE();
! 		
! 		/* A concurrent exclusive locking attempt goes through the
! 		 * following steps
! 		 *   1) Acquire mutex
! 		 *   2) Check shared counter partitions for readers.
! 		 *   3a) If found add proc to wait queue, block, restart at (1)
! 		 *   3b) If not found, set exclusive flag, continue with (4)
! 		 *   4) Enter protected section
! 		 * Assume now that we're the last shared lock holder. Then, the
! 		 * fence after the atomic add above ensures that no further such
! 		 * concurrent exclusive locking attempts will proceed to (3a) and
! 		 * thus block. There may be such attempts currently blocking or
! 		 * about to block, but we can recognize those by either wait queue
! 		 * being non-empty or the mutex being taken. Conversely, if we see
! 		 * neither, we may assume that nobody needs to be signalled.
! 		 *
! 		 * Note that if two shared lockers release their lock while
! 		 * an exclusive locking attempt is in progress, both may decide
! 		 * they need to signal here. Taking the mutex below will sort that
! 		 * out, but it's a bit unfortunate that they have to race for the
! 		 * mutex here. Also, taking the mutex will force *other* shared
! 		 * lockers to take the mutex also in their release path.
! 		 * XXX: We may be able to improve that if we could distinguish
! 		 * between mutexes held for the purpose of unlocking and mutexes
! 		 * held for the purpose of locking.
! 		 *
! 		 * FIXME: This doesn't work if slock_t is a struct.
! 		 */
! 		
! 		if ((lock->mutex == 0) && (lock->head == NULL))
! 			goto lock_released;
! 			
! 		/* At this point, we don't know if the concurrent exclusive locker
! 		 * has seen non-zero in our shared counter partition in its step
! 		 * (2) or not. We must thus take the mutex and re-check.
! 		 */
! #endif /* LWLOCK_PART_SHARED_OPS_ATOMIC */
! 		
! 		/* Acquire mutex.  Time spent holding mutex should be short! */
! 		SpinLockAcquire(&lock->mutex);
! 		
! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
! 		/* Already decremented the shared counter partition above */
! #else
! 		/* Release my hold on lock */
  		lock->shared--;
+ #endif
+ 	}
+ 	else {
+ 		/* Acquire mutex.  Time spent holding mutex should be short! */
+ 		SpinLockAcquire(&lock->mutex);
+ 		
+ 		/* Release my hold on lock */
+ 		lock->exclusive--;
+ 		Assert(lock->exclusive == 0);
  	}
  
  	/*
*************** LWLockRelease(LWLockId lockid)
*** 610,616 ****
  	head = lock->head;
  	if (head != NULL)
  	{
! 		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
  		{
  			/*
  			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
--- 1111,1124 ----
  	head = lock->head;
  	if (head != NULL)
  	{
! 		bool		is_shared;
! 		
! 		if (mode == LW_SHARED)
! 			LWLOCK_IS_SHARED(is_shared, lock, lockid);
! 		else
! 			is_shared = false;
! 			
! 		if (lock->exclusive == 0 && !is_shared && lock->releaseOK)
  		{
  			/*
  			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
*************** LWLockRelease(LWLockId lockid)
*** 640,645 ****
--- 1148,1157 ----
  	/* We are done updating shared state of the lock itself. */
  	SpinLockRelease(&lock->mutex);
  
+ #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC
+ lock_released:
+ #endif
+ 
  	TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
  
  	/*
diff --git a/src/include/storage/atomic.h b/src/include/storage/atomic.h
index ...c9216e3 .
*** a/src/include/storage/atomic.h
--- b/src/include/storage/atomic.h
***************
*** 0 ****
--- 1,18 ----
+ #ifndef ATOMIC_H
+ #define ATOMIC_H
+ 
+ #if ( \
+ 	(defined(__GNUC__) && (__GNUC__ * 100 + __GNUC_MINOR__ >= 401 )) || \
+ 	defined(__INTEL_COMPILER) \
+ )
+ 
+ #define HAVE_ATOMIC_FETCH_ADD_FENCE_32
+ /* Atomically add delta to *ptr (full barrier); returns the previous value. */
+ static __inline__ int AtomicFetchAddFence32(volatile int* ptr, int delta)
+ {
+ 	return __sync_fetch_and_add(ptr, delta);
+ }
+ 
+ #endif /* GCC >= 4.1 or ICC */
+ 
+ #endif /* ATOMIC_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 438a48d..2feed97 100644
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 20,25 ****
--- 20,28 ----
   * this file include lock.h or bufmgr.h would be backwards.
   */
  
+ /* Number of shared counter partitions per LWLock */
+ #define LWLOCK_LOCK_PARTS 4
+ 
  /* Number of partitions of the shared buffer mapping hashtable */
  #define NUM_BUFFER_PARTITIONS  16