List:Commits« Previous MessageNext Message »
From:Konstantin Osipov Date:January 12 2010 7:37pm
Subject:bzr commit into mysql-5.6-next-mr branch (kostja:3049) Bug#46272
View as plain text  
#At file:///opt/local/work/next-4284-stage/ based on revid:jon.hauglid@stripped

 3049 Konstantin Osipov	2010-01-12
      A prerequisite patch for the fix for Bug#46272
      "Bug #46272 MySQL 5.4.4, new MDL: unnecessary deadlock".
      Instead of a single global mutex that protects the MDL
      subsystem, use a mutex per one MDL lock.
      Commit the patch into the staging tree to be able to 
      use work on review comments.

    modified:
      mysql-test/include/handler.inc
      mysql-test/r/handler_innodb.result
      mysql-test/r/handler_myisam.result
      mysql-test/r/mdl_sync.result
      mysql-test/t/mdl_sync.test
      sql/mdl.cc
      sql/mdl.h
      sql/sql_plist.h
=== modified file 'mysql-test/include/handler.inc'
--- a/mysql-test/include/handler.inc	2009-12-30 17:53:30 +0000
+++ b/mysql-test/include/handler.inc	2010-01-12 19:37:18 +0000
@@ -543,7 +543,7 @@ disconnect flush;
 #
 
 --disable_warnings
-drop table if exists t1,t2;
+drop table if exists t1, t0;
 --enable_warnings
 create table t1 (c1 int);
 --echo connection: default
@@ -552,31 +552,31 @@ handler t1 read first;
 connect (flush,localhost,root,,);
 connection flush;
 --echo connection: flush
---send rename table t1 to t2;
+--send rename table t1 to t0;
 connection waiter;
 --echo connection: waiter 
 let $wait_condition=
   select count(*) = 1 from information_schema.processlist
-  where state = "Waiting for table" and info = "rename table t1 to t2";
+  where state = "Waiting for table" and info = "rename table t1 to t0";
 --source include/wait_condition.inc
 connection default;
 --echo connection: default
 --echo #
 --echo # RENAME placed two pending locks and waits.
---echo # When HANDLER t2 OPEN does open_tables(), it calls
+--echo # When HANDLER t0 OPEN does open_tables(), it calls
 --echo # mysql_ha_flush(), which in turn closes the open HANDLER for t1.
 --echo # RENAME TABLE gets unblocked. If it gets scheduled quickly
 --echo # and manages to complete before open_tables()
---echo # of HANDLER t2 OPEN, open_tables() and therefore the whole
---echo # HANDLER t2 OPEN succeeds. Otherwise open_tables() 
+--echo # of HANDLER t0 OPEN, open_tables() and therefore the whole
+--echo # HANDLER t0 OPEN succeeds. Otherwise open_tables() 
 --echo # notices a pending or active exclusive metadata lock on t2
---echo # and the whole HANDLER t2 OPEN fails with ER_LOCK_DEADLOCK
+--echo # and the whole HANDLER t0 OPEN fails with ER_LOCK_DEADLOCK
 --echo # error.
 --echo #
 --error 0, ER_LOCK_DEADLOCK
-handler t2 open;
+handler t0 open;
 --error 0, ER_UNKNOWN_TABLE
-handler t2 close;
+handler t0 close;
 --echo connection: flush
 connection flush;
 reap;
@@ -585,7 +585,7 @@ handler t1 read next;
 --error ER_UNKNOWN_TABLE
 handler t1 close;
 connection default;
-drop table t2;
+drop table t0;
 connection flush;
 disconnect flush;
 --source include/wait_until_disconnected.inc
@@ -972,35 +972,29 @@ connection default;
 --echo #
 create table t1 (a int, key a (a));
 insert into t1 (a) values (1), (2), (3), (4), (5);
-create table t2 (a int, key a (a));
-insert into t2 (a) values (1), (2), (3), (4), (5);
+create table t0 (a int, key a (a));
+insert into t0 (a) values (1), (2), (3), (4), (5);
 begin;
 select * from t1;
---echo # --> connection con1
-connection con1;
-lock table t2 read;
 --echo # --> connection con2 
 connection con2;
 --echo # Sending:
-send rename table t2 to t3, t1 to t2, t3 to t1;
+send rename table t0 to t3, t1 to t0, t3 to t1;
 --echo # --> connection con1 
 connection con1;
 --echo # Waiting for 'rename table ...' to get blocked...
 let $wait_condition=select count(*)=1 from information_schema.processlist
-where state='Waiting for table' and info='rename table t2 to t3, t1 to t2, t3 to t1';
+where state='Waiting for table' and info='rename table t0 to t3, t1 to t0, t3 to t1';
 --source include/wait_condition.inc
 --echo # --> connection default
 connection default;
 --error ER_LOCK_DEADLOCK
-handler t2 open;
+handler t0 open;
 --error ER_LOCK_DEADLOCK
-select * from t2;
+select * from t0;
 handler t1 open;
 commit;
 handler t1 close;
---echo # --> connection con1
-connection con1;
-unlock tables;
 --echo # --> connection con2
 connection con2;
 --echo # Reaping 'rename table ...'...
@@ -1010,7 +1004,7 @@ connection default;
 handler t1 open;
 handler t1 read a prev;
 handler t1 close;
-drop table t2;
+drop table t0;
 --echo #
 --echo # Originally there was a deadlock error in this test.
 --echo # With implementation of deadlock detector

=== modified file 'mysql-test/r/handler_innodb.result'
--- a/mysql-test/r/handler_innodb.result	2009-12-30 17:53:30 +0000
+++ b/mysql-test/r/handler_innodb.result	2010-01-12 19:37:18 +0000
@@ -560,36 +560,36 @@ c1
 handler t1 close;
 handler t2 close;
 drop table t1,t2;
-drop table if exists t1,t2;
+drop table if exists t1, t0;
 create table t1 (c1 int);
 connection: default
 handler t1 open;
 handler t1 read first;
 c1
 connection: flush
-rename table t1 to t2;;
+rename table t1 to t0;;
 connection: waiter 
 connection: default
 #
 # RENAME placed two pending locks and waits.
-# When HANDLER t2 OPEN does open_tables(), it calls
+# When HANDLER t0 OPEN does open_tables(), it calls
 # mysql_ha_flush(), which in turn closes the open HANDLER for t1.
 # RENAME TABLE gets unblocked. If it gets scheduled quickly
 # and manages to complete before open_tables()
-# of HANDLER t2 OPEN, open_tables() and therefore the whole
-# HANDLER t2 OPEN succeeds. Otherwise open_tables() 
+# of HANDLER t0 OPEN, open_tables() and therefore the whole
+# HANDLER t0 OPEN succeeds. Otherwise open_tables() 
 # notices a pending or active exclusive metadata lock on t2
-# and the whole HANDLER t2 OPEN fails with ER_LOCK_DEADLOCK
+# and the whole HANDLER t0 OPEN fails with ER_LOCK_DEADLOCK
 # error.
 #
-handler t2 open;
-handler t2 close;
+handler t0 open;
+handler t0 close;
 connection: flush
 handler t1 read next;
 ERROR 42S02: Unknown table 't1' in HANDLER
 handler t1 close;
 ERROR 42S02: Unknown table 't1' in HANDLER
-drop table t2;
+drop table t0;
 drop table if exists t1;
 create temporary table t1 (a int, b char(1), key a(a), key b(a,b));
 insert into t1 values (0,"a"),(1,"b"),(2,"c"),(3,"d"),(4,"e"),
@@ -989,8 +989,8 @@ handler t1 close;
 #
 create table t1 (a int, key a (a));
 insert into t1 (a) values (1), (2), (3), (4), (5);
-create table t2 (a int, key a (a));
-insert into t2 (a) values (1), (2), (3), (4), (5);
+create table t0 (a int, key a (a));
+insert into t0 (a) values (1), (2), (3), (4), (5);
 begin;
 select * from t1;
 a
@@ -999,23 +999,19 @@ a
 3
 4
 5
-# --> connection con1
-lock table t2 read;
 # --> connection con2 
 # Sending:
-rename table t2 to t3, t1 to t2, t3 to t1;
+rename table t0 to t3, t1 to t0, t3 to t1;
 # --> connection con1 
 # Waiting for 'rename table ...' to get blocked...
 # --> connection default
-handler t2 open;
+handler t0 open;
 ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
-select * from t2;
+select * from t0;
 ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
 handler t1 open;
 commit;
 handler t1 close;
-# --> connection con1
-unlock tables;
 # --> connection con2
 # Reaping 'rename table ...'...
 # --> connection default
@@ -1024,7 +1020,7 @@ handler t1 read a prev;
 a
 5
 handler t1 close;
-drop table t2;
+drop table t0;
 #
 # Originally there was a deadlock error in this test.
 # With implementation of deadlock detector

=== modified file 'mysql-test/r/handler_myisam.result'
--- a/mysql-test/r/handler_myisam.result	2009-12-30 17:53:30 +0000
+++ b/mysql-test/r/handler_myisam.result	2010-01-12 19:37:18 +0000
@@ -559,36 +559,36 @@ c1
 handler t1 close;
 handler t2 close;
 drop table t1,t2;
-drop table if exists t1,t2;
+drop table if exists t1, t0;
 create table t1 (c1 int);
 connection: default
 handler t1 open;
 handler t1 read first;
 c1
 connection: flush
-rename table t1 to t2;;
+rename table t1 to t0;;
 connection: waiter 
 connection: default
 #
 # RENAME placed two pending locks and waits.
-# When HANDLER t2 OPEN does open_tables(), it calls
+# When HANDLER t0 OPEN does open_tables(), it calls
 # mysql_ha_flush(), which in turn closes the open HANDLER for t1.
 # RENAME TABLE gets unblocked. If it gets scheduled quickly
 # and manages to complete before open_tables()
-# of HANDLER t2 OPEN, open_tables() and therefore the whole
-# HANDLER t2 OPEN succeeds. Otherwise open_tables() 
+# of HANDLER t0 OPEN, open_tables() and therefore the whole
+# HANDLER t0 OPEN succeeds. Otherwise open_tables() 
 # notices a pending or active exclusive metadata lock on t2
-# and the whole HANDLER t2 OPEN fails with ER_LOCK_DEADLOCK
+# and the whole HANDLER t0 OPEN fails with ER_LOCK_DEADLOCK
 # error.
 #
-handler t2 open;
-handler t2 close;
+handler t0 open;
+handler t0 close;
 connection: flush
 handler t1 read next;
 ERROR 42S02: Unknown table 't1' in HANDLER
 handler t1 close;
 ERROR 42S02: Unknown table 't1' in HANDLER
-drop table t2;
+drop table t0;
 drop table if exists t1;
 create temporary table t1 (a int, b char(1), key a(a), key b(a,b));
 insert into t1 values (0,"a"),(1,"b"),(2,"c"),(3,"d"),(4,"e"),
@@ -986,8 +986,8 @@ handler t1 close;
 #
 create table t1 (a int, key a (a));
 insert into t1 (a) values (1), (2), (3), (4), (5);
