ssi-atomic-commit.patch
text/plain
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 1298,1314 **** FinishPreparedTransaction(const char *gid, bool isCommit)
* callbacks will release the locks the transaction held.
*/
if (isCommit)
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
hdr->initfileinval);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
!
! ProcArrayRemove(&gxact->proc, latestXid);
/*
* In case we fail while running the callbacks, mark the gxact invalid so
--- 1298,1318 ----
* callbacks will release the locks the transaction held.
*/
if (isCommit)
+ {
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
hdr->initfileinval);
+ CommitRemove2PC(&gxact->proc, latestXid);
+ }
else
+ {
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
! ProcArrayRemove(&gxact->proc, latestXid);
! }
/*
* In case we fail while running the callbacks, mark the gxact invalid so
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 1878,1884 **** CommitTransaction(void)
* must be done _before_ releasing locks we hold and _after_
* RecordTransactionCommit.
*/
! ProcArrayEndTransaction(MyProc, latestXid);
/*
* This is all post-commit cleanup. Note that if an error is raised here,
--- 1878,1884 ----
* must be done _before_ releasing locks we hold and _after_
* RecordTransactionCommit.
*/
! CommitEndTransaction(MyProc, latestXid);
/*
* This is all post-commit cleanup. Note that if an error is raised here,
*** a/src/backend/storage/lmgr/predicate.c
--- b/src/backend/storage/lmgr/predicate.c
***************
*** 120,125 ****
--- 120,128 ----
* - The same lock protects a target, all locks on that target, and
* the linked list of locks on the target..
* - When more than one is needed, acquire in ascending order.
+ & - There are times when this lock must be held concurrently with
+ * ProcArrayLock (acquired in called functions, not directly).
+ * In such cases this lock must be acquired first.
*
* SerializableXactHashLock
* - Protects both PredXact and SerializableXidHash.
***************
*** 158,163 ****
--- 161,168 ----
* PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
* BlockNumber newblkno);
* TransferPredicateLocksToHeapRelation(Relation relation)
+ * CommitEndTransaction(PGPROC *proc, TransactionId latestXid)
+ * CommitRemove2PC(PGPROC *proc, TransactionId latestXid)
* ReleasePredicateLocks(bool isCommit)
*
* conflict detection (may also trigger rollback)
***************
*** 252,261 ****
* The PREPARED flag remains set after commit, so SxactIsCommitted
* implies SxactIsPrepared.
*/
! #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
! #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
! #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
#define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
--- 257,267 ----
* The PREPARED flag remains set after commit, so SxactIsCommitted
* implies SxactIsPrepared.
*/
! #define SxactIsCommitted(sxact) ((sxact)->commitSeqNo != InvalidSerCommitSeqNo)
!
#define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
+ #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
+ #define SxactIsReleased(sxact) (((sxact)->flags & SXACT_FLAG_RELEASED) != 0)
#define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
#define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
#define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
***************
*** 1194,1200 **** InitPredicateLocks(void)
PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
! PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
PredXact->OldCommittedSxact->pid = 0;
}
/* This never changes, so let's keep a local copy. */
--- 1200,1207 ----
PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
! PredXact->OldCommittedSxact->flags =
! SXACT_FLAG_PREPARED | SXACT_FLAG_RELEASED;
PredXact->OldCommittedSxact->pid = 0;
}
/* This never changes, so let's keep a local copy. */
***************
*** 1669,1676 **** RegisterSerializableTransactionInt(Snapshot snapshot)
othersxact != NULL;
othersxact = NextPredXact(othersxact))
{
! if (!SxactIsOnFinishedList(othersxact) &&
! !SxactIsReadOnly(othersxact))
{
SetPossibleUnsafeConflict(sxact, othersxact);
}
--- 1676,1684 ----
othersxact != NULL;
othersxact = NextPredXact(othersxact))
{
! if (!SxactIsCommitted(othersxact)
! && !SxactIsDoomed(othersxact)
! && !SxactIsReadOnly(othersxact))
{
SetPossibleUnsafeConflict(sxact, othersxact);
}
***************
*** 3052,3059 **** SetNewSxactGlobalXmin(void)
for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
{
! if (!SxactIsRolledBack(sxact)
! && !SxactIsCommitted(sxact)
&& sxact != OldCommittedSxact)
{
Assert(sxact->xmin != InvalidTransactionId);
--- 3060,3066 ----
for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
{
! if (!SxactIsReleased(sxact)
&& sxact != OldCommittedSxact)
{
Assert(sxact->xmin != InvalidTransactionId);
***************
*** 3074,3079 **** SetNewSxactGlobalXmin(void)
--- 3081,3167 ----
}
/*
+ * CommitEndTransaction
+ *
+ * For correct SERIALIZABLE semantics, the commitSeqNo must appear to be set
+ * atomically with the work of the transaction becoming visible to other
+ * transactions.
+ *
+ * Since a transaction without an xid has not made any changes which will be
+ * visible to other transactions, it doesn't need such treatment. These
+ * transactions will be flagged as committed and assigned a commitSeqNo in
+ * ReleasePredicateLocks(). This is after the actual commit, but the timing
+ * for these transactions isn't critical. Since they are explicitly or
+ * implicitly READ ONLY, it is the start time of the transaction which is
+ * critical for correct detection of serialization failures; the commit
+ * information is only really needed for cleanup purposes.
+ */
+ void
+ CommitEndTransaction(PGPROC *proc, TransactionId latestXid)
+ {
+ if (MySerializableXact == InvalidSerializableXact
+ || !TransactionIdIsValid(latestXid))
+ {
+ ProcArrayEndTransaction(proc, latestXid);
+ return;
+ }
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ ProcArrayEndTransaction(proc, latestXid);
+
+ MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
+
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /*
+ * CommitRemove2PC
+ *
+ * Like CommitEndTransaction, except for a prepared transaction which is being
+ * committed.
+ */
+ void
+ CommitRemove2PC(PGPROC *proc, TransactionId latestXid)
+ {
+ SERIALIZABLEXIDTAG sxidtag;
+ SERIALIZABLEXID *sxid;
+
+ if (!TransactionIdIsValid(latestXid))
+ {
+ ProcArrayRemove(proc, latestXid);
+ return;
+ }
+
+ LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+ sxidtag.xid = latestXid;
+ sxid = (SERIALIZABLEXID *)
+ hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+
+ /*
+ * xid will not be found if it wasn't a serializable transaction. In that
+ * case, make the transaction's work visible while not holding
+ * SerializableXactHashLock.
+ */
+ if (sxid == NULL)
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ProcArrayRemove(proc, latestXid);
+ return;
+ }
+
+ ProcArrayRemove(proc, latestXid);
+
+ MySerializableXact = sxid->myXact;
+ MyXactDidWrite = true; /* conservatively assume that we wrote
+ * something */
+ MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
+
+ LWLockRelease(SerializableXactHashLock);
+ }
+
+ /*
* ReleasePredicateLocks
*
* Releases predicate locks based on completion of the current transaction,
***************
*** 3116,3126 **** ReleasePredicateLocks(bool isCommit)
return;
}
- Assert(!isCommit || SxactIsPrepared(MySerializableXact));
- Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
- Assert(!SxactIsCommitted(MySerializableXact));
- Assert(!SxactIsRolledBack(MySerializableXact));
-
/* may not be serializable during COMMIT/ROLLBACK PREPARED */
if (MySerializableXact->pid != 0)
Assert(IsolationIsSerializable());
--- 3204,3209 ----
***************
*** 3132,3137 **** ReleasePredicateLocks(bool isCommit)
--- 3215,3224 ----
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ Assert(!SxactIsReleased(MySerializableXact));
+ Assert(!isCommit || SxactIsPrepared(MySerializableXact));
+ Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
+
/*
* We don't hold XidGenLock lock here, assuming that TransactionId is
* atomic!
***************
*** 3145,3158 **** ReleasePredicateLocks(bool isCommit)
*/
MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
/*
* If it's not a commit it's a rollback, and we can clear our locks
* immediately.
*/
if (isCommit)
{
! MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
! MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
/* Recognize implicit read-only transaction (commit without write). */
if (!MyXactDidWrite)
MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
--- 3232,3249 ----
*/
MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
+ MySerializableXact->flags |= SXACT_FLAG_RELEASED;
+
/*
* If it's not a commit it's a rollback, and we can clear our locks
* immediately.
*/
if (isCommit)
{
! if (MySerializableXact->commitSeqNo == InvalidSerCommitSeqNo)
! {
! MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
! }
/* Recognize implicit read-only transaction (commit without write). */
if (!MyXactDidWrite)
MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
***************
*** 3165,3178 **** ReleasePredicateLocks(bool isCommit)
* other transactions that conflict with it. Note that this flag might
* already be set, if another backend marked this transaction for
* abort.
- *
- * The ROLLED_BACK flag further indicates that ReleasePredicateLocks
- * has been called, and so the SerializableXact is eligible for
- * cleanup. This means it should not be considered when calculating
- * SxactGlobalXmin.
*/
MySerializableXact->flags |= SXACT_FLAG_DOOMED;
! MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
/*
* If the transaction was previously prepared, but is now failing due
* to a ROLLBACK PREPARED or (hopefully very rare) error after the
--- 3256,3264 ----
* other transactions that conflict with it. Note that this flag might
* already be set, if another backend marked this transaction for
* abort.
*/
MySerializableXact->flags |= SXACT_FLAG_DOOMED;
!
/*
* If the transaction was previously prepared, but is now failing due
* to a ROLLBACK PREPARED or (hopefully very rare) error after the
***************
*** 3260,3274 **** ReleasePredicateLocks(bool isCommit)
&& !SxactIsReadOnly(MySerializableXact)
&& SxactIsCommitted(conflict->sxactIn))
{
! if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
|| conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
}
! if (!isCommit
! || SxactIsCommitted(conflict->sxactIn)
! || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
ReleaseRWConflict(conflict);
conflict = nextConflict;
--- 3346,3358 ----
&& !SxactIsReadOnly(MySerializableXact)
&& SxactIsCommitted(conflict->sxactIn))
{
! if (!SxactHasConflictOut(MySerializableXact)
|| conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
}
! if (!isCommit || SxactIsCommitted(conflict->sxactIn))
ReleaseRWConflict(conflict);
conflict = nextConflict;
***************
*** 3290,3296 **** ReleasePredicateLocks(bool isCommit)
offsetof(RWConflictData, inLink));
if (!isCommit
! || SxactIsCommitted(conflict->sxactOut)
|| SxactIsReadOnly(conflict->sxactOut))
ReleaseRWConflict(conflict);
--- 3374,3380 ----
offsetof(RWConflictData, inLink));
if (!isCommit
! || SxactIsReleased(conflict->sxactOut)
|| SxactIsReadOnly(conflict->sxactOut))
ReleaseRWConflict(conflict);
***************
*** 3556,3562 **** ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
nextConflict;
Assert(sxact != NULL);
! Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
Assert(LWLockHeldByMe(SerializableFinishedListLock));
/*
--- 3640,3646 ----
nextConflict;
Assert(sxact != NULL);
! Assert(SxactIsReleased(sxact));
Assert(LWLockHeldByMe(SerializableFinishedListLock));
/*
***************
*** 4702,4722 **** PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
SERIALIZABLEXID *sxid;
SERIALIZABLEXIDTAG sxidtag;
! sxidtag.xid = xid;
! LWLockAcquire(SerializableXactHashLock, LW_SHARED);
! sxid = (SERIALIZABLEXID *)
! hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
! LWLockRelease(SerializableXactHashLock);
! /* xid will not be found if it wasn't a serializable transaction */
! if (sxid == NULL)
! return;
! /* Release its locks */
! MySerializableXact = sxid->myXact;
! MyXactDidWrite = true; /* conservatively assume that we wrote
* something */
ReleasePredicateLocks(isCommit);
}
--- 4786,4810 ----
SERIALIZABLEXID *sxid;
SERIALIZABLEXIDTAG sxidtag;
! if (MySerializableXact == InvalidSerializableXact)
! {
! sxidtag.xid = xid;
! LWLockAcquire(SerializableXactHashLock, LW_SHARED);
! sxid = (SERIALIZABLEXID *)
! hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
! LWLockRelease(SerializableXactHashLock);
! /* xid will not be found if it wasn't a serializable transaction */
! if (sxid == NULL)
! return;
! /* Release its locks */
! MySerializableXact = sxid->myXact;
! MyXactDidWrite = true; /* conservatively assume that we wrote
* something */
+ }
+
ReleasePredicateLocks(isCommit);
}
*** a/src/include/storage/predicate.h
--- b/src/include/storage/predicate.h
***************
*** 14,19 ****
--- 14,20 ----
#ifndef PREDICATE_H
#define PREDICATE_H
+ #include "storage/proc.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
***************
*** 50,55 **** extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snap
--- 51,58 ----
extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
extern void TransferPredicateLocksToHeapRelation(Relation relation);
+ extern void CommitEndTransaction(PGPROC *proc, TransactionId latestXid);
+ extern void CommitRemove2PC(PGPROC *proc, TransactionId latestXid);
extern void ReleasePredicateLocks(bool isCommit);
/* conflict detection (may also trigger rollback) */
*** a/src/include/storage/predicate_internals.h
--- b/src/include/storage/predicate_internals.h
***************
*** 90,111 **** typedef struct SERIALIZABLEXACT
int pid; /* pid of associated process */
} SERIALIZABLEXACT;
! #define SXACT_FLAG_COMMITTED 0x00000001 /* already committed */
#define SXACT_FLAG_PREPARED 0x00000002 /* about to commit */
! #define SXACT_FLAG_ROLLED_BACK 0x00000004 /* already rolled back */
! #define SXACT_FLAG_DOOMED 0x00000008 /* will roll back */
/*
* The following flag actually means that the flagged transaction has a
* conflict out *to a transaction which committed ahead of it*. It's hard
* to get that into a name of a reasonable length.
*/
! #define SXACT_FLAG_CONFLICT_OUT 0x00000010
! #define SXACT_FLAG_READ_ONLY 0x00000020
! #define SXACT_FLAG_DEFERRABLE_WAITING 0x00000040
! #define SXACT_FLAG_RO_SAFE 0x00000080
! #define SXACT_FLAG_RO_UNSAFE 0x00000100
! #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200
! #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
/*
* The following types are used to provide an ad hoc list for holding
--- 90,110 ----
int pid; /* pid of associated process */
} SERIALIZABLEXACT;
! #define SXACT_FLAG_DOOMED 0x00000001 /* will roll back */
#define SXACT_FLAG_PREPARED 0x00000002 /* about to commit */
! #define SXACT_FLAG_RELEASED 0x00000004 /* OK to clean up */
/*
* The following flag actually means that the flagged transaction has a
* conflict out *to a transaction which committed ahead of it*. It's hard
* to get that into a name of a reasonable length.
*/
! #define SXACT_FLAG_CONFLICT_OUT 0x00000008
! #define SXACT_FLAG_READ_ONLY 0x00000010
! #define SXACT_FLAG_DEFERRABLE_WAITING 0x00000020
! #define SXACT_FLAG_RO_SAFE 0x00000040
! #define SXACT_FLAG_RO_UNSAFE 0x00000080
! #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000100
! #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000200
/*
* The following types are used to provide an ad hoc list for holding
*** a/src/test/regress/expected/prepared_xacts.out
--- b/src/test/regress/expected/prepared_xacts.out
***************
*** 134,140 **** SELECT * FROM pxtest1;
-- This should fail, because the two transactions have a write-skew anomaly
INSERT INTO pxtest1 VALUES ('fff');
ERROR: could not serialize access due to read/write dependencies among transactions
! DETAIL: Cancelled on identification as a pivot, during write.
HINT: The transaction might succeed if retried.
PREPARE TRANSACTION 'foo5';
SELECT gid FROM pg_prepared_xacts;
--- 134,140 ----
-- This should fail, because the two transactions have a write-skew anomaly
INSERT INTO pxtest1 VALUES ('fff');
ERROR: could not serialize access due to read/write dependencies among transactions
! DETAIL: Canceled on identification as a pivot, during write.
HINT: The transaction might succeed if retried.
PREPARE TRANSACTION 'foo5';
SELECT gid FROM pg_prepared_xacts;