diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 0fe7ce4..9cf6513 100644 *** a/src/backend/storage/lmgr/lwlock.c --- b/src/backend/storage/lmgr/lwlock.c *************** *** 31,36 **** --- 31,38 ---- #include "storage/predicate.h" #include "storage/proc.h" #include "storage/spin.h" + #include "storage/backendid.h" + #include "storage/atomic.h" /* We use the ShmemLock spinlock to protect LWLockAssign */ *************** typedef struct LWLock *** 42,48 **** --- 44,52 ---- slock_t mutex; /* Protects LWLock and queue of PGPROCs */ bool releaseOK; /* T if ok to release waiters */ char exclusive; /* # of exclusive holders (0 or 1) */ + #if LWLOCK_LOCK_PARTS == 1 int shared; /* # of shared holders (0..MaxBackends) */ + #endif PGPROC *head; /* head of list of waiting PGPROCs */ PGPROC *tail; /* tail of list of waiting PGPROCs */ /* tail is undefined when head is NULL */ *************** NON_EXEC_STATIC LWLockPadded *LWLockArra *** 77,82 **** --- 81,275 ---- /* + * Partitioned shared counter case + */ + #if LWLOCK_LOCK_PARTS > 1 + + /* + * Size of one LWLock shared counter partition in bytes. + * Chosen to fill exactly one cache line on common architectures. + */ + #define LWLOCK_PART_SIZE 64 + + /* + * Number of locks per partition. + * If we don't have an atomic increment instruction, we fall back to using a + * per-partition mutex + */ + #ifdef HAVE_ATOMIC_FETCH_ADD_FENCE_32 + + #define LWLOCK_PART_LOCKS (LWLOCK_PART_SIZE / sizeof(int)) + + #else /* !HAVE_ATOMIC_FETCH_ADD_FENCE_32 */ + + #define LWLOCK_PART_LOCKS ((LWLOCK_PART_SIZE - sizeof(slock_t)) / sizeof(int)) + + #endif /* HAVE_ATOMIC_FETCH_ADD_FENCE_32 */ + + /* Number of LWLockPart structs we need for nlocks locks (assumes nlocks >= 1). */ + #define LWLOCK_PARTS(nlocks) (\ + (1 + ((nlocks) - 1) / LWLOCK_PART_LOCKS) * LWLOCK_LOCK_PARTS \ + ) + + /* + * Shared-counter partition for LWLocks. Each Backend increments the + * shared counter in *its* partition when acquiring a LWLock in shared + * mode.
We limit the size of one partition to the size of a cache-line, + * and therefore create LWLOCK_LOCK_PARTS * ceil(nlocks / LWLOCK_PART_LOCKS) + * partitions, not only LWLOCK_LOCK_PARTS partitions. The shared-partition + * array can thus be thought of as a two-dimensional array, indexed by + * (BackendId % LWLOCK_LOCK_PARTS) and (lockid / LWLOCK_PART_LOCKS). + * + * An int is quite excessive here, but making it any smaller would require + * an overflow check in the shared lock acquisition fast path. That in turn + * would require us to use a compare-and-exchange instead of an atomic + * increment there. So overall, using an int seems beneficial, even if it + * means we'll waste a bit of processor cache. + */ + typedef struct LWLockPart + { + #ifndef HAVE_ATOMIC_FETCH_ADD_FENCE_32 + slock_t mutex; + #endif + int shared[LWLOCK_PART_LOCKS]; + } LWLockPart; + + NON_EXEC_STATIC LWLockPart *LWLockPartArray = NULL; + + /* + * The resulting layout of LWLockPartArray is + * + * [0, 0..LWLOCK_PART_LOCKS-1] + * ... + * [LWLOCK_LOCK_PARTS-1, 0..LWLOCK_PART_LOCKS-1] + * [0, LWLOCK_PART_LOCKS..2*LWLOCK_PART_LOCKS-1] + * ... + * [LWLOCK_LOCK_PARTS-1, LWLOCK_PART_LOCKS..2*LWLOCK_PART_LOCKS-1] + * ... + * + * where each line [P, N..M] contains the shared counter + * partition P for locks N to M, i.e. (M-N+1) individual counters.
+ */ + #define LWLOCK_PART(lock, lockid, backendid) ( \ + LWLockPartArray + ( \ + ((lockid) / LWLOCK_PART_LOCKS) * LWLOCK_LOCK_PARTS + \ + ((unsigned int) (backendid) % LWLOCK_LOCK_PARTS) /* unsigned: a negative backendid must not yield a negative index */ \ + )) + + #ifdef HAVE_ATOMIC_FETCH_ADD_FENCE_32 + + #define LWLOCK_PART_SHARED_OPS_ATOMIC + + #define LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, partid) \ + AtomicFetchAddFence32((part)->shared + \ + ((lockid) % LWLOCK_PART_LOCKS), \ + 1) + + #define LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, partid) \ + AtomicFetchAddFence32((part)->shared + \ + ((lockid) % LWLOCK_PART_LOCKS), \ + -1) + + /* Taken care of by AtomicFetchAddFence32() */ + #define LWLOCK_PART_SHARED_FENCE() + + #else /* !HAVE_ATOMIC_FETCH_ADD_FENCE_32 */ + + #define LWLOCK_PART_SHARED_OPS_ATOMIC + + #define LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, partid) \ + xadd_atomic(&((part)->mutex), \ + (part)->shared + ((lockid) % LWLOCK_PART_LOCKS), \ + 1) + + #define LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, partid) \ + xadd_atomic(&((part)->mutex), \ + (part)->shared + ((lockid) % LWLOCK_PART_LOCKS), \ + -1) + + /* *Not* taken care of by xadd_atomic() */ + #define LWLOCK_PART_SHARED_FENCE() fence() + + inline static int xadd_atomic( + volatile slock_t* mutex, + volatile int* ptr, + int delta + ) { + int previous; + + /* Atomic add under the partition mutex, returning the old value */ + SpinLockAcquire(mutex); + previous = *ptr; + *ptr += delta; + SpinLockRelease(mutex); + + return previous; + } + + inline static void fence(void) + { + slock_t fence; + + /* For the lack of a better method ...
*/ + SpinLockInit(&fence); + SpinLockAcquire(&fence); + SpinLockRelease(&fence); + } + + #endif /* HAVE_ATOMIC_FETCH_ADD_FENCE_32 */ + + #define LWLOCK_IS_SHARED(result, lock, lockid) do { \ + int i; \ + result = false; \ + for(i=0; i < LWLOCK_LOCK_PARTS; ++i) { \ + volatile LWLockPart *p = LWLOCK_PART(lock, lockid, i); \ + if (p->shared[lockid % LWLOCK_PART_LOCKS] == 0) \ + continue; \ + result = true; \ + break; \ + } \ + } while(0) + + + #else /* LWLOCK_LOCK_PARTS == 1 */ + + /* + * Unpartitioned shared counter case + */ + + #ifdef HAVE_ATOMIC_FETCH_ADD_FENCE_32 + + #define LWLOCK_PART_SHARED_OPS_ATOMIC + + #define LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, partid) \ + AtomicFetchAddFence32(&(lock)->shared, \ + 1) + + #define LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, partid) \ + AtomicFetchAddFence32(&(lock)->shared, \ + -1) + + /* Taken care of by AtomicFetchAddFence32() */ + #define LWLOCK_PART_SHARED_FENCE() + + #else /* !HAVE_ATOMIC_FETCH_ADD_FENCE_32 */ + + /* We could of course provide that if we used the lock's mutex to make + * the action atomic. But then we'd take the lock's mutex twice during + * LWLock Acquisition. So instead, we special-case the situation of + * no atomic xadd and one shared counter partition in the implementations + * of lock acquisition and release + */ + #undef LWLOCK_PART_SHARED_OPS_ATOMIC + + #endif /* HAVE_ATOMIC_FETCH_ADD_FENCE_32 */ + + #define LWLOCK_IS_SHARED(result, lock, lockid) \ + result = ((lock)->shared != 0) + + #endif /* LWLOCK_LOCK_PARTS == 1 */ + + /* * We use this structure to keep track of locked LWLocks for release * during error recovery. 
The maximum size could be determined at runtime * if necessary, but it seems unlikely that more than a few locks could *************** NON_EXEC_STATIC LWLockPadded *LWLockArra *** 86,91 **** --- 279,285 ---- static int num_held_lwlocks = 0; static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; + static LWLockMode held_lwlocks_mode[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; *************** LWLockShmemSize(void) *** 225,233 **** /* Space for the LWLock array. */ size = mul_size(numLocks, sizeof(LWLockPadded)); ! ! /* Space for dynamic allocation counter, plus room for alignment. */ size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE); return size; } --- 419,438 ---- /* Space for the LWLock array. */ size = mul_size(numLocks, sizeof(LWLockPadded)); ! ! /* Space for dynamic allocation counter, ! * plus room for alignment of LWLockArray. ! */ size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE); + + #if LWLOCK_LOCK_PARTS > 1 + /* Space for the LWLockPart array */ + size = add_size(size, mul_size(LWLOCK_PARTS(numLocks), + sizeof(LWLockPart))); + + /* Room for alignment of LWLockPartArray */ + size = add_size(size, sizeof(LWLockPart)); + #endif return size; } *************** CreateLWLocks(void) *** 242,251 **** --- 447,464 ---- int numLocks = NumLWLocks(); Size spaceLocks = LWLockShmemSize(); LWLockPadded *lock; + #if LWLOCK_LOCK_PARTS > 1 + LWLockPart *part; + #endif int *LWLockCounter; char *ptr; int id; + #if LWLOCK_LOCK_PARTS > 1 + /* Ensure that we didn't mess up the computation of LWLOCK_PART_LOCKS (LWLockPart only exists in the partitioned case) */ + Assert(sizeof(LWLockPart) == LWLOCK_PART_SIZE); + #endif + /* Allocate space */ ptr = (char *) ShmemAlloc(spaceLocks); *************** CreateLWLocks(void) *** 256,275 **** --- 467,507 ---- ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE; LWLockArray = (LWLockPadded *) ptr; + ptr += sizeof(LWLockPadded) * numLocks; + + #if LWLOCK_LOCK_PARTS > 1 + /* Ensure desired alignment of LWLockPart array */ + ptr += 
LWLOCK_PART_SIZE - ((uintptr_t) ptr) % LWLOCK_PART_SIZE; + + LWLockPartArray = (LWLockPart *) ptr; + ptr += sizeof(LWLockPart) * LWLOCK_PARTS(numLocks); + #endif /* * Initialize all LWLocks to "unlocked" state */ + for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++) { SpinLockInit(&lock->lock.mutex); lock->lock.releaseOK = true; lock->lock.exclusive = 0; + #if LWLOCK_LOCK_PARTS == 1 lock->lock.shared = 0; + #endif lock->lock.head = NULL; lock->lock.tail = NULL; } + #if LWLOCK_LOCK_PARTS > 1 + for(id = 0, part = LWLockPartArray; id < LWLOCK_PARTS(numLocks); id++, part++) { + #ifndef HAVE_ATOMIC_FETCH_ADD_FENCE_32 /* same condition under which LWLockPart has a mutex member */ + SpinLockInit(&part->mutex); + #endif + memset((char *) part->shared, 0, sizeof(int) * LWLOCK_PART_LOCKS); + } + #endif + /* * Initialize the dynamic-allocation counter, which is stored just before * the first LWLock. *************** void *** 320,325 **** --- 552,560 ---- LWLockAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = &(LWLockArray[lockid].lock); + #if LWLOCK_LOCK_PARTS > 1 + volatile LWLockPart *part = LWLOCK_PART(lock, lockid, MyBackendId); + #endif PGPROC *proc = MyProc; bool retry = false; int extraWaits = 0; *************** LWLockAcquire(LWLockId lockid, LWLockMod *** 356,362 **** /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) elog(ERROR, "too many LWLocks taken"); ! /* * Lock out cancel/die interrupts until we exit the code section protected * by the LWLock. This ensures that interrupts will not interfere with --- 591,597 ---- /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) elog(ERROR, "too many LWLocks taken"); ! /* * Lock out cancel/die interrupts until we exit the code section protected * by the LWLock. This ensures that interrupts will not interfere with *************** LWLockAcquire(LWLockId lockid, LWLockMod *** 383,423 **** for (;;) { bool mustwait; ! ! /* Acquire mutex. Time spent holding mutex should be short!
*/ ! SpinLockAcquire(&lock->mutex); ! ! /* If retrying, allow LWLockRelease to release waiters again */ ! if (retry) ! lock->releaseOK = true; ! ! /* If I can get the lock, do so quickly. */ ! if (mode == LW_EXCLUSIVE) { ! if (lock->exclusive == 0 && lock->shared == 0) { ! lock->exclusive++; mustwait = false; } else mustwait = true; } else { if (lock->exclusive == 0) { ! lock->shared++; ! mustwait = false; } else mustwait = true; } ! if (!mustwait) break; /* got the lock */ ! /* ! * Add myself to wait queue. * * If we don't have a PGPROC structure, there's no way to wait. This * should never occur, since MyProc should only be null during shared --- 618,759 ---- for (;;) { bool mustwait; ! ! if (mode == LW_SHARED) { ! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC ! /* Increment shared counter partition. If there's no contention, ! * this is sufficient to take the lock ! */ ! LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, MyBackendId); ! LWLOCK_PART_SHARED_FENCE(); ! ! /* A concurrent exclusive locking attempt does the following ! * three steps ! * 1) Acquire mutex ! * 2) Check shared counter partitions for readers. ! * 3a) If found add proc to wait queue, block, restart at (1) ! * 3b) If not found, set exclusive flag, continue with (4) ! * 4) Enter protected section ! * The fence after the atomic add above ensures that no further ! * such attempt can proceed to (3b) or beyond. There may be ! * pre-existing exclusive locking attempts at step (3b) or beyond, ! * but we can recognize those by either the mutex being taken, or ! * the exclusive flag being set. Conversely, if we see neither, we ! * may proceed and enter the protected section. ! * ! * FIXME: This doesn't work if slock_t is a struct or doesn't ! * use 0 for state "unlocked". ! */ ! ! if ((lock->mutex == 0) && (lock->exclusive == 0)) { ! /* If retrying, allow LWLockRelease to release waiters again. ! * Usually this happens after we acquired the mutex, but if ! * we skip that, we still need to set releaseOK. ! 
* ! * Acquiring the mutex here is not really an option - if many ! * readers are awoken simultaneously by an exclusive unlock, ! * that would be a source of considerable contention. ! * ! * Fortunately, this is safe even without the mutex. First, ! * there actually cannot be any non-fast path unlocking ! * attempt in progress, because we'd then either still see ! * the exclusive flag set or the mutex being taken. And ! * even if there was, and such an attempt cleared the flag ! * immediately after we set it, it'd also wake up some waiter ! * who'd then re-set the flag. ! * ! * The only reason to do this here, and not directly ! * after returning from PGSemaphoreLock(), is that it seems ! * beneficial to make SpinLockAcquire() the first thing to ! * touch the lock if possible, in case we acquire the spin ! * lock at all. That way, the cache line doesn't go through ! * a possible shared state, but instead directly to exclusive. ! * On Opterons at least, there seems to be a difference, cf. ! * the comment above tas() for x86_64 in s_lock.h ! */ ! if (retry && !lock->releaseOK) ! lock->releaseOK = true; ! ! goto lock_acquired; ! } ! ! /* At this point, we don't know if the concurrent exclusive locker ! * has proceeded to (3b) or blocked. We must take the mutex and ! * re-check ! */ ! #endif /* LWLOCK_PART_SHARED_OPS_ATOMIC */ ! ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire(&lock->mutex); ! ! if (lock->exclusive == 0) { ! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC ! /* Already incremented the shared counter partition above */ ! #else ! lock->shared++; ! #endif mustwait = false; } else + { + #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC + /* Must undo shared counter partition increment. Note that + * we *need* to do that while holding the mutex. Otherwise, + * the exclusive lock could be released and attempted to be + * re-acquired before we undo the increment.
That attempt + * would then block, even though there'd be no lock holder + * left + */ + LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, MyBackendId); + #endif mustwait = true; + } } else { + /* Step (1). Acquire mutex. Time spent holding mutex should be + * short! + */ + SpinLockAcquire(&lock->mutex); + if (lock->exclusive == 0) { ! /* Step (2). Check for shared lockers. This surely happens ! * after (1), otherwise SpinLockAcquire() is broken. Lock ! * acquire semantics demand that no load must be re-ordered ! * from after a lock acquisition to before, for obvious ! * reasons. ! */ ! ! LWLOCK_IS_SHARED(mustwait, lock, lockid); ! ! if (!mustwait) { ! /* Step (3b). Set exclusive flag. This surely happens ! * after (2) because it depends on the result of (2), ! * no matter how much reordering is going on here. ! */ ! lock->exclusive++; ! } } else mustwait = true; } ! ! /* If retrying, allow LWLockRelease to release waiters again. ! * This is also separately done in the LW_SHARED early exit case ! * above, and in contrast to here we don't hold the mutex there. ! * See the comment there for why this is safe ! */ ! if (retry) ! lock->releaseOK = true; ! if (!mustwait) break; /* got the lock */ ! /* ! * Step (3a). Add myself to wait queue. * * If we don't have a PGPROC structure, there's no way to wait. This * should never occur, since MyProc should only be null during shared *************** LWLockAcquire(LWLockId lockid, LWLockMod *** 477,487 **** /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode); /* Add lock to list of locks held by this backend */ ! held_lwlocks[num_held_lwlocks++] = lockid; /* * Fix the process wait semaphore's count for any absorbed wakeups. --- 813,835 ---- /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); + + /* Step 4. Enter protected section.
This surely happens after (3), + * this time because lock release semantics demand that no store + * must be moved from before a lock release to after the release, + * again for obvious reasons + */ + + #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC + lock_acquired: + #endif TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode); /* Add lock to list of locks held by this backend */ ! held_lwlocks[num_held_lwlocks] = lockid; ! held_lwlocks_mode[num_held_lwlocks] = mode; ! ++num_held_lwlocks; /* * Fix the process wait semaphore's count for any absorbed wakeups. *************** bool *** 501,506 **** --- 849,857 ---- LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = &(LWLockArray[lockid].lock); + #if LWLOCK_LOCK_PARTS > 1 + volatile LWLockPart *part = LWLOCK_PART(lock, lockid, MyBackendId); + #endif bool mustwait; PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock); *************** LWLockConditionalAcquire(LWLockId lockid *** 516,541 **** */ HOLD_INTERRUPTS(); ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire(&lock->mutex); ! ! /* If I can get the lock, do so quickly. */ ! if (mode == LW_EXCLUSIVE) { ! if (lock->exclusive == 0 && lock->shared == 0) { ! lock->exclusive++; mustwait = false; } else mustwait = true; } else { if (lock->exclusive == 0) { ! lock->shared++; ! mustwait = false; } else mustwait = true; --- 867,960 ---- */ HOLD_INTERRUPTS(); ! if (mode == LW_SHARED) { ! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC ! /* Increment shared counter partition. If there's no contention, ! * this is sufficient to take the lock ! */ ! LWLOCK_PART_SHARED_POSTINC_ATOMIC(lock, lockid, part, MyBackendId); ! LWLOCK_PART_SHARED_FENCE(); ! ! /* A concurrent exclusive locking attempt does the following ! * three steps ! * 1) Acquire mutex ! * 2) Check shared counter partitions for readers. ! * 3a) If found add proc to wait queue, block, restart at (1) ! * 3b) If not found, set exclusive flag, continue with (4) ! 
* 4) Enter protected section ! * The fence after the atomic add above ensures that no further ! * such attempt can proceed to (3b) or beyond. There may be ! * pre-existing exclusive locking attempts at step (3b) or beyond, ! * but we can recognize those by either the mutex being taken, or ! * the exclusive flag being set. Conversely, if we see neither, we ! * may proceed and enter the protected section. ! * ! * FIXME: This doesn't work if slock_t is a struct or doesn't ! * use 0 for state "unlocked". ! */ ! ! if ((lock->mutex == 0) && (lock->exclusive == 0)) ! goto lock_acquired; ! ! /* At this point, we don't know if the concurrent exclusive locker ! * has proceeded to (3b) or blocked. We must take the mutex and ! * re-check ! */ ! #endif /* LWLOCK_PART_SHARED_OPS_ATOMIC */ ! ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire(&lock->mutex); ! ! if (lock->exclusive == 0) { ! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC ! /* Already incremented the shared counter partition above */ ! #else ! lock->shared++; ! #endif mustwait = false; } else + { + #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC + /* Must undo shared counter partition increment. Note that + * we *need* to do that while holding the mutex. Otherwise, + * the exclusive lock could be released and attempted to be + * re-acquired before we undo the increment. That attempt + * would then block, even though there'd be no lock holder + * left + */ + LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, MyBackendId); + #endif mustwait = true; + } } else { + /* Step (1). Acquire mutex. Time spent holding mutex should be + * short! + */ + SpinLockAcquire(&lock->mutex); + if (lock->exclusive == 0) { ! /* Step (2). Check for shared lockers. This surely happens ! * after (1), otherwise SpinLockAcquire() is broken. Lock ! * acquire semantics demand that no load must be re-ordered ! * from after a lock acquisition to before, for obvious ! * reasons. ! */ ! ! LWLOCK_IS_SHARED(mustwait, lock, lockid); ! ! 
if (!mustwait) { ! /* Step (3b). Set exclusive flag. This surely happens ! * after (2) because it depends on the result of (2), ! * no matter how much reordering is going on here. ! */ ! lock->exclusive++; ! } } else mustwait = true; *************** LWLockConditionalAcquire(LWLockId lockid *** 550,564 **** RESUME_INTERRUPTS(); LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed"); TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode); ! } ! else ! { ! /* Add lock to list of locks held by this backend */ ! held_lwlocks[num_held_lwlocks++] = lockid; ! TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode); } ! return !mustwait; } /* --- 969,990 ---- RESUME_INTERRUPTS(); LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed"); TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode); ! ! return false; } ! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC ! lock_acquired: ! #endif ! ! TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode); ! ! /* Add lock to list of locks held by this backend */ ! held_lwlocks[num_held_lwlocks] = lockid; ! held_lwlocks_mode[num_held_lwlocks] = mode; ! ++num_held_lwlocks; ! ! return true; } /* *************** void *** 568,575 **** LWLockRelease(LWLockId lockid) { volatile LWLock *lock = &(LWLockArray[lockid].lock); ! PGPROC *head; PGPROC *proc; int i; PRINT_LWDEBUG("LWLockRelease", lockid, lock); --- 994,1005 ---- LWLockRelease(LWLockId lockid) { volatile LWLock *lock = &(LWLockArray[lockid].lock); ! #if LWLOCK_LOCK_PARTS > 1 ! volatile LWLockPart *part = LWLOCK_PART(lock, lockid, MyBackendId); ! #endif ! PGPROC *head = NULL; PGPROC *proc; + LWLockMode mode; int i; PRINT_LWDEBUG("LWLockRelease", lockid, lock); *************** LWLockRelease(LWLockId lockid) *** 578,583 **** --- 1008,1014 ---- * Remove lock from list of locks held. Usually, but not always, it will * be the latest-acquired lock; so search array backwards.
*/ + for (i = num_held_lwlocks; --i >= 0;) { if (lockid == held_lwlocks[i]) *************** LWLockRelease(LWLockId lockid) *** 585,604 **** } if (i < 0) elog(ERROR, "lock %d is not held", (int) lockid); num_held_lwlocks--; ! for (; i < num_held_lwlocks; i++) held_lwlocks[i] = held_lwlocks[i + 1]; ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire(&lock->mutex); ! ! /* Release my hold on lock */ ! if (lock->exclusive > 0) ! lock->exclusive--; ! else ! { ! Assert(lock->shared > 0); lock->shared--; } /* --- 1016,1105 ---- } if (i < 0) elog(ERROR, "lock %d is not held", (int) lockid); + + mode = held_lwlocks_mode[i]; + num_held_lwlocks--; ! for (; i < num_held_lwlocks; i++) { held_lwlocks[i] = held_lwlocks[i + 1]; + held_lwlocks_mode[i] = held_lwlocks_mode[i + 1]; + } + + if (mode == LW_SHARED) { + #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC + int shared_pre; ! /* Release my hold on lock */ ! Assert(lock->exclusive == 0); ! shared_pre = LWLOCK_PART_SHARED_POSTDEC_ATOMIC(lock, lockid, part, MyBackendId); ! Assert(shared_pre > 0); ! ! /* If the count didn't drop to zero (i.e., there are more lockers ! * using the same shared counter partition), we can leave waking ! * up blocked exclusive locking attempts to them. Note that there ! * may also be shared lockers using a *different* partition, so ! * we're not necessarily the last share locker, even if we continue. ! * Still, it's an easy optimization, so we go for it ! */ ! if (shared_pre > 1) ! goto lock_released; ! ! LWLOCK_PART_SHARED_FENCE(); ! ! /* A concurrent exclusive locking attempt does the following ! * three steps ! * 1) Acquire mutex ! * 2) Check shared counter partitions for readers. ! * 3a) If found add proc to wait queue, block, restart at (1) ! * 3b) If not found, set exclusive flag, continue with (4) ! * 4) Enter protected section ! * Assume now that we're the last share lock holder. Then, the ! * fence after the atomic add above ensures that no further such ! 
* concurrent exclusive locking attempts will proceed to (3a) and ! * thus block. There may be such attempts currently blocking or ! * about to block, but we can recognize those by either the wait queue ! * being non-empty or the mutex being taken. Conversely, if we see ! * neither, we may assume that nobody needs to be signalled. ! * ! * Note that if two shared lockers release their lock while ! * an exclusive locking attempt is in progress, both may decide ! * they need to signal here. Taking the mutex below will sort that ! * out, but it's a bit unfortunate that they have to race for the ! * mutex here. Also, taking the mutex will force *other* shared ! * lockers to take the mutex also in their release path. ! * XXX: We may be able to improve that if we could distinguish ! * between mutexes held for the purpose of unlocking and mutexes ! * held for the purpose of locking. ! * ! * FIXME: This doesn't work if slock_t is a struct. ! */ ! ! if ((lock->mutex == 0) && (lock->head == NULL)) ! goto lock_released; ! ! /* At this point, we don't know if the concurrent exclusive locker ! * has seen non-zero in our shared counter partition in his step ! * (2) or not. We must thus take the mutex and re-check. ! */ ! #endif /* LWLOCK_PART_SHARED_OPS_ATOMIC */ ! ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire(&lock->mutex); ! ! #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC ! /* Already decremented the shared counter partition above */ ! #else ! /* Release my hold on lock */ lock->shared--; + #endif + } + else { + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->mutex); + + /* Release my hold on lock */ + lock->exclusive--; + Assert(lock->exclusive == 0); } /* *************** LWLockRelease(LWLockId lockid) *** 610,616 **** head = lock->head; if (head != NULL) { ! if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) { /* * Remove the to-be-awakened PGPROCs from the queue.
If the front --- 1111,1124 ---- head = lock->head; if (head != NULL) { ! bool is_shared; ! ! if (mode == LW_SHARED) ! LWLOCK_IS_SHARED(is_shared, lock, lockid); ! else ! is_shared = false; ! ! if (lock->exclusive == 0 && !is_shared && lock->releaseOK) { /* * Remove the to-be-awakened PGPROCs from the queue. If the front *************** LWLockRelease(LWLockId lockid) *** 640,645 **** --- 1148,1157 ---- /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); + #ifdef LWLOCK_PART_SHARED_OPS_ATOMIC + lock_released: + #endif + TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid); /* diff --git a/src/include/storage/atomic.h b/src/include/storage/atomic.h index ...c9216e3 . *** a/src/include/storage/atomic.h --- b/src/include/storage/atomic.h *************** *** 0 **** --- 1,22 ---- + #ifndef ATOMIC_H + #define ATOMIC_H + + #if ( \ + (defined(__GNUC__) && (__GNUC__ * 100 + __GNUC_MINOR__ >= 401 )) || \ + defined(__INTEL_COMPILER) \ + ) \ + + #define HAVE_ATOMIC_FETCH_ADD_FENCE_32 + + /* + * Atomically add delta to *ptr and return the value *ptr held before the + * addition. The return type must be as wide as *ptr, or the old value is truncated. + */ + static __inline__ int AtomicFetchAddFence32(volatile int* ptr, int delta) + { + return __sync_fetch_and_add(ptr, delta); + } + + #endif /* GCC >= 4.1 or ICC */ + + #endif /* ATOMIC_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 438a48d..2feed97 100644 *** a/src/include/storage/lwlock.h --- b/src/include/storage/lwlock.h *************** *** 20,25 **** --- 20,28 ---- * this file include lock.h or bufmgr.h would be backwards. */ + /* Number of shared counter partitions per LWLock */ + #define LWLOCK_LOCK_PARTS 4 + /* Number of partitions of the shared buffer mapping hashtable */ #define NUM_BUFFER_PARTITIONS 16