-create table t2 (a int, key a (a));
-insert into t2 (a) values (1), (2), (3), (4), (5);
+create table t0 (a int, key a (a));
+insert into t0 (a) values (1), (2), (3), (4), (5);
 begin;
 select * from t1;
 a
@@ -996,23 +996,19 @@ a
 3
 4
 5
-# --> connection con1
-lock table t2 read;
 # --> connection con2 
 # Sending:
-rename table t2 to t3, t1 to t2, t3 to t1;
+rename table t0 to t3, t1 to t0, t3 to t1;
 # --> connection con1 
 # Waiting for 'rename table ...' to get blocked...
 # --> connection default
-handler t2 open;
+handler t0 open;
 ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
-select * from t2;
+select * from t0;
 ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
 handler t1 open;
 commit;
 handler t1 close;
-# --> connection con1
-unlock tables;
 # --> connection con2
 # Reaping 'rename table ...'...
 # --> connection default
@@ -1021,7 +1017,7 @@ handler t1 read a prev;
 a
 5
 handler t1 close;
-drop table t2;
+drop table t0;
 #
 # Originally there was a deadlock error in this test.
 # With implementation of deadlock detector

=== modified file 'mysql-test/r/mdl_sync.result'
--- a/mysql-test/r/mdl_sync.result	2009-12-30 17:53:30 +0000
+++ b/mysql-test/r/mdl_sync.result	2010-01-12 19:37:18 +0000
@@ -23,7 +23,7 @@ SET DEBUG_SYNC= 'RESET';
 # Test coverage for basic deadlock detection in metadata
 # locking subsystem.
 #
-drop tables if exists t1, t2, t3, t4;
+drop tables if exists t0, t1, t2, t3, t4, t5;
 create table t1 (i int);
 create table t2 (j int);
 create table t3 (k int);
@@ -90,7 +90,7 @@ commit;
 #
 # Switching to connection 'deadlock_con1'.
 begin;
-insert into t1 values (2);
+insert into t2 values (2);
 #
 # Switching to connection 'default'.
 # Send:
@@ -98,11 +98,11 @@ rename table t2 to t0, t1 to t2, t0 to t
 #
 # Switching to connection 'deadlock_con1'.
 # Wait until the above RENAME TABLE is blocked because it has to wait
-# for 'deadlock_con1' which holds shared metadata lock on 't1'.
+# for 'deadlock_con1' which holds shared metadata lock on 't2'.
 # 
 # The below statement should not wait as doing so will cause deadlock.
 # Instead it should fail and emit ER_LOCK_DEADLOCK statement.
-select * from t2;
+select * from t1;
 ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
 #
 # Let us check that failure of the above statement has not released
@@ -141,7 +141,7 @@ select * from t2;;
 # for an exclusive metadata lock to go away.
 # Send RENAME TABLE statement that will deadlock with the
 # SELECT statement and thus should abort the latter.
-rename table t1 to t0, t2 to t1, t0 to t2;;
+rename table t1 to t5, t2 to t1, t5 to t2;;
 #
 # Switching to connection 'deadlock_con1'.
 # Since the latest RENAME TABLE entered in deadlock with SELECT
@@ -156,15 +156,17 @@ ERROR 40001: Deadlock found when trying 
 # Commit transaction to unblock this RENAME TABLE.
 commit;
 #
-# Switching to connection 'deadlock_con3'.
-# Reap RENAME TABLE t1 TO t0 ... .
-#
 # Switching to connection 'deadlock_con2'.
 # Commit transaction to unblock the first RENAME TABLE.
 commit;
 #
 # Switching to connection 'default'.
 # Reap RENAME TABLE t2 TO t0 ... .
+#
+# Switching to connection 'deadlock_con3'.
+# Reap RENAME TABLE t1 TO t5 ... .
+#
+# Switching to connection 'default'.
 drop tables t1, t2, t3, t4;
 #
 # Now, test case which shows that deadlock detection empiric

=== modified file 'mysql-test/t/mdl_sync.test'
--- a/mysql-test/t/mdl_sync.test	2009-12-30 17:53:30 +0000
+++ b/mysql-test/t/mdl_sync.test	2010-01-12 19:37:18 +0000
@@ -78,7 +78,7 @@ SET DEBUG_SYNC= 'RESET';
 --echo # locking subsystem.
 --echo #
 --disable_warnings
-drop tables if exists t1, t2, t3, t4;
+drop tables if exists t0, t1, t2, t3, t4, t5;
 --enable_warnings
 
 connect(deadlock_con1,localhost,root,,);
@@ -189,7 +189,7 @@ connection default;
 --echo # Switching to connection 'deadlock_con1'.
 connection deadlock_con1;
 begin;
-insert into t1 values (2);
+insert into t2 values (2);
 
 --echo #
 --echo # Switching to connection 'default'.
@@ -201,7 +201,7 @@ connection default;
 --echo # Switching to connection 'deadlock_con1'.
 connection deadlock_con1;
 --echo # Wait until the above RENAME TABLE is blocked because it has to wait
---echo # for 'deadlock_con1' which holds shared metadata lock on 't1'.
+--echo # for 'deadlock_con1' which holds shared metadata lock on 't2'.
 let $wait_condition=
   select count(*) = 1 from information_schema.processlist
   where state = "Waiting for table" and info = "rename table t2 to t0, t1 to t2, t0 to t1";
@@ -210,7 +210,7 @@ let $wait_condition=
 --echo # The below statement should not wait as doing so will cause deadlock.
 --echo # Instead it should fail and emit ER_LOCK_DEADLOCK statement.
 --error ER_LOCK_DEADLOCK
-select * from t2;
+select * from t1;
 
 --echo #
 --echo # Let us check that failure of the above statement has not released
@@ -276,7 +276,7 @@ let $wait_condition=
 
 --echo # Send RENAME TABLE statement that will deadlock with the
 --echo # SELECT statement and thus should abort the latter.
---send rename table t1 to t0, t2 to t1, t0 to t2;
+--send rename table t1 to t5, t2 to t1, t5 to t2;
 
 --echo #
 --echo # Switching to connection 'deadlock_con1'.
@@ -294,18 +294,12 @@ connection deadlock_con1;
 --echo # is blocked.
 let $wait_condition=
   select count(*) = 1 from information_schema.processlist
-  where state = "Waiting for table" and info = "rename table t1 to t0, t2 to t1, t0 to t2";
+  where state = "Waiting for table" and info = "rename table t1 to t5, t2 to t1, t5 to t2";
 --source include/wait_condition.inc
 --echo # Commit transaction to unblock this RENAME TABLE.
 commit;
 
 --echo #
---echo # Switching to connection 'deadlock_con3'.
-connection deadlock_con3;
---echo # Reap RENAME TABLE t1 TO t0 ... .
---reap;
-
---echo #
 --echo # Switching to connection 'deadlock_con2'.
 connection deadlock_con2;
 --echo # Commit transaction to unblock the first RENAME TABLE.
@@ -317,6 +311,16 @@ connection default;
 --echo # Reap RENAME TABLE t2 TO t0 ... .
 --reap
 
+--echo #
+--echo # Switching to connection 'deadlock_con3'.
+connection deadlock_con3;
+--echo # Reap RENAME TABLE t1 TO t5 ... .
+--reap;
+
+--echo #
+--echo # Switching to connection 'default'.
+connection default;
+
 drop tables t1, t2, t3, t4;
 
 --echo #

=== modified file 'sql/mdl.cc'
--- a/sql/mdl.cc	2009-12-30 17:53:30 +0000
+++ b/sql/mdl.cc	2010-01-12 19:37:18 +0000
@@ -49,19 +49,25 @@ public:
   MDL_key key;
   /** List of granted tickets for this lock. */
   Ticket_list granted;
+  /** Tickets for contexts waiting to acquire shared lock. */
+  Ticket_list waiting_shared;
   /**
+    Tickets for contexts waiting to acquire exclusive lock.
     There can be several upgraders and active exclusive
     locks belonging to the same context. E.g.
     in case of RENAME t1 to t2, t2 to t3, we attempt to
     exclusively lock t2 twice.
   */
-  Ticket_list waiting;
+  Ticket_list waiting_exclusive;
   void   *cached_object;
   mdl_cached_object_release_hook cached_object_release_hook;
+  /** Mutex protecting this lock context. */
+  pthread_mutex_t lock;
 
   bool is_empty() const
   {
-    return (granted.is_empty() && waiting.is_empty());
+    return (granted.is_empty() && waiting_shared.is_empty() &&
+            waiting_exclusive.is_empty());
   }
 
   bool can_grant_lock(const MDL_context *requestor_ctx,
@@ -76,12 +82,17 @@ private:
     cached_object(NULL),
     cached_object_release_hook(NULL)
   {
+    pthread_mutex_init(&lock, NULL);
+  }
+
+  ~MDL_lock()
+  {
+    pthread_mutex_destroy(&lock);
   }
 };
 
 
-static pthread_mutex_t LOCK_mdl;
-static pthread_cond_t  COND_mdl;
+static pthread_mutex_t LOCK_mdl_hash;
 static HASH mdl_locks;
 
 /**
@@ -95,19 +106,31 @@ static HASH mdl_locks;
 class MDL_global_lock
 {
 public:
-  uint waiting_shared;
+  /*
+    QQ: In theory we can get rid of these lists by using separate
+        pthread_cond_t used specifically for global lock.
+        But before doing this we should think about possible
+        consequences for deadlock detection/resolving.
+        Also long term it probably makes sense to have a more
+        generic/uniform structure.
+        Should we do this ?
+  */
+  MDL_lock::Ticket_list waiting_shared;
+  MDL_lock::Ticket_list waiting_intention_exclusive;
   uint active_shared;
   uint active_intention_exclusive;
 
   bool is_empty() const
   {
-    return (waiting_shared == 0 && active_shared == 0 &&
-            active_intention_exclusive == 0);
+    return (waiting_shared.is_empty() &&
+            waiting_intention_exclusive.is_empty() &&
+            active_shared == 0 && active_intention_exclusive == 0);
   }
   bool is_lock_type_compatible(enum_mdl_type type, bool is_upgrade) const;
 };
 
 
+static pthread_mutex_t LOCK_mdl_global;
 static MDL_global_lock global_lock;
 
 
@@ -147,8 +170,8 @@ void mdl_init()
 {
   DBUG_ASSERT(! mdl_initialized);
   mdl_initialized= TRUE;
-  pthread_mutex_init(&LOCK_mdl, NULL);
-  pthread_cond_init(&COND_mdl, NULL);
+  pthread_mutex_init(&LOCK_mdl_hash, NULL);
+  pthread_mutex_init(&LOCK_mdl_global, NULL);
   my_hash_init(&mdl_locks, &my_charset_bin, 16 /* FIXME */, 0, 0,
                mdl_locks_key, 0, 0);
   /* The global lock is zero-initialized by the loader. */
@@ -170,8 +193,8 @@ void mdl_destroy()
     mdl_initialized= FALSE;
     DBUG_ASSERT(!mdl_locks.records);
     DBUG_ASSERT(global_lock.is_empty());
-    pthread_mutex_destroy(&LOCK_mdl);
-    pthread_cond_destroy(&COND_mdl);
+    pthread_mutex_destroy(&LOCK_mdl_global);
+    pthread_mutex_destroy(&LOCK_mdl_hash);
     my_hash_free(&mdl_locks);
   }
 }
