List:Commits« Previous MessageNext Message »
From:Alexander Nozdrin Date:March 6 2009 7:15pm
Subject:bzr push into mysql-6.0 branch (alik:2742)
View as plain text  
 2742 Alexander Nozdrin	2009-03-06 [merge]
      Pull from 6.0
      modified:
        cmd-line-utils/libedit/makelist.sh
        cmd-line-utils/libedit/readline.c
        cmd-line-utils/libedit/readline/readline.h
        cmd-line-utils/libedit/vi.c
        configure.in
        mysql-test/Makefile.am
        mysql-test/mysql-test-run.pl
        support-files/Makefile.am
        support-files/mysql.spec.sh

=== modified file 'include/mysql.h'
--- a/include/mysql.h	2008-12-05 01:05:05 +0000
+++ b/include/mysql.h	2009-02-25 10:03:18 +0000
@@ -654,7 +654,7 @@ typedef struct st_mysql_methods
   MYSQL_RES * (*use_result)(MYSQL *mysql);
   void (*fetch_lengths)(unsigned long *to, 
 			MYSQL_ROW column, unsigned int field_count);
-  void (*flush_use_result)(MYSQL *mysql);
+  void (*flush_use_result)(MYSQL *mysql, my_bool flush_all_results);
 #if !defined(MYSQL_SERVER) || defined(EMBEDDED_LIBRARY)
   MYSQL_FIELD * (*list_fields)(MYSQL *mysql);
   my_bool (*read_prepare_result)(MYSQL *mysql, MYSQL_STMT *stmt);

=== modified file 'include/mysql.h.pp'
--- a/include/mysql.h.pp	2008-12-13 11:02:16 +0000
+++ b/include/mysql.h.pp	2009-02-25 10:03:18 +0000
@@ -558,7 +558,7 @@ typedef struct st_mysql_methods
   MYSQL_RES * (*use_result)(MYSQL *mysql);
   void (*fetch_lengths)(unsigned long *to,
    MYSQL_ROW column, unsigned int field_count);
-  void (*flush_use_result)(MYSQL *mysql);
+  void (*flush_use_result)(MYSQL *mysql, my_bool flush_all_results);
   MYSQL_FIELD * (*list_fields)(MYSQL *mysql);
   my_bool (*read_prepare_result)(MYSQL *mysql, MYSQL_STMT *stmt);
   int (*stmt_execute)(MYSQL_STMT *stmt);

