ssi-atomic-commit.patch

text/plain

Filename: ssi-atomic-commit.patch
Type: text/plain
Part: 0
Message: SSI atomic commit
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 1298,1314 **** FinishPreparedTransaction(const char *gid, bool isCommit)
  	 * callbacks will release the locks the transaction held.
  	 */
  	if (isCommit)
  		RecordTransactionCommitPrepared(xid,
  										hdr->nsubxacts, children,
  										hdr->ncommitrels, commitrels,
  										hdr->ninvalmsgs, invalmsgs,
  										hdr->initfileinval);
  	else
  		RecordTransactionAbortPrepared(xid,
  									   hdr->nsubxacts, children,
  									   hdr->nabortrels, abortrels);
! 
! 	ProcArrayRemove(&gxact->proc, latestXid);
  
  	/*
  	 * In case we fail while running the callbacks, mark the gxact invalid so
--- 1298,1318 ----
  	 * callbacks will release the locks the transaction held.
  	 */
  	if (isCommit)
+ 	{
  		RecordTransactionCommitPrepared(xid,
  										hdr->nsubxacts, children,
  										hdr->ncommitrels, commitrels,
  										hdr->ninvalmsgs, invalmsgs,
  										hdr->initfileinval);
+ 		CommitRemove2PC(&gxact->proc, latestXid);
+ 	}
  	else
+ 	{
  		RecordTransactionAbortPrepared(xid,
  									   hdr->nsubxacts, children,
  									   hdr->nabortrels, abortrels);
! 		ProcArrayRemove(&gxact->proc, latestXid);
! 	}
  
  	/*
  	 * In case we fail while running the callbacks, mark the gxact invalid so
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 1878,1884 **** CommitTransaction(void)
  	 * must be done _before_ releasing locks we hold and _after_
  	 * RecordTransactionCommit.
  	 */
! 	ProcArrayEndTransaction(MyProc, latestXid);
  
  	/*
  	 * This is all post-commit cleanup.  Note that if an error is raised here,
--- 1878,1884 ----
  	 * must be done _before_ releasing locks we hold and _after_
  	 * RecordTransactionCommit.
  	 */
! 	CommitEndTransaction(MyProc, latestXid);
  
  	/*
  	 * This is all post-commit cleanup.  Note that if an error is raised here,
*** a/src/backend/storage/lmgr/predicate.c
--- b/src/backend/storage/lmgr/predicate.c
***************
*** 120,125 ****
--- 120,128 ----
   *		- The same lock protects a target, all locks on that target, and
   *			the linked list of locks on the target..
   *		- When more than one is needed, acquire in ascending order.
+  *		- There are times when this lock must be held concurrently with
+  *			ProcArrayLock (acquired in called functions, not directly).
+  *			In such cases this lock must be acquired first.
   *
   *	SerializableXactHashLock
   *		- Protects both PredXact and SerializableXidHash.
***************
*** 158,163 ****
--- 161,168 ----
   *		PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
   *								 BlockNumber newblkno);
   *		TransferPredicateLocksToHeapRelation(Relation relation)
+  *		CommitEndTransaction(PGPROC *proc, TransactionId latestXid)
+  *		CommitRemove2PC(PGPROC *proc, TransactionId latestXid)
   *		ReleasePredicateLocks(bool isCommit)
   *
   * conflict detection (may also trigger rollback)
***************
*** 252,261 ****
   * The PREPARED flag remains set after commit, so SxactIsCommitted
   * implies SxactIsPrepared.
   */
! #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
! #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
! #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
  #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
  #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
  #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
  #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
--- 257,267 ----
   * The PREPARED flag remains set after commit, so SxactIsCommitted
   * implies SxactIsPrepared.
   */
! #define SxactIsCommitted(sxact) ((sxact)->commitSeqNo != InvalidSerCommitSeqNo)
! 
  #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
+ #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
+ #define SxactIsReleased(sxact) (((sxact)->flags & SXACT_FLAG_RELEASED) != 0)
  #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
  #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
  #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