@@ -186,6 +209,7 @@ void mdl_destroy()
 void MDL_context::init(THD *thd_arg)
 {
   m_has_global_shared_lock= FALSE;
+  m_global_intention_exclusive_locks= 0;
   m_thd= thd_arg;
   m_lt_or_ha_sentinel= NULL;
   /*
@@ -196,7 +220,7 @@ void MDL_context::init(THD *thd_arg)
     to empty the list.
   */
   m_tickets.empty();
-  m_is_waiting_in_mdl= FALSE;
+  pthread_cond_init(&m_cond, NULL);
 }
 
 
@@ -215,7 +239,9 @@ void MDL_context::init(THD *thd_arg)
 void MDL_context::destroy()
 {
   DBUG_ASSERT(m_tickets.is_empty());
+  DBUG_ASSERT(! m_global_intention_exclusive_locks);
   DBUG_ASSERT(! m_has_global_shared_lock);
+  pthread_cond_destroy(&m_cond);
 }
 
 
@@ -354,18 +380,21 @@ void MDL_ticket::destroy(MDL_ticket *tic
         will probably introduce too much overhead.
 */
 
-#define MDL_ENTER_COND(A, B) mdl_enter_cond(A, B, __func__, __FILE__, __LINE__)
+#define MDL_ENTER_COND(A, B, C, D) \
+        mdl_enter_cond(A, B, C, D, __func__, __FILE__, __LINE__)
 
 static inline const char *mdl_enter_cond(THD *thd,
                                          st_my_thread_var *mysys_var,
+                                         pthread_cond_t *cond,
+                                         pthread_mutex_t *mutex,
                                          const char *calling_func,
                                          const char *calling_file,
                                          const unsigned int calling_line)
 {
-  safe_mutex_assert_owner(&LOCK_mdl);
+  safe_mutex_assert_owner(mutex);
 
-  mysys_var->current_mutex= &LOCK_mdl;
-  mysys_var->current_cond= &COND_mdl;
+  mysys_var->current_mutex= mutex;
+  mysys_var->current_cond= cond;
 
   DEBUG_SYNC(thd, "mdl_enter_cond");
 
@@ -373,18 +402,20 @@ static inline const char *mdl_enter_cond
                            calling_func, calling_file, calling_line);
 }
 
-#define MDL_EXIT_COND(A, B, C) mdl_exit_cond(A, B, C, __func__, __FILE__, __LINE__)
+#define MDL_EXIT_COND(A, B, C, D) \
+        mdl_exit_cond(A, B, C, D, __func__, __FILE__, __LINE__)
 
 static inline void mdl_exit_cond(THD *thd,
                                  st_my_thread_var *mysys_var,
+                                 pthread_mutex_t *mutex,
                                  const char* old_msg,
                                  const char *calling_func,
                                  const char *calling_file,
                                  const unsigned int calling_line)
 {
-  DBUG_ASSERT(&LOCK_mdl == mysys_var->current_mutex);
+  DBUG_ASSERT(mutex == mysys_var->current_mutex);
 
-  pthread_mutex_unlock(&LOCK_mdl);
+  pthread_mutex_unlock(mutex);
   pthread_mutex_lock(&mysys_var->mutex);
   mysys_var->current_mutex= 0;
   mysys_var->current_cond= 0;
@@ -446,7 +477,7 @@ MDL_global_lock::is_lock_type_compatible
     return TRUE;
     break;
   case MDL_SHARED_UPGRADABLE:
-    if (active_shared || waiting_shared)
+    if (active_shared || ! waiting_shared.is_empty())
     {
       /*
         We are going to obtain intention exclusive global lock and
@@ -471,7 +502,7 @@ MDL_global_lock::is_lock_type_compatible
     }
     else
     {
-      if (active_shared || waiting_shared)
+      if (active_shared || ! waiting_shared.is_empty())
       {
         /*
           We are going to obtain intention exclusive global lock and
@@ -535,7 +566,7 @@ MDL_lock::can_grant_lock(const MDL_conte
     if (type == MDL_lock::MDL_LOCK_SHARED)
     {
       /* Pending exclusive locks have higher priority over shared locks. */
-      if (waiting.is_empty() || type_arg == MDL_SHARED_HIGH_PRIO)
+      if (waiting_exclusive.is_empty() || type_arg == MDL_SHARED_HIGH_PRIO)
         can_grant= TRUE;
     }
     else if (granted.front()->get_ctx() == requestor_ctx)
@@ -626,6 +657,180 @@ MDL_context::find_ticket(MDL_request *md
 
 
 /**
+  Try to acquire global intention exclusive lock.
+
+  @param[in]  mdl_request  Lock request object for lock to be acquired
+  @param[out] acquired     FALSE - if lock was not acquired due to conflict.
+                           TRUE  - if lock was successfully acquired.
+
+  @retval  FALSE   Success. The lock may have not been acquired.
+                   One needs to check value of 'acquired' out-parameter
+                   to find out what has happened.
+  @retval  TRUE    Error.
+*/
+
+bool
+MDL_context::
+try_acquire_global_intention_exclusive_lock(MDL_request *mdl_request,
+                                            bool *acquired)
+{
+  DBUG_ASSERT(mdl_request->type == MDL_SHARED_UPGRADABLE ||
+              mdl_request->type == MDL_EXCLUSIVE);
+
+  *acquired= FALSE;
+
+  if (m_has_global_shared_lock)
+  {
+    my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
+    return TRUE;
+  }
+
+  if (! m_global_intention_exclusive_locks)
+  {
+    pthread_mutex_lock(&LOCK_mdl_global);
+    if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
+    {
+      pthread_mutex_unlock(&LOCK_mdl_global);
+      return FALSE;
+    }
+    global_lock.active_intention_exclusive++;
+    pthread_mutex_unlock(&LOCK_mdl_global);
+  }
+
+  m_global_intention_exclusive_locks++;
+  *acquired= TRUE;
+
+  return FALSE;
+}
+
+
+/**
+  Acquire global intention exclusive lock.
+
+  @param[in]  mdl_request  Lock request object for lock to be acquired
+
+  @retval  FALSE   Success. The lock has been acquired.
+  @retval  TRUE    Error.
+*/
+
+bool
+MDL_context::acquire_global_intention_exclusive_lock(MDL_request *mdl_request)
+{
+  const char *old_msg;
+  st_my_thread_var *mysys_var= my_thread_var;
+
+  DBUG_ASSERT(mdl_request->type == MDL_EXCLUSIVE);
+
+  if (m_has_global_shared_lock)
+  {
+    my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
+    return TRUE;
+  }
+
+  /*
+    Grant global intention exclusive lock without waiting if this context
+    already has global intention exclusive lock (or more precisely mark
+    in the context that another instance of such lock was acquired).
+
+    The fact that we don't wait in such situation allows to avoid deadlocks
+    in cases when pending request for global shared lock pops up after the
+    moment when thread has acquired its first intention exclusive lock but
+    before it has requested the second instance of such lock.
+  */
+  if (m_global_intention_exclusive_locks)
+  {
+    m_global_intention_exclusive_locks++;
+    return FALSE;
+  }
+
+  /*
+    Otherwise we might have to wait until active global shared lock or
+    pending requests will go away. Since we won't hold any resources
+    while doing it deadlocks are not possible,
+
+    QQ: is this assumption still true after HANDLER patch?
+  */
+  DBUG_ASSERT(! has_locks() ||
+              (m_lt_or_ha_sentinel &&
+               m_tickets.front() == m_lt_or_ha_sentinel));
+
+  pthread_mutex_lock(&LOCK_mdl_global);
+
+  old_msg= MDL_ENTER_COND(m_thd, mysys_var, &m_cond, &LOCK_mdl_global);
+
+  if (unlikely(! global_lock.is_lock_type_compatible(mdl_request->type, FALSE)))
+  {
+    MDL_ticket *pending_ticket;
+
+    /*
+      Create a temporary ticket and add it to waiters list, to allow
+      threads releasing shared global lock wake-up this thread.
+    */
+    if (! (pending_ticket= MDL_ticket::create(this, mdl_request->type)))
+    {
+      pthread_mutex_unlock(&LOCK_mdl_global);
+      return TRUE;
+    }
+    global_lock.waiting_intention_exclusive.push_front(pending_ticket);
+
+    do
+    {
+      pthread_cond_wait(&this->m_cond, &LOCK_mdl_global);
+    }
+    while (! global_lock.is_lock_type_compatible(mdl_request->type, FALSE) &&
+           ! mysys_var->abort);
+
+    global_lock.waiting_intention_exclusive.remove(pending_ticket);
+    MDL_ticket::destroy(pending_ticket);
+
+    if (mysys_var->abort)
+    {
+      MDL_EXIT_COND(m_thd, mysys_var, &LOCK_mdl_global, old_msg);
+      return TRUE;
+    }
+  }
+
+  global_lock.active_intention_exclusive++;
+
+  MDL_EXIT_COND(m_thd, mysys_var, &LOCK_mdl_global, old_msg);
+
+  m_global_intention_exclusive_locks++;
+
+  return FALSE;
+}
+
+
+/**
+  Release global intention exclusive lock.
+*/
+
+void MDL_context::release_global_intention_exclusive_lock()
+{
+  m_global_intention_exclusive_locks--;
+
+  if (! m_global_intention_exclusive_locks)
+  {
+    pthread_mutex_lock(&LOCK_mdl_global);
+    global_lock.active_intention_exclusive--;
+
+    /*
+      Wake up waiters if this was the last of global intention exclusive
+      locks and there are pending shared locks.
+    */
+    if (unlikely(global_lock.active_intention_exclusive == 0 &&
+                 ! global_lock.waiting_shared.is_empty()))
+    {
+      MDL_lock::Ticket_iterator it(global_lock.waiting_shared);
+      MDL_ticket *ticket;
+      while ((ticket= it++))
+        ticket->get_ctx()->wake_up();
+    }
+    pthread_mutex_unlock(&LOCK_mdl_global);
+  }
+}
+
+
+/**
   Try to acquire one shared lock.
 
   Unlike exclusive locks, shared locks are acquired one by
@@ -660,13 +865,6 @@ MDL_context::try_acquire_shared_lock(MDL
   mdl_request->ticket= NULL;
   safe_mutex_assert_not_owner(&LOCK_open);
 
-  if (m_has_global_shared_lock &&
-      mdl_request->type == MDL_SHARED_UPGRADABLE)
-  {
-    my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
-    return TRUE;
-  }
-
   /*
     Check whether the context already holds a shared lock on the object,
     and if so, grant the request.
@@ -674,7 +872,6 @@ MDL_context::try_acquire_shared_lock(MDL
   if ((ticket= find_ticket(mdl_request, &is_lt_or_ha)))
   {
     DBUG_ASSERT(ticket->m_state == MDL_ACQUIRED);
-    /* Only shared locks can be recursive. */
     DBUG_ASSERT(ticket->is_shared());
     /*
       If the request is for a transactional lock, and we found
@@ -703,20 +900,24 @@ MDL_context::try_acquire_shared_lock(MDL
     return FALSE;
   }
 
-  pthread_mutex_lock(&LOCK_mdl);
-
-  if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
+  if (mdl_request->type == MDL_SHARED_UPGRADABLE)
   {
-    pthread_mutex_unlock(&LOCK_mdl);
-    return FALSE;
+    bool acquired;
+
+    if (try_acquire_global_intention_exclusive_lock(mdl_request, &acquired))
+      return TRUE;
+    if (! acquired)
+      return FALSE;
   }
 
   if (!(ticket= MDL_ticket::create(this, mdl_request->type)))
   {
-    pthread_mutex_unlock(&LOCK_mdl);
+    if (mdl_request->type == MDL_SHARED_UPGRADABLE)
+      release_global_intention_exclusive_lock();
     return TRUE;
   }
 
+  pthread_mutex_lock(&LOCK_mdl_hash);
   if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks,
                                          key->ptr(), key->length())))
   {
@@ -725,12 +926,18 @@ MDL_context::try_acquire_shared_lock(MDL
     if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
     {
       MDL_lock::destroy(lock);
+      pthread_mutex_unlock(&LOCK_mdl_hash);
       MDL_ticket::destroy(ticket);
-      pthread_mutex_unlock(&LOCK_mdl);
+      if (mdl_request->type == MDL_SHARED_UPGRADABLE)
+        release_global_intention_exclusive_lock();
       return TRUE;
     }
   }
 
+  pthread_mutex_lock(&lock->lock);
+
+  pthread_mutex_unlock(&LOCK_mdl_hash);
+
   if (lock->can_grant_lock(this, mdl_request->type, FALSE))
   {
     mdl_request->ticket= ticket;
@@ -738,16 +945,17 @@ MDL_context::try_acquire_shared_lock(MDL
     m_tickets.push_front(ticket);
     ticket->m_state= MDL_ACQUIRED;
     ticket->m_lock= lock;
-    if (mdl_request->type == MDL_SHARED_UPGRADABLE)
-      global_lock.active_intention_exclusive++;
+    pthread_mutex_unlock(&lock->lock);
   }
   else
   {
     /* We can't get here if we allocated a new lock. */
     DBUG_ASSERT(! lock->is_empty());
+    pthread_mutex_unlock(&lock->lock);
     MDL_ticket::destroy(ticket);
+    if (mdl_request->type == MDL_SHARED_UPGRADABLE)
+      release_global_intention_exclusive_lock();
   }
-  pthread_mutex_unlock(&LOCK_mdl);
 
   return FALSE;
 }
