atomic-openrv-v4.patch

text/plain

Filename: atomic-openrv-v4.patch
Type: text/plain
Part: 0
Message: Re: Make relation_openrv atomic wrt DDL
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index c9b1d5f..a345e39 100644
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 975,1000 **** relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
  {
  	Oid			relOid;
  
! 	/*
! 	 * Check for shared-cache-inval messages before trying to open the
! 	 * relation.  This is needed to cover the case where the name identifies a
! 	 * rel that has been dropped and recreated since the start of our
! 	 * transaction: if we don't flush the old syscache entry then we'll latch
! 	 * onto that entry and suffer an error when we do RelationIdGetRelation.
! 	 * Note that relation_open does not need to do this, since a relation's
! 	 * OID never changes.
! 	 *
! 	 * We skip this if asked for NoLock, on the assumption that the caller has
! 	 * already ensured some appropriate lock is held.
! 	 */
! 	if (lockmode != NoLock)
! 		AcceptInvalidationMessages();
! 
! 	/* Look up the appropriate relation using namespace search */
! 	relOid = RangeVarGetRelid(relation, false);
  
  	/* Let relation_open do the rest */
! 	return relation_open(relOid, lockmode);
  }
  
  /* ----------------
--- 975,985 ----
  {
  	Oid			relOid;
  
! 	/* Look up and lock the appropriate relation using namespace search */
! 	relOid = RangeVarLockRelid(relation, lockmode, false);
  
  	/* Let relation_open do the rest */
! 	return relation_open(relOid, NoLock);
  }
  
  /* ----------------
***************
*** 1012,1041 **** relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
  {
  	Oid			relOid;
  
! 	/*
! 	 * Check for shared-cache-inval messages before trying to open the
! 	 * relation.  This is needed to cover the case where the name identifies a
! 	 * rel that has been dropped and recreated since the start of our
! 	 * transaction: if we don't flush the old syscache entry then we'll latch
! 	 * onto that entry and suffer an error when we do RelationIdGetRelation.
! 	 * Note that relation_open does not need to do this, since a relation's
! 	 * OID never changes.
! 	 *
! 	 * We skip this if asked for NoLock, on the assumption that the caller has
! 	 * already ensured some appropriate lock is held.
! 	 */
! 	if (lockmode != NoLock)
! 		AcceptInvalidationMessages();
! 
! 	/* Look up the appropriate relation using namespace search */
! 	relOid = RangeVarGetRelid(relation, missing_ok);
  
  	/* Return NULL on not-found */
  	if (!OidIsValid(relOid))
  		return NULL;
  
  	/* Let relation_open do the rest */
! 	return relation_open(relOid, lockmode);
  }
  
  /* ----------------
--- 997,1011 ----
  {
  	Oid			relOid;
  
! 	/* Look up and lock the appropriate relation using namespace search */
! 	relOid = RangeVarLockRelid(relation, lockmode, missing_ok);
  
  	/* Return NULL on not-found */
  	if (!OidIsValid(relOid))
  		return NULL;
  
  	/* Let relation_open do the rest */
