snapshotnow_consistent.v1.patch
application/octet-stream
Filename: snapshotnow_consistent.v1.patch
Type: application/octet-stream
Part: 0
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index db04e26..d66f434 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -263,6 +263,8 @@ systable_beginscan(Relation heapRelation,
sysscan->heap_rel = heapRelation;
sysscan->irel = irel;
+ InitSnapshotNow(snapshot);
+
if (irel)
{
int i;
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index d81ee7d..e349777 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -347,7 +347,7 @@ HeapTupleSatisfiesNow(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
if (TransactionIdIsCurrentTransactionId(xvac))
return false;
- if (!TransactionIdIsInProgress(xvac))
+ if (!XidIsInProgressForSnapshotNow(xvac, snapshot))
{
if (TransactionIdDidCommit(xvac))
{
@@ -366,7 +366,7 @@ HeapTupleSatisfiesNow(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
if (!TransactionIdIsCurrentTransactionId(xvac))
{
- if (TransactionIdIsInProgress(xvac))
+ if (XidIsInProgressForSnapshotNow(xvac, snapshot))
return false;
if (TransactionIdDidCommit(xvac))
SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
@@ -405,7 +405,7 @@ HeapTupleSatisfiesNow(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
else
return false; /* deleted before scan started */
}
- else if (TransactionIdIsInProgress(HeapTupleHeaderGetXmin(tuple)))
+ else if (XidIsInProgressForSnapshotNow(HeapTupleHeaderGetXmin(tuple), snapshot))
return false;
else if (TransactionIdDidCommit(HeapTupleHeaderGetXmin(tuple)))
SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
@@ -448,7 +448,7 @@ HeapTupleSatisfiesNow(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
return false; /* deleted before scan started */
}
- if (TransactionIdIsInProgress(HeapTupleHeaderGetXmax(tuple)))
+	if (XidIsInProgressForSnapshotNow(HeapTupleHeaderGetXmax(tuple), snapshot))
return true;
if (!TransactionIdDidCommit(HeapTupleHeaderGetXmax(tuple)))
@@ -1342,3 +1342,116 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
return false;
}
+
+#define MAX_SIZE_SNAPSHOT_NOW (MaxBackends * 64)
+
+/*
+ * Reset the in-progress xids remembered for a SnapshotNow scan.  Snapshots
+ * not tested by HeapTupleSatisfiesNow (e.g. MVCC ones) are left untouched.
+ */
+void
+InitSnapshotNow(Snapshot snapshot)
+{
+	if (snapshot->satisfies != HeapTupleSatisfiesNow)
+		return;
+
+	snapshot->xcnt = 0;
+	snapshot->xmin = InvalidTransactionId;
+	snapshot->xmax = InvalidTransactionId;
+
+	/* First call for this snapshot: allocate the array once, at full size. */
+	if (snapshot->xip == NULL)
+	{
+		snapshot->xip = (TransactionId *)
+			malloc(MAX_SIZE_SNAPSHOT_NOW * sizeof(TransactionId));
+		if (snapshot->xip == NULL)
+			ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY),
+							errmsg("out of memory")));
+	}
+}
+
+/*
+ * XidIsInProgressForSnapshotNow
+ *		TransactionIdIsInProgress() with a per-scan memory.
+ *
+ * SnapshotNow scans used to dynamically test TransactionIdIsInProgress()
+ * as the scan progressed.  This resulted in sometimes missing a tuple
+ * altogether if a transaction committed during the scan.  This function
+ * remembers each xid it finds running in snapshot->xip[], so that we give
+ * the same answer each and every time the scan sees that xid again.
+ * snapshot->xmin and snapshot->xmax bound the remembered xids (inclusive).
+ *
+ * snapshot->xip must already be allocated (see InitSnapshotNow()).
+ * Returns true if xid is to be treated as in progress; raises ERROR when
+ * the array is full.
+ *
+ * NOTE(review): the fast exits below precede the remembered-xid lookup, so
+ * a remembered xid may be reported as not running once known completed;
+ * also, callers earlier in this file need a forward declaration.
+ */
+static bool
+XidIsInProgressForSnapshotNow(TransactionId xid, Snapshot snapshot)
+{
+	/*
+	 * Don't bother checking a transaction older than RecentXmin; it could not
+	 * possibly still be running.  (Note: in particular, this guarantees that
+	 * we reject InvalidTransactionId, FrozenTransactionId, etc as not
+	 * running.)
+	 */
+	if (TransactionIdPrecedes(xid, RecentXmin))
+		return false;
+
+	/*
+	 * We may have just checked the status of this transaction, so if it is
+	 * already known to be completed, we can fall out quickly.
+	 */
+	if (TransactionIdIsKnownCompleted(xid))
+		return false;
+
+	/*
+	 * Also, we can handle our own transaction (and subtransactions) without
+	 * any access to the xip array.
+	 */
+	if (TransactionIdIsCurrentTransactionId(xid))
+		return true;
+
+	/* Search the remembered xids only if xid lies within their bounds. */
+	if (TransactionIdIsValid(snapshot->xmin) &&
+		TransactionIdFollowsOrEquals(xid, snapshot->xmin) &&
+		TransactionIdPrecedesOrEquals(xid, snapshot->xmax))
+	{
+		uint32		i;
+
+		/*
+		 * If we saw this xid was in progress previously then we give the
+		 * same answer the next time this scan sees the xid again.
+		 */
+		for (i = 0; i < snapshot->xcnt; i++)
+			if (snapshot->xip[i] == xid)
+				return true;
+	}
+
+	/* If we haven't seen it before, is it in progress?  If so remember. */
+	if (!TransactionIdIsInProgress(xid))
+		return false;
+
+	/* xid is in progress - error out if every array slot is already used. */
+	if (snapshot->xcnt >= MAX_SIZE_SNAPSHOT_NOW)
+		elog(ERROR, "reached limit of SnapshotNow array");
+
+	/* Remember xid is in progress */
+	snapshot->xip[snapshot->xcnt++] = xid;
+
+	/* Set or expand upper or lower limits if appropriate. */
+	if (!TransactionIdIsValid(snapshot->xmin))
+	{
+		snapshot->xmin = xid;
+		snapshot->xmax = xid;
+	}
+	else if (TransactionIdPrecedes(xid, snapshot->xmin))
+		snapshot->xmin = xid;
+	else if (TransactionIdFollows(xid, snapshot->xmax))
+		snapshot->xmax = xid;
+
+	return true;
+}
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 859e52a..24b6cc0 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -35,6 +35,14 @@ typedef struct SnapshotData
SnapshotSatisfiesFunc satisfies; /* tuple test function */
/*
+ * Fields used by all snapshot types.
+ */
+ uint32 xcnt; /* # of xact ids in xip[] */
+ TransactionId *xip; /* array of xact IDs in progress */
+ TransactionId xmin; /* all XID < xmin are visible to me */
+ TransactionId xmax; /* all XID >= xmax are invisible to me */
+
+ /*
* The remaining fields are used only for MVCC snapshots, and are normally
* just zeroes in special snapshots. (But xmin and xmax are used
* specially by HeapTupleSatisfiesDirty.)
@@ -44,10 +52,6 @@ typedef struct SnapshotData
* is stored as an optimization to avoid needing to search the XID arrays
* for most tuples.
*/
- TransactionId xmin; /* all XID < xmin are visible to me */
- TransactionId xmax; /* all XID >= xmax are invisible to me */
- uint32 xcnt; /* # of xact ids in xip[] */
- TransactionId *xip; /* array of xact IDs in progress */
/* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
int32 subxcnt; /* # of xact ids in subxip[] */
TransactionId *subxip; /* array of subxact IDs in progress */