@@ -782,11 +990,11 @@ MDL_context::clone_ticket(MDL_request *m
   ticket->m_lock= mdl_request->ticket->m_lock;
   mdl_request->ticket= ticket;
 
-  pthread_mutex_lock(&LOCK_mdl);
+  pthread_mutex_lock(&LOCK_mdl_global);
   ticket->m_lock->granted.push_front(ticket);
   if (mdl_request->type == MDL_SHARED_UPGRADABLE)
     global_lock.active_intention_exclusive++;
-  pthread_mutex_unlock(&LOCK_mdl);
+  pthread_mutex_unlock(&LOCK_mdl_global);
 
   m_tickets.push_front(ticket);
 
@@ -799,154 +1007,108 @@ MDL_context::clone_ticket(MDL_request *m
 
   @param thd               Current thread context
   @param conflicting_ticket  Conflicting metadata lock
-
-  @retval TRUE   A thread was woken up
-  @retval FALSE  Lock is not a shared one or no thread was woken up
 */
 
-bool notify_shared_lock(THD *thd, MDL_ticket *conflicting_ticket)
+void notify_shared_lock(THD *thd, MDL_ticket *conflicting_ticket)
 {
-  bool woke= FALSE;
   if (conflicting_ticket->is_shared())
   {
     THD *conflicting_thd= conflicting_ticket->get_ctx()->get_thd();
     DBUG_ASSERT(thd != conflicting_thd); /* Self-deadlock */
 
     /*
-      If the thread that holds the conflicting lock is waiting
-      on an MDL lock, wake it up by broadcasting on COND_mdl.
-      Otherwise it must be waiting on a table-level lock
-      or some other non-MDL resource, so delegate its waking up
-      to an external call.
+      If the thread that holds the conflicting lock is waiting in MDL
+      subsystem it has to be woken up by calling MDL_context::wake_up().
     */
-    if (conflicting_ticket->get_ctx()->is_waiting_in_mdl())
-    {
-      pthread_cond_broadcast(&COND_mdl);
-      woke= TRUE;
-    }
-    else
-      woke= mysql_notify_thread_having_shared_lock(thd, conflicting_thd);
+    conflicting_ticket->get_ctx()->wake_up();
+    /*
+      If it is waiting on table-level lock or some other non-MDL resource
+      we delegate its waking up to code outside of MDL.
+    */
+    mysql_notify_thread_having_shared_lock(thd, conflicting_thd);
   }
-  return woke;
 }
 
 
 /**
-  Acquire a single exclusive lock. A convenience
-  wrapper around the method acquiring a list of locks.
-*/
+  Auxiliary method for acquiring an exclusive lock.
 
-bool MDL_context::acquire_exclusive_lock(MDL_request *mdl_request)
-{
-  MDL_request_list mdl_requests;
-  mdl_requests.push_front(mdl_request);
-  return acquire_exclusive_locks(&mdl_requests);
-}
-
-
-/**
-  Acquire exclusive locks. The context must contain the list of
-  locks to be acquired. There must be no granted locks in the
-  context.
-
-  This is a replacement of lock_table_names(). It is used in
-  RENAME, DROP and other DDL SQL statements.
+  @param mdl_request  Request for the lock to be acqured.
 
-  @note The MDL context may not have non-exclusive lock requests
-        or acquired locks.
+  @note Should not be used outside of MDL subsystem. Instead one should
+        call acquire_exclusive_lock() or acquire_exclusive_locks() methods
+        which ensure that conditions for deadlock-free lock acquisition are
+        fulfilled.
 
   @retval FALSE  Success
   @retval TRUE   Failure
 */
 
-bool MDL_context::acquire_exclusive_locks(MDL_request_list *mdl_requests)
+bool MDL_context::acquire_exclusive_lock_impl(MDL_request *mdl_request)
 {
   MDL_lock *lock;
-  bool signalled= FALSE;
   const char *old_msg;
-  MDL_request *mdl_request;
   MDL_ticket *ticket;
+  bool not_used;
   st_my_thread_var *mysys_var= my_thread_var;
-  MDL_request_list::Iterator it(*mdl_requests);
+  MDL_key *key= &mdl_request->key;
+
+  DBUG_ASSERT(mdl_request->type == MDL_EXCLUSIVE &&
+              mdl_request->ticket == NULL);
 
   safe_mutex_assert_not_owner(&LOCK_open);
-  /* Exclusive locks must always be acquired first, all at once. */
-  DBUG_ASSERT(! has_locks() ||
-              (m_lt_or_ha_sentinel &&
-               m_tickets.front() == m_lt_or_ha_sentinel));
 
-  if (m_has_global_shared_lock)
+  /* Don't take chances in production. */
+  mdl_request->ticket= NULL;
+
+  /*
+    Check whether the context already holds an exclusive lock on the object,
+    and if so, grant the request.
+  */
+  if ((ticket= find_ticket(mdl_request, &not_used)))
   {
-    my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
-    return TRUE;
+    DBUG_ASSERT(ticket->m_state == MDL_ACQUIRED);
+    DBUG_ASSERT(ticket->m_type == MDL_EXCLUSIVE);
+    mdl_request->ticket= ticket;
+    return FALSE;
   }
 
-  pthread_mutex_lock(&LOCK_mdl);
-
-  old_msg= MDL_ENTER_COND(m_thd, mysys_var);
+  if (acquire_global_intention_exclusive_lock(mdl_request))
+    return TRUE;
 
-  while ((mdl_request= it++))
+  /* Early allocation: ticket will be needed in any case. */
+  if (!(ticket= MDL_ticket::create(this, mdl_request->type)))
   {
-    MDL_key *key= &mdl_request->key;
-    DBUG_ASSERT(mdl_request->type == MDL_EXCLUSIVE &&
-                mdl_request->ticket == NULL);
-
-    /* Don't take chances in production. */
-    mdl_request->ticket= NULL;
-
-    /* Early allocation: ticket is used as a shortcut to the lock. */
-    if (!(ticket= MDL_ticket::create(this, mdl_request->type)))
-      goto err;
-
-    if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks,
-                                           key->ptr(), key->length())))
-    {
-      lock= MDL_lock::create(key);
-      if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
-      {
-        MDL_ticket::destroy(ticket);
-        MDL_lock::destroy(lock);
-        goto err;
-      }
-    }
-
-    mdl_request->ticket= ticket;
-    lock->waiting.push_front(ticket);
-    ticket->m_lock= lock;
+    release_global_intention_exclusive_lock();
+    return TRUE;
   }
 
-  while (1)
+  pthread_mutex_lock(&LOCK_mdl_hash);
+  if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks, key->ptr(),
+                                         key->length())))
   {
-    it.rewind();
-    while ((mdl_request= it++))
+    lock= MDL_lock::create(key);
+    if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
     {
-      lock= mdl_request->ticket->m_lock;
-
-      if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
-      {
-        /*
-          Someone owns or wants to acquire the global shared lock so
-          we have to wait until he goes away.
-        */
-        signalled= TRUE;
-        break;
-      }
-      else if (!lock->can_grant_lock(this, mdl_request->type, FALSE))
-      {
-        MDL_ticket *conflicting_ticket;
-        MDL_lock::Ticket_iterator it(lock->granted);
+      MDL_lock::destroy(lock);
+      pthread_mutex_unlock(&LOCK_mdl_hash);
+      MDL_ticket::destroy(ticket);
+      release_global_intention_exclusive_lock();
+      return TRUE;
+    }
+  }
 
-        signalled= (lock->type == MDL_lock::MDL_LOCK_EXCLUSIVE);
+  pthread_mutex_lock(&lock->lock);
+  pthread_mutex_unlock(&LOCK_mdl_hash);
 
-        while ((conflicting_ticket= it++))
-          signalled|= notify_shared_lock(m_thd, conflicting_ticket);
+  mdl_request->ticket= ticket;
+  lock->waiting_exclusive.push_front(ticket);
+  ticket->m_lock= lock;
 
-        break;
-      }
-    }
-    if (!mdl_request)
-      break;
+  old_msg= MDL_ENTER_COND(m_thd, mysys_var, &m_cond, &lock->lock);
 