! 	return relation_open(relOid, NoLock);
  }
  
  /* ----------------
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index ce795a6..f75fcef 100644
*** a/src/backend/catalog/namespace.c
--- b/src/backend/catalog/namespace.c
***************
*** 44,49 ****
--- 44,51 ----
  #include "parser/parse_func.h"
  #include "storage/backendid.h"
  #include "storage/ipc.h"
+ #include "storage/lmgr.h"
+ #include "storage/sinval.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
***************
*** 285,290 **** RangeVarGetRelid(const RangeVar *relation, bool failOK)
--- 287,372 ----
  }
  
  /*
+  * RangeVarLockRelid
+  *		Like RangeVarGetRelid, but simultaneously acquire the specified lock
+  *		on the relation.  This works properly in the face of concurrent DDL
+  *		that may drop, create or rename relations.
+  *
+  * If the relation is not found and failOK = true, take no lock and return
+  * InvalidOid.  Otherwise, raise an error.
+  */
+ Oid
+ RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode,
+ 				  bool failOK)
+ {
+ 	uint64		lastCounter;	/* must match SharedInvalidMessageCounter */
+ 	Oid			relOid1,
+ 				relOid2;
+ 
+ 	/*
+ 	 * If the caller requested NoLock, it already acquired an appropriate lock
+ 	 * with LockRelationOid().  In this case, our first search is always
+ 	 * correct, and we degenerate to RangeVarGetRelid().
+ 	 */
+ 	if (lockmode == NoLock)
+ 		return RangeVarGetRelid(relation, failOK);
+ 
+ 	/*
+ 	 * By the time we acquire the lock, our RangeVar may denote a different
+ 	 * relation or no relation at all.  In particular, this can happen when
+ 	 * the lock acquisition blocks on a transaction performing DROP or ALTER
+ 	 * TABLE RENAME.  However, once and while we do hold a lock of any level,
+ 	 * we can count on the name of the found relation remaining stable.
+ 	 *
+ 	 * Even so, DDL activity could cause an object in a schema earlier in the
+ 	 * search path to mask our original selection undetected.  No current lock
+ 	 * would prevent that.  We let the user worry about it, such as by taking
+ 	 * additional explicit locks.
+ 	 */
+ 
+ 	/*
+ 	 * First attempt.  Always allow it to fail; RangeVarGetRelid() may hit a
+ 	 * negative catcache entry for a recently-created relation.  Find out by
+ 	 * making recent DDL effects visible and checking again.  Attempting to
+ 	 * open nonexistent relations is not a hot path in applications, so the
+ 	 * extra cost is of little concern.
+ 	 */
+ 	lastCounter = SharedInvalidMessageCounter;
+ 	relOid1 = RangeVarGetRelid(relation, true);
+ 	if (!OidIsValid(relOid1))
+ 	{
+ 		AcceptInvalidationMessages();
+ 		lastCounter = SharedInvalidMessageCounter;
+ 		relOid1 = RangeVarGetRelid(relation, failOK);
+ 	}
+ 
+ 	do
+ 	{
+ 		/* After the initial precautions above, not-found is final. */
+ 		if (!OidIsValid(relOid1))
+ 			return relOid1;
+ 
+ 		/*
+ 		 * LockRelationOid() also calls AcceptInvalidationMessages() to make
+ 		 * recent DDL effects visible, if needed.  Finding none, we're done.
+ 		 */
+ 		LockRelationOid(relOid1, lockmode);
+ 		if (lastCounter == SharedInvalidMessageCounter)
+ 			break;
+ 		else
+ 			lastCounter = SharedInvalidMessageCounter;
+ 
+ 		/* Invalidations arrived; search again (the lock taken stays held). */
+ 		relOid2 = relOid1;
+ 		relOid1 = RangeVarGetRelid(relation, failOK);
+ 
+ 		/* Done when our RangeVar denotes the same relation we locked. */
+ 	} while (relOid1 != relOid2);
+ 
+ 	return relOid1;
+ }
+ 
+ /*
   * RangeVarGetCreationNamespace
   *		Given a RangeVar describing a to-be-created relation,
   *		choose which namespace to create it in.
diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c
index 9ab16b1..8499615 100644
*** a/src/backend/storage/ipc/sinval.c
--- b/src/backend/storage/ipc/sinval.c
***************
*** 22,27 ****
--- 22,30 ----
  #include "utils/inval.h"
  
  
+ uint64 SharedInvalidMessageCounter;
+ 
+ 
  /*
   * Because backends sitting idle will not be reading sinval events, we
   * need a way to give an idle backend a swift kick in the rear and make
***************
*** 90,95 **** ReceiveSharedInvalidMessages(
--- 93,99 ----
  	{
  		SharedInvalidationMessage *msg = &messages[nextmsg++];
  
+ 		SharedInvalidMessageCounter++;
  		invalFunction(msg);
  	}
  
***************
*** 106,111 **** ReceiveSharedInvalidMessages(
--- 110,116 ----
  		{
  			/* got a reset message */
  			elog(DEBUG4, "cache state reset");
+ 			SharedInvalidMessageCounter++;
  			resetFunction();
  			break;				/* nothing more to do */
  		}
***************
*** 118,123 **** ReceiveSharedInvalidMessages(
--- 123,129 ----
  		{
  			SharedInvalidationMessage *msg = &messages[nextmsg++];
  
+ 			SharedInvalidMessageCounter++;
  			invalFunction(msg);
  		}
  
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 859b385..90b2ecc 100644
*** a/src/backend/storage/lmgr/lmgr.c
--- b/src/backend/storage/lmgr/lmgr.c
***************
*** 81,90 **** LockRelationOid(Oid relid, LOCKMODE lockmode)
  	/*
  	 * Now that we have the lock, check for invalidation messages, so that we
  	 * will update or flush any stale relcache entry before we try to use it.
! 	 * We can skip this in the not-uncommon case that we already had the same
! 	 * type of lock being requested, since then no one else could have
! 	 * modified the relcache entry in an undesirable way.  (In the case where
! 	 * our own xact modifies the rel, the relcache update happens via
  	 * CommandCounterIncrement, not here.)
  	 */
  	if (res != LOCKACQUIRE_ALREADY_HELD)
--- 81,91 ----
  	/*
  	 * Now that we have the lock, check for invalidation messages, so that we
  	 * will update or flush any stale relcache entry before we try to use it.
! 	 * RangeVarLockRelid() specifically relies on us for this.  We can skip
! 	 * this in the not-uncommon case that we already had the same type of lock
! 	 * being requested, since then no one else could have modified the
! 	 * relcache entry in an undesirable way.  (In the case where our own xact
! 	 * modifies the rel, the relcache update happens via
  	 * CommandCounterIncrement, not here.)
  	 */
  	if (res != LOCKACQUIRE_ALREADY_HELD)
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 7e1e194..da1ddfd 100644
*** a/src/include/catalog/namespace.h
--- b/src/include/catalog/namespace.h
***************
*** 15,20 ****
--- 15,21 ----
  #define NAMESPACE_H
  
  #include "nodes/primnodes.h"
+ #include "storage/lock.h"
  
  
  /*
***************
*** 48,53 **** typedef struct OverrideSearchPath
--- 49,56 ----
  
  
  extern Oid	RangeVarGetRelid(const RangeVar *relation, bool failOK);
+ extern Oid	RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode,
+ 				  bool failOK);
  extern Oid	RangeVarGetCreationNamespace(const RangeVar *newRelation);
  extern Oid	RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
  extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index e9ce025..aba474d 100644
*** a/src/include/storage/sinval.h
--- b/src/include/storage/sinval.h
***************
*** 116,121 **** typedef union
--- 116,125 ----
  } SharedInvalidationMessage;
  
  
+ /* Counter of messages processed; don't worry about overflow. */
+ extern uint64 SharedInvalidMessageCounter;
+ 
+ 
  extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
  						  int n);
  extern void ReceiveSharedInvalidMessages(