sinval-unlocked.patch
application/octet-stream
Filename: sinval-unlocked.patch
Type: application/octet-stream
Part: 1
*** a/src/backend/storage/ipc/sinvaladt.c
--- b/src/backend/storage/ipc/sinvaladt.c
***************
*** 140,146 **** typedef struct ProcState
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
! int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
--- 140,146 ----
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
! uint64 nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
***************
*** 167,180 **** typedef struct SISeg
/*
* General state information
*/
! int minMsgNum; /* oldest message still needed */
! int maxMsgNum; /* next message number to be assigned */
! int nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
- slock_t msgnumLock; /* spinlock protecting maxMsgNum */
-
/*
* Circular buffer holding shared-inval messages
*/
--- 167,178 ----
/*
* General state information
*/
! uint64 minMsgNum; /* oldest message still needed */
! uint64 maxMsgNum; /* next message number to be assigned */
! uint64 nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
/*
* Circular buffer holding shared-inval messages
*/
***************
*** 237,243 **** CreateSharedInvalidationState(void)
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
shmInvalBuffer->lastBackend = 0;
shmInvalBuffer->maxBackends = MaxBackends;
- SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
--- 235,240 ----
***************
*** 414,422 **** SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
*/
while (n > 0)
{
! int nthistime = Min(n, WRITE_QUANTUM);
! int numMsgs;
! int max;
n -= nthistime;
--- 411,419 ----
*/
while (n > 0)
{
! uint64 nthistime = Min(n, WRITE_QUANTUM);
! uint64 numMsgs;
! uint64 max;
n -= nthistime;
***************
*** 454,462 **** SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
/* use volatile pointer to prevent code rearrangement */
volatile SISeg *vsegP = segP;
- SpinLockAcquire(&vsegP->msgnumLock);
vsegP->maxMsgNum = max;
- SpinLockRelease(&vsegP->msgnumLock);
}
LWLockRelease(SInvalWriteLock);
--- 451,457 ----
***************
*** 495,517 **** int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
SISeg *segP;
! ProcState *stateP;
! int max;
int n;
- LWLockAcquire(SInvalReadLock, LW_SHARED);
-
segP = shmInvalBuffer;
stateP = &segP->procState[MyBackendId - 1];
! /* Fetch current value of maxMsgNum using spinlock */
{
/* use volatile pointer to prevent code rearrangement */
volatile SISeg *vsegP = segP;
- SpinLockAcquire(&vsegP->msgnumLock);
max = vsegP->maxMsgNum;
- SpinLockRelease(&vsegP->msgnumLock);
}
if (stateP->resetState)
--- 490,508 ----
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
SISeg *segP;
! volatile ProcState *stateP;
! uint64 max;
int n;
segP = shmInvalBuffer;
stateP = &segP->procState[MyBackendId - 1];
! /* Fetch current value of maxMsgNum */
{
/* use volatile pointer to prevent code rearrangement */
volatile SISeg *vsegP = segP;
max = vsegP->maxMsgNum;
}
if (stateP->resetState)
***************
*** 524,530 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
stateP->nextMsgNum = max;
stateP->resetState = false;
stateP->signaled = false;
- LWLockRelease(SInvalReadLock);
return -1;
}
--- 515,520 ----
***************
*** 544,555 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
}
/*
* Reset our "signaled" flag whenever we have caught up completely.
*/
if (stateP->nextMsgNum >= max)
stateP->signaled = false;
- LWLockRelease(SInvalReadLock);
return n;
}
--- 534,560 ----
}
/*
+ * Recheck whether we've been reset while we were reading without a lock.
+ */
+ if (stateP->resetState)
+ {
+ /*
+ * Force reset. We can say we have dealt with any messages added
+ * since the reset, as well; and that means we should clear the
+ * signaled flag, too.
+ */
+ stateP->nextMsgNum = max;
+ stateP->resetState = false;
+ stateP->signaled = false;
+ return -1;
+ }
+
+ /*
* Reset our "signaled" flag whenever we have caught up completely.
*/
if (stateP->nextMsgNum >= max)
stateP->signaled = false;
return n;
}
***************
*** 573,589 **** void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
! int min,
minsig,
lowbound,
! numMsgs,
! i;
ProcState *needSig = NULL;
/* Lock out all writers and readers */
if (!callerHasWriteLock)
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
- LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
--- 578,593 ----
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
! uint64 min,
minsig,
lowbound,
! numMsgs;
! int i;
ProcState *needSig = NULL;
/* Lock out all writers and readers */
if (!callerHasWriteLock)
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
/*
* Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
***************
*** 630,651 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
segP->minMsgNum = min;
/*
- * When minMsgNum gets really large, decrement all message counters so as
- * to forestall overflow of the counters. This happens seldom enough that
- * folding it into the previous loop would be a loser.
- */
- if (min >= MSGNUMWRAPAROUND)
- {
- segP->minMsgNum -= MSGNUMWRAPAROUND;
- segP->maxMsgNum -= MSGNUMWRAPAROUND;
- for (i = 0; i < segP->lastBackend; i++)
- {
- /* we don't bother skipping inactive entries here */
- segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
- }
- }
-
- /*
* Determine how many messages are still in the queue, and set the
* threshold at which we should repeat SICleanupQueue().
*/
--- 634,639 ----
***************
*** 666,672 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
needSig->signaled = true;
- LWLockRelease(SInvalReadLock);
LWLockRelease(SInvalWriteLock);
elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
--- 654,659 ----
***************
*** 675,681 **** SICleanupQueue(bool callerHasWriteLock, int minFree)
}
else
{
- LWLockRelease(SInvalReadLock);
if (!callerHasWriteLock)
LWLockRelease(SInvalWriteLock);
}
--- 662,667 ----
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 51,57 **** typedef enum LWLockId
OidGenLock,
XidGenLock,
ProcArrayLock,
- SInvalReadLock,
SInvalWriteLock,
WALInsertLock,
WALWriteLock,
--- 51,56 ----