MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:Konstantin Osipov Date:December 3 2009 11:29pm
Subject:bzr commit into mysql-5.6-next-mr branch (kostja:2990) Bug#989 WL#4284
View as plain text  
#At file:///opt/local/work/next-4284/ based on revid:kostja@stripped

 2990 Konstantin Osipov	2009-12-04
      Backport of:
      ------------------------------------------------------------
      revno: 2617.23.18
      committer: Davi Arnaut <Davi.Arnaut@stripped>
      branch nick: 4284-6.0
      timestamp: Mon 2009-03-02 18:18:26 -0300
      message:
      Bug#989: If DROP TABLE while there's an active transaction, wrong binlog order
      WL#4284: Transactional DDL locking
      
      This is a prerequisite patch:
      
      These changes are intended to split lock requests from granted
      locks and to allow the memory and lifetime of granted locks to
      be managed within the MDL subsystem. Furthermore, tickets can
      now be shared and therefore are used to satisfy multiple lock
      requests, but only shared locks can be recursive.
      
      The problem is that the MDL subsystem morphs lock requests into
      granted locks locks but does not manage the memory and lifetime
      of lock requests, and hence, does not manage the memory of
      granted locks either. This can be problematic because it puts the
      burden of tracking references on the users of the subsystem and
      it can't be easily done in transactional contexts where the locks
      have to be kept around for the duration of a transaction.
      
      Another issue is that recursive locks (when the context trying to
      acquire a lock already holds a lock on the same object) requires
      that each time the lock is granted, a unique lock request/granted
      lock structure structure must be kept around until the lock is
      released. This can lead to memory leaks in transactional contexts
      as locks taken during the transaction should only be released at
      the end of the transaction. This also leads to unnecessary wake
      ups (broadcasts) in the MDL subsystem if the context still holds
      a equivalent of the lock being released.
      
      These issues are exacerbated due to the fact that WL#4284 low-level
      design says that the implementation should "2) Store metadata locks
      in transaction memory root, rather than statement memory root" but
      this is not possible because a memory root, as implemented in mysys,
      requires all objects allocated from it to be freed all at once.
      
      This patch combines review input and significant code contributions
      from Konstantin Osipov (kostja) and Dmitri Lenev (dlenev).
     @ mysql-test/r/mdl_sync.result
        Add test case result.
     @ mysql-test/t/mdl_sync.test
        Add test case for shared lock upgrade case.
     @ sql/event_db_repository.cc
        Rename mdl_alloc_lock to mdl_request_alloc.
     @ sql/ha_ndbcluster_binlog.cc
        Use new function names to initialize MDL lock requests.
     @ sql/lock.cc
        Rename MDL functions.
     @ sql/log_event.cc
        The MDL request now holds the table and database name data (MDL_KEY).
     @ sql/mdl.cc
        Move the MDL key to the MDL_LOCK structure in order to make the
        object suitable for allocation from a fixed-size allocator. This
        allows the simplification of the lists in the MDL_LOCK object,
        which now are just two, one for granted tickets and other for
        waiting (upgraders) tickets.
        
        Recursive requests for a shared lock on the same object can now
        be granted using the same lock ticket. This schema is only used
        for shared locks because that the only case that matters. This
        is used to avoid waste of resources in case a context (connection)
        already holds a shared lock on a object.
     @ sql/mdl.h
        Introduce a metadata lock object key which is used  to uniquely
        identify lock objects.
        
        Separate the structure used to represent pending lock requests
        from the structure used to represent granted metadata locks.
        
        Rename functions used to manipulate locks requests in order to
        have a more consistent function naming schema.
     @ sql/sp_head.cc
        Rename mdl_alloc_lock to mdl_request_alloc.
     @ sql/sql_acl.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.
     @ sql/sql_base.cc
        Various changes to accommodate that lock requests are separated
        from lock tickets (granted locks).
     @ sql/sql_class.h
        Last acquired lock before the savepoint was set.
     @ sql/sql_delete.cc
        Various changes to accommodate that lock requests are separated
        from lock tickets (granted locks).
     @ sql/sql_handler.cc
        Various changes to accommodate that lock requests are separated
        from lock tickets (granted locks).
     @ sql/sql_insert.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.
     @ sql/sql_parse.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.
     @ sql/sql_plist.h
        Typedef for iterator type.
     @ sql/sql_plugin.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.
     @ sql/sql_servers.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.
     @ sql/sql_show.cc
        Various changes to accommodate that lock requests are separated
        from lock tickets (granted locks).
     @ sql/sql_table.cc
        Various changes to accommodate that lock requests are separated
        from lock tickets (granted locks).
     @ sql/sql_trigger.cc
        Save reference to the lock ticket so it can be downgraded later.
     @ sql/sql_udf.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.
     @ sql/table.cc
        Rename mdl_alloc_lock to mdl_request_alloc.
     @ sql/table.h
        Separate MDL lock requests from lock tickets (granted locks).
     @ storage/myisammrg/ha_myisammrg.cc
        Rename alloc_mdl_locks to alloc_mdl_requests.

    added:
      mysql-test/r/mdl_sync.result
      mysql-test/t/mdl_sync.test
    modified:
      sql/event_db_repository.cc
      sql/ha_ndbcluster_binlog.cc
      sql/lock.cc
      sql/log_event.cc
      sql/mdl.cc
      sql/mdl.h
      sql/sp_head.cc
      sql/sql_acl.cc
      sql/sql_base.cc
      sql/sql_class.h
      sql/sql_delete.cc
      sql/sql_handler.cc
      sql/sql_insert.cc
      sql/sql_parse.cc
      sql/sql_plist.h
      sql/sql_plugin.cc
      sql/sql_servers.cc
      sql/sql_show.cc
      sql/sql_table.cc
      sql/sql_trigger.cc
      sql/sql_udf.cc
      sql/table.cc
      sql/table.h
      storage/myisammrg/ha_myisammrg.cc
=== added file 'mysql-test/r/mdl_sync.result'
--- a/mysql-test/r/mdl_sync.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/r/mdl_sync.result	2009-12-03 23:29:40 +0000
@@ -0,0 +1,21 @@
+SET DEBUG_SYNC= 'RESET';
+drop table if exists t1,t2,t3;
+create table t1 (i int);
+create table t2 (i int);
+connection: default
+lock tables t2 read;
+connection: con1
+set debug_sync='mdl_upgrade_shared_lock_to_exclusive SIGNAL parked WAIT_FOR go';
+alter table t1 rename t3;
+connection: default
+set debug_sync= 'now WAIT_FOR parked';
+connection: con2
+set debug_sync='mdl_acquire_exclusive_locks_wait SIGNAL go';
+drop table t1,t2;
+connection: con1
+connection: default
+unlock tables;
+connection: con2
+ERROR 42S02: Unknown table 't1'
+drop table t3;
+SET DEBUG_SYNC= 'RESET';

=== added file 'mysql-test/t/mdl_sync.test'
--- a/mysql-test/t/mdl_sync.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/t/mdl_sync.test	2009-12-03 23:29:40 +0000
@@ -0,0 +1,69 @@
+#
+# We need the Debug Sync Facility.
+#
+--source include/have_debug_sync.inc
+
+# Clean up resources used in this test case.
+--disable_warnings
+SET DEBUG_SYNC= 'RESET';
+--enable_warnings
+
+#
+# Test the case of when a exclusive lock request waits for a
+# shared lock being upgraded to a exclusive lock.
+#
+
+connect (con1,localhost,root,,test,,);
+connect (con2,localhost,root,,test,,);
+connect (con3,localhost,root,,test,,);
+
+connection default;
+
+--disable_warnings
+drop table if exists t1,t2,t3;
+--enable_warnings
+
+create table t1 (i int);
+create table t2 (i int);
+
+--echo connection: default
+lock tables t2 read;
+
+connection con1;
+--echo connection: con1
+set debug_sync='mdl_upgrade_shared_lock_to_exclusive SIGNAL parked WAIT_FOR go';
+--send alter table t1 rename t3
+
+connection default;
+--echo connection: default
+set debug_sync= 'now WAIT_FOR parked';
+
+connection con2;
+--echo connection: con2
+set debug_sync='mdl_acquire_exclusive_locks_wait SIGNAL go';
+--send drop table t1,t2
+
+connection con1;
+--echo connection: con1
+--reap
+
+connection default;
+--echo connection: default
+unlock tables;
+
+connection con2;
+--echo connection: con2
+--error ER_BAD_TABLE_ERROR
+--reap
+
+connection default;
+drop table t3;
+
+disconnect con1;
+disconnect con2;
+disconnect con3;
+
+# Clean up resources used in this test case.
+--disable_warnings
+SET DEBUG_SYNC= 'RESET';
+--enable_warnings

