sinval-per-backend-mutex.patch
application/octet-stream
Filename: sinval-per-backend-mutex.patch
Type: application/octet-stream
Part: 0
*** a/src/backend/storage/ipc/sinvaladt.c
--- b/src/backend/storage/ipc/sinvaladt.c
***************
*** 70,105 ****
* that aren't stuck will propagate their interrupts to the next guy.
*
* We would have problems if the MsgNum values overflow an integer, so
! * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
! * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
! * large so that we don't need to do this often. It must be a multiple of
! * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
! * to be moved when we do it.
*
! * Access to the shared sinval array is protected by two locks, SInvalReadLock
! * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
! * authorizes them to modify their own ProcState but not to modify or even
! * look at anyone else's. When we need to perform array-wide updates,
! * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
! * lock out all readers. Writers take SInvalWriteLock (always in exclusive
! * mode) to serialize adding messages to the queue. Note that a writer
! * can operate in parallel with one or more readers, because the writer
! * has no need to touch anyone's ProcState, except in the infrequent cases
! * when SICleanupQueue is needed. The only point of overlap is that
! * the writer wants to change maxMsgNum while readers need to read it.
! * We deal with that by having a spinlock that readers must take for just
! * long enough to read maxMsgNum, while writers take it for just long enough
! * to write maxMsgNum. (The exact rule is that you need the spinlock to
! * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
! * spinlock to write maxMsgNum unless you are holding both locks.)
*
! * Note: since maxMsgNum is an int and hence presumably atomically readable/
! * writable, the spinlock might seem unnecessary. The reason it is needed
! * is to provide a memory barrier: we need to be sure that messages written
! * to the array are actually there before maxMsgNum is increased, and that
! * readers will see that data after fetching maxMsgNum. Multiprocessors
! * that have weak memory-ordering guarantees can fail without the memory
! * barrier instructions that are included in the spinlock sequences.
*/
--- 70,101 ----
* that aren't stuck will propagate their interrupts to the next guy.
*
* We would have problems if the MsgNum values overflow an integer, so
! * we use a 64-bit counter to make sure that doesn't happen. (Of course,
! * technically you can overflow anything... but as we only support 2^64 bytes
! * of write-ahead log over the lifetime of the cluster, assuming that we won't
! * have more sinval messages than that during one postmaster session seems
! * pretty safe. At 10 million messages per second, it would take more than
! * fifty-eight thousand years to overflow.)
*
! * Writers take SInvalWriteLock (always in exclusive mode) to serialize adding
! * messages to the queue. Note that a writer can operate in parallel with one
! * or more readers (who only need to take the spinlocks that protect their own
! * state), because the writer has no need to touch anyone's ProcState, except
! * in the infrequent cases when SICleanupQueue is needed. The only point of
! * overlap is that the writer wants to change maxMsgNum while readers need to
! * read it. We deal with that by having a spinlock that readers must take for
! * just long enough to read maxMsgNum, while writers take it for just long
! * enough to write maxMsgNum. (The exact rule is that you need the spinlock to
! * read maxMsgNum if you are not holding SInvalWriteLock, and you must always
! * hold the spinlock to write maxMsgNum.)
*
! * Note: on platforms where maxMsgNum is atomically readable/writable, the
! * spinlock might seem unnecessary. (A 64-bit value is not atomic on every
! * platform, so there it is needed for that reason alone.) Beyond that, it
! * provides a memory barrier: we need to be sure that messages written to the
! * array are actually there before maxMsgNum is increased, and that readers
! * will see that data after fetching maxMsgNum. Multiprocessors with weak
! * memory ordering can fail without the barriers in the spinlock sequences.
*/
***************
*** 109,117 ****
* MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
* Must be a power of 2 for speed.
*
- * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
- * Must be a multiple of MAXNUMMESSAGES. Should be large.
- *
* CLEANUP_MIN: the minimum number of messages that must be in the buffer
* before we bother to call SICleanupQueue.
*
--- 105,110 ----
***************
*** 128,134 ****
*/
#define MAXNUMMESSAGES 4096
- #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
--- 121,126 ----
***************
*** 139,146 **** typedef struct ProcState
{
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
! int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
--- 131,139 ----
{
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
+ slock_t mutex; /* protects nextMsgNum, resetState, signaled */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
! uint64 nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
***************
*** 167,175 **** typedef struct SISeg
/*
* General state information
*/
! int minMsgNum; /* oldest message still needed */
! int maxMsgNum; /* next message number to be assigned */
! int nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
--- 160,168 ----
/*
* General state information
*/
! uint64 minMsgNum; /* oldest message still needed */
! uint64 maxMsgNum; /* next message number to be assigned */
! uint64 nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
***************
*** 245,250 **** CreateSharedInvalidationState(void)
--- 238,244 ----
for (i = 0; i < shmInvalBuffer->maxBackends; i++)
{
shmInvalBuffer->procState[i].procPid = 0; /* inactive */
+ SpinLockInit(&shmInvalBuffer->procState[i].mutex);
shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->procState[i].signaled = false;
***************
*** 267,274 **** SharedInvalBackendInit(bool sendOnly)
* This can run in parallel with read operations, and for that matter with
* write operations; but not in parallel with additions and removals of
* backends, nor in parallel with SICleanupQueue. It doesn't seem worth
! * having a third lock, so we choose to use SInvalWriteLock to serialize
! * additions/removals.
*/
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
--- 261,267 ----
* This can run in parallel with read operations, and for that matter with
* write operations; but not in parallel with additions and removals of
* backends, nor in parallel with SICleanupQueue. It doesn't seem worth
! * having another lock, so we just use SInvalWriteLock.
*/
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
***************
*** 415,421 **** SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
while (n > 0)
{
int nthistime = Min(n, WRITE_QUANTUM);
! int numMsgs;
! int max;
n -= nthistime;
--- 408,414 ----
while (n > 0)
{
int nthistime = Min(n, WRITE_QUANTUM);
! uint64 numMsgs;
! uint64 max;
n -= nthistime;
***************
*** 479,495 **** SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
* NB: this can run in parallel with other instances of SIGetDataEntries
* executing on behalf of other backends, since each instance will modify only
* fields of its own backend's ProcState, and no instance will look at fields
! * of other backends' ProcStates. We express this by grabbing SInvalReadLock
! * in shared mode. Note that this is not exactly the normal (read-only)
! * interpretation of a shared lock! Look closely at the interactions before
! * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
*
* NB: this can also run in parallel with SIInsertDataEntries. It is not
* guaranteed that we will return any messages added after the routine is
* entered.
- *
- * Note: we assume that "datasize" is not so large that it might be important
- * to break our hold on SInvalReadLock into segments.
*/
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
--- 472,482 ----
* NB: this can run in parallel with other instances of SIGetDataEntries
* executing on behalf of other backends, since each instance will modify only
* fields of its own backend's ProcState, and no instance will look at fields
! * of other backends' ProcStates.
*
* NB: this can also run in parallel with SIInsertDataEntries. It is not
* guaranteed that we will return any messages added after the routine is
* entered.
*/
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
***************
*** 499,506 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
! int max;
int n;
- LWLockAcquire(SInvalReadLock, LW_SHARED);
-
segP = shmInvalBuffer;
stateP = &segP->procState[MyBackendId - 1];
--- 486,491 ----
! uint64 max; /* local copy of maxMsgNum; must be as wide */
int n;
segP = shmInvalBuffer;
stateP = &segP->procState[MyBackendId - 1];
***************
*** 514,519 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
--- 499,514 ----
SpinLockRelease(&vsegP->msgnumLock);
}
+ /*
+ * It's OK to acquire the mutex after reading maxMsgNum. It's possible
+ * that maxMsgNum will have advanced further in the meantime, but since
+ * we only guarantee that we'll read invalidation entries added prior to
+ * entry into this function, that's no problem. On the flip side, if
+ * any messages have been removed from the queue that we still need to
+ * read, SICleanupQueue will have set stateP->resetState.
+ */
+ SpinLockAcquire(&stateP->mutex);
+
if (stateP->resetState)
{
/*
***************
*** 524,530 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
stateP->nextMsgNum = max;
stateP->resetState = false;
stateP->signaled = false;
! LWLockRelease(SInvalReadLock);
return -1;
}
--- 519,525 ----
stateP->nextMsgNum = max;
stateP->resetState = false;
stateP->signaled = false;
! SpinLockRelease(&stateP->mutex);
return -1;
}
***************
*** 549,555 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
if (stateP->nextMsgNum >= max)
stateP->signaled = false;
! LWLockRelease(SInvalReadLock);
return n;
}
--- 544,550 ----
if (stateP->nextMsgNum >= max)
stateP->signaled = false;
! SpinLockRelease(&stateP->mutex);
return n;
}
***************
*** 573,589 **** void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
! int min,
minsig,
lowbound,
! numMsgs,
! i;
ProcState *needSig = NULL;
! /* Lock out all writers and readers */
if (!callerHasWriteLock)
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
--- 568,583 ----
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
! uint64 min,
minsig,
lowbound,
! numMsgs;
! int i;
ProcState *needSig = NULL;
! /* Lock out writers */
if (!callerHasWriteLock)
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
***************
*** 599,649 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
for (i = 0; i < segP->lastBackend; i++)
{
ProcState *stateP = &segP->procState[i];
! int n = stateP->nextMsgNum;
! /* Ignore if inactive or already in reset state */
! if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
continue;
/*
* If we must free some space and this backend is preventing it, force
* him into reset state and then ignore until he catches up.
*/
if (n < lowbound)
{
stateP->resetState = true;
/* no point in signaling him ... */
- continue;
}
!
! /* Track the global minimum nextMsgNum */
! if (n < min)
! min = n;
!
! /* Also see who's furthest back of the unsignaled backends */
! if (n < minsig && !stateP->signaled)
{
! minsig = n;
! needSig = stateP;
}
- }
- segP->minMsgNum = min;
! /*
! * When minMsgNum gets really large, decrement all message counters so as
! * to forestall overflow of the counters. This happens seldom enough that
! * folding it into the previous loop would be a loser.
! */
! if (min >= MSGNUMWRAPAROUND)
! {
! segP->minMsgNum -= MSGNUMWRAPAROUND;
! segP->maxMsgNum -= MSGNUMWRAPAROUND;
! for (i = 0; i < segP->lastBackend; i++)
! {
! /* we don't bother skipping inactive entries here */
! segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
! }
}
/*
* Determine how many messages are still in the queue, and set the
--- 593,642 ----
for (i = 0; i < segP->lastBackend; i++)
{
ProcState *stateP = &segP->procState[i];
! uint64 n; /* same width as nextMsgNum, to avoid truncation */
!
! /* Ignore if inactive or send-only */
! if (stateP->procPid == 0 || stateP->sendOnly)
! continue;
! /* Must acquire mutex before examining any more state */
! SpinLockAcquire(&stateP->mutex);
!
! /* Ignore if already reset */
! if (stateP->resetState)
! {
! SpinLockRelease(&stateP->mutex);
continue;
+ }
/*
* If we must free some space and this backend is preventing it, force
* him into reset state and then ignore until he catches up.
*/
+ n = stateP->nextMsgNum;
if (n < lowbound)
{
stateP->resetState = true;
/* no point in signaling him ... */
}
! else
{
! /* Track the global minimum nextMsgNum */
! if (n < min)
! min = n;
!
! /* Also see who's furthest back of the unsignaled backends */
! if (n < minsig && !stateP->signaled)
! {
! minsig = n;
! needSig = stateP;
! }
}
! /* Done examining state for this backend */
! SpinLockRelease(&stateP->mutex);
}
+ segP->minMsgNum = min;
/*
* Determine how many messages are still in the queue, and set the
***************
*** 665,672 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
pid_t his_pid = needSig->procPid;
BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
needSig->signaled = true;
! LWLockRelease(SInvalReadLock);
LWLockRelease(SInvalWriteLock);
elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
--- 658,666 ----
pid_t his_pid = needSig->procPid;
BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
+ SpinLockAcquire(&needSig->mutex);
needSig->signaled = true;
! SpinLockRelease(&needSig->mutex);
LWLockRelease(SInvalWriteLock);
elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
***************
*** 675,681 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
}
else
{
- LWLockRelease(SInvalReadLock);
if (!callerHasWriteLock)
LWLockRelease(SInvalWriteLock);
}
--- 669,674 ----
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 51,57 **** typedef enum LWLockId
OidGenLock,
XidGenLock,
ProcArrayLock,
! SInvalReadLock,
SInvalWriteLock,
WALInsertLock,
WALWriteLock,
--- 51,57 ----
OidGenLock,
XidGenLock,
ProcArrayLock,
! PlaceholderForSInvalReadLock, /* no longer used */
SInvalWriteLock,
WALInsertLock,
WALWriteLock,