atomic-openrv-v3.patch
application/octet-stream
Filename: atomic-openrv-v3.patch
Type: application/octet-stream
Part: 0
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index c9b1d5f..a345e39 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -975,26 +975,11 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
{
Oid relOid;
- /*
- * Check for shared-cache-inval messages before trying to open the
- * relation. This is needed to cover the case where the name identifies a
- * rel that has been dropped and recreated since the start of our
- * transaction: if we don't flush the old syscache entry then we'll latch
- * onto that entry and suffer an error when we do RelationIdGetRelation.
- * Note that relation_open does not need to do this, since a relation's
- * OID never changes.
- *
- * We skip this if asked for NoLock, on the assumption that the caller has
- * already ensured some appropriate lock is held.
- */
- if (lockmode != NoLock)
- AcceptInvalidationMessages();
-
- /* Look up the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(relation, false);
+ /* Look up and lock the appropriate relation using namespace search */
+ relOid = RangeVarLockRelid(relation, lockmode, false);
/* Let relation_open do the rest */
- return relation_open(relOid, lockmode);
+ return relation_open(relOid, NoLock);
}
/* ----------------
@@ -1012,30 +997,15 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
{
Oid relOid;
- /*
- * Check for shared-cache-inval messages before trying to open the
- * relation. This is needed to cover the case where the name identifies a
- * rel that has been dropped and recreated since the start of our
- * transaction: if we don't flush the old syscache entry then we'll latch
- * onto that entry and suffer an error when we do RelationIdGetRelation.
- * Note that relation_open does not need to do this, since a relation's
- * OID never changes.
- *
- * We skip this if asked for NoLock, on the assumption that the caller has
- * already ensured some appropriate lock is held.
- */
- if (lockmode != NoLock)
- AcceptInvalidationMessages();
-
- /* Look up the appropriate relation using namespace search */
- relOid = RangeVarGetRelid(relation, missing_ok);
+ /* Look up and lock the appropriate relation using namespace search */
+ relOid = RangeVarLockRelid(relation, lockmode, missing_ok);
/* Return NULL on not-found */
if (!OidIsValid(relOid))
return NULL;
/* Let relation_open do the rest */
- return relation_open(relOid, lockmode);
+ return relation_open(relOid, NoLock);
}
/* ----------------
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index ce795a6..60862a4 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -44,6 +44,8 @@
#include "parser/parse_func.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/sinval.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -285,6 +287,95 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK)
}
/*
+ * RangeVarLockRelid
+ *		Like RangeVarGetRelid, but simultaneously acquire the specified lock
+ *		on the relation.  This works properly in the face of concurrent DDL
+ *		that may drop, create or rename relations.
+ *
+ * relation: name to resolve through the namespace search path.
+ * lockmode: lock to take on the relation found; NoLock takes none.
+ * failOK: if true, return InvalidOid rather than raising an error when the
+ *		name does not resolve to any relation.
+ *
+ * On return, either a valid OID is returned and this call's lock is held on
+ * exactly that relation, or InvalidOid is returned and this call holds no
+ * lock at all.  Locks taken on relations that the name stopped denoting
+ * while we waited are released before returning.
+ */
+Oid
+RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode,
+				  bool failOK)
+{
+	unsigned	lastCounter;
+	Oid			relOid;
+	Oid			lockedOid = InvalidOid;
+
+	/*
+	 * If the caller requested NoLock, it already acquired an appropriate lock
+	 * and has called AcceptInvalidationMessages() since doing so.  In this
+	 * case a single search is always correct, and we degenerate to behave
+	 * exactly like RangeVarGetRelid().
+	 */
+	if (lockmode == NoLock)
+		return RangeVarGetRelid(relation, failOK);
+
+	/*
+	 * By the time we acquire the lock, our RangeVar may denote a different
+	 * relation or no relation at all.  In particular, this can happen when
+	 * the lock acquisition blocks on a transaction performing DROP or ALTER
+	 * TABLE RENAME.  However, once and while we do hold a lock of any level,
+	 * we can count on the name of the found relation remaining stable.
+	 *
+	 * Even so, DDL activity could cause an object in a schema earlier in the
+	 * search path to mask our original selection undetected.  No current lock
+	 * would prevent that.  We let the user worry about it, such as by taking
+	 * additional explicit locks.
+	 */
+	for (;;)
+	{
+		/*
+		 * Remember the counter as of this search.  lastCounter has the same
+		 * unsigned type as SharedInvalidMessageCounter, so the equality test
+		 * below compares like with like.
+		 */
+		lastCounter = SharedInvalidMessageCounter;
+		relOid = RangeVarGetRelid(relation, failOK);
+
+		if (OidIsValid(lockedOid))
+		{
+			/* Done when our RangeVar denotes the same relation we locked. */
+			if (relOid == lockedOid)
+				break;
+
+			/*
+			 * The name now denotes something else, or nothing.  Drop the lock
+			 * on the relation it used to denote, so that we never return
+			 * while holding a lock on a relation other than the one reported.
+			 */
+			UnlockRelationOid(lockedOid, lockmode);
+			lockedOid = InvalidOid;
+		}
+
+		/* Not-found is always final. */
+		if (!OidIsValid(relOid))
+			break;
+
+		/*
+		 * LockRelationOid also calls AcceptInvalidationMessages() to make
+		 * recent DDL effects visible, if needed.  Finding none, we're done.
+		 */
+		LockRelationOid(relOid, lockmode);
+		if (lastCounter == SharedInvalidMessageCounter)
+			break;
+
+		/* Some invalidation messages arrived; search again. */
+		lockedOid = relOid;
+	}
+
+	return relOid;
+}
+
+/*
* RangeVarGetCreationNamespace
* Given a RangeVar describing a to-be-created relation,
* choose which namespace to create it in.
diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c
index 9ab16b1..9b1ec82 100644
--- a/src/backend/storage/ipc/sinval.c
+++ b/src/backend/storage/ipc/sinval.c
@@ -22,6 +22,9 @@
#include "utils/inval.h"
+unsigned SharedInvalidMessageCounter;
+
+
/*
* Because backends sitting idle will not be reading sinval events, we
* need a way to give an idle backend a swift kick in the rear and make
@@ -90,6 +93,7 @@ ReceiveSharedInvalidMessages(
{
SharedInvalidationMessage *msg = &messages[nextmsg++];
+ SharedInvalidMessageCounter++;
invalFunction(msg);
}
@@ -106,6 +110,7 @@ ReceiveSharedInvalidMessages(
{
/* got a reset message */
elog(DEBUG4, "cache state reset");
+ SharedInvalidMessageCounter++;
resetFunction();
break; /* nothing more to do */
}
@@ -118,6 +123,7 @@ ReceiveSharedInvalidMessages(
{
SharedInvalidationMessage *msg = &messages[nextmsg++];
+ SharedInvalidMessageCounter++;
invalFunction(msg);
}
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 859b385..90b2ecc 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -81,10 +81,11 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
/*
* Now that we have the lock, check for invalidation messages, so that we
* will update or flush any stale relcache entry before we try to use it.
- * We can skip this in the not-uncommon case that we already had the same
- * type of lock being requested, since then no one else could have
- * modified the relcache entry in an undesirable way. (In the case where
- * our own xact modifies the rel, the relcache update happens via
+ * RangeVarLockRelid() specifically relies on us for this. We can skip
+ * this in the not-uncommon case that we already had the same type of lock
+ * being requested, since then no one else could have modified the
+ * relcache entry in an undesirable way. (In the case where our own xact
+ * modifies the rel, the relcache update happens via
* CommandCounterIncrement, not here.)
*/
if (res != LOCKACQUIRE_ALREADY_HELD)
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 7e1e194..da1ddfd 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -15,6 +15,7 @@
#define NAMESPACE_H
#include "nodes/primnodes.h"
+#include "storage/lock.h"
/*
@@ -48,6 +49,8 @@ typedef struct OverrideSearchPath
extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK);
+extern Oid RangeVarLockRelid(const RangeVar *relation, LOCKMODE lockmode,
+ bool failOK);
extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid);
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index e9ce025..a90aa2d 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -116,6 +116,10 @@ typedef union
} SharedInvalidationMessage;
+/* Count of messages processed; wraps around, so test only for equality. */
+extern unsigned SharedInvalidMessageCounter;
+
+
extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
int n);
extern void ReceiveSharedInvalidMessages(