+  while (!lock->can_grant_lock(this, mdl_request->type, FALSE))
+  {
     if (m_lt_or_ha_sentinel)
     {
       /*
@@ -954,70 +1116,217 @@ bool MDL_context::acquire_exclusive_lock
         HANDLER locks (we can't have any other locks here).
         Waiting with locks may lead to a deadlock.
       */
+      MDL_EXIT_COND(m_thd, mysys_var, &lock->lock, old_msg);
+      pthread_mutex_lock(&LOCK_mdl_hash);
+      pthread_mutex_lock(&lock->lock);
+      /* Get rid of pending ticket. */
+      lock->waiting_exclusive.remove(ticket);
+      /*
+        If there are no active/pending exclusive locks wake up contexts
+        waiting for shared lock.
+      */
+      if (lock->type == MDL_lock::MDL_LOCK_SHARED &&
+          ! lock->waiting_shared.is_empty() &&
+          lock->waiting_exclusive.is_empty())
+      {
+        MDL_lock::Ticket_iterator it(lock->waiting_shared);
+        MDL_ticket *wake_up_ticket;
+        while ((ticket= it++))
+          pthread_cond_signal(&wake_up_ticket->get_ctx()->m_cond);
+        pthread_mutex_unlock(&lock->lock);
+      }
+      else if (lock->is_empty())
+      {
+        my_hash_delete(&mdl_locks, (uchar *)lock);
+        if (lock->cached_object)
+          (*lock->cached_object_release_hook)(lock->cached_object);
+        pthread_mutex_unlock(&lock->lock);
+        MDL_lock::destroy(lock);
+      }
+      else
+        pthread_mutex_unlock(&lock->lock);
+      pthread_mutex_unlock(&LOCK_mdl_hash);
+      MDL_ticket::destroy(ticket);
+      release_global_intention_exclusive_lock();
+      mdl_request->ticket= NULL;
       my_error(ER_LOCK_DEADLOCK, MYF(0));
-      goto err;
+      return TRUE;
     }
 
+    MDL_ticket *conflicting_ticket;
+    MDL_lock::Ticket_iterator it(lock->granted);
+
+    while ((conflicting_ticket= it++))
+      notify_shared_lock(m_thd, conflicting_ticket);
+
     /* There is a shared or exclusive lock on the object. */
     DEBUG_SYNC(m_thd, "mdl_acquire_exclusive_locks_wait");
 
-    if (signalled)
-      pthread_cond_wait(&COND_mdl, &LOCK_mdl);
-    else
+    /*
+      Another thread might have obtained a shared MDL lock on some table
+      but has not yet opened it and/or tried to obtain data lock on it.
+      Also invocation of acquire_exclusive_lock() method and consequently
+      first call to notify_shared_lock() might have happened right after
+      thread holding shared metadata lock in wait_for_locks() method
+      checked that there are no pending conflicting locks but before
+      it has started waiting.
+      In both these cases we need to sleep until these threads will start
+      waiting and try to abort them once again.
+
+      QQ: What is the optimal value for this sleep?
+    */
+    struct timespec abstime;
+    set_timespec(abstime, 1);
+    pthread_cond_timedwait(&m_cond, &lock->lock, &abstime);
+
+    if (mysys_var->abort)
     {
       /*
-        Another thread obtained a shared MDL lock on some table but
-        has not yet opened it and/or tried to obtain data lock on
-        it. In this case we need to wait until this happens and try
-        to abort this thread once again.
+        Since we might have to delete MDL_lock object from the hash we have
+        to acquire LOCK_mdl_hash mutex.
+        To do this we need to temporarily release MDL_lock::lock mutex first.
+      */
+      MDL_EXIT_COND(m_thd, mysys_var, &lock->lock, old_msg);
+      pthread_mutex_lock(&LOCK_mdl_hash);
+      pthread_mutex_lock(&lock->lock);
+      /* Get rid of pending ticket. */
+      lock->waiting_exclusive.remove(ticket);
+      /*
+        If there are no active/pending exclusive locks wake up contexts
+        waiting for shared lock.
       */
-      struct timespec abstime;
-      set_timespec(abstime, 1);
-      pthread_cond_timedwait(&COND_mdl, &LOCK_mdl, &abstime);
+      if (lock->type == MDL_lock::MDL_LOCK_SHARED &&
+          ! lock->waiting_shared.is_empty() &&
+          lock->waiting_exclusive.is_empty())
+      {
+        MDL_lock::Ticket_iterator it(lock->waiting_shared);
+        MDL_ticket *wake_up_ticket;
+        while ((ticket= it++))
+          pthread_cond_signal(&wake_up_ticket->get_ctx()->m_cond);
+        pthread_mutex_unlock(&lock->lock);
+      }
+      else if (lock->is_empty())
+      {
+        my_hash_delete(&mdl_locks, (uchar *)lock);
+        if (lock->cached_object)
+          (*lock->cached_object_release_hook)(lock->cached_object);
+        pthread_mutex_unlock(&lock->lock);
+        MDL_lock::destroy(lock);
+      }
+      else
+        pthread_mutex_unlock(&lock->lock);
+      pthread_mutex_unlock(&LOCK_mdl_hash);
+      MDL_ticket::destroy(ticket);
+      release_global_intention_exclusive_lock();
+      mdl_request->ticket= NULL;
+      return TRUE;
     }
-    if (mysys_var->abort)
-      goto err;
   }
-  it.rewind();
-  while ((mdl_request= it++))
+
+  lock->type= MDL_lock::MDL_LOCK_EXCLUSIVE;
+
+  lock->waiting_exclusive.remove(ticket);
+  lock->granted.push_front(ticket);
+  m_tickets.push_front(ticket);
+  ticket->m_state= MDL_ACQUIRED;
+
+  if (lock->cached_object)
+    (*lock->cached_object_release_hook)(lock->cached_object);
+  lock->cached_object= NULL;
+
+  MDL_EXIT_COND(m_thd, mysys_var, &lock->lock, old_msg);
+
+  return FALSE;
+}
+
+
+/**
+  Acquire an exclusive lock.
+
+  @param mdl_request  Request for the lock to be acqured.
+
+  @retval FALSE  Success
+  @retval TRUE   Failure
+*/
+
+bool MDL_context::acquire_exclusive_lock(MDL_request *mdl_request)
+{
+  /* Exclusive locks must always be acquired first, all at once. */
+  DBUG_ASSERT(! has_locks() ||
+              (m_lt_or_ha_sentinel &&
+               m_tickets.front() == m_lt_or_ha_sentinel));
+
+  return acquire_exclusive_lock_impl(mdl_request);
+}
+
+
+extern "C" int mdl_request_ptr_cmp(const void* ptr1, const void* ptr2)
+{
+  MDL_request *req1= *(MDL_request**)ptr1;
+  MDL_request *req2= *(MDL_request**)ptr2;
+  return req1->key.cmp(&req2->key);
+}
+
+
+/**
+  Acquire exclusive locks. There must be no granted locks in the
+  context.
+
+  This is a replacement of lock_table_names(). It is used in
+  RENAME, DROP and other DDL SQL statements.
+
+  @param  mdl_requests  List of requests for locks to be acquired.
+
+  @note The list of requests should not contain non-exclusive lock requests.
+        There should not be any acquired locks in the context.
+
+  @retval FALSE  Success
+  @retval TRUE   Failure
+*/
+
+bool MDL_context::acquire_exclusive_locks(MDL_request_list *mdl_requests)
+{
+  MDL_request_list::Iterator it(*mdl_requests);
+  MDL_request **sort_buf;
+  uint i;
+
+  /* Exclusive locks must always be acquired first, all at once. */
+  DBUG_ASSERT(! has_locks() ||
+              (m_lt_or_ha_sentinel &&
+               m_tickets.front() == m_lt_or_ha_sentinel));
+
+  if (mdl_requests->is_empty())
+    return FALSE;
+
+  /* Sort requests according to MDL_key. */
+  if (! (sort_buf= (MDL_request **)my_malloc(mdl_requests->elements() *
+                                             sizeof(MDL_request *),
+                                             MYF(MY_WME))))
+    return TRUE;
+
+  for (i= 0; i < mdl_requests->elements(); i++)
+    sort_buf[i]= it++;
+
+  my_qsort(sort_buf, mdl_requests->elements(), sizeof(MDL_request*),
+           mdl_request_ptr_cmp);
+
+  for (i= 0; i < mdl_requests->elements(); i++)
   {
-    global_lock.active_intention_exclusive++;
-    ticket= mdl_request->ticket;
-    lock= ticket->m_lock;
-    lock->type= MDL_lock::MDL_LOCK_EXCLUSIVE;
-    lock->waiting.remove(ticket);
-    lock->granted.push_front(ticket);
-    m_tickets.push_front(ticket);
-    ticket->m_state= MDL_ACQUIRED;
-    if (lock->cached_object)
-      (*lock->cached_object_release_hook)(lock->cached_object);
-    lock->cached_object= NULL;
+    if (acquire_exclusive_lock_impl(sort_buf[i]))
+      goto err;
   }
-  /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-  MDL_EXIT_COND(m_thd, mysys_var, old_msg);
+  my_free(sort_buf, MYF(0));
   return FALSE;
 
 err:
-  /* Remove our pending tickets from the locks. */
-  it.rewind();
-  while ((mdl_request= it++) && mdl_request->ticket)
-  {
-    ticket= mdl_request->ticket;
-    DBUG_ASSERT(ticket->m_state == MDL_PENDING);
-    lock= ticket->m_lock;
-    lock->waiting.remove(ticket);
-    MDL_ticket::destroy(ticket);
+  /* Release locks we have managed to acquire so far. */
+  for (i= 0; i < mdl_requests->elements() && sort_buf[i]->ticket; i++)
+  {
+    release_lock(sort_buf[i]->ticket);
     /* Reset lock request back to its initial state. */
-    mdl_request->ticket= NULL;
-    if (lock->is_empty())
-    {
-      my_hash_delete(&mdl_locks, (uchar *)lock);
-      MDL_lock::destroy(lock);
-    }
+    sort_buf[i]->ticket= NULL;
   }
-  /* May be some pending requests for shared locks can be satisfied now. */
-  pthread_cond_broadcast(&COND_mdl);
-  MDL_EXIT_COND(m_thd, mysys_var, old_msg);
+  my_free(sort_buf, MYF(0));
   return TRUE;
 }
 
@@ -1062,6 +1371,13 @@ MDL_ticket::upgrade_shared_lock_to_exclu
   DBUG_ASSERT(m_type == MDL_SHARED_UPGRADABLE);
 
   /*
+    Since we should have already acquired an intention exclusive
+    global lock this call is only enforcing asserts.
+  */
+  DBUG_ASSERT(m_ctx->is_global_intention_exclusive_lock_owner());
+
+  pthread_mutex_lock(&m_lock->lock);
+  /*
     Create an auxiliary ticket to represent a pending exclusive
     lock and add it to the 'waiting' queue for the duration
     of upgrade. During upgrade we abort waits of connections
@@ -1070,26 +1386,24 @@ MDL_ticket::upgrade_shared_lock_to_exclu
     must back off, rather than fall into sleep again.
   */
   if (! (pending_ticket= MDL_ticket::create(m_ctx, MDL_EXCLUSIVE)))
+  {
+    pthread_mutex_unlock(&m_lock->lock);
     DBUG_RETURN(TRUE);
-
-  pthread_mutex_lock(&LOCK_mdl);
+  }
 
   pending_ticket->m_lock= m_lock;
-  m_lock->waiting.push_front(pending_ticket);
+  m_lock->waiting_exclusive.push_front(pending_ticket);
 
-  old_msg= MDL_ENTER_COND(thd, mysys_var);
-
-  /*
-    Since we should have already acquired an intention exclusive
-    global lock this call is only enforcing asserts.
-  */
-  DBUG_ASSERT(global_lock.is_lock_type_compatible(MDL_EXCLUSIVE, TRUE));
+  old_msg= MDL_ENTER_COND(thd, mysys_var, &m_ctx->m_cond, &m_lock->lock);
 
   while (1)
   {
     if (m_lock->can_grant_lock(m_ctx, MDL_EXCLUSIVE, TRUE))
       break;
 
+    MDL_ticket *conflicting_ticket;
+    MDL_lock::Ticket_iterator it(m_lock->granted);
+
     /*
       If m_ctx->lt_or_ha_sentinel(), and this sentinel is for HANDLER,
       we can deadlock. However, HANDLER is not allowed under
@@ -1113,12 +1427,7 @@ MDL_ticket::upgrade_shared_lock_to_exclu
 
       (*) There is no requirement to upgrade lock in
       CREATE/DROP TRIGGER, it's used there just for convenience.
-    */
-    bool signalled= FALSE;
-    MDL_ticket *conflicting_ticket;
-    MDL_lock::Ticket_iterator it(m_lock->granted);
 
-    /*
       A temporary work-around to avoid deadlocks/livelocks in
       a situation when in one connection ALTER TABLE tries to
       upgrade its metadata lock and in another connection
@@ -1145,35 +1454,44 @@ MDL_ticket::upgrade_shared_lock_to_exclu
     while ((conflicting_ticket= it++))
     {
       if (conflicting_ticket->m_ctx != m_ctx)
-        signalled|= notify_shared_lock(thd, conflicting_ticket);
+        notify_shared_lock(thd, conflicting_ticket);
     }
 
     /* There is a shared or exclusive lock on the object. */
     DEBUG_SYNC(thd, "mdl_upgrade_shared_lock_to_exclusive_wait");
 
-    if (signalled)
-      pthread_cond_wait(&COND_mdl, &LOCK_mdl);
-    else
-    {
-      /*
-        Another thread obtained a shared MDL lock on some table but
-        has not yet opened it and/or tried to obtain data lock on
-        it. In this case we need to wait until this happens and try
-        to abort this thread once again.
-      */
-      struct timespec abstime;
-      set_timespec(abstime, 1);
-      DBUG_PRINT("info", ("Failed to wake-up from table-level lock ... sleeping"));
-      pthread_cond_timedwait(&COND_mdl, &LOCK_mdl, &abstime);
-    }
+    /*
+      Another thread might have obtained a shared MDL lock on some table
+      but has not yet opened it and/or tried to obtain data lock on it.
+      Also invocation of acquire_exclusive_lock() method and consequently
+      first call to notify_shared_lock() might have happened right after
+      thread holding shared metadata lock in wait_for_locks() method
+      checked that there are no pending conflicting locks but before
+      it has started waiting.
+      In both these cases we need to sleep until these threads will start
+      waiting and try to abort them once again.
+    */
+    struct timespec abstime;
+    set_timespec(abstime, 1);
+    pthread_cond_timedwait(&m_ctx->m_cond, &m_lock->lock, &abstime);
+
     if (mysys_var->abort)
     {
-      /* Remove and destroy the auxiliary pending ticket. */
-      m_lock->waiting.remove(pending_ticket);
+      m_lock->waiting_exclusive.remove(pending_ticket);
       MDL_ticket::destroy(pending_ticket);
-      /* Pending requests for shared locks can be satisfied now. */
-      pthread_cond_broadcast(&COND_mdl);
-      MDL_EXIT_COND(thd, mysys_var, old_msg);
+      /*
+        If there are no other pending requests for exclusive locks
+        wake up threads waiting for chance to acquire shared lock.
+      */
+      if (! m_lock->waiting_shared.is_empty() &&
+          m_lock->waiting_exclusive.is_empty())
+      {
+        MDL_lock::Ticket_iterator it(m_lock->waiting_shared);
+        MDL_ticket *wake_up_ticket;
+        while ((wake_up_ticket= it++))
+          wake_up_ticket->get_ctx()->wake_up();
+      }
+      MDL_EXIT_COND(thd, mysys_var, &m_lock->lock, old_msg);
       DBUG_RETURN(TRUE);
     }
   }
@@ -1183,15 +1501,14 @@ MDL_ticket::upgrade_shared_lock_to_exclu
   m_type= MDL_EXCLUSIVE;
 
   /* Remove and destroy the auxiliary pending ticket. */
-  m_lock->waiting.remove(pending_ticket);
+  m_lock->waiting_exclusive.remove(pending_ticket);
   MDL_ticket::destroy(pending_ticket);
 
   if (m_lock->cached_object)
     (*m_lock->cached_object_release_hook)(m_lock->cached_object);
   m_lock->cached_object= 0;
 
-  /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-  MDL_EXIT_COND(thd, mysys_var, old_msg);
+  MDL_EXIT_COND(thd, mysys_var, &m_lock->lock, old_msg);
   DBUG_RETURN(FALSE);
 }
 
