relation_definition_lock.v3.patch
application/octet-stream
Filename: relation_definition_lock.v3.patch
Type: application/octet-stream
Part: 0
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 912f45c..c804f87 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2461,6 +2461,22 @@ AlterTable(AlterTableStmt *stmt)
CheckTableNotInUse(rel, "ALTER TABLE");
+ /*
+ * Catalog relations are read using SnapshotNow, which can mislead
+ * concurrent readers of the relation's description into thinking the
+ * relation does not exist. We avoid those problems by ensuring that
+ * nobody can re-read the relation description while we modify it.
+ * We lock the relation's description by oid to exclude readers
+ * searching for the relation by oid. Some errors are still possible
+ * since SearchCatCache() searches pg_class by name, and we do not
+ * attempt to protect against concurrent reads at that point.
+ *
+ * Some long running command types that use concurrent lock mode
+ * have specific locking to avoid holding the Relation Definition lock
+ * for long periods.
+ */
+ LockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
/* Check relation type against type specified in the ALTER command */
switch (stmt->relkind)
{
@@ -3389,11 +3405,25 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
*/
refrel = heap_open(con->refrelid, ShareRowExclusiveLock);
+ /*
+ * Release the lock so we can validate the constraint without holding
+ * up other people reading the definition. If we successfully validate
+ * the constraint we will re-acquire in exclusive mode to allow us to
+ * mark the constraint validated.
+ */
+ UnlockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
validateForeignKeyConstraint(fkconstraint->conname, rel, refrel,
con->refindid,
con->conid);
/*
+ * Now the constraint is validated we lock the relation's definition
+ * until end of transaction.
+ */
+ LockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
+ /*
* No need to mark the constraint row as validated, we did
* that when we inserted the row earlier.
*/
@@ -5810,11 +5840,25 @@ ATExecValidateConstraint(Relation rel, char *constrName)
*/
refrel = heap_open(con->confrelid, RowShareLock);
+ /*
+ * Release the lock so we can validate the constraint without holding
+ * up other people reading the definition. If we successfully validate
+ * the constraint we will re-acquire in exclusive mode to allow us to
+ * mark the constraint validated.
+ */
+ UnlockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
validateForeignKeyConstraint(constrName, rel, refrel,
con->conindid,
conid);
/*
+ * Now the constraint is validated we lock the relation's definition
+ * until end of transaction.
+ */
+ LockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
+ /*
* Now update the catalog, while we have the door open.
*/
copy_con->convalidated = true;
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 798d8a8..78dcdb2 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -43,6 +43,8 @@
#include "pgstat.h"
#include "rewrite/rewriteManip.h"
#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "storage/lock.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
@@ -632,6 +634,11 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
pfree(DatumGetPointer(values[Anum_pg_trigger_tgattr - 1]));
/*
+ * Lock the relation's definition until end of transaction.
+ */
+ LockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
+ /*
* Update relation's pg_class entry. Crucial side-effect: other backends
* (and this one too!) are sent SI message to make them rebuild relcache
* entries.
@@ -1343,6 +1350,11 @@ EnableDisableTrigger(Relation rel, const char *tgname,
else
nkeys = 1;
+ /*
+ * Lock the relation's definition until end of transaction.
+ */
+ LockRelationDefinitionOid(RelationGetRelid(rel), ExclusiveLock);
+
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
SnapshotNow, nkeys, keys);
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 859b385..a237417 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -304,6 +304,39 @@ UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
}
/*
+ * LockRelationDefinitionOid
+ *
+ * This lock tag is used to interlock reading/update of relid definitions.
+ * We need such locking because relcache invalidation and update is not
+ * race-condition-proof.
+ *
+ * We assume the caller is already holding some type of regular lock on
+ * the relation, so no AcceptInvalidationMessages call is needed here.
+ */
+void
+LockRelationDefinitionOid(Oid relid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_RELATION_DEFINITION(tag, relid);
+
+ (void) LockAcquire(&tag, lockmode, false, false);
+}
+
+/*
+ * UnlockRelationDefinitionOid
+ */
+void
+UnlockRelationDefinitionOid(Oid relid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_RELATION_DEFINITION(tag, relid);
+
+ LockRelease(&tag, lockmode, false);
+}
+
+/*
* LockPage
*
* Obtain a page-level lock. This is currently used by some index access
@@ -727,6 +760,12 @@ DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
tag->locktag_field2,
tag->locktag_field1);
break;
+ case LOCKTAG_RELATION_DEFINITION:
+ appendStringInfo(buf,
+ _("definition of relation %u of database %u"),
+ tag->locktag_field2,
+ tag->locktag_field1);
+ break;
case LOCKTAG_PAGE:
appendStringInfo(buf,
_("page %u of relation %u of database %u"),
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index 6d7d4f4..7a464db 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -24,6 +24,7 @@
static const char *const LockTagTypeNames[] = {
"relation",
"extend",
+ "definition",
"page",
"tuple",
"transactionid",
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index 350e040..ebc9c58 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1189,6 +1189,17 @@ SearchCatCache(CatCache *cache,
*/
relation = heap_open(cache->cc_reloid, AccessShareLock);
+ /*
+ * If we are filling the cache for a relation oid we need to protect
+ * against concurrent updates because they can lead to missing a
+ * value that should have been visible.
+ */
+ if (cache->id == RELOID)
+ {
+ Oid reloid = DatumGetObjectId(v1);
+ LockRelationDefinitionOid(reloid, ShareLock);
+ }
+
scandesc = systable_beginscan(relation,
cache->cc_indexoid,
IndexScanOK(cache, cur_skey),
@@ -1212,6 +1223,12 @@ SearchCatCache(CatCache *cache,
systable_endscan(scandesc);
+ if (cache->id == RELOID)
+ {
+ Oid reloid = DatumGetObjectId(v1);
+ UnlockRelationDefinitionOid(reloid, ShareLock);
+ }
+
heap_close(relation, AccessShareLock);
/*
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index d7e94ff..23c45c6 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -812,6 +812,13 @@ RelationBuildDesc(Oid targetRelId, bool insertIt)
Form_pg_class relp;
/*
+ * Obtain a short duration lock on the relation's oid to ensure
+ * no concurrent updates of the definition. See ScanPgRelation().
+ * We take the lock here to protect all related information.
+ */
+ LockRelationDefinitionOid(targetRelId, ShareLock);
+
+ /*
* find the tuple in pg_class corresponding to the given relation id
*/
pg_class_tuple = ScanPgRelation(targetRelId, true);
@@ -907,6 +914,11 @@ RelationBuildDesc(Oid targetRelId, bool insertIt)
RelationParseRelOptions(relation, pg_class_tuple);
/*
+ * We've finished accessing catalog tables, so unlock
+ */
+ UnlockRelationDefinitionOid(targetRelId, ShareLock);
+
+ /*
* initialize the relation lock manager information
*/
RelationInitLockInfo(relation); /* see lmgr.c */
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index bd44d92..99fb52b 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -39,6 +39,10 @@ extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
extern void LockRelationForExtension(Relation relation, LOCKMODE lockmode);
extern void UnlockRelationForExtension(Relation relation, LOCKMODE lockmode);
+/* Lock a relation definition for relcache rebuild */
+extern void LockRelationDefinitionOid(Oid relid, LOCKMODE lockmode);
+extern void UnlockRelationDefinitionOid(Oid relid, LOCKMODE lockmode);
+
/* Lock a page (currently only used within indexes) */
extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
extern bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 7ec961f..185475f 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -170,6 +170,8 @@ typedef enum LockTagType
/* ID info for a relation is DB OID + REL OID; DB OID = 0 if shared */
LOCKTAG_RELATION_EXTEND, /* the right to extend a relation */
/* same ID info as RELATION */
+ LOCKTAG_RELATION_DEFINITION,/* the right to rebuild the relcache for a relation */
+ /* ID info is current DB OID + REL OID */
LOCKTAG_PAGE, /* one page of a relation */
/* ID info for a page is RELATION info + BlockNumber */
LOCKTAG_TUPLE, /* one physical tuple */
@@ -231,6 +233,14 @@ typedef struct LOCKTAG
(locktag).locktag_type = LOCKTAG_RELATION_EXTEND, \
(locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
+#define SET_LOCKTAG_RELATION_DEFINITION(locktag,reloid) \
+ ((locktag).locktag_field1 = MyDatabaseId, \
+ (locktag).locktag_field2 = (reloid), \
+ (locktag).locktag_field3 = 0, \
+ (locktag).locktag_field4 = 0, \
+ (locktag).locktag_type = LOCKTAG_RELATION_DEFINITION, \
+ (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
+
#define SET_LOCKTAG_PAGE(locktag,dboid,reloid,blocknum) \
((locktag).locktag_field1 = (dboid), \
(locktag).locktag_field2 = (reloid), \