=== modified file 'sql/event_db_repository.cc'
--- a/sql/event_db_repository.cc	2009-12-02 15:37:10 +0000
+++ b/sql/event_db_repository.cc	2009-12-03 23:29:40 +0000
@@ -555,7 +555,7 @@ Event_db_repository::open_event_table(TH
   DBUG_ENTER("Event_db_repository::open_event_table");
 
   tables.init_one_table("mysql", 5, "event", 5, "event", lock_type);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {
@@ -1110,7 +1110,7 @@ Event_db_repository::check_system_tables
 
   /* Check mysql.db */
   tables.init_one_table("mysql", 5, "db", 2, "db", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {
@@ -1128,7 +1128,7 @@ Event_db_repository::check_system_tables
   }
   /* Check mysql.user */
   tables.init_one_table("mysql", 5, "user", 4, "user", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {
@@ -1149,7 +1149,7 @@ Event_db_repository::check_system_tables
   }
   /* Check mysql.event */
   tables.init_one_table("mysql", 5, "event", 5, "event", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2009-12-02 16:31:57 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2009-12-03 23:29:40 +0000
@@ -140,8 +140,7 @@ static Uint64 *p_latest_trans_gci= 0;
 */
 static TABLE *ndb_binlog_index= 0;
 static TABLE_LIST binlog_tables;
-static MDL_LOCK_DATA binlog_mdl_lock_data;
-static char binlog_mdlkey[MAX_MDLKEY_LENGTH];
+static MDL_LOCK_REQUEST binlog_mdl_lock_request;
 
 /*
   Helper functions
@@ -2343,9 +2342,8 @@ static int open_ndb_binlog_index(THD *th
   tables->alias= tables->table_name= reptable;
   tables->lock_type= TL_WRITE;
   thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
-  mdl_init_lock(&binlog_mdl_lock_data, binlog_mdlkey, 0, tables->db,
-                tables->table_name);
-  tables->mdl_lock_data= &binlog_mdl_lock_data;
+  mdl_request_init(&binlog_mdl_lock_request, 0, tables->db, tables->table_name);
+  tables->mdl_lock_request= &binlog_mdl_lock_request;
   tables->required_type= FRMTYPE_TABLE;
   uint counter;
   thd->clear_error();

=== modified file 'sql/lock.cc'
--- a/sql/lock.cc	2009-12-03 20:08:27 +0000
+++ b/sql/lock.cc	2009-12-03 23:29:40 +0000
@@ -953,24 +953,24 @@ static MYSQL_LOCK *get_lock_data(THD *th
 bool lock_table_names(THD *thd, TABLE_LIST *table_list)
 {
   TABLE_LIST *lock_table;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_LOCK_REQUEST *mdl_lock_req;
 
   for (lock_table= table_list; lock_table; lock_table= lock_table->next_local)
   {
-    if (!(mdl_lock_data= mdl_alloc_lock(0, lock_table->db,
-                                        lock_table->table_name,
-                                        thd->mem_root)))
+    mdl_lock_req= mdl_request_alloc(0, lock_table->db, lock_table->table_name,
+                                    thd->mem_root);
+    if (!mdl_lock_req)
       goto end;
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    mdl_add_lock(&thd->mdl_context, mdl_lock_data);
-    lock_table->mdl_lock_data= mdl_lock_data;
+    mdl_request_set_type(mdl_lock_req, MDL_EXCLUSIVE);
+    mdl_request_add(&thd->mdl_context, mdl_lock_req);
+    lock_table->mdl_lock_request= mdl_lock_req;
   }
   if (mdl_acquire_exclusive_locks(&thd->mdl_context))
     goto end;
   return 0;
 
 end:
-  mdl_remove_all_locks(&thd->mdl_context);
+  mdl_request_remove_all(&thd->mdl_context);
   return 1;
 }
 
@@ -986,8 +986,8 @@ end:
 void unlock_table_names(THD *thd)
 {
   DBUG_ENTER("unlock_table_names");
-  mdl_release_locks(&thd->mdl_context);
-  mdl_remove_all_locks(&thd->mdl_context);
+  mdl_ticket_release_all(&thd->mdl_context);
+  mdl_request_remove_all(&thd->mdl_context);
   DBUG_VOID_RETURN;
 }
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2009-12-03 18:37:38 +0000
+++ b/sql/log_event.cc	2009-12-03 23:29:40 +0000
@@ -8059,8 +8059,8 @@ Table_map_log_event::~Table_map_log_even
 int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
 {
   RPL_TABLE_LIST *table_list;
-  char *db_mem, *tname_mem, *mdlkey;
-  MDL_LOCK_DATA *mdl_lock_data;
+  char *db_mem, *tname_mem;
+  MDL_LOCK_REQUEST *mdl_lock_request;
   size_t dummy_len;
   void *memory;
   DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
@@ -8075,8 +8075,7 @@ int Table_map_log_event::do_apply_event(
                                 &table_list, (uint) sizeof(RPL_TABLE_LIST),
                                 &db_mem, (uint) NAME_LEN + 1,
                                 &tname_mem, (uint) NAME_LEN + 1,
-                                &mdl_lock_data, sizeof(MDL_LOCK_DATA),
-                                &mdlkey, MAX_MDLKEY_LENGTH,
+                                &mdl_lock_request, sizeof(MDL_LOCK_REQUEST),
                                 NullS)))
     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
@@ -8089,9 +8088,9 @@ int Table_map_log_event::do_apply_event(
   table_list->updating= 1;
   strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
   strmov(table_list->table_name, m_tblnam);
-  mdl_init_lock(mdl_lock_data, mdlkey, 0, table_list->db,
-                table_list->table_name);
-  table_list->mdl_lock_data= mdl_lock_data;
+  mdl_request_init(mdl_lock_request, 0, table_list->db,
+                   table_list->table_name);
+  table_list->mdl_lock_request= mdl_lock_request;
 
   int error= 0;
 

=== modified file 'sql/mdl.cc'
--- a/sql/mdl.cc	2009-12-03 22:48:14 +0000
+++ b/sql/mdl.cc	2009-12-03 23:29:40 +0000
@@ -16,6 +16,7 @@
 
 
 #include "mdl.h"
+#include "debug_sync.h"
 #include <hash.h>
 #include <mysqld_error.h>
 
@@ -29,39 +30,43 @@
 
 struct MDL_LOCK
 {
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> active_shared;
+  typedef I_P_List<MDL_LOCK_TICKET,
+                   I_P_List_adapter<MDL_LOCK_TICKET,
+                                    &MDL_LOCK_TICKET::next_in_lock,
+                                    &MDL_LOCK_TICKET::prev_in_lock> >
+          Ticket_list;
+
+  typedef Ticket_list::Iterator Ticket_iterator;
+
+  /** The type of lock (shared or exclusive). */
+  enum
+  {
+    SHARED,
+    EXCLUSIVE,
+  } type;
+  /** The key of the object (data) being protected. */
+  MDL_KEY key;
+  /** List of granted tickets for this lock. */
+  Ticket_list granted;
   /*
     There can be several upgraders and active exclusive
     belonging to the same context.
   */
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> active_shared_waiting_upgrade;
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> active_exclusive;
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> waiting_exclusive;
-  /**
-     Number of MDL_LOCK_DATA objects associated with this MDL_LOCK instance
-     and therefore present in one of above lists. Note that this number
-     doesn't account for pending requests for shared lock since we don't
-     associate them with MDL_LOCK and don't keep them in any list.
-  */
-  uint   lock_data_count;
+  Ticket_list waiting;
   void   *cached_object;
   mdl_cached_object_release_hook cached_object_release_hook;
 
-  MDL_LOCK() : cached_object(0), cached_object_release_hook(0) {}
-
-  MDL_LOCK_DATA *get_key_owner()
+  MDL_LOCK(const MDL_KEY *mdl_key)
+  : type(SHARED), cached_object(0), cached_object_release_hook(0)
   {
-     return !active_shared.is_empty() ?
-            active_shared.head() :
-            (!active_shared_waiting_upgrade.is_empty() ?
-             active_shared_waiting_upgrade.head() :
-             (!active_exclusive.is_empty() ?
-              active_exclusive.head() : waiting_exclusive.head()));
+    key.mdl_key_init(mdl_key);
+    granted.empty();
+    waiting.empty();
   }
 
-  bool has_one_lock_data()
+  bool is_empty()
   {
-    return (lock_data_count == 1);
+    return (granted.is_empty() && waiting.is_empty());
   }
 };
 
@@ -86,12 +91,13 @@ struct MDL_GLOBAL_LOCK
 } global_lock;
 
 
-extern "C" uchar *mdl_locks_key(const uchar *record, size_t *length,
-                                my_bool not_used __attribute__((unused)))
+extern "C" uchar *
+mdl_locks_key(const uchar *record, size_t *length,
+              my_bool not_used __attribute__((unused)))
 {
   MDL_LOCK *entry=(MDL_LOCK*) record;
-  *length= entry->get_key_owner()->key_length;
-  return (uchar*) entry->get_key_owner()->key;
+  *length= entry->key.length();
+  return (uchar*) entry->key.ptr();
 }
 
 
@@ -148,7 +154,8 @@ void mdl_destroy()
 
 void mdl_context_init(MDL_CONTEXT *context, THD *thd)
 {
-  context->locks.empty();
+  context->requests.empty();
+  context->tickets.empty();
   context->thd= thd;
   context->has_global_shared_lock= FALSE;
 }
@@ -168,7 +175,8 @@ void mdl_context_init(MDL_CONTEXT *conte
 
 void mdl_context_destroy(MDL_CONTEXT *context)
 {
-  DBUG_ASSERT(context->locks.is_empty());
+  DBUG_ASSERT(context->requests.is_empty());
+  DBUG_ASSERT(context->tickets.is_empty());
   DBUG_ASSERT(!context->has_global_shared_lock);
 }
 
@@ -185,8 +193,10 @@ void mdl_context_destroy(MDL_CONTEXT *co
 
 void mdl_context_backup_and_reset(MDL_CONTEXT *ctx, MDL_CONTEXT *backup)
 {
-  backup->locks.empty();
-  ctx->locks.swap(backup->locks);
+  backup->requests.empty();
+  backup->tickets.empty();
+  ctx->requests.swap(backup->requests);
+  ctx->tickets.swap(backup->tickets);
 }
 
 
@@ -196,8 +206,10 @@ void mdl_context_backup_and_reset(MDL_CO
 
 void mdl_context_restore(MDL_CONTEXT *ctx, MDL_CONTEXT *backup)
 {
-  DBUG_ASSERT(ctx->locks.is_empty());
-  ctx->locks.swap(backup->locks);
+  DBUG_ASSERT(ctx->requests.is_empty());
+  DBUG_ASSERT(ctx->tickets.is_empty());
+  ctx->requests.swap(backup->requests);
+  ctx->tickets.swap(backup->tickets);
 }
 
 
@@ -207,20 +219,29 @@ void mdl_context_restore(MDL_CONTEXT *ct
 
 void mdl_context_merge(MDL_CONTEXT *dst, MDL_CONTEXT *src)
 {
-  MDL_LOCK_DATA *lock_data;
+  MDL_LOCK_TICKET *ticket;
+  MDL_LOCK_REQUEST *lock_req;
 
   DBUG_ASSERT(dst->thd == src->thd);
 
-  if (!src->locks.is_empty())
+  if (!src->requests.is_empty())
+  {
+    MDL_CONTEXT::Request_iterator it(src->requests);
+    while ((lock_req= it++))
+      dst->requests.push_front(lock_req);
+    src->requests.empty();
+  }
+
+  if (!src->tickets.is_empty())
   {
-    I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(src->locks);
-    while ((lock_data= it++))
+    MDL_CONTEXT::Ticket_iterator it(src->tickets);
+    while ((ticket= it++))
     {
-      DBUG_ASSERT(lock_data->ctx);
-      lock_data->ctx= dst;
-      dst->locks.push_front(lock_data);
+      DBUG_ASSERT(ticket->ctx);
+      ticket->ctx= dst;
+      dst->tickets.push_front(ticket);
     }
-    src->locks.empty();
+    src->tickets.empty();
   }
 }
 
@@ -230,53 +251,36 @@ void mdl_context_merge(MDL_CONTEXT *dst,
 
    This is to be used for every lock request.
 
-   Note that initialization and allocation are split
-   into two calls. This is to allow flexible memory management
-   of lock requests. Normally a lock request is stored
-   in statement memory (e.g. is a member of struct TABLE_LIST),
-   but we would also like to allow allocation of lock
-   requests in other memory roots, for example in the grant
-   subsystem, to lock privilege tables.
-
-   The MDL subsystem does not own or manage memory of lock
-   requests. Instead it assumes that the life time of every lock
-   request encloses calls to mdl_acquire_shared_lock() and
-   mdl_release_locks().
-
-   @param  lock_data  Pointer to an MDL_LOCK_DATA object to initialize
-   @param  key_buff   Pointer to the buffer for key for the lock request
-                      (should be at least 4+ strlen(db) + 1 + strlen(name)
-                      + 1 bytes, or, if the lengths are not known,
-                      MAX_MDLKEY_LENGTH)
+   Note that initialization and allocation are split into two
+   calls. This is to allow flexible memory management of lock
+   requests. Normally a lock request is stored in statement memory
+   (e.g. is a member of struct TABLE_LIST), but we would also like
+   to allow allocation of lock requests in other memory roots,
+   for example in the grant subsystem, to lock privilege tables.
+
+   The MDL subsystem does not own or manage memory of lock requests.
+   Instead it assumes that the life time of every lock request (including
+   encompassed members db/name) encloses calls to mdl_request_add()
+   and mdl_request_remove() or mdl_request_remove_all().
+
+   @param  lock_req   Pointer to an MDL_LOCK_REQUEST object to initialize
    @param  type       Id of type of object to be locked
    @param  db         Name of database to which the object belongs
    @param  name       Name of of the object
 
-   Stores the database name, object name and the type in the key
-   buffer. Initializes mdl_el to point to the key.
-   We can't simply initialize MDL_LOCK_DATA with type, db and name
-   by-pointer because of the underlying HASH implementation
-   requires the key to be a contiguous buffer.
-
    The initialized lock request will have MDL_SHARED type.
 
    Suggested lock types: TABLE - 0 PROCEDURE - 1 FUNCTION - 2
-   Note that tables and views have the same lock type, since
+   Note that tables and views must have the same lock type, since
    they share the same name space in the SQL standard.
 */
 
-void mdl_init_lock(MDL_LOCK_DATA *lock_data, char *key, int type,
-                   const char *db, const char *name)
+void mdl_request_init(MDL_LOCK_REQUEST *lock_req, unsigned char type,
+                      const char *db, const char *name)
 {
-  int4store(key, type);
-  lock_data->key_length= (uint) (strmov(strmov(key+4, db)+1, name)-key)+1;
-  lock_data->key= key;
-  lock_data->type= MDL_SHARED;
-  lock_data->state= MDL_INITIALIZED;
-#ifndef DBUG_OFF
-  lock_data->ctx= 0;
-  lock_data->lock= 0;
-#endif
+  lock_req->key.mdl_key_init(type, db, name);
+  lock_req->type= MDL_SHARED;
+  lock_req->ticket= NULL;
 }
 
 
@@ -298,19 +302,19 @@ void mdl_init_lock(MDL_LOCK_DATA *lock_d
    @retval non-0  Pointer to an object representing a lock request
 */
 
-MDL_LOCK_DATA *mdl_alloc_lock(int type, const char *db, const char *name,
-                              MEM_ROOT *root)
+MDL_LOCK_REQUEST *
+mdl_request_alloc(unsigned char type, const char *db,
+                  const char *name, MEM_ROOT *root)
 {
-  MDL_LOCK_DATA *lock_data;
-  char *key;
+  MDL_LOCK_REQUEST *lock_req;
 
-  if (!multi_alloc_root(root, &lock_data, sizeof(MDL_LOCK_DATA), &key,
-                        MAX_MDLKEY_LENGTH, NULL))
+  if (!(lock_req= (MDL_LOCK_REQUEST*) alloc_root(root,
+                                                 sizeof(MDL_LOCK_REQUEST))))
     return NULL;
 
-  mdl_init_lock(lock_data, key, type, db, name);
+  mdl_request_init(lock_req, type, db, name);
 
-  return lock_data;
+  return lock_req;
 }
 
 
@@ -318,121 +322,122 @@ MDL_LOCK_DATA *mdl_alloc_lock(int type, 
    Add a lock request to the list of lock requests of the context.
 
    The procedure to acquire metadata locks is:
-     - allocate and initialize lock requests (mdl_alloc_lock())
-     - associate them with a context (mdl_add_lock())
-     - call mdl_acquire_shared_lock()/mdl_release_lock() (maybe repeatedly).
+     - allocate and initialize lock requests (mdl_request_alloc())
+     - associate them with a context (mdl_request_add())
+     - call mdl_acquire_shared_lock()/mdl_ticket_release() (maybe repeatedly).
 
    Associates a lock request with the given context.
 
    @param  context    The MDL context to associate the lock with.
                       There should be no more than one context per
                       connection, to avoid deadlocks.
-   @param  lock_data  The lock request to be added.
+   @param  lock_req   The lock request to be added.
 */
 
-void mdl_add_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data)
+void mdl_request_add(MDL_CONTEXT *context, MDL_LOCK_REQUEST *lock_req)
 {
-  DBUG_ENTER("mdl_add_lock");
-  DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-  DBUG_ASSERT(!lock_data->ctx);
-  lock_data->ctx= context;
-  context->locks.push_front(lock_data);
+  DBUG_ENTER("mdl_request_add");
+  DBUG_ASSERT(lock_req->ticket == NULL);
+  context->requests.push_front(lock_req);
   DBUG_VOID_RETURN;
 }
 
 
 /**
-   Remove a lock request from the list of lock requests of the context.
+   Remove a lock request from the list of lock requests.
 
    Disassociates a lock request from the given context.
 
    @param  context    The MDL context to remove the lock from.
-   @param  lock_data  The lock request to be removed.
+   @param  lock_req   The lock request to be removed.
 
-   @pre The lock request being removed should correspond to lock which
+   @pre The lock request being removed should correspond to a ticket that
         was released or was not acquired.
 
-   @note Resets lock request for lock released back to its initial state
+   @note Resets lock request back to its initial state
          (i.e. sets type to MDL_SHARED).
 */
 
-void mdl_remove_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data)
+void mdl_request_remove(MDL_CONTEXT *context, MDL_LOCK_REQUEST *lock_req)
 {
-  DBUG_ENTER("mdl_remove_lock");
-  DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-  DBUG_ASSERT(context == lock_data->ctx);
+  DBUG_ENTER("mdl_request_remove");
   /* Reset lock request back to its initial state. */
-  lock_data->type= MDL_SHARED;
-#ifndef DBUG_OFF
-  lock_data->ctx= 0;
-#endif
-  context->locks.remove(lock_data);
+  lock_req->type= MDL_SHARED;
+  lock_req->ticket= NULL;
+  context->requests.remove(lock_req);
   DBUG_VOID_RETURN;
 }
 
 
 /**
-   Clear all lock requests in the context (clear the context).
-
+   Clear all lock requests in the context.
    Disassociates lock requests from the context.
-   All granted locks must be released prior to calling this
-   function.
-
-   In other words, the expected procedure to release locks is:
-     - mdl_release_locks();
-     - mdl_remove_all_locks();
-
-   We could possibly merge mdl_remove_all_locks() and mdl_release_locks(),
-   but this function comes in handy when we need to back off: in that case
-   we release all the locks acquired so-far but do not free them, since
-   we know that the respective lock requests will be used again.
 
    Also resets lock requests back to their initial state (i.e. MDL_SHARED).
 
    @param context Context to be cleared.
 */
 
-void mdl_remove_all_locks(MDL_CONTEXT *context)
+void mdl_request_remove_all(MDL_CONTEXT *context)
 {
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-  while ((lock_data= it++))
+  MDL_LOCK_REQUEST *lock_req;
+  MDL_CONTEXT::Request_iterator it(context->requests);
+  while ((lock_req= it++))
   {
     /* Reset lock request back to its initial state. */
-    lock_data->type= MDL_SHARED;
-#ifndef DBUG_OFF
-    lock_data->ctx= 0;
-#endif
+    lock_req->type= MDL_SHARED;
+    lock_req->ticket= NULL;
   }
-  context->locks.empty();
+  context->requests.empty();
 }
 
 
 /**
-   Auxiliary functions needed for creation/destruction of MDL_LOCK
-   objects.
+   Auxiliary functions needed for creation/destruction of MDL_LOCK objects.
 
    @todo This naive implementation should be replaced with one that saves
          on memory allocation by reusing released objects.
 */
 
-static MDL_LOCK* get_lock_object(void)
+static MDL_LOCK* alloc_lock_object(const MDL_KEY *mdl_key)
 {
-  return new MDL_LOCK();
+  return new MDL_LOCK(mdl_key);
 }
 
 
-static void release_lock_object(MDL_LOCK *lock)
+static void free_lock_object(MDL_LOCK *lock)
 {
   delete lock;
 }
 
 
 /**
+   Auxiliary functions needed for creation/destruction of MDL_LOCK_TICKET
+   objects.
+
+   @todo This naive implementation should be replaced with one that saves
+         on memory allocation by reusing released objects.
+*/
+
+static MDL_LOCK_TICKET* alloc_ticket_object(MDL_CONTEXT *context)
+{
+  MDL_LOCK_TICKET *ticket= new MDL_LOCK_TICKET;
+  return ticket;
+}
+
+
+static void free_ticket_object(MDL_LOCK_TICKET *ticket)
+{
+  delete ticket;
+}
+
+
+/**
    Helper functions which simplifies writing various checks and asserts.
 */
 
-static bool is_shared(MDL_LOCK_DATA *lock_data)
+template <typename T>
+static inline bool is_shared(T *lock_data)
 {
   return (lock_data->type < MDL_EXCLUSIVE);
 }
@@ -498,7 +503,7 @@ static inline void mdl_exit_cond(MDL_CON
          request, implying a form of lock on the global metadata, is
          compatible with the current state of the global metadata lock.
 
-   @param lock_data Request for lock on an individual object, implying a
+   @param lock_req  Request for lock on an individual object, implying a
                     certain kind of global metadata lock.
 
    @retval TRUE  - Lock request can be satisfied
@@ -528,9 +533,9 @@ static inline void mdl_exit_cond(MDL_CON
         type of locks we don't even have any accounting for them.
 */
 
-static bool can_grant_global_lock(MDL_LOCK_DATA *lock_data)
+static bool can_grant_global_lock(enum_mdl_type type, bool is_upgrade)
 {
-  switch (lock_data->type)
+  switch (type)
   {
   case MDL_SHARED:
   case MDL_SHARED_HIGH_PRIO:
@@ -549,7 +554,7 @@ static bool can_grant_global_lock(MDL_LO
       return TRUE;
     break;
   case MDL_EXCLUSIVE:
-    if (lock_data->state == MDL_PENDING_UPGRADE)
+    if (is_upgrade)
     {
       /*
         We are upgrading MDL_SHARED to MDL_EXCLUSIVE.
@@ -586,7 +591,7 @@ static bool can_grant_global_lock(MDL_LO
    Check if request for the lock can be satisfied given current state of lock.
 
    @param  lock       Lock.
-   @param  lock_data  Request for lock.
+   @param  lock_req   Request for lock.
 
    @retval TRUE   Lock request can be satisfied
    @retval FALSE  There is some conflicting lock.
@@ -612,68 +617,96 @@ static bool can_grant_global_lock(MDL_LO
         being upgraded.
 */
 
-static bool can_grant_lock(MDL_LOCK *lock, MDL_LOCK_DATA *lock_data)
+static bool can_grant_lock(MDL_CONTEXT *ctx, MDL_LOCK *lock,
+                           enum_mdl_type type, bool is_upgrade)
 {
-  switch (lock_data->type)
-  {
+  bool can_grant= FALSE;
+
+  switch (type) {
   case MDL_SHARED:
   case MDL_SHARED_UPGRADABLE:
   case MDL_SHARED_HIGH_PRIO:
-    if ((lock->active_exclusive.is_empty() &&
-         (lock_data->type == MDL_SHARED_HIGH_PRIO ||
-          lock->waiting_exclusive.is_empty() &&
-          lock->active_shared_waiting_upgrade.is_empty())) ||
-        (!lock->active_exclusive.is_empty() &&
-         lock->active_exclusive.head()->ctx == lock_data->ctx))
+    if (lock->type == MDL_LOCK::SHARED)
+    {
+      /* Pending exclusive locks have higher priority over shared locks. */
+      if (lock->waiting.is_empty() || type == MDL_SHARED_HIGH_PRIO)
+        can_grant= TRUE;
+    }
+    else if (lock->granted.head()->ctx == ctx)
     {
       /*
         When exclusive lock comes from the same context we can satisfy our
         shared lock. This is required for CREATE TABLE ... SELECT ... and
         ALTER VIEW ... AS ....
       */
-      return TRUE;
+      can_grant= TRUE;
     }
-    else
-      return FALSE;
     break;
   case MDL_EXCLUSIVE:
-    if (lock_data->state == MDL_PENDING_UPGRADE)
+    if (is_upgrade)
     {
       /* We are upgrading MDL_SHARED to MDL_EXCLUSIVE. */
-      MDL_LOCK_DATA *conf_lock_data;
-      I_P_List_iterator<MDL_LOCK_DATA,
-                        MDL_LOCK_DATA_lock> it(lock->active_shared);
+      MDL_LOCK_TICKET *conf_lock_ticket;
+      MDL_LOCK::Ticket_iterator it(lock->granted);
 
       /*
         There should be no active exclusive locks since we own shared lock
         on the object.
       */
-      DBUG_ASSERT(lock->active_exclusive.is_empty() &&
-                  lock->active_shared_waiting_upgrade.head() == lock_data);
+      DBUG_ASSERT(lock->type == MDL_LOCK::SHARED);
 
-      while ((conf_lock_data= it++))
+      while ((conf_lock_ticket= it++))
       {
         /*
           When upgrading shared lock to exclusive one we can have other shared
           locks for the same object in the same context, e.g. in case when several
           instances of TABLE are open.
         */
-        if (conf_lock_data->ctx != lock_data->ctx)
-          return FALSE;
+        if (conf_lock_ticket->ctx != ctx)
+          break;
       }
-      return TRUE;
+      /* Grant lock if there are no conflicting shared locks. */
+      if (conf_lock_ticket == NULL)
+        can_grant= TRUE;
+      break;
     }
-    else
+    else if (lock->type == MDL_LOCK::SHARED)
     {
-      return (lock->active_exclusive.is_empty() &&
-              lock->active_shared_waiting_upgrade.is_empty() &&
-              lock->active_shared.is_empty());
+      can_grant= lock->granted.is_empty();
     }
     break;
   default:
     DBUG_ASSERT(0);
   }
-  return FALSE;
+  return can_grant;
+}
+
+
+/**
+   Check whether the context already holds a compatible lock ticket
+   on a object. Only shared locks can be recursive.
+
+   @param lock_req  Lock request object for lock to be acquired
+
+   @return A pointer to the lock ticket for the object or NULL otherwise.
+*/
+
+static MDL_LOCK_TICKET *
+mdl_context_find_ticket(MDL_CONTEXT *ctx, MDL_LOCK_REQUEST *lock_req)
+{
+  MDL_LOCK_TICKET *ticket;
+  MDL_CONTEXT::Ticket_iterator it(ctx->tickets);
+
+  DBUG_ASSERT(is_shared(lock_req));
+
+  while ((ticket= it++))
+  {
+    if (lock_req->type == ticket->type &&
+        lock_req->key.is_equal(&ticket->lock->key))
+      break;
+  }
+
+  return ticket;
 }
 
 
@@ -691,7 +724,7 @@ static bool can_grant_lock(MDL_LOCK *loc
    This function must be called after the lock is added to a context.
 
    @param context    [in]  Context containing request for lock
-   @param lock_data  [in]  Lock request object for lock to be acquired
+   @param lock_req   [in]  Lock request object for lock to be acquired
    @param retry      [out] Indicates that conflicting lock exists and another
                            attempt should be made after releasing all current
                            locks and waiting for conflicting lock go away
@@ -702,81 +735,111 @@ static bool can_grant_lock(MDL_LOCK *loc
                     In the latter case "retry" parameter is set to TRUE.
 */
 
-bool mdl_acquire_shared_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data,
+bool mdl_acquire_shared_lock(MDL_CONTEXT *context, MDL_LOCK_REQUEST *lock_req,
                              bool *retry)
 {
   MDL_LOCK *lock;
+  MDL_KEY *key= &lock_req->key;
+  MDL_LOCK_TICKET *ticket;
   *retry= FALSE;
 
-  DBUG_ASSERT(is_shared(lock_data) && lock_data->state == MDL_INITIALIZED);
-
-  DBUG_ASSERT(lock_data->ctx == context);
+  DBUG_ASSERT(is_shared(lock_req) && lock_req->ticket == NULL);
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
   if (context->has_global_shared_lock &&
-      lock_data->type == MDL_SHARED_UPGRADABLE)
+      lock_req->type == MDL_SHARED_UPGRADABLE)
   {
     my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
     return TRUE;
   }
 
+  /*
+    Check whether the context already holds a shared lock on the object,
+    and if so, grant the request.
+  */
+  if ((ticket= mdl_context_find_ticket(context, lock_req)))
+  {
+    DBUG_ASSERT(ticket->state == MDL_ACQUIRED);
+    lock_req->ticket= ticket;
+    return FALSE;
+  }
+
   pthread_mutex_lock(&LOCK_mdl);
 
-  if (!can_grant_global_lock(lock_data))
+  if (!can_grant_global_lock(lock_req->type, FALSE))
   {
     pthread_mutex_unlock(&LOCK_mdl);
     *retry= TRUE;
     return TRUE;
   }
 
-  if (!(lock= (MDL_LOCK *)my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                         lock_data->key_length)))
+  if (!(ticket= alloc_ticket_object(context)))
   {
-    if (!(lock= get_lock_object()))
-    {
-      pthread_mutex_unlock(&LOCK_mdl);
-      return TRUE;
-    }
-    /*
-      Before inserting MDL_LOCK object into hash we should add at least one
-      MDL_LOCK_DATA to its lists in order to provide key for this element.
-      Thus we can't merge two branches of the above if-statement.
-    */
-    lock->active_shared.push_front(lock_data);
-    lock->lock_data_count= 1;
-    if (my_hash_insert(&mdl_locks, (uchar*)lock))
+    pthread_mutex_unlock(&LOCK_mdl);
+    return TRUE;
+  }
+
+  if (!(lock= (MDL_LOCK*) my_hash_search(&mdl_locks,
+                                         key->ptr(), key->length())))
+  {
+    /* Default lock type is MDL_LOCK::SHARED */
+    lock= alloc_lock_object(key);
+    if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
     {
-      release_lock_object(lock);
+      free_lock_object(lock);
+      free_ticket_object(ticket);
       pthread_mutex_unlock(&LOCK_mdl);
       return TRUE;
     }
-    lock_data->state= MDL_ACQUIRED;
-    lock_data->lock= lock;
-    if (lock_data->type == MDL_SHARED_UPGRADABLE)
+  }
+
+  if (can_grant_lock(context, lock, lock_req->type, FALSE))
+  {
+    lock->granted.push_front(ticket);
+    context->tickets.push_front(ticket);
+    ticket->state= MDL_ACQUIRED;
+    lock_req->ticket= ticket;
+    ticket->lock= lock;
+    ticket->type= lock_req->type;
+    ticket->ctx= context;
+    if (lock_req->type == MDL_SHARED_UPGRADABLE)
       global_lock.active_intention_exclusive++;
   }
   else
   {
-    if (can_grant_lock(lock, lock_data))
-    {
-      lock->active_shared.push_front(lock_data);
-      lock->lock_data_count++;
-      lock_data->state= MDL_ACQUIRED;
-      lock_data->lock= lock;
-      if (lock_data->type == MDL_SHARED_UPGRADABLE)
-        global_lock.active_intention_exclusive++;
-    }
-    else
-      *retry= TRUE;
+    /* We can't get here if we allocated a new lock. */
+    DBUG_ASSERT(! lock->is_empty());
+    *retry= TRUE;
+    free_ticket_object(ticket);
   }
+
   pthread_mutex_unlock(&LOCK_mdl);
 
   return *retry;
 }
 
 
-static void release_lock(MDL_LOCK_DATA *lock_data);
+static void release_ticket(MDL_CONTEXT *context, MDL_LOCK_TICKET *ticket);
+
+
+/**
+   Notify a thread holding a shared metadata lock of a pending exclusive lock.
+
+   @param thd               Current thread context
+   @param conf_lock_ticket  Conflicting metadata lock
+
+   @retval TRUE   A thread was woken up
+   @retval FALSE  Lock is not a shared one or no thread was woken up
+*/
+
+static bool notify_shared_lock(THD *thd, MDL_LOCK_TICKET *conf_lock_ticket)
+{
+  bool woke= FALSE;
+  if (conf_lock_ticket->type != MDL_EXCLUSIVE)
+    woke= mysql_notify_thread_having_shared_lock(thd, conf_lock_ticket->ctx->thd);
+  return woke;
+}
 
 
 /**
@@ -796,12 +859,13 @@ static void release_lock(MDL_LOCK_DATA *
 
 bool mdl_acquire_exclusive_locks(MDL_CONTEXT *context)
 {
-  MDL_LOCK_DATA *lock_data;
   MDL_LOCK *lock;
   bool signalled= FALSE;
   const char *old_msg;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  MDL_LOCK_REQUEST *lock_req;
+  MDL_LOCK_TICKET *ticket;
   st_my_thread_var *mysys_var= my_thread_var;
+  MDL_CONTEXT::Request_iterator it(context->requests);
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
@@ -815,46 +879,44 @@ bool mdl_acquire_exclusive_locks(MDL_CON
 
   old_msg= MDL_ENTER_COND(context, mysys_var);
 
-  while ((lock_data= it++))
+  while ((lock_req= it++))
   {
-    DBUG_ASSERT(lock_data->type == MDL_EXCLUSIVE &&
-                lock_data->state == MDL_INITIALIZED);
-    if (!(lock= (MDL_LOCK *) my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                            lock_data->key_length)))
+    MDL_KEY *key= &lock_req->key;
+    DBUG_ASSERT(lock_req->type == MDL_EXCLUSIVE &&
+                lock_req->ticket == NULL);
+
+    /* Early allocation: ticket is used as a shortcut to the lock. */
+    if (!(ticket= alloc_ticket_object(context)))
+      goto err;
+
+    if (!(lock= (MDL_LOCK*) my_hash_search(&mdl_locks,
+                                           key->ptr(), key->length())))
     {
-      if (!(lock= get_lock_object()))
-        goto err;
-      /*
-        Again before inserting MDL_LOCK into hash provide key for
-        it by adding MDL_LOCK_DATA to one of its lists.
-      */
-      lock->waiting_exclusive.push_front(lock_data);
-      lock->lock_data_count= 1;
-      if (my_hash_insert(&mdl_locks, (uchar*)lock))
+      lock= alloc_lock_object(key);
+      if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
       {
-        release_lock_object(lock);
+        free_ticket_object(ticket);
+        free_lock_object(lock);
         goto err;
       }
-      lock_data->lock= lock;
-      lock_data->state= MDL_PENDING;
-    }
-    else
-    {
-      lock->waiting_exclusive.push_front(lock_data);
-      lock->lock_data_count++;
-      lock_data->lock= lock;
-      lock_data->state= MDL_PENDING;
     }
+
+    lock_req->ticket= ticket;
+    ticket->state= MDL_PENDING;
+    ticket->ctx= context;
+    ticket->lock= lock;
+    ticket->type= lock_req->type;
+    lock->waiting.push_front(ticket);
   }
 
   while (1)
   {
     it.rewind();
-    while ((lock_data= it++))
+    while ((lock_req= it++))
     {
-      lock= lock_data->lock;
+      lock= lock_req->ticket->lock;
 
-      if (!can_grant_global_lock(lock_data))
+      if (!can_grant_global_lock(lock_req->type, FALSE))
       {
         /*
           There is an active or pending global shared lock so we have
@@ -863,27 +925,25 @@ bool mdl_acquire_exclusive_locks(MDL_CON
         signalled= TRUE;
         break;
       }
-      else if (!can_grant_lock(lock, lock_data))
+      else if (!can_grant_lock(context, lock, lock_req->type, FALSE))
       {
-        MDL_LOCK_DATA *conf_lock_data;
-        I_P_List_iterator<MDL_LOCK_DATA,
-                          MDL_LOCK_DATA_lock> it(lock->active_shared);
-
-        signalled= !lock->active_exclusive.is_empty() ||
-                   !lock->active_shared_waiting_upgrade.is_empty();
-
-        while ((conf_lock_data= it++))
-        {
-          signalled|=
-            mysql_notify_thread_having_shared_lock(context->thd,
-                                                   conf_lock_data->ctx->thd);
-        }
+        MDL_LOCK_TICKET *conf_lock_ticket;
+        MDL_LOCK::Ticket_iterator it(lock->granted);
+
+        signalled= (lock->type == MDL_LOCK::EXCLUSIVE);
+
+        while ((conf_lock_ticket= it++))
+          signalled|= notify_shared_lock(context->thd, conf_lock_ticket);
 
         break;
       }
     }
-    if (!lock_data)
+    if (!lock_req)
       break;
+
+    /* There is a shared or exclusive lock on the object. */
+    DEBUG_SYNC(context->thd, "mdl_acquire_exclusive_locks_wait");
+
     if (signalled)
       pthread_cond_wait(&COND_mdl, &LOCK_mdl);
     else
@@ -902,13 +962,16 @@ bool mdl_acquire_exclusive_locks(MDL_CON
       goto err;
   }
   it.rewind();
-  while ((lock_data= it++))
+  while ((lock_req= it++))
   {
     global_lock.active_intention_exclusive++;
-    lock= lock_data->lock;
-    lock->waiting_exclusive.remove(lock_data);
-    lock->active_exclusive.push_front(lock_data);
-    lock_data->state= MDL_ACQUIRED;
+    ticket= lock_req->ticket;
+    lock= ticket->lock;
+    lock->type= MDL_LOCK::EXCLUSIVE;
+    lock->waiting.remove(ticket);
+    lock->granted.push_front(ticket);
+    context->tickets.push_front(ticket);
+    ticket->state= MDL_ACQUIRED;
     if (lock->cached_object)
       (*lock->cached_object_release_hook)(lock->cached_object);
     lock->cached_object= NULL;
@@ -923,10 +986,20 @@ err:
     Ignore those lock requests which were not made MDL_PENDING.
   */
   it.rewind();
-  while ((lock_data= it++) && lock_data->state == MDL_PENDING)
+  while ((lock_req= it++) && lock_req->ticket)
   {
-    release_lock(lock_data);
-    lock_data->state= MDL_INITIALIZED;
+    ticket= lock_req->ticket;
+    DBUG_ASSERT(ticket->state == MDL_PENDING);
+    lock= ticket->lock;
+    free_ticket_object(ticket);
+    lock->waiting.remove(ticket);
+    /* Reset lock request back to its initial state. */
+    lock_req->ticket= NULL;
+    if (lock->is_empty())
+    {
+      my_hash_delete(&mdl_locks, (uchar *)lock);
+      free_lock_object(lock);
+    }
   }
   /* May be some pending requests for shared locks can be satisfied now. */
   pthread_cond_broadcast(&COND_mdl);
@@ -942,77 +1015,61 @@ err:
    new definition has been constructed.
 
    @param context   Context to which shared lock belongs
-   @param lock_data Satisfied request for shared lock to be upgraded
+   @param ticket    Ticket for shared lock to be upgraded
 
    @note In case of failure to upgrade lock (e.g. because upgrader
          was killed) leaves lock in its original state (locked in
          shared mode).
 
+   @note There can be only one upgrader for a lock or we will have deadlock.
+         This invariant is ensured by code outside of metadata subsystem usually
+         by obtaining some sort of exclusive table-level lock (e.g. TL_WRITE,
+         TL_WRITE_ALLOW_READ) before performing upgrade of metadata lock.
+
    @retval FALSE  Success
    @retval TRUE   Failure (thread was killed)
 */
 
 bool mdl_upgrade_shared_lock_to_exclusive(MDL_CONTEXT *context,
-                                          MDL_LOCK_DATA *lock_data)
+                                          MDL_LOCK_TICKET *ticket)
 {
-  MDL_LOCK *lock;
+  MDL_LOCK *lock= ticket->lock;
   const char *old_msg;
   st_my_thread_var *mysys_var= my_thread_var;
 
   DBUG_ENTER("mdl_upgrade_shared_lock_to_exclusive");
+  DEBUG_SYNC(context->thd, "mdl_upgrade_shared_lock_to_exclusive");
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
-
   /* Allow this function to be called twice for the same lock request. */
-  if (lock_data->type == MDL_EXCLUSIVE)
+  if (ticket->type == MDL_EXCLUSIVE)
     DBUG_RETURN(FALSE);
 
-  DBUG_ASSERT(lock_data->type == MDL_SHARED_UPGRADABLE);
-
-  lock= lock_data->lock;
-
   pthread_mutex_lock(&LOCK_mdl);
 
   old_msg= MDL_ENTER_COND(context, mysys_var);
 
-  lock_data->state= MDL_PENDING_UPGRADE;
-  /* Set type of lock request to the type at which we are aiming. */
-  lock_data->type= MDL_EXCLUSIVE;
-  lock->active_shared.remove(lock_data);
-  /*
-    There can be only one upgrader for this lock or we will have deadlock.
-    This invariant is ensured by code outside of metadata subsystem usually
-    by obtaining some sort of exclusive table-level lock (e.g. TL_WRITE,
-    TL_WRITE_ALLOW_READ) before performing upgrade of metadata lock.
-  */
-  DBUG_ASSERT(lock->active_shared_waiting_upgrade.is_empty());
-  lock->active_shared_waiting_upgrade.push_front(lock_data);
 
   /*
-    Since we should have been already acquired intention exclusive global lock
-    this call is only enforcing asserts.
+    Since we should have already acquired an intention exclusive
+    global lock this call is only enforcing asserts.
   */
-  DBUG_ASSERT(can_grant_global_lock(lock_data));
+  DBUG_ASSERT(can_grant_global_lock(MDL_EXCLUSIVE, TRUE));
 
   while (1)
   {
-    if (can_grant_lock(lock, lock_data))
+    if (can_grant_lock(context, lock, MDL_EXCLUSIVE, TRUE))
       break;
 
     bool signalled= FALSE;
-    MDL_LOCK_DATA *conf_lock_data;
-    I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> it(lock->active_shared);
+    MDL_LOCK_TICKET *conf_lock_ticket;
+    MDL_LOCK::Ticket_iterator it(lock->granted);
 
-    while ((conf_lock_data= it++))
+    while ((conf_lock_ticket= it++))
     {
-      if (conf_lock_data->ctx != context)
-      {
-        signalled|=
-          mysql_notify_thread_having_shared_lock(context->thd,
-                                                 conf_lock_data->ctx->thd);
-      }
+      if (conf_lock_ticket->ctx != context)
+        signalled|= notify_shared_lock(context->thd, conf_lock_ticket);
     }
 
     if (signalled)
@@ -1032,10 +1089,6 @@ bool mdl_upgrade_shared_lock_to_exclusiv
     }
     if (mysys_var->abort)
     {
-      lock_data->state= MDL_ACQUIRED;
-      lock_data->type= MDL_SHARED_UPGRADABLE;
-      lock->active_shared_waiting_upgrade.remove(lock_data);
-      lock->active_shared.push_front(lock_data);
       /* Pending requests for shared locks can be satisfied now. */
       pthread_cond_broadcast(&COND_mdl);
       MDL_EXIT_COND(context, mysys_var, old_msg);
@@ -1043,9 +1096,9 @@ bool mdl_upgrade_shared_lock_to_exclusiv
     }
   }
 
-  lock->active_shared_waiting_upgrade.remove(lock_data);
-  lock->active_exclusive.push_front(lock_data);
-  lock_data->state= MDL_ACQUIRED;
+  lock->type= MDL_LOCK::EXCLUSIVE;
+  /* Set the new type of lock in the ticket. */
+  ticket->type= MDL_EXCLUSIVE;
   if (lock->cached_object)
     (*lock->cached_object_release_hook)(lock->cached_object);
   lock->cached_object= 0;
@@ -1069,7 +1122,7 @@ bool mdl_upgrade_shared_lock_to_exclusiv
    block and wait for the lock if the table already exists.
 
    @param context  [in]  The context containing the lock request
-   @param lock     [in]  The lock request
+   @param lock_req [in]  The lock request
    @param conflict [out] Indicates that conflicting lock exists
 
    @retval TRUE  Failure either conflicting lock exists or some error
@@ -1081,13 +1134,15 @@ bool mdl_upgrade_shared_lock_to_exclusiv
 */
 
 bool mdl_try_acquire_exclusive_lock(MDL_CONTEXT *context,
-                                    MDL_LOCK_DATA *lock_data,
+                                    MDL_LOCK_REQUEST *lock_req,
                                     bool *conflict)
 {
   MDL_LOCK *lock;
+  MDL_LOCK_TICKET *ticket;
+  MDL_KEY *key= &lock_req->key;
 
-  DBUG_ASSERT(lock_data->type == MDL_EXCLUSIVE &&
-              lock_data->state == MDL_INITIALIZED);
+  DBUG_ASSERT(lock_req->type == MDL_EXCLUSIVE &&
+              lock_req->ticket == NULL);
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
@@ -1095,20 +1150,25 @@ bool mdl_try_acquire_exclusive_lock(MDL_
 
   pthread_mutex_lock(&LOCK_mdl);
 
-  if (!(lock= (MDL_LOCK *)my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                         lock_data->key_length)))
+  if (!(lock= (MDL_LOCK*) my_hash_search(&mdl_locks,
+                                         key->ptr(), key->length())))
   {
-    if (!(lock= get_lock_object()))
-      goto err;
-    lock->active_exclusive.push_front(lock_data);
-    lock->lock_data_count= 1;
-    if (my_hash_insert(&mdl_locks, (uchar*)lock))
+    ticket= alloc_ticket_object(context);
+    lock= alloc_lock_object(key);
+    if (!ticket || !lock || my_hash_insert(&mdl_locks, (uchar*)lock))
     {
-      release_lock_object(lock);
+      free_ticket_object(ticket);
+      free_lock_object(lock);
       goto err;
     }
-    lock_data->state= MDL_ACQUIRED;
-    lock_data->lock= lock;
+    lock->type= MDL_LOCK::EXCLUSIVE;
+    lock->granted.push_front(ticket);
+    context->tickets.push_front(ticket);
+    ticket->state= MDL_ACQUIRED;
+    lock_req->ticket= ticket;
+    ticket->ctx= context;
+    ticket->lock= lock;
+    ticket->type= lock_req->type;
     global_lock.active_intention_exclusive++;
     pthread_mutex_unlock(&LOCK_mdl);
     return FALSE;
@@ -1183,9 +1243,9 @@ bool mdl_acquire_global_shared_lock(MDL_
 
 bool mdl_wait_for_locks(MDL_CONTEXT *context)
 {
-  MDL_LOCK_DATA *lock_data;
   MDL_LOCK *lock;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  MDL_LOCK_REQUEST *lock_req;
+  MDL_CONTEXT::Request_iterator it(context->requests);
   const char *old_msg;
   st_my_thread_var *mysys_var= my_thread_var;
 
@@ -1207,22 +1267,23 @@ bool mdl_wait_for_locks(MDL_CONTEXT *con
     pthread_mutex_lock(&LOCK_mdl);
     old_msg= MDL_ENTER_COND(context, mysys_var);
     it.rewind();
-    while ((lock_data= it++))
+    while ((lock_req= it++))
     {
-      DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-      if (!can_grant_global_lock(lock_data))
+      MDL_KEY *key= &lock_req->key;
+      DBUG_ASSERT(lock_req->ticket == NULL);
+      if (!can_grant_global_lock(lock_req->type, FALSE))
         break;
       /*
         To avoid starvation we don't wait if we have a conflict against
         request for MDL_EXCLUSIVE lock.
       */
-      if (is_shared(lock_data) &&
-          (lock= (MDL_LOCK *)my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                            lock_data->key_length)) &&
-          !can_grant_lock(lock, lock_data))
+      if (is_shared(lock_req) &&
+          (lock= (MDL_LOCK*) my_hash_search(&mdl_locks, key->ptr(),
+                                            key->length())) &&
+          !can_grant_lock(context, lock, lock_req->type, FALSE))
         break;
     }
-    if (!lock_data)
+    if (!lock_req)
     {
       pthread_mutex_unlock(&LOCK_mdl);
       break;
@@ -1237,58 +1298,48 @@ bool mdl_wait_for_locks(MDL_CONTEXT *con
 
 /**
    Auxiliary function which allows to release particular lock
-   ownership of which is represented by lock request object.
+   ownership of which is represented by a lock ticket object.
 */
 
-static void release_lock(MDL_LOCK_DATA *lock_data)
+static void release_ticket(MDL_CONTEXT *context, MDL_LOCK_TICKET *ticket)
 {
-  MDL_LOCK *lock;
+  MDL_LOCK *lock= ticket->lock;
+  DBUG_ENTER("release_ticket");
+  DBUG_PRINT("enter", ("db=%s name=%s", lock->key.db_name(),
+                                        lock->key.table_name()));
+
+  safe_mutex_assert_owner(&LOCK_mdl);
+
+  context->tickets.remove(ticket);
 
-  DBUG_ENTER("release_lock");
-  DBUG_PRINT("enter", ("db=%s name=%s", lock_data->key + 4,
-                        lock_data->key + 4 + strlen(lock_data->key + 4) + 1));
+  switch (ticket->type)
+  {
+    case MDL_SHARED_UPGRADABLE:
+      global_lock.active_intention_exclusive--;
+      /* Fallthrough. */
+    case MDL_SHARED:
+    case MDL_SHARED_HIGH_PRIO:
+      lock->granted.remove(ticket);
+      break;
+    case MDL_EXCLUSIVE:
+      lock->type= MDL_LOCK::SHARED;
+      lock->granted.remove(ticket);
+      global_lock.active_intention_exclusive--;
+      break;
+    default:
+      DBUG_ASSERT(0);
+  }
 
-  DBUG_ASSERT(lock_data->state == MDL_PENDING ||
-              lock_data->state == MDL_ACQUIRED);
+  free_ticket_object(ticket);
 
-  lock= lock_data->lock;
-  if (lock->has_one_lock_data())
+  if (lock->is_empty())
   {
     my_hash_delete(&mdl_locks, (uchar *)lock);
     DBUG_PRINT("info", ("releasing cached_object cached_object=%p",
                         lock->cached_object));
     if (lock->cached_object)
       (*lock->cached_object_release_hook)(lock->cached_object);
-    release_lock_object(lock);
-    if (lock_data->state == MDL_ACQUIRED &&
-        (lock_data->type == MDL_EXCLUSIVE ||
-         lock_data->type == MDL_SHARED_UPGRADABLE))
-      global_lock.active_intention_exclusive--;
-  }
-  else
-  {
-    switch (lock_data->type)
-    {
-      case MDL_SHARED_UPGRADABLE:
-        global_lock.active_intention_exclusive--;
-        /* Fallthrough. */
-      case MDL_SHARED:
-      case MDL_SHARED_HIGH_PRIO:
-        lock->active_shared.remove(lock_data);
-        break;
-      case MDL_EXCLUSIVE:
-        if (lock_data->state == MDL_PENDING)
-          lock->waiting_exclusive.remove(lock_data);
-        else
-        {
-          lock->active_exclusive.remove(lock_data);
-          global_lock.active_intention_exclusive--;
-        }
-        break;
-      default:
-        DBUG_ASSERT(0);
-    }
-    lock->lock_data_count--;
+    free_lock_object(lock);
   }
 
   DBUG_VOID_RETURN;
@@ -1307,42 +1358,38 @@ static void release_lock(MDL_LOCK_DATA *
                   are associated.
 */
 
-void mdl_release_locks(MDL_CONTEXT *context)
+void mdl_ticket_release_all(MDL_CONTEXT *context)
 {
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-  DBUG_ENTER("mdl_release_locks");
+  MDL_LOCK_TICKET *ticket;
+  MDL_CONTEXT::Ticket_iterator it(context->tickets);
+  DBUG_ENTER("mdl_ticket_release_all");
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
+  /* Detach lock tickets from the requests for back off. */
+  {
+    MDL_LOCK_REQUEST *lock_req;
+    MDL_CONTEXT::Request_iterator it(context->requests);
+
+    while ((lock_req= it++))
+      lock_req->ticket= NULL;
+  }
+
+  if (context->tickets.is_empty())
+    DBUG_VOID_RETURN;
+
   pthread_mutex_lock(&LOCK_mdl);
-  while ((lock_data= it++))
+  while ((ticket= it++))
   {
-    DBUG_PRINT("info", ("found lock to release lock_data=%p", lock_data));
-    /*
-      Don't call release_lock() for a shared lock if has not been
-      granted. Lock state in this case is MDL_INITIALIZED.
-      We have pending and granted shared locks in the same context
-      when this function is called from the "back-off" path of
-      open_tables().
-    */
-    if (lock_data->state != MDL_INITIALIZED)
-    {
-      release_lock(lock_data);
-      lock_data->state= MDL_INITIALIZED;
-#ifndef DBUG_OFF
-      lock_data->lock= 0;
-#endif
-    }
-    /*
-      We will return lock request to its initial state only in
-      mdl_remove_all_locks() since we need to know type of lock
-      request in mdl_wait_for_locks().
-    */
+    DBUG_PRINT("info", ("found lock to release ticket=%p", ticket));
+    release_ticket(context, ticket);
   }
   /* Inefficient but will do for a while */
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
+
+  context->tickets.empty();
+
   DBUG_VOID_RETURN;
 }
 
@@ -1351,20 +1398,17 @@ void mdl_release_locks(MDL_CONTEXT *cont
    Release a lock.
 
    @param context   Context containing lock in question
-   @param lock_data Lock to be released
+   @param ticket    Lock to be released
 
 */
 
-void mdl_release_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data)
+void mdl_ticket_release(MDL_CONTEXT *context, MDL_LOCK_TICKET *ticket)
 {
+  DBUG_ASSERT(context == ticket->ctx);
   safe_mutex_assert_not_owner(&LOCK_open);
 
   pthread_mutex_lock(&LOCK_mdl);
-  release_lock(lock_data);
-#ifndef DBUG_OFF
-  lock_data->lock= 0;
-#endif
-  lock_data->state= MDL_INITIALIZED;
+  release_ticket(context, ticket);
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
 }
@@ -1375,34 +1419,43 @@ void mdl_release_lock(MDL_CONTEXT *conte
    object as this lock request, remove lock requests from the context.
 
    @param context   Context containing locks in question
-   @param lock_data One of the locks for the name/object for which all
+   @param ticket    One of the locks for the name/object for which all
                     locks should be released.
 */
 
-void mdl_release_and_remove_all_locks_for_name(MDL_CONTEXT *context,
-                                               MDL_LOCK_DATA *lock_data)
+void mdl_ticket_release_all_for_name(MDL_CONTEXT *context,
+                                     MDL_LOCK_TICKET *ticket)
 {
   MDL_LOCK *lock;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
 
   /*
-    We can use MDL_LOCK_DATA::lock here to identify other locks for the same
-    object since even altough MDL_LOCK object might be reused for different
+    We can use MDL_LOCK_TICKET::lock here to identify other locks for the same
+    object since even though MDL_LOCK object might be reused for different
     lock after the first lock for this object have been released we can't
     have references to this other MDL_LOCK object in this context.
   */
-  lock= lock_data->lock;
+  lock= ticket->lock;
+
+  /* Remove matching lock requests from the context. */
+  MDL_LOCK_REQUEST *lock_req;
+  MDL_CONTEXT::Request_iterator it_lock_req(context->requests);
 
-  while ((lock_data= it++))
+  while ((lock_req= it_lock_req++))
   {
-    DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
-    if (lock_data->lock == lock)
-    {
-      mdl_release_lock(context, lock_data);
-      mdl_remove_lock(context, lock_data);
-    }
+    DBUG_ASSERT(lock_req->ticket && lock_req->ticket->state == MDL_ACQUIRED);
+    if (lock_req->ticket->lock == lock)
+      mdl_request_remove(context, lock_req);
+  }
+
+  /* Remove matching lock tickets from the context. */
+  MDL_LOCK_TICKET *lock_tkt;
+  MDL_CONTEXT::Ticket_iterator it_lock_tkt(context->tickets);
+
+  while ((lock_tkt= it_lock_tkt++))
+  {
+    DBUG_ASSERT(lock_tkt->state == MDL_ACQUIRED);
+    if (lock_tkt->lock == lock)
+      mdl_ticket_release(context, lock_tkt);
   }
 }
 
@@ -1411,27 +1464,26 @@ void mdl_release_and_remove_all_locks_fo
    Downgrade an exclusive lock to shared metadata lock.
 
    @param context   A context to which exclusive lock belongs
-   @param lock_data Satisfied request for exclusive lock to be downgraded
+   @param ticket    Ticket for exclusive lock to be downgraded
 */
 
 void mdl_downgrade_exclusive_lock(MDL_CONTEXT *context,
-                                  MDL_LOCK_DATA *lock_data)
+                                  MDL_LOCK_TICKET *ticket)
 {
   MDL_LOCK *lock;
 
-  safe_mutex_assert_not_owner(&LOCK_open);
+  DBUG_ASSERT(context == ticket->ctx);
 
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
+  safe_mutex_assert_not_owner(&LOCK_open);
 
-  if (is_shared(lock_data))
+  if (is_shared(ticket))
     return;
 
-  lock= lock_data->lock;
+  lock= ticket->lock;
 
   pthread_mutex_lock(&LOCK_mdl);
-  lock->active_exclusive.remove(lock_data);
-  lock_data->type= MDL_SHARED_UPGRADABLE;
-  lock->active_shared.push_front(lock_data);
+  lock->type= MDL_LOCK::SHARED;
+  ticket->type= MDL_SHARED_UPGRADABLE;
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
 }
@@ -1469,30 +1521,29 @@ void mdl_release_global_shared_lock(MDL_
            FALSE otherwise.
 */
 
-bool mdl_is_exclusive_lock_owner(MDL_CONTEXT *context, int type,
+bool mdl_is_exclusive_lock_owner(MDL_CONTEXT *context, unsigned char type,
                                  const char *db, const char *name)
 {
-  char key[MAX_MDLKEY_LENGTH];
-  uint key_length;
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  MDL_KEY key;
+  MDL_LOCK_TICKET *ticket;
+  MDL_CONTEXT::Ticket_iterator it(context->tickets);
 
-  int4store(key, type);
-  key_length= (uint) (strmov(strmov(key+4, db)+1, name)-key)+1;
+  key.mdl_key_init(type, db, name);
 
-  while ((lock_data= it++) &&
-         (lock_data->key_length != key_length ||
-          memcmp(lock_data->key, key, key_length) ||
-          !(lock_data->type == MDL_EXCLUSIVE &&
-            lock_data->state == MDL_ACQUIRED)))
-    continue;
-  return lock_data;
+  while ((ticket= it++))
+  {
+    if (ticket->lock->type == MDL_LOCK::EXCLUSIVE &&
+        ticket->lock->key.is_equal(&key))
+      break;
+  }
+
+  return ticket;
 }
 
 
 /**
-   Auxiliary function which allows to check if we some kind of lock on
-   the object.
+   Auxiliary function which allows to check if we have some kind of lock on
+   a object.
 
    @param context Current context
    @param type    Id of object type
@@ -1503,24 +1554,22 @@ bool mdl_is_exclusive_lock_owner(MDL_CON
            FALSE otherwise.
 */
 
-bool mdl_is_lock_owner(MDL_CONTEXT *context, int type, const char *db,
-                        const char *name)
+bool mdl_is_lock_owner(MDL_CONTEXT *context, unsigned char type,
+                       const char *db, const char *name)
 {
-  char key[MAX_MDLKEY_LENGTH];
-  uint key_length;
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-
-  int4store(key, type);
-  key_length= (uint) (strmov(strmov(key+4, db)+1, name)-key)+1;
-
-  while ((lock_data= it++) &&
-         (lock_data->key_length != key_length ||
-          memcmp(lock_data->key, key, key_length) ||
-          lock_data->state != MDL_ACQUIRED))
-    continue;
+  MDL_KEY key;
+  MDL_LOCK_TICKET *ticket;
+  MDL_CONTEXT::Ticket_iterator it(context->tickets);
+
+  key.mdl_key_init(type, db, name);
 
-  return lock_data;
+  while ((ticket= it++))
+  {
+    if (ticket->lock->key.is_equal(&key))
+      break;
+  }
+
+  return ticket;
 }
 
 
@@ -1528,21 +1577,20 @@ bool mdl_is_lock_owner(MDL_CONTEXT *cont
    Check if we have any pending exclusive locks which conflict with
    existing shared lock.
 
-   @param lock_data Shared lock against which check should be performed.
+   @param ticket Shared lock against which check should be performed.
 
    @return TRUE if there are any conflicting locks, FALSE otherwise.
 */
 
-bool mdl_has_pending_conflicting_lock(MDL_LOCK_DATA *lock_data)
+bool mdl_has_pending_conflicting_lock(MDL_LOCK_TICKET *ticket)
 {
   bool result;
 
-  DBUG_ASSERT(is_shared(lock_data) && lock_data->state == MDL_ACQUIRED);
+  DBUG_ASSERT(is_shared(ticket));
   safe_mutex_assert_not_owner(&LOCK_open);
 
   pthread_mutex_lock(&LOCK_mdl);
-  result= !(lock_data->lock->waiting_exclusive.is_empty() &&
-            lock_data->lock->active_shared_waiting_upgrade.is_empty());
+  result= !ticket->lock->waiting.is_empty();
   pthread_mutex_unlock(&LOCK_mdl);
   return result;
 }
@@ -1551,8 +1599,8 @@ bool mdl_has_pending_conflicting_lock(MD
 /**
    Associate pointer to an opaque object with a lock.
 
-   @param lock_data     Lock request for the lock with which the
-                        object should be associated.
+   @param ticket        Lock ticket for the lock with which the object
+                        should be associated.
    @param cached_object Pointer to the object
    @param release_hook  Cleanup function to be called when MDL subsystem
                         decides to remove lock or associate another object.
@@ -1576,27 +1624,24 @@ bool mdl_has_pending_conflicting_lock(MD
   lock on this name is released.
 */
 
-void mdl_set_cached_object(MDL_LOCK_DATA *lock_data, void *cached_object,
+void mdl_set_cached_object(MDL_LOCK_TICKET *ticket, void *cached_object,
                            mdl_cached_object_release_hook release_hook)
 {
+  MDL_LOCK *lock= ticket->lock;
   DBUG_ENTER("mdl_set_cached_object");
-  DBUG_PRINT("enter", ("db=%s name=%s cached_object=%p", lock_data->key + 4,
-                       lock_data->key + 4 + strlen(lock_data->key + 4) + 1,
-                       cached_object));
-
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED ||
-              lock_data->state == MDL_PENDING_UPGRADE);
-
+  DBUG_PRINT("enter", ("db=%s name=%s cached_object=%p",
+                        lock->key.db_name(), lock->key.table_name(),
+                        cached_object));
   /*
     TODO: This assumption works now since we do mdl_get_cached_object()
           and mdl_set_cached_object() in the same critical section. Once
           this becomes false we will have to call release_hook here and
           use additional mutex protecting 'cached_object' member.
   */
-  DBUG_ASSERT(!lock_data->lock->cached_object);
+  DBUG_ASSERT(!lock->cached_object);
 
-  lock_data->lock->cached_object= cached_object;
-  lock_data->lock->cached_object_release_hook= release_hook;
+  lock->cached_object= cached_object;
+  lock->cached_object_release_hook= release_hook;
 
   DBUG_VOID_RETURN;
 }
@@ -1605,15 +1650,46 @@ void mdl_set_cached_object(MDL_LOCK_DATA
 /**
    Get a pointer to an opaque object that associated with the lock.
 
-   @param  lock_data Lock request for the lock with which the object is
-                     associated.
+   @param ticket  Lock ticket for the lock which the object is associated to.
 
    @return Pointer to an opaque object associated with the lock.
 */
 
-void* mdl_get_cached_object(MDL_LOCK_DATA *lock_data)
+void* mdl_get_cached_object(MDL_LOCK_TICKET *ticket)
+{
+  return ticket->lock->cached_object;
+}
+
+
+
+/**
+  Releases metadata locks that were acquired after a specific savepoint.
+
+  @note Used to release tickets acquired during a savepoint unit.
+  @note It's safe to iterate and unlock any locks after taken after this
+        savepoint because other statements that take other special locks
+        cause a implicit commit (ie LOCK TABLES).
+
+  @param thd  Current thread
+  @param sv   Savepoint
+*/
+
+void mdl_rollback_to_savepoint(MDL_CONTEXT *ctx,
+                               MDL_LOCK_TICKET *mdl_savepoint)
 {
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED ||
-              lock_data->state == MDL_PENDING_UPGRADE);
-  return lock_data->lock->cached_object;
+  MDL_LOCK_TICKET *mdl_lock_ticket;
+  MDL_CONTEXT::Ticket_iterator it(ctx->tickets);
+  DBUG_ENTER("mdl_rollback_to_savepoint");
+
+  while ((mdl_lock_ticket= it++))
+  {
+    /* Stop when lock was acquired before this savepoint. */
+    if (mdl_lock_ticket == mdl_savepoint)
+      break;
+    mdl_ticket_release(ctx, mdl_lock_ticket);
+  }
+
+  DBUG_VOID_RETURN;
 }
+
+

=== modified file 'sql/mdl.h'
--- a/sql/mdl.h	2009-12-02 16:31:57 +0000
+++ b/sql/mdl.h	2009-12-03 23:29:40 +0000
@@ -23,7 +23,8 @@
 
 class THD;
 
-struct MDL_LOCK_DATA;
+struct MDL_LOCK_REQUEST;
+struct MDL_LOCK_TICKET;
 struct MDL_LOCK;
 struct MDL_CONTEXT;
 
@@ -43,87 +44,147 @@ enum enum_mdl_type {MDL_SHARED=0, MDL_SH
                     MDL_SHARED_UPGRADABLE, MDL_EXCLUSIVE};
 
 
-/** States which metadata lock request can have. */
+/** States which a metadata lock ticket can have. */
 
-enum enum_mdl_state {MDL_INITIALIZED=0, MDL_PENDING,
-                     MDL_ACQUIRED, MDL_PENDING_UPGRADE};
+enum enum_mdl_state { MDL_PENDING, MDL_ACQUIRED };
+
+
+/** Maximal length of key for metadata locking subsystem. */
+#define MAX_MDLKEY_LENGTH (1 + NAME_LEN + 1 + NAME_LEN + 1)
 
 
 /**
-   A pending lock request or a granted metadata lock. A lock is requested
-   or granted based on a fully qualified name and type. E.g. for a table
-   the key consists of <0 (=table)>+<database name>+<table name>.
-   Later in this document this triple will be referred to simply as
-   "key" or "name".
+   Metadata lock object key.
+
+   A lock is requested or granted based on a fully qualified name and type.
+   E.g. They key for a table consists of <0 (=table)>+<database>+<table name>.
+   Elsewhere in the comments this triple will be referred to simply as "key"
+   or "name".
 */
 
-struct MDL_LOCK_DATA
+class MDL_KEY
 {
-  char          *key;
-  uint          key_length;
-  enum          enum_mdl_type type;
-  enum          enum_mdl_state state;
+public:
+  const uchar *ptr() const { return (uchar*) m_ptr; }
+  uint length() const { return m_length; }
+
+  const char *db_name() const { return m_ptr + 1; }
+  uint db_name_length() const { return m_db_name_length; }
+
+  const char *table_name() const { return m_ptr + m_db_name_length + 2; }
+  uint table_name_length() const { return m_length - m_db_name_length - 3; }
 
-private:
-  /**
-     Pointers for participating in the list of lock requests for this context.
-  */
-  MDL_LOCK_DATA *next_context;
-  MDL_LOCK_DATA **prev_context;
   /**
-     Pointers for participating in the list of satisfied/pending requests
-     for the lock.
-  */
-  MDL_LOCK_DATA *next_lock;
-  MDL_LOCK_DATA **prev_lock;
+    Construct a metadata lock key from a triplet (type, database and name).
 
-  friend struct MDL_LOCK_DATA_context;
-  friend struct MDL_LOCK_DATA_lock;
+    @remark The key for a table is <0 (=table)>+<database name>+<table name>
 
-public:
-  /*
-    Pointer to the lock object for this lock request. Valid only if this lock
-    request is satisified or is present in the list of pending lock requests
-    for particular lock.
+    @param  type   Id of type of object to be locked
+    @param  db     Name of database to which the object belongs
+    @param  name   Name of of the object
+    @param  key    Where to store the the MDL key.
   */
-  MDL_LOCK      *lock;
-  MDL_CONTEXT   *ctx;
+  void mdl_key_init(char type, const char *db, const char *name)
+  {
+    m_ptr[0]= type;
+    m_db_name_length= (uint) (strmov(m_ptr + 1, db) - m_ptr - 1);
+    m_length= (uint) (strmov(m_ptr + m_db_name_length + 2, name) - m_ptr + 1);
+  }
+  void mdl_key_init(const MDL_KEY *rhs)
+  {
+    memcpy(m_ptr, rhs->m_ptr, rhs->m_length);
+    m_length= rhs->m_length;
+    m_db_name_length= rhs->m_db_name_length;
+  }
+  bool is_equal(const MDL_KEY *rhs) const
+  {
+    return (m_length == rhs->m_length &&
+            memcmp(m_ptr, rhs->m_ptr, m_length) == 0);
+  }
+private:
+  char m_ptr[MAX_MDLKEY_LENGTH];
+  uint m_length;
+  uint m_db_name_length;
 };
 
 
+
 /**
-   Helper class which specifies which members of MDL_LOCK_DATA are used for
-   participation in the list lock requests belonging to one context.
+   Hook class which via its methods specifies which members
+   of T should be used for participating in MDL lists.
 */
 
-struct MDL_LOCK_DATA_context
+template <typename T, T* T::*next, T** T::*prev>
+struct I_P_List_adapter
 {
-  static inline MDL_LOCK_DATA **next_ptr(MDL_LOCK_DATA *l)
-  {
-    return &l->next_context;
-  }
-  static inline MDL_LOCK_DATA ***prev_ptr(MDL_LOCK_DATA *l)
-  {
-    return &l->prev_context;
-  }
+  static inline T **next_ptr(T *el) { return &(el->*next); }
+
+  static inline T ***prev_ptr(T *el) { return &(el->*prev); }
 };
 
 
 /**
-   Helper class which specifies which members of MDL_LOCK_DATA are used for
-   participation in the list of satisfied/pending requests for the lock.
+   A pending metadata lock request.
+   A pending lock request or a granted metadata lock share the same abstract
+   base but are presented individually because they have different allocation
+   sites and hence different lifetimes. The allocation of lock requests is
+   controlled from outside of the MDL subsystem, while allocation of granted
+   locks (tickets) is controlled within the MDL subsystem.
 */
 
-struct MDL_LOCK_DATA_lock
+struct MDL_LOCK_REQUEST
 {
-  static inline MDL_LOCK_DATA **next_ptr(MDL_LOCK_DATA *l)
-  {
-    return &l->next_lock;
-  }
-  static inline MDL_LOCK_DATA ***prev_ptr(MDL_LOCK_DATA *l)
-  {
-    return &l->prev_lock;
-  }
+  /** Type of metadata lock. */
+  enum          enum_mdl_type type;
+
+  /**
+     Pointers for participating in the list of lock requests for this context.
+  */
+  MDL_LOCK_REQUEST *next_in_context;
+  MDL_LOCK_REQUEST **prev_in_context;
+  /** A lock is requested based on a fully qualified name and type. */
+  MDL_KEY key;
+
+  /**
+    Pointer to the lock ticket object for this lock request.
+    Valid only if this lock request is satisfied.
+  */
+  MDL_LOCK_TICKET *ticket;
+};
+
+
+/**
+   A granted metadata lock.
+
+   @warning MDL_LOCK_TICKET members are private to the MDL subsystem.
+
+   @note Multiple shared locks on a same object are represented by a
+         single ticket. The same does not apply for other lock types.
+*/
+
+struct MDL_LOCK_TICKET
+{
+  /** Type of metadata lock. */
+  enum          enum_mdl_type type;
+  /** State of the metadata lock ticket. */
+  enum enum_mdl_state state;
+
+  /**
+     Pointers for participating in the list of lock requests for this context.
+  */
+  MDL_LOCK_TICKET *next_in_context;
+  MDL_LOCK_TICKET **prev_in_context;
+  /**
+     Pointers for participating in the list of satisfied/pending requests
+     for the lock.
+  */
+  MDL_LOCK_TICKET *next_in_lock;
+  MDL_LOCK_TICKET **prev_in_lock;
+  /** Context of the owner of the metadata lock ticket. */
+  MDL_CONTEXT *ctx;
+
+  /** Pointer to the lock object for this lock ticket. */
+  MDL_LOCK *lock;
 };
 
 
@@ -134,7 +195,24 @@ struct MDL_LOCK_DATA_lock
 
 struct MDL_CONTEXT
 {
-  I_P_List <MDL_LOCK_DATA, MDL_LOCK_DATA_context> locks;
+  typedef I_P_List<MDL_LOCK_REQUEST,
+                   I_P_List_adapter<MDL_LOCK_REQUEST,
+                                    &MDL_LOCK_REQUEST::next_in_context,
+                                    &MDL_LOCK_REQUEST::prev_in_context> >
+          Request_list;
+
+  typedef Request_list::Iterator Request_iterator;
+
+  typedef I_P_List<MDL_LOCK_TICKET,
+                   I_P_List_adapter<MDL_LOCK_TICKET,
+                                    &MDL_LOCK_TICKET::next_in_context,
+                                    &MDL_LOCK_TICKET::prev_in_context> >
+          Ticket_list;
+
+  typedef Ticket_list::Iterator Ticket_iterator;
+
+  Request_list requests;
+  Ticket_list tickets;
   bool has_global_shared_lock;
   THD      *thd;
 };
@@ -149,96 +227,80 @@ void mdl_context_backup_and_reset(MDL_CO
 void mdl_context_restore(MDL_CONTEXT *ctx, MDL_CONTEXT *backup);
 void mdl_context_merge(MDL_CONTEXT *target, MDL_CONTEXT *source);
 
-/** Maximal length of key for metadata locking subsystem. */
-#define MAX_MDLKEY_LENGTH (4 + NAME_LEN + 1 + NAME_LEN + 1)
-
-void mdl_init_lock(MDL_LOCK_DATA *lock_data, char *key, int type,
-                   const char *db, const char *name);
-MDL_LOCK_DATA *mdl_alloc_lock(int type, const char *db, const char *name,
-                              MEM_ROOT *root);
-void mdl_add_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data);
-void mdl_remove_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data);
-void mdl_remove_all_locks(MDL_CONTEXT *context);
+void mdl_request_init(MDL_LOCK_REQUEST *lock_req, unsigned char type,
+                      const char *db, const char *name);
+MDL_LOCK_REQUEST *mdl_request_alloc(unsigned char type, const char *db,
+                                    const char *name, MEM_ROOT *root);
+void mdl_request_add(MDL_CONTEXT *context, MDL_LOCK_REQUEST *lock_req);
+void mdl_request_remove(MDL_CONTEXT *context, MDL_LOCK_REQUEST *lock_req);
+void mdl_request_remove_all(MDL_CONTEXT *context);
 
 /**
    Set type of lock request. Can be only applied to pending locks.
 */
 
-inline void mdl_set_lock_type(MDL_LOCK_DATA *lock_data, enum_mdl_type lock_type)
+inline void mdl_request_set_type(MDL_LOCK_REQUEST *lock_req, enum_mdl_type lock_type)
 {
-  DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-  lock_data->type= lock_type;
+  DBUG_ASSERT(lock_req->ticket == NULL);
+  lock_req->type= lock_type;
 }
 
-bool mdl_acquire_shared_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data,
+bool mdl_acquire_shared_lock(MDL_CONTEXT *context, MDL_LOCK_REQUEST *lock_req,
                              bool *retry);
 bool mdl_acquire_exclusive_locks(MDL_CONTEXT *context);
 bool mdl_upgrade_shared_lock_to_exclusive(MDL_CONTEXT *context,
-                                          MDL_LOCK_DATA *lock_data);
+                                          MDL_LOCK_TICKET *ticket);
 bool mdl_try_acquire_exclusive_lock(MDL_CONTEXT *context,
-                                    MDL_LOCK_DATA *lock_data,
+                                    MDL_LOCK_REQUEST *lock_req,
                                     bool *conflict);
 bool mdl_acquire_global_shared_lock(MDL_CONTEXT *context);
 
 bool mdl_wait_for_locks(MDL_CONTEXT *context);
 
-void mdl_release_locks(MDL_CONTEXT *context);
-void mdl_release_and_remove_all_locks_for_name(MDL_CONTEXT *context,
-                                               MDL_LOCK_DATA *lock_data);
-void mdl_release_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data);
+void mdl_ticket_release_all(MDL_CONTEXT *context);
+void mdl_ticket_release_all_for_name(MDL_CONTEXT *context,
+                                     MDL_LOCK_TICKET *ticket);
+void mdl_ticket_release(MDL_CONTEXT *context, MDL_LOCK_TICKET *ticket);
 void mdl_downgrade_exclusive_lock(MDL_CONTEXT *context,
-                                  MDL_LOCK_DATA *lock_data);
+                                  MDL_LOCK_TICKET *ticket);
 void mdl_release_global_shared_lock(MDL_CONTEXT *context);
 
-bool mdl_is_exclusive_lock_owner(MDL_CONTEXT *context, int type, const char *db,
-                                 const char *name);
-bool mdl_is_lock_owner(MDL_CONTEXT *context, int type, const char *db,
-                       const char *name);
+bool mdl_is_exclusive_lock_owner(MDL_CONTEXT *context, unsigned char type,
+                                 const char *db, const char *name);
+bool mdl_is_lock_owner(MDL_CONTEXT *context, unsigned char type,
+                       const char *db, const char *name);
 
-bool mdl_has_pending_conflicting_lock(MDL_LOCK_DATA *lock_data);
+bool mdl_has_pending_conflicting_lock(MDL_LOCK_TICKET *ticket);
 
 inline bool mdl_has_locks(MDL_CONTEXT *context)
 {
-  return !context->locks.is_empty();
+  return !context->tickets.is_empty();
 }
 
-
-/**
-   Get iterator for walking through all lock requests in the context.
-*/
-
-inline I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context>
-mdl_get_locks(MDL_CONTEXT *ctx)
+inline MDL_LOCK_TICKET *mdl_savepoint(MDL_CONTEXT *ctx)
 {
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> result(ctx->locks);
-  return result;
+  return ctx->tickets.head();
 }
 
-/**
-   Give metadata lock request object for the table get table definition
-   cache key corresponding to it.
-
-   @param lock_data  [in]  Lock request object for the table.
-   @param key        [out] LEX_STRING object where table definition cache key
-                           should be put.
+void mdl_rollback_to_savepoint(MDL_CONTEXT *ctx,
+                               MDL_LOCK_TICKET *mdl_savepoint);
 
-   @note This key will have the same life-time as this lock request object.
-
-   @note This is yet another place where border between MDL subsystem and the
-         rest of the server is broken. OTOH it allows to save some CPU cycles
-         and memory by avoiding generating these TDC keys from table list.
+/**
+   Get iterator for walking through all lock requests in the context.
 */
 
-inline void mdl_get_tdc_key(MDL_LOCK_DATA *lock_data, LEX_STRING *key)
+inline MDL_CONTEXT::Request_iterator
+mdl_get_requests(MDL_CONTEXT *ctx)
 {
-  key->str= lock_data->key + 4;
-  key->length= lock_data->key_length - 4;
+  MDL_CONTEXT::Request_iterator result(ctx->requests);
+  return result;
 }
 
 
+void mdl_get_tdc_key(MDL_LOCK_TICKET *ticket, LEX_STRING *key);
 typedef void (* mdl_cached_object_release_hook)(void *);
-void* mdl_get_cached_object(MDL_LOCK_DATA *lock_data);
-void mdl_set_cached_object(MDL_LOCK_DATA *lock_data, void *cached_object,
+void *mdl_get_cached_object(MDL_LOCK_TICKET *ticket);
+void mdl_set_cached_object(MDL_LOCK_TICKET *ticket, void *cached_object,
                            mdl_cached_object_release_hook release_hook);
 
 

=== modified file 'sql/sp_head.cc'
--- a/sql/sp_head.cc	2009-12-03 11:37:42 +0000
+++ b/sql/sp_head.cc	2009-12-03 23:29:40 +0000
@@ -3981,10 +3981,10 @@ sp_head::add_used_tables_to_table_list(T
       table->prelocking_placeholder= 1;
       table->belong_to_view= belong_to_view;
       table->trg_event_map= stab->trg_event_map;
-      table->mdl_lock_data= mdl_alloc_lock(0, table->db, table->table_name,
-                                           thd->locked_tables_root ?
-                                           thd->locked_tables_root :
-                                           thd->mem_root);
+      table->mdl_lock_request= mdl_request_alloc(0, table->db, table->table_name,
+                                                 thd->locked_tables_root ?
+                                                 thd->locked_tables_root :
+                                                 thd->mem_root);
 
       /* Everyting else should be zeroed */
 
@@ -4026,9 +4026,10 @@ sp_add_to_query_tables(THD *thd, LEX *le
   table->lock_type= locktype;
   table->select_lex= lex->current_select;
   table->cacheable_table= 1;
-  table->mdl_lock_data= mdl_alloc_lock(0, table->db, table->table_name,
-                                       thd->locked_tables_root ?
-                                       thd->locked_tables_root : thd->mem_root);
+  table->mdl_lock_request= mdl_request_alloc(0, table->db, table->table_name,
+                                             thd->locked_tables_root ?
+                                             thd->locked_tables_root :
+                                             thd->mem_root);
 
   lex->add_to_query_tables(table);
   return table;

=== modified file 'sql/sql_acl.cc'
--- a/sql/sql_acl.cc	2009-12-03 11:37:42 +0000
+++ b/sql/sql_acl.cc	2009-12-03 23:29:40 +0000
@@ -693,7 +693,7 @@ my_bool acl_reload(THD *thd)
   tables[0].lock_type=tables[1].lock_type=tables[2].lock_type=TL_READ;
   tables[0].skip_temporary= tables[1].skip_temporary=
     tables[2].skip_temporary= TRUE;
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, tables))
   {
@@ -1598,7 +1598,7 @@ bool change_password(THD *thd, const cha
   bzero((char*) &tables, sizeof(tables));
   tables.alias= tables.table_name= (char*) "user";
   tables.db= (char*) "mysql";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
 #ifdef HAVE_REPLICATION
   /*
@@ -3110,7 +3110,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
 			    ? tables+2 : 0);
   tables[0].lock_type=tables[1].lock_type=tables[2].lock_type=TL_WRITE;
   tables[0].db=tables[1].db=tables[2].db=(char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     This statement will be replicated as a statement, even when using
@@ -3328,7 +3328,7 @@ bool mysql_routine_grant(THD *thd, TABLE
   tables[0].next_local= tables[0].next_global= tables+1;
   tables[0].lock_type=tables[1].lock_type=TL_WRITE;
   tables[0].db=tables[1].db=(char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     This statement will be replicated as a statement, even when using
@@ -3467,7 +3467,7 @@ bool mysql_grant(THD *thd, const char *d
   tables[0].next_local= tables[0].next_global= tables+1;
   tables[0].lock_type=tables[1].lock_type=TL_WRITE;
   tables[0].db=tables[1].db=(char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     This statement will be replicated as a statement, even when using
@@ -3801,7 +3801,7 @@ static my_bool grant_reload_procs_priv(T
   table.db= (char *) "mysql";
   table.lock_type= TL_READ;
   table.skip_temporary= 1;
-  alloc_mdl_locks(&table, thd->mem_root);
+  alloc_mdl_requests(&table, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &table))
   {
@@ -3868,7 +3868,7 @@ my_bool grant_reload(THD *thd)
   tables[0].next_local= tables[0].next_global= tables+1;
   tables[0].lock_type= tables[1].lock_type= TL_READ;
   tables[0].skip_temporary= tables[1].skip_temporary= TRUE;
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     To avoid deadlocks we should obtain table locks before
@@ -5215,7 +5215,7 @@ int open_grant_tables(THD *thd, TABLE_LI
     (tables+4)->lock_type= TL_WRITE;
   tables->db= (tables+1)->db= (tables+2)->db= 
     (tables+3)->db= (tables+4)->db= (char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
 #ifdef HAVE_REPLICATION
   /*

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2009-12-03 18:37:38 +0000
+++ b/sql/sql_base.cc	2009-12-03 23:29:40 +0000
@@ -1055,7 +1055,7 @@ err_with_reopen:
       than picking only those tables that were flushed.
     */
     for (TABLE *tab= thd->open_tables; tab; tab= tab->next)
-      mdl_downgrade_exclusive_lock(&thd->mdl_context, tab->mdl_lock_data);
+      mdl_downgrade_exclusive_lock(&thd->mdl_context, tab->mdl_lock_ticket);
   }
   DBUG_RETURN(result);
 }
@@ -1478,10 +1478,10 @@ void close_thread_tables(THD *thd,
   if (thd->open_tables)
     close_open_tables(thd);
 
-  mdl_release_locks(&thd->mdl_context);
+  mdl_ticket_release_all(&thd->mdl_context);
   if (!skip_mdl)
   {
-    mdl_remove_all_locks(&thd->mdl_context);
+    mdl_request_remove_all(&thd->mdl_context);
   }
   DBUG_VOID_RETURN;
 }
@@ -1500,7 +1500,7 @@ bool close_thread_table(THD *thd, TABLE 
 
   *table_ptr=table->next;
 
-  table->mdl_lock_data= 0;
+  table->mdl_lock_ticket= NULL;
   if (table->needs_reopen() ||
       thd->version != refresh_version || !table->db_stat)
   {
@@ -2096,7 +2096,7 @@ bool wait_while_table_is_used(THD *thd, 
   mysql_lock_abort(thd, table, TRUE);	/* end threads waiting on lock */
 
   if (mdl_upgrade_shared_lock_to_exclusive(&thd->mdl_context,
-                                           table->mdl_lock_data))
+                                           table->mdl_lock_ticket))
   {
     mysql_lock_downgrade_write(thd, table, old_lock_type);
     DBUG_RETURN(TRUE);
@@ -2279,11 +2279,11 @@ void table_share_release_hook(void *shar
 
 static bool
 open_table_get_mdl_lock(THD *thd, TABLE_LIST *table_list,
-                        MDL_LOCK_DATA *mdl_lock_data,
+                        MDL_LOCK_REQUEST *mdl_lock_request,
                         uint flags,
                         enum_open_table_action *action)
 {
-  mdl_add_lock(&thd->mdl_context, mdl_lock_data);
+  mdl_request_add(&thd->mdl_context, mdl_lock_request);
 
   if (table_list->open_type)
   {
@@ -2296,10 +2296,10 @@ open_table_get_mdl_lock(THD *thd, TABLE_
       shared locks. This invariant is preserved here and is also
       enforced by asserts in metadata locking subsystem.
     */
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
+    mdl_request_set_type(mdl_lock_request, MDL_EXCLUSIVE);
     if (mdl_acquire_exclusive_locks(&thd->mdl_context))
     {
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
       return 1;
     }
   }
@@ -2316,16 +2316,16 @@ open_table_get_mdl_lock(THD *thd, TABLE_
 
     if (flags & MYSQL_OPEN_TAKE_UPGRADABLE_MDL &&
         table_list->lock_type >= TL_WRITE_ALLOW_WRITE)
-      mdl_set_lock_type(mdl_lock_data, MDL_SHARED_UPGRADABLE);
+      mdl_request_set_type(mdl_lock_request, MDL_SHARED_UPGRADABLE);
     if (flags & MYSQL_LOCK_IGNORE_FLUSH)
-      mdl_set_lock_type(mdl_lock_data, MDL_SHARED_HIGH_PRIO);
+      mdl_request_set_type(mdl_lock_request, MDL_SHARED_HIGH_PRIO);
 
-    if (mdl_acquire_shared_lock(&thd->mdl_context, mdl_lock_data, &retry))
+    if (mdl_acquire_shared_lock(&thd->mdl_context, mdl_lock_request, &retry))
     {
       if (retry)
         *action= OT_BACK_OFF_AND_RETRY;
       else
-        mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+        mdl_request_remove(&thd->mdl_context, mdl_lock_request);
       return 1;
     }
   }
@@ -2380,7 +2380,8 @@ bool open_table(THD *thd, TABLE_LIST *ta
   char	key[MAX_DBKEY_LENGTH];
   uint	key_length;
   char	*alias= table_list->alias;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_LOCK_REQUEST *mdl_lock_request;
+  MDL_LOCK_TICKET *mdl_lock_ticket;
   int error;
   TABLE_SHARE *share;
   DBUG_ENTER("open_table");
@@ -2559,14 +2560,21 @@ bool open_table(THD *thd, TABLE_LIST *ta
     This is the normal use case.
   */
 
-  mdl_lock_data= table_list->mdl_lock_data;
+  mdl_lock_request= table_list->mdl_lock_request;
   if (! (flags & MYSQL_OPEN_HAS_MDL_LOCK))
   {
-    if (open_table_get_mdl_lock(thd, table_list, mdl_lock_data, flags,
+    if (open_table_get_mdl_lock(thd, table_list, mdl_lock_request, flags,
                                 action))
       DBUG_RETURN(TRUE);
   }
 
+  /*
+    Grab reference to the granted MDL lock ticket. Must be done after
+    open_table_get_mdl_lock as the lock on the table might have been
+    acquired previously (MYSQL_OPEN_HAS_MDL_LOCK).
+  */
+  mdl_lock_ticket= mdl_lock_request->ticket;
+
   pthread_mutex_lock(&LOCK_open);
 
   /*
@@ -2608,7 +2616,7 @@ bool open_table(THD *thd, TABLE_LIST *ta
     DBUG_RETURN(FALSE);
   }
 
-  if (!(share= (TABLE_SHARE *)mdl_get_cached_object(mdl_lock_data)))
+  if (!(share= (TABLE_SHARE *)mdl_get_cached_object(mdl_lock_ticket)))
   {
     if (!(share= get_table_share_with_create(thd, table_list, key,
                                              key_length, OPEN_VIEW,
@@ -2679,7 +2687,7 @@ bool open_table(THD *thd, TABLE_LIST *ta
       so we need to increase reference counter;
     */
     reference_table_share(share);
-    mdl_set_cached_object(mdl_lock_data, share, table_share_release_hook);
+    mdl_set_cached_object(mdl_lock_ticket, share, table_share_release_hook);
   }
   else
   {
@@ -2788,9 +2796,9 @@ bool open_table(THD *thd, TABLE_LIST *ta
     lock on this table to shared metadata lock.
   */
   if (table_list->open_type == TABLE_LIST::OPEN_OR_CREATE)
-    mdl_downgrade_exclusive_lock(&thd->mdl_context, table_list->mdl_lock_data);
+    mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_ticket);
 
-  table->mdl_lock_data= mdl_lock_data;
+  table->mdl_lock_ticket= mdl_lock_ticket;
 
   table->next=thd->open_tables;		/* Link into simple list */
   thd->open_tables=table;
@@ -2842,8 +2850,8 @@ err_unlock2:
   pthread_mutex_unlock(&LOCK_open);
   if (! (flags & MYSQL_OPEN_HAS_MDL_LOCK))
   {
-    mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-    mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+    mdl_ticket_release(&thd->mdl_context, mdl_lock_ticket);
+    mdl_request_remove(&thd->mdl_context, mdl_lock_request);
   }
   DBUG_RETURN(TRUE);
 }
@@ -2961,7 +2969,7 @@ Locked_tables_list::init_locked_tables(T
     dst_table_list->init_one_table(db, db_len, table_name, table_name_len,
                                    alias,
                                    src_table_list->table->reginfo.lock_type);
-    dst_table_list->mdl_lock_data= src_table_list->mdl_lock_data;
+    dst_table_list->mdl_lock_request= src_table_list->mdl_lock_request;
     dst_table_list->table= table;
     memcpy(db, src_table_list->db, db_len + 1);
     memcpy(table_name, src_table_list->table_name, table_name_len + 1);
@@ -3012,6 +3020,8 @@ Locked_tables_list::unlock_locked_tables
     thd->locked_tables_mode= LTM_NONE;
 
     close_thread_tables(thd);
+
+    mdl_ticket_release_all(&thd->mdl_context);
   }
   /*
     After closing tables we can free memory used for storing lock
@@ -3496,20 +3506,21 @@ recover_from_failed_open_table_attempt(T
                                        enum_open_table_action action)
 {
   bool result= FALSE;
+  MDL_LOCK_REQUEST *mdl_lock_request= table->mdl_lock_request;
 
   switch (action)
   {
     case OT_BACK_OFF_AND_RETRY:
       result= (mdl_wait_for_locks(&thd->mdl_context) ||
                tdc_wait_for_old_versions(thd, &thd->mdl_context));
-      mdl_remove_all_locks(&thd->mdl_context);
+      mdl_request_remove_all(&thd->mdl_context);
       break;
     case OT_DISCOVER:
-      mdl_set_lock_type(table->mdl_lock_data, MDL_EXCLUSIVE);
-      mdl_add_lock(&thd->mdl_context, table->mdl_lock_data);
+      mdl_request_set_type(mdl_lock_request, MDL_EXCLUSIVE);
+      mdl_request_add(&thd->mdl_context, mdl_lock_request);
       if (mdl_acquire_exclusive_locks(&thd->mdl_context))
       {
-        mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+        mdl_request_remove(&thd->mdl_context, mdl_lock_request);
         return TRUE;
       }
       pthread_mutex_lock(&LOCK_open);
@@ -3519,15 +3530,15 @@ recover_from_failed_open_table_attempt(T
 
       thd->warning_info->clear_warning_info(thd->query_id);
       thd->clear_error();                 // Clear error message
-      mdl_release_lock(&thd->mdl_context, table->mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+      mdl_ticket_release(&thd->mdl_context, mdl_lock_request->ticket);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
       break;
     case OT_REPAIR:
-      mdl_set_lock_type(table->mdl_lock_data, MDL_EXCLUSIVE);
-      mdl_add_lock(&thd->mdl_context, table->mdl_lock_data);
+      mdl_request_set_type(mdl_lock_request, MDL_EXCLUSIVE);
+      mdl_request_add(&thd->mdl_context, mdl_lock_request);
       if (mdl_acquire_exclusive_locks(&thd->mdl_context))
       {
-        mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+        mdl_request_remove(&thd->mdl_context, mdl_lock_request);
         return TRUE;
       }
       pthread_mutex_lock(&LOCK_open);
@@ -3535,8 +3546,8 @@ recover_from_failed_open_table_attempt(T
       pthread_mutex_unlock(&LOCK_open);
 
       result= auto_repair_table(thd, table);
-      mdl_release_lock(&thd->mdl_context, table->mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+      mdl_ticket_release(&thd->mdl_context, mdl_lock_request->ticket);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
       break;
     default:
       DBUG_ASSERT(0);
@@ -7730,10 +7741,9 @@ void tdc_remove_table(THD *thd, enum_tdc
 
 static bool tdc_wait_for_old_versions(THD *thd, MDL_CONTEXT *context)
 {
-  MDL_LOCK_DATA *lock_data;
   TABLE_SHARE *share;
   const char *old_msg;
-  LEX_STRING key;
+  MDL_LOCK_REQUEST *lock_req;
 
   while (!thd->killed)
   {
@@ -7746,18 +7756,16 @@ static bool tdc_wait_for_old_versions(TH
     mysql_ha_flush(thd);
     pthread_mutex_lock(&LOCK_open);
 
-    I_P_List_iterator<MDL_LOCK_DATA,
-                      MDL_LOCK_DATA_context> it= mdl_get_locks(context);
-    while ((lock_data= it++))
-    {
-      mdl_get_tdc_key(lock_data, &key);
-      if ((share= (TABLE_SHARE*) my_hash_search(&table_def_cache, (uchar*) key.str,
-                                                key.length)) &&
+    MDL_CONTEXT::Request_iterator it= mdl_get_requests(context);
+    while ((lock_req= it++))
+    {
+      if ((share= get_cached_table_share(lock_req->key.db_name(),
+                                         lock_req->key.table_name())) &&
           share->version != refresh_version &&
           !share->used_tables.is_empty())
         break;
     }
-    if (!lock_data)
+    if (!lock_req)
     {
       pthread_mutex_unlock(&LOCK_open);
       break;
@@ -7971,7 +7979,7 @@ open_system_tables_for_read(THD *thd, TA
 
   DBUG_ENTER("open_system_tables_for_read");
 
-  alloc_mdl_locks(table_list, thd->mem_root);
+  alloc_mdl_requests(table_list, thd->mem_root);
 
   /*
     Besides using new Open_tables_state for opening system tables,
@@ -8046,7 +8054,7 @@ open_system_table_for_update(THD *thd, T
 {
   DBUG_ENTER("open_system_table_for_update");
 
-  alloc_mdl_locks(one_table, thd->mem_root);
+  alloc_mdl_requests(one_table, thd->mem_root);
 
   TABLE *table= open_ltable(thd, one_table, one_table->lock_type, 0);
   if (table)
@@ -8084,7 +8092,7 @@ open_performance_schema_table(THD *thd, 
 
   thd->reset_n_backup_open_tables_state(backup);
 
-  alloc_mdl_locks(one_table, thd->mem_root);
+  alloc_mdl_requests(one_table, thd->mem_root);
   if ((table= open_ltable(thd, one_table, one_table->lock_type, flags)))
   {
     DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_PERFORMANCE);
@@ -8161,8 +8169,8 @@ void close_performance_schema_table(THD 
 
   pthread_mutex_unlock(&LOCK_open);
 
-  mdl_release_locks(&thd->mdl_context);
-  mdl_remove_all_locks(&thd->mdl_context);
+  mdl_ticket_release_all(&thd->mdl_context);
+  mdl_request_remove_all(&thd->mdl_context);
 
   thd->restore_backup_open_tables_state(backup);
 }

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2009-12-03 22:46:14 +0000
+++ b/sql/sql_class.h	2009-12-03 23:29:40 +0000
@@ -779,6 +779,8 @@ struct st_savepoint {
   char                *name;
   uint                 length;
   Ha_trx_info         *ha_list;
+  /** Last acquired lock before this savepoint was set. */
+  MDL_LOCK_TICKET     *mdl_savepoint;
 };
 
 enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};

=== modified file 'sql/sql_delete.cc'
--- a/sql/sql_delete.cc	2009-12-03 18:37:38 +0000
+++ b/sql/sql_delete.cc	2009-12-03 23:29:40 +0000
@@ -1100,7 +1100,7 @@ bool mysql_truncate(THD *thd, TABLE_LIST
   TABLE *table;
   bool error;
   uint path_length;
-  MDL_LOCK_DATA *mdl_lock_data= 0;
+  MDL_LOCK_REQUEST *mdl_lock_request= NULL;
   DBUG_ENTER("mysql_truncate");
 
   bzero((char*) &create_info,sizeof(create_info));
@@ -1175,13 +1175,13 @@ bool mysql_truncate(THD *thd, TABLE_LIST
              tries to get table enging and therefore accesses table in some way
              without holding any kind of meta-data lock.
     */
-    mdl_lock_data= mdl_alloc_lock(0, table_list->db, table_list->table_name,
-                                  thd->mem_root);
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    mdl_add_lock(&thd->mdl_context, mdl_lock_data);
+    mdl_lock_request= mdl_request_alloc(0, table_list->db,
+                                        table_list->table_name, thd->mem_root);
+    mdl_request_set_type(mdl_lock_request, MDL_EXCLUSIVE);
+    mdl_request_add(&thd->mdl_context, mdl_lock_request);
     if (mdl_acquire_exclusive_locks(&thd->mdl_context))
     {
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
       DBUG_RETURN(TRUE);
     }
     pthread_mutex_lock(&LOCK_open);
@@ -1212,18 +1212,18 @@ end:
       write_bin_log(thd, TRUE, thd->query(), thd->query_length());
       my_ok(thd);		// This should return record count
     }
-    if (mdl_lock_data)
+    if (mdl_lock_request)
     {
-      mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_ticket_release(&thd->mdl_context, mdl_lock_request->ticket);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
     }
   }
   else if (error)
   {
-    if (mdl_lock_data)
+    if (mdl_lock_request)
     {
-      mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_ticket_release(&thd->mdl_context, mdl_lock_request->ticket);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
     }
   }
   DBUG_RETURN(error);

=== modified file 'sql/sql_handler.cc'
--- a/sql/sql_handler.cc	2009-12-02 16:31:57 +0000
+++ b/sql/sql_handler.cc	2009-12-03 23:29:40 +0000
@@ -125,7 +125,7 @@ static void mysql_ha_hash_free(TABLE_LIS
 static void mysql_ha_close_table(THD *thd, TABLE_LIST *tables)
 {
   TABLE **table_ptr;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_LOCK_TICKET *mdl_lock_ticket;
 
   /*
     Though we could take the table pointer from hash_tables->table,
@@ -141,7 +141,7 @@ static void mysql_ha_close_table(THD *th
   if (*table_ptr)
   {
     (*table_ptr)->file->ha_index_or_rnd_end();
-    mdl_lock_data= (*table_ptr)->mdl_lock_data;
+    mdl_lock_ticket= (*table_ptr)->mdl_lock_ticket;
     pthread_mutex_lock(&LOCK_open);
     if (close_thread_table(thd, table_ptr))
     {
@@ -149,8 +149,8 @@ static void mysql_ha_close_table(THD *th
       broadcast_refresh();
     }
     pthread_mutex_unlock(&LOCK_open);
-    mdl_release_lock(&thd->handler_mdl_context, mdl_lock_data);
-    mdl_remove_lock(&thd->handler_mdl_context, mdl_lock_data);
+    mdl_ticket_release(&thd->handler_mdl_context, mdl_lock_ticket);
+    mdl_request_remove(&thd->handler_mdl_context, tables->mdl_lock_request);
   }
   else if (tables->table)
   {
@@ -190,12 +190,12 @@ static void mysql_ha_close_table(THD *th
 bool mysql_ha_open(THD *thd, TABLE_LIST *tables, bool reopen)
 {
   TABLE_LIST    *hash_tables = NULL;
-  MDL_LOCK_DATA *mdl_lock_data;
-  char          *db, *name, *alias, *mdlkey;
+  char          *db, *name, *alias;
   uint          dblen, namelen, aliaslen, counter;
   int           error;
   TABLE         *backup_open_tables;
   MDL_CONTEXT   backup_mdl_context;
+  MDL_LOCK_REQUEST *mdl_lock_request;
   DBUG_ENTER("mysql_ha_open");
   DBUG_PRINT("enter",("'%s'.'%s' as '%s'  reopen: %d",
                       tables->db, tables->table_name, tables->alias,
@@ -246,8 +246,7 @@ bool mysql_ha_open(THD *thd, TABLE_LIST 
                           &db, (uint) dblen,
                           &name, (uint) namelen,
                           &alias, (uint) aliaslen,
-                          &mdl_lock_data, sizeof(MDL_LOCK_DATA),
-                          &mdlkey, MAX_MDLKEY_LENGTH,
+                          &mdl_lock_request, sizeof(MDL_LOCK_REQUEST),
                           NullS)))
     {
       DBUG_PRINT("exit",("ERROR"));
@@ -261,8 +260,8 @@ bool mysql_ha_open(THD *thd, TABLE_LIST 
     memcpy(hash_tables->db, tables->db, dblen);
     memcpy(hash_tables->table_name, tables->table_name, namelen);
     memcpy(hash_tables->alias, tables->alias, aliaslen);
-    mdl_init_lock(mdl_lock_data, mdlkey, 0, db, name);
-    hash_tables->mdl_lock_data= mdl_lock_data;
+    mdl_request_init(mdl_lock_request, 0, db, name);
+    hash_tables->mdl_lock_request= mdl_lock_request;
 
     /* add to hash */
     if (my_hash_insert(&thd->handler_tables_hash, (uchar*) hash_tables))
@@ -801,11 +800,11 @@ void mysql_ha_flush(THD *thd)
   {
     hash_tables= (TABLE_LIST*) my_hash_element(&thd->handler_tables_hash, i);
     /*
-      TABLE::mdl_lock_data is 0 for temporary tables so we need extra check.
+      TABLE::mdl_lock_ticket is 0 for temporary tables so we need extra check.
     */
     if (hash_tables->table &&
-        (hash_tables->table->mdl_lock_data &&
-         mdl_has_pending_conflicting_lock(hash_tables->table->mdl_lock_data) ||
+        (hash_tables->table->mdl_lock_ticket &&
+         mdl_has_pending_conflicting_lock(hash_tables->table->mdl_lock_ticket) ||
          hash_tables->table->needs_reopen()))
       mysql_ha_close_table(thd, hash_tables);
   }

=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc	2009-12-03 18:37:38 +0000
+++ b/sql/sql_insert.cc	2009-12-03 23:29:40 +0000
@@ -2399,7 +2399,7 @@ pthread_handler_t handle_delayed_insert(
     thd->lex->set_stmt_unsafe();
     thd->set_current_stmt_binlog_row_based_if_mixed();
 
-    alloc_mdl_locks(&di->table_list, thd->mem_root);
+    alloc_mdl_requests(&di->table_list, thd->mem_root);
 
     if (di->open_and_lock_table())
       goto err;

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2009-12-03 22:46:14 +0000
+++ b/sql/sql_parse.cc	2009-12-03 23:29:40 +0000
@@ -1115,7 +1115,7 @@ bool dispatch_command(enum enum_server_c
       select_lex.table_list.link_in_list((uchar*) &table_list,
                                          (uchar**) &table_list.next_local);
     thd->lex->add_to_query_tables(&table_list);
-    alloc_mdl_locks(&table_list, thd->mem_root);
+    alloc_mdl_requests(&table_list, thd->mem_root);
 
     /* switch on VIEW optimisation: do not fill temporary tables */
     thd->lex->sql_command= SQLCOM_SHOW_FIELDS;
@@ -3324,7 +3324,7 @@ end_with_restore_list:
         !(need_start_waiting= !wait_if_global_read_lock(thd, 0, 1)))
       goto error;
 
-    alloc_mdl_locks(all_tables, thd->locked_tables_list.locked_tables_root());
+    alloc_mdl_requests(all_tables, thd->locked_tables_list.locked_tables_root());
 
     thd->options|= OPTION_TABLE_LOCK;
     thd->in_lock_tables=1;
@@ -5977,9 +5977,9 @@ TABLE_LIST *st_select_lex::add_table_to_
   ptr->next_name_resolution_table= NULL;
   /* Link table in global list (all used tables) */
   lex->add_to_query_tables(ptr);
-  ptr->mdl_lock_data= mdl_alloc_lock(0 , ptr->db, ptr->table_name,
-                                thd->locked_tables_root ?
-                                thd->locked_tables_root : thd->mem_root);
+  ptr->mdl_lock_request=
+    mdl_request_alloc(0, ptr->db, ptr->table_name, thd->locked_tables_root ?
+                      thd->locked_tables_root : thd->mem_root);
   DBUG_RETURN(ptr);
 }
 

=== modified file 'sql/sql_plist.h'
--- a/sql/sql_plist.h	2009-11-30 19:11:32 +0000
+++ b/sql/sql_plist.h	2009-12-03 23:29:40 +0000
@@ -90,6 +90,7 @@ public:
 #ifndef _lint
   friend class I_P_List_iterator<T, B>;
 #endif
+  typedef I_P_List_iterator<T, B> Iterator;
 };
 
 

=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc	2009-12-03 11:37:42 +0000
+++ b/sql/sql_plugin.cc	2009-12-03 23:29:40 +0000
@@ -1366,7 +1366,7 @@ static void plugin_load(MEM_ROOT *tmp_ro
   tables.alias= tables.table_name= (char*)"plugin";
   tables.lock_type= TL_READ;
   tables.db= new_thd->db;
-  alloc_mdl_locks(&tables, tmp_root);
+  alloc_mdl_requests(&tables, tmp_root);
 
 #ifdef EMBEDDED_LIBRARY
   /*
@@ -1660,7 +1660,7 @@ bool mysql_install_plugin(THD *thd, cons
   if (check_table_access(thd, INSERT_ACL, &tables, FALSE, 1, FALSE))
     DBUG_RETURN(TRUE);
 
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   /* need to open before acquiring LOCK_plugin or it will deadlock */
   if (! (table = open_ltable(thd, &tables, TL_WRITE, 0)))
@@ -1735,7 +1735,7 @@ bool mysql_uninstall_plugin(THD *thd, co
   bzero(&tables, sizeof(tables));
   tables.db= (char *)"mysql";
   tables.table_name= tables.alias= (char *)"plugin";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   /* need to open before acquiring LOCK_plugin or it will deadlock */
   if (! (table= open_ltable(thd, &tables, TL_WRITE, 0)))

=== modified file 'sql/sql_servers.cc'
--- a/sql/sql_servers.cc	2009-12-02 15:22:15 +0000
+++ b/sql/sql_servers.cc	2009-12-03 23:29:40 +0000
@@ -234,7 +234,7 @@ bool servers_reload(THD *thd)
   tables[0].alias= tables[0].table_name= (char*) "servers";
   tables[0].db= (char*) "mysql";
   tables[0].lock_type= TL_READ;
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, tables))
   {
@@ -365,7 +365,7 @@ insert_server(THD *thd, FOREIGN_SERVER *
   bzero((char*) &tables, sizeof(tables));
   tables.db= (char*) "mysql";
   tables.alias= tables.table_name= (char*) "servers";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   /* need to open before acquiring THR_LOCK_plugin or it will deadlock */
   if (! (table= open_ltable(thd, &tables, TL_WRITE, 0)))
@@ -584,7 +584,7 @@ int drop_server(THD *thd, LEX_SERVER_OPT
   bzero((char*) &tables, sizeof(tables));
   tables.db= (char*) "mysql";
   tables.alias= tables.table_name= (char*) "servers";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   rw_wrlock(&THR_LOCK_servers);
 
@@ -709,7 +709,7 @@ int update_server(THD *thd, FOREIGN_SERV
   bzero((char*) &tables, sizeof(tables));
   tables.db= (char*)"mysql";
   tables.alias= tables.table_name= (char*)"servers";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (!(table= open_ltable(thd, &tables, TL_WRITE, 0)))
   {

=== modified file 'sql/sql_show.cc'
--- a/sql/sql_show.cc	2009-12-03 11:37:42 +0000
+++ b/sql/sql_show.cc	2009-12-03 23:29:40 +0000
@@ -3050,12 +3050,8 @@ uint get_table_open_method(TABLE_LIST *t
    Acquire high priority share metadata lock on a table.
 
    @param thd            Thread context.
-   @param mdl_lock_data  Pointer to memory to be used for MDL_LOCK_DATA
+   @param mdl_lock_req   Pointer to memory to be used for MDL_LOCK_REQUEST
                          object for a lock request.
-   @param mdlkey         Pointer to the buffer for key for the lock request
-                         (should be at least strlen(db) + strlen(name) + 2
-                         bytes, or, if the lengths are not known,
-                         MAX_MDLKEY_LENGTH)
    @param table          Table list element for the table
 
    @note This is an auxiliary function to be used in cases when we want to
@@ -3069,23 +3065,23 @@ uint get_table_open_method(TABLE_LIST *t
 */
 
 static bool
-acquire_high_prio_shared_mdl_lock(THD *thd, MDL_LOCK_DATA *mdl_lock_data,
-                                  char *mdlkey, TABLE_LIST *table)
+acquire_high_prio_shared_mdl_lock(THD *thd, MDL_LOCK_REQUEST *mdl_lock_req,
+                                  TABLE_LIST *table)
 {
   bool retry;
 
-  mdl_init_lock(mdl_lock_data, mdlkey, 0, table->db, table->table_name);
-  table->mdl_lock_data= mdl_lock_data;
-  mdl_add_lock(&thd->mdl_context, mdl_lock_data);
-  mdl_set_lock_type(mdl_lock_data, MDL_SHARED_HIGH_PRIO);
+  mdl_request_init(mdl_lock_req, 0, table->db, table->table_name);
+  table->mdl_lock_request= mdl_lock_req;
+  mdl_request_add(&thd->mdl_context, mdl_lock_req);
+  mdl_request_set_type(mdl_lock_req, MDL_SHARED_HIGH_PRIO);
 
   while (1)
   {
-    if (mdl_acquire_shared_lock(&thd->mdl_context, mdl_lock_data, &retry))
+    if (mdl_acquire_shared_lock(&thd->mdl_context, mdl_lock_req, &retry))
     {
       if (!retry || mdl_wait_for_locks(&thd->mdl_context))
       {
-        mdl_remove_all_locks(&thd->mdl_context);
+        mdl_request_remove_all(&thd->mdl_context);
         return TRUE;
       }
       continue;
@@ -3127,8 +3123,7 @@ static int fill_schema_table_from_frm(TH
   char key[MAX_DBKEY_LENGTH];
   uint key_length;
   char db_name_buff[NAME_LEN + 1], table_name_buff[NAME_LEN + 1];
-  MDL_LOCK_DATA mdl_lock_data;
-  char mdlkey[MAX_MDLKEY_LENGTH];
+  MDL_LOCK_REQUEST mdl_lock_request;
 
   bzero((char*) &table_list, sizeof(TABLE_LIST));
   bzero((char*) &tbl, sizeof(TABLE));
@@ -3158,8 +3153,7 @@ static int fill_schema_table_from_frm(TH
           simply obtaining internal lock of data-dictionary (ATM it
           is LOCK_open) instead of obtaning full-blown metadata lock.
   */
-  if (acquire_high_prio_shared_mdl_lock(thd, &mdl_lock_data, mdlkey,
-                                        &table_list))
+  if (acquire_high_prio_shared_mdl_lock(thd, &mdl_lock_request, &table_list))
   {
     /*
       Some error occured (most probably we have been killed while
@@ -3220,8 +3214,8 @@ err_unlock:
   pthread_mutex_unlock(&LOCK_open);
 
 err:
-  mdl_release_lock(&thd->mdl_context, &mdl_lock_data);
-  mdl_remove_lock(&thd->mdl_context, &mdl_lock_data);
+  mdl_ticket_release(&thd->mdl_context, mdl_lock_request.ticket);
+  mdl_request_remove(&thd->mdl_context, &mdl_lock_request);
   thd->clear_error();
   return res;
 }
@@ -7317,7 +7311,7 @@ bool show_create_trigger(THD *thd, const
 
   uint num_tables; /* NOTE: unused, only to pass to open_tables(). */
 
-  alloc_mdl_locks(lst, thd->mem_root);
+  alloc_mdl_requests(lst, thd->mem_root);
 
   if (open_tables(thd, &lst, &num_tables, 0))
   {

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2009-12-03 18:37:38 +0000
+++ b/sql/sql_table.cc	2009-12-03 23:29:40 +0000
@@ -1910,7 +1910,7 @@ int mysql_rm_table_part2(THD *thd, TABLE
             Since we don't acquire metadata lock if we have found temporary
             table, we should do something to avoid releasing it at the end.
           */
-          table->mdl_lock_data= 0;
+          table->mdl_lock_request= NULL;
         }
         else
         {
@@ -1923,7 +1923,7 @@ int mysql_rm_table_part2(THD *thd, TABLE
                                                 table->table_name);
           if (!table->table)
             DBUG_RETURN(1);
-          table->mdl_lock_data= table->table->mdl_lock_data;
+          table->mdl_lock_request->ticket= table->table->mdl_lock_ticket;
         }
     }
   }
@@ -2202,15 +2202,15 @@ err:
     }
     for (table= tables; table; table= table->next_local)
     {
-      if (table->mdl_lock_data)
+      if (table->mdl_lock_request)
       {
         /*
           Under LOCK TABLES we may have several instances of table open
           and locked and therefore have to remove several metadata lock
           requests associated with them.
         */
-        mdl_release_and_remove_all_locks_for_name(&thd->mdl_context,
-                                                  table->mdl_lock_data);
+        mdl_ticket_release_all_for_name(&thd->mdl_context,
+                                        table->mdl_lock_request->ticket);
       }
     }
   }
@@ -4108,29 +4108,28 @@ warn:
 
 static bool lock_table_name_if_not_cached(THD *thd, const char *db,
                                           const char *table_name,
-                                          MDL_LOCK_DATA **lock_data)
+                                          MDL_LOCK_REQUEST **lock_req)
 {
   bool conflict;
 
-  if (!(*lock_data= mdl_alloc_lock(0, db, table_name, thd->mem_root)))
+  if (!(*lock_req= mdl_request_alloc(0, db, table_name, thd->mem_root)))
     return TRUE;
-  mdl_set_lock_type(*lock_data, MDL_EXCLUSIVE);
-  mdl_add_lock(&thd->mdl_context, *lock_data);
-  if (mdl_try_acquire_exclusive_lock(&thd->mdl_context, *lock_data,
-                                     &conflict))
+  mdl_request_set_type(*lock_req, MDL_EXCLUSIVE);
+  mdl_request_add(&thd->mdl_context, *lock_req);
+  if (mdl_try_acquire_exclusive_lock(&thd->mdl_context, *lock_req, &conflict))
   {
     /*
       To simplify our life under LOCK TABLES we remove unsatisfied
       lock request from the context.
     */
-    mdl_remove_lock(&thd->mdl_context, *lock_data);
+    mdl_request_remove(&thd->mdl_context, *lock_req);
     if (!conflict)
     {
       /* Probably OOM. */
       return TRUE;
     }
     else
-      *lock_data= 0;
+      *lock_req= NULL;
   }
   return FALSE;
 }
@@ -4146,7 +4145,7 @@ bool mysql_create_table(THD *thd, const 
                         bool internal_tmp_table,
                         uint select_field_count)
 {
-  MDL_LOCK_DATA *target_lock_data= 0;
+  MDL_LOCK_REQUEST *target_lock_req= NULL;
   bool result;
   DBUG_ENTER("mysql_create_table");
 
@@ -4169,12 +4168,12 @@ bool mysql_create_table(THD *thd, const 
 
   if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
   {
-    if (lock_table_name_if_not_cached(thd, db, table_name, &target_lock_data))
+    if (lock_table_name_if_not_cached(thd, db, table_name, &target_lock_req))
     {
       result= TRUE;
       goto unlock;
     }
-    if (!target_lock_data)
+    if (!target_lock_req)
     {
       if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
       {
@@ -4200,10 +4199,10 @@ bool mysql_create_table(THD *thd, const 
                                      select_field_count);
 
 unlock:
-  if (target_lock_data)
+  if (target_lock_req)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    mdl_ticket_release(&thd->mdl_context, target_lock_req->ticket);
+    mdl_request_remove(&thd->mdl_context, target_lock_req);
   }
   pthread_mutex_lock(&LOCK_lock_db);
   if (!--creating_table && creating_database)
@@ -4368,7 +4367,7 @@ static int prepare_for_repair(THD *thd, 
   char from[FN_REFLEN],tmp[FN_REFLEN+32];
   const char **ext;
   MY_STAT stat_info;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_LOCK_REQUEST *mdl_lock_request= NULL;
   enum enum_open_table_action ot_action_unused;
   DBUG_ENTER("prepare_for_repair");
   uint reopen_for_repair_flags= (MYSQL_LOCK_IGNORE_FLUSH |
@@ -4387,13 +4386,13 @@ static int prepare_for_repair(THD *thd, 
     uint key_length;
 
     key_length= create_table_def_key(thd, key, table_list, 0);
-    mdl_lock_data= mdl_alloc_lock(0, table_list->db, table_list->table_name,
-                                  thd->mem_root);
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    mdl_add_lock(&thd->mdl_context, mdl_lock_data);
+    mdl_lock_request= mdl_request_alloc(0, table_list->db,
+                                        table_list->table_name, thd->mem_root);
+    mdl_request_set_type(mdl_lock_request, MDL_EXCLUSIVE);
+    mdl_request_add(&thd->mdl_context, mdl_lock_request);
     if (mdl_acquire_exclusive_locks(&thd->mdl_context))
     {
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_request_remove(&thd->mdl_context, mdl_lock_request);
       DBUG_RETURN(0);
     }
 
@@ -4413,11 +4412,7 @@ static int prepare_for_repair(THD *thd, 
     }
     pthread_mutex_unlock(&LOCK_open);
     table= &tmp_table;
-    table_list->mdl_lock_data= mdl_lock_data;
-  }
-  else
-  {
-    mdl_lock_data= table->mdl_lock_data;
+    table_list->mdl_lock_request= mdl_lock_request;
   }
 
   /* A MERGE table must not come here. */
@@ -4528,10 +4523,10 @@ end:
     pthread_mutex_unlock(&LOCK_open);
   }
   /* In case of a temporary table there will be no metadata lock. */
-  if (error && mdl_lock_data)
+  if (error && mdl_lock_request)
   {
-    mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-    mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+    mdl_ticket_release(&thd->mdl_context, mdl_lock_request->ticket);
+    mdl_request_remove(&thd->mdl_context, mdl_lock_request);
   }
   DBUG_RETURN(error);
 }
@@ -5234,7 +5229,7 @@ bool mysql_create_like_schema_frm(THD* t
 bool mysql_create_like_table(THD* thd, TABLE_LIST* table, TABLE_LIST* src_table,
                              HA_CREATE_INFO *create_info)
 {
-  MDL_LOCK_DATA *target_lock_data= 0;
+  MDL_LOCK_REQUEST *target_lock_req= NULL;
   char src_path[FN_REFLEN], dst_path[FN_REFLEN + 1];
   uint dst_path_length;
   char *db= table->db;
@@ -5291,9 +5286,9 @@ bool mysql_create_like_table(THD* thd, T
   }
   else
   {
-    if (lock_table_name_if_not_cached(thd, db, table_name, &target_lock_data))
+    if (lock_table_name_if_not_cached(thd, db, table_name, &target_lock_req))
       goto err;
-    if (!target_lock_data)
+    if (!target_lock_req)
       goto table_exists;
     dst_path_length= build_table_filename(dst_path, sizeof(dst_path) - 1,
                                           db, table_name, reg_ext, 0);
@@ -5303,7 +5298,7 @@ bool mysql_create_like_table(THD* thd, T
       Make the metadata lock available to open_table() called to
       reopen the table down the road.
     */
-    table->mdl_lock_data= target_lock_data;
+    table->mdl_lock_request= target_lock_req;
   }
 
   DBUG_EXECUTE_IF("sleep_create_like_before_copy", my_sleep(6000000););
@@ -5474,10 +5469,10 @@ binlog:
   res= FALSE;
 
 err:
-  if (target_lock_data)
+  if (target_lock_req)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    mdl_ticket_release(&thd->mdl_context, target_lock_req->ticket);
+    mdl_request_remove(&thd->mdl_context, target_lock_req);
   }
   DBUG_RETURN(res);
 }
@@ -6416,7 +6411,8 @@ bool mysql_alter_table(THD *thd,char *ne
                        uint order_num, ORDER *order, bool ignore)
 {
   TABLE *table, *new_table= 0;
-  MDL_LOCK_DATA *mdl_lock_data, *target_lock_data= 0;
+  MDL_LOCK_TICKET *mdl_lock_ticket;
+  MDL_LOCK_REQUEST *target_lock_req= NULL;
   int error= 0;
   char tmp_name[80],old_name[32],new_name_buff[FN_REFLEN + 1];
   char new_alias_buff[FN_REFLEN], *table_name, *db, *new_alias, *alias;
@@ -6587,7 +6583,7 @@ view_err:
                                         MYSQL_OPEN_TAKE_UPGRADABLE_MDL)))
     DBUG_RETURN(TRUE);
   table->use_all_columns();
-  mdl_lock_data= table->mdl_lock_data;
+  mdl_lock_ticket= table->mdl_lock_ticket;
 
   /*
     Prohibit changing of the UNION list of a non-temporary MERGE table
@@ -6640,9 +6636,9 @@ view_err:
       else
       {
         if (lock_table_name_if_not_cached(thd, new_db, new_name,
-                                          &target_lock_data))
+                                          &target_lock_req))
           DBUG_RETURN(TRUE);
-        if (!target_lock_data)
+        if (!target_lock_req)
         {
 	  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
 	  DBUG_RETURN(TRUE);
@@ -6835,13 +6831,12 @@ view_err:
       */
       if (new_name != table_name || new_db != db)
       {
-        mdl_release_lock(&thd->mdl_context, target_lock_data);
-        mdl_remove_lock(&thd->mdl_context, target_lock_data);
-        mdl_release_and_remove_all_locks_for_name(&thd->mdl_context,
-                                                  mdl_lock_data);
+        mdl_ticket_release(&thd->mdl_context, target_lock_req->ticket);
+        mdl_request_remove(&thd->mdl_context, target_lock_req);
+        mdl_ticket_release_all_for_name(&thd->mdl_context, mdl_lock_ticket);
       }
       else
-        mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_data);
+        mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_ticket);
     }
     DBUG_RETURN(error);
   }
@@ -7074,7 +7069,7 @@ view_err:
 #ifdef WITH_PARTITION_STORAGE_ENGINE
   if (fast_alter_partition)
   {
-    DBUG_ASSERT(!target_lock_data);
+    DBUG_ASSERT(!target_lock_req);
     DBUG_RETURN(fast_alter_partition_table(thd, table, alter_info,
                                            create_info, table_list,
                                            db, table_name,
@@ -7441,16 +7436,16 @@ view_err:
       table_list->table_name_length= strlen(new_name);
       table_list->db= new_db;
       table_list->db_length= strlen(new_db);
-      table_list->mdl_lock_data= target_lock_data;
+      table_list->mdl_lock_request= target_lock_req;
     }
     else
     {
       /*
-        Under LOCK TABLES, we have a different mdl_lock_data
+        Under LOCK TABLES, we have a different mdl_lock_ticket
         points to a different instance than the one set initially
         to request the lock.
       */
-      table_list->mdl_lock_data= mdl_lock_data;
+      table_list->mdl_lock_request->ticket= mdl_lock_ticket;
     }
     if (open_table(thd, table_list, thd->mem_root,
                    &ot_action_unused, MYSQL_OPEN_REOPEN))
@@ -7516,13 +7511,12 @@ view_err:
   {
     if ((new_name != table_name || new_db != db))
     {
-      mdl_release_lock(&thd->mdl_context, target_lock_data);
-      mdl_remove_lock(&thd->mdl_context, target_lock_data);
-      mdl_release_and_remove_all_locks_for_name(&thd->mdl_context,
-                                                mdl_lock_data);
+      mdl_ticket_release(&thd->mdl_context, target_lock_req->ticket);
+      mdl_request_remove(&thd->mdl_context, target_lock_req);
+      mdl_ticket_release_all_for_name(&thd->mdl_context, mdl_lock_ticket);
     }
     else
-      mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_ticket);
   }
 
 end_temporary:
@@ -7577,10 +7571,10 @@ err:
                                  alter_info->datetime_field->field_name);
     thd->abort_on_warning= save_abort_on_warning;
   }
-  if (target_lock_data)
+  if (target_lock_req)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    mdl_ticket_release(&thd->mdl_context, target_lock_req->ticket);
+    mdl_request_remove(&thd->mdl_context, target_lock_req);
   }
   DBUG_RETURN(TRUE);
 
@@ -7592,12 +7586,12 @@ err_with_mdl:
     tables and release the exclusive metadata lock.
   */
   thd->locked_tables_list.unlink_all_closed_tables();
-  if (target_lock_data)
+  if (target_lock_req)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    mdl_ticket_release(&thd->mdl_context, target_lock_req->ticket);
+    mdl_request_remove(&thd->mdl_context, target_lock_req);
   }
-  mdl_release_and_remove_all_locks_for_name(&thd->mdl_context, mdl_lock_data);
+  mdl_ticket_release_all_for_name(&thd->mdl_context, mdl_lock_ticket);
   DBUG_RETURN(TRUE);
 }
 /* mysql_alter_table */

=== modified file 'sql/sql_trigger.cc'
--- a/sql/sql_trigger.cc	2009-12-02 15:22:15 +0000
+++ b/sql/sql_trigger.cc	2009-12-03 23:29:40 +0000
@@ -329,6 +329,7 @@ bool mysql_create_or_drop_trigger(THD *t
   String stmt_query;
   bool need_start_waiting= FALSE;
   bool lock_upgrade_done= FALSE;
+  MDL_LOCK_TICKET *mdl_lock_ticket= NULL;
 
   DBUG_ENTER("mysql_create_or_drop_trigger");
 
@@ -451,8 +452,6 @@ bool mysql_create_or_drop_trigger(THD *t
     if (!(tables->table= find_write_locked_table(thd->open_tables, tables->db,
                                                  tables->table_name)))
       goto end;
-    /* Later on we will need it to downgrade the lock */
-    tables->mdl_lock_data= tables->table->mdl_lock_data;
   }
   else
   {
@@ -465,6 +464,9 @@ bool mysql_create_or_drop_trigger(THD *t
   }
   table= tables->table;
 
+  /* Later on we will need it to downgrade the lock */
+  mdl_lock_ticket= table->mdl_lock_ticket;
+
   if (wait_while_table_is_used(thd, table, HA_EXTRA_FORCE_REOPEN))
     goto end;
 
@@ -511,8 +513,7 @@ end:
     TABLE instance created by open_n_lock_single_table() and metadata lock.
   */
   if (thd->locked_tables_mode && tables && lock_upgrade_done)
-    mdl_downgrade_exclusive_lock(&thd->mdl_context,
-                                 tables->mdl_lock_data);
+    mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_ticket);
 
   if (need_start_waiting)
     start_waiting_global_read_lock(thd);

=== modified file 'sql/sql_udf.cc'
--- a/sql/sql_udf.cc	2009-11-30 15:55:03 +0000
+++ b/sql/sql_udf.cc	2009-12-03 23:29:40 +0000
@@ -142,7 +142,7 @@ void udf_init()
   tables.alias= tables.table_name= (char*) "func";
   tables.lock_type = TL_READ;
   tables.db= db;
-  alloc_mdl_locks(&tables, new_thd->mem_root);
+  alloc_mdl_requests(&tables, new_thd->mem_root);
 
   if (simple_open_n_lock_tables(new_thd, &tables))
   {
@@ -486,7 +486,7 @@ int mysql_create_function(THD *thd,udf_f
   bzero((char*) &tables,sizeof(tables));
   tables.db= (char*) "mysql";
   tables.table_name= tables.alias= (char*) "func";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
   /* Allow creation of functions even if we can't open func table */
   if (!(table = open_ltable(thd, &tables, TL_WRITE, 0)))
     goto err;
@@ -565,7 +565,7 @@ int mysql_drop_function(THD *thd,const L
   bzero((char*) &tables,sizeof(tables));
   tables.db=(char*) "mysql";
   tables.table_name= tables.alias= (char*) "func";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
   if (!(table = open_ltable(thd, &tables, TL_WRITE, 0)))
     goto err;
   table->use_all_columns();

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2009-12-02 23:09:22 +0000
+++ b/sql/table.cc	2009-12-03 23:29:40 +0000
@@ -4811,12 +4811,11 @@ size_t max_row_length(TABLE *table, cons
    objects for all elements of table list.
 */
 
-void alloc_mdl_locks(TABLE_LIST *table_list, MEM_ROOT *root)
+void alloc_mdl_requests(TABLE_LIST *table_list, MEM_ROOT *root)
 {
   for ( ; table_list ; table_list= table_list->next_global)
-    table_list->mdl_lock_data= mdl_alloc_lock(0, table_list->db,
-                                              table_list->table_name,
-                                              root);
+    table_list->mdl_lock_request=
+      mdl_request_alloc(0, table_list->db, table_list->table_name, root);
 }
 
 

=== modified file 'sql/table.h'
--- a/sql/table.h	2009-12-03 11:37:42 +0000
+++ b/sql/table.h	2009-12-03 23:29:40 +0000
@@ -30,7 +30,8 @@ class st_select_lex;
 class partition_info;
 class COND_EQUAL;
 class Security_context;
-struct MDL_LOCK_DATA;
+struct MDL_LOCK_REQUEST;
+struct MDL_LOCK_TICKET;
 
 /*************************************************************************/
 
@@ -813,7 +814,7 @@ public:
   partition_info *part_info;            /* Partition related information */
   bool no_partitions_used; /* If true, all partitions have been pruned away */
 #endif
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_LOCK_TICKET *mdl_lock_ticket;
 
   bool fill_item_list(List<Item> *item_list) const;
   void reset_item_list(List<Item> *item_list) const;
@@ -1416,7 +1417,7 @@ struct TABLE_LIST
   uint table_open_method;
   enum enum_schema_table_state schema_table_state;
 
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_LOCK_REQUEST *mdl_lock_request;
 
   void calc_md5(char *buffer);
   void set_underlying_merge();
@@ -1785,6 +1786,6 @@ static inline void dbug_tmp_restore_colu
 size_t max_row_length(TABLE *table, const uchar *data);
 
 
-void alloc_mdl_locks(TABLE_LIST *table_list, MEM_ROOT *root);
+void alloc_mdl_requests(TABLE_LIST *table_list, MEM_ROOT *root);
 
 #endif /* TABLE_INCLUDED */

=== modified file 'storage/myisammrg/ha_myisammrg.cc'
--- a/storage/myisammrg/ha_myisammrg.cc	2009-12-02 23:09:22 +0000
+++ b/storage/myisammrg/ha_myisammrg.cc	2009-12-03 23:29:40 +0000
@@ -434,16 +434,15 @@ int ha_myisammrg::add_children_list(void
     /* Copy select_lex. Used in unique_table() at least. */
     child_l->select_lex= parent_l->select_lex;
 
-    child_l->mdl_lock_data= NULL; /* Safety, if alloc_mdl_locks fails. */
+    child_l->mdl_lock_request= NULL; /* Safety, if alloc_mdl_requests fails. */
 
     /* Break when this was the last child. */
     if (&child_l->next_global == this->children_last_l)
       break;
   }
 
-  alloc_mdl_locks(children_l,
-                  thd->locked_tables_root ? thd->locked_tables_root :
-                  thd->mem_root);
+  alloc_mdl_requests(children_l, thd->locked_tables_root ?
+                     thd->locked_tables_root : thd->mem_root);
 
   /* Insert children into the table list. */
   if (parent_l->next_global)


Attachment: [text/bzr-bundle] bzr/kostja@sun.com-20091203232940-qi3r7vrqthlr2982.bundle
Thread
bzr commit into mysql-5.6-next-mr branch (kostja:2990) Bug#989 WL#4284Konstantin Osipov4 Dec