@@ -1225,6 +1542,7 @@ MDL_context::try_acquire_exclusive_lock(
   MDL_lock *lock;
   MDL_ticket *ticket;
   MDL_key *key= &mdl_request->key;
+  bool acquired;
 
   DBUG_ASSERT(mdl_request->type == MDL_EXCLUSIVE &&
               mdl_request->ticket == NULL);
@@ -1233,7 +1551,19 @@ MDL_context::try_acquire_exclusive_lock(
 
   mdl_request->ticket= NULL;
 
-  pthread_mutex_lock(&LOCK_mdl);
+  if (try_acquire_global_intention_exclusive_lock(mdl_request, &acquired))
+    return TRUE;
+  /*
+    Since in MySQL this method is called only in cases when context
+    already has global intention exclusive lock the above call should
+    always succeed to acquire another instance of such lock.
+    But we prefer to play safe and handle failure to acquire global
+    lock as well.
+  */
+  if (! acquired)
+    return FALSE;
+
+  pthread_mutex_lock(&LOCK_mdl_hash);
 
   if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks,
                                          key->ptr(), key->length())))
@@ -1244,18 +1574,24 @@ MDL_context::try_acquire_exclusive_lock(
     {
       MDL_ticket::destroy(ticket);
       MDL_lock::destroy(lock);
-      pthread_mutex_unlock(&LOCK_mdl);
+      pthread_mutex_unlock(&LOCK_mdl_hash);
       return TRUE;
     }
+    pthread_mutex_lock(&lock->lock);
+    pthread_mutex_unlock(&LOCK_mdl_hash);
     mdl_request->ticket= ticket;
     lock->type= MDL_lock::MDL_LOCK_EXCLUSIVE;
     lock->granted.push_front(ticket);
     m_tickets.push_front(ticket);
     ticket->m_state= MDL_ACQUIRED;
     ticket->m_lock= lock;
-    global_lock.active_intention_exclusive++;
+    pthread_mutex_unlock(&lock->lock);
+  }
+  else
+  {
+    pthread_mutex_unlock(&LOCK_mdl_hash);
+    release_global_intention_exclusive_lock();
   }
-  pthread_mutex_unlock(&LOCK_mdl);
   return FALSE;
 }
 
@@ -1274,44 +1610,69 @@ bool MDL_context::acquire_global_shared_
 {
   st_my_thread_var *mysys_var= my_thread_var;
   const char *old_msg;
+  MDL_ticket *pending_ticket;
 
   safe_mutex_assert_not_owner(&LOCK_open);
   DBUG_ASSERT(!m_has_global_shared_lock);
 
-  pthread_mutex_lock(&LOCK_mdl);
+  if (! (pending_ticket= MDL_ticket::create(this, MDL_SHARED)))
+    return TRUE;
+
+  pthread_mutex_lock(&LOCK_mdl_global);
+  /*
+    Add temporary ticket to the list of waiters so we can be properly
+    woken-up once all active intention exclusive locks go away.
+  */
+  global_lock.waiting_shared.push_front(pending_ticket);
 
-  global_lock.waiting_shared++;
-  old_msg= MDL_ENTER_COND(m_thd, mysys_var);
+  old_msg= MDL_ENTER_COND(m_thd, mysys_var, &m_cond, &LOCK_mdl_global);
 
-  while (!mysys_var->abort && global_lock.active_intention_exclusive)
-    pthread_cond_wait(&COND_mdl, &LOCK_mdl);
+  while (global_lock.active_intention_exclusive && ! mysys_var->abort)
+    pthread_cond_wait(&m_cond, &LOCK_mdl_global);
 
-  global_lock.waiting_shared--;
   if (mysys_var->abort)
   {
-    /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-    MDL_EXIT_COND(m_thd, mysys_var, old_msg);
+    global_lock.waiting_shared.remove(pending_ticket);
+    /*
+      If this was the last request for global shared lock and there is
+      no active shared lock we need to wake up all waiters.
+    */
+    if (global_lock.active_shared == 0 &&
+        global_lock.waiting_shared.is_empty() &&
+        ! global_lock.waiting_intention_exclusive.is_empty())
+    {
+      MDL_lock::Ticket_iterator it(global_lock.waiting_intention_exclusive);
+      MDL_ticket *wake_up_ticket;
+      while ((wake_up_ticket= it++))
+        pthread_cond_signal(&wake_up_ticket->get_ctx()->m_cond);
+    }
+    MDL_EXIT_COND(m_thd, mysys_var, &LOCK_mdl_global, old_msg);
+    MDL_ticket::destroy(pending_ticket);
     return TRUE;
   }
+
+  global_lock.waiting_shared.remove(pending_ticket);
   global_lock.active_shared++;
   m_has_global_shared_lock= TRUE;
-  /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-  MDL_EXIT_COND(m_thd, mysys_var, old_msg);
+  MDL_EXIT_COND(m_thd, mysys_var, &LOCK_mdl_global, old_msg);
+
+  MDL_ticket::destroy(pending_ticket);
+
   return FALSE;
 }
 
 
 /**
-  Check if there are any pending exclusive locks which conflict
-  with shared locks held by this thread.
-
-  @pre The caller already has acquired LOCK_mdl.
+  Implement a simple deadlock detection heuristic: check if there
+  are any pending exclusive locks which conflict with shared locks
+  held by this thread. In that case waiting can be circular,
+  i.e. lead to a deadlock.
 
   @return TRUE   If there are any pending conflicting locks.
           FALSE  Otherwise.
 */
 
-bool MDL_context::can_wait_lead_to_deadlock_impl() const
+bool MDL_context::can_wait_lead_to_deadlock() const
 {
   Ticket_iterator ticket_it(m_tickets);
   MDL_ticket *ticket;
@@ -1328,7 +1689,7 @@ bool MDL_context::can_wait_lead_to_deadl
     */
     DBUG_ASSERT(! ticket->is_upgradable_or_exclusive());
 
-    if (ticket->has_pending_conflicting_lock_impl())
+    if (ticket->has_pending_conflicting_lock())
       return TRUE;
   }
   return FALSE;