=== modified file 'libmysql/libmysql.c'
--- a/libmysql/libmysql.c	2009-02-14 09:18:41 +0000
+++ b/libmysql/libmysql.c	2009-02-25 10:03:18 +0000
@@ -4608,7 +4608,7 @@ static my_bool reset_stmt_handle(MYSQL_S
         if (stmt->field_count && mysql->status != MYSQL_STATUS_READY)
         {
           /* There is a result set and it belongs to this statement */
-          (*mysql->methods->flush_use_result)(mysql);
+          (*mysql->methods->flush_use_result)(mysql, FALSE);
           if (mysql->unbuffered_fetch_owner)
             *mysql->unbuffered_fetch_owner= TRUE;
           mysql->status= MYSQL_STATUS_READY;
@@ -4692,7 +4692,7 @@ my_bool STDCALL mysql_stmt_close(MYSQL_S
           Flush result set of the connection. If it does not belong
           to this statement, set a warning.
         */
-        (*mysql->methods->flush_use_result)(mysql);
+        (*mysql->methods->flush_use_result)(mysql, TRUE);
         if (mysql->unbuffered_fetch_owner)
           *mysql->unbuffered_fetch_owner= TRUE;
         mysql->status= MYSQL_STATUS_READY;

=== modified file 'libmysqld/lib_sql.cc'
--- a/libmysqld/lib_sql.cc	2009-01-17 02:29:30 +0000
+++ b/libmysqld/lib_sql.cc	2009-03-04 10:04:58 +0000
@@ -139,7 +139,7 @@ emb_advanced_command(MYSQL *mysql, enum 
   return result;
 }
 
-static void emb_flush_use_result(MYSQL *mysql)
+static void emb_flush_use_result(MYSQL *mysql, my_bool)
 {
   THD *thd= (THD*) mysql->thd;
   if (thd->cur_data)

=== added file 'mysql-test/r/mdl_sync.result'
--- a/mysql-test/r/mdl_sync.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/r/mdl_sync.result	2009-03-02 21:18:26 +0000
@@ -0,0 +1,21 @@
+SET DEBUG_SYNC= 'RESET';
+drop table if exists t1,t2,t3;
+create table t1 (i int);
+create table t2 (i int);
+connection: default
+lock tables t2 read;
+connection: con1
+set debug_sync='mdl_upgrade_shared_lock_to_exclusive SIGNAL parked WAIT_FOR go';
+alter table t1 rename t3;
+connection: default
+set debug_sync= 'now WAIT_FOR parked';
+connection: con2
+set debug_sync='mdl_acquire_exclusive_locks_wait SIGNAL go';
+drop table t1,t2;
+connection: con1
+connection: default
+unlock tables;
+connection: con2
+ERROR 42S02: Unknown table 't1'
+drop table t3;
+SET DEBUG_SYNC= 'RESET';

=== modified file 'mysql-test/r/variables.result'
--- a/mysql-test/r/variables.result	2009-01-31 15:53:35 +0000
+++ b/mysql-test/r/variables.result	2009-02-24 11:25:46 +0000
@@ -1341,3 +1341,168 @@ SET @@session.thread_stack= 7;
 ERROR HY000: Variable 'thread_stack' is a read only variable
 SET @@global.thread_stack= 7;
 ERROR HY000: Variable 'thread_stack' is a read only variable
+
+#
+# Bug#34828: OF is taken as OFF and a value of 0 is set for variable SQL_notes.
+#
+
+# Checking sql_notes...
+SET @sql_notes_saved = @@sql_notes;
+
+SET @@sql_notes = ON;
+SELECT @@sql_notes;
+@@sql_notes
+1
+
+SET @@sql_notes = OF;
+ERROR 42000: Variable 'sql_notes' can't be set to the value of 'OF'
+SELECT @@sql_notes;
+@@sql_notes
+1
+
+SET @@sql_notes = OFF;
+SELECT @@sql_notes;
+@@sql_notes
+0
+
+SET @@sql_notes = @sql_notes_saved;
+
+# Checking delay_key_write...
+SET @delay_key_write_saved = @@delay_key_write;
+
+SET GLOBAL delay_key_write = ON;
+SELECT @@delay_key_write;
+@@delay_key_write
+ON
+
+SET GLOBAL delay_key_write = OF;
+ERROR 42000: Variable 'delay_key_write' can't be set to the value of 'OF'
+SELECT @@delay_key_write;
+@@delay_key_write
+ON
+
+SET GLOBAL delay_key_write = AL;
+ERROR 42000: Variable 'delay_key_write' can't be set to the value of 'AL'
+SELECT @@delay_key_write;
+@@delay_key_write
+ON
+
+SET GLOBAL delay_key_write = OFF;
+SELECT @@delay_key_write;
+@@delay_key_write
+OFF
+
+SET GLOBAL delay_key_write = ALL;
+SELECT @@delay_key_write;
+@@delay_key_write
+ALL
+
+SET GLOBAL delay_key_write = @delay_key_write_saved;
+
+# Checking sql_safe_updates...
+SET @sql_safe_updates_saved = @@sql_safe_updates;
+
+SET @@sql_safe_updates = ON;
+SELECT @@sql_safe_updates;
+@@sql_safe_updates
+1
+
+SET @@sql_safe_updates = OF;
+ERROR 42000: Variable 'sql_safe_updates' can't be set to the value of 'OF'
+SELECT @@sql_safe_updates;
+@@sql_safe_updates
+1
+
+SET @@sql_safe_updates = OFF;
+SELECT @@sql_safe_updates;
+@@sql_safe_updates
+0
+
+SET @@sql_safe_updates = @sql_safe_updates_saved;
+
+# Checking foreign_key_checks...
+SET @foreign_key_checks_saved = @@foreign_key_checks;
+
+SET @@foreign_key_checks = ON;
+SELECT @@foreign_key_checks;
+@@foreign_key_checks
+1
+
+SET @@foreign_key_checks = OF;
+ERROR 42000: Variable 'foreign_key_checks' can't be set to the value of 'OF'
+SELECT @@foreign_key_checks;
+@@foreign_key_checks
+1
+
+SET @@foreign_key_checks = OFF;
+SELECT @@foreign_key_checks;
+@@foreign_key_checks
+0
+
+SET @@foreign_key_checks = @foreign_key_checks_saved;
+
+# Checking unique_checks...
+SET @unique_checks_saved = @@unique_checks;
+
+SET @@unique_checks = ON;
+SELECT @@unique_checks;
+@@unique_checks
+1
+
+SET @@unique_checks = OF;
+ERROR 42000: Variable 'unique_checks' can't be set to the value of 'OF'
+SELECT @@unique_checks;
+@@unique_checks
+1
+
+SET @@unique_checks = OFF;
+SELECT @@unique_checks;
+@@unique_checks
+0
+
+SET @@unique_checks = @unique_checks_saved;
+
+# Checking sql_buffer_result...
+SET @sql_buffer_result_saved = @@sql_buffer_result;
+
+SET @@sql_buffer_result = ON;
+SELECT @@sql_buffer_result;
+@@sql_buffer_result
+1
+
+SET @@sql_buffer_result = OF;
+ERROR 42000: Variable 'sql_buffer_result' can't be set to the value of 'OF'
+SELECT @@sql_buffer_result;
+@@sql_buffer_result
+1
+
+SET @@sql_buffer_result = OFF;
+SELECT @@sql_buffer_result;
+@@sql_buffer_result
+0
+
+SET @@sql_buffer_result = @sql_buffer_result_saved;
+
+# Checking sql_quote_show_create...
+SET @sql_quote_show_create_saved = @@sql_quote_show_create;
+
+SET @@sql_quote_show_create = ON;
+SELECT @@sql_quote_show_create;
+@@sql_quote_show_create
+1
+
+SET @@sql_quote_show_create = OF;
+ERROR 42000: Variable 'sql_quote_show_create' can't be set to the value of 'OF'
+SELECT @@sql_quote_show_create;
+@@sql_quote_show_create
+1
+
+SET @@sql_quote_show_create = OFF;
+SELECT @@sql_quote_show_create;
+@@sql_quote_show_create
+0
+
+SET @@sql_quote_show_create = @sql_quote_show_create_saved;
+
+# End of Bug#34828.
+

=== added file 'mysql-test/t/mdl_sync.test'
--- a/mysql-test/t/mdl_sync.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/t/mdl_sync.test	2009-03-02 21:18:26 +0000
@@ -0,0 +1,69 @@
+#
+# We need the Debug Sync Facility.
+#
+--source include/have_debug_sync.inc
+
+# Clean up resources used in this test case.
+--disable_warnings
+SET DEBUG_SYNC= 'RESET';
+--enable_warnings
+
+#
+# Test the case of when a exclusive lock request waits for a
+# shared lock being upgraded to a exclusive lock.
+#
+
+connect (con1,localhost,root,,test,,);
+connect (con2,localhost,root,,test,,);
+connect (con3,localhost,root,,test,,);
+
+connection default;
+
+--disable_warnings
+drop table if exists t1,t2,t3;
+--enable_warnings
+
+create table t1 (i int);
+create table t2 (i int);
+
+--echo connection: default
+lock tables t2 read;
+
+connection con1;
+--echo connection: con1
+set debug_sync='mdl_upgrade_shared_lock_to_exclusive SIGNAL parked WAIT_FOR go';
+--send alter table t1 rename t3
+
+connection default;
+--echo connection: default
+set debug_sync= 'now WAIT_FOR parked';
+
+connection con2;
+--echo connection: con2
+set debug_sync='mdl_acquire_exclusive_locks_wait SIGNAL go';
+--send drop table t1,t2
+
+connection con1;
+--echo connection: con1
+--reap
+
+connection default;
+--echo connection: default
+unlock tables;
+
+connection con2;
+--echo connection: con2
+--error ER_BAD_TABLE_ERROR
+--reap
+
+connection default;
+drop table t3;
+
+disconnect con1;
+disconnect con2;
+disconnect con3;
+
+# Clean up resources used in this test case.
+--disable_warnings
+SET DEBUG_SYNC= 'RESET';
+--enable_warnings

=== modified file 'mysql-test/t/upgrade.test'
--- a/mysql-test/t/upgrade.test	2008-02-28 11:21:44 +0000
+++ b/mysql-test/t/upgrade.test	2009-02-19 10:40:49 +0000
@@ -53,7 +53,7 @@ drop table `txu#p#p1`;
 #
 
 let $MYSQLD_DATADIR= `select @@datadir`;
-system cp $MYSQL_TEST_DIR/std_data/old_table-323.frm $MYSQLD_DATADIR/test/t1.frm;
+--copy_file $MYSQL_TEST_DIR/std_data/old_table-323.frm $MYSQLD_DATADIR/test/t1.frm
 truncate t1;
 drop table t1;
 

=== modified file 'mysql-test/t/variables.test'
--- a/mysql-test/t/variables.test	2009-02-13 16:30:54 +0000
+++ b/mysql-test/t/variables.test	2009-02-24 11:25:46 +0000
@@ -1083,3 +1083,165 @@ SET @@session.thread_stack= 7;
 --error ER_INCORRECT_GLOBAL_LOCAL_VAR
 SET @@global.thread_stack= 7;
 #
+
+###########################################################################
+
+--echo
+--echo #
+--echo # Bug#34828: OF is taken as OFF and a value of 0 is set for variable SQL_notes.
+--echo #
+--echo
+
+--echo # Checking sql_notes...
+SET @sql_notes_saved = @@sql_notes;
+
+--echo
+SET @@sql_notes = ON;
+SELECT @@sql_notes;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET @@sql_notes = OF;
+SELECT @@sql_notes;
+
+--echo
+SET @@sql_notes = OFF;
+SELECT @@sql_notes;
+
+--echo
+SET @@sql_notes = @sql_notes_saved;
+
+--echo
+--echo # Checking delay_key_write...
+SET @delay_key_write_saved = @@delay_key_write;
+
+--echo
+SET GLOBAL delay_key_write = ON;
+SELECT @@delay_key_write;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET GLOBAL delay_key_write = OF;
+SELECT @@delay_key_write;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET GLOBAL delay_key_write = AL;
+SELECT @@delay_key_write;
+
+--echo
+SET GLOBAL delay_key_write = OFF;
+SELECT @@delay_key_write;
+
+--echo
+SET GLOBAL delay_key_write = ALL;
+SELECT @@delay_key_write;
+
+--echo
+SET GLOBAL delay_key_write = @delay_key_write_saved;
+
+--echo
+--echo # Checking sql_safe_updates...
+SET @sql_safe_updates_saved = @@sql_safe_updates;
+
+--echo
+SET @@sql_safe_updates = ON;
+SELECT @@sql_safe_updates;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET @@sql_safe_updates = OF;
+SELECT @@sql_safe_updates;
+
+--echo
+SET @@sql_safe_updates = OFF;
+SELECT @@sql_safe_updates;
+
+--echo
+SET @@sql_safe_updates = @sql_safe_updates_saved;
+
+--echo
+--echo # Checking foreign_key_checks...
+SET @foreign_key_checks_saved = @@foreign_key_checks;
+
+--echo
+SET @@foreign_key_checks = ON;
+SELECT @@foreign_key_checks;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET @@foreign_key_checks = OF;
+SELECT @@foreign_key_checks;
+
+--echo
+SET @@foreign_key_checks = OFF;
+SELECT @@foreign_key_checks;
+
+--echo
+SET @@foreign_key_checks = @foreign_key_checks_saved;
+
+--echo
+--echo # Checking unique_checks...
+SET @unique_checks_saved = @@unique_checks;
+
+--echo
+SET @@unique_checks = ON;
+SELECT @@unique_checks;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET @@unique_checks = OF;
+SELECT @@unique_checks;
+
+--echo
+SET @@unique_checks = OFF;
+SELECT @@unique_checks;
+
+--echo
+SET @@unique_checks = @unique_checks_saved;
+
+--echo
+--echo # Checking sql_buffer_result...
+SET @sql_buffer_result_saved = @@sql_buffer_result;
+
+--echo
+SET @@sql_buffer_result = ON;
+SELECT @@sql_buffer_result;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET @@sql_buffer_result = OF;
+SELECT @@sql_buffer_result;
+
+--echo
+SET @@sql_buffer_result = OFF;
+SELECT @@sql_buffer_result;
+
+--echo
+SET @@sql_buffer_result = @sql_buffer_result_saved;
+
+--echo
+--echo # Checking sql_quote_show_create...
+SET @sql_quote_show_create_saved = @@sql_quote_show_create;
+
+--echo
+SET @@sql_quote_show_create = ON;
+SELECT @@sql_quote_show_create;
+
+--echo
+--error ER_WRONG_VALUE_FOR_VAR
+SET @@sql_quote_show_create = OF;
+SELECT @@sql_quote_show_create;
+
+--echo
+SET @@sql_quote_show_create = OFF;
+SELECT @@sql_quote_show_create;
+
+--echo
+SET @@sql_quote_show_create = @sql_quote_show_create_saved;
+
+--echo
+--echo # End of Bug#34828.
+--echo
+
+###########################################################################

=== modified file 'sql-common/client.c'
--- a/sql-common/client.c	2008-12-24 10:48:24 +0000
+++ b/sql-common/client.c	2009-02-25 10:03:18 +0000
@@ -655,10 +655,14 @@ err:
 }
 #endif
 
-/*****************************************************************************
+/**
   Read a packet from server. Give error message if socket was down
   or packet is an error message
-*****************************************************************************/
+
+  @retval  packet_error    An error occurred during reading.
+                           Error message is set.
+  @retval  
+*/
 
 ulong
 cli_safe_read(MYSQL *mysql)
@@ -824,31 +828,132 @@ void free_old_query(MYSQL *mysql)
   DBUG_VOID_RETURN;
 }
 
+
+/**
+  Finish reading of a partial result set from the server.
+  Get the EOF packet, and update mysql->status
+  and mysql->warning_count.
+
+  @return  TRUE if a communication or protocol error, an error
+           is set in this case, FALSE otherwise.
+*/
+
+my_bool flush_one_result(MYSQL *mysql)
+{
+  ulong packet_length;
+
+  DBUG_ASSERT(mysql->status != MYSQL_STATUS_READY);
+
+  do
+  {
+    packet_length= cli_safe_read(mysql);
+    /*
+      There is an error reading from the connection,
+      or (sic!) there were no error and no
+      data in the stream, i.e. no more data from the server.
+      Since we know our position in the stream (somewhere in
+      the middle of a result set), this latter case is an error too
+      -- each result set must end with a EOF packet.
+      cli_safe_read() has set an error for us, just return.
+    */
+    if (packet_length == packet_error)
+      return TRUE;
+  }
+  while (packet_length > 8 || mysql->net.read_pos[0] != 254);
+
+  /* Analyze EOF packet of the result set. */
+
+  if (protocol_41(mysql))
+  {
+    char *pos= (char*) mysql->net.read_pos + 1;
+    mysql->warning_count=uint2korr(pos);
+    pos+=2;
+    mysql->server_status=uint2korr(pos);
+    pos+=2;
+  }
+  return FALSE;
+}
+
+
+/**
+  Read a packet from network. If it's an OK packet, flush it.
+
+  @return  TRUE if error, FALSE otherwise. In case of 
+           success, is_ok_packet is set to TRUE or FALSE,
+           based on what we got from network.
+*/
+
+my_bool opt_flush_ok_packet(MYSQL *mysql, my_bool *is_ok_packet)
+{
+  ulong packet_length= cli_safe_read(mysql);
+
+  if (packet_length == packet_error)
+    return TRUE;
+
+  /* cli_safe_read always reads a non-empty packet. */
+  DBUG_ASSERT(packet_length);
+
+  *is_ok_packet= mysql->net.read_pos[0] == 0;
+  if (*is_ok_packet)
+  {
+    uchar *pos= mysql->net.read_pos + 1;
+
+    net_field_length_ll(&pos); /* affected rows */
+    net_field_length_ll(&pos); /* insert id */
+
+    mysql->server_status=uint2korr(pos);
+    pos+=2;
+
+    if (protocol_41(mysql))
+    {
+      mysql->warning_count=uint2korr(pos);
+      pos+=2;
+    }
+  }
+  return FALSE;
+}
+
+
 /*
   Flush result set sent from server
 */
 
-static void cli_flush_use_result(MYSQL *mysql)
+static void cli_flush_use_result(MYSQL *mysql, my_bool flush_all_results)
 {
   /* Clear the current execution status */
   DBUG_ENTER("cli_flush_use_result");
   DBUG_PRINT("warning",("Not all packets read, clearing them"));
-  for (;;)
+
+  if (flush_one_result(mysql))
+    DBUG_VOID_RETURN;                           /* An error occurred */
+
+  if (! flush_all_results)
+    DBUG_VOID_RETURN;
+
+  while (mysql->server_status & SERVER_MORE_RESULTS_EXISTS)
   {
-    ulong pkt_len;
-    if ((pkt_len=cli_safe_read(mysql)) == packet_error)
-      break;
-    if (pkt_len <= 8 && mysql->net.read_pos[0] == 254)
+    my_bool is_ok_packet;
+    if (opt_flush_ok_packet(mysql, &is_ok_packet))
+      DBUG_VOID_RETURN;                         /* An error occurred. */
+    if (is_ok_packet)
     {
-      if (protocol_41(mysql))
-      {
-        char *pos= (char*) mysql->net.read_pos + 1;
-        mysql->warning_count=uint2korr(pos); pos+=2;
-        mysql->server_status=uint2korr(pos); pos+=2;
-      }
-      break;                            /* End of data */
+      /*
+        Indeed what we got from network was an OK packet, and we
+        know that OK is the last one in a multi-result-set, so
+        just return.
+      */
+      DBUG_VOID_RETURN;
     }
+    /*
+      It's a result set, not an OK packet. A result set contains
+      of two result set subsequences: field metadata, terminated
+      with EOF packet, and result set data, again terminated with
+      EOF packet. Read and flush them.
+    */
+    if (flush_one_result(mysql) || flush_one_result(mysql))
+      DBUG_VOID_RETURN;                         /* An error occurred. */
   }
+
   DBUG_VOID_RETURN;
 }
 
@@ -950,7 +1055,7 @@ mysql_free_result(MYSQL_RES *result)
         mysql->unbuffered_fetch_owner= 0;
       if (mysql->status == MYSQL_STATUS_USE_RESULT)
       {
-        (*mysql->methods->flush_use_result)(mysql);
+        (*mysql->methods->flush_use_result)(mysql, FALSE);
         mysql->status=MYSQL_STATUS_READY;
         if (mysql->unbuffered_fetch_owner)
           *mysql->unbuffered_fetch_owner= TRUE;

=== modified file 'sql/backup/backup_aux.h'
--- a/sql/backup/backup_aux.h	2009-02-04 10:49:16 +0000
+++ b/sql/backup/backup_aux.h	2009-03-04 13:31:31 +0000
@@ -207,8 +207,8 @@ int set_table_list(TABLE_LIST &tl, const
   tl.db= const_cast<char*>(tbl.db().name().ptr());
   tl.lock_type= lock_type;
 
-  tl.mdl_lock_data= mdl_alloc_lock(0, tl.db, tl.table_name, mem); 
-  if (!tl.mdl_lock_data)                    // Failed to allocate lock
+  tl.mdl_request= MDL_request::create(0, tl.db, tl.table_name, mem);
+  if (!tl.mdl_request)
   {
     return 1;
   }

=== modified file 'sql/event_db_repository.cc'
--- a/sql/event_db_repository.cc	2008-12-17 18:40:14 +0000
+++ b/sql/event_db_repository.cc	2009-03-02 21:18:26 +0000
@@ -555,7 +555,7 @@ Event_db_repository::open_event_table(TH
   DBUG_ENTER("Event_db_repository::open_event_table");
 
   tables.init_one_table("mysql", 5, "event", 5, "event", lock_type);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {
@@ -1110,7 +1110,7 @@ Event_db_repository::check_system_tables
 
   /* Check mysql.db */
   tables.init_one_table("mysql", 5, "db", 2, "db", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {
@@ -1128,7 +1128,7 @@ Event_db_repository::check_system_tables
   }
   /* Check mysql.user */
   tables.init_one_table("mysql", 5, "user", 4, "user", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {
@@ -1149,7 +1149,7 @@ Event_db_repository::check_system_tables
   }
   /* Check mysql.event */
   tables.init_one_table("mysql", 5, "event", 5, "event", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &tables))
   {

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2009-02-23 14:53:18 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2009-03-04 13:33:47 +0000
@@ -147,8 +147,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema
 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
                                    bool have_lock);
 
-static MDL_LOCK_DATA binlog_mdl_lock_data;
-static char binlog_mdlkey[MAX_MDLKEY_LENGTH];
+static MDL_request binlog_mdl_request;
 
 /*
   Helper functions
@@ -3008,9 +3007,8 @@ static int open_and_lock_ndb_binlog_inde
   THD_SET_PROC_INFO(thd, "Opening " NDB_REP_DB "." NDB_REP_TABLE);
   tables->required_type= FRMTYPE_TABLE;
   thd->clear_error();
-  mdl_init_lock(&binlog_mdl_lock_data, binlog_mdlkey, 0, tables->db,
-                tables->table_name);
-  tables->mdl_lock_data= &binlog_mdl_lock_data;
+  binlog_mdl_request.init(0, tables->db, tables->table_name);
+  tables->mdl_request= &binlog_mdl_request;
   if (simple_open_n_lock_tables(thd, tables))
   {
     if (thd->killed)

=== modified file 'sql/ha_partition.cc'
--- a/sql/ha_partition.cc	2009-02-13 16:30:54 +0000
+++ b/sql/ha_partition.cc	2009-02-26 21:48:52 +0000
@@ -2381,6 +2381,21 @@ err1:
 /****************************************************************************
                 MODULE open/close object
 ****************************************************************************/
+
+
+/**
+  A destructor for partition-specific TABLE_SHARE data.
+*/
+
+void ha_data_partition_destroy(void *ha_data)
+{
+  if (ha_data)
+  {
+    HA_DATA_PARTITION *ha_data_partition= (HA_DATA_PARTITION*) ha_data;
+    pthread_mutex_destroy(&ha_data_partition->mutex);
+  }
+}
+
 /*
   Open handler object
 
@@ -2536,6 +2551,8 @@ int ha_partition::open(const char *name,
     }
     DBUG_PRINT("info", ("table_share->ha_data 0x%p", ha_data));
     bzero(ha_data, sizeof(HA_DATA_PARTITION));
+    table_share->ha_data_destroy= ha_data_partition_destroy;
+    pthread_mutex_init(&ha_data->mutex, MY_MUTEX_INIT_FAST);
   }
   if (is_not_tmp_table)
     pthread_mutex_unlock(&table_share->LOCK_ha_data);

=== modified file 'sql/ha_partition.h'
--- a/sql/ha_partition.h	2009-02-13 16:30:54 +0000
+++ b/sql/ha_partition.h	2009-02-19 13:06:05 +0000
@@ -45,6 +45,7 @@ typedef struct st_ha_data_partition
 {
   ulonglong next_auto_inc_val;                 /**< first non reserved value */
   bool auto_inc_initialized;
+  pthread_mutex_t mutex;
 } HA_DATA_PARTITION;
 
 #define PARTITION_BYTES_IN_POS 2
@@ -908,14 +909,16 @@ private:
   virtual int reset_auto_increment(ulonglong value);
   virtual void lock_auto_increment()
   {
+    HA_DATA_PARTITION *ha_data; 
     /* lock already taken */
     if (auto_increment_safe_stmt_log_lock)
       return;
     DBUG_ASSERT(table_share->ha_data && !auto_increment_lock);
     if(table_share->tmp_table == NO_TMP_TABLE)
     {
+      ha_data= (HA_DATA_PARTITION*) table_share->ha_data;
       auto_increment_lock= TRUE;
-      pthread_mutex_lock(&table_share->LOCK_ha_data);
+      pthread_mutex_lock(&ha_data->mutex);
     }
   }
   virtual void unlock_auto_increment()
@@ -928,7 +931,8 @@ private:
     */
     if(auto_increment_lock && !auto_increment_safe_stmt_log_lock)
     {
-      pthread_mutex_unlock(&table_share->LOCK_ha_data);
+      HA_DATA_PARTITION *ha_data= (HA_DATA_PARTITION*) table_share->ha_data; 
+      pthread_mutex_unlock(&ha_data->mutex);
       auto_increment_lock= FALSE;
     }
   }

=== modified file 'sql/lock.cc'
--- a/sql/lock.cc	2009-02-16 21:18:45 +0000
+++ b/sql/lock.cc	2009-03-04 13:31:31 +0000
@@ -954,7 +954,7 @@ static MYSQL_LOCK *get_lock_data(THD *th
    @note This function assumes that no metadata locks were acquired
          before calling it. Also it cannot be called while holding
          LOCK_open mutex. Both these invariants are enforced by asserts
-         in mdl_acquire_exclusive_locks() functions.
+         in MDL_context::acquire_exclusive_locks().
 
    @retval FALSE  Success.
    @retval TRUE   Failure (OOM or thread was killed).
@@ -963,25 +963,25 @@ static MYSQL_LOCK *get_lock_data(THD *th
 bool lock_table_names(THD *thd, TABLE_LIST *table_list)
 {
   TABLE_LIST *lock_table;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_request *mdl_request;
 
   DEBUG_SYNC(thd, "before_wait_locked_tname");
   for (lock_table= table_list; lock_table; lock_table= lock_table->next_local)
   {
-    if (!(mdl_lock_data= mdl_alloc_lock(0, lock_table->db,
-                                        lock_table->table_name,
-                                        thd->mem_root)))
+    mdl_request= MDL_request::create(0, lock_table->db, lock_table->table_name,
+                                     thd->mem_root);
+    if (!mdl_request)
       goto end;
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    mdl_add_lock(&thd->mdl_context, mdl_lock_data);
-    lock_table->mdl_lock_data= mdl_lock_data;
+    mdl_request->set_type(MDL_EXCLUSIVE);
+    thd->mdl_context.add_request(mdl_request);
+    lock_table->mdl_request= mdl_request;
   }
-  if (mdl_acquire_exclusive_locks(&thd->mdl_context))
+  if (thd->mdl_context.acquire_exclusive_locks())
     goto end;
   return 0;
 
 end:
-  mdl_remove_all_locks(&thd->mdl_context);
+  thd->mdl_context.remove_all_requests();
   return 1;
 }
 
@@ -997,8 +997,8 @@ end:
 void unlock_table_names(THD *thd)
 {
   DBUG_ENTER("unlock_table_names");
-  mdl_release_locks(&thd->mdl_context);
-  mdl_remove_all_locks(&thd->mdl_context);
+  thd->mdl_context.release_all_locks();
+  thd->mdl_context.remove_all_requests();
   DBUG_VOID_RETURN;
 }
 
@@ -1182,7 +1182,7 @@ bool lock_global_read_lock(THD *thd)
             redundancy between metadata locks, global read lock and DDL
             blocker (see WL#4399 and WL#4400).
     */
-    if (mdl_acquire_global_shared_lock(&thd->mdl_context))
+    if (thd->mdl_context.acquire_global_shared_lock())
     {
       /* Our thread was killed -- return back to initial state. */
       pthread_mutex_lock(&LOCK_global_read_lock);
@@ -1216,7 +1216,7 @@ void unlock_global_read_lock(THD *thd)
              ("global_read_lock: %u  global_read_lock_blocks_commit: %u",
               global_read_lock, global_read_lock_blocks_commit));
 
-  mdl_release_global_shared_lock(&thd->mdl_context);
+  thd->mdl_context.release_global_shared_lock();
 
   pthread_mutex_lock(&LOCK_global_read_lock);
   tmp= --global_read_lock;

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2009-02-23 14:53:18 +0000
+++ b/sql/log.cc	2009-03-04 10:04:58 +0000
@@ -1352,7 +1352,7 @@ bool Log_to_csv_event_handler::purge_bac
     tables.init_one_table("mysql", strlen("mysql"), 
                           "backup_history", strlen("backup_history"),
                           "backup_history", TL_READ);
-    alloc_mdl_locks(&tables, thd->mem_root);
+    alloc_mdl_requests(&tables, thd->mem_root);
     res= mysql_truncate(thd, &tables, 1);
     close_thread_tables(thd);
     if (res)
@@ -1364,7 +1364,7 @@ bool Log_to_csv_event_handler::purge_bac
     tables.init_one_table("mysql", strlen("mysql"), 
                           "backup_progress", strlen("backup_progress"),
                           "backup_progress", TL_READ);
-    alloc_mdl_locks(&tables, thd->mem_root);
+    alloc_mdl_requests(&tables, thd->mem_root);
     res= mysql_truncate(thd, &tables, 1);
     close_thread_tables(thd);
   }
@@ -3911,7 +3911,7 @@ my_bool MYSQL_BACKUP_LOG::check_backup_l
   tables.init_one_table("mysql", strlen("mysql"), 
                         "backup_history", strlen("backup_history"),
                         "backup_history", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
   if (simple_open_n_lock_tables(thd, &tables))
   {
     /*
@@ -3934,7 +3934,7 @@ my_bool MYSQL_BACKUP_LOG::check_backup_l
   tables.init_one_table("mysql", strlen("mysql"), 
                         "backup_progress", strlen("backup_progress"),
                         "backup_progress", TL_READ);
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
   if (simple_open_n_lock_tables(thd, &tables))
   {
     /*

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2009-02-23 14:53:18 +0000
+++ b/sql/log_event.cc	2009-03-04 13:33:47 +0000
@@ -7983,8 +7983,8 @@ Table_map_log_event::~Table_map_log_even
 int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
 {
   RPL_TABLE_LIST *table_list;
-  char *db_mem, *tname_mem, *mdlkey;
-  MDL_LOCK_DATA *mdl_lock_data;
+  char *db_mem, *tname_mem;
+  MDL_request *mdl_request;
   size_t dummy_len;
   void *memory;
   DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
@@ -7999,8 +7999,7 @@ int Table_map_log_event::do_apply_event(
                                 &table_list, (uint) sizeof(RPL_TABLE_LIST),
                                 &db_mem, (uint) NAME_LEN + 1,
                                 &tname_mem, (uint) NAME_LEN + 1,
-                                &mdl_lock_data, sizeof(MDL_LOCK_DATA),
-                                &mdlkey, MAX_MDLKEY_LENGTH,
+                                &mdl_request, sizeof(MDL_request),
                                 NullS)))
     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
 
@@ -8013,9 +8012,8 @@ int Table_map_log_event::do_apply_event(
   table_list->updating= 1;
   strmov(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
   strmov(table_list->table_name, m_tblnam);
-  mdl_init_lock(mdl_lock_data, mdlkey, 0, table_list->db,
-                table_list->table_name);
-  table_list->mdl_lock_data= mdl_lock_data;
+  mdl_request->init(0, table_list->db, table_list->table_name);
+  table_list->mdl_request= mdl_request;
 
   int error= 0;
 

=== modified file 'sql/mdl.cc'
--- a/sql/mdl.cc	2009-01-27 13:41:58 +0000
+++ b/sql/mdl.cc	2009-03-05 21:39:58 +0000
@@ -22,123 +22,154 @@
 static bool mdl_initialized= 0;
 
 /**
-   The lock context. Created internally for an acquired lock.
-   For a given name, there exists only one MDL_LOCK instance,
-   and it exists only when the lock has been granted.
-   Can be seen as an MDL subsystem's version of TABLE_SHARE.
-*/
-
-struct MDL_LOCK
-{
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> active_shared;
-  /*
-    There can be several upgraders and active exclusive
-    belonging to the same context.
-  */
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> active_shared_waiting_upgrade;
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> active_exclusive;
-  I_P_List<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> waiting_exclusive;
+  The lock context. Created internally for an acquired lock.
+  For a given name, there exists only one MDL_lock instance,
+  and it exists only when the lock has been granted.
+  Can be seen as an MDL subsystem's version of TABLE_SHARE.
+*/
+
+class MDL_lock
+{
+public:
+  typedef I_P_List<MDL_ticket,
+                   I_P_List_adapter<MDL_ticket,
+                                    &MDL_ticket::next_in_lock,
+                                    &MDL_ticket::prev_in_lock> >
+          Ticket_list;
+
+  typedef Ticket_list::Iterator Ticket_iterator;
+
+  /** The type of lock (shared or exclusive). */
+  enum
+  {
+    MDL_LOCK_SHARED,
+    MDL_LOCK_EXCLUSIVE,
+  } type;
+  /** The key of the object (data) being protected. */
+  MDL_key key;
+  /** List of granted tickets for this lock. */
+  Ticket_list granted;
   /**
-     Number of MDL_LOCK_DATA objects associated with this MDL_LOCK instance
-     and therefore present in one of above lists. Note that this number
-     doesn't account for pending requests for shared lock since we don't
-     associate them with MDL_LOCK and don't keep them in any list.
+    There can be several upgraders and active exclusive
+    locks belonging to the same context. E.g.
+    in case of RENAME t1 to t2, t2 to t3, we attempt to
+    exclusively lock t2 twice.
   */
-  uint   lock_data_count;
+  Ticket_list waiting;
   void   *cached_object;
   mdl_cached_object_release_hook cached_object_release_hook;
 
-  MDL_LOCK() : cached_object(0), cached_object_release_hook(0) {}
-
-  MDL_LOCK_DATA *get_key_owner()
+  bool is_empty() const
   {
-     return !active_shared.is_empty() ?
-            active_shared.head() :
-            (!active_shared_waiting_upgrade.is_empty() ?
-             active_shared_waiting_upgrade.head() :
-             (!active_exclusive.is_empty() ?
-              active_exclusive.head() : waiting_exclusive.head()));
+    return (granted.is_empty() && waiting.is_empty());
   }
 
-  bool has_one_lock_data()
+  bool can_grant_lock(const MDL_context *requestor_ctx,
+                      enum_mdl_type type, bool is_upgrade);
+
+  inline static MDL_lock *create(const MDL_key *key);
+  inline static void destroy(MDL_lock *lock);
+private:
+  MDL_lock(const MDL_key *key_arg)
+  : type(MDL_LOCK_SHARED),
+    key(key_arg),
+    cached_object(NULL),
+    cached_object_release_hook(NULL)
   {
-    return (lock_data_count == 1);
   }
 };
 
 
-pthread_mutex_t LOCK_mdl;
-pthread_cond_t  COND_mdl;
-HASH mdl_locks;
+static pthread_mutex_t LOCK_mdl;
+static pthread_cond_t  COND_mdl;
+static HASH mdl_locks;
 
 /**
-   Structure implementing global metadata lock. The only types
-   of locks which are supported at the moment are shared and
-   intention exclusive locks. Note that the latter type of global
-   lock acquired automatically when one tries to acquire exclusive
-   or shared upgradable lock on particular object.
+  An implementation of the global metadata lock. The only
+  locking modes which are supported at the moment are SHARED and
+  INTENTION EXCLUSIVE. Note, that SHARED global metadata lock
+  is acquired automatically when one tries to acquire an EXCLUSIVE
+  or UPGRADABLE SHARED metadata lock on an individual object.
 */
 
-struct MDL_GLOBAL_LOCK
+class MDL_global_lock
 {
+public:
   uint waiting_shared;
   uint active_shared;
   uint active_intention_exclusive;
-} global_lock;
+
+  bool is_empty() const
+  {
+    return (waiting_shared == 0 && active_shared  == 0 &&
+            active_intention_exclusive == 0);
+  }
+  bool is_lock_type_compatible(enum_mdl_type type, bool is_upgrade) const;
+};
 
 
-extern "C" uchar *mdl_locks_key(const uchar *record, size_t *length,
-                                my_bool not_used __attribute__((unused)))
+static MDL_global_lock global_lock;
+
+
+extern "C"
 {
-  MDL_LOCK *entry=(MDL_LOCK*) record;
-  *length= entry->get_key_owner()->key_length;
-  return (uchar*) entry->get_key_owner()->key;
+static uchar *
+mdl_locks_key(const uchar *record, size_t *length,
+              my_bool not_used __attribute__((unused)))
+{
+  MDL_lock *lock=(MDL_lock*) record;
+  *length= lock->key.length();
+  return (uchar*) lock->key.ptr();
 }
+} /* extern "C" */
 
 
 /**
-   Initialize the metadata locking subsystem.
+  Initialize the metadata locking subsystem.
 
-   This function is called at server startup.
+  This function is called at server startup.
 
-   In particular, initializes the new global mutex and
-   the associated condition variable: LOCK_mdl and COND_mdl.
-   These locking primitives are implementation details of the MDL
-   subsystem and are private to it.
+  In particular, initializes the new global mutex and
+  the associated condition variable: LOCK_mdl and COND_mdl.
+  These locking primitives are implementation details of the MDL
+  subsystem and are private to it.
 
-   Note, that even though the new implementation adds acquisition
-   of a new global mutex to the execution flow of almost every SQL
-   statement, the design capitalizes on that to later save on
-   look ups in the table definition cache. This leads to reduced
-   contention overall and on LOCK_open in particular.
-   Please see the description of mdl_acquire_shared_lock() for details.
+  Note, that even though the new implementation adds acquisition
+  of a new global mutex to the execution flow of almost every SQL
+  statement, the design capitalizes on that to later save on
+  look ups in the table definition cache. This leads to reduced
+  contention overall and on LOCK_open in particular.
+  Please see the description of MDL_context::acquire_shared_lock()
+  for details.
 */
 
 void mdl_init()
 {
-  mdl_initialized= 1;
+  DBUG_ASSERT(! mdl_initialized);
+  mdl_initialized= TRUE;
   pthread_mutex_init(&LOCK_mdl, NULL);
   pthread_cond_init(&COND_mdl, NULL);
   my_hash_init(&mdl_locks, &my_charset_bin, 16 /* FIXME */, 0, 0,
                mdl_locks_key, 0, 0);
-  global_lock.waiting_shared= global_lock.active_shared= 0;
-  global_lock.active_intention_exclusive= 0;
+  /* The global lock is zero-initialized by the loader. */
+  DBUG_ASSERT(global_lock.is_empty());
 }
 
 
 /**
-   Release resources of metadata locking subsystem.
+  Release resources of metadata locking subsystem.
 
-   Destroys the global mutex and the condition variable.
-   Called at server shutdown.
+  Destroys the global mutex and the condition variable.
+  Called at server shutdown.
 */
 
 void mdl_destroy()
 {
   if (mdl_initialized)
   {
-    mdl_initialized= 0;
+    mdl_initialized= FALSE;
     DBUG_ASSERT(!mdl_locks.records);
+    DBUG_ASSERT(global_lock.is_empty());
     pthread_mutex_destroy(&LOCK_mdl);
     pthread_cond_destroy(&COND_mdl);
     my_hash_free(&mdl_locks);
@@ -147,319 +178,328 @@ void mdl_destroy()
 
 
 /**
-   Initialize a metadata locking context.
+  Initialize a metadata locking context.
 
-   This is to be called when a new server connection is created.
+  This is to be called when a new server connection is created.
 */
 
-void mdl_context_init(MDL_CONTEXT *context, THD *thd)
+void MDL_context::init(THD *thd_arg)
 {
-  context->locks.empty();
-  context->thd= thd;
-  context->has_global_shared_lock= FALSE;
+  m_has_global_shared_lock= FALSE;
+  m_thd= thd_arg;
+  /*
+    FIXME: In reset_n_backup_open_tables_state,
+    we abuse "init" as a reset, i.e. call it on an already
+    constructed non-empty object. This is why we can't
+    rely here on the default constructors of I_P_List
+    to empty the list.
+  */
+  m_requests.empty();
+  m_tickets.empty();
 }
 
 
 /**
-   Destroy metadata locking context.
+  Destroy metadata locking context.
 
-   Assumes and asserts that there are no active or pending locks
-   associated with this context at the time of the destruction.
+  Assumes and asserts that there are no active or pending locks
+  associated with this context at the time of the destruction.
 
-   Currently does nothing. Asserts that there are no pending
-   or satisfied lock requests. The pending locks must be released
-   prior to destruction. This is a new way to express the assertion
-   that all tables are closed before a connection is destroyed.
+  Currently does nothing. Asserts that there are no pending
+  or satisfied lock requests. The pending locks must be released
+  prior to destruction. This is a new way to express the assertion
+  that all tables are closed before a connection is destroyed.
 */
 
-void mdl_context_destroy(MDL_CONTEXT *context)
+void MDL_context::destroy()
 {
-  DBUG_ASSERT(context->locks.is_empty());
-  DBUG_ASSERT(!context->has_global_shared_lock);
+  DBUG_ASSERT(m_requests.is_empty());
+  DBUG_ASSERT(m_tickets.is_empty());
+  DBUG_ASSERT(! m_has_global_shared_lock);
 }
 
 
 /**
-   Backup and reset state of meta-data locking context.
+  Backup and reset state of meta-data locking context.
 
-   mdl_context_backup_and_reset(), mdl_context_restore() and
-   mdl_context_merge() are used by HANDLER implementation which
-   needs to open table for new HANDLER independently of already
-   open HANDLERs and add this table/metadata lock to the set of
-   tables open/metadata locks for HANDLERs afterwards.
+  mdl_context_backup_and_reset(), mdl_context_restore() and
+  mdl_context_merge() are used by HANDLER implementation which
+  needs to open table for new HANDLER independently of already
+  open HANDLERs and add this table/metadata lock to the set of
+  tables open/metadata locks for HANDLERs afterwards.
 */
 
-void mdl_context_backup_and_reset(MDL_CONTEXT *ctx, MDL_CONTEXT *backup)
+void MDL_context::backup_and_reset(MDL_context *backup)
 {
-  backup->locks.empty();
-  ctx->locks.swap(backup->locks);
+  DBUG_ASSERT(backup->m_requests.is_empty());
+  DBUG_ASSERT(backup->m_tickets.is_empty());
+
+  m_requests.swap(backup->m_requests);
+  m_tickets.swap(backup->m_tickets);
+
+  backup->m_has_global_shared_lock= m_has_global_shared_lock;
+  /*
+    When the main context is swapped out, one can not take
+    the global shared lock, and one can not rely on it:
+    the functionality in this mode is reduced, since it exists as
+    a temporary hack to support ad-hoc opening of system tables.
+  */
+  m_has_global_shared_lock= FALSE;
 }
 
 
 /**
-   Restore state of meta-data locking context from backup.
+  Restore state of meta-data locking context from backup.
 */
 
-void mdl_context_restore(MDL_CONTEXT *ctx, MDL_CONTEXT *backup)
+void MDL_context::restore_from_backup(MDL_context *backup)
 {
-  DBUG_ASSERT(ctx->locks.is_empty());
-  ctx->locks.swap(backup->locks);
+  DBUG_ASSERT(m_requests.is_empty());
+  DBUG_ASSERT(m_tickets.is_empty());
+  DBUG_ASSERT(m_has_global_shared_lock == FALSE);
+
+  m_requests.swap(backup->m_requests);
+  m_tickets.swap(backup->m_tickets);
+  m_has_global_shared_lock= backup->m_has_global_shared_lock;
 }
 
 
 /**
-   Merge meta-data locks from one context into another.
+  Merge meta-data locks from one context into another.
 */
 
-void mdl_context_merge(MDL_CONTEXT *dst, MDL_CONTEXT *src)
+void MDL_context::merge(MDL_context *src)
 {
-  MDL_LOCK_DATA *lock_data;
+  MDL_ticket *ticket;
+  MDL_request *mdl_request;
 
-  DBUG_ASSERT(dst->thd == src->thd);
+  DBUG_ASSERT(m_thd == src->m_thd);
+
+  if (!src->m_requests.is_empty())
+  {
+    Request_iterator it(src->m_requests);
+    while ((mdl_request= it++))
+      m_requests.push_front(mdl_request);
+    src->m_requests.empty();
+  }
 
-  if (!src->locks.is_empty())
+  if (!src->m_tickets.is_empty())
   {
-    I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(src->locks);
-    while ((lock_data= it++))
+    Ticket_iterator it(src->m_tickets);
+    while ((ticket= it++))
     {
-      DBUG_ASSERT(lock_data->ctx);
-      lock_data->ctx= dst;
-      dst->locks.push_front(lock_data);
+      DBUG_ASSERT(ticket->m_ctx);
+      ticket->m_ctx= this;
+      m_tickets.push_front(ticket);
     }
-    src->locks.empty();
+    src->m_tickets.empty();
   }
+  /*
+    MDL_context::merge() is a hack used in one place only: to open
+    an SQL handler. We never acquire the global shared lock there.
+  */
+  DBUG_ASSERT(! src->m_has_global_shared_lock);
 }
 
 
 /**
-   Initialize a lock request.
+  Initialize a lock request.
 
-   This is to be used for every lock request.
+  This is to be used for every lock request.
 
-   Note that initialization and allocation are split
-   into two calls. This is to allow flexible memory management
-   of lock requests. Normally a lock request is stored
-   in statement memory (e.g. is a member of struct TABLE_LIST),
-   but we would also like to allow allocation of lock
-   requests in other memory roots, for example in the grant
-   subsystem, to lock privilege tables.
+  Note that initialization and allocation are split into two
+  calls. This is to allow flexible memory management of lock
+  requests. Normally a lock request is stored in statement memory
+  (e.g. is a member of struct TABLE_LIST), but we would also like
+  to allow allocation of lock requests in other memory roots,
+  for example in the grant subsystem, to lock privilege tables.
 
-   The MDL subsystem does not own or manage memory of lock
-   requests. Instead it assumes that the life time of every lock
-   request encloses calls to mdl_acquire_shared_lock() and
-   mdl_release_locks().
+  The MDL subsystem does not own or manage memory of lock requests.
+  Instead it assumes that the life time of every lock request (including
+  encompassed members db/name) encloses calls to MDL_context::add_request()
+  and MDL_context::remove_request() or MDL_context::remove_all_requests().
 
-   @param  lock_data  Pointer to an MDL_LOCK_DATA object to initialize
-   @param  key_buff   Pointer to the buffer for key for the lock request
-                      (should be at least 4+ strlen(db) + 1 + strlen(name)
-                      + 1 bytes, or, if the lengths are not known,
-                      MAX_MDLKEY_LENGTH)
-   @param  type       Id of type of object to be locked
-   @param  db         Name of database to which the object belongs
-   @param  name       Name of of the object
+  @param  type       Id of type of object to be locked
+  @param  db         Name of database to which the object belongs
+  @param  name       Name of of the object
 
-   Stores the database name, object name and the type in the key
-   buffer. Initializes mdl_el to point to the key.
-   We can't simply initialize MDL_LOCK_DATA with type, db and name
-   by-pointer because of the underlying HASH implementation
-   requires the key to be a contiguous buffer.
+  The initialized lock request will have MDL_SHARED type.
 
-   The initialized lock request will have MDL_SHARED type.
-
-   Suggested lock types: TABLE - 0 PROCEDURE - 1 FUNCTION - 2
-   Note that tables and views have the same lock type, since
-   they share the same name space in the SQL standard.
+  Suggested lock types: TABLE - 0 PROCEDURE - 1 FUNCTION - 2
+  Note that tables and views must have the same lock type, since
+  they share the same name space in the SQL standard.
 */
 
-void mdl_init_lock(MDL_LOCK_DATA *lock_data, char *key, int type,
-                   const char *db, const char *name)
+void MDL_request::init(unsigned char type_arg,
+                       const char *db_arg,
+                       const char *name_arg)
 {
-  int4store(key, type);
-  lock_data->key_length= (uint) (strmov(strmov(key+4, db)+1, name)-key)+1;
-  lock_data->key= key;
-  lock_data->type= MDL_SHARED;
-  lock_data->state= MDL_INITIALIZED;
-#ifndef DBUG_OFF
-  lock_data->ctx= 0;
-  lock_data->lock= 0;
-#endif
+  key.mdl_key_init(type_arg, db_arg, name_arg);
+  type= MDL_SHARED;
+  ticket= NULL;
 }
 
 
 /**
-   Allocate and initialize one lock request.
+  Allocate and initialize one lock request.
 
-   Same as mdl_init_lock(), but allocates the lock and the key buffer
-   on a memory root. Necessary to lock ad-hoc tables, e.g.
-   mysql.* tables of grant and data dictionary subsystems.
+  Same as mdl_init_lock(), but allocates the lock and the key buffer
+  on a memory root. Necessary to lock ad-hoc tables, e.g.
+  mysql.* tables of grant and data dictionary subsystems.
 
-   @param  type       Id of type of object to be locked
-   @param  db         Name of database to which object belongs
-   @param  name       Name of of object
-   @param  root       MEM_ROOT on which object should be allocated
+  @param  type       Id of type of object to be locked
+  @param  db         Name of database to which object belongs
+  @param  name       Name of of object
+  @param  root       MEM_ROOT on which object should be allocated
 
-   @note The allocated lock request will have MDL_SHARED type.
+  @note The allocated lock request will have MDL_SHARED type.
 
-   @retval 0      Error if out of memory
-   @retval non-0  Pointer to an object representing a lock request
+  @retval 0      Error if out of memory
+  @retval non-0  Pointer to an object representing a lock request
 */
 
-MDL_LOCK_DATA *mdl_alloc_lock(int type, const char *db, const char *name,
-                              MEM_ROOT *root)
+MDL_request *
+MDL_request::create(unsigned char type, const char *db,
+                    const char *name, MEM_ROOT *root)
 {
-  MDL_LOCK_DATA *lock_data;
-  char *key;
+  MDL_request *mdl_request;
 
-  if (!multi_alloc_root(root, &lock_data, sizeof(MDL_LOCK_DATA), &key,
-                        MAX_MDLKEY_LENGTH, NULL))
+  if (!(mdl_request= (MDL_request*) alloc_root(root, sizeof(MDL_request))))
     return NULL;
 
-  mdl_init_lock(lock_data, key, type, db, name);
+  mdl_request->init(type, db, name);
 
-  return lock_data;
+  return mdl_request;
 }
 
 
 /**
-   Add a lock request to the list of lock requests of the context.
+  Add a lock request to the list of lock requests of the context.
 
-   The procedure to acquire metadata locks is:
-     - allocate and initialize lock requests (mdl_alloc_lock())
-     - associate them with a context (mdl_add_lock())
-     - call mdl_acquire_shared_lock()/mdl_release_lock() (maybe repeatedly).
+  The procedure to acquire metadata locks is:
+    - allocate and initialize lock requests
+    (MDL_request::create())
+    - associate them with a context (MDL_context::add_request())
+    - call MDL_context::acquire_shared_lock() and
+    MDL_context::release_lock() (maybe repeatedly).
 
-   Associates a lock request with the given context.
+  Associates a lock request with the given context.
+  There should be no more than one context per connection, to
+  avoid deadlocks.
 
-   @param  context    The MDL context to associate the lock with.
-                      There should be no more than one context per
-                      connection, to avoid deadlocks.
-   @param  lock_data  The lock request to be added.
+  @param  mdl_request   The lock request to be added.
 */
 
-void mdl_add_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data)
+void MDL_context::add_request(MDL_request *mdl_request)
 {
-  DBUG_ENTER("mdl_add_lock");
-  DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-  DBUG_ASSERT(!lock_data->ctx);
-  lock_data->ctx= context;
-  context->locks.push_front(lock_data);
+  DBUG_ENTER("MDL_context::add_request");
+  DBUG_ASSERT(mdl_request->ticket == NULL);
+  m_requests.push_front(mdl_request);
   DBUG_VOID_RETURN;
 }
 
 
 /**
-   Remove a lock request from the list of lock requests of the context.
+  Remove a lock request from the list of lock requests.
 
-   Disassociates a lock request from the given context.
+  Disassociates a lock request from the given context.
 
-   @param  context    The MDL context to remove the lock from.
-   @param  lock_data  The lock request to be removed.
+  @param  mdl_request   The lock request to be removed.
 
-   @pre The lock request being removed should correspond to lock which
-        was released or was not acquired.
+  @pre The lock request being removed should correspond to a ticket that
+       was released or was not acquired.
 
-   @note Resets lock request for lock released back to its initial state
-         (i.e. sets type to MDL_SHARED).
+  @note Resets lock request back to its initial state
+        (i.e. sets type to MDL_SHARED).
 */
 
-void mdl_remove_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data)
+void MDL_context::remove_request(MDL_request *mdl_request)
 {
-  DBUG_ENTER("mdl_remove_lock");
-  DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-  DBUG_ASSERT(context == lock_data->ctx);
+  DBUG_ENTER("MDL_context::remove_request");
   /* Reset lock request back to its initial state. */
-  lock_data->type= MDL_SHARED;
-#ifndef DBUG_OFF
-  lock_data->ctx= 0;
-#endif
-  context->locks.remove(lock_data);
+  mdl_request->type= MDL_SHARED;
+  mdl_request->ticket= NULL;
+  m_requests.remove(mdl_request);
   DBUG_VOID_RETURN;
 }
 
 
 /**
-   Clear all lock requests in the context (clear the context).
-
-   Disassociates lock requests from the context.
-   All granted locks must be released prior to calling this
-   function.
-
-   In other words, the expected procedure to release locks is:
-     - mdl_release_locks();
-     - mdl_remove_all_locks();
-
-   We could possibly merge mdl_remove_all_locks() and mdl_release_locks(),
-   but this function comes in handy when we need to back off: in that case
-   we release all the locks acquired so-far but do not free them, since
-   we know that the respective lock requests will be used again.
-
-   Also resets lock requests back to their initial state (i.e. MDL_SHARED).
+  Clear all lock requests in the context.
+  Disassociates lock requests from the context.
 
-   @param context Context to be cleared.
+  Also resets lock requests back to their initial state (i.e. MDL_SHARED).
 */
 
-void mdl_remove_all_locks(MDL_CONTEXT *context)
+void MDL_context::remove_all_requests()
 {
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-  while ((lock_data= it++))
+  MDL_request *mdl_request;
+  Request_iterator it(m_requests);
+  while ((mdl_request= it++))
   {
     /* Reset lock request back to its initial state. */
-    lock_data->type= MDL_SHARED;
-#ifndef DBUG_OFF
-    lock_data->ctx= 0;
-#endif
+    mdl_request->type= MDL_SHARED;
+    mdl_request->ticket= NULL;
   }
-  context->locks.empty();
+  m_requests.empty();
 }
 
 
 /**
-   Auxiliary functions needed for creation/destruction of MDL_LOCK
-   objects.
+  Auxiliary functions needed for creation/destruction of MDL_lock objects.
 
-   @todo This naive implementation should be replaced with one that saves
-         on memory allocation by reusing released objects.
+  @todo This naive implementation should be replaced with one that saves
+        on memory allocation by reusing released objects.
 */
 
-static MDL_LOCK* get_lock_object(void)
+inline MDL_lock *MDL_lock::create(const MDL_key *mdl_key)
 {
-  return new MDL_LOCK();
+  return new MDL_lock(mdl_key);
 }
 
 
-static void release_lock_object(MDL_LOCK *lock)
+void MDL_lock::destroy(MDL_lock *lock)
 {
   delete lock;
 }
 
 
 /**
-   Helper functions which simplifies writing various checks and asserts.
+  Auxiliary functions needed for creation/destruction of MDL_ticket
+  objects.
+
+  @todo This naive implementation should be replaced with one that saves
+        on memory allocation by reusing released objects.
 */
 
-static bool is_shared(MDL_LOCK_DATA *lock_data)
+MDL_ticket *MDL_ticket::create(MDL_context *ctx_arg, enum_mdl_type type_arg)
+{
+  return new MDL_ticket(ctx_arg, type_arg);
+}
+
+
+void MDL_ticket::destroy(MDL_ticket *ticket)
 {
-  return (lock_data->type < MDL_EXCLUSIVE);
+  delete ticket;
 }
 
 
 /**
-   Helper functions and macros to be used for killable waiting in metadata
-   locking subsystem.
+  Helper functions and macros to be used for killable waiting in metadata
+  locking subsystem.
 
-   @sa THD::enter_cond()/exit_cond()/killed.
+  @sa THD::enter_cond()/exit_cond()/killed.
 
-   @note We can't use THD::enter_cond()/exit_cond()/killed directly here
-         since this will make metadata subsystem dependant on THD class
-         and thus prevent us from writing unit tests for it. And usage of
-         wrapper functions to access THD::killed/enter_cond()/exit_cond()
-         will probably introduce too much overhead.
+  @note We can't use THD::enter_cond()/exit_cond()/killed directly here
+        since this will make metadata subsystem dependant on THD class
+        and thus prevent us from writing unit tests for it. And usage of
+        wrapper functions to access THD::killed/enter_cond()/exit_cond()
+        will probably introduce too much overhead.
 */
 
 #define MDL_ENTER_COND(A, B) mdl_enter_cond(A, B, __func__, __FILE__, __LINE__)
 
-static inline const char* mdl_enter_cond(MDL_CONTEXT *context,
+static inline const char *mdl_enter_cond(THD *thd,
                                          st_my_thread_var *mysys_var,
                                          const char *calling_func,
                                          const char *calling_file,
@@ -470,15 +510,15 @@ static inline const char* mdl_enter_cond
   mysys_var->current_mutex= &LOCK_mdl;
   mysys_var->current_cond= &COND_mdl;
 
-  DEBUG_SYNC(context->thd, "mdl_enter_cond");
+  DEBUG_SYNC(thd, "mdl_enter_cond");
 
-  return set_thd_proc_info(context->thd, "Waiting for table",
+  return set_thd_proc_info(thd, "Waiting for table",
                            calling_func, calling_file, calling_line);
 }
 
 #define MDL_EXIT_COND(A, B, C) mdl_exit_cond(A, B, C, __func__, __FILE__, __LINE__)
 
-static inline void mdl_exit_cond(MDL_CONTEXT *context,
+static inline void mdl_exit_cond(THD *thd,
                                  st_my_thread_var *mysys_var,
                                  const char* old_msg,
                                  const char *calling_func,
@@ -493,61 +533,63 @@ static inline void mdl_exit_cond(MDL_CON
   mysys_var->current_cond= 0;
   pthread_mutex_unlock(&mysys_var->mutex);
 
-  DEBUG_SYNC(context->thd, "mdl_exit_cond");
+  DEBUG_SYNC(thd, "mdl_exit_cond");
 
-  (void) set_thd_proc_info(context->thd, old_msg, calling_func,
+  (void) set_thd_proc_info(thd, old_msg, calling_func,
                            calling_file, calling_line);
 }
 
 
 /**
-   Check if request for the lock on particular object can be satisfied given
-   current state of the global metadata lock.
-
-   @note In other words, we're trying to check that the individual lock
-         request, implying a form of lock on the global metadata, is
-         compatible with the current state of the global metadata lock.
-
-   @param lock_data Request for lock on an individual object, implying a
-                    certain kind of global metadata lock.
-
-   @retval TRUE  - Lock request can be satisfied
-   @retval FALSE - There is some conflicting lock
-
-   Here is a compatibility matrix defined by this function:
-
-                   |             | Satisfied or pending requests
-                   |             | for global metadata lock
-   ----------------+-------------+--------------------------------------------
-   Type of request | Correspond. |
-   for indiv. lock | global lock | Active-S  Pending-S  Active-IS(**) Active-IX
-   ----------------+-------------+--------------------------------------------
-   S, high-prio S  |   IS        |    +         +          +             +
-   upgradable S    |   IX        |    -         -          +             +
-   X               |   IX        |    -         -          +             +
-   S upgraded to X |   IX (*)    |    0         +          +             +
-
-   Here: "+" -- means that request can be satisfied
-         "-" -- means that request can't be satisfied and should wait
-         "0" -- means impossible situation which will trigger assert
-
-   (*)  Since for upgradable shared locks we always take intention exclusive
-        global lock at the same time when obtaining the shared lock, there
-        is no need to obtain such lock during the upgrade itself.
-   (**) Since intention shared global locks are compatible with all other
-        type of locks we don't even have any accounting for them.
-*/
+  Check if request for the lock on particular object can be satisfied given
+  current state of the global metadata lock.
 
-static bool can_grant_global_lock(MDL_LOCK_DATA *lock_data)
+  @note In other words, we're trying to check that the individual lock
+        request, implying a form of lock on the global metadata, is
+        compatible with the current state of the global metadata lock.
+
+  @param mdl_request  Request for lock on an individual object, implying a
+                      certain kind of global metadata lock.
+
+  @retval TRUE  - Lock request can be satisfied
+  @retval FALSE - There is some conflicting lock
+
+  Here is a compatibility matrix defined by this function:
+
+                  |             | Satisfied or pending requests
+                  |             | for global metadata lock
+  ----------------+-------------+--------------------------------------------
+  Type of request | Correspond. |
+  for indiv. lock | global lock | Active-S  Pending-S  Active-IS(**) Active-IX
+  ----------------+-------------+--------------------------------------------
+  S, high-prio S  |   IS        |    +         +          +             +
+  upgradable S    |   IX        |    -         -          +             +
+  X               |   IX        |    -         -          +             +
+  S upgraded to X |   IX (*)    |    0         +          +             +
+
+  Here: "+" -- means that request can be satisfied
+        "-" -- means that request can't be satisfied and should wait
+        "0" -- means impossible situation which will trigger assert
+
+  (*)  Since for upgradable shared locks we always take intention exclusive
+       global lock at the same time when obtaining the shared lock, there
+       is no need to obtain such lock during the upgrade itself.
+  (**) Since intention shared global locks are compatible with all other
+       type of locks we don't even have any accounting for them.
+*/
+
+bool
+MDL_global_lock::is_lock_type_compatible(enum_mdl_type type,
+                                         bool is_upgrade) const
 {
-  switch (lock_data->type)
+  switch (type)
   {
   case MDL_SHARED:
   case MDL_SHARED_HIGH_PRIO:
     return TRUE;
     break;
   case MDL_SHARED_UPGRADABLE:
-    if (global_lock.active_shared || global_lock.waiting_shared)
+    if (active_shared || waiting_shared)
     {
       /*
         We are going to obtain intention exclusive global lock and
@@ -559,7 +601,7 @@ static bool can_grant_global_lock(MDL_LO
       return TRUE;
     break;
   case MDL_EXCLUSIVE:
-    if (lock_data->state == MDL_PENDING_UPGRADE)
+    if (is_upgrade)
     {
       /*
         We are upgrading MDL_SHARED to MDL_EXCLUSIVE.
@@ -567,13 +609,12 @@ static bool can_grant_global_lock(MDL_LO
         There should be no conflicting global locks since for each upgradable
         shared lock we obtain intention exclusive global lock first.
       */
-      DBUG_ASSERT(global_lock.active_shared == 0 &&
-                  global_lock.active_intention_exclusive);
+      DBUG_ASSERT(active_shared == 0 && active_intention_exclusive);
       return TRUE;
     }
     else
     {
-      if (global_lock.active_shared || global_lock.waiting_shared)
+      if (active_shared || waiting_shared)
       {
         /*
           We are going to obtain intention exclusive global lock and
@@ -593,229 +634,287 @@ static bool can_grant_global_lock(MDL_LO
 
 
 /**
-   Check if request for the lock can be satisfied given current state of lock.
+  Check if request for the lock can be satisfied given current state of lock.
 
-   @param  lock       Lock.
-   @param  lock_data  Request for lock.
+  @param  lock        Lock.
+  @param  mdl_request Request for lock.
 
-   @retval TRUE   Lock request can be satisfied
-   @retval FALSE  There is some conflicting lock.
+  @retval TRUE   Lock request can be satisfied
+  @retval FALSE  There is some conflicting lock.
 
-   This function defines the following compatibility matrix for metadata locks:
+  This function defines the following compatibility matrix for metadata locks:
 
-                   | Satisfied or pending requests which we have in MDL_LOCK
-   ----------------+---------------------------------------------------------
-   Current request | Active-S  Pending-X Active-X Act-S-pend-upgrade-to-X
-   ----------------+---------------------------------------------------------
-   S, upgradable S |    +         -         - (*)           -
-   High-prio S     |    +         +         -               +
-   X               |    -         +         -               -
-   S upgraded to X |    - (**)    +         0               0
+                  | Satisfied or pending requests which we have in MDL_lock
+  ----------------+---------------------------------------------------------
+  Current request | Active-S  Pending-X Active-X Act-S-pend-upgrade-to-X
+  ----------------+---------------------------------------------------------
+  S, upgradable S |    +         -         - (*)           -
+  High-prio S     |    +         +         -               +
+  X               |    -         +         -               -
+  S upgraded to X |    - (**)    +         0               0
 
-   Here: "+" -- means that request can be satisfied
-         "-" -- means that request can't be satisfied and should wait
-         "0" -- means impossible situation which will trigger assert
+  Here: "+" -- means that request can be satisfied
+        "-" -- means that request can't be satisfied and should wait
+        "0" -- means impossible situation which will trigger assert
 
-   (*)  Unless active exclusive lock belongs to the same context as shared
-        lock being requested.
-   (**) Unless all active shared locks belong to the same context as one
-        being upgraded.
+  (*)  Unless active exclusive lock belongs to the same context as shared
+       lock being requested.
+  (**) Unless all active shared locks belong to the same context as one
+       being upgraded.
 */
 
-static bool can_grant_lock(MDL_LOCK *lock, MDL_LOCK_DATA *lock_data)
+bool
+MDL_lock::can_grant_lock(const MDL_context *requestor_ctx, enum_mdl_type type_arg,
+                         bool is_upgrade)
 {
-  switch (lock_data->type)
-  {
+  bool can_grant= FALSE;
+
+  switch (type_arg) {
   case MDL_SHARED:
   case MDL_SHARED_UPGRADABLE:
   case MDL_SHARED_HIGH_PRIO:
-    if ((lock->active_exclusive.is_empty() &&
-         (lock_data->type == MDL_SHARED_HIGH_PRIO ||
-          lock->waiting_exclusive.is_empty() &&
-          lock->active_shared_waiting_upgrade.is_empty())) ||
-        (!lock->active_exclusive.is_empty() &&
-         lock->active_exclusive.head()->ctx == lock_data->ctx))
+    if (type == MDL_lock::MDL_LOCK_SHARED)
+    {
+      /* Pending exclusive locks have higher priority over shared locks. */
+      if (waiting.is_empty() || type_arg == MDL_SHARED_HIGH_PRIO)
+        can_grant= TRUE;
+    }
+    else if (granted.head()->get_ctx() == requestor_ctx)
     {
       /*
         When exclusive lock comes from the same context we can satisfy our
         shared lock. This is required for CREATE TABLE ... SELECT ... and
         ALTER VIEW ... AS ....
       */
-      return TRUE;
+      can_grant= TRUE;
     }
-    else
-      return FALSE;
     break;
   case MDL_EXCLUSIVE:
-    if (lock_data->state == MDL_PENDING_UPGRADE)
+    if (is_upgrade)
     {
       /* We are upgrading MDL_SHARED to MDL_EXCLUSIVE. */
-      MDL_LOCK_DATA *conf_lock_data;
-      I_P_List_iterator<MDL_LOCK_DATA,
-                        MDL_LOCK_DATA_lock> it(lock->active_shared);
+      MDL_ticket *conflicting_ticket;
+      MDL_lock::Ticket_iterator it(granted);
 
       /*
         There should be no active exclusive locks since we own shared lock
         on the object.
       */
-      DBUG_ASSERT(lock->active_exclusive.is_empty() &&
-                  lock->active_shared_waiting_upgrade.head() == lock_data);
+      DBUG_ASSERT(type == MDL_lock::MDL_LOCK_SHARED);
 
-      while ((conf_lock_data= it++))
+      while ((conflicting_ticket= it++))
       {
         /*
           When upgrading shared lock to exclusive one we can have other shared
           locks for the same object in the same context, e.g. in case when several
           instances of TABLE are open.
         */
-        if (conf_lock_data->ctx != lock_data->ctx)
-          return FALSE;
+        if (conflicting_ticket->get_ctx() != requestor_ctx)
+          break;
       }
-      return TRUE;
+      /* Grant lock if there are no conflicting shared locks. */
+      if (conflicting_ticket == NULL)
+        can_grant= TRUE;
+      break;
     }
-    else
+    else if (type == MDL_lock::MDL_LOCK_SHARED)
     {
-      return (lock->active_exclusive.is_empty() &&
-              lock->active_shared_waiting_upgrade.is_empty() &&
-              lock->active_shared.is_empty());
+      can_grant= granted.is_empty();
     }
     break;
   default:
     DBUG_ASSERT(0);
   }
-  return FALSE;
+  return can_grant;
 }
 
 
 /**
-   Try to acquire one shared lock.
+  Check whether the context already holds a compatible lock ticket
+  on a object. Only shared locks can be recursive.
 
-   Unlike exclusive locks, shared locks are acquired one by
-   one. This is interface is chosen to simplify introduction of
-   the new locking API to the system. mdl_acquire_shared_lock()
-   is currently used from open_table(), and there we have only one
-   table to work with.
+  @param mdl_request  Lock request object for lock to be acquired
 
-   In future we may consider allocating multiple shared locks at once.
+  @return A pointer to the lock ticket for the object or NULL otherwise.
+*/
 
-   This function must be called after the lock is added to a context.
+MDL_ticket *
+MDL_context::find_ticket(MDL_request *mdl_request)
+{
+  MDL_ticket *ticket;
+  Ticket_iterator it(m_tickets);
+
+  DBUG_ASSERT(mdl_request->is_shared());
+
+  while ((ticket= it++))
+  {
+    if (mdl_request->type == ticket->m_type &&
+        mdl_request->key.is_equal(&ticket->m_lock->key))
+      break;
+  }
+
+  return ticket;
+}
+
+
+/**
+  Try to acquire one shared lock.
+
+  Unlike exclusive locks, shared locks are acquired one by
+  one. This is interface is chosen to simplify introduction of
+  the new locking API to the system. MDL_context::acquire_shared_lock()
+  is currently used from open_table(), and there we have only one
+  table to work with.
+
+  In future we may consider allocating multiple shared locks at once.
 
-   @param context    [in]  Context containing request for lock
-   @param lock_data  [in]  Lock request object for lock to be acquired
-   @param retry      [out] Indicates that conflicting lock exists and another
-                           attempt should be made after releasing all current
-                           locks and waiting for conflicting lock go away
-                           (using mdl_wait_for_locks()).
+  This function must be called after the lock is added to a context.
 
-   @retval  FALSE   Success.
-   @retval  TRUE    Failure. Either error occured or conflicting lock exists.
-                    In the latter case "retry" parameter is set to TRUE.
+  @param mdl_request [in] Lock request object for lock to be acquired
+  @param retry      [out] Indicates that conflicting lock exists and another
+                          attempt should be made after releasing all current
+                          locks and waiting for conflicting lock go away
+                          (using MDL_context::wait_for_locks()).
+
+  @retval  FALSE   Success.
+  @retval  TRUE    Failure. Either error occurred or conflicting lock exists.
+                   In the latter case "retry" parameter is set to TRUE.
 */
 
-bool mdl_acquire_shared_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data,
-                             bool *retry)
+bool
+MDL_context::acquire_shared_lock(MDL_request *mdl_request, bool *retry)
 {
-  MDL_LOCK *lock;
+  MDL_lock *lock;
+  MDL_key *key= &mdl_request->key;
+  MDL_ticket *ticket;
   *retry= FALSE;
 
-  DBUG_ASSERT(is_shared(lock_data) && lock_data->state == MDL_INITIALIZED);
-
-  DBUG_ASSERT(lock_data->ctx == context);
+  DBUG_ASSERT(mdl_request->is_shared() && mdl_request->ticket == NULL);
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
-  if (context->has_global_shared_lock &&
-      lock_data->type == MDL_SHARED_UPGRADABLE)
+  if (m_has_global_shared_lock &&
+      mdl_request->type == MDL_SHARED_UPGRADABLE)
   {
     my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
     return TRUE;
   }
 
+  /*
+    Check whether the context already holds a shared lock on the object,
+    and if so, grant the request.
+  */
+  if ((ticket= find_ticket(mdl_request)))
+  {
+    DBUG_ASSERT(ticket->m_state == MDL_ACQUIRED);
+    mdl_request->ticket= ticket;
+    return FALSE;
+  }
+
   pthread_mutex_lock(&LOCK_mdl);
 
-  if (!can_grant_global_lock(lock_data))
+  if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
   {
     pthread_mutex_unlock(&LOCK_mdl);
     *retry= TRUE;
     return TRUE;
   }
 
-  if (!(lock= (MDL_LOCK*) my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                         lock_data->key_length)))
+  if (!(ticket= MDL_ticket::create(this, mdl_request->type)))
   {
-    if (!(lock= get_lock_object()))
-    {
-      pthread_mutex_unlock(&LOCK_mdl);
-      return TRUE;
-    }
-    /*
-      Before inserting MDL_LOCK object into hash we should add at least one
-      MDL_LOCK_DATA to its lists in order to provide key for this element.
-      Thus we can't merge two branches of the above if-statement.
-    */
-    lock->active_shared.push_front(lock_data);
-    lock->lock_data_count= 1;
-    if (my_hash_insert(&mdl_locks, (uchar*)lock))
+    pthread_mutex_unlock(&LOCK_mdl);
+    return TRUE;
+  }
+
+  if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks,
+                                         key->ptr(), key->length())))
+  {
+    /* Default lock type is MDL_lock::MDL_LOCK_SHARED */
+    lock= MDL_lock::create(key);
+    if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
     {
-      release_lock_object(lock);
+      MDL_lock::destroy(lock);
+      MDL_ticket::destroy(ticket);
       pthread_mutex_unlock(&LOCK_mdl);
       return TRUE;
     }
-    lock_data->state= MDL_ACQUIRED;
-    lock_data->lock= lock;
-    if (lock_data->type == MDL_SHARED_UPGRADABLE)
+  }
+
+  if (lock->can_grant_lock(this, mdl_request->type, FALSE))
+  {
+    mdl_request->ticket= ticket;
+    lock->granted.push_front(ticket);
+    m_tickets.push_front(ticket);
+    ticket->m_state= MDL_ACQUIRED;
+    ticket->m_lock= lock;
+    if (mdl_request->type == MDL_SHARED_UPGRADABLE)
       global_lock.active_intention_exclusive++;
   }
   else
   {
-    if (can_grant_lock(lock, lock_data))
-    {
-      lock->active_shared.push_front(lock_data);
-      lock->lock_data_count++;
-      lock_data->state= MDL_ACQUIRED;
-      lock_data->lock= lock;
-      if (lock_data->type == MDL_SHARED_UPGRADABLE)
-        global_lock.active_intention_exclusive++;
-    }
-    else
-      *retry= TRUE;
+    /* We can't get here if we allocated a new lock. */
+    DBUG_ASSERT(! lock->is_empty());
+    *retry= TRUE;
+    MDL_ticket::destroy(ticket);
   }
+
   pthread_mutex_unlock(&LOCK_mdl);
 
   return *retry;
 }
 
 
-static void release_lock(MDL_LOCK_DATA *lock_data);
+/**
+  Notify a thread holding a shared metadata lock which
+  conflicts with a pending exclusive lock.
+
+  @param thd               Current thread context
+  @param conflicting_ticket  Conflicting metadata lock
+
+  @retval TRUE   A thread was woken up
+  @retval FALSE  Lock is not a shared one or no thread was woken up
+*/
+
+static bool notify_shared_lock(THD *thd, MDL_ticket *conflicting_ticket)
+{
+  bool woke= FALSE;
+  if (conflicting_ticket->is_shared())
+  {
+    THD *conflicting_thd= conflicting_ticket->get_ctx()->get_thd();
+    woke= mysql_notify_thread_having_shared_lock(thd, conflicting_thd);
+  }
+  return woke;
+}
 
 
 /**
-   Acquire exclusive locks. The context must contain the list of
-   locks to be acquired. There must be no granted locks in the
-   context.
+  Acquire exclusive locks. The context must contain the list of
+  locks to be acquired. There must be no granted locks in the
+  context.
 
-   This is a replacement of lock_table_names(). It is used in
-   RENAME, DROP and other DDL SQL statements.
+  This is a replacement of lock_table_names(). It is used in
+  RENAME, DROP and other DDL SQL statements.
 
-   @param context  A context containing requests for exclusive locks
-                   The context may not have other lock requests.
+  @note The MDL context may not have non-exclusive lock requests
+        or acquired locks.
 
-   @retval FALSE  Success
-   @retval TRUE   Failure
+  @retval FALSE  Success
+  @retval TRUE   Failure
 */
 
-bool mdl_acquire_exclusive_locks(MDL_CONTEXT *context)
+bool MDL_context::acquire_exclusive_locks()
 {
-  MDL_LOCK_DATA *lock_data;
-  MDL_LOCK *lock;
+  MDL_lock *lock;
   bool signalled= FALSE;
   const char *old_msg;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  MDL_request *mdl_request;
+  MDL_ticket *ticket;
   st_my_thread_var *mysys_var= my_thread_var;
+  Request_iterator it(m_requests);
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
-  if (context->has_global_shared_lock)
+  if (m_has_global_shared_lock)
   {
     my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
     return TRUE;
@@ -823,83 +922,76 @@ bool mdl_acquire_exclusive_locks(MDL_CON
 
   pthread_mutex_lock(&LOCK_mdl);
 
-  old_msg= MDL_ENTER_COND(context, mysys_var);
+  old_msg= MDL_ENTER_COND(m_thd, mysys_var);
 
-  while ((lock_data= it++))
+  while ((mdl_request= it++))
   {
-    DBUG_ASSERT(lock_data->type == MDL_EXCLUSIVE &&
-                lock_data->state == MDL_INITIALIZED);
-    if (!(lock= (MDL_LOCK*) my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                           lock_data->key_length)))
+    MDL_key *key= &mdl_request->key;
+    DBUG_ASSERT(mdl_request->type == MDL_EXCLUSIVE &&
+                mdl_request->ticket == NULL);
+
+    /* Early allocation: ticket is used as a shortcut to the lock. */
+    if (!(ticket= MDL_ticket::create(this, mdl_request->type)))
+      goto err;
+
+    if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks,
+                                           key->ptr(), key->length())))
     {
-      if (!(lock= get_lock_object()))
-        goto err;
-      /*
-        Again before inserting MDL_LOCK into hash provide key for
-        it by adding MDL_LOCK_DATA to one of its lists.
-      */
-      lock->waiting_exclusive.push_front(lock_data);
-      lock->lock_data_count= 1;
-      if (my_hash_insert(&mdl_locks, (uchar*)lock))
+      lock= MDL_lock::create(key);
+      if (!lock || my_hash_insert(&mdl_locks, (uchar*)lock))
       {
-        release_lock_object(lock);
+        MDL_ticket::destroy(ticket);
+        MDL_lock::destroy(lock);
         goto err;
       }
-      lock_data->lock= lock;
-      lock_data->state= MDL_PENDING;
-    }
-    else
-    {
-      lock->waiting_exclusive.push_front(lock_data);
-      lock->lock_data_count++;
-      lock_data->lock= lock;
-      lock_data->state= MDL_PENDING;
     }
+
+    mdl_request->ticket= ticket;
+    lock->waiting.push_front(ticket);
+    ticket->m_lock= lock;
   }
 
   while (1)
   {
     it.rewind();
-    while ((lock_data= it++))
+    while ((mdl_request= it++))
     {
-      lock= lock_data->lock;
+      lock= mdl_request->ticket->m_lock;
 
-      if (!can_grant_global_lock(lock_data))
+      if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
       {
         /*
-          There is an active or pending global shared lock so we have
-          to wait until it goes away.
+          Someone owns or wants to acquire the global shared lock so
+          we have to wait until he goes away.
         */
         signalled= TRUE;
         break;
       }
-      else if (!can_grant_lock(lock, lock_data))
+      else if (!lock->can_grant_lock(this, mdl_request->type, FALSE))
       {
-        MDL_LOCK_DATA *conf_lock_data;
-        I_P_List_iterator<MDL_LOCK_DATA,
-                          MDL_LOCK_DATA_lock> it(lock->active_shared);
-
-        signalled= !lock->active_exclusive.is_empty() ||
-                   !lock->active_shared_waiting_upgrade.is_empty();
-
-        while ((conf_lock_data= it++))
-        {
-          signalled|=
-            mysql_notify_thread_having_shared_lock(context->thd,
-                                                   conf_lock_data->ctx->thd);
-        }
+        MDL_ticket *conflicting_ticket;
+        MDL_lock::Ticket_iterator it(lock->granted);
+
+        signalled= (lock->type == MDL_lock::MDL_LOCK_EXCLUSIVE);
+
+        while ((conflicting_ticket= it++))
+          signalled|= notify_shared_lock(m_thd, conflicting_ticket);
 
         break;
       }
     }
-    if (!lock_data)
+    if (!mdl_request)
       break;
+
+    /* There is a shared or exclusive lock on the object. */
+    DEBUG_SYNC(m_thd, "mdl_acquire_exclusive_locks_wait");
+
     if (signalled)
       pthread_cond_wait(&COND_mdl, &LOCK_mdl);
     else
     {
       /*
-        Another thread obtained shared MDL-lock on some table but
+        Another thread obtained a shared MDL lock on some table but
         has not yet opened it and/or tried to obtain data lock on
         it. In this case we need to wait until this happens and try
         to abort this thread once again.
@@ -912,19 +1004,22 @@ bool mdl_acquire_exclusive_locks(MDL_CON
       goto err;
   }
   it.rewind();
-  while ((lock_data= it++))
+  while ((mdl_request= it++))
   {
     global_lock.active_intention_exclusive++;
-    lock= lock_data->lock;
-    lock->waiting_exclusive.remove(lock_data);
-    lock->active_exclusive.push_front(lock_data);
-    lock_data->state= MDL_ACQUIRED;
+    ticket= mdl_request->ticket;
+    lock= ticket->m_lock;
+    lock->type= MDL_lock::MDL_LOCK_EXCLUSIVE;
+    lock->waiting.remove(ticket);
+    lock->granted.push_front(ticket);
+    m_tickets.push_front(ticket);
+    ticket->m_state= MDL_ACQUIRED;
     if (lock->cached_object)
       (*lock->cached_object_release_hook)(lock->cached_object);
     lock->cached_object= NULL;
   }
   /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-  MDL_EXIT_COND(context, mysys_var, old_msg);
+  MDL_EXIT_COND(m_thd, mysys_var, old_msg);
   return FALSE;
 
 err:
@@ -933,96 +1028,87 @@ err:
     Ignore those lock requests which were not made MDL_PENDING.
   */
   it.rewind();
-  while ((lock_data= it++) && lock_data->state == MDL_PENDING)
+  while ((mdl_request= it++) && mdl_request->ticket)
   {
-    release_lock(lock_data);
-    lock_data->state= MDL_INITIALIZED;
+    ticket= mdl_request->ticket;
+    DBUG_ASSERT(ticket->m_state == MDL_PENDING);
+    lock= ticket->m_lock;
+    lock->waiting.remove(ticket);
+    MDL_ticket::destroy(ticket);
+    /* Reset lock request back to its initial state. */
+    mdl_request->ticket= NULL;
+    if (lock->is_empty())
+    {
+      my_hash_delete(&mdl_locks, (uchar *)lock);
+      MDL_lock::destroy(lock);
+    }
   }
   /* May be some pending requests for shared locks can be satisfied now. */
   pthread_cond_broadcast(&COND_mdl);
-  MDL_EXIT_COND(context, mysys_var, old_msg);
+  MDL_EXIT_COND(m_thd, mysys_var, old_msg);
   return TRUE;
 }
 
 
 /**
-   Upgrade a shared metadata lock to exclusive.
+  Upgrade a shared metadata lock to exclusive.
 
-   Used in ALTER TABLE, when a copy of the table with the
-   new definition has been constructed.
+  Used in ALTER TABLE, when a copy of the table with the
+  new definition has been constructed.
 
-   @param context   Context to which shared lock belongs
-   @param lock_data Satisfied request for shared lock to be upgraded
+  @note In case of failure to upgrade lock (e.g. because upgrader
+        was killed) leaves lock in its original state (locked in
+        shared mode).
 
-   @note In case of failure to upgrade lock (e.g. because upgrader
-         was killed) leaves lock in its original state (locked in
-         shared mode).
+  @note There can be only one upgrader for a lock or we will have deadlock.
+        This invariant is ensured by code outside of metadata subsystem usually
+        by obtaining some sort of exclusive table-level lock (e.g. TL_WRITE,
+        TL_WRITE_ALLOW_READ) before performing upgrade of metadata lock.
 
-   @retval FALSE  Success
-   @retval TRUE   Failure (thread was killed)
+  @retval FALSE  Success
+  @retval TRUE   Failure (thread was killed)
 */
 
-bool mdl_upgrade_shared_lock_to_exclusive(MDL_CONTEXT *context,
-                                          MDL_LOCK_DATA *lock_data)
+bool
+MDL_ticket::upgrade_shared_lock_to_exclusive()
 {
-  MDL_LOCK *lock;
   const char *old_msg;
   st_my_thread_var *mysys_var= my_thread_var;
+  THD *thd= m_ctx->get_thd();
 
-  DBUG_ENTER("mdl_upgrade_shared_lock_to_exclusive");
+  DBUG_ENTER("MDL_ticket::upgrade_shared_lock_to_exclusive");
+  DEBUG_SYNC(thd, "mdl_upgrade_shared_lock_to_exclusive");
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
-
   /* Allow this function to be called twice for the same lock request. */
-  if (lock_data->type == MDL_EXCLUSIVE)
+  if (m_type == MDL_EXCLUSIVE)
     DBUG_RETURN(FALSE);
 
-  DBUG_ASSERT(lock_data->type == MDL_SHARED_UPGRADABLE);
-
-  lock= lock_data->lock;
-
   pthread_mutex_lock(&LOCK_mdl);
 
-  old_msg= MDL_ENTER_COND(context, mysys_var);
+  old_msg= MDL_ENTER_COND(thd, mysys_var);
 
-  lock_data->state= MDL_PENDING_UPGRADE;
-  /* Set type of lock request to the type at which we are aiming. */
-  lock_data->type= MDL_EXCLUSIVE;
-  lock->active_shared.remove(lock_data);
-  /*
-    There can be only one upgrader for this lock or we will have deadlock.
-    This invariant is ensured by code outside of metadata subsystem usually
-    by obtaining some sort of exclusive table-level lock (e.g. TL_WRITE,
-    TL_WRITE_ALLOW_READ) before performing upgrade of metadata lock.
-  */
-  DBUG_ASSERT(lock->active_shared_waiting_upgrade.is_empty());
-  lock->active_shared_waiting_upgrade.push_front(lock_data);
 
   /*
-    Since we should have been already acquired intention exclusive global lock
-    this call is only enforcing asserts.
+    Since we should have already acquired an intention exclusive
+    global lock this call is only enforcing asserts.
   */
-  DBUG_ASSERT(can_grant_global_lock(lock_data));
+  DBUG_ASSERT(global_lock.is_lock_type_compatible(MDL_EXCLUSIVE, TRUE));
 
   while (1)
   {
-    if (can_grant_lock(lock, lock_data))
+    if (m_lock->can_grant_lock(m_ctx, MDL_EXCLUSIVE, TRUE))
       break;
 
     bool signalled= FALSE;
-    MDL_LOCK_DATA *conf_lock_data;
-    I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_lock> it(lock->active_shared);
+    MDL_ticket *conflicting_ticket;
+    MDL_lock::Ticket_iterator it(m_lock->granted);
 
-    while ((conf_lock_data= it++))
+    while ((conflicting_ticket= it++))
     {
-      if (conf_lock_data->ctx != context)
-      {
-        signalled|=
-          mysql_notify_thread_having_shared_lock(context->thd,
-                                                 conf_lock_data->ctx->thd);
-      }
+      if (conflicting_ticket->m_ctx != m_ctx)
+        signalled|= notify_shared_lock(thd, conflicting_ticket);
     }
 
     if (signalled)
@@ -1030,7 +1116,7 @@ bool mdl_upgrade_shared_lock_to_exclusiv
     else
     {
       /*
-        Another thread obtained shared MDL-lock on some table but
+        Another thread obtained a shared MDL lock on some table but
         has not yet opened it and/or tried to obtain data lock on
         it. In this case we need to wait until this happens and try
         to abort this thread once again.
@@ -1042,62 +1128,59 @@ bool mdl_upgrade_shared_lock_to_exclusiv
     }
     if (mysys_var->abort)
     {
-      lock_data->state= MDL_ACQUIRED;
-      lock_data->type= MDL_SHARED_UPGRADABLE;
-      lock->active_shared_waiting_upgrade.remove(lock_data);
-      lock->active_shared.push_front(lock_data);
       /* Pending requests for shared locks can be satisfied now. */
       pthread_cond_broadcast(&COND_mdl);
-      MDL_EXIT_COND(context, mysys_var, old_msg);
+      MDL_EXIT_COND(thd, mysys_var, old_msg);
       DBUG_RETURN(TRUE);
     }
   }
 
-  lock->active_shared_waiting_upgrade.remove(lock_data);
-  lock->active_exclusive.push_front(lock_data);
-  lock_data->state= MDL_ACQUIRED;
-  if (lock->cached_object)
-    (*lock->cached_object_release_hook)(lock->cached_object);
-  lock->cached_object= 0;
+  m_lock->type= MDL_lock::MDL_LOCK_EXCLUSIVE;
+  /* Set the new type of lock in the ticket. */
+  m_type= MDL_EXCLUSIVE;
+  if (m_lock->cached_object)
+    (*m_lock->cached_object_release_hook)(m_lock->cached_object);
+  m_lock->cached_object= 0;
 
   /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-  MDL_EXIT_COND(context, mysys_var, old_msg);
+  MDL_EXIT_COND(thd, mysys_var, old_msg);
   DBUG_RETURN(FALSE);
 }
 
 
 /**
-   Try to acquire an exclusive lock on the object if there are
-   no conflicting locks.
+  Try to acquire an exclusive lock on the object if there are
+  no conflicting locks.
 
-   Similar to the previous function, but returns
-   immediately without any side effect if encounters a lock
-   conflict. Otherwise takes the lock.
+  Similar to the previous function, but returns
+  immediately without any side effect if encounters a lock
+  conflict. Otherwise takes the lock.
 
-   This function is used in CREATE TABLE ... LIKE to acquire a lock
-   on the table to be created. In this statement we don't want to
-   block and wait for the lock if the table already exists.
+  This function is used in CREATE TABLE ... LIKE to acquire a lock
+  on the table to be created. In this statement we don't want to
+  block and wait for the lock if the table already exists.
 
-   @param context  [in]  The context containing the lock request
-   @param lock     [in]  The lock request
-   @param conflict [out] Indicates that conflicting lock exists
+  @param mdl_request [in] The lock request
+  @param conflict   [out] Indicates that conflicting lock exists
 
-   @retval TRUE  Failure either conflicting lock exists or some error
-                 occured (probably OOM).
-   @retval FALSE Success, lock was acquired.
+  @retval TRUE  Failure either conflicting lock exists or some error
+                occured (probably OOM).
+  @retval FALSE Success, lock was acquired.
 
-   FIXME: Compared to lock_table_name_if_not_cached()
-          it gives sligthly more false negatives.
+  FIXME: Compared to lock_table_name_if_not_cached()
+         it gives slightly more false negatives.
 */
 
-bool mdl_try_acquire_exclusive_lock(MDL_CONTEXT *context,
-                                    MDL_LOCK_DATA *lock_data,
-                                    bool *conflict)
+bool
+MDL_context::try_acquire_exclusive_lock(MDL_request *mdl_request,
+                                        bool *conflict)
 {
-  MDL_LOCK *lock;
+  MDL_lock *lock;
+  MDL_ticket *ticket;
+  MDL_key *key= &mdl_request->key;
 
-  DBUG_ASSERT(lock_data->type == MDL_EXCLUSIVE &&
-              lock_data->state == MDL_INITIALIZED);
+  DBUG_ASSERT(mdl_request->type == MDL_EXCLUSIVE &&
+              mdl_request->ticket == NULL);
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
@@ -1105,20 +1188,23 @@ bool mdl_try_acquire_exclusive_lock(MDL_
 
   pthread_mutex_lock(&LOCK_mdl);
 
-  if (!(lock= (MDL_LOCK*) my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                         lock_data->key_length)))
+  if (!(lock= (MDL_lock*) my_hash_search(&mdl_locks,
+                                         key->ptr(), key->length())))
   {
-    if (!(lock= get_lock_object()))
-      goto err;
-    lock->active_exclusive.push_front(lock_data);
-    lock->lock_data_count= 1;
-    if (my_hash_insert(&mdl_locks, (uchar*)lock))
+    ticket= MDL_ticket::create(this, mdl_request->type);
+    lock= MDL_lock::create(key);
+    if (!ticket || !lock || my_hash_insert(&mdl_locks, (uchar*)lock))
     {
-      release_lock_object(lock);
+      MDL_ticket::destroy(ticket);
+      MDL_lock::destroy(lock);
       goto err;
     }
-    lock_data->state= MDL_ACQUIRED;
-    lock_data->lock= lock;
+    mdl_request->ticket= ticket;
+    lock->type= MDL_lock::MDL_LOCK_EXCLUSIVE;
+    lock->granted.push_front(ticket);
+    m_tickets.push_front(ticket);
+    ticket->m_state= MDL_ACQUIRED;
+    ticket->m_lock= lock;
     global_lock.active_intention_exclusive++;
     pthread_mutex_unlock(&LOCK_mdl);
     return FALSE;
@@ -1134,29 +1220,27 @@ err:
 
 
 /**
-   Acquire global shared metadata lock.
+  Acquire the global shared metadata lock.
 
-   Holding this lock will block all requests for exclusive locks
-   and shared locks which can be potentially upgraded to exclusive.
+  Holding this lock will block all requests for exclusive locks
+  and shared locks which can be potentially upgraded to exclusive.
 
-   @param context Current metadata locking context.
-
-   @retval FALSE Success -- the lock was granted.
-   @retval TRUE  Failure -- our thread was killed.
+  @retval FALSE Success -- the lock was granted.
+  @retval TRUE  Failure -- our thread was killed.
 */
 
-bool mdl_acquire_global_shared_lock(MDL_CONTEXT *context)
+bool MDL_context::acquire_global_shared_lock()
 {
   st_my_thread_var *mysys_var= my_thread_var;
   const char *old_msg;
 
   safe_mutex_assert_not_owner(&LOCK_open);
-  DBUG_ASSERT(!context->has_global_shared_lock);
+  DBUG_ASSERT(!m_has_global_shared_lock);
 
   pthread_mutex_lock(&LOCK_mdl);
 
   global_lock.waiting_shared++;
-  old_msg= MDL_ENTER_COND(context, mysys_var);
+  old_msg= MDL_ENTER_COND(m_thd, mysys_var);
 
   while (!mysys_var->abort && global_lock.active_intention_exclusive)
     pthread_cond_wait(&COND_mdl, &LOCK_mdl);
@@ -1165,37 +1249,36 @@ bool mdl_acquire_global_shared_lock(MDL_
   if (mysys_var->abort)
   {
     /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-    MDL_EXIT_COND(context, mysys_var, old_msg);
+    MDL_EXIT_COND(m_thd, mysys_var, old_msg);
     return TRUE;
   }
   global_lock.active_shared++;
-  context->has_global_shared_lock= TRUE;
+  m_has_global_shared_lock= TRUE;
   /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-  MDL_EXIT_COND(context, mysys_var, old_msg);
+  MDL_EXIT_COND(m_thd, mysys_var, old_msg);
   return FALSE;
 }
 
 
 /**
-   Wait until there will be no locks that conflict with lock requests
-   in the context.
-
-   This is a part of the locking protocol and must be used by the
-   acquirer of shared locks after a back-off.
+  Wait until there will be no locks that conflict with lock requests
+  in the context.
 
-   Does not acquire the locks!
+  This is a part of the locking protocol and must be used by the
+  acquirer of shared locks after a back-off.
 
-   @param context Context with which lock requests are associated.
+  Does not acquire the locks!
 
-   @retval FALSE  Success. One can try to obtain metadata locks.
-   @retval TRUE   Failure (thread was killed)
+  @retval FALSE  Success. One can try to obtain metadata locks.
+  @retval TRUE   Failure (thread was killed)
 */
 
-bool mdl_wait_for_locks(MDL_CONTEXT *context)
+bool
+MDL_context::wait_for_locks()
 {
-  MDL_LOCK_DATA *lock_data;
-  MDL_LOCK *lock;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  MDL_lock *lock;
+  MDL_request *mdl_request;
+  Request_iterator it(m_requests);
   const char *old_msg;
   st_my_thread_var *mysys_var= my_thread_var;
 
@@ -1213,92 +1296,83 @@ bool mdl_wait_for_locks(MDL_CONTEXT *con
       TODO: investigate situations in which we need to broadcast on
             COND_mdl because of above scenario.
     */
-    mysql_ha_flush(context->thd);
+    mysql_ha_flush(m_thd);
     pthread_mutex_lock(&LOCK_mdl);
-    old_msg= MDL_ENTER_COND(context, mysys_var);
+    old_msg= MDL_ENTER_COND(m_thd, mysys_var);
     it.rewind();
-    while ((lock_data= it++))
+    while ((mdl_request= it++))
     {
-      DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-      if (!can_grant_global_lock(lock_data))
+      MDL_key *key= &mdl_request->key;
+      DBUG_ASSERT(mdl_request->ticket == NULL);
+      if (!global_lock.is_lock_type_compatible(mdl_request->type, FALSE))
         break;
       /*
         To avoid starvation we don't wait if we have a conflict against
         request for MDL_EXCLUSIVE lock.
       */
-      if (is_shared(lock_data) &&
-          (lock= (MDL_LOCK*) my_hash_search(&mdl_locks, (uchar*)lock_data->key,
-                                            lock_data->key_length)) &&
-          !can_grant_lock(lock, lock_data))
+      if (mdl_request->is_shared() &&
+          (lock= (MDL_lock*) my_hash_search(&mdl_locks, key->ptr(),
+                                            key->length())) &&
+          !lock->can_grant_lock(this, mdl_request->type, FALSE))
         break;
     }
-    if (!lock_data)
+    if (!mdl_request)
     {
       pthread_mutex_unlock(&LOCK_mdl);
       break;
     }
     pthread_cond_wait(&COND_mdl, &LOCK_mdl);
     /* As a side-effect MDL_EXIT_COND() unlocks LOCK_mdl. */
-    MDL_EXIT_COND(context, mysys_var, old_msg);
+    MDL_EXIT_COND(m_thd, mysys_var, old_msg);
   }
   return mysys_var->abort;
 }
 
 
 /**
-   Auxiliary function which allows to release particular lock
-   ownership of which is represented by lock request object.
+  Auxiliary function which allows to release particular lock
+  ownership of which is represented by a lock ticket object.
 */
 
-static void release_lock(MDL_LOCK_DATA *lock_data)
+void MDL_context::release_ticket(MDL_ticket *ticket)
 {
-  MDL_LOCK *lock;
+  MDL_lock *lock= ticket->m_lock;
+  DBUG_ENTER("release_ticket");
+  DBUG_PRINT("enter", ("db=%s name=%s", lock->key.db_name(),
+                                        lock->key.table_name()));
 
-  DBUG_ENTER("release_lock");
-  DBUG_PRINT("enter", ("db=%s name=%s", lock_data->key + 4,
-                        lock_data->key + 4 + strlen(lock_data->key + 4) + 1));
+  safe_mutex_assert_owner(&LOCK_mdl);
 
-  DBUG_ASSERT(lock_data->state == MDL_PENDING ||
-              lock_data->state == MDL_ACQUIRED);
+  m_tickets.remove(ticket);
 
-  lock= lock_data->lock;
-  if (lock->has_one_lock_data())
+  switch (ticket->m_type)
+  {
+    case MDL_SHARED_UPGRADABLE:
+      global_lock.active_intention_exclusive--;
+      /* Fallthrough. */
+    case MDL_SHARED:
+    case MDL_SHARED_HIGH_PRIO:
+      lock->granted.remove(ticket);
+      break;
+    case MDL_EXCLUSIVE:
+      lock->type= MDL_lock::MDL_LOCK_SHARED;
+      lock->granted.remove(ticket);
+      global_lock.active_intention_exclusive--;
+      break;
+    default:
+      DBUG_ASSERT(0);
+  }
+
+  MDL_ticket::destroy(ticket);
+
+  if (lock->is_empty())
   {
     my_hash_delete(&mdl_locks, (uchar *)lock);
     DBUG_PRINT("info", ("releasing cached_object cached_object=%p",
                         lock->cached_object));
     if (lock->cached_object)
       (*lock->cached_object_release_hook)(lock->cached_object);
-    release_lock_object(lock);
-    if (lock_data->state == MDL_ACQUIRED &&
-        (lock_data->type == MDL_EXCLUSIVE ||
-         lock_data->type == MDL_SHARED_UPGRADABLE))
-      global_lock.active_intention_exclusive--;
-  }
-  else
-  {
-    switch (lock_data->type)
-    {
-      case MDL_SHARED_UPGRADABLE:
-        global_lock.active_intention_exclusive--;
-        /* Fallthrough. */
-      case MDL_SHARED:
-      case MDL_SHARED_HIGH_PRIO:
-        lock->active_shared.remove(lock_data);
-        break;
-      case MDL_EXCLUSIVE:
-        if (lock_data->state == MDL_PENDING)
-          lock->waiting_exclusive.remove(lock_data);
-        else
-        {
-          lock->active_exclusive.remove(lock_data);
-          global_lock.active_intention_exclusive--;
-        }
-        break;
-      default:
-        DBUG_ASSERT(0);
-    }
-    lock->lock_data_count--;
+    MDL_lock::destroy(lock);
   }
 
   DBUG_VOID_RETURN;
@@ -1306,324 +1380,323 @@ static void release_lock(MDL_LOCK_DATA *
 
 
 /**
-   Release all locks associated with the context, but leave them
-   in the context as lock requests.
+  Release all locks associated with the context, but leave them
+  in the context as lock requests.
 
-   This function is used to back off in case of a lock conflict.
-   It is also used to release shared locks in the end of an SQL
-   statement.
-
-   @param context The context with which the locks to be released
-                  are associated.
+  This function is used to back off in case of a lock conflict.
+  It is also used to release shared locks in the end of an SQL
+  statement.
 */
 
-void mdl_release_locks(MDL_CONTEXT *context)
+void MDL_context::release_all_locks()
 {
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-  DBUG_ENTER("mdl_release_locks");
+  MDL_ticket *ticket;
+  Ticket_iterator it(m_tickets);
+  DBUG_ENTER("MDL_context::release_all_locks");
 
   safe_mutex_assert_not_owner(&LOCK_open);
 
+  /* Detach lock tickets from the requests for back off. */
+  {
+    MDL_request *mdl_request;
+    Request_iterator it(m_requests);
+
+    while ((mdl_request= it++))
+      mdl_request->ticket= NULL;
+  }
+
+  if (m_tickets.is_empty())
+    DBUG_VOID_RETURN;
+
   pthread_mutex_lock(&LOCK_mdl);
-  while ((lock_data= it++))
+  while ((ticket= it++))
   {
-    DBUG_PRINT("info", ("found lock to release lock_data=%p", lock_data));
-    /*
-      Don't call release_lock() for a shared lock if has not been
-      granted. Lock state in this case is MDL_INITIALIZED.
-      We have pending and granted shared locks in the same context
-      when this function is called from the "back-off" path of
-      open_tables().
-    */
-    if (lock_data->state != MDL_INITIALIZED)
-    {
-      release_lock(lock_data);
-      lock_data->state= MDL_INITIALIZED;
-#ifndef DBUG_OFF
-      lock_data->lock= 0;
-#endif
-    }
-    /*
-      We will return lock request to its initial state only in
-      mdl_remove_all_locks() since we need to know type of lock
-      request in mdl_wait_for_locks().
-    */
+    DBUG_PRINT("info", ("found lock to release ticket=%p", ticket));
+    release_ticket(ticket);
   }
   /* Inefficient but will do for a while */
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
+
+  m_tickets.empty();
+
   DBUG_VOID_RETURN;
 }
 
 
 /**
-   Release a lock.
-
-   @param context   Context containing lock in question
-   @param lock_data Lock to be released
+  Release a lock.
 
+  @param ticket    Lock to be released
 */
 
-void mdl_release_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data)
+void MDL_context::release_lock(MDL_ticket *ticket)
 {
+  DBUG_ASSERT(this == ticket->m_ctx);
   safe_mutex_assert_not_owner(&LOCK_open);
 
   pthread_mutex_lock(&LOCK_mdl);
-  release_lock(lock_data);
-#ifndef DBUG_OFF
-  lock_data->lock= 0;
-#endif
-  lock_data->state= MDL_INITIALIZED;
+  release_ticket(ticket);
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
 }
 
 
 /**
-   Release all locks in the context which correspond to the same name/
-   object as this lock request, remove lock requests from the context.
+  Release all locks in the context which correspond to the same name/
+  object as this lock request, remove lock requests from the context.
 
-   @param context   Context containing locks in question
-   @param lock_data One of the locks for the name/object for which all
-                    locks should be released.
+  @param ticket    One of the locks for the name/object for which all
+                   locks should be released.
 */
 
-void mdl_release_and_remove_all_locks_for_name(MDL_CONTEXT *context,
-                                               MDL_LOCK_DATA *lock_data)
+void MDL_context::release_all_locks_for_name(MDL_ticket *name)
 {
-  MDL_LOCK *lock;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  /* Use MDL_ticket::lock to identify other locks for the same object. */
+  MDL_lock *lock= name->m_lock;
 
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
+  /* Remove matching lock requests from the context. */
+  MDL_request *mdl_request;
+  Request_iterator it_mdl_request(m_requests);
 
-  /*
-    We can use MDL_LOCK_DATA::lock here to identify other locks for the same
-    object since even altough MDL_LOCK object might be reused for different
-    lock after the first lock for this object have been released we can't
-    have references to this other MDL_LOCK object in this context.
-  */
-  lock= lock_data->lock;
+  while ((mdl_request= it_mdl_request++))
+  {
+    DBUG_ASSERT(mdl_request->ticket &&
+                mdl_request->ticket->m_state == MDL_ACQUIRED);
+
+    if (mdl_request->ticket->m_lock == lock)
+      remove_request(mdl_request);
+  }
+
+  /* Remove matching lock tickets from the context. */
+  MDL_ticket *ticket;
+  Ticket_iterator it_ticket(m_tickets);
 
-  while ((lock_data= it++))
+  while ((ticket= it_ticket++))
   {
-    DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
-    if (lock_data->lock == lock)
-    {
-      mdl_release_lock(context, lock_data);
-      mdl_remove_lock(context, lock_data);
-    }
+    DBUG_ASSERT(ticket->m_state == MDL_ACQUIRED);
+    /*
+      We rarely have more than one ticket in this loop,
+      let's not bother saving on pthread_cond_broadcast().
+    */
+    if (ticket->m_lock == lock)
+      release_lock(ticket);
   }
 }
 
 
 /**
-   Downgrade an exclusive lock to shared metadata lock.
-
-   @param context   A context to which exclusive lock belongs
-   @param lock_data Satisfied request for exclusive lock to be downgraded
+  Downgrade an exclusive lock to shared metadata lock.
 */
 
-void mdl_downgrade_exclusive_lock(MDL_CONTEXT *context,
-                                  MDL_LOCK_DATA *lock_data)
+void MDL_ticket::downgrade_exclusive_lock()
 {
-  MDL_LOCK *lock;
-
   safe_mutex_assert_not_owner(&LOCK_open);
 
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED);
-
-  if (is_shared(lock_data))
+  if (is_shared())
     return;
 
-  lock= lock_data->lock;
-
   pthread_mutex_lock(&LOCK_mdl);
-  lock->active_exclusive.remove(lock_data);
-  lock_data->type= MDL_SHARED_UPGRADABLE;
-  lock->active_shared.push_front(lock_data);
+  m_lock->type= MDL_lock::MDL_LOCK_SHARED;
+  m_type= MDL_SHARED_UPGRADABLE;
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
 }
 
 
 /**
-   Release global shared metadata lock.
-
-   @param context Current context
+  Release the global shared metadata lock.
 */
 
-void mdl_release_global_shared_lock(MDL_CONTEXT *context)
+void MDL_context::release_global_shared_lock()
 {
   safe_mutex_assert_not_owner(&LOCK_open);
-  DBUG_ASSERT(context->has_global_shared_lock);
+  DBUG_ASSERT(m_has_global_shared_lock);
 
   pthread_mutex_lock(&LOCK_mdl);
   global_lock.active_shared--;
-  context->has_global_shared_lock= FALSE;
+  m_has_global_shared_lock= FALSE;
   pthread_cond_broadcast(&COND_mdl);
   pthread_mutex_unlock(&LOCK_mdl);
 }
 
 
 /**
-   Auxiliary function which allows to check if we have exclusive lock
-   on the object.
+  Auxiliary function which allows to check if we have exclusive lock
+  on the object.
 
-   @param context Current context
-   @param type    Id of object type
-   @param db      Name of the database
-   @param name    Name of the object
+  @param type    Id of object type
+  @param db      Name of the database
+  @param name    Name of the object
 
-   @return TRUE if current context contains exclusive lock for the object,
-           FALSE otherwise.
+  @return TRUE if current context contains exclusive lock for the object,
+          FALSE otherwise.
 */
 
-bool mdl_is_exclusive_lock_owner(MDL_CONTEXT *context, int type,
-                                 const char *db, const char *name)
+bool
+MDL_context::is_exclusive_lock_owner(unsigned char type,
+                                     const char *db, const char *name)
 {
-  char key[MAX_MDLKEY_LENGTH];
-  uint key_length;
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
+  MDL_key key(type, db, name);
+  MDL_ticket *ticket;
+  MDL_context::Ticket_iterator it(m_tickets);
 
-  int4store(key, type);
-  key_length= (uint) (strmov(strmov(key+4, db)+1, name)-key)+1;
+  while ((ticket= it++))
+  {
+    if (ticket->m_lock->type == MDL_lock::MDL_LOCK_EXCLUSIVE &&
+        ticket->m_lock->key.is_equal(&key))
+      break;
+  }
 
-  while ((lock_data= it++) &&
-         (lock_data->key_length != key_length ||
-          memcmp(lock_data->key, key, key_length) ||
-          !(lock_data->type == MDL_EXCLUSIVE &&
-            lock_data->state == MDL_ACQUIRED)))
-    continue;
-  return lock_data;
+  return ticket;
 }
 
 
 /**
-   Auxiliary function which allows to check if we some kind of lock on
-   the object.
+  Auxiliary function which allows to check if we have some kind of lock on
+  a object.
 
-   @param context Current context
-   @param type    Id of object type
-   @param db      Name of the database
-   @param name    Name of the object
+  @param type    Id of object type
+  @param db      Name of the database
+  @param name    Name of the object
 
-   @return TRUE if current context contains satisfied lock for the object,
-           FALSE otherwise.
+  @return TRUE if current context contains satisfied lock for the object,
+          FALSE otherwise.
 */
 
-bool mdl_is_lock_owner(MDL_CONTEXT *context, int type, const char *db,
-                        const char *name)
+bool
+MDL_context::is_lock_owner(unsigned char type,
+                           const char *db, const char *name)
 {
-  char key[MAX_MDLKEY_LENGTH];
-  uint key_length;
-  MDL_LOCK_DATA *lock_data;
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> it(context->locks);
-
-  int4store(key, type);
-  key_length= (uint) (strmov(strmov(key+4, db)+1, name)-key)+1;
+  MDL_key key(type, db, name);
+  MDL_ticket *ticket;
+  MDL_context::Ticket_iterator it(m_tickets);
 
-  while ((lock_data= it++) &&
-         (lock_data->key_length != key_length ||
-          memcmp(lock_data->key, key, key_length) ||
-          lock_data->state != MDL_ACQUIRED))
-    continue;
+  while ((ticket= it++))
+  {
+    if (ticket->m_lock->key.is_equal(&key))
+      break;
+  }
 
-  return lock_data;
+  return ticket;
 }
 
 
 /**
-   Check if we have any pending exclusive locks which conflict with
-   existing shared lock.
+  Check if we have any pending exclusive locks which conflict with
+  existing shared lock.
 
-   @param lock_data Shared lock against which check should be performed.
+  @pre The ticket must match an acquired lock.
 
-   @return TRUE if there are any conflicting locks, FALSE otherwise.
+  @param ticket Shared lock against which check should be performed.
+
+  @return TRUE if there are any conflicting locks, FALSE otherwise.
 */
 
-bool mdl_has_pending_conflicting_lock(MDL_LOCK_DATA *lock_data)
+bool MDL_ticket::has_pending_conflicting_lock() const
 {
   bool result;
 
-  DBUG_ASSERT(is_shared(lock_data) && lock_data->state == MDL_ACQUIRED);
+  DBUG_ASSERT(is_shared());
   safe_mutex_assert_not_owner(&LOCK_open);
 
   pthread_mutex_lock(&LOCK_mdl);
-  result= !(lock_data->lock->waiting_exclusive.is_empty() &&
-            lock_data->lock->active_shared_waiting_upgrade.is_empty());
+  result= !m_lock->waiting.is_empty();
   pthread_mutex_unlock(&LOCK_mdl);
   return result;
 }
 
 
 /**
-   Associate pointer to an opaque object with a lock.
-
-   @param lock_data     Lock request for the lock with which the
-                        object should be associated.
-   @param cached_object Pointer to the object
-   @param release_hook  Cleanup function to be called when MDL subsystem
-                        decides to remove lock or associate another object.
-
-   This is used to cache a pointer to TABLE_SHARE in the lock
-   structure. Such caching can save one acquisition of LOCK_open
-   and one table definition cache lookup for every table.
-
-   Since the pointer may be stored only inside an acquired lock,
-   the caching is only effective when there is more than one lock
-   granted on a given table.
-
-   This function has the following usage pattern:
-     - try to acquire an MDL lock
-     - when done, call for mdl_get_cached_object(). If it returns NULL, our
-       thread has the only lock on this table.
-     - look up TABLE_SHARE in the table definition cache
-     - call mdl_set_cache_object() to assign the share to the opaque pointer.
+  Associate pointer to an opaque object with a lock.
 
-  The release hook is invoked when the last shared metadata
-  lock on this name is released.
-*/
-
-void mdl_set_cached_object(MDL_LOCK_DATA *lock_data, void *cached_object,
-                           mdl_cached_object_release_hook release_hook)
+  @param cached_object Pointer to the object
+  @param release_hook  Cleanup function to be called when MDL subsystem
+                       decides to remove lock or associate another object.
+
+  This is used to cache a pointer to TABLE_SHARE in the lock
+  structure. Such caching can save one acquisition of LOCK_open
+  and one table definition cache lookup for every table.
+
+  Since the pointer may be stored only inside an acquired lock,
+  the caching is only effective when there is more than one lock
+  granted on a given table.
+
+  This function has the following usage pattern:
+    - try to acquire an MDL lock
+    - when done, call for mdl_get_cached_object(). If it returns NULL, our
+      thread has the only lock on this table.
+    - look up TABLE_SHARE in the table definition cache
+    - call mdl_set_cache_object() to assign the share to the opaque pointer.
+
+ The release hook is invoked when the last shared metadata
+ lock on this name is released.
+*/
+
+void
+MDL_ticket::set_cached_object(void *cached_object,
+                              mdl_cached_object_release_hook release_hook)
 {
   DBUG_ENTER("mdl_set_cached_object");
-  DBUG_PRINT("enter", ("db=%s name=%s cached_object=%p", lock_data->key + 4,
-                       lock_data->key + 4 + strlen(lock_data->key + 4) + 1,
-                       cached_object));
-
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED ||
-              lock_data->state == MDL_PENDING_UPGRADE);
-
+  DBUG_PRINT("enter", ("db=%s name=%s cached_object=%p",
+                        m_lock->key.db_name(), m_lock->key.table_name(),
+                        cached_object));
   /*
-    TODO: This assumption works now since we do mdl_get_cached_object()
-          and mdl_set_cached_object() in the same critical section. Once
+    TODO: This assumption works now since we do get_cached_object()
+          and set_cached_object() in the same critical section. Once
           this becomes false we will have to call release_hook here and
           use additional mutex protecting 'cached_object' member.
   */
-  DBUG_ASSERT(!lock_data->lock->cached_object);
+  DBUG_ASSERT(!m_lock->cached_object);
 
-  lock_data->lock->cached_object= cached_object;
-  lock_data->lock->cached_object_release_hook= release_hook;
+  m_lock->cached_object= cached_object;
+  m_lock->cached_object_release_hook= release_hook;
 
   DBUG_VOID_RETURN;
 }
 
 
 /**
-   Get a pointer to an opaque object that associated with the lock.
+  Get a pointer to an opaque object that associated with the lock.
 
-   @param  lock_data Lock request for the lock with which the object is
-                     associated.
+  @param ticket  Lock ticket for the lock which the object is associated to.
 
-   @return Pointer to an opaque object associated with the lock.
+  @return Pointer to an opaque object associated with the lock.
 */
 
-void* mdl_get_cached_object(MDL_LOCK_DATA *lock_data)
+void *MDL_ticket::get_cached_object()
 {
-  DBUG_ASSERT(lock_data->state == MDL_ACQUIRED ||
-              lock_data->state == MDL_PENDING_UPGRADE);
-  return lock_data->lock->cached_object;
+  return m_lock->cached_object;
 }
+
+
+/**
+  Releases metadata locks that were acquired after a specific savepoint.
+
+  @note Used to release tickets acquired during a savepoint unit.
+  @note It's safe to iterate and unlock any locks after taken after this
+        savepoint because other statements that take other special locks
+        cause a implicit commit (ie LOCK TABLES).
+
+  @param mdl_savepont  The last acquired MDL lock when the
+                       savepoint was set.
+*/
+
+void MDL_context::rollback_to_savepoint(MDL_ticket *mdl_savepoint)
+{
+  MDL_ticket *ticket;
+  Ticket_iterator it(m_tickets);
+  DBUG_ENTER("MDL_context::rollback_to_savepoint");
+
+  while ((ticket= it++))
+  {
+    /* Stop when lock was acquired before this savepoint. */
+    if (ticket == mdl_savepoint)
+      break;
+    release_lock(ticket);
+  }
+
+  DBUG_VOID_RETURN;
+}
+
+

=== modified file 'sql/mdl.h'
--- a/sql/mdl.h	2008-06-20 13:11:20 +0000
+++ b/sql/mdl.h	2009-03-04 20:29:16 +0000
@@ -23,223 +23,316 @@
 
 class THD;
 
-struct MDL_LOCK_DATA;
-struct MDL_LOCK;
-struct MDL_CONTEXT;
+class MDL_context;
+class MDL_lock;
+class MDL_ticket;
 
 /**
-   Type of metadata lock request.
+  Type of metadata lock request.
 
-   - High-priority shared locks differ from ordinary shared locks by
-     that they ignore pending requests for exclusive locks.
-   - Upgradable shared locks can be later upgraded to exclusive
-     (because of that their acquisition involves implicit
-      acquisition of global intention-exclusive lock).
+  - High-priority shared locks differ from ordinary shared locks by
+    that they ignore pending requests for exclusive locks.
+  - Upgradable shared locks can be later upgraded to exclusive
+    (because of that their acquisition involves implicit
+     acquisition of global intention-exclusive lock).
 
-   @see Comments for can_grant_lock() and can_grant_global_lock() for details.
+  @see Comments for can_grant_lock() and can_grant_global_lock() for details.
 */
 
 enum enum_mdl_type {MDL_SHARED=0, MDL_SHARED_HIGH_PRIO,
                     MDL_SHARED_UPGRADABLE, MDL_EXCLUSIVE};
 
 
-/** States which metadata lock request can have. */
+/** States which a metadata lock ticket can have. */
 
-enum enum_mdl_state {MDL_INITIALIZED=0, MDL_PENDING,
-                     MDL_ACQUIRED, MDL_PENDING_UPGRADE};
+enum enum_mdl_state { MDL_PENDING, MDL_ACQUIRED };
+
+
+/** Maximal length of key for metadata locking subsystem. */
+#define MAX_MDLKEY_LENGTH (1 + NAME_LEN + 1 + NAME_LEN + 1)
 
 
 /**
-   A pending lock request or a granted metadata lock. A lock is requested
-   or granted based on a fully qualified name and type. E.g. for a table
-   the key consists of <0 (=table)>+<database name>+<table name>.
-   Later in this document this triple will be referred to simply as
-   "key" or "name".
+  Metadata lock object key.
+
+  A lock is requested or granted based on a fully qualified name and type.
+  E.g. They key for a table consists of <0 (=table)>+<database>+<table name>.
+  Elsewhere in the comments this triple will be referred to simply as "key"
+  or "name".
 */
 
-struct MDL_LOCK_DATA
+class MDL_key
 {
-  char          *key;
-  uint          key_length;
-  enum          enum_mdl_type type;
-  enum          enum_mdl_state state;
-
-private:
-  /**
-     Pointers for participating in the list of lock requests for this context.
-  */
-  MDL_LOCK_DATA *next_context;
-  MDL_LOCK_DATA **prev_context;
-  /**
-     Pointers for participating in the list of satisfied/pending requests
-     for the lock.
-  */
-  MDL_LOCK_DATA *next_lock;
-  MDL_LOCK_DATA **prev_lock;
+public:
+  const uchar *ptr() const { return (uchar*) m_ptr; }
+  uint length() const { return m_length; }
 
-  friend struct MDL_LOCK_DATA_context;
-  friend struct MDL_LOCK_DATA_lock;
+  const char *db_name() const { return m_ptr + 1; }
+  uint db_name_length() const { return m_db_name_length; }
 
-public:
-  /*
-    Pointer to the lock object for this lock request. Valid only if this lock
-    request is satisified or is present in the list of pending lock requests
-    for particular lock.
-  */
-  MDL_LOCK      *lock;
-  MDL_CONTEXT   *ctx;
-};
+  const char *table_name() const { return m_ptr + m_db_name_length + 2; }
+  uint table_name_length() const { return m_length - m_db_name_length - 3; }
 
+  /**
+    Construct a metadata lock key from a triplet (type, database and name).
 
-/**
-   Helper class which specifies which members of MDL_LOCK_DATA are used for
-   participation in the list lock requests belonging to one context.
-*/
+    @remark The key for a table is <0 (=table)>+<database name>+<table name>
 
-struct MDL_LOCK_DATA_context
-{
-  static inline MDL_LOCK_DATA **next_ptr(MDL_LOCK_DATA *l)
+    @param  type   Id of type of object to be locked
+    @param  db     Name of database to which the object belongs
+    @param  name   Name of of the object
+    @param  key    Where to store the the MDL key.
+  */
+  void mdl_key_init(char type, const char *db, const char *name)
+  {
+    m_ptr[0]= type;
+    m_db_name_length= (uint) (strmov(m_ptr + 1, db) - m_ptr - 1);
+    m_length= (uint) (strmov(m_ptr + m_db_name_length + 2, name) - m_ptr + 1);
+  }
+  void mdl_key_init(const MDL_key *rhs)
   {
-    return &l->next_context;
+    memcpy(m_ptr, rhs->m_ptr, rhs->m_length);
+    m_length= rhs->m_length;
+    m_db_name_length= rhs->m_db_name_length;
   }
-  static inline MDL_LOCK_DATA ***prev_ptr(MDL_LOCK_DATA *l)
+  bool is_equal(const MDL_key *rhs) const
   {
-    return &l->prev_context;
+    return (m_length == rhs->m_length &&
+            memcmp(m_ptr, rhs->m_ptr, m_length) == 0);
   }
+  MDL_key(const MDL_key *rhs)
+  {
+    mdl_key_init(rhs);
+  }
+  MDL_key(char type_arg, const char *db_arg, const char *name_arg)
+  {
+    mdl_key_init(type_arg, db_arg, name_arg);
+  }
+  MDL_key() {} /* To use when part of MDL_request. */
+
+private:
+  char m_ptr[MAX_MDLKEY_LENGTH];
+  uint m_length;
+  uint m_db_name_length;
+private:
+  MDL_key(const MDL_key &);                     /* not implemented */
+  MDL_key &operator=(const MDL_key &);          /* not implemented */
 };
 
 
+
 /**
-   Helper class which specifies which members of MDL_LOCK_DATA are used for
-   participation in the list of satisfied/pending requests for the lock.
+  Hook class which via its methods specifies which members
+  of T should be used for participating in MDL lists.
 */
 
-struct MDL_LOCK_DATA_lock
+template <typename T, T* T::*next, T** T::*prev>
+struct I_P_List_adapter
 {
-  static inline MDL_LOCK_DATA **next_ptr(MDL_LOCK_DATA *l)
-  {
-    return &l->next_lock;
-  }
-  static inline MDL_LOCK_DATA ***prev_ptr(MDL_LOCK_DATA *l)
-  {
-    return &l->prev_lock;
-  }
+  static inline T **next_ptr(T *el) { return &(el->*next); }
+
+  static inline T ***prev_ptr(T *el) { return &(el->*prev); }
 };
 
 
 /**
-   Context of the owner of metadata locks. I.e. each server
-   connection has such a context.
+  A pending metadata lock request.
+
+  A lock request and a granted metadata lock are represented by
+  different classes because they have different allocation
+  sites and hence different lifetimes. The allocation of lock requests is
+  controlled from outside of the MDL subsystem, while allocation of granted
+  locks (tickets) is controlled within the MDL subsystem.
+
+  MDL_request is a C structure, you don't need to call a constructor
+  or destructor for it.
 */
 
-struct MDL_CONTEXT
+class MDL_request
 {
-  I_P_List <MDL_LOCK_DATA, MDL_LOCK_DATA_context> locks;
-  bool has_global_shared_lock;
-  THD      *thd;
-};
+public:
+  /** Type of metadata lock. */
+  enum          enum_mdl_type type;
+
+  /**
+    Pointers for participating in the list of lock requests for this context.
+  */
+  MDL_request *next_in_context;
+  MDL_request **prev_in_context;
+  /** A lock is requested based on a fully qualified name and type. */
+  MDL_key key;
+
+  void init(unsigned char type_arg, const char *db_arg, const char *name_arg);
+  /** Set type of lock request. Can be only applied to pending locks. */
+  inline void set_type(enum_mdl_type type_arg)
+  {
+    DBUG_ASSERT(ticket == NULL);
+    type= type_arg;
+  }
+  bool is_shared() const { return type < MDL_EXCLUSIVE; }
 
+  /**
+    Pointer to the lock ticket object for this lock request.
+    Valid only if this lock request is satisfied.
+  */
+  MDL_ticket *ticket;
 
-void mdl_init();
-void mdl_destroy();
+  static MDL_request *create(unsigned char type, const char *db,
+                             const char *name, MEM_ROOT *root);
 
-void mdl_context_init(MDL_CONTEXT *context, THD *thd);
-void mdl_context_destroy(MDL_CONTEXT *context);
-void mdl_context_backup_and_reset(MDL_CONTEXT *ctx, MDL_CONTEXT *backup);
-void mdl_context_restore(MDL_CONTEXT *ctx, MDL_CONTEXT *backup);
-void mdl_context_merge(MDL_CONTEXT *target, MDL_CONTEXT *source);
+};
 
-/** Maximal length of key for metadata locking subsystem. */
-#define MAX_MDLKEY_LENGTH (4 + NAME_LEN + 1 + NAME_LEN + 1)
 
-void mdl_init_lock(MDL_LOCK_DATA *lock_data, char *key, int type,
-                   const char *db, const char *name);
-MDL_LOCK_DATA *mdl_alloc_lock(int type, const char *db, const char *name,
-                              MEM_ROOT *root);
-void mdl_add_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data);
-void mdl_remove_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data);
-void mdl_remove_all_locks(MDL_CONTEXT *context);
+typedef void (*mdl_cached_object_release_hook)(void *);
 
 /**
-   Set type of lock request. Can be only applied to pending locks.
+  A granted metadata lock.
+
+  @warning MDL_ticket members are private to the MDL subsystem.
+
+  @note Multiple shared locks on a same object are represented by a
+        single ticket. The same does not apply for other lock types.
 */
 
-inline void mdl_set_lock_type(MDL_LOCK_DATA *lock_data, enum_mdl_type lock_type)
+class MDL_ticket
 {
-  DBUG_ASSERT(lock_data->state == MDL_INITIALIZED);
-  lock_data->type= lock_type;
-}
-
-bool mdl_acquire_shared_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data,
-                             bool *retry);
-bool mdl_acquire_exclusive_locks(MDL_CONTEXT *context);
-bool mdl_upgrade_shared_lock_to_exclusive(MDL_CONTEXT *context,
-                                          MDL_LOCK_DATA *lock_data);
-bool mdl_try_acquire_exclusive_lock(MDL_CONTEXT *context,
-                                    MDL_LOCK_DATA *lock_data,
-                                    bool *conflict);
-bool mdl_acquire_global_shared_lock(MDL_CONTEXT *context);
-
-bool mdl_wait_for_locks(MDL_CONTEXT *context);
-
-void mdl_release_locks(MDL_CONTEXT *context);
-void mdl_release_and_remove_all_locks_for_name(MDL_CONTEXT *context,
-                                               MDL_LOCK_DATA *lock_data);
-void mdl_release_lock(MDL_CONTEXT *context, MDL_LOCK_DATA *lock_data);
-void mdl_downgrade_exclusive_lock(MDL_CONTEXT *context,
-                                  MDL_LOCK_DATA *lock_data);
-void mdl_release_global_shared_lock(MDL_CONTEXT *context);
-
-bool mdl_is_exclusive_lock_owner(MDL_CONTEXT *context, int type, const char *db,
-                                 const char *name);
-bool mdl_is_lock_owner(MDL_CONTEXT *context, int type, const char *db,
-                       const char *name);
+public:
+  /**
+    Pointers for participating in the list of lock requests for this context.
+  */
+  MDL_ticket *next_in_context;
+  MDL_ticket **prev_in_context;
+  /**
+    Pointers for participating in the list of satisfied/pending requests
+    for the lock.
+  */
+  MDL_ticket *next_in_lock;
+  MDL_ticket **prev_in_lock;
+public:
+  bool has_pending_conflicting_lock() const;
 
-bool mdl_has_pending_conflicting_lock(MDL_LOCK_DATA *lock_data);
+  void *get_cached_object();
+  void set_cached_object(void *cached_object,
+                         mdl_cached_object_release_hook release_hook);
+  const MDL_context *get_ctx() const { return m_ctx; }
+  bool is_shared() const { return m_type < MDL_EXCLUSIVE; }
+  bool upgrade_shared_lock_to_exclusive();
+  void downgrade_exclusive_lock();
+private:
+  friend class MDL_context;
+
+  MDL_ticket(MDL_context *ctx_arg, enum_mdl_type type_arg)
+   : m_type(type_arg),
+     m_state(MDL_PENDING),
+     m_ctx(ctx_arg),
+     m_lock(NULL)
+  {}
 
-inline bool mdl_has_locks(MDL_CONTEXT *context)
-{
-  return !context->locks.is_empty();
-}
+
+  static MDL_ticket *create(MDL_context *ctx_arg, enum_mdl_type type_arg);
+  static void destroy(MDL_ticket *ticket);
+private:
+  /** Type of metadata lock. */
+  enum enum_mdl_type m_type;
+  /** State of the metadata lock ticket. */
+  enum enum_mdl_state m_state;
+
+  /** Context of the owner of the metadata lock ticket. */
+  MDL_context *m_ctx;
+
+  /** Pointer to the lock object for this lock ticket. */
+  MDL_lock *m_lock;
+private:
+  MDL_ticket(const MDL_ticket &);               /* not implemented */
+  MDL_ticket &operator=(const MDL_ticket &);    /* not implemented */
+};
 
 
 /**
-   Get iterator for walking through all lock requests in the context.
+  Context of the owner of metadata locks. I.e. each server
+  connection has such a context.
 */
 
-inline I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context>
-mdl_get_locks(MDL_CONTEXT *ctx)
+class MDL_context
 {
-  I_P_List_iterator<MDL_LOCK_DATA, MDL_LOCK_DATA_context> result(ctx->locks);
-  return result;
-}
-
-/**
-   Give metadata lock request object for the table get table definition
-   cache key corresponding to it.
+public:
+  typedef I_P_List<MDL_request,
+                   I_P_List_adapter<MDL_request,
+                                    &MDL_request::next_in_context,
+                                    &MDL_request::prev_in_context> >
+          Request_list;
+
+  typedef Request_list::Iterator Request_iterator;
+
+  typedef I_P_List<MDL_ticket,
+                   I_P_List_adapter<MDL_ticket,
+                                    &MDL_ticket::next_in_context,
+                                    &MDL_ticket::prev_in_context> >
+          Ticket_list;
+
+  typedef Ticket_list::Iterator Ticket_iterator;
+
+  void init(THD *thd);
+  void destroy();
+
+  void backup_and_reset(MDL_context *backup);
+  void restore_from_backup(MDL_context *backup);
+  void merge(MDL_context *source);
+
+  void add_request(MDL_request *mdl_request);
+  void remove_request(MDL_request *mdl_request);
+  void remove_all_requests();
+
+  bool acquire_shared_lock(MDL_request *mdl_request, bool *retry);
+  bool acquire_exclusive_locks();
+  bool try_acquire_exclusive_lock(MDL_request *mdl_request, bool *conflict);
+  bool acquire_global_shared_lock();
+
+  bool wait_for_locks();
+
+  void release_all_locks();
+  void release_all_locks_for_name(MDL_ticket *ticket);
+  void release_lock(MDL_ticket *ticket);
+  void release_global_shared_lock();
+
+  bool is_exclusive_lock_owner(unsigned char type,
+                               const char *db,
+                               const char *name);
+  bool is_lock_owner(unsigned char type, const char *db, const char *name);
 
-   @param lock_data  [in]  Lock request object for the table.
-   @param key        [out] LEX_STRING object where table definition cache key
-                           should be put.
+  inline bool has_locks() const
+  {
+    return !m_tickets.is_empty();
+  }
 
-   @note This key will have the same life-time as this lock request object.
+  inline MDL_ticket *mdl_savepoint()
+  {
+    return m_tickets.head();
+  }
 
-   @note This is yet another place where border between MDL subsystem and the
-         rest of the server is broken. OTOH it allows to save some CPU cycles
-         and memory by avoiding generating these TDC keys from table list.
-*/
+  void rollback_to_savepoint(MDL_ticket *mdl_savepoint);
 
-inline void mdl_get_tdc_key(MDL_LOCK_DATA *lock_data, LEX_STRING *key)
-{
-  key->str= lock_data->key + 4;
-  key->length= lock_data->key_length - 4;
-}
+  /**
+    Get iterator for walking through all lock requests in the context.
+  */
+  inline Request_iterator get_requests()
+  {
+    return Request_iterator(m_requests);
+  }
+  inline THD *get_thd() const { return m_thd; }
+private:
+  Request_list m_requests;
+  Ticket_list m_tickets;
+  bool m_has_global_shared_lock;
+  THD *m_thd;
+private:
+  void release_ticket(MDL_ticket *ticket);
+  MDL_ticket *find_ticket(MDL_request *mdl_req);
+};
 
 
-typedef void (* mdl_cached_object_release_hook)(void *);
-void* mdl_get_cached_object(MDL_LOCK_DATA *lock_data);
-void mdl_set_cached_object(MDL_LOCK_DATA *lock_data, void *cached_object,
-                           mdl_cached_object_release_hook release_hook);
+void mdl_init();
+void mdl_destroy();
 
 
 /*

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2009-02-13 16:30:54 +0000
+++ b/sql/rpl_rli.cc	2009-03-04 13:31:31 +0000
@@ -1187,7 +1187,7 @@ void Relay_log_info::clear_tables_to_loc
     meta-data locks are stored. So we want to be sure that we don't have
     any references to this memory left.
   */
-  DBUG_ASSERT(!mdl_has_locks(&(current_thd->mdl_context)));
+  DBUG_ASSERT(!current_thd->mdl_context.has_locks());
 
   while (tables_to_lock)
   {

=== modified file 'sql/set_var.cc'
--- a/sql/set_var.cc	2009-02-23 14:53:18 +0000
+++ b/sql/set_var.cc	2009-03-04 10:04:58 +0000
@@ -1840,7 +1840,7 @@ bool sys_var::check_enum(THD *thd, set_v
     if (!(res=var->value->val_str(&str)) ||
 	((long) (var->save_result.ulong_value=
 		 (ulong) find_type(enum_names, res->ptr(),
-				   res->length(),1)-1)) < 0)
+				   res->length(), FALSE) - 1)) < 0)
     {
       value= res ? res->c_ptr() : "NULL";
       goto err;

=== modified file 'sql/si_objects.cc'
--- a/sql/si_objects.cc	2009-02-05 18:10:29 +0000
+++ b/sql/si_objects.cc	2009-03-04 13:33:47 +0000
@@ -204,33 +204,7 @@ run_service_interface_sql(THD *thd, Ed_c
   names.
 */
 
-struct Table_name_key
-{
-public:
-  LEX_STRING db_name;
-  LEX_STRING table_name;
-  LEX_STRING key;
-
-public:
-  void init_from_mdl_key(const char *key_arg, size_t key_length_arg,
-                         char *key_buff_arg);
-};
-
-///////////////////////////////////////////////////////////////////////////
-
-void
-Table_name_key::init_from_mdl_key(const char *key_arg, size_t key_length_arg,
-                                  char *key_buff_arg)
-{
-  key.str= key_buff_arg;
-  key.length= key_length_arg - 4; /* Skip MDL object type */
-  memcpy(key.str, key_arg + 4, key.length);
-
-  db_name.str= key.str;
-  db_name.length= strlen(db_name.str);
-  table_name.str= db_name.str + db_name.length + 1; /* Skip \0 */
-  table_name.length= strlen(table_name.str);
-}
+typedef MDL_key Table_name_key;
 
 ///////////////////////////////////////////////////////////////////////////
 
@@ -242,8 +216,8 @@ get_table_name_key(const uchar *record,
                    my_bool not_used __attribute__((unused)))
 {
   Table_name_key *table_name_key= (Table_name_key *) record;
-  *key_length= table_name_key->key.length;
-  return (uchar *) table_name_key->key.str;
+  *key_length= table_name_key->length();
+  return (uchar*) table_name_key->ptr();
 }
 
 ///////////////////////////////////////////////////////////////////////////
@@ -1560,21 +1534,16 @@ Find_view_underlying_tables::execute_ser
     while ((table= it++))
     {
       Table_name_key *table_name_key;
-      char *key_buff;
 
       /* If we expect a view, and it's a table, or vice versa, continue */
       if ((int) m_base_obj_it->get_base_obj_kind() != test(table->view))
         continue;
 
       if (! my_multi_malloc(MYF(MY_WME),
-                            &table_name_key, sizeof(*table_name_key),
-                            &key_buff, table->mdl_lock_data->key_length,
-                            NullS))
+                            &table_name_key, sizeof(*table_name_key), NullS))
         goto end;
 
-      table_name_key->init_from_mdl_key(table->mdl_lock_data->key,
-                                        table->mdl_lock_data->key_length,
-                                        key_buff);
+      table_name_key->mdl_key_init(&table->mdl_request->key);
 
       if (my_hash_insert(m_table_names, (uchar*) table_name_key))
       {
@@ -1608,6 +1577,8 @@ bool View_base_obj_iterator::init(THD *t
 
 Obj *View_base_obj_iterator::next()
 {
+  LEX_STRING db_name;
+  LEX_STRING table_name;
   if (m_cur_idx >= m_table_names.records)
     return NULL;
 
@@ -1616,7 +1587,13 @@ Obj *View_base_obj_iterator::next()
 
   ++m_cur_idx;
 
-  return create_obj(&table_name_key->db_name, &table_name_key->table_name);
+  db_name.str= (char*) table_name_key->db_name();
+  db_name.length= table_name_key->db_name_length();
+
+  table_name.str= (char*) table_name_key->table_name();
+  table_name.length= table_name_key->table_name_length();
+
+  return create_obj(&db_name, &table_name);
 }
 
 ///////////////////////////////////////////////////////////////////////////

=== modified file 'sql/sp_head.cc'
--- a/sql/sp_head.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sp_head.cc	2009-03-04 13:33:47 +0000
@@ -3975,10 +3975,10 @@ sp_head::add_used_tables_to_table_list(T
       table->prelocking_placeholder= 1;
       table->belong_to_view= belong_to_view;
       table->trg_event_map= stab->trg_event_map;
-      table->mdl_lock_data= mdl_alloc_lock(0, table->db, table->table_name,
-                                           thd->locked_tables_root ?
-                                           thd->locked_tables_root :
-                                           thd->mem_root);
+      table->mdl_request= MDL_request::create(0, table->db, table->table_name,
+                                              thd->locked_tables_root ?
+                                              thd->locked_tables_root :
+                                              thd->mem_root);
 
       /* Everyting else should be zeroed */
 
@@ -4022,9 +4022,10 @@ sp_add_to_query_tables(THD *thd, LEX *le
   table->lock_transactional= 1; /* allow transactional locks */
   table->select_lex= lex->current_select;
   table->cacheable_table= 1;
-  table->mdl_lock_data= mdl_alloc_lock(0, table->db, table->table_name,
-                                       thd->locked_tables_root ?
-                                       thd->locked_tables_root : thd->mem_root);
+  table->mdl_request= MDL_request::create(0, table->db, table->table_name,
+                                          thd->locked_tables_root ?
+                                          thd->locked_tables_root :
+                                          thd->mem_root);
 
   lex->add_to_query_tables(table);
   return table;

=== modified file 'sql/sql_acl.cc'
--- a/sql/sql_acl.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sql_acl.cc	2009-03-04 10:04:58 +0000
@@ -693,7 +693,7 @@ my_bool acl_reload(THD *thd)
   tables[0].lock_type=tables[1].lock_type=tables[2].lock_type=TL_READ;
   tables[0].skip_temporary= tables[1].skip_temporary=
     tables[2].skip_temporary= TRUE;
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, tables))
   {
@@ -1591,7 +1591,7 @@ bool change_password(THD *thd, const cha
   bzero((char*) &tables, sizeof(tables));
   tables.alias= tables.table_name= (char*) "user";
   tables.db= (char*) "mysql";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
 #ifdef HAVE_REPLICATION
   /*
@@ -3027,7 +3027,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
 			    ? tables+2 : 0);
   tables[0].lock_type=tables[1].lock_type=tables[2].lock_type=TL_WRITE;
   tables[0].db=tables[1].db=tables[2].db=(char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     This statement will be replicated as a statement, even when using
@@ -3249,7 +3249,7 @@ bool mysql_routine_grant(THD *thd, TABLE
   tables[0].next_local= tables[0].next_global= tables+1;
   tables[0].lock_type=tables[1].lock_type=TL_WRITE;
   tables[0].db=tables[1].db=(char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     This statement will be replicated as a statement, even when using
@@ -3391,7 +3391,7 @@ bool mysql_grant(THD *thd, const char *d
   tables[0].next_local= tables[0].next_global= tables+1;
   tables[0].lock_type=tables[1].lock_type=TL_WRITE;
   tables[0].db=tables[1].db=(char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     This statement will be replicated as a statement, even when using
@@ -3724,7 +3724,7 @@ static my_bool grant_reload_procs_priv(T
   table.db= (char *) "mysql";
   table.lock_type= TL_READ;
   table.skip_temporary= 1;
-  alloc_mdl_locks(&table, thd->mem_root);
+  alloc_mdl_requests(&table, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, &table))
   {
@@ -3791,7 +3791,7 @@ my_bool grant_reload(THD *thd)
   tables[0].next_local= tables[0].next_global= tables+1;
   tables[0].lock_type= tables[1].lock_type= TL_READ;
   tables[0].skip_temporary= tables[1].skip_temporary= TRUE;
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   /*
     To avoid deadlocks we should obtain table locks before
@@ -5137,7 +5137,7 @@ int open_grant_tables(THD *thd, TABLE_LI
     (tables+4)->lock_type= TL_WRITE;
   tables->db= (tables+1)->db= (tables+2)->db= 
     (tables+3)->db= (tables+4)->db= (char*) "mysql";
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
 #ifdef HAVE_REPLICATION
   /*

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sql_base.cc	2009-03-04 13:33:47 +0000
@@ -119,7 +119,7 @@ static bool check_and_update_table_versi
 static bool open_table_entry_fini(THD *thd, TABLE_SHARE *share, TABLE *entry);
 static bool auto_repair_table(THD *thd, TABLE_LIST *table_list);
 static void free_cache_entry(TABLE *entry);
-static bool tdc_wait_for_old_versions(THD *thd, MDL_CONTEXT *context);
+static bool tdc_wait_for_old_versions(THD *thd, MDL_context *context);
 static bool
 has_two_write_locked_tables_with_auto_increment(TABLE_LIST *tables);
 
@@ -444,8 +444,8 @@ TABLE_SHARE *get_table_share(THD *thd, T
     To be able perform any operation on table we should own
     some kind of metadata lock on it.
   */
-  DBUG_ASSERT(mdl_is_lock_owner(&thd->mdl_context, 0, table_list->db,
-                                table_list->table_name));
+  DBUG_ASSERT(thd->mdl_context.is_lock_owner(0, table_list->db,
+                                             table_list->table_name));
 
   /* Read table definition from cache */
   if ((share= (TABLE_SHARE*) my_hash_search(&table_def_cache,(uchar*) key,
@@ -1043,12 +1043,12 @@ err_with_reopen:
     */
     thd->locked_tables_list.reopen_tables(thd);
     /*
-      Since mdl_downgrade_exclusive_lock() won't do anything with shared
-      metadata lock it is much simplier to go through all open tables rather
+      Since downgrade_exclusive_lock() won't do anything with shared
+      metadata lock it is much simpler to go through all open tables rather
       than picking only those tables that were flushed.
     */
     for (TABLE *tab= thd->open_tables; tab; tab= tab->next)
-      mdl_downgrade_exclusive_lock(&thd->mdl_context, tab->mdl_lock_data);
+      tab->mdl_ticket->downgrade_exclusive_lock();
   }
   DBUG_RETURN(result);
 }
@@ -1333,7 +1333,7 @@ close_all_tables_for_name(THD *thd, TABL
 */
 
 void close_thread_tables(THD *thd,
-                         bool skip_mdl)
+                         bool is_back_off)
 {
   TABLE *table;
   DBUG_ENTER("close_thread_tables");
@@ -1475,10 +1475,10 @@ void close_thread_tables(THD *thd,
   if (thd->open_tables)
     close_open_tables(thd);
 
-  mdl_release_locks(&thd->mdl_context);
-  if (!skip_mdl)
+  thd->mdl_context.release_all_locks();
+  if (!is_back_off)
   {
-    mdl_remove_all_locks(&thd->mdl_context);
+    thd->mdl_context.remove_all_requests();
   }
   DBUG_VOID_RETURN;
 }
@@ -1497,7 +1497,7 @@ bool close_thread_table(THD *thd, TABLE 
 
   *table_ptr=table->next;
 
-  table->mdl_lock_data= 0;
+  table->mdl_ticket= NULL;
   if (table->needs_reopen() ||
       thd->version != refresh_version || !table->db_stat)
   {
@@ -2084,8 +2084,7 @@ bool wait_while_table_is_used(THD *thd, 
   old_lock_type= table->reginfo.lock_type;
   mysql_lock_abort(thd, table, TRUE);	/* end threads waiting on lock */
 
-  if (mdl_upgrade_shared_lock_to_exclusive(&thd->mdl_context,
-                                           table->mdl_lock_data))
+  if (table->mdl_ticket->upgrade_shared_lock_to_exclusive())
   {
     mysql_lock_downgrade_write(thd, table, old_lock_type);
     DBUG_RETURN(TRUE);
@@ -2273,11 +2272,11 @@ void table_share_release_hook(void *shar
 
 static bool
 open_table_get_mdl_lock(THD *thd, TABLE_LIST *table_list,
-                        MDL_LOCK_DATA *mdl_lock_data,
+                        MDL_request *mdl_request,
                         uint flags,
                         enum_open_table_action *action)
 {
-  mdl_add_lock(&thd->mdl_context, mdl_lock_data);
+  thd->mdl_context.add_request(mdl_request);
 
   if (table_list->open_type)
   {
@@ -2290,10 +2289,10 @@ open_table_get_mdl_lock(THD *thd, TABLE_
       shared locks. This invariant is preserved here and is also
       enforced by asserts in metadata locking subsystem.
     */
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    if (mdl_acquire_exclusive_locks(&thd->mdl_context))
+    mdl_request->set_type(MDL_EXCLUSIVE);
+    if (thd->mdl_context.acquire_exclusive_locks())
     {
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      thd->mdl_context.remove_request(mdl_request);
       return 1;
     }
   }
@@ -2310,16 +2309,16 @@ open_table_get_mdl_lock(THD *thd, TABLE_
 
     if (flags & MYSQL_OPEN_TAKE_UPGRADABLE_MDL &&
         table_list->lock_type >= TL_WRITE_ALLOW_WRITE)
-      mdl_set_lock_type(mdl_lock_data, MDL_SHARED_UPGRADABLE);
+      mdl_request->set_type(MDL_SHARED_UPGRADABLE);
     if (flags & MYSQL_LOCK_IGNORE_FLUSH)
-      mdl_set_lock_type(mdl_lock_data, MDL_SHARED_HIGH_PRIO);
+      mdl_request->set_type(MDL_SHARED_HIGH_PRIO);
 
-    if (mdl_acquire_shared_lock(&thd->mdl_context, mdl_lock_data, &retry))
+    if (thd->mdl_context.acquire_shared_lock(mdl_request, &retry))
     {
       if (retry)
         *action= OT_BACK_OFF_AND_RETRY;
       else
-        mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+        thd->mdl_context.remove_request(mdl_request);
       return 1;
     }
   }
@@ -2374,7 +2373,8 @@ bool open_table(THD *thd, TABLE_LIST *ta
   char	key[MAX_DBKEY_LENGTH];
   uint	key_length;
   char	*alias= table_list->alias;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_request *mdl_request;
+  MDL_ticket *mdl_ticket;
   int error;
   TABLE_SHARE *share;
   DBUG_ENTER("open_table");
@@ -2523,8 +2523,8 @@ bool open_table(THD *thd, TABLE_LIST *ta
       TABLES breaks metadata locking protocol (potentially can lead
       to deadlocks) it should be disallowed.
     */
-    if (mdl_is_lock_owner(&thd->mdl_context, 0, table_list->db,
-                          table_list->table_name))
+    if (thd->mdl_context.is_lock_owner(0, table_list->db,
+                                       table_list->table_name))
     {
       char path[FN_REFLEN];
       enum legacy_db_type not_used;
@@ -2566,10 +2566,10 @@ bool open_table(THD *thd, TABLE_LIST *ta
     This is the normal use case.
   */
 
-  mdl_lock_data= table_list->mdl_lock_data;
+  mdl_request= table_list->mdl_request;
   if (! (flags & MYSQL_OPEN_HAS_MDL_LOCK))
   {
-    if (open_table_get_mdl_lock(thd, table_list, mdl_lock_data, flags,
+    if (open_table_get_mdl_lock(thd, table_list, mdl_request, flags,
                                 action))
     {
       DEBUG_SYNC(thd, "before_open_table_wait_refresh");
@@ -2577,6 +2577,13 @@ bool open_table(THD *thd, TABLE_LIST *ta
     }
   }
 
+  /*
+    Grab reference to the granted MDL lock ticket. Must be done after
+    open_table_get_mdl_lock as the lock on the table might have been
+    acquired previously (MYSQL_OPEN_HAS_MDL_LOCK).
+  */
+  mdl_ticket= mdl_request->ticket;
+
   pthread_mutex_lock(&LOCK_open);
 
   /*
@@ -2618,7 +2625,7 @@ bool open_table(THD *thd, TABLE_LIST *ta
     DBUG_RETURN(FALSE);
   }
 
-  if (!(share= (TABLE_SHARE *)mdl_get_cached_object(mdl_lock_data)))
+  if (!(share= (TABLE_SHARE *) mdl_ticket->get_cached_object()))
   {
     if (!(share= get_table_share_with_create(thd, table_list, key,
                                              key_length, OPEN_VIEW,
@@ -2679,7 +2686,7 @@ bool open_table(THD *thd, TABLE_LIST *ta
       so we need to increase reference counter;
     */
     reference_table_share(share);
-    mdl_set_cached_object(mdl_lock_data, share, table_share_release_hook);
+    mdl_ticket->set_cached_object(share, table_share_release_hook);
   }
   else
   {
@@ -2788,9 +2795,9 @@ bool open_table(THD *thd, TABLE_LIST *ta
     lock on this table to shared metadata lock.
   */
   if (table_list->open_type == TABLE_LIST::OPEN_OR_CREATE)
-    mdl_downgrade_exclusive_lock(&thd->mdl_context, table_list->mdl_lock_data);
+    mdl_ticket->downgrade_exclusive_lock();
 
-  table->mdl_lock_data= mdl_lock_data;
+  table->mdl_ticket= mdl_ticket;
 
   table->next=thd->open_tables;		/* Link into simple list */
   thd->open_tables=table;
@@ -2840,8 +2847,8 @@ err_unlock2:
   pthread_mutex_unlock(&LOCK_open);
   if (! (flags & MYSQL_OPEN_HAS_MDL_LOCK))
   {
-    mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-    mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+    thd->mdl_context.release_lock(mdl_ticket);
+    thd->mdl_context.remove_request(mdl_request);
   }
   DBUG_RETURN(TRUE);
 }
@@ -2959,7 +2966,7 @@ Locked_tables_list::init_locked_tables(T
     dst_table_list->init_one_table(db, db_len, table_name, table_name_len,
                                    alias,
                                    src_table_list->table->reginfo.lock_type);
-    dst_table_list->mdl_lock_data= src_table_list->mdl_lock_data;
+    dst_table_list->mdl_request= src_table_list->mdl_request;
     dst_table_list->table= table;
     memcpy(db, src_table_list->db, db_len + 1);
     memcpy(table_name, src_table_list->table_name, table_name_len + 1);
@@ -3491,20 +3498,21 @@ recover_from_failed_open_table_attempt(T
                                        enum_open_table_action action)
 {
   bool result= FALSE;
+  MDL_request *mdl_request= table->mdl_request;
 
   switch (action)
   {
     case OT_BACK_OFF_AND_RETRY:
-      result= (mdl_wait_for_locks(&thd->mdl_context) ||
+      result= (thd->mdl_context.wait_for_locks() ||
                tdc_wait_for_old_versions(thd, &thd->mdl_context));
-      mdl_remove_all_locks(&thd->mdl_context);
+      thd->mdl_context.remove_all_requests();
       break;
     case OT_DISCOVER:
-      mdl_set_lock_type(table->mdl_lock_data, MDL_EXCLUSIVE);
-      mdl_add_lock(&thd->mdl_context, table->mdl_lock_data);
-      if (mdl_acquire_exclusive_locks(&thd->mdl_context))
+      mdl_request->set_type(MDL_EXCLUSIVE);
+      thd->mdl_context.add_request(mdl_request);
+      if (thd->mdl_context.acquire_exclusive_locks())
       {
-        mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+        thd->mdl_context.remove_request(mdl_request);
         return TRUE;
       }
       pthread_mutex_lock(&LOCK_open);
@@ -3514,15 +3522,15 @@ recover_from_failed_open_table_attempt(T
 
       thd->warning_info->clear_warning_info(thd->query_id);
       thd->clear_error();                 // Clear error message
-      mdl_release_lock(&thd->mdl_context, table->mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+      thd->mdl_context.release_lock(mdl_request->ticket);
+      thd->mdl_context.remove_request(mdl_request);
       break;
     case OT_REPAIR:
-      mdl_set_lock_type(table->mdl_lock_data, MDL_EXCLUSIVE);
-      mdl_add_lock(&thd->mdl_context, table->mdl_lock_data);
-      if (mdl_acquire_exclusive_locks(&thd->mdl_context))
+      mdl_request->set_type(MDL_EXCLUSIVE);
+      thd->mdl_context.add_request(mdl_request);
+      if (thd->mdl_context.acquire_exclusive_locks())
       {
-        mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+        thd->mdl_context.remove_request(mdl_request);
         return TRUE;
       }
       pthread_mutex_lock(&LOCK_open);
@@ -3530,8 +3538,8 @@ recover_from_failed_open_table_attempt(T
       pthread_mutex_unlock(&LOCK_open);
 
       result= auto_repair_table(thd, table);
-      mdl_release_lock(&thd->mdl_context, table->mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, table->mdl_lock_data);
+      thd->mdl_context.release_lock(mdl_request->ticket);
+      thd->mdl_context.remove_request(mdl_request);
       break;
     default:
       DBUG_ASSERT(0);
@@ -4604,7 +4612,7 @@ int lock_tables(THD *thd, TABLE_LIST *ta
 
 */
 
-void close_tables_for_reopen(THD *thd, TABLE_LIST **tables, bool skip_mdl)
+void close_tables_for_reopen(THD *thd, TABLE_LIST **tables, bool is_back_off)
 {
   /*
     If table list consists only from tables from prelocking set, table list
@@ -4616,7 +4624,7 @@ void close_tables_for_reopen(THD *thd, T
   sp_remove_not_own_routines(thd->lex);
   for (TABLE_LIST *tmp= *tables; tmp; tmp= tmp->next_global)
     tmp->table= 0;
-  close_thread_tables(thd, skip_mdl);
+  close_thread_tables(thd, is_back_off);
 }
 
 
@@ -7651,8 +7659,7 @@ void tdc_remove_table(THD *thd, enum_tdc
   safe_mutex_assert_owner(&LOCK_open);
 
   DBUG_ASSERT(remove_type == TDC_RT_REMOVE_UNUSED ||
-              mdl_is_exclusive_lock_owner(&thd->mdl_context, 0,
-                                          db, table_name));
+              thd->mdl_context.is_exclusive_lock_owner(0, db, table_name));
 
   key_length=(uint) (strmov(strmov(key,db)+1,table_name)-key)+1;
 
@@ -7699,12 +7706,11 @@ void tdc_remove_table(THD *thd, enum_tdc
    @param context  Metadata locking context with locks.
 */
 
-static bool tdc_wait_for_old_versions(THD *thd, MDL_CONTEXT *context)
+static bool tdc_wait_for_old_versions(THD *thd, MDL_context *mdl_context)
 {
-  MDL_LOCK_DATA *lock_data;
   TABLE_SHARE *share;
   const char *old_msg;
-  LEX_STRING key;
+  MDL_request *mdl_request;
 
   while (!thd->killed)
   {
@@ -7717,18 +7723,15 @@ static bool tdc_wait_for_old_versions(TH
     mysql_ha_flush(thd);
     pthread_mutex_lock(&LOCK_open);
 
-    I_P_List_iterator<MDL_LOCK_DATA,
-                      MDL_LOCK_DATA_context> it= mdl_get_locks(context);
-    while ((lock_data= it++))
-    {
-      mdl_get_tdc_key(lock_data, &key);
-      if ((share= (TABLE_SHARE*) my_hash_search(&table_def_cache,
-                                                (uchar*) key.str,
-                                                key.length)) &&
+    MDL_context::Request_iterator it= mdl_context->get_requests();
+    while ((mdl_request= it++))
+    {
+      if ((share= get_cached_table_share(mdl_request->key.db_name(),
+                                         mdl_request->key.table_name())) &&
           share->version != refresh_version)
         break;
     }
-    if (!lock_data)
+    if (!mdl_request)
     {
       pthread_mutex_unlock(&LOCK_open);
       break;
@@ -7958,7 +7961,7 @@ open_system_tables_for_read(THD *thd, TA
 
   DBUG_ENTER("open_system_tables_for_read");
 
-  alloc_mdl_locks(table_list, thd->mem_root);
+  alloc_mdl_requests(table_list, thd->mem_root);
 
   /*
     Besides using new Open_tables_state for opening system tables,
@@ -8033,7 +8036,7 @@ open_system_table_for_update(THD *thd, T
 {
   DBUG_ENTER("open_system_table_for_update");
 
-  alloc_mdl_locks(one_table, thd->mem_root);
+  alloc_mdl_requests(one_table, thd->mem_root);
 
   TABLE *table= open_ltable(thd, one_table, one_table->lock_type, 0);
   if (table)
@@ -8071,7 +8074,7 @@ open_performance_schema_table(THD *thd, 
 
   thd->reset_n_backup_open_tables_state(backup);
 
-  alloc_mdl_locks(one_table, thd->mem_root);
+  alloc_mdl_requests(one_table, thd->mem_root);
   if ((table= open_ltable(thd, one_table, one_table->lock_type, flags)))
   {
     DBUG_ASSERT(table->s->table_category == TABLE_CATEGORY_PERFORMANCE);
@@ -8148,8 +8151,8 @@ void close_performance_schema_table(THD 
 
   pthread_mutex_unlock(&LOCK_open);
 
-  mdl_release_locks(&thd->mdl_context);
-  mdl_remove_all_locks(&thd->mdl_context);
+  thd->mdl_context.release_all_locks();
+  thd->mdl_context.remove_all_requests();
 
   thd->restore_backup_open_tables_state(backup);
 }

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sql_class.cc	2009-03-04 13:33:47 +0000
@@ -1030,8 +1030,8 @@ THD::~THD()
   if (!cleanup_done)
     cleanup();
 
-  mdl_context_destroy(&mdl_context);
-  mdl_context_destroy(&handler_mdl_context);
+  mdl_context.destroy();
+  handler_mdl_context.destroy();
   ha_close_connection(this);
   mysql_audit_release(this);
   plugin_thdvar_cleanup(this);
@@ -3018,8 +3018,8 @@ void THD::restore_backup_open_tables_sta
               lock == 0 &&
               locked_tables_mode == LTM_NONE &&
               m_reprepare_observer == NULL);
-  mdl_context_destroy(&mdl_context);
-  mdl_context_destroy(&handler_mdl_context);
+  mdl_context.destroy();
+  handler_mdl_context.destroy();
 
   set_open_tables_state(backup);
   DBUG_VOID_RETURN;

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2009-02-23 14:53:18 +0000
+++ b/sql/sql_class.h	2009-03-04 13:33:47 +0000
@@ -798,6 +798,8 @@ struct st_savepoint {
   char                *name;
   uint                 length;
   Ha_trx_info         *ha_list;
+  /** Last acquired lock before this savepoint was set. */
+  MDL_ticket     *mdl_savepoint;
 };
 
 enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
@@ -999,8 +1001,8 @@ public:
   */
   uint state_flags;
 
-  MDL_CONTEXT mdl_context;
-  MDL_CONTEXT handler_mdl_context;
+  MDL_context mdl_context;
+  MDL_context handler_mdl_context;
 
   /**
      This constructor initializes Open_tables_state instance which can only
@@ -1031,8 +1033,8 @@ public:
     locked_tables_mode= LTM_NONE;
     state_flags= 0U;
     m_reprepare_observer= NULL;
-    mdl_context_init(&mdl_context, thd);
-    mdl_context_init(&handler_mdl_context, thd);
+    mdl_context.init(thd);
+    handler_mdl_context.init(thd);
   }
 };
 

=== modified file 'sql/sql_delete.cc'
--- a/sql/sql_delete.cc	2009-02-16 21:18:45 +0000
+++ b/sql/sql_delete.cc	2009-03-04 13:31:31 +0000
@@ -1035,7 +1035,7 @@ bool mysql_truncate(THD *thd, TABLE_LIST
   TABLE *table;
   bool error;
   uint path_length;
-  MDL_LOCK_DATA *mdl_lock_data= 0;
+  MDL_request *mdl_request= NULL;
   Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_truncate");
 
@@ -1100,13 +1100,13 @@ bool mysql_truncate(THD *thd, TABLE_LIST
              tries to get table enging and therefore accesses table in some way
              without holding any kind of meta-data lock.
     */
-    mdl_lock_data= mdl_alloc_lock(0, table_list->db, table_list->table_name,
-                                  thd->mem_root);
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    mdl_add_lock(&thd->mdl_context, mdl_lock_data);
-    if (mdl_acquire_exclusive_locks(&thd->mdl_context))
+    mdl_request= MDL_request::create(0, table_list->db,
+                                     table_list->table_name, thd->mem_root);
+    mdl_request->set_type(MDL_EXCLUSIVE);
+    thd->mdl_context.add_request(mdl_request);
+    if (thd->mdl_context.acquire_exclusive_locks())
     {
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      thd->mdl_context.remove_request(mdl_request);
       DBUG_RETURN(TRUE);
     }
     pthread_mutex_lock(&LOCK_open);
@@ -1139,18 +1139,18 @@ end:
       write_bin_log(thd, TRUE, thd->query, thd->query_length);
       my_ok(thd);		// This should return record count
     }
-    if (mdl_lock_data)
+    if (mdl_request)
     {
-      mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      thd->mdl_context.release_lock(mdl_request->ticket);
+      thd->mdl_context.remove_request(mdl_request);
     }
   }
   else if (error)
   {
-    if (mdl_lock_data)
+    if (mdl_request)
     {
-      mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      thd->mdl_context.release_lock(mdl_request->ticket);
+      thd->mdl_context.remove_request(mdl_request);
     }
   }
   DBUG_RETURN(error);

=== modified file 'sql/sql_handler.cc'
--- a/sql/sql_handler.cc	2009-01-27 02:08:48 +0000
+++ b/sql/sql_handler.cc	2009-03-04 13:31:31 +0000
@@ -125,7 +125,7 @@ static void mysql_ha_hash_free(TABLE_LIS
 static void mysql_ha_close_table(THD *thd, TABLE_LIST *tables)
 {
   TABLE **table_ptr;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_ticket *mdl_ticket;
 
   /*
     Though we could take the table pointer from hash_tables->table,
@@ -141,7 +141,7 @@ static void mysql_ha_close_table(THD *th
   if (*table_ptr)
   {
     (*table_ptr)->file->ha_index_or_rnd_end();
-    mdl_lock_data= (*table_ptr)->mdl_lock_data;
+    mdl_ticket= (*table_ptr)->mdl_ticket;
     pthread_mutex_lock(&LOCK_open);
     if (close_thread_table(thd, table_ptr))
     {
@@ -149,8 +149,8 @@ static void mysql_ha_close_table(THD *th
       broadcast_refresh();
     }
     pthread_mutex_unlock(&LOCK_open);
-    mdl_release_lock(&thd->handler_mdl_context, mdl_lock_data);
-    mdl_remove_lock(&thd->handler_mdl_context, mdl_lock_data);
+    thd->handler_mdl_context.release_lock(mdl_ticket);
+    thd->handler_mdl_context.remove_request(tables->mdl_request);
   }
   else if (tables->table)
   {
@@ -188,12 +188,12 @@ static void mysql_ha_close_table(THD *th
 bool mysql_ha_open(THD *thd, TABLE_LIST *tables, bool reopen)
 {
   TABLE_LIST    *hash_tables = NULL;
-  MDL_LOCK_DATA *mdl_lock_data;
-  char          *db, *name, *alias, *mdlkey;
+  char          *db, *name, *alias;
   uint          dblen, namelen, aliaslen, counter;
   int           error;
   TABLE         *backup_open_tables;
-  MDL_CONTEXT   backup_mdl_context;
+  MDL_context   backup_mdl_context;
+  MDL_request  *mdl_request;
   DBUG_ENTER("mysql_ha_open");
   DBUG_PRINT("enter",("'%s'.'%s' as '%s'  reopen: %d",
                       tables->db, tables->table_name, tables->alias,
@@ -236,8 +236,7 @@ bool mysql_ha_open(THD *thd, TABLE_LIST 
                           &db, (uint) dblen,
                           &name, (uint) namelen,
                           &alias, (uint) aliaslen,
-                          &mdl_lock_data, sizeof(MDL_LOCK_DATA),
-                          &mdlkey, MAX_MDLKEY_LENGTH,
+                          &mdl_request, sizeof(MDL_request),
                           NullS)))
     {
       DBUG_PRINT("exit",("ERROR"));
@@ -251,8 +250,8 @@ bool mysql_ha_open(THD *thd, TABLE_LIST 
     memcpy(hash_tables->db, tables->db, dblen);
     memcpy(hash_tables->table_name, tables->table_name, namelen);
     memcpy(hash_tables->alias, tables->alias, aliaslen);
-    mdl_init_lock(mdl_lock_data, mdlkey, 0, db, name);
-    hash_tables->mdl_lock_data= mdl_lock_data;
+    mdl_request->init(0, db, name);
+    hash_tables->mdl_request= mdl_request;
 
     /* add to hash */
     if (my_hash_insert(&thd->handler_tables_hash, (uchar*) hash_tables))
@@ -280,7 +279,7 @@ bool mysql_ha_open(THD *thd, TABLE_LIST 
   */
   backup_open_tables= thd->open_tables;
   thd->open_tables= NULL;
-  mdl_context_backup_and_reset(&thd->mdl_context, &backup_mdl_context);
+  thd->mdl_context.backup_and_reset(&backup_mdl_context);
 
   /*
     open_tables() will set 'hash_tables->table' if successful.
@@ -297,10 +296,10 @@ bool mysql_ha_open(THD *thd, TABLE_LIST 
     thd->open_tables->next= thd->handler_tables;
     thd->handler_tables= thd->open_tables;
   }
-  mdl_context_merge(&thd->handler_mdl_context, &thd->mdl_context);
+  thd->handler_mdl_context.merge(&thd->mdl_context);
 
   thd->open_tables= backup_open_tables;
-  mdl_context_restore(&thd->mdl_context, &backup_mdl_context);
+  thd->mdl_context.restore_from_backup(&backup_mdl_context);
 
   if (error)
     goto err;
@@ -500,7 +499,7 @@ retry:
 
   if (need_reopen)
   {
-    mysql_ha_close_table(thd, tables);
+    mysql_ha_close_table(thd, hash_tables);
     hash_tables->table= NULL;
     /*
       The lock might have been aborted, we need to manually reset
@@ -774,11 +773,11 @@ void mysql_ha_flush(THD *thd)
   {
     hash_tables= (TABLE_LIST*) my_hash_element(&thd->handler_tables_hash, i);
     /*
-      TABLE::mdl_lock_data is 0 for temporary tables so we need extra check.
+      TABLE::mdl_ticket is 0 for temporary tables so we need extra check.
     */
     if (hash_tables->table &&
-        (hash_tables->table->mdl_lock_data &&
-         mdl_has_pending_conflicting_lock(hash_tables->table->mdl_lock_data) ||
+        (hash_tables->table->mdl_ticket &&
+         hash_tables->table->mdl_ticket->has_pending_conflicting_lock() ||
          hash_tables->table->needs_reopen()))
     {
       mysql_ha_close_table(thd, hash_tables);

=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sql_insert.cc	2009-03-04 10:04:58 +0000
@@ -2382,7 +2382,7 @@ pthread_handler_t handle_delayed_insert(
   thd->lex->set_stmt_unsafe();
   thd->set_current_stmt_binlog_row_based_if_mixed();
 
-  alloc_mdl_locks(&di->table_list, thd->mem_root);
+  alloc_mdl_requests(&di->table_list, thd->mem_root);
 
   if (di->open_and_lock_table())
     goto err;

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sql_parse.cc	2009-03-04 13:33:47 +0000
@@ -1124,7 +1124,7 @@ bool dispatch_command(enum enum_server_c
       select_lex.table_list.link_in_list((uchar*) &table_list,
                                          (uchar**) &table_list.next_local);
     thd->lex->add_to_query_tables(&table_list);
-    alloc_mdl_locks(&table_list, thd->mem_root);
+    alloc_mdl_requests(&table_list, thd->mem_root);
 
     /* switch on VIEW optimisation: do not fill temporary tables */
     thd->lex->sql_command= SQLCOM_SHOW_FIELDS;
@@ -3516,7 +3516,7 @@ ddl_blocker_err:
     if (trans_commit_implicit(thd))
       goto error;
 
-    alloc_mdl_locks(all_tables, thd->locked_tables_list.locked_tables_root());
+    alloc_mdl_requests(all_tables, thd->locked_tables_list.locked_tables_root());
 
     thd->options|= OPTION_TABLE_LOCK;
     thd->in_lock_tables=1;
@@ -6142,9 +6142,9 @@ TABLE_LIST *st_select_lex::add_table_to_
   ptr->next_name_resolution_table= NULL;
   /* Link table in global list (all used tables) */
   lex->add_to_query_tables(ptr);
-  ptr->mdl_lock_data= mdl_alloc_lock(0 , ptr->db, ptr->table_name,
-                                thd->locked_tables_root ?
-                                thd->locked_tables_root : thd->mem_root);
+  ptr->mdl_request=
+    MDL_request::create(0, ptr->db, ptr->table_name, thd->locked_tables_root ?
+                        thd->locked_tables_root : thd->mem_root);
   DBUG_RETURN(ptr);
 }
 

=== modified file 'sql/sql_plist.h'
--- a/sql/sql_plist.h	2008-05-27 09:45:34 +0000
+++ b/sql/sql_plist.h	2009-03-04 13:31:31 +0000
@@ -79,6 +79,7 @@ public:
     **B::prev_ptr(a)= next;
   }
   inline T* head() { return first; }
+  inline const T *head() const { return first; }
   void swap(I_P_List<T,B> &rhs)
   {
     swap_variables(T *, first, rhs.first);
@@ -90,6 +91,7 @@ public:
 #ifndef _lint
   friend class I_P_List_iterator<T, B>;
 #endif
+  typedef I_P_List_iterator<T, B> Iterator;
 };
 
 

=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc	2009-02-16 21:18:45 +0000
+++ b/sql/sql_plugin.cc	2009-03-02 21:18:26 +0000
@@ -1343,7 +1343,7 @@ static void plugin_load(MEM_ROOT *tmp_ro
   tables.alias= tables.table_name= (char*)"plugin";
   tables.lock_type= TL_READ;
   tables.db= new_thd->db;
-  alloc_mdl_locks(&tables, tmp_root);
+  alloc_mdl_requests(&tables, tmp_root);
 
 #ifdef EMBEDDED_LIBRARY
   /*
@@ -1644,7 +1644,7 @@ bool mysql_install_plugin(THD *thd, cons
   if (check_table_access(thd, INSERT_ACL, &tables, FALSE, FALSE, 1))
     DBUG_RETURN(TRUE);
 
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   /* need to open before acquiring LOCK_plugin or it will deadlock */
   if (! (table = open_ltable(thd, &tables, TL_WRITE, 0)))
@@ -1709,7 +1709,7 @@ bool mysql_uninstall_plugin(THD *thd, co
   bzero(&tables, sizeof(tables));
   tables.db= (char *)"mysql";
   tables.table_name= tables.alias= (char *)"plugin";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   /* need to open before acquiring LOCK_plugin or it will deadlock */
   if (! (table= open_ltable(thd, &tables, TL_WRITE, 0)))

=== modified file 'sql/sql_servers.cc'
--- a/sql/sql_servers.cc	2009-01-27 02:08:48 +0000
+++ b/sql/sql_servers.cc	2009-03-02 21:18:26 +0000
@@ -233,7 +233,7 @@ bool servers_reload(THD *thd)
   tables[0].alias= tables[0].table_name= (char*) "servers";
   tables[0].db= (char*) "mysql";
   tables[0].lock_type= TL_READ;
-  alloc_mdl_locks(tables, thd->mem_root);
+  alloc_mdl_requests(tables, thd->mem_root);
 
   if (simple_open_n_lock_tables(thd, tables))
   {
@@ -364,7 +364,7 @@ insert_server(THD *thd, FOREIGN_SERVER *
   bzero((char*) &tables, sizeof(tables));
   tables.db= (char*) "mysql";
   tables.alias= tables.table_name= (char*) "servers";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   /* need to open before acquiring THR_LOCK_plugin or it will deadlock */
   if (! (table= open_ltable(thd, &tables, TL_WRITE, 0)))
@@ -583,7 +583,7 @@ int drop_server(THD *thd, LEX_SERVER_OPT
   bzero((char*) &tables, sizeof(tables));
   tables.db= (char*) "mysql";
   tables.alias= tables.table_name= (char*) "servers";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   rw_wrlock(&THR_LOCK_servers);
 
@@ -708,7 +708,7 @@ int update_server(THD *thd, FOREIGN_SERV
   bzero((char*) &tables, sizeof(tables));
   tables.db= (char*)"mysql";
   tables.alias= tables.table_name= (char*)"servers";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
 
   if (!(table= open_ltable(thd, &tables, TL_WRITE, 0)))
   {

=== modified file 'sql/sql_show.cc'
--- a/sql/sql_show.cc	2009-02-18 12:57:37 +0000
+++ b/sql/sql_show.cc	2009-03-04 13:31:31 +0000
@@ -3088,12 +3088,8 @@ uint get_table_open_method(TABLE_LIST *t
    Acquire high priority share metadata lock on a table.
 
    @param thd            Thread context.
-   @param mdl_lock_data  Pointer to memory to be used for MDL_LOCK_DATA
+   @param mdl_request    Pointer to memory to be used for MDL_request
                          object for a lock request.
-   @param mdlkey         Pointer to the buffer for key for the lock request
-                         (should be at least strlen(db) + strlen(name) + 2
-                         bytes, or, if the lengths are not known,
-                         MAX_MDLKEY_LENGTH)
    @param table          Table list element for the table
 
    @note This is an auxiliary function to be used in cases when we want to
@@ -3107,23 +3103,23 @@ uint get_table_open_method(TABLE_LIST *t
 */
 
 static bool
-acquire_high_prio_shared_mdl_lock(THD *thd, MDL_LOCK_DATA *mdl_lock_data,
-                                  char *mdlkey, TABLE_LIST *table)
+acquire_high_prio_shared_mdl_lock(THD *thd, MDL_request *mdl_request,
+                                  TABLE_LIST *table)
 {
   bool retry;
 
-  mdl_init_lock(mdl_lock_data, mdlkey, 0, table->db, table->table_name);
-  table->mdl_lock_data= mdl_lock_data;
-  mdl_add_lock(&thd->mdl_context, mdl_lock_data);
-  mdl_set_lock_type(mdl_lock_data, MDL_SHARED_HIGH_PRIO);
+  mdl_request->init(0, table->db, table->table_name);
+  table->mdl_request= mdl_request;
+  thd->mdl_context.add_request(mdl_request);
+  mdl_request->set_type(MDL_SHARED_HIGH_PRIO);
 
   while (1)
   {
-    if (mdl_acquire_shared_lock(&thd->mdl_context, mdl_lock_data, &retry))
+    if (thd->mdl_context.acquire_shared_lock(mdl_request, &retry))
     {
-      if (!retry || mdl_wait_for_locks(&thd->mdl_context))
+      if (!retry || thd->mdl_context.wait_for_locks())
       {
-        mdl_remove_all_locks(&thd->mdl_context);
+        thd->mdl_context.remove_all_requests();
         return TRUE;
       }
       continue;
@@ -3165,8 +3161,7 @@ static int fill_schema_table_from_frm(TH
   int error;
   char key[MAX_DBKEY_LENGTH];
   uint key_length;
-  MDL_LOCK_DATA mdl_lock_data;
-  char mdlkey[MAX_MDLKEY_LENGTH];
+  MDL_request mdl_request;
 
   bzero((char*) &table_list, sizeof(TABLE_LIST));
   bzero((char*) &tbl, sizeof(TABLE));
@@ -3179,8 +3174,7 @@ static int fill_schema_table_from_frm(TH
           simply obtaining internal lock of data-dictionary (ATM it
           is LOCK_open) instead of obtaning full-blown metadata lock.
   */
-  if (acquire_high_prio_shared_mdl_lock(thd, &mdl_lock_data, mdlkey,
-                                        &table_list))
+  if (acquire_high_prio_shared_mdl_lock(thd, &mdl_request, &table_list))
   {
     /*
       Some error occured (most probably we have been killed while
@@ -3264,8 +3258,8 @@ err_unlock:
   pthread_mutex_unlock(&LOCK_open);
 
 err:
-  mdl_release_lock(&thd->mdl_context, &mdl_lock_data);
-  mdl_remove_lock(&thd->mdl_context, &mdl_lock_data);
+  thd->mdl_context.release_lock(mdl_request.ticket);
+  thd->mdl_context.remove_request(&mdl_request);
   thd->clear_error();
   return res;
 }
@@ -7572,7 +7566,7 @@ bool show_create_trigger(THD *thd, const
 
   uint num_tables; /* NOTE: unused, only to pass to open_tables(). */
 
-  alloc_mdl_locks(lst, thd->mem_root);
+  alloc_mdl_requests(lst, thd->mem_root);
 
   if (open_tables(thd, &lst, &num_tables, 0))
   {

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2009-02-23 14:53:18 +0000
+++ b/sql/sql_table.cc	2009-03-04 13:33:47 +0000
@@ -1631,7 +1631,7 @@ int mysql_rm_table_part2(THD *thd, TABLE
             Since we don't acquire metadata lock if we have found temporary
             table, we should do something to avoid releasing it at the end.
           */
-          table->mdl_lock_data= 0;
+          table->mdl_request= NULL;
         }
         else
         {
@@ -1644,7 +1644,7 @@ int mysql_rm_table_part2(THD *thd, TABLE
                                                 table->table_name);
           if (!table->table)
             DBUG_RETURN(1);
-          table->mdl_lock_data= table->table->mdl_lock_data;
+          table->mdl_request->ticket= table->table->mdl_ticket;
         }
     }
   }
@@ -1877,15 +1877,14 @@ err:
     }
     for (table= tables; table; table= table->next_local)
     {
-      if (table->mdl_lock_data)
+      if (table->mdl_request)
       {
         /*
           Under LOCK TABLES we may have several instances of table open
           and locked and therefore have to remove several metadata lock
           requests associated with them.
         */
-        mdl_release_and_remove_all_locks_for_name(&thd->mdl_context,
-                                                  table->mdl_lock_data);
+        thd->mdl_context.release_all_locks_for_name(table->mdl_request->ticket);
       }
     }
   }
@@ -3762,30 +3761,31 @@ warn:
 
 static bool lock_table_name_if_not_cached(THD *thd, const char *db,
                                           const char *table_name,
-                                          MDL_LOCK_DATA **lock_data)
+                                          MDL_request **mdl_request)
 {
   bool conflict;
 
-  if (!(*lock_data= mdl_alloc_lock(0, db, table_name, thd->mem_root)))
+  if (!(*mdl_request= MDL_request::create(0, db, table_name, thd->mem_root)))
     return TRUE;
-  mdl_set_lock_type(*lock_data, MDL_EXCLUSIVE);
-  mdl_add_lock(&thd->mdl_context, *lock_data);
-  if (mdl_try_acquire_exclusive_lock(&thd->mdl_context, *lock_data,
-                                     &conflict))
+  (*mdl_request)->set_type(MDL_EXCLUSIVE);
+  thd->mdl_context.add_request(*mdl_request);
+  if (thd->mdl_context.try_acquire_exclusive_lock(*mdl_request, &conflict))
   {
     /*
       To simplify our life under LOCK TABLES we remove unsatisfied
       lock request from the context.
     */
-    mdl_remove_lock(&thd->mdl_context, *lock_data);
+    thd->mdl_context.remove_request(*mdl_request);
     if (!conflict)
     {
       /* Probably OOM. */
       return TRUE;
     }
     else
-      *lock_data= 0;
-  } else {
+      *mdl_request= NULL;
+  }
+  else
+  {
     DEBUG_SYNC(thd, "locked_table_name");
   }
   return FALSE;
@@ -3802,7 +3802,7 @@ bool mysql_create_table(THD *thd, const 
                         bool internal_tmp_table,
                         uint select_field_count)
 {
-  MDL_LOCK_DATA *target_lock_data= 0;
+  MDL_request *target_mdl_request= NULL;
   bool result;
   Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_table");
@@ -3831,12 +3831,12 @@ bool mysql_create_table(THD *thd, const 
 
   if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
   {
-    if (lock_table_name_if_not_cached(thd, db, table_name, &target_lock_data))
+    if (lock_table_name_if_not_cached(thd, db, table_name, &target_mdl_request))
     {
       result= TRUE;
       goto unlock;
     }
-    if (!target_lock_data)
+    if (!target_mdl_request)
     {
       if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
       {
@@ -3860,10 +3860,10 @@ bool mysql_create_table(THD *thd, const 
                                      select_field_count);
 
 unlock:
-  if (target_lock_data)
+  if (target_mdl_request)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    thd->mdl_context.release_lock(target_mdl_request->ticket);
+    thd->mdl_context.remove_request(target_mdl_request);
   }
   pthread_mutex_lock(&LOCK_lock_db);
   if (!--creating_table && creating_database)
@@ -4026,7 +4026,7 @@ static int prepare_for_repair(THD *thd, 
   char from[FN_REFLEN],tmp[FN_REFLEN+32];
   const char **ext;
   MY_STAT stat_info;
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_request *mdl_request= NULL;
   enum enum_open_table_action ot_action_unused;
   DBUG_ENTER("prepare_for_repair");
   uint reopen_for_repair_flags= (MYSQL_LOCK_IGNORE_FLUSH |
@@ -4045,13 +4045,13 @@ static int prepare_for_repair(THD *thd, 
     uint key_length;
 
     key_length= create_table_def_key(thd, key, table_list, 0);
-    mdl_lock_data= mdl_alloc_lock(0, table_list->db, table_list->table_name,
-                                  thd->mem_root);
-    mdl_set_lock_type(mdl_lock_data, MDL_EXCLUSIVE);
-    mdl_add_lock(&thd->mdl_context, mdl_lock_data);
-    if (mdl_acquire_exclusive_locks(&thd->mdl_context))
+    mdl_request= MDL_request::create(0, table_list->db,
+                                     table_list->table_name, thd->mem_root);
+    mdl_request->set_type(MDL_EXCLUSIVE);
+    thd->mdl_context.add_request(mdl_request);
+    if (thd->mdl_context.acquire_exclusive_locks())
     {
-      mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+      thd->mdl_context.remove_request(mdl_request);
       DBUG_RETURN(0);
     }
 
@@ -4071,11 +4071,7 @@ static int prepare_for_repair(THD *thd, 
     }
     pthread_mutex_unlock(&LOCK_open);
     table= &tmp_table;
-    table_list->mdl_lock_data= mdl_lock_data;
-  }
-  else
-  {
-    mdl_lock_data= table->mdl_lock_data;
+    table_list->mdl_request= mdl_request;
   }
 
   /* A MERGE table must not come here. */
@@ -4186,10 +4182,10 @@ end:
     pthread_mutex_unlock(&LOCK_open);
   }
   /* In case of a temporary table there will be no metadata lock. */
-  if (error && mdl_lock_data)
+  if (error && mdl_request)
   {
-    mdl_release_lock(&thd->mdl_context, mdl_lock_data);
-    mdl_remove_lock(&thd->mdl_context, mdl_lock_data);
+    thd->mdl_context.release_lock(mdl_request->ticket);
+    thd->mdl_context.remove_request(mdl_request);
   }
   DBUG_RETURN(error);
 }
@@ -4888,7 +4884,7 @@ bool mysql_create_like_schema_frm(THD* t
 bool mysql_create_like_table(THD* thd, TABLE_LIST* table, TABLE_LIST* src_table,
                              HA_CREATE_INFO *create_info)
 {
-  MDL_LOCK_DATA *target_lock_data= 0;
+  MDL_request *target_mdl_request= NULL;
   char src_path[FN_REFLEN], dst_path[FN_REFLEN];
   uint dst_path_length;
   char *db= table->db;
@@ -4935,9 +4931,9 @@ bool mysql_create_like_table(THD* thd, T
   }
   else
   {
-    if (lock_table_name_if_not_cached(thd, db, table_name, &target_lock_data))
+    if (lock_table_name_if_not_cached(thd, db, table_name, &target_mdl_request))
       goto err;
-    if (!target_lock_data)
+    if (!target_mdl_request)
       goto table_exists;
     dst_path_length= build_table_filename(dst_path, sizeof(dst_path),
                                           db, table_name, reg_ext, 0);
@@ -4947,7 +4943,7 @@ bool mysql_create_like_table(THD* thd, T
       Make the metadata lock available to open_table() called to
       reopen the table down the road.
     */
-    table->mdl_lock_data= target_lock_data;
+    table->mdl_request= target_mdl_request;
   }
 
   DBUG_EXECUTE_IF("sleep_create_like_before_copy", my_sleep(6000000););
@@ -5111,10 +5107,10 @@ table_exists:
     my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
 
 err:
-  if (target_lock_data)
+  if (target_mdl_request)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    thd->mdl_context.release_lock(target_mdl_request->ticket);
+    thd->mdl_context.remove_request(target_mdl_request);
   }
   DBUG_RETURN(res);
 }
@@ -6003,7 +5999,7 @@ mysql_fast_or_online_alter_table(THD *th
       Make the metadata lock available to open_table() called to
       reopen the table down the road.
     */
-    table_list->mdl_lock_data= table->mdl_lock_data;
+    table_list->mdl_request->ticket= table->mdl_ticket;
   }
 
   /*
@@ -6580,7 +6576,8 @@ bool mysql_alter_table(THD *thd,char *ne
                        uint order_num, ORDER *order, bool ignore)
 {
   TABLE *table, *new_table= 0;
-  MDL_LOCK_DATA *mdl_lock_data, *target_lock_data= 0;
+  MDL_ticket *mdl_ticket;
+  MDL_request *target_mdl_request= NULL;
   int error= 0;
   char tmp_name[80],old_name[32],new_name_buff[FN_REFLEN];
   char new_alias_buff[FN_REFLEN], *table_name, *db, *new_alias, *alias;
@@ -6753,7 +6750,7 @@ view_err:
                                         MYSQL_OPEN_TAKE_UPGRADABLE_MDL)))
     DBUG_RETURN(TRUE);
   table->use_all_columns();
-  mdl_lock_data= table->mdl_lock_data;
+  mdl_ticket= table->mdl_ticket;
 
   /*
     Prohibit changing of the UNION list of a non-temporary MERGE table
@@ -6806,9 +6803,9 @@ view_err:
       else
       {
         if (lock_table_name_if_not_cached(thd, new_db, new_name,
-                                          &target_lock_data))
+                                          &target_mdl_request))
           DBUG_RETURN(TRUE);
-        if (!target_lock_data)
+        if (!target_mdl_request)
         {
 	  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
 	  DBUG_RETURN(TRUE);
@@ -6988,13 +6985,12 @@ view_err:
       */
       if (new_name != table_name || new_db != db)
       {
-        mdl_release_lock(&thd->mdl_context, target_lock_data);
-        mdl_remove_lock(&thd->mdl_context, target_lock_data);
-        mdl_release_and_remove_all_locks_for_name(&thd->mdl_context,
-                                                  mdl_lock_data);
+        thd->mdl_context.release_lock(target_mdl_request->ticket);
+        thd->mdl_context.remove_request(target_mdl_request);
+        thd->mdl_context.release_all_locks_for_name(mdl_ticket);
       }
       else
-        mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_data);
+        mdl_ticket->downgrade_exclusive_lock();
     }
     DBUG_RETURN(error);
   }
@@ -7408,13 +7404,12 @@ end_online:
   {
     if ((new_name != table_name || new_db != db))
     {
-      mdl_release_lock(&thd->mdl_context, target_lock_data);
-      mdl_remove_lock(&thd->mdl_context, target_lock_data);
-      mdl_release_and_remove_all_locks_for_name(&thd->mdl_context,
-                                                mdl_lock_data);
+      thd->mdl_context.release_lock(target_mdl_request->ticket);
+      thd->mdl_context.remove_request(target_mdl_request);
+      thd->mdl_context.release_all_locks_for_name(mdl_ticket);
     }
     else
-      mdl_downgrade_exclusive_lock(&thd->mdl_context, mdl_lock_data);
+      mdl_ticket->downgrade_exclusive_lock();
   }
 
 end_temporary:
@@ -7468,10 +7463,10 @@ err:
                                  alter_info->datetime_field->field_name);
     thd->abort_on_warning= save_abort_on_warning;
   }
-  if (target_lock_data)
+  if (target_mdl_request)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    thd->mdl_context.release_lock(target_mdl_request->ticket);
+    thd->mdl_context.remove_request(target_mdl_request);
   }
   DBUG_RETURN(TRUE);
 
@@ -7483,12 +7478,12 @@ err_with_mdl:
     tables and release the exclusive metadata lock.
   */
   thd->locked_tables_list.unlink_all_closed_tables();
-  if (target_lock_data)
+  if (target_mdl_request)
   {
-    mdl_release_lock(&thd->mdl_context, target_lock_data);
-    mdl_remove_lock(&thd->mdl_context, target_lock_data);
+    thd->mdl_context.release_lock(target_mdl_request->ticket);
+    thd->mdl_context.remove_request(target_mdl_request);
   }
-  mdl_release_and_remove_all_locks_for_name(&thd->mdl_context, mdl_lock_data);
+  thd->mdl_context.release_all_locks_for_name(mdl_ticket);
   DBUG_RETURN(TRUE);
 }
 /* mysql_alter_table */

=== modified file 'sql/sql_trigger.cc'
--- a/sql/sql_trigger.cc	2009-01-16 11:53:32 +0000
+++ b/sql/sql_trigger.cc	2009-03-04 13:31:31 +0000
@@ -329,6 +329,7 @@ bool mysql_create_or_drop_trigger(THD *t
   String stmt_query;
   bool need_start_waiting= FALSE;
   bool lock_upgrade_done= FALSE;
+  MDL_ticket *mdl_ticket= NULL;
 
   DBUG_ENTER("mysql_create_or_drop_trigger");
 
@@ -451,8 +452,6 @@ bool mysql_create_or_drop_trigger(THD *t
     if (!(tables->table= find_write_locked_table(thd->open_tables, tables->db,
                                                  tables->table_name)))
       goto end;
-    /* Later on we will need it to downgrade the lock */
-    tables->mdl_lock_data= tables->table->mdl_lock_data;
   }
   else
   {
@@ -465,6 +464,9 @@ bool mysql_create_or_drop_trigger(THD *t
   }
   table= tables->table;
 
+  /* Later on we will need it to downgrade the lock */
+  mdl_ticket= table->mdl_ticket;
+
   if (wait_while_table_is_used(thd, table, HA_EXTRA_FORCE_REOPEN))
     goto end;
 
@@ -511,8 +513,7 @@ end:
     TABLE instance created by open_n_lock_single_table() and metadata lock.
   */
   if (thd->locked_tables_mode && tables && lock_upgrade_done)
-    mdl_downgrade_exclusive_lock(&thd->mdl_context,
-                                 tables->mdl_lock_data);
+    mdl_ticket->downgrade_exclusive_lock();
 
   if (need_start_waiting)
     start_waiting_global_read_lock(thd);
@@ -1881,7 +1882,7 @@ bool Table_triggers_list::change_table_n
     In the future, only an exclusive metadata lock will be enough.
   */
 #ifndef DBUG_OFF
-  if (mdl_is_exclusive_lock_owner(&thd->mdl_context, 0, db, old_table))
+  if (thd->mdl_context.is_exclusive_lock_owner(0, db, old_table))
     safe_mutex_assert_owner(&LOCK_open);
 #endif
 

=== modified file 'sql/sql_udf.cc'
--- a/sql/sql_udf.cc	2009-01-27 02:08:48 +0000
+++ b/sql/sql_udf.cc	2009-03-02 21:18:26 +0000
@@ -141,7 +141,7 @@ void udf_init()
   tables.alias= tables.table_name= (char*) "func";
   tables.lock_type = TL_READ;
   tables.db= db;
-  alloc_mdl_locks(&tables, new_thd->mem_root);
+  alloc_mdl_requests(&tables, new_thd->mem_root);
 
   if (simple_open_n_lock_tables(new_thd, &tables))
   {
@@ -480,7 +480,7 @@ int mysql_create_function(THD *thd,udf_f
   bzero((char*) &tables,sizeof(tables));
   tables.db= (char*) "mysql";
   tables.table_name= tables.alias= (char*) "func";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
   /* Allow creation of functions even if we can't open func table */
   if (!(table = open_ltable(thd, &tables, TL_WRITE, 0)))
     goto err;
@@ -559,7 +559,7 @@ int mysql_drop_function(THD *thd,const L
   bzero((char*) &tables,sizeof(tables));
   tables.db=(char*) "mysql";
   tables.table_name= tables.alias= (char*) "func";
-  alloc_mdl_locks(&tables, thd->mem_root);
+  alloc_mdl_requests(&tables, thd->mem_root);
   if (!(table = open_ltable(thd, &tables, TL_WRITE, 0)))
     goto err;
   table->use_all_columns();

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2009-02-23 14:53:18 +0000
+++ b/sql/table.cc	2009-03-04 13:33:47 +0000
@@ -475,6 +475,8 @@ void free_table_share(TABLE_SHARE *share
   if (share->tmp_table == NO_TMP_TABLE)
     pthread_mutex_destroy(&share->LOCK_ha_data);
   my_hash_free(&share->name_hash);
+  if (share->ha_data_destroy)
+    share->ha_data_destroy(share->ha_data);
 
   plugin_unlock(NULL, share->db_plugin);
   share->db_plugin= NULL;
@@ -1087,7 +1089,7 @@ static int open_binary_frm(THD *thd, TAB
       next_chunk+= 2 + share->comment.length;
     }
     DBUG_ASSERT (next_chunk <= buff_end);
-    if (share->mysql_version >= MYSQL_VERSION_TABLESPACE_IN_FRM)
+    if (share->mysql_version >= MYSQL_VERSION_TABLESPACE_IN_FRM_CGE)
     {
       /*
        New frm format in mysql_version 5.2.5 (originally in
@@ -4919,12 +4921,11 @@ size_t max_row_length(TABLE *table, cons
    objects for all elements of table list.
 */
 
-void alloc_mdl_locks(TABLE_LIST *table_list, MEM_ROOT *root)
+void alloc_mdl_requests(TABLE_LIST *table_list, MEM_ROOT *root)
 {
   for ( ; table_list ; table_list= table_list->next_global)
-    table_list->mdl_lock_data= mdl_alloc_lock(0, table_list->db,
-                                              table_list->table_name,
-                                              root);
+    table_list->mdl_request=
+      MDL_request::create(0, table_list->db, table_list->table_name, root);
 }
 
 

=== modified file 'sql/table.h'
--- a/sql/table.h	2009-02-13 16:30:54 +0000
+++ b/sql/table.h	2009-03-04 20:29:16 +0000
@@ -27,7 +27,8 @@ class st_select_lex;
 class partition_info;
 class COND_EQUAL;
 class Security_context;
-struct MDL_LOCK_DATA;
+class MDL_request;
+class MDL_ticket;
 
 /*************************************************************************/
 
@@ -431,6 +432,7 @@ struct TABLE_SHARE
 
   /** place to store storage engine specific data */
   void *ha_data;
+  void (*ha_data_destroy)(void *); /* An optional destructor for ha_data */
 
 
   /*
@@ -789,7 +791,7 @@ public:
   partition_info *part_info;            /* Partition related information */
   bool no_partitions_used; /* If true, all partitions have been pruned away */
 #endif
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_ticket *mdl_ticket;
 
   bool fill_item_list(List<Item> *item_list) const;
   void reset_item_list(List<Item> *item_list) const;
@@ -1371,7 +1373,7 @@ struct TABLE_LIST
   uint table_open_method;
   enum enum_schema_table_state schema_table_state;
 
-  MDL_LOCK_DATA *mdl_lock_data;
+  MDL_request *mdl_request;
 
   void calc_md5(char *buffer);
   void set_underlying_merge();
@@ -1756,5 +1758,5 @@ static inline void dbug_tmp_restore_colu
 size_t max_row_length(TABLE *table, const uchar *data);
 
 
-void alloc_mdl_locks(TABLE_LIST *table_list, MEM_ROOT *root);
+void alloc_mdl_requests(TABLE_LIST *table_list, MEM_ROOT *root);
 

=== modified file 'storage/myisammrg/ha_myisammrg.cc'
--- a/storage/myisammrg/ha_myisammrg.cc	2009-02-13 12:48:06 +0000
+++ b/storage/myisammrg/ha_myisammrg.cc	2009-03-04 13:31:31 +0000
@@ -434,16 +434,15 @@ int ha_myisammrg::add_children_list(void
     /* Copy select_lex. Used in unique_table() at least. */
     child_l->select_lex= parent_l->select_lex;
 
-    child_l->mdl_lock_data= NULL; /* Safety, if alloc_mdl_locks fails. */
+    child_l->mdl_request= NULL; /* Safety, if alloc_mdl_requests fails. */
 
     /* Break when this was the last child. */
     if (&child_l->next_global == this->children_last_l)
       break;
   }
 
-  alloc_mdl_locks(children_l,
-                  thd->locked_tables_root ? thd->locked_tables_root :
-                  thd->mem_root);
+  alloc_mdl_requests(children_l, thd->locked_tables_root ?
+                     thd->locked_tables_root : thd->mem_root);
 
   /* Insert children into the table list. */
   if (parent_l->next_global)

=== modified file 'tests/mysql_client_test.c'
--- a/tests/mysql_client_test.c	2009-02-16 21:18:45 +0000
+++ b/tests/mysql_client_test.c	2009-02-25 10:03:18 +0000
@@ -2039,6 +2039,66 @@ static void test_wl4435()
   }
 }
 
+static void test_wl4435_2()
+{
+  MYSQL_STMT *stmt;
+  int  i;
+  int  rc;
+  char query[MAX_TEST_QUERY_LENGTH];
+
+  myheader("test_wl4435_2");
+  mct_start_logging("test_wl4435_2");
+
+  /*
+    Do a few iterations so that we catch any problem with incorrect
+    handling/flushing prepared statement results.
+  */
+
+  for (i= 0; i < 10; ++i)
+  {
+    /*
+      Prepare a procedure. That can be moved out of the loop, but it was
+      left in the loop for the sake of having as many statements as
+      possible.
+    */
+
+    rc= mysql_query(mysql, "DROP PROCEDURE IF EXISTS p1");
+    myquery(rc);
+
+    rc= mysql_query(mysql,
+      "CREATE PROCEDURE p1()"
+      "BEGIN "
+      "  SELECT 1; "
+      "  SELECT 2, 3 UNION SELECT 4, 5; "
+      "  SELECT 6, 7, 8; "
+      "END");
+    myquery(rc);
+
+    /* Invoke a procedure, that returns several result sets. */
+
+    strmov(query, "CALL p1()");
+    stmt= mysql_simple_prepare(mysql, query);
+    check_stmt(stmt);
+
+    /* Execute! */
+
+    rc= mysql_stmt_execute(stmt);
+    check_execute(stmt, rc);
+
+    /* Flush all the results. */
+
+    mysql_stmt_close(stmt);
+
+    /* Clean up. */
+    rc= mysql_commit(mysql);
+    myquery(rc);
+
+    rc= mysql_query(mysql, "DROP PROCEDURE p1");
+    myquery(rc);
+  }
+}
+
+
 /* Test simple prepare field results */
 
 static void test_prepare_field_result()
@@ -18957,6 +19017,7 @@ static struct my_tests_st my_tests[]= {
   { "test_bug36004", test_bug36004 },
   { "test_wl4284_1", test_wl4284_1 },
   { "test_wl4435",   test_wl4435 },
+  { "test_wl4435_2", test_wl4435_2 },
   { "test_bug38486", test_bug38486 },
   { "test_bug33831", test_bug33831 },
   { "test_bug40365", test_bug40365 },

Thread
bzr push into mysql-6.0 branch (alik:2742)Alexander Nozdrin6 Mar