***************
*** 1194,1200 **** InitPredicateLocks(void)
  		PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
  		PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
  		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
! 		PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
  		PredXact->OldCommittedSxact->pid = 0;
  	}
  	/* This never changes, so let's keep a local copy. */
--- 1200,1207 ----
  		PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
  		PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
  		PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
! 		PredXact->OldCommittedSxact->flags =
! 			SXACT_FLAG_PREPARED | SXACT_FLAG_RELEASED;
  		PredXact->OldCommittedSxact->pid = 0;
  	}
  	/* This never changes, so let's keep a local copy. */
***************
*** 1669,1676 **** RegisterSerializableTransactionInt(Snapshot snapshot)
  			 othersxact != NULL;
  			 othersxact = NextPredXact(othersxact))
  		{
! 			if (!SxactIsOnFinishedList(othersxact) &&
! 				!SxactIsReadOnly(othersxact))
  			{
  				SetPossibleUnsafeConflict(sxact, othersxact);
  			}
--- 1676,1684 ----
  			 othersxact != NULL;
  			 othersxact = NextPredXact(othersxact))
  		{
! 			if (!SxactIsCommitted(othersxact)
! 				&& !SxactIsDoomed(othersxact)
! 				&& !SxactIsReadOnly(othersxact))
  			{
  				SetPossibleUnsafeConflict(sxact, othersxact);
  			}
***************
*** 3052,3059 **** SetNewSxactGlobalXmin(void)
  
  	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
  	{
! 		if (!SxactIsRolledBack(sxact)
! 			&& !SxactIsCommitted(sxact)
  			&& sxact != OldCommittedSxact)
  		{
  			Assert(sxact->xmin != InvalidTransactionId);
--- 3060,3066 ----
  
  	for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
  	{
! 		if (!SxactIsReleased(sxact)
  			&& sxact != OldCommittedSxact)
  		{
  			Assert(sxact->xmin != InvalidTransactionId);
***************
*** 3074,3079 **** SetNewSxactGlobalXmin(void)
--- 3081,3167 ----
  }
  
  /*
+  *		CommitEndTransaction
+  *
+  * For correct SERIALIZABLE semantics, the commitSeqNo must appear to be set
+  * atomically with the work of the transaction becoming visible to other
+  * transactions.
+  *
+  * Since a transaction without an xid has not made any changes which will be
+  * visible to other transactions, it doesn't need such treatment.  These
+  * transactions will be flagged as committed and assigned a commitSeqNo in
+  * ReleasePredicateLocks().  This is after the actual commit, but the timing
+  * for these transactions isn't critical.  Since they are explicitly or
+  * implicitly READ ONLY, it is the start time of the transaction which is
+  * critical for correct detection of serialization failures; the commit
+  * information is only really needed for cleanup purposes.
+  */
+ void
+ CommitEndTransaction(PGPROC *proc, TransactionId latestXid)
+ {
+ 	if (MySerializableXact == InvalidSerializableXact
+ 		|| !TransactionIdIsValid(latestXid))
+ 	{
+ 		ProcArrayEndTransaction(proc, latestXid);
+ 		return;
+ 	}
+ 
+ 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ 
+ 	ProcArrayEndTransaction(proc, latestXid);
+ 
+ 	MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
+ 
+ 	LWLockRelease(SerializableXactHashLock);
+ }
+ 
+ /*
+  *		CommitRemove2PC
+  *
+  * Like CommitEndTransaction, except for a prepared transaction which is being
+  * committed.
+  */
+ void
+ CommitRemove2PC(PGPROC *proc, TransactionId latestXid)
+ {
+ 	SERIALIZABLEXIDTAG sxidtag;
+ 	SERIALIZABLEXID *sxid;
+ 
+ 	if (!TransactionIdIsValid(latestXid))
+ 	{
+ 		ProcArrayRemove(proc, latestXid);
+ 		return;
+ 	}
+ 
+ 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ 
+ 	sxidtag.xid = latestXid;
+ 	sxid = (SERIALIZABLEXID *)
+ 		hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
+ 
+ 	/*
+ 	 * xid will not be found if it wasn't a serializable transaction.  In that
+ 	 * case, make the transaction's work visible while not holding
+ 	 * SerializableXactHashLock.
+ 	 */
+ 	if (sxid == NULL)
+ 	{
+ 		LWLockRelease(SerializableXactHashLock);
+ 		ProcArrayRemove(proc, latestXid);
+ 		return;
+ 	}
+ 
+ 	ProcArrayRemove(proc, latestXid);
+ 
+ 	MySerializableXact = sxid->myXact;
+ 	MyXactDidWrite = true;		/* conservatively assume that we wrote
+ 								 * something */
+ 	MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
+ 
+ 	LWLockRelease(SerializableXactHashLock);
+ }
+ 
+ /*
   *		ReleasePredicateLocks
   *
   * Releases predicate locks based on completion of the current transaction,
***************
*** 3116,3126 **** ReleasePredicateLocks(bool isCommit)
  		return;
  	}
  
- 	Assert(!isCommit || SxactIsPrepared(MySerializableXact));
- 	Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
- 	Assert(!SxactIsCommitted(MySerializableXact));
- 	Assert(!SxactIsRolledBack(MySerializableXact));
- 
  	/* may not be serializable during COMMIT/ROLLBACK PREPARED */
  	if (MySerializableXact->pid != 0)
  		Assert(IsolationIsSerializable());
--- 3204,3209 ----
***************
*** 3132,3137 **** ReleasePredicateLocks(bool isCommit)
--- 3215,3224 ----
  
  	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
  
+ 	Assert(!SxactIsReleased(MySerializableXact));
+ 	Assert(!isCommit || SxactIsPrepared(MySerializableXact));
+ 	Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
+ 
  	/*
  	 * We don't hold XidGenLock lock here, assuming that TransactionId is
  	 * atomic!
***************
*** 3145,3158 **** ReleasePredicateLocks(bool isCommit)
  	 */
  	MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
  
  	/*
  	 * If it's not a commit it's a rollback, and we can clear our locks
  	 * immediately.
  	 */
  	if (isCommit)
  	{
! 		MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
! 		MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
  		/* Recognize implicit read-only transaction (commit without write). */
  		if (!MyXactDidWrite)
  			MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
--- 3232,3249 ----
  	 */
  	MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
  
+ 	MySerializableXact->flags |= SXACT_FLAG_RELEASED;
+ 
  	/*
  	 * If it's not a commit it's a rollback, and we can clear our locks
  	 * immediately.
  	 */
  	if (isCommit)
  	{
! 		if (MySerializableXact->commitSeqNo == InvalidSerCommitSeqNo)
! 		{
! 			MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
! 		}
  		/* Recognize implicit read-only transaction (commit without write). */
  		if (!MyXactDidWrite)
  			MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
***************
*** 3165,3178 **** ReleasePredicateLocks(bool isCommit)
  		 * other transactions that conflict with it. Note that this flag might
  		 * already be set, if another backend marked this transaction for
  		 * abort.
- 		 *
- 		 * The ROLLED_BACK flag further indicates that ReleasePredicateLocks
- 		 * has been called, and so the SerializableXact is eligible for
- 		 * cleanup. This means it should not be considered when calculating
- 		 * SxactGlobalXmin.
  		 */
  		MySerializableXact->flags |= SXACT_FLAG_DOOMED;
! 		MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
  		/*
  		 * If the transaction was previously prepared, but is now failing due
  		 * to a ROLLBACK PREPARED or (hopefully very rare) error after the
--- 3256,3264 ----
  		 * other transactions that conflict with it. Note that this flag might
  		 * already be set, if another backend marked this transaction for
  		 * abort.
  		 */
  		MySerializableXact->flags |= SXACT_FLAG_DOOMED;
! 
  		/*
  		 * If the transaction was previously prepared, but is now failing due
  		 * to a ROLLBACK PREPARED or (hopefully very rare) error after the
***************
*** 3260,3274 **** ReleasePredicateLocks(bool isCommit)
  			&& !SxactIsReadOnly(MySerializableXact)
  			&& SxactIsCommitted(conflict->sxactIn))
  		{
! 			if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
  				|| conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
  				MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
  			MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
  		}
  
! 		if (!isCommit
! 			|| SxactIsCommitted(conflict->sxactIn)
! 			|| (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
  			ReleaseRWConflict(conflict);
  
  		conflict = nextConflict;
--- 3346,3358 ----
  			&& !SxactIsReadOnly(MySerializableXact)
  			&& SxactIsCommitted(conflict->sxactIn))
  		{
! 			if (!SxactHasConflictOut(MySerializableXact)
  				|| conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
  				MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
  			MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
  		}
  
! 		if (!isCommit || SxactIsCommitted(conflict->sxactIn))
  			ReleaseRWConflict(conflict);
  
  		conflict = nextConflict;
***************
*** 3290,3296 **** ReleasePredicateLocks(bool isCommit)
  						 offsetof(RWConflictData, inLink));
  
  		if (!isCommit
! 			|| SxactIsCommitted(conflict->sxactOut)
  			|| SxactIsReadOnly(conflict->sxactOut))
  			ReleaseRWConflict(conflict);
  
--- 3374,3380 ----
  						 offsetof(RWConflictData, inLink));
  
  		if (!isCommit
! 			|| SxactIsReleased(conflict->sxactOut)
  			|| SxactIsReadOnly(conflict->sxactOut))
  			ReleaseRWConflict(conflict);
  
***************
*** 3556,3562 **** ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
  				nextConflict;
  
  	Assert(sxact != NULL);
! 	Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
  	Assert(LWLockHeldByMe(SerializableFinishedListLock));
  
  	/*
--- 3640,3646 ----
  				nextConflict;
  
  	Assert(sxact != NULL);
! 	Assert(SxactIsReleased(sxact));
  	Assert(LWLockHeldByMe(SerializableFinishedListLock));
  
  	/*
***************
*** 4702,4722 **** PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
  	SERIALIZABLEXID *sxid;
  	SERIALIZABLEXIDTAG sxidtag;
  
! 	sxidtag.xid = xid;
  
! 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
! 	sxid = (SERIALIZABLEXID *)
! 		hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
! 	LWLockRelease(SerializableXactHashLock);
  
! 	/* xid will not be found if it wasn't a serializable transaction */
! 	if (sxid == NULL)
! 		return;
  
! 	/* Release its locks */
! 	MySerializableXact = sxid->myXact;
! 	MyXactDidWrite = true;		/* conservatively assume that we wrote
  								 * something */
  	ReleasePredicateLocks(isCommit);
  }
  
--- 4786,4810 ----
  	SERIALIZABLEXID *sxid;
  	SERIALIZABLEXIDTAG sxidtag;
  
! 	if (MySerializableXact == InvalidSerializableXact)
! 	{
! 		sxidtag.xid = xid;
  
! 		LWLockAcquire(SerializableXactHashLock, LW_SHARED);
! 		sxid = (SERIALIZABLEXID *)
! 			hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
! 		LWLockRelease(SerializableXactHashLock);
  
! 		/* xid will not be found if it wasn't a serializable transaction */
! 		if (sxid == NULL)
! 			return;
  
! 		/* Release its locks */
! 		MySerializableXact = sxid->myXact;
! 		MyXactDidWrite = true;	/* conservatively assume that we wrote
  								 * something */
+ 	}
+ 
  	ReleasePredicateLocks(isCommit);
  }
  
*** a/src/include/storage/predicate.h
--- b/src/include/storage/predicate.h
***************
*** 14,19 ****
--- 14,20 ----
  #ifndef PREDICATE_H
  #define PREDICATE_H
  
+ #include "storage/proc.h"
  #include "utils/relcache.h"
  #include "utils/snapshot.h"
  
***************
*** 50,55 **** extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snap
--- 51,58 ----
  extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
  extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
  extern void TransferPredicateLocksToHeapRelation(Relation relation);
+ extern void CommitEndTransaction(PGPROC *proc, TransactionId latestXid);
+ extern void CommitRemove2PC(PGPROC *proc, TransactionId latestXid);
  extern void ReleasePredicateLocks(bool isCommit);
  
  /* conflict detection (may also trigger rollback) */
*** a/src/include/storage/predicate_internals.h
--- b/src/include/storage/predicate_internals.h
***************
*** 90,111 **** typedef struct SERIALIZABLEXACT
  	int			pid;			/* pid of associated process */
  } SERIALIZABLEXACT;
  
! #define SXACT_FLAG_COMMITTED			0x00000001		/* already committed */
  #define SXACT_FLAG_PREPARED				0x00000002		/* about to commit */
! #define SXACT_FLAG_ROLLED_BACK			0x00000004		/* already rolled back */
! #define SXACT_FLAG_DOOMED				0x00000008		/* will roll back */
  /*
   * The following flag actually means that the flagged transaction has a
   * conflict out *to a transaction which committed ahead of it*.  It's hard
   * to get that into a name of a reasonable length.
   */
! #define SXACT_FLAG_CONFLICT_OUT			0x00000010
! #define SXACT_FLAG_READ_ONLY			0x00000020
! #define SXACT_FLAG_DEFERRABLE_WAITING	0x00000040
! #define SXACT_FLAG_RO_SAFE				0x00000080
! #define SXACT_FLAG_RO_UNSAFE			0x00000100
! #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000200
! #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
  
  /*
   * The following types are used to provide an ad hoc list for holding
--- 90,110 ----
  	int			pid;			/* pid of associated process */
  } SERIALIZABLEXACT;
  
! #define SXACT_FLAG_DOOMED				0x00000001		/* will roll back */
  #define SXACT_FLAG_PREPARED				0x00000002		/* about to commit */
! #define SXACT_FLAG_RELEASED				0x00000004		/* OK to clean up */
  /*
   * The following flag actually means that the flagged transaction has a
   * conflict out *to a transaction which committed ahead of it*.  It's hard
   * to get that into a name of a reasonable length.
   */
! #define SXACT_FLAG_CONFLICT_OUT			0x00000008
! #define SXACT_FLAG_READ_ONLY			0x00000010
! #define SXACT_FLAG_DEFERRABLE_WAITING	0x00000020
! #define SXACT_FLAG_RO_SAFE				0x00000040
! #define SXACT_FLAG_RO_UNSAFE			0x00000080
! #define SXACT_FLAG_SUMMARY_CONFLICT_IN	0x00000100
! #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000200
  
  /*
   * The following types are used to provide an ad hoc list for holding
*** a/src/test/regress/expected/prepared_xacts.out
--- b/src/test/regress/expected/prepared_xacts.out
***************
*** 134,140 **** SELECT * FROM pxtest1;
  -- This should fail, because the two transactions have a write-skew anomaly
  INSERT INTO pxtest1 VALUES ('fff');
  ERROR:  could not serialize access due to read/write dependencies among transactions
! DETAIL:  Cancelled on identification as a pivot, during write.
  HINT:  The transaction might succeed if retried.
  PREPARE TRANSACTION 'foo5';
  SELECT gid FROM pg_prepared_xacts;
--- 134,140 ----
  -- This should fail, because the two transactions have a write-skew anomaly
  INSERT INTO pxtest1 VALUES ('fff');
  ERROR:  could not serialize access due to read/write dependencies among transactions
! DETAIL:  Canceled on identification as a pivot, during write.
  HINT:  The transaction might succeed if retried.
  PREPARE TRANSACTION 'foo5';
  SELECT gid FROM pg_prepared_xacts;