@@ -1336,25 +1697,6 @@ bool MDL_context::can_wait_lead_to_deadl
 
 
 /**
-  Implement a simple deadlock detection heuristic: check if there
-  are any pending exclusive locks which conflict with shared locks
-  held by this thread. In that case waiting can be circular,
-  i.e. lead to a deadlock.
-
-  @return TRUE if there are any conflicting locks, FALSE otherwise.
-*/
-
-bool MDL_context::can_wait_lead_to_deadlock() const
-{
-  bool result;
-  pthread_mutex_lock(&LOCK_mdl);
-  result= can_wait_lead_to_deadlock_impl();
-  pthread_mutex_unlock(&LOCK_mdl);
-  return result;
-}
-
-
-/**
   Wait until there will be no locks that conflict with lock requests
   in the given list.
 
@@ -1391,8 +1733,6 @@ MDL_context::wait_for_locks(MDL_request_
             COND_mdl because of above scenario.
     */
     mysql_ha_flush(m_thd);
-    pthread_mutex_lock(&LOCK_mdl);
-    old_msg= MDL_ENTER_COND(m_thd, mysys_var);
 
     /*
       In cases when we wait while still holding some metadata
@@ -1406,9 +1746,8 @@ MDL_context::wait_for_locks(MDL_request_
       negatives) in situations when conflicts are rare (in our
       case this is true since DDL statements should be rare).
     */
-    if (can_wait_lead_to_deadlock_impl())
+    if (can_wait_lead_to_deadlock())
     {
-      MDL_EXIT_COND(m_thd, mysys_var, old_msg);
       my_error(ER_LOCK_DEADLOCK, MYF(0));
       return TRUE;
     }
@@ -1418,58 +1757,124 @@ MDL_context::wait_for_locks(MDL_request_
     {
       MDL_key *key= &mdl_request->key;
       DBUG_ASSERT(mdl_request->ticket == NULL);
-      if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
+
+      pthread_mutex_lock(&LOCK_mdl_global);
+      if (unlikely (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE)))
+      {
+        MDL_ticket *pending_ticket;
+        if (! (pending_ticket= MDL_ticket::create(this, mdl_request->type)))
+        {
+          pthread_mutex_unlock(&LOCK_mdl_global);
+          return TRUE;
+        }
+        global_lock.waiting_intention_exclusive.push_front(pending_ticket);
+
+        old_msg= MDL_ENTER_COND(m_thd, mysys_var, &m_cond, &LOCK_mdl_global);
+
+        pthread_cond_wait(&m_cond, &LOCK_mdl_global);
+
+        global_lock.waiting_intention_exclusive.remove(pending_ticket);
+        MDL_ticket::destroy(pending_ticket);
+        /*
+          We might have been woken-up to resolve deadlock...
+        */
+        MDL_EXIT_COND(m_thd, mysys_var, &LOCK_mdl_global, old_msg);
         break;
+      }
+      else
+        pthread_mutex_unlock(&LOCK_mdl_global);
+
+
       /*
         To avoid starvation we don't wait if we have a conflict against
         request for MDL_EXCLUSIVE lock.
       */
-      if (mdl_request->is_shared() &&
-          (lock= (MDL_lock*) my_hash_search(&mdl_locks, key->ptr(),
-                                            key->length())) &&
-          !lock->can_grant_lock(this, mdl_request->type, FALSE))
+      if (mdl_request->is_shared())
+      {
+        pthread_mutex_lock(&LOCK_mdl_hash);
+        if (! (lock= (MDL_lock*) my_hash_search(&mdl_locks, key->ptr(),
+                                                key->length())))
+        {
+          pthread_mutex_unlock(&LOCK_mdl_hash);
+          continue;
+        }
+        pthread_mutex_lock(&lock->lock);
+        pthread_mutex_unlock(&LOCK_mdl_hash);
+        if (lock->can_grant_lock(this, mdl_request->type, FALSE))
+        {
+          pthread_mutex_unlock(&lock->lock);
+          continue;
+        }
+        MDL_ticket *pending_ticket;
+        if (! (pending_ticket= MDL_ticket::create(this, mdl_request->type)))
+        {
+          pthread_mutex_unlock(&lock->lock);
+          return TRUE;
+        }
+        lock->waiting_shared.push_front(pending_ticket);
+
+        old_msg= MDL_ENTER_COND(m_thd, mysys_var, &m_cond, &lock->lock);
+
+        pthread_cond_wait(&this->m_cond, &lock->lock);
+
+        MDL_EXIT_COND(m_thd, mysys_var, &lock->lock, old_msg);
+
+        pthread_mutex_lock(&LOCK_mdl_hash);
+        pthread_mutex_lock(&lock->lock);
+        lock->waiting_shared.remove(pending_ticket);
+        if (lock->is_empty())
+        {
+          my_hash_delete(&mdl_locks, (uchar *)lock);
+          if (lock->cached_object)
+            (*lock->cached_object_release_hook)(lock->cached_object);
+          pthread_mutex_unlock(&lock->lock);
+          MDL_lock::destroy(lock);
+        }
+        else
+          pthread_mutex_unlock(&lock->lock);
+        pthread_mutex_unlock(&LOCK_mdl_hash);
+        MDL_ticket::destroy(pending_ticket);
         break;
+      }
     }
     if (!mdl_request)
     {
-      /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-      MDL_EXIT_COND(m_thd, mysys_var, old_msg);
+      /* There are no conflicts for any locks! */
       break;
     }
-    m_is_waiting_in_mdl= TRUE;
-    pthread_cond_wait(&COND_mdl, &LOCK_mdl);
-    m_is_waiting_in_mdl= FALSE;
-    /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-    MDL_EXIT_COND(m_thd, mysys_var, old_msg);
   }
   return mysys_var->abort;
 }
 
 
 /**
-  Auxiliary function which allows to release particular lock
-  ownership of which is represented by a lock ticket object.
+  Release lock.
+
+  @param ticket Ticket for lock to be released.
 */
 
-void MDL_context::release_ticket(MDL_ticket *ticket)
+void MDL_context::release_lock(MDL_ticket *ticket)
 {
   MDL_lock *lock= ticket->m_lock;
-  DBUG_ENTER("release_ticket");
+  DBUG_ENTER("MDL_context::release_lock");
   DBUG_PRINT("enter", ("db=%s name=%s", lock->key.db_name(),
                                         lock->key.name()));
 
-  safe_mutex_assert_owner(&LOCK_mdl);
+  DBUG_ASSERT(this == ticket->m_ctx);
+  safe_mutex_assert_not_owner(&LOCK_open);
 
   if (ticket == m_lt_or_ha_sentinel)
     m_lt_or_ha_sentinel= ++Ticket_list::Iterator(m_tickets, ticket);
 
-  m_tickets.remove(ticket);
+  /*
+    QQ: Can we do anything to minimize time during which LOCK_mdl_hash is held?
+  */
+  pthread_mutex_lock(&LOCK_mdl_hash);
+  pthread_mutex_lock(&lock->lock);
 
   switch (ticket->m_type)
   {
     case MDL_SHARED_UPGRADABLE:
-      global_lock.active_intention_exclusive--;
-      /* Fallthrough. */
     case MDL_SHARED:
     case MDL_SHARED_HIGH_PRIO:
       lock->granted.remove(ticket);
@@ -1477,14 +1882,11 @@ void MDL_context::release_ticket(MDL_tic
     case MDL_EXCLUSIVE:
       lock->type= MDL_lock::MDL_LOCK_SHARED;
       lock->granted.remove(ticket);
-      global_lock.active_intention_exclusive--;
       break;
     default:
       DBUG_ASSERT(0);
   }
 
-  MDL_ticket::destroy(ticket);
-
   if (lock->is_empty())
   {
     my_hash_delete(&mdl_locks, (uchar *)lock);
@@ -1492,8 +1894,37 @@ void MDL_context::release_ticket(MDL_tic
                         lock->cached_object));
     if (lock->cached_object)
       (*lock->cached_object_release_hook)(lock->cached_object);
+    pthread_mutex_unlock(&lock->lock);
     MDL_lock::destroy(lock);
   }
+  else
+  {
+    if (lock->type == MDL_lock::MDL_LOCK_SHARED)
+    {
+      MDL_lock::Ticket_iterator it(lock->waiting_shared);
+      MDL_lock::Ticket_iterator it2(lock->waiting_exclusive);
+      MDL_ticket *waiting_ticket;
+      /*
+        We wake up threads waiting for shared lock even if there is a
+        pending exclusive lock as some them might be trying to acquire
+        high priority shared lock.
+      */
+      while ((waiting_ticket= it++))
+        waiting_ticket->get_ctx()->wake_up();
+      while ((waiting_ticket= it2++))
+        waiting_ticket->get_ctx()->wake_up();
+    }
+    pthread_mutex_unlock(&lock->lock);
+  }
+
+  pthread_mutex_unlock(&LOCK_mdl_hash);
+
+  if (ticket->m_type == MDL_SHARED_UPGRADABLE ||
+      ticket->m_type == MDL_EXCLUSIVE)
+    release_global_intention_exclusive_lock();
+
+  m_tickets.remove(ticket);
+  MDL_ticket::destroy(ticket);
 
   DBUG_VOID_RETURN;
 }
@@ -1522,44 +1953,20 @@ void MDL_context::release_locks_stored_b
   Ticket_iterator it(m_tickets);
   DBUG_ENTER("MDL_context::release_locks_stored_before");
 
-  safe_mutex_assert_not_owner(&LOCK_open);
-
   if (m_tickets.is_empty())
     DBUG_VOID_RETURN;
 
-  pthread_mutex_lock(&LOCK_mdl);
   while ((ticket= it++) && ticket != sentinel)
   {
     DBUG_PRINT("info", ("found lock to release ticket=%p", ticket));
-    release_ticket(ticket);
+    release_lock(ticket);
   }
-  /* Inefficient but will do for a while */
-  pthread_cond_broadcast(&COND_mdl);
-  pthread_mutex_unlock(&LOCK_mdl);
 
   DBUG_VOID_RETURN;
 }
 
 
 /**
-  Release a lock.
-
-  @param ticket    Lock to be released
-*/
-
-void MDL_context::release_lock(MDL_ticket *ticket)
-{
-  DBUG_ASSERT(this == ticket->m_ctx);
-  safe_mutex_assert_not_owner(&LOCK_open);
-
-  pthread_mutex_lock(&LOCK_mdl);
-  release_ticket(ticket);
-  pthread_cond_broadcast(&COND_mdl);
-  pthread_mutex_unlock(&LOCK_mdl);
-}
-
-
-/**
   Release all locks in the context which correspond to the same name/
   object as this lock request.
 
@@ -1600,11 +2007,19 @@ void MDL_ticket::downgrade_exclusive_loc
   if (is_shared())
     return;
 
-  pthread_mutex_lock(&LOCK_mdl);
+  pthread_mutex_lock(&m_lock->lock);
   m_lock->type= MDL_lock::MDL_LOCK_SHARED;
   m_type= MDL_SHARED_UPGRADABLE;
-  pthread_cond_broadcast(&COND_mdl);
-  pthread_mutex_unlock(&LOCK_mdl);
+
+  if (! m_lock->waiting_shared.is_empty())
+  {
+    MDL_lock::Ticket_iterator it(m_lock->waiting_shared);
+    MDL_ticket *ticket;
+    while ((ticket= it++))
+      ticket->get_ctx()->wake_up();
+  }
+
+  pthread_mutex_unlock(&m_lock->lock);
 }
 
 
@@ -1617,11 +2032,23 @@ void MDL_context::release_global_shared_
   safe_mutex_assert_not_owner(&LOCK_open);
   DBUG_ASSERT(m_has_global_shared_lock);
 
-  pthread_mutex_lock(&LOCK_mdl);
+  pthread_mutex_lock(&LOCK_mdl_global);
   global_lock.active_shared--;
   m_has_global_shared_lock= FALSE;
-  pthread_cond_broadcast(&COND_mdl);
-  pthread_mutex_unlock(&LOCK_mdl);
+
+  /*
+    If we are releasing the last instance of global shared
+    lock we have to wake-up all waiters.
+  */
+  if (global_lock.active_shared == 0 &&
+      ! global_lock.waiting_intention_exclusive.is_empty())
+  {
+    MDL_lock::Ticket_iterator it(global_lock.waiting_intention_exclusive);
+    MDL_ticket *wake_up_ticket;
+    while ((wake_up_ticket= it++))
+      wake_up_ticket->get_ctx()->wake_up();
+  }
+  pthread_mutex_unlock(&LOCK_mdl_global);
 }
 
 
@@ -1687,39 +2114,20 @@ MDL_context::is_lock_owner(MDL_key::enum
   existing shared lock.
 
   @pre The ticket must match an acquired lock.
-  @pre The caller already has acquired LOCK_mdl.
 
   @return TRUE if there is a conflicting lock request, FALSE otherwise.
 */
 
-bool MDL_ticket::has_pending_conflicting_lock_impl() const
-{
-  DBUG_ASSERT(is_shared());
-  safe_mutex_assert_owner(&LOCK_mdl);
-
-  return !m_lock->waiting.is_empty();
-}
-
-
-/**
-  Check if we have any pending exclusive locks which conflict with
-  existing shared lock.
-
-  @pre The ticket must match an acquired lock.
-
-  @return TRUE if there is a pending conflicting lock request,
-          FALSE otherwise.
-*/
-
 bool MDL_ticket::has_pending_conflicting_lock() const
 {
   bool result;
 
   safe_mutex_assert_not_owner(&LOCK_open);
+  DBUG_ASSERT(is_shared());
 
-  pthread_mutex_lock(&LOCK_mdl);
-  result= has_pending_conflicting_lock_impl();
-  pthread_mutex_unlock(&LOCK_mdl);
+  pthread_mutex_lock(&m_lock->lock);
+  result= !m_lock->waiting_exclusive.is_empty();
+  pthread_mutex_unlock(&m_lock->lock);
   return result;
 }
 

=== modified file 'sql/mdl.h'
--- a/sql/mdl.h	2009-12-30 17:53:30 +0000
+++ b/sql/mdl.h	2010-01-12 19:37:18 +0000
@@ -119,6 +119,19 @@ public:
     return (m_length == rhs->m_length &&
             memcmp(m_ptr, rhs->m_ptr, m_length) == 0);
   }
+  int cmp(const MDL_key *rhs) const
+  {
+    int res;
+    if ((res= memcmp(m_ptr, rhs->m_ptr, min(m_length, rhs->m_length))))
+      return res;
+    else if (m_length < rhs->m_length)
+      return -1;
+    else if (m_length > rhs->m_length)
+      return 1;
+    else
+      return 0;
+  }
+
   MDL_key(const MDL_key *rhs)
   {
     mdl_key_init(rhs);
@@ -265,7 +278,7 @@ public:
   void *get_cached_object();
   void set_cached_object(void *cached_object,
                          mdl_cached_object_release_hook release_hook);
-  const MDL_context *get_ctx() const { return m_ctx; }
+  MDL_context *get_ctx() const { return m_ctx; }
   bool is_shared() const { return m_type < MDL_EXCLUSIVE; }
   bool is_upgradable_or_exclusive() const
   {
@@ -300,14 +313,13 @@ private:
 private:
   MDL_ticket(const MDL_ticket &);               /* not implemented */
   MDL_ticket &operator=(const MDL_ticket &);    /* not implemented */
-
-  bool has_pending_conflicting_lock_impl() const;
 };
 
 
 typedef I_P_List<MDL_request, I_P_List_adapter<MDL_request,
                  &MDL_request::next_in_list,
-                 &MDL_request::prev_in_list> >
+                 &MDL_request::prev_in_list>,
+                 I_P_List_Counter>
         MDL_request_list;
 
 /**
@@ -385,16 +397,20 @@ public:
   bool can_wait_lead_to_deadlock() const;
 
   inline THD *get_thd() const { return m_thd; }
-  
-  bool is_waiting_in_mdl() const { return m_is_waiting_in_mdl; }
+
+  void wake_up()
+  {
+    pthread_cond_signal(&m_cond);
+  }
+
 private:
   Ticket_list m_tickets;
   bool m_has_global_shared_lock;
   /**
-    Indicates that the owner of this context is waiting in
-    wait_for_locks() method.
+    Number of instances of global intention exclusive lock which were
+    recursively acquired by this context.
   */
-  bool m_is_waiting_in_mdl;
+  uint m_global_intention_exclusive_locks;
   /**
     This member has two uses:
     1) When entering LOCK TABLES mode, remember the last taken
@@ -406,12 +422,31 @@ private:
   */
   MDL_ticket *m_lt_or_ha_sentinel;
   THD *m_thd;
+  /**
+    Condvar which is used for waiting until this context's pending
+    request can be satisfied or this thread has to perform actions
+    to resolve potential deadlock (we subscribe for such notification
+    by adding ticket corresponding to the request to an appropriate
+    queue of waiters).
+  */
+  pthread_cond_t m_cond;
 private:
-  void release_ticket(MDL_ticket *ticket);
-  bool can_wait_lead_to_deadlock_impl() const;
   MDL_ticket *find_ticket(MDL_request *mdl_req,
                           bool *is_lt_or_ha);
   void release_locks_stored_before(MDL_ticket *sentinel);
+
+  bool acquire_exclusive_lock_impl(MDL_request *mdl_request);
+
+  bool try_acquire_global_intention_exclusive_lock(MDL_request *mdl_request,
+                                                   bool *acquired);
+  bool acquire_global_intention_exclusive_lock(MDL_request *mdl_request);
+  void release_global_intention_exclusive_lock();
+  bool is_global_intention_exclusive_lock_owner()
+  {
+    return m_global_intention_exclusive_locks;
+  }
+
+  friend bool MDL_ticket::upgrade_shared_lock_to_exclusive();
 };
 
 

=== modified file 'sql/sql_plist.h'
--- a/sql/sql_plist.h	2009-12-30 17:53:30 +0000
+++ b/sql/sql_plist.h	2010-01-12 19:37:18 +0000
@@ -18,7 +18,8 @@
 
 #include <my_global.h>
 
-template <typename T, typename B> class I_P_List_iterator;
+template <typename T, typename B, typename C> class I_P_List_iterator;
+class I_P_List_Null_Counter;
 
 
 /**
@@ -47,10 +48,14 @@ template <typename T, typename B> class 
                  return &el->prev;
                }
              };
+   @param C  Policy class specifying how counting of elements in the list
+             should be done. Instance of this class is also used as a place
+             where information about number of list elements is stored.
+             @sa I_P_List_Null_Counter, I_P_List_Counter
 */
 
-template <typename T, typename B>
-class I_P_List
+template <typename T, typename B, typename C = I_P_List_Null_Counter>
+class I_P_List : public C
 {
   T *first;
 
@@ -61,7 +66,7 @@ class I_P_List
   */
 public:
   I_P_List() : first(NULL) { };
-  inline void empty()      { first= NULL; }
+  inline void empty()      { first= NULL; C::reset(); }
   inline bool is_empty() const { return (first == NULL); }
   inline void push_front(T* a)
   {
@@ -70,6 +75,7 @@ public:
       *B::prev_ptr(first)= B::next_ptr(a);
     first= a;
     *B::prev_ptr(a)= &first;
+    C::inc();
   }
   inline void push_back(T *a)
   {
@@ -107,21 +113,23 @@ public:
     if (next)
       *B::prev_ptr(next)= *B::prev_ptr(a);
     **B::prev_ptr(a)= next;
+    C::dec();
   }
   inline T* front() { return first; }
   inline const T *front() const { return first; }
-  void swap(I_P_List<T,B> &rhs)
+  void swap(I_P_List<T, B, C> &rhs)
   {
     swap_variables(T *, first, rhs.first);
     if (first)
       *B::prev_ptr(first)= &first;
     if (rhs.first)
       *B::prev_ptr(rhs.first)= &rhs.first;
+    C::swap(rhs);
   }
 #ifndef _lint
-  friend class I_P_List_iterator<T, B>;
+  friend class I_P_List_iterator<T, B, C>;
 #endif
-  typedef I_P_List_iterator<T, B> Iterator;
+  typedef I_P_List_iterator<T, B, C> Iterator;
 };
 
 
@@ -129,15 +137,15 @@ public:
    Iterator for I_P_List.
 */
 
-template <typename T, typename B>
+template <typename T, typename B, typename C = I_P_List_Null_Counter>
 class I_P_List_iterator
 {
-  const I_P_List<T, B> *list;
+  const I_P_List<T, B, C> *list;
   T *current;
 public:
-  I_P_List_iterator(const I_P_List<T, B> &a) : list(&a), current(a.first) {}
-  I_P_List_iterator(const I_P_List<T, B> &a, T* current_arg) : list(&a), current(current_arg) {}
-  inline void init(I_P_List<T, B> &a)
+  I_P_List_iterator(const I_P_List<T, B, C> &a) : list(&a), current(a.first) {}
+  I_P_List_iterator(const I_P_List<T, B, C> &a, T* current_arg) : list(&a), current(current_arg) {}
+  inline void init(const I_P_List<T, B, C> &a)
   {
     list= &a;
     current= a.first;
@@ -160,4 +168,39 @@ public:
   }
 };
 
+
+/**
+  Element counting policy class for I_P_List to be used in
+  cases when no element counting should be done.
+*/
+
+class I_P_List_Null_Counter
+{
+protected:
+  void reset() {}
+  void inc() {}
+  void dec() {}
+  void swap(I_P_List_Null_Counter &rhs) {}
+};
+
+
+/**
+  Element counting policy class for I_P_List which provides
+  basic element counting.
+*/
+
+class I_P_List_Counter
+{
+  uint m_counter;
+protected:
+  I_P_List_Counter() : m_counter (0) {}
+  void reset() {m_counter= 0;}
+  void inc() {m_counter++;}
+  void dec() {m_counter--;}
+  void swap(I_P_List_Counter &rhs)
+  { swap_variables(uint, m_counter, rhs.m_counter); }
+public:
+  uint elements() const { return m_counter; }
+};
+
 #endif


Attachment: [text/bzr-bundle] bzr/kostja@sun.com-20100112193718-1uqn8emyhf82n6qg.bundle
Thread
bzr commit into mysql-5.6-next-mr branch (kostja:3049) Bug#46272Konstantin Osipov12 Jan