MySQL Lists are EOL. Please join:

List: Commits « Previous Message | Next Message »
From: Konstantin Osipov  Date: August 12 2010 1:51pm
Subject: bzr commit into mysql-5.5-bugfixing branch (kostja:3101) Bug#52044
View as plain text  
#At file:///opt/local/work/5.5-runtime/ based on revid:kostja@stripped

 3101 Konstantin Osipov	2010-08-12 [merge]
      Commit on behalf of Dmitry Lenev.
      Merge his patch for Bug#52044 into 5.5, and apply 
      review comments.

    modified:
      mysql-test/include/handler.inc
      mysql-test/r/flush.result
      mysql-test/r/mdl_sync.result
      mysql-test/suite/perfschema/r/dml_setup_instruments.result
      mysql-test/suite/perfschema/r/server_init.result
      mysql-test/t/flush.test
      mysql-test/t/kill.test
      mysql-test/t/lock_multi.test
      mysql-test/t/mdl_sync.test
      mysys/thr_rwlock.c
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_binlog.cc
      sql/lock.cc
      sql/mdl.cc
      sql/mdl.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/sql_base.cc
      sql/sql_base.h
      sql/sql_handler.cc
      sql/sql_insert.cc
      sql/sql_parse.cc
      sql/sql_show.cc
      sql/sql_yacc.yy
      sql/sys_vars.cc
      sql/table.cc
      sql/table.h
=== modified file 'mysql-test/include/handler.inc'
--- a/mysql-test/include/handler.inc	2010-08-06 11:29:37 +0000
+++ b/mysql-test/include/handler.inc	2010-08-12 13:50:23 +0000
@@ -523,7 +523,7 @@ connection waiter;
 --echo connection: waiter 
 let $wait_condition=
   select count(*) = 1 from information_schema.processlist
-  where state = "Flushing tables";
+  where state = "Waiting for table flush";
 --source include/wait_condition.inc
 connection default;
 --echo connection: default

=== modified file 'mysql-test/r/flush.result'
--- a/mysql-test/r/flush.result	2010-03-10 14:35:25 +0000
+++ b/mysql-test/r/flush.result	2010-08-12 13:50:23 +0000
@@ -205,6 +205,51 @@ a
 insert into t2 (a) values (3);
 # --> connection default;
 unlock tables;
+#
+# Check that "FLUSH TABLES <list> WITH READ LOCK" is
+# compatible with active "FLUSH TABLES WITH READ LOCK".
+# Vice versa it is not true, since tables read-locked by
+# "FLUSH TABLES <list> WITH READ LOCK" can't be flushed.
+flush tables with read lock;
+# --> connection con1;
+flush table t1 with read lock;
+select * from t1;
+a
+1
+unlock tables;
+# --> connection default;
+unlock tables;
+#
+# Check that FLUSH TABLES t1 WITH READ LOCK
+# does not conflict with an existing FLUSH TABLES t2
+# WITH READ LOCK.
+#
+flush table t1 with read lock;
+# --> connection con1
+flush table t2 with read lock;
+unlock tables;
+# --> connection default
+unlock tables;
+#
+# Check that FLUSH TABLES t1 WITH READ LOCK
+# does not conflict with SET GLOBAL read_only=1.
+#
+set global read_only=1;
+# connection con1
+flush table t1 with read lock;
+unlock tables;
+# connection default
+set global read_only=0;
+#
+# Check that it's possible to read-lock 
+# tables locked with FLUSH TABLE <list> WITH READ LOCK.
+#
+flush tables t1, t2 with read lock;
+# connection con1
+lock table t1 read, t2 read;
+unlock tables;
+# connection default
+unlock tables;
 # --> connection con1
 drop table t1, t2, t3;
 #

=== modified file 'mysql-test/r/mdl_sync.result'
--- a/mysql-test/r/mdl_sync.result	2010-07-02 09:26:27 +0000
+++ b/mysql-test/r/mdl_sync.result	2010-08-12 13:50:23 +0000
@@ -2034,6 +2034,155 @@ set debug_sync='now SIGNAL go2';
 # Switching to connection 'default'.
 # Reaping ALTER. It should succeed and not produce ER_LOCK_DEADLOCK.
 drop table t1;
+#
+# Now, test for a situation in which deadlock involves waiting not
+# only in MDL subsystem but also for TDC. Such deadlocks should be
+# successfully detected. If possible, they should be resolved without
+# resorting to ER_LOCK_DEADLOCK error.
+#
+create table t1(i int);
+create table t2(j int);
+#
+# First, let us check how we handle a simple scenario involving
+# waits in MDL and TDC.
+#
+set debug_sync= 'RESET';
+# Switching to connection 'deadlock_con1'.
+# Start a statement, which will acquire SR metadata lock on t1, open it
+# and then stop, before trying to acquire SW lock on t2 and opening it.
+set debug_sync='open_tables_after_open_and_process_table SIGNAL parked WAIT_FOR go';
+# Sending:
+select * from t1 where i in (select j from t2 for update);
+# Switching to connection 'deadlock_con2'.
+# Wait till the above SELECT stops.
+set debug_sync='now WAIT_FOR parked';
+# The below FLUSH TABLES WITH READ LOCK should acquire
+# SNW locks on t1 and t2 and wait till SELECT closes t1.
+# Sending:
+flush tables t1, t2 with read lock;
+# Switching to connection 'deadlock_con3'.
+# Wait until FLUSH TABLES WITH t1, t2 READ LOCK starts waiting
+# for SELECT to close t1.
+# Resume SELECT, so it tries to acquire SW lock on t1 and blocks,
+# creating a deadlock. This deadlock should be detected and resolved
+# by backing-off SELECT. As a result FTWRL should be able to finish.
+set debug_sync='now SIGNAL go';
+# Switching to connection 'deadlock_con2'.
+# Reap FLUSH TABLES WITH READ LOCK.
+unlock tables;
+# Switching to connection 'deadlock_con1'.
+# Reap SELECT.
+i
+#
+# The same scenario with a slightly different order of events
+# which emphasizes that setting correct deadlock detector weights
+# for flush waits is important.
+#
+set debug_sync= 'RESET';
+# Switching to connection 'deadlock_con2'.
+set debug_sync='flush_tables_with_read_lock_after_acquire_locks SIGNAL parked WAIT_FOR go';
+# The below FLUSH TABLES WITH READ LOCK should acquire
+# SNW locks on t1 and t2 and wait on debug sync point.
+# Sending:
+flush tables t1, t2 with read lock;
+# Switching to connection 'deadlock_con1'.
+# Wait till FLUSH TABLE WITH READ LOCK stops.
+set debug_sync='now WAIT_FOR parked';
+# Start statement which will acquire SR metadata lock on t1, open
+# it and then will block while trying to acquire SW lock on t2.
+# Sending:
+select * from t1 where i in (select j from t2 for update);
+# Switching to connection 'deadlock_con3'.
+# Wait till the above SELECT blocks.
+# Resume FLUSH TABLES, so it tries to flush t1, thus creating
+# a deadlock. This deadlock should be detected and resolved by
+# backing-off SELECT. As a result FTWRL should be able to finish.
+set debug_sync='now SIGNAL go';
+# Switching to connection 'deadlock_con2'.
+# Reap FLUSH TABLES WITH READ LOCK.
+unlock tables;
+# Switching to connection 'deadlock_con1'.
+# Reap SELECT.
+i
+#
+# Now a more complex scenario involving two connections
+# waiting for MDL and one for TDC.
+#
+set debug_sync= 'RESET';
+# Switching to connection 'deadlock_con1'.
+# Start a statement which will acquire SR metadata lock on t2, open it
+# and then stop, before trying to acquire SR on t1 and opening it.
+set debug_sync='open_tables_after_open_and_process_table SIGNAL parked WAIT_FOR go';
+# Sending:
+select * from t2, t1;
+# Switching to connection 'deadlock_con2'.
+# Wait till the above SELECT stops.
+set debug_sync='now WAIT_FOR parked';
+# The below FLUSH TABLES WITH READ LOCK should acquire
+# SNW locks on t2 and wait till SELECT closes t2.
+# Sending:
+flush tables t2 with read lock;
+# Switching to connection 'deadlock_con3'.
+# Wait until FLUSH TABLES WITH READ LOCK starts waiting
+# for SELECT to close t2.
+# The below DROP TABLES should acquire X lock on t1 and start
+# waiting for X lock on t2.
+# Sending:
+drop tables t1, t2;
+# Switching to connection 'default'.
+# Wait until DROP TABLES starts waiting for X lock on t2.
+# Resume SELECT, so it tries to acquire SR lock on t1 and blocks,
+# creating a deadlock. This deadlock should be detected and resolved
+# by backing-off SELECT. As a result, FTWRL should be able to finish.
+set debug_sync='now SIGNAL go';
+# Switching to connection 'deadlock_con2'.
+# Reap FLUSH TABLES WITH READ LOCK.
+# Unblock DROP TABLES.
+unlock tables;
+# Switching to connection 'deadlock_con3'.
+# Reap DROP TABLES.
+# Switching to connection 'deadlock_con1'.
+# Reap SELECT. It should emit error about missing table.
+ERROR 42S02: Table 'test.t2' doesn't exist
+# Switching to connection 'default'.
+set debug_sync= 'RESET';
+#
+# Test for a scenario in which FLUSH TABLES <list> WITH READ LOCK
+# used to erroneously release metadata locks.
+# 
+drop tables if exists t1, t2;
+set debug_sync= 'RESET';
+create table t1(i int);
+create table t2(j int);
+# Switching to connection 'con2'.
+set debug_sync='open_tables_after_open_and_process_table SIGNAL parked WAIT_FOR go';
+# The below FLUSH TABLES <list> WITH READ LOCK should acquire
+# SNW locks on t1 and t2, open table t1 and block on the debug
+# sync point.
+# Sending:
+flush tables t1, t2 with read lock;
+# Switching to connection 'con1'.
+# Wait till FLUSH TABLES <list> WITH READ LOCK stops.
+set debug_sync='now WAIT_FOR parked';
+# Start a statement which will flush all tables and thus
+# invalidate table t1 open by FLUSH TABLES <list> WITH READ LOCK.
+# Sending:
+flush tables;
+# Switching to connection 'default'.
+# Wait till the above FLUSH TABLES blocks.
+# Resume FLUSH TABLES <list> WITH READ LOCK, so it tries to open t2
+# discovers that its t1 is obsolete and tries to reopen all tables.
+# Such reopen should not cause releasing of SNW metadata locks
+# which would result in assertion failures.
+set debug_sync='now SIGNAL go';
+# Switching to connection 'con2'.
+# Reap FLUSH TABLES <list> WITH READ LOCK.
+unlock tables;
+# Switching to connection 'con1'.
+# Reap FLUSH TABLES.
+# Clean-up.
+# Switching to connection 'default'.
+drop tables t1, t2;
 set debug_sync= 'RESET';
 #
 # Test for bug #46748 "Assertion in MDL_context::wait_for_locks()

=== modified file 'mysql-test/suite/perfschema/r/dml_setup_instruments.result'
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result	2010-06-07 07:06:55 +0000
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result	2010-07-27 13:34:58 +0000
@@ -40,12 +40,12 @@ wait/synch/cond/sql/COND_flush_thread_ca
 wait/synch/cond/sql/COND_global_read_lock	YES	YES
 wait/synch/cond/sql/COND_manager	YES	YES
 wait/synch/cond/sql/COND_queue_state	YES	YES
-wait/synch/cond/sql/COND_refresh	YES	YES
 wait/synch/cond/sql/COND_rpl_status	YES	YES
 wait/synch/cond/sql/COND_server_started	YES	YES
 wait/synch/cond/sql/COND_thread_cache	YES	YES
 wait/synch/cond/sql/COND_thread_count	YES	YES
 wait/synch/cond/sql/Delayed_insert::cond	YES	YES
+wait/synch/cond/sql/Delayed_insert::cond_client	YES	YES
 select * from performance_schema.SETUP_INSTRUMENTS
 where name='Wait';
 select * from performance_schema.SETUP_INSTRUMENTS

=== modified file 'mysql-test/suite/perfschema/r/server_init.result'
--- a/mysql-test/suite/perfschema/r/server_init.result	2010-07-27 14:32:42 +0000
+++ b/mysql-test/suite/perfschema/r/server_init.result	2010-08-12 13:50:23 +0000
@@ -182,7 +182,7 @@ count(name)
 select count(name) from COND_INSTANCES
 where name like "wait/synch/cond/sql/COND_refresh";
 count(name)
-1
+0
 select count(name) from COND_INSTANCES
 where name like "wait/synch/cond/sql/COND_thread_count";
 count(name)

=== modified file 'mysql-test/t/flush.test'
--- a/mysql-test/t/flush.test	2010-03-10 14:35:25 +0000
+++ b/mysql-test/t/flush.test	2010-08-12 13:50:23 +0000
@@ -318,6 +318,58 @@ insert into t2 (a) values (3);
 --echo # --> connection default;
 connection default;
 unlock tables;
+--echo #
+--echo # Check that "FLUSH TABLES <list> WITH READ LOCK" is
+--echo # compatible with active "FLUSH TABLES WITH READ LOCK".
+--echo # Vice versa it is not true, since tables read-locked by
+--echo # "FLUSH TABLES <list> WITH READ LOCK" can't be flushed.
+flush tables with read lock;
+--echo # --> connection con1;
+connection con1;
+flush table t1 with read lock;
+select * from t1;
+unlock tables;
+--echo # --> connection default;
+connection default;
+unlock tables;
+--echo #
+--echo # Check that FLUSH TABLES t1 WITH READ LOCK
+--echo # does not conflict with an existing FLUSH TABLES t2
+--echo # WITH READ LOCK.
+--echo #
+flush table t1 with read lock;
+--echo # --> connection con1
+connection con1;
+flush table t2 with read lock;
+unlock tables;
+--echo # --> connection default
+connection default;
+unlock tables;
+--echo #
+--echo # Check that FLUSH TABLES t1 WITH READ LOCK
+--echo # does not conflict with SET GLOBAL read_only=1.
+--echo #
+set global read_only=1;
+--echo # connection con1
+connection con1;
+flush table t1 with read lock;
+unlock tables;
+--echo # connection default
+connection default;
+set global read_only=0;
+--echo #
+--echo # Check that it's possible to read-lock 
+--echo # tables locked with FLUSH TABLE <list> WITH READ LOCK.
+--echo #
+flush tables t1, t2 with read lock;
+--echo # connection con1
+connection con1;
+lock table t1 read, t2 read;
+unlock tables;
+--echo # connection default
+connection default;
+unlock tables;
+
 --echo # --> connection con1
 connection con1;
 disconnect con1;

=== modified file 'mysql-test/t/kill.test'
--- a/mysql-test/t/kill.test	2010-08-06 11:29:37 +0000
+++ b/mysql-test/t/kill.test	2010-08-12 13:50:23 +0000
@@ -537,7 +537,7 @@ connection ddl;
 connection dml;
 let $wait_condition=
   select count(*) = 1 from information_schema.processlist
-  where state = "Flushing tables" and
+  where state = "Waiting for table flush" and
         info = "flush tables";
 --source include/wait_condition.inc
 --send select * from t1

=== modified file 'mysql-test/t/lock_multi.test'
--- a/mysql-test/t/lock_multi.test	2010-08-06 11:29:37 +0000
+++ b/mysql-test/t/lock_multi.test	2010-08-12 13:50:23 +0000
@@ -1000,7 +1000,7 @@ connection con3;
 connection con2;
 let $wait_condition=
   SELECT COUNT(*) = 1 FROM information_schema.processlist
-  WHERE state = "Flushing tables" AND info = "FLUSH TABLES";
+  WHERE state = "Waiting for table flush" AND info = "FLUSH TABLES";
 --source include/wait_condition.inc
 --error ER_LOCK_WAIT_TIMEOUT
 SELECT * FROM t1;

=== modified file 'mysql-test/t/mdl_sync.test'
--- a/mysql-test/t/mdl_sync.test	2010-08-10 11:16:44 +0000
+++ b/mysql-test/t/mdl_sync.test	2010-08-12 13:50:23 +0000
@@ -2915,6 +2915,188 @@ connection default;
 
 drop table t1;
 
+--echo #
+--echo # Now, test for a situation in which deadlock involves waiting not
+--echo # only in MDL subsystem but also for TDC. Such deadlocks should be
+--echo # successfully detected. If possible, they should be resolved without
+--echo # resorting to ER_LOCK_DEADLOCK error.
+--echo #
+create table t1(i int);
+create table t2(j int);
+
+--echo #
+--echo # First, let us check how we handle a simple scenario involving
+--echo # waits in MDL and TDC.
+--echo #
+set debug_sync= 'RESET';
+
+--echo # Switching to connection 'deadlock_con1'.
+connection deadlock_con1;
+--echo # Start a statement, which will acquire SR metadata lock on t1, open it
+--echo # and then stop, before trying to acquire SW lock on t2 and opening it.
+set debug_sync='open_tables_after_open_and_process_table SIGNAL parked WAIT_FOR go';
+--echo # Sending:
+--send select * from t1 where i in (select j from t2 for update)
+
+--echo # Switching to connection 'deadlock_con2'.
+connection deadlock_con2;
+--echo # Wait till the above SELECT stops.
+set debug_sync='now WAIT_FOR parked';
+--echo # The below FLUSH TABLES WITH READ LOCK should acquire
+--echo # SNW locks on t1 and t2 and wait till SELECT closes t1.
+--echo # Sending:
+send flush tables t1, t2 with read lock;
+
+--echo # Switching to connection 'deadlock_con3'.
+connection deadlock_con3;
+--echo # Wait until FLUSH TABLES WITH t1, t2 READ LOCK starts waiting
+--echo # for SELECT to close t1.
+let $wait_condition=
+  select count(*) = 1 from information_schema.processlist
+  where state = "Waiting for table flush" and
+        info = "flush tables t1, t2 with read lock";
+--source include/wait_condition.inc
+
+--echo # Resume SELECT, so it tries to acquire SW lock on t1 and blocks,
+--echo # creating a deadlock. This deadlock should be detected and resolved
+--echo # by backing-off SELECT. As a result FTWRL should be able to finish.
+set debug_sync='now SIGNAL go';
+
+--echo # Switching to connection 'deadlock_con2'.
+connection deadlock_con2;
+--echo # Reap FLUSH TABLES WITH READ LOCK.
+reap;
+unlock tables;
+
+--echo # Switching to connection 'deadlock_con1'.
+connection deadlock_con1;
+--echo # Reap SELECT.
+reap;
+
+--echo #
+--echo # The same scenario with a slightly different order of events
+--echo # which emphasizes that setting correct deadlock detector weights
+--echo # for flush waits is important.
+--echo #
+set debug_sync= 'RESET';
+
+--echo # Switching to connection 'deadlock_con2'.
+connection deadlock_con2;
+set debug_sync='flush_tables_with_read_lock_after_acquire_locks SIGNAL parked WAIT_FOR go';
+
+--echo # The below FLUSH TABLES WITH READ LOCK should acquire
+--echo # SNW locks on t1 and t2 and wait on debug sync point.
+--echo # Sending:
+send flush tables t1, t2 with read lock;
+
+--echo # Switching to connection 'deadlock_con1'.
+connection deadlock_con1;
+--echo # Wait till FLUSH TABLE WITH READ LOCK stops.
+set debug_sync='now WAIT_FOR parked';
+
+--echo # Start statement which will acquire SR metadata lock on t1, open
+--echo # it and then will block while trying to acquire SW lock on t2.
+--echo # Sending:
+send select * from t1 where i in (select j from t2 for update);
+
+--echo # Switching to connection 'deadlock_con3'.
+connection deadlock_con3;
+--echo # Wait till the above SELECT blocks.
+let $wait_condition=
+  select count(*) = 1 from information_schema.processlist
+  where state = "Waiting for table metadata lock" and
+        info = "select * from t1 where i in (select j from t2 for update)";
+--source include/wait_condition.inc
+
+--echo # Resume FLUSH TABLES, so it tries to flush t1, thus creating
+--echo # a deadlock. This deadlock should be detected and resolved by
+--echo # backing-off SELECT. As a result FTWRL should be able to finish.
+set debug_sync='now SIGNAL go';
+
+--echo # Switching to connection 'deadlock_con2'.
+connection deadlock_con2;
+--echo # Reap FLUSH TABLES WITH READ LOCK.
+reap;
+unlock tables;
+
+--echo # Switching to connection 'deadlock_con1'.
+connection deadlock_con1;
+--echo # Reap SELECT.
+reap;
+
+--echo #
+--echo # Now a more complex scenario involving two connections
+--echo # waiting for MDL and one for TDC.
+--echo #
+set debug_sync= 'RESET';
+
+--echo # Switching to connection 'deadlock_con1'.
+connection deadlock_con1;
+--echo # Start a statement which will acquire SR metadata lock on t2, open it
+--echo # and then stop, before trying to acquire SR on t1 and opening it.
+set debug_sync='open_tables_after_open_and_process_table SIGNAL parked WAIT_FOR go';
+--echo # Sending:
+send select * from t2, t1;
+
+--echo # Switching to connection 'deadlock_con2'.
+connection deadlock_con2;
+--echo # Wait till the above SELECT stops.
+set debug_sync='now WAIT_FOR parked';
+--echo # The below FLUSH TABLES WITH READ LOCK should acquire
+--echo # SNW locks on t2 and wait till SELECT closes t2.
+--echo # Sending:
+send flush tables t2 with read lock;
+
+--echo # Switching to connection 'deadlock_con3'.
+connection deadlock_con3;
+--echo # Wait until FLUSH TABLES WITH READ LOCK starts waiting
+--echo # for SELECT to close t2.
+let $wait_condition=
+  select count(*) = 1 from information_schema.processlist
+  where state = "Waiting for table flush" and
+        info = "flush tables t2 with read lock";
+--source include/wait_condition.inc
+
+--echo # The below DROP TABLES should acquire X lock on t1 and start
+--echo # waiting for X lock on t2.
+--echo # Sending:
+send drop tables t1, t2;
+
+--echo # Switching to connection 'default'.
+connection default;
+--echo # Wait until DROP TABLES starts waiting for X lock on t2.
+let $wait_condition=
+  select count(*) = 1 from information_schema.processlist
+  where state = "Waiting for table metadata lock" and
+        info = "drop tables t1, t2";
+--source include/wait_condition.inc
+
+--echo # Resume SELECT, so it tries to acquire SR lock on t1 and blocks,
+--echo # creating a deadlock. This deadlock should be detected and resolved
+--echo # by backing-off SELECT. As a result, FTWRL should be able to finish.
+set debug_sync='now SIGNAL go';
+
+--echo # Switching to connection 'deadlock_con2'.
+connection deadlock_con2;
+--echo # Reap FLUSH TABLES WITH READ LOCK.
+reap;
+--echo # Unblock DROP TABLES.
+unlock tables;
+
+--echo # Switching to connection 'deadlock_con3'.
+connection deadlock_con3;
+--echo # Reap DROP TABLES.
+reap;
+
+--echo # Switching to connection 'deadlock_con1'.
+connection deadlock_con1;
+--echo # Reap SELECT. It should emit error about missing table.
+--error ER_NO_SUCH_TABLE
+reap;
+
+--echo # Switching to connection 'default'.
+connection default;
+
 set debug_sync= 'RESET';
 
 disconnect deadlock_con1;
@@ -2923,6 +3105,75 @@ disconnect deadlock_con3;
 
 
 --echo #
+--echo # Test for a scenario in which FLUSH TABLES <list> WITH READ LOCK
+--echo # used to erroneously release metadata locks.
+--echo # 
+connect(con1,localhost,root,,);
+connect(con2,localhost,root,,);
+connection default;
+--disable_warnings
+drop tables if exists t1, t2;
+--enable_warnings
+set debug_sync= 'RESET';
+create table t1(i int);
+create table t2(j int);
+
+--echo # Switching to connection 'con2'.
+connection con2;
+set debug_sync='open_tables_after_open_and_process_table SIGNAL parked WAIT_FOR go';
+
+--echo # The below FLUSH TABLES <list> WITH READ LOCK should acquire
+--echo # SNW locks on t1 and t2, open table t1 and block on the debug
+--echo # sync point.
+--echo # Sending:
+send flush tables t1, t2 with read lock;
+
+--echo # Switching to connection 'con1'.
+connection con1;
+--echo # Wait till FLUSH TABLES <list> WITH READ LOCK stops.
+set debug_sync='now WAIT_FOR parked';
+
+--echo # Start a statement which will flush all tables and thus
+--echo # invalidate table t1 open by FLUSH TABLES <list> WITH READ LOCK.
+--echo # Sending:
+send flush tables;
+
+--echo # Switching to connection 'default'.
+connection default;
+--echo # Wait till the above FLUSH TABLES blocks.
+let $wait_condition=
+  select count(*) = 1 from information_schema.processlist
+  where state = "Waiting for table flush" and
+        info = "flush tables";
+--source include/wait_condition.inc
+
+--echo # Resume FLUSH TABLES <list> WITH READ LOCK, so it tries to open t2
+--echo # discovers that its t1 is obsolete and tries to reopen all tables.
+--echo # Such reopen should not cause releasing of SNW metadata locks
+--echo # which would result in assertion failures.
+set debug_sync='now SIGNAL go';
+
+--echo # Switching to connection 'con2'.
+connection con2;
+--echo # Reap FLUSH TABLES <list> WITH READ LOCK.
+reap;
+unlock tables;
+
+--echo # Switching to connection 'con1'.
+connection con1;
+--echo # Reap FLUSH TABLES.
+reap;
+
+--echo # Clean-up.
+--echo # Switching to connection 'default'.
+connection default;
+drop tables t1, t2;
+set debug_sync= 'RESET';
+disconnect con1;
+disconnect con2;
+
+
+--echo #
 --echo # Test for bug #46748 "Assertion in MDL_context::wait_for_locks()
 --echo # on INSERT + CREATE TRIGGER".
 --echo #

=== modified file 'mysys/thr_rwlock.c'
--- a/mysys/thr_rwlock.c	2010-08-10 21:12:01 +0000
+++ b/mysys/thr_rwlock.c	2010-08-12 13:50:23 +0000
@@ -127,7 +127,7 @@ int my_rw_wrlock(my_rw_lock_t *rwp)
   pthread_mutex_lock(&rwp->lock);
   rwp->waiters++;				/* another writer queued */
 
-  my_rw_lock_assert_not_owner(rwp);
+  my_rw_lock_assert_not_write_owner(rwp);
 
   while (rwp->state)
     pthread_cond_wait(&rwp->writers, &rwp->lock);

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2010-08-09 18:33:47 +0000
+++ b/sql/ha_ndbcluster.cc	2010-08-12 13:50:23 +0000
@@ -680,7 +680,7 @@ int ha_ndbcluster::ndb_err(NdbTransactio
     bzero((char*) &table_list,sizeof(table_list));
     table_list.db= m_dbname;
     table_list.alias= table_list.table_name= m_tabname;
-    close_cached_tables(thd, &table_list, FALSE);
+    close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
     break;
   }
   default:
@@ -8440,7 +8440,7 @@ int handle_trailing_share(NDB_SHARE *sha
   bzero((char*) &table_list,sizeof(table_list));
   table_list.db= share->db;
   table_list.alias= table_list.table_name= share->table_name;
-  close_cached_tables(thd, &table_list, FALSE);
+  close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
 
   mysql_mutex_lock(&ndbcluster_mutex);
   /* ndb_share reference temporary free */

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2010-08-09 18:33:47 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2010-08-12 13:50:23 +0000
@@ -934,7 +934,7 @@ int ndbcluster_setup_binlog_table_shares
     ndb_binlog_tables_inited= TRUE;
     if (opt_ndb_extra_logging)
       sql_print_information("NDB Binlog: ndb tables writable");
-    close_cached_tables(NULL, NULL, FALSE);
+    close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
     /* Signal injector thread that all is setup */
     mysql_cond_signal(&injector_cond);
   }
@@ -1736,7 +1736,7 @@ ndb_handle_schema_change(THD *thd, Ndb *
       bzero((char*) &table_list,sizeof(table_list));
       table_list.db= (char *)dbname;
       table_list.alias= table_list.table_name= (char *)tabname;
-      close_cached_tables(thd, &table_list, FALSE);
+      close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
 
       if ((error= ndbcluster_binlog_open_table(thd, share,
                                                table_share, table, 1)))
@@ -1840,7 +1840,7 @@ ndb_handle_schema_change(THD *thd, Ndb *
     bzero((char*) &table_list,sizeof(table_list));
     table_list.db= (char *)dbname;
     table_list.alias= table_list.table_name= (char *)tabname;
-    close_cached_tables(thd, &table_list, FALSE);
+    close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
     /* ndb_share reference create free */
     DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                              share->key, share->use_count));
@@ -1961,7 +1961,7 @@ ndb_binlog_thread_handle_schema_event(TH
             bzero((char*) &table_list,sizeof(table_list));
             table_list.db= schema->db;
             table_list.alias= table_list.table_name= schema->name;
-            close_cached_tables(thd, &table_list, FALSE);
+            close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
           }
           /* ndb_share reference temporary free */
           if (share)
@@ -2076,7 +2076,7 @@ ndb_binlog_thread_handle_schema_event(TH
       mysql_mutex_unlock(&ndb_schema_share_mutex);
       /* end protect ndb_schema_share */
 
-      close_cached_tables(NULL, NULL, FALSE);
+      close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
       // fall through
     case NDBEVENT::TE_ALTER:
       ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
@@ -2233,7 +2233,7 @@ ndb_binlog_thread_handle_schema_event_po
           bzero((char*) &table_list,sizeof(table_list));
           table_list.db= schema->db;
           table_list.alias= table_list.table_name= schema->name;
-          close_cached_tables(thd, &table_list, FALSE);
+          close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
         }
         if (schema_type != SOT_ALTER_TABLE)
           break;
@@ -3938,9 +3938,9 @@ restart:
          !ndb_binlog_running))
       break; /* Shutting down server */
 
-    if (ndb_binlog_index && ndb_binlog_index->s->needs_reopen())
+    if (ndb_binlog_index && ndb_binlog_index->s->has_old_version())
     {
-      if (ndb_binlog_index->s->needs_reopen())
+      if (ndb_binlog_index->s->has_old_version())
       {
         trans_commit_stmt(thd);
         close_thread_tables(thd);

=== modified file 'sql/lock.cc'
--- a/sql/lock.cc	2010-07-27 12:42:36 +0000
+++ b/sql/lock.cc	2010-08-12 13:50:23 +0000
@@ -1298,27 +1298,19 @@ bool Global_read_lock::make_global_read_
 
 
 /**
-  Broadcast COND_refresh and COND_global_read_lock.
+  Broadcast COND_global_read_lock.
 
-    Due to a bug in a threading library it could happen that a signal
-    did not reach its target. A condition for this was that the same
-    condition variable was used with different mutexes in
-    mysql_cond_wait(). Some time ago we changed LOCK_open to
-    LOCK_global_read_lock in global read lock handling. So COND_refresh
-    was used with LOCK_open and LOCK_global_read_lock.
-
-    We did now also change from COND_refresh to COND_global_read_lock
-    in global read lock handling. But now it is necessary to signal
-    both conditions at the same time.
-
-  @note
-    When signalling COND_global_read_lock within the global read lock
-    handling, it is not necessary to also signal COND_refresh.
+  TODO/FIXME: Dmitry thinks that we broadcast on COND_global_read_lock
+              when old instance of table is closed to avoid races
+              between incrementing refresh_version and
+              wait_if_global_read_lock(thd, TRUE, FALSE) call.
+              Once global read lock implementation starts using MDL
+              infrastructure this will become unnecessary and should
+              be removed.
 */
 
 void broadcast_refresh(void)
 {
-  mysql_cond_broadcast(&COND_refresh);
   mysql_cond_broadcast(&COND_global_read_lock);
 }
 

=== modified file 'sql/mdl.cc'
--- a/sql/mdl.cc	2010-08-09 18:33:47 +0000
+++ b/sql/mdl.cc	2010-08-12 13:50:23 +0000
@@ -113,38 +113,32 @@ private:
 };
 
 
-enum enum_deadlock_weight
-{
-  MDL_DEADLOCK_WEIGHT_DML= 0,
-  MDL_DEADLOCK_WEIGHT_DDL= 100
-};
-
-
 /**
   A context of the recursive traversal through all contexts
   in all sessions in search for deadlock.
 */
 
-class Deadlock_detection_visitor
+class Deadlock_detection_visitor: public MDL_wait_for_graph_visitor
 {
 public:
   Deadlock_detection_visitor(MDL_context *start_node_arg)
     : m_start_node(start_node_arg),
       m_victim(NULL),
-      m_current_search_depth(0)
+      m_current_search_depth(0),
+      m_found_deadlock(FALSE)
   {}
-  bool enter_node(MDL_context * /* unused */);
-  void leave_node(MDL_context * /* unused */);
+  virtual bool enter_node(MDL_context *node);
+  virtual void leave_node(MDL_context *node);
 
-  bool inspect_edge(MDL_context *dest);
+  virtual bool inspect_edge(MDL_context *dest);
 
   MDL_context *get_victim() const { return m_victim; }
-
+private:
   /**
     Change the deadlock victim to a new one if it has lower deadlock
     weight.
   */
-  MDL_context *opt_change_victim_to(MDL_context *new_victim);
+  void opt_change_victim_to(MDL_context *new_victim);
 private:
   /**
     The context which has initiated the search. There
@@ -160,6 +154,8 @@ private:
     loop.
   */
   uint m_current_search_depth;
+  /** TRUE if we found a deadlock. */
+  bool m_found_deadlock;
   /**
     Maximum depth for deadlock searches. After this depth is
     achieved we will unconditionally declare that there is a
@@ -182,29 +178,38 @@ private:
   a node is entered, inspect_edge() will be called
   for all wait-for destinations of this node. Then
   leave_node() will be called.
-  We call "enter_node()" for all nodes we inspect, 
+  We call "enter_node()" for all nodes we inspect,
   including the starting node.
 
   @retval  TRUE  Maximum search depth exceeded.
   @retval  FALSE OK.
 */
 
-bool Deadlock_detection_visitor::enter_node(MDL_context * /* unused */)
+bool Deadlock_detection_visitor::enter_node(MDL_context *node)
 {
-  if (++m_current_search_depth >= MAX_SEARCH_DEPTH)
-    return TRUE;
-  return FALSE;
+  m_found_deadlock= ++m_current_search_depth >= MAX_SEARCH_DEPTH;
+  if (m_found_deadlock)
+  {
+    DBUG_ASSERT(! m_victim);
+    opt_change_victim_to(node);
+  }
+  return m_found_deadlock;
 }
 
 
 /**
   Done inspecting this node. Decrease the search
-  depth. Clear the node for debug safety.
+  depth. If a deadlock is found, and we are
+  backtracking to the start node, optionally
+  change the deadlock victim to one with lower
+  deadlock weight.
 */
 
-void Deadlock_detection_visitor::leave_node(MDL_context * /* unused */)
+void Deadlock_detection_visitor::leave_node(MDL_context *node)
 {
   --m_current_search_depth;
+  if (m_found_deadlock)
+    opt_change_victim_to(node);
 }
 
 
@@ -217,7 +222,8 @@ void Deadlock_detection_visitor::leave_n
 
 bool Deadlock_detection_visitor::inspect_edge(MDL_context *node)
 {
-  return node == m_start_node;
+  m_found_deadlock= node == m_start_node;
+  return m_found_deadlock;
 }
 
 
@@ -229,7 +235,7 @@ bool Deadlock_detection_visitor::inspect
   @retval !new_victim New victim became the current.
 */
 
-MDL_context *
+void
 Deadlock_detection_visitor::opt_change_victim_to(MDL_context *new_victim)
 {
   if (m_victim == NULL ||
@@ -238,10 +244,10 @@ Deadlock_detection_visitor::opt_change_v
     /* Swap victims, unlock the old one. */
     MDL_context *tmp= m_victim;
     m_victim= new_victim;
-    return tmp;
+    m_victim->lock_deadlock_victim();
+    if (tmp)
+      tmp->unlock_deadlock_victim();
   }
-  /* No change, unlock the current context. */
-  return new_victim;
 }
 
 
@@ -364,8 +370,8 @@ public:
 
   void remove_ticket(Ticket_list MDL_lock::*queue, MDL_ticket *ticket);
 
-  bool find_deadlock(MDL_ticket *waiting_ticket,
-                     Deadlock_detection_visitor *dvisitor);
+  bool visit_subgraph(MDL_ticket *waiting_ticket,
+                      MDL_wait_for_graph_visitor *gvisitor);
 
   /** List of granted tickets for this lock. */
   Ticket_list m_granted;
@@ -883,8 +889,8 @@ void MDL_ticket::destroy(MDL_ticket *tic
 uint MDL_ticket::get_deadlock_weight() const
 {
   return (m_lock->key.mdl_namespace() == MDL_key::GLOBAL ||
-          m_type > MDL_SHARED_NO_WRITE ?
-          MDL_DEADLOCK_WEIGHT_DDL : MDL_DEADLOCK_WEIGHT_DML);
+          m_type >= MDL_SHARED_NO_WRITE ?
+          DEADLOCK_WEIGHT_DDL : DEADLOCK_WEIGHT_DML);
 }
 
 
@@ -1388,6 +1394,15 @@ bool MDL_lock::has_pending_conflicting_l
 }
 
 
+MDL_wait_for_graph_visitor::~MDL_wait_for_graph_visitor()
+{
+}
+
+
+MDL_wait_for_subgraph::~MDL_wait_for_subgraph()
+{
+}
+
 /**
   Check if ticket represents metadata lock of "stronger" or equal type
   than specified one. I.e. if metadata lock represented by ticket won't
@@ -1536,9 +1551,8 @@ MDL_context::try_acquire_lock_impl(MDL_r
   MDL_ticket *ticket;
   bool is_transactional;
 
-  DBUG_ASSERT(mdl_request->type < MDL_SHARED_NO_WRITE ||
-              (is_lock_owner(MDL_key::GLOBAL, "", "",
-                             MDL_INTENTION_EXCLUSIVE)));
+  DBUG_ASSERT(mdl_request->type != MDL_EXCLUSIVE ||
+              is_lock_owner(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE));
   DBUG_ASSERT(mdl_request->ticket == NULL);
 
   /* Don't take chances in production. */
@@ -1963,8 +1977,17 @@ MDL_context::upgrade_shared_lock_to_excl
 }
 
 
-bool MDL_lock::find_deadlock(MDL_ticket *waiting_ticket,
-                             Deadlock_detection_visitor *dvisitor)
+/**
+  A fragment of recursive traversal of the wait-for graph
+  in search for deadlocks. Direct the deadlock visitor to all
+  contexts that own the lock the current node in the wait-for
+  graph is waiting for.
+  As long as the initial node is remembered in the visitor,
+  a deadlock is found when the same node is seen twice.
+*/
+
+bool MDL_lock::visit_subgraph(MDL_ticket *waiting_ticket,
+                              MDL_wait_for_graph_visitor *gvisitor)
 {
   MDL_ticket *ticket;
   MDL_context *src_ctx= waiting_ticket->get_ctx();
@@ -2033,7 +2056,7 @@ bool MDL_lock::find_deadlock(MDL_ticket 
     are visiting it but this is OK: in the worst case we might do some
     extra work and one more context might be chosen as a victim.
   */
-  if (dvisitor->enter_node(src_ctx))
+  if (gvisitor->enter_node(src_ctx))
     goto end;
 
   /*
@@ -2047,7 +2070,7 @@ bool MDL_lock::find_deadlock(MDL_ticket 
     /* Filter out edges that point to the same node. */
     if (ticket->get_ctx() != src_ctx &&
         ticket->is_incompatible_when_granted(waiting_ticket->get_type()) &&
-        dvisitor->inspect_edge(ticket->get_ctx()))
+        gvisitor->inspect_edge(ticket->get_ctx()))
     {
       goto end_leave_node;
     }
@@ -2058,7 +2081,7 @@ bool MDL_lock::find_deadlock(MDL_ticket 
     /* Filter out edges that point to the same node. */
     if (ticket->get_ctx() != src_ctx &&
         ticket->is_incompatible_when_waiting(waiting_ticket->get_type()) &&
-        dvisitor->inspect_edge(ticket->get_ctx()))
+        gvisitor->inspect_edge(ticket->get_ctx()))
     {
       goto end_leave_node;
     }
@@ -2070,7 +2093,7 @@ bool MDL_lock::find_deadlock(MDL_ticket 
   {
     if (ticket->get_ctx() != src_ctx &&
         ticket->is_incompatible_when_granted(waiting_ticket->get_type()) &&
-        ticket->get_ctx()->find_deadlock(dvisitor))
+        ticket->get_ctx()->visit_subgraph(gvisitor))
     {
       goto end_leave_node;
     }
@@ -2081,7 +2104,7 @@ bool MDL_lock::find_deadlock(MDL_ticket 
   {
     if (ticket->get_ctx() != src_ctx &&
         ticket->is_incompatible_when_waiting(waiting_ticket->get_type()) &&
-        ticket->get_ctx()->find_deadlock(dvisitor))
+        ticket->get_ctx()->visit_subgraph(gvisitor))
     {
       goto end_leave_node;
     }
@@ -2090,7 +2113,7 @@ bool MDL_lock::find_deadlock(MDL_ticket 
   result= FALSE;
 
 end_leave_node:
-  dvisitor->leave_node(src_ctx);
+  gvisitor->leave_node(src_ctx);
 
 end:
   mysql_prlock_unlock(&m_rwlock);
@@ -2099,35 +2122,47 @@ end:
 
 
 /**
-  Recursively traverse the wait-for graph of MDL contexts
-  in search for deadlocks.
+  Traverse a portion of wait-for graph which is reachable
+  through the edge represented by this ticket and search
+  for deadlocks.
 
-  @retval TRUE  A deadlock is found. A victim is remembered
-                by the visitor.
+  @retval TRUE  A deadlock is found. A pointer to the deadlock
+                victim is saved in the visitor.
   @retval FALSE
 */
 
-bool MDL_context::find_deadlock(Deadlock_detection_visitor *dvisitor)
+bool MDL_ticket::accept_visitor(MDL_wait_for_graph_visitor *gvisitor)
+{
+  return m_lock->visit_subgraph(this, gvisitor);
+}
+
+
+/**
+  A fragment of recursive traversal of the wait-for graph of
+  MDL contexts in the server in search for deadlocks.
+  Assume this MDL context is a node in the wait-for graph,
+  and direct the visitor to all adjacent nodes. As long
+  as the starting node is remembered in the visitor, a
+  deadlock is found when the same node is visited twice.
+  One MDL context is connected to another in the wait-for
+  graph if it waits on a resource that is held by the other
+  context.
+
+  @retval TRUE  A deadlock is found. A pointer to deadlock
+                victim is saved in the visitor.
+  @retval FALSE
+*/
+
+bool MDL_context::visit_subgraph(MDL_wait_for_graph_visitor *gvisitor)
 {
-  MDL_context *m_unlock_ctx= this;
   bool result= FALSE;
 
   mysql_prlock_rdlock(&m_LOCK_waiting_for);
 
   if (m_waiting_for)
-  {
-    result= m_waiting_for->m_lock->find_deadlock(m_waiting_for, dvisitor);
-    if (result)
-      m_unlock_ctx= dvisitor->opt_change_victim_to(this);
-  }
-  /*
-    We may recurse into the same MDL_context more than once
-    in case this is not the starting node. Make sure we release the
-    read lock as it's been taken, except for 1 read lock for
-    the deadlock victim.
-  */
-  if (m_unlock_ctx)
-    mysql_prlock_unlock(&m_unlock_ctx->m_LOCK_waiting_for);
+    result= m_waiting_for->accept_visitor(gvisitor);
+
+  mysql_prlock_unlock(&m_LOCK_waiting_for);
 
   return result;
 }
@@ -2149,14 +2184,14 @@ void MDL_context::find_deadlock()
   while (1)
   {
     /*
-      The fact that we use fresh instance of dvisitor for each
+      The fact that we use a fresh instance of the visitor for each
       search performed by find_deadlock() below is important,
       the code responsible for victim selection relies on this.
     */
     Deadlock_detection_visitor dvisitor(this);
     MDL_context *victim;
 
-    if (! find_deadlock(&dvisitor))
+    if (! visit_subgraph(&dvisitor))
     {
       /* No deadlocks are found! */
       break;
@@ -2177,7 +2212,7 @@ void MDL_context::find_deadlock()
       context was waiting is concurrently satisfied.
     */
     (void) victim->m_wait.set_status(MDL_wait::VICTIM);
-    mysql_prlock_unlock(&victim->m_LOCK_waiting_for);
+    victim->unlock_deadlock_victim();
 
     if (victim == this)
       break;

=== modified file 'sql/mdl.h'
--- a/sql/mdl.h	2010-08-06 11:29:37 +0000
+++ b/sql/mdl.h	2010-08-12 13:50:23 +0000
@@ -34,7 +34,6 @@ class THD;
 class MDL_context;
 class MDL_lock;
 class MDL_ticket;
-class Deadlock_detection_visitor;
 
 /**
   Type of metadata lock request.
@@ -372,6 +371,59 @@ public:
 
 typedef void (*mdl_cached_object_release_hook)(void *);
 
+
+/**
+  An abstract class for inspection of a connected
+  subgraph of the wait-for graph.
+*/
+
+class MDL_wait_for_graph_visitor
+{
+public:
+  virtual bool enter_node(MDL_context *node) = 0;
+  virtual void leave_node(MDL_context *node) = 0;
+
+  virtual bool inspect_edge(MDL_context *dest) = 0;
+  virtual ~MDL_wait_for_graph_visitor();
+  MDL_wait_for_graph_visitor() :m_lock_open_count(0) {}
+public:
+  /**
+   XXX, hack: During deadlock search, we may need to
+   inspect TABLE_SHAREs and acquire LOCK_open. Since
+   LOCK_open is not a recursive mutex, count here how many
+   times we "took" it (but only take and release once).
+   Not using a native recursive mutex or rwlock in 5.5 for
+   LOCK_open since it has significant performance impacts.
+  */
+  uint m_lock_open_count;
+};
+
+/**
+  Abstract class representing an edge in the waiters graph
+  to be traversed by deadlock detection algorithm.
+*/
+
+class MDL_wait_for_subgraph
+{
+public:
+  virtual ~MDL_wait_for_subgraph();
+
+  /**
+    Accept a wait-for graph visitor to inspect the node
+    this edge is leading to.
+  */
+  virtual bool accept_visitor(MDL_wait_for_graph_visitor *gvisitor) = 0;
+
+  enum enum_deadlock_weight
+  {
+    DEADLOCK_WEIGHT_DML= 0,
+    DEADLOCK_WEIGHT_DDL= 100
+  };
+  /* A helper used to determine which lock request should be aborted. */
+  virtual uint get_deadlock_weight() const = 0;
+};
+
+
 /**
   A granted metadata lock.
 
@@ -392,7 +444,7 @@ typedef void (*mdl_cached_object_release
           threads/contexts.
 */
 
-class MDL_ticket
+class MDL_ticket : public MDL_wait_for_subgraph
 {
 public:
   /**
@@ -426,8 +478,9 @@ public:
   bool is_incompatible_when_granted(enum_mdl_type type) const;
   bool is_incompatible_when_waiting(enum_mdl_type type) const;
 
-  /* A helper used to determine which lock request should be aborted. */
-  uint get_deadlock_weight() const;
+  /** Implement MDL_wait_for_subgraph interface. */
+  virtual bool accept_visitor(MDL_wait_for_graph_visitor *dvisitor);
+  virtual uint get_deadlock_weight() const;
 private:
   friend class MDL_context;
 
@@ -594,8 +647,6 @@ public:
   {
     return m_needs_thr_lock_abort;
   }
-
-  bool find_deadlock(Deadlock_detection_visitor *dvisitor);
 public:
   /**
     If our request for a lock is scheduled, or aborted by the deadlock
@@ -687,12 +738,13 @@ private:
   */
   mysql_prlock_t m_LOCK_waiting_for;
   /**
-    Tell the deadlock detector what lock this session is waiting for.
+    Tell the deadlock detector what metadata lock or table
+    definition cache entry this session is waiting for.
     In principle, this is redundant, as information can be found
     by inspecting waiting queues, but we'd very much like it to be
     readily available to the wait-for graph iterator.
    */
-  MDL_ticket *m_waiting_for;
+  MDL_wait_for_subgraph *m_waiting_for;
 private:
   MDL_ticket *find_ticket(MDL_request *mdl_req,
                           bool *is_transactional);
@@ -700,13 +752,16 @@ private:
   bool try_acquire_lock_impl(MDL_request *mdl_request,
                              MDL_ticket **out_ticket);
 
+public:
   void find_deadlock();
 
+  bool visit_subgraph(MDL_wait_for_graph_visitor *dvisitor);
+
   /** Inform the deadlock detector there is an edge in the wait-for graph. */
-  void will_wait_for(MDL_ticket *pending_ticket)
+  void will_wait_for(MDL_wait_for_subgraph *waiting_for_arg)
   {
     mysql_prlock_wrlock(&m_LOCK_waiting_for);
-    m_waiting_for= pending_ticket;
+    m_waiting_for=  waiting_for_arg;
     mysql_prlock_unlock(&m_LOCK_waiting_for);
   }
 
@@ -717,6 +772,14 @@ private:
     m_waiting_for= NULL;
     mysql_prlock_unlock(&m_LOCK_waiting_for);
   }
+  void lock_deadlock_victim()
+  {
+    mysql_prlock_rdlock(&m_LOCK_waiting_for);
+  }
+  void unlock_deadlock_victim()
+  {
+    mysql_prlock_unlock(&m_LOCK_waiting_for);
+  }
 private:
   MDL_context(const MDL_context &rhs);          /* not implemented */
   MDL_context &operator=(MDL_context &rhs);     /* not implemented */

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-07-29 12:32:11 +0000
+++ b/sql/mysqld.cc	2010-08-12 13:50:23 +0000
@@ -601,7 +601,7 @@ SHOW_COMP_OPTION have_profiling;
 pthread_key(MEM_ROOT**,THR_MALLOC);
 pthread_key(THD*, THR_THD);
 mysql_mutex_t LOCK_thread_count;
-mysql_mutex_t LOCK_open,
+mysql_mutex_t
   LOCK_status, LOCK_global_read_lock,
   LOCK_error_log, LOCK_uuid_generator,
   LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
@@ -623,7 +623,7 @@ mysql_mutex_t LOCK_des_key_file;
 mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
 mysql_rwlock_t LOCK_system_variables_hash;
 mysql_cond_t COND_thread_count;
-mysql_cond_t COND_refresh, COND_global_read_lock;
+mysql_cond_t COND_global_read_lock;
 pthread_t signal_thread;
 pthread_attr_t connection_attrib;
 mysql_mutex_t LOCK_server_started;
@@ -1526,7 +1526,6 @@ static void wait_for_signal_thread_to_en
 static void clean_up_mutexes()
 {
   mysql_rwlock_destroy(&LOCK_grant);
-  mysql_mutex_destroy(&LOCK_open);
   mysql_mutex_destroy(&LOCK_thread_count);
   mysql_mutex_destroy(&LOCK_status);
   mysql_mutex_destroy(&LOCK_delayed_insert);
@@ -1559,7 +1558,6 @@ static void clean_up_mutexes()
   mysql_mutex_destroy(&LOCK_prepared_stmt_count);
   mysql_mutex_destroy(&LOCK_error_messages);
   mysql_cond_destroy(&COND_thread_count);
-  mysql_cond_destroy(&COND_refresh);
   mysql_cond_destroy(&COND_global_read_lock);
   mysql_cond_destroy(&COND_thread_cache);
   mysql_cond_destroy(&COND_flush_thread_cache);
@@ -3500,7 +3498,6 @@ You should consider changing lower_case_
 
 static int init_thread_environment()
 {
-  mysql_mutex_init(key_LOCK_open, &LOCK_open, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(key_LOCK_thread_count, &LOCK_thread_count, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(key_LOCK_status, &LOCK_status, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(key_LOCK_delayed_insert,
@@ -3547,7 +3544,6 @@ static int init_thread_environment()
   mysql_rwlock_init(key_rwlock_LOCK_sys_init_slave, &LOCK_sys_init_slave);
   mysql_rwlock_init(key_rwlock_LOCK_grant, &LOCK_grant);
   mysql_cond_init(key_COND_thread_count, &COND_thread_count, NULL);
-  mysql_cond_init(key_COND_refresh, &COND_refresh, NULL);
   mysql_cond_init(key_COND_global_read_lock, &COND_global_read_lock, NULL);
   mysql_cond_init(key_COND_thread_cache, &COND_thread_cache, NULL);
   mysql_cond_init(key_COND_flush_thread_cache, &COND_flush_thread_cache, NULL);
@@ -7680,7 +7676,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key
   key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log,
   key_LOCK_gdl, key_LOCK_global_read_lock, key_LOCK_global_system_variables,
   key_LOCK_manager,
-  key_LOCK_open, key_LOCK_prepared_stmt_count,
+  key_LOCK_prepared_stmt_count,
   key_LOCK_rpl_status, key_LOCK_server_started, key_LOCK_status,
   key_LOCK_system_variables_hash, key_LOCK_table_share, key_LOCK_thd_data,
   key_LOCK_user_conn, key_LOCK_uuid_generator, key_LOG_LOCK_log,
@@ -7719,7 +7715,6 @@ static PSI_mutex_info all_server_mutexes
   { &key_LOCK_global_read_lock, "LOCK_global_read_lock", PSI_FLAG_GLOBAL},
   { &key_LOCK_global_system_variables, "LOCK_global_system_variables", PSI_FLAG_GLOBAL},
   { &key_LOCK_manager, "LOCK_manager", PSI_FLAG_GLOBAL},
-  { &key_LOCK_open, "LOCK_open", PSI_FLAG_GLOBAL},
   { &key_LOCK_prepared_stmt_count, "LOCK_prepared_stmt_count", PSI_FLAG_GLOBAL},
   { &key_LOCK_rpl_status, "LOCK_rpl_status", PSI_FLAG_GLOBAL},
   { &key_LOCK_server_started, "LOCK_server_started", PSI_FLAG_GLOBAL},
@@ -7767,7 +7762,7 @@ PSI_cond_key key_PAGE_cond, key_COND_act
 
 PSI_cond_key key_BINLOG_COND_prep_xids, key_BINLOG_update_cond,
   key_COND_cache_status_changed, key_COND_global_read_lock, key_COND_manager,
-  key_COND_refresh, key_COND_rpl_status, key_COND_server_started,
+  key_COND_rpl_status, key_COND_server_started,
   key_delayed_insert_cond, key_delayed_insert_cond_client,
   key_item_func_sleep_cond, key_master_info_data_cond,
   key_master_info_start_cond, key_master_info_stop_cond,
@@ -7791,7 +7786,6 @@ static PSI_cond_info all_server_conds[]=
   { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
   { &key_COND_global_read_lock, "COND_global_read_lock", PSI_FLAG_GLOBAL},
   { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
-  { &key_COND_refresh, "COND_refresh", PSI_FLAG_GLOBAL},
   { &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL},
   { &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL},
   { &key_delayed_insert_cond, "Delayed_insert::cond", 0},

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-07-27 14:32:42 +0000
+++ b/sql/mysqld.h	2010-08-12 13:50:23 +0000
@@ -229,7 +229,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_ind
   key_LOCK_delayed_insert, key_LOCK_delayed_status, key_LOCK_error_log,
   key_LOCK_gdl, key_LOCK_global_read_lock, key_LOCK_global_system_variables,
   key_LOCK_logger, key_LOCK_manager,
-  key_LOCK_open, key_LOCK_prepared_stmt_count,
+  key_LOCK_prepared_stmt_count,
   key_LOCK_rpl_status, key_LOCK_server_started, key_LOCK_status,
   key_LOCK_table_share, key_LOCK_thd_data,
   key_LOCK_user_conn, key_LOCK_uuid_generator, key_LOG_LOCK_log,
@@ -249,7 +249,7 @@ extern PSI_cond_key key_PAGE_cond, key_C
 
 extern PSI_cond_key key_BINLOG_COND_prep_xids, key_BINLOG_update_cond,
   key_COND_cache_status_changed, key_COND_global_read_lock, key_COND_manager,
-  key_COND_refresh, key_COND_rpl_status, key_COND_server_started,
+  key_COND_rpl_status, key_COND_server_started,
   key_delayed_insert_cond, key_delayed_insert_cond_client,
   key_item_func_sleep_cond, key_master_info_data_cond,
   key_master_info_start_cond, key_master_info_stop_cond,
@@ -316,7 +316,7 @@ extern MYSQL_PLUGIN_IMPORT key_map key_m
 /*
   Server mutex locks and condition variables.
  */
-extern mysql_mutex_t LOCK_open,
+extern mysql_mutex_t
        LOCK_user_locks, LOCK_status,
        LOCK_error_log, LOCK_delayed_insert, LOCK_uuid_generator,
        LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
@@ -332,7 +332,7 @@ extern mysql_cond_t COND_server_started;
 extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
 extern mysql_rwlock_t LOCK_system_variables_hash;
 extern mysql_cond_t COND_thread_count;
-extern mysql_cond_t COND_refresh, COND_manager;
+extern mysql_cond_t COND_manager;
 extern mysql_cond_t COND_global_read_lock;
 extern int32 thread_running;
 extern my_atomic_rwlock_t thread_running_lock;

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2010-08-10 21:12:01 +0000
+++ b/sql/sql_base.cc	2010-08-12 13:50:23 +0000
@@ -124,6 +124,38 @@ bool Prelock_error_handler::safely_trapp
 */
 
 /**
+  Protects table_def_hash, used and unused lists in the
+  TABLE_SHARE object, LRU lists of used TABLEs and used
+  TABLE_SHAREs, refresh_version and the table id counter.
+*/
+mysql_mutex_t LOCK_open;
+
+#ifdef HAVE_PSI_INTERFACE
+static PSI_mutex_key key_LOCK_open;
+static PSI_mutex_info all_tdc_mutexes[]= {
+  { &key_LOCK_open, "LOCK_open", PSI_FLAG_GLOBAL }
+};
+
+/**
+  Initialize performance schema instrumentation points
+  used by the table cache.
+*/
+
+static void init_tdc_psi_keys(void)
+{
+  const char *category= "sql";
+  int count;
+
+  if (PSI_server == NULL)
+    return;
+
+  count= array_elements(all_tdc_mutexes);
+  PSI_server->register_mutex(category, all_tdc_mutexes, count);
+}
+#endif /* HAVE_PSI_INTERFACE */
+
+
+/**
    Total number of TABLE instances for tables in the table definition cache
    (both in use by threads and not in use). This value is accessible to user
    as "Open_tables" status variable.
@@ -146,9 +178,6 @@ static bool check_and_update_table_versi
 static bool open_table_entry_fini(THD *thd, TABLE_SHARE *share, TABLE *entry);
 static bool auto_repair_table(THD *thd, TABLE_LIST *table_list);
 static void free_cache_entry(TABLE *entry);
-static bool tdc_wait_for_old_versions(THD *thd,
-                                      MDL_request_list *mdl_requests,
-                                      ulong timeout);
 static bool
 has_write_table_with_auto_increment(TABLE_LIST *tables);
 
@@ -294,9 +323,14 @@ static void table_def_free_entry(TABLE_S
 bool table_def_init(void)
 {
   table_def_inited= 1;
+#ifdef HAVE_PSI_INTERFACE
+  init_tdc_psi_keys();
+#endif
+  mysql_mutex_init(key_LOCK_open, &LOCK_open, MY_MUTEX_INIT_FAST);
   oldest_unused_share= &end_of_unused_share;
   end_of_unused_share.prev= &oldest_unused_share;
 
+
   return my_hash_init(&table_def_cache, &my_charset_bin, table_def_size,
                       0, 0, table_def_key,
                       (my_hash_free_key) table_def_free_entry, 0) != 0;
@@ -323,7 +357,7 @@ void table_def_start_shutdown(void)
     table_def_shutdown_in_progress= TRUE;
     mysql_mutex_unlock(&LOCK_open);
     /* Free all cached but unused TABLEs and TABLE_SHAREs. */
-    close_cached_tables(NULL, NULL, FALSE);
+    close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
   }
 }
 
@@ -336,6 +370,7 @@ void table_def_free(void)
     table_def_inited= 0;
     /* Free table definitions. */
     my_hash_free(&table_def_cache);
+    mysql_mutex_destroy(&LOCK_open);
   }
   DBUG_VOID_RETURN;
 }
@@ -441,7 +476,7 @@ static void table_def_unuse_table(TABLE 
   DBUG_ASSERT(table->in_use);
 
   /* We shouldn't put the table to 'unused' list if the share is old. */
-  DBUG_ASSERT(! table->s->needs_reopen());
+  DBUG_ASSERT(! table->s->has_old_version());
 
   table->in_use= 0;
   /* Remove table from the list of tables used in this share. */
@@ -516,7 +551,7 @@ TABLE_SHARE *get_table_share(THD *thd, T
   }
 
   /*
-    We assign a new table id under the protection of the LOCK_open.
+    We assign a new table id under the protection of LOCK_open.
     We do this instead of creating a new mutex
     and using it for the sole purpose of serializing accesses to a
     static variable, we assign the table id here. We assign it to the
@@ -702,7 +737,7 @@ void release_table_share(TABLE_SHARE *sh
   DBUG_ASSERT(share->ref_count);
   if (!--share->ref_count)
   {
-    if (share->needs_reopen() || table_def_shutdown_in_progress)
+    if (share->has_old_version() || table_def_shutdown_in_progress)
       my_hash_delete(&table_def_cache, (uchar*) share);
     else
     {
@@ -889,7 +924,7 @@ void free_io_cache(TABLE *table)
 
    @param share Table share.
 
-   @pre Caller should have LOCK_open mutex acquired.
+   @pre Caller should have LOCK_open mutex.
 */
 
 static void kill_delayed_threads_for_table(TABLE_SHARE *share)
@@ -926,6 +961,7 @@ static void kill_delayed_threads_for_tab
   @param thd Thread context
   @param tables List of tables to remove from the cache
   @param wait_for_refresh Wait for a impending flush
+  @param timeout Timeout for waiting for flush to be completed.
 
   @note THD can be NULL, but then wait_for_refresh must be FALSE
         and tables must be NULL.
@@ -938,17 +974,28 @@ static void kill_delayed_threads_for_tab
         lock taken by thread trying to obtain global read lock.
 */
 
-bool close_cached_tables(THD *thd, TABLE_LIST *tables, bool wait_for_refresh)
+bool close_cached_tables(THD *thd, TABLE_LIST *tables,
+                         bool wait_for_refresh, ulong timeout)
 {
   bool result= FALSE;
   bool found= TRUE;
+  struct timespec abstime;
   DBUG_ENTER("close_cached_tables");
   DBUG_ASSERT(thd || (!wait_for_refresh && !tables));
 
   mysql_mutex_lock(&LOCK_open);
   if (!tables)
   {
-    refresh_version++;				// Force close of open tables
+    /*
+      Force close of all open tables.
+
+      Note that code in TABLE_SHARE::wait_for_old_version() assumes that
+      incrementing refresh_version and removing unused tables and
+      shares from the TDC happen atomically under protection of LOCK_open,
+      or, putting it another way, that the TDC does not contain old shares
+      which don't have any tables in use.
+    */
+    refresh_version++;
     DBUG_PRINT("tcache", ("incremented global refresh_version to: %lu",
                           refresh_version));
     kill_delayed_threads();
@@ -987,6 +1034,8 @@ bool close_cached_tables(THD *thd, TABLE
   if (!wait_for_refresh)
     DBUG_RETURN(result);
 
+  set_timespec(abstime, timeout);
+
   if (thd->locked_tables_mode)
   {
     /*
@@ -1026,6 +1075,7 @@ bool close_cached_tables(THD *thd, TABLE
 
   while (found && ! thd->killed)
   {
+    TABLE_SHARE *share;
     found= FALSE;
     /*
       To a self-deadlock or deadlocks with other FLUSH threads
@@ -1036,15 +1086,12 @@ bool close_cached_tables(THD *thd, TABLE
 
     mysql_mutex_lock(&LOCK_open);
 
-    thd->enter_cond(&COND_refresh, &LOCK_open, "Flushing tables");
-
     if (!tables)
     {
       for (uint idx=0 ; idx < table_def_cache.records ; idx++)
       {
-        TABLE_SHARE *share=(TABLE_SHARE*) my_hash_element(&table_def_cache,
-                                                          idx);
-        if (share->needs_reopen())
+        share= (TABLE_SHARE*) my_hash_element(&table_def_cache, idx);
+        if (share->has_old_version())
         {
           found= TRUE;
           break;
@@ -1055,8 +1102,8 @@ bool close_cached_tables(THD *thd, TABLE
     {
       for (TABLE_LIST *table= tables; table; table= table->next_local)
       {
-        TABLE_SHARE *share= get_cached_table_share(table->db, table->table_name);
-        if (share && share->needs_reopen())
+        share= get_cached_table_share(table->db, table->table_name);
+        if (share && share->has_old_version())
         {
 	  found= TRUE;
           break;
@@ -1066,11 +1113,20 @@ bool close_cached_tables(THD *thd, TABLE
 
     if (found)
     {
-      DBUG_PRINT("signal", ("Waiting for COND_refresh"));
-      mysql_cond_wait(&COND_refresh, &LOCK_open);
+      /*
+        The method below temporarily unlocks LOCK_open and frees
+        share's memory.
+      */
+      if (share->wait_for_old_version(thd, &abstime,
+                                    MDL_wait_for_subgraph::DEADLOCK_WEIGHT_DDL))
+      {
+        mysql_mutex_unlock(&LOCK_open);
+        result= TRUE;
+        goto err_with_reopen;
+      }
     }
 
-    thd->exit_cond(NULL);
+    mysql_mutex_unlock(&LOCK_open);
   }
 
 err_with_reopen:
@@ -1141,7 +1197,7 @@ bool close_cached_connection_tables(THD 
   mysql_mutex_unlock(&LOCK_open);
 
   if (tables)
-    result= close_cached_tables(thd, tables, FALSE);
+    result= close_cached_tables(thd, tables, FALSE, LONG_TIMEOUT);
 
   if (if_wait_for_refresh)
   {
@@ -1488,9 +1544,6 @@ void close_thread_tables(THD *thd)
     thd->lock=0;
   }
   /*
-    Note that we need to hold LOCK_open while changing the
-    open_tables list. Another thread may work on it.
-    (See: mysql_notify_thread_having_shared_lock())
     Closing a MERGE child before the parent would be fatal if the
     other thread tries to abort the MERGE lock in between.
   */
@@ -1536,7 +1589,7 @@ bool close_thread_table(THD *thd, TABLE 
 
   mysql_mutex_lock(&LOCK_open);
 
-  if (table->s->needs_reopen() || table->needs_reopen() ||
+  if (table->s->has_old_version() || table->needs_reopen() ||
       table_def_shutdown_in_progress)
   {
     free_cache_entry(table);
@@ -2251,8 +2304,6 @@ void drop_open_table(THD *thd, TABLE *ta
                          exists and to FALSE otherwise.
 
     @note This function acquires LOCK_open internally.
-          It also assumes that the fact that there are no exclusive
-          metadata locks on the table was checked beforehand.
 
     @note If there is no .FRM file for the table but it exists in one
           of engines (e.g. it was created on another node of NDB cluster)
@@ -2345,8 +2396,9 @@ bool MDL_deadlock_handler::handle_condit
   {
     /* Disable the handler to avoid infinite recursion. */
     m_is_active= TRUE;
-    (void) m_ot_ctx->request_backoff_action(Open_table_context::OT_MDL_CONFLICT,
-                                            NULL);
+    (void) m_ot_ctx->request_backoff_action(
+             Open_table_context::OT_BACKOFF_AND_RETRY,
+             NULL);
     m_is_active= FALSE;
     /*
       If the above back-off request failed, a new instance of
@@ -2392,6 +2444,8 @@ open_table_get_mdl_lock(THD *thd, Open_t
                         uint flags,
                         MDL_ticket **mdl_ticket)
 {
+  MDL_request mdl_request_shared;
+
   if (flags & (MYSQL_OPEN_FORCE_SHARED_MDL |
                MYSQL_OPEN_FORCE_SHARED_HIGH_PRIO_MDL))
   {
@@ -2417,16 +2471,12 @@ open_table_get_mdl_lock(THD *thd, Open_t
     DBUG_ASSERT(!(flags & MYSQL_OPEN_FORCE_SHARED_MDL) ||
                 !(flags & MYSQL_OPEN_FORCE_SHARED_HIGH_PRIO_MDL));
 
-    mdl_request= new (thd->mem_root) MDL_request(mdl_request);
-    if (mdl_request == NULL)
-      return TRUE;
-
-    mdl_request->set_type((flags & MYSQL_OPEN_FORCE_SHARED_MDL) ?
-                          MDL_SHARED : MDL_SHARED_HIGH_PRIO);
+    mdl_request_shared.init(&mdl_request->key,
+                            (flags & MYSQL_OPEN_FORCE_SHARED_MDL) ?
+                            MDL_SHARED : MDL_SHARED_HIGH_PRIO);
+    mdl_request= &mdl_request_shared;
   }
 
-  ot_ctx->add_request(mdl_request);
-
   if (flags & MYSQL_OPEN_FAIL_ON_MDL_CONFLICT)
   {
     /*
@@ -2489,6 +2539,40 @@ open_table_get_mdl_lock(THD *thd, Open_t
 }
 
 
+/**
+  Check if table's share is being removed from the table definition
+  cache and, if yes, wait until the flush is complete.
+
+  @param thd             Thread context.
+  @param db,table_name   Table whose share should be checked.
+  @param wait_timeout    Timeout for waiting.
+  @param deadlock_weight Weight of this wait for deadlock detector.
+
+  @retval FALSE   Success. Share is up to date or has been flushed.
+  @retval TRUE    Error (OOM, thread was killed, the wait resulted
+                  in a deadlock or timeout). Reported.
+*/
+
+static bool
+tdc_wait_for_old_version(THD *thd, const char *db, const char *table_name,
+                         ulong wait_timeout, uint deadlock_weight)
+{
+  TABLE_SHARE *share;
+  bool res= FALSE;
+
+  mysql_mutex_lock(&LOCK_open);
+  if ((share= get_cached_table_share(db, table_name)) &&
+      share->has_old_version())
+  {
+    struct timespec abstime;
+    set_timespec(abstime, wait_timeout);
+    res= share->wait_for_old_version(thd, &abstime, deadlock_weight);
+  }
+  mysql_mutex_unlock(&LOCK_open);
+  return res;
+}
+
+
 /*
   Open a table.
 
@@ -2578,8 +2662,8 @@ bool open_table(THD *thd, TABLE_LIST *ta
 
     if (thd->open_tables && thd->open_tables->s->version != refresh_version)
     {
-      (void) ot_ctx->request_backoff_action(Open_table_context::OT_WAIT_TDC,
-                                            NULL);
+      (void)ot_ctx->request_backoff_action(Open_table_context::OT_REOPEN_TABLES,
+                                           NULL);
       DBUG_RETURN(TRUE);
     }
   }
@@ -2790,6 +2874,8 @@ bool open_table(THD *thd, TABLE_LIST *ta
   else if (table_list->open_strategy == TABLE_LIST::OPEN_STUB)
     DBUG_RETURN(FALSE);
 
+retry_share:
+
   mysql_mutex_lock(&LOCK_open);
 
   if (!(share= get_table_share_with_discover(thd, table_list, key,
@@ -2861,31 +2947,50 @@ bool open_table(THD *thd, TABLE_LIST *ta
   if (table_list->i_s_requested_object &  OPEN_VIEW_ONLY)
     goto err_unlock;
 
-  /*
-    If the version changes while we're opening the tables,
-    we have to back off, close all the tables opened-so-far,
-    and try to reopen them. Note: refresh_version is currently
-    changed only during FLUSH TABLES.
-  */
-  if (share->needs_reopen() ||
-      (thd->open_tables && thd->open_tables->s->version != share->version))
-  {
-    if (!(flags & MYSQL_OPEN_IGNORE_FLUSH))
-    {
-       /*
-         We already have an MDL lock. But we have encountered an old
-         version of table in the table definition cache which is possible
-         when someone changes the table version directly in the cache
-         without acquiring a metadata lock (e.g. this can happen during
-         "rolling" FLUSH TABLE(S)).
-         Note, that to avoid a "busywait" in this case, we have to wait
-         separately in the caller for old table versions to go away
-         (see tdc_wait_for_old_versions()).
-       */
+  if (!(flags & MYSQL_OPEN_IGNORE_FLUSH))
+  {
+    if (share->has_old_version())
+    {
+      /*
+        We already have an MDL lock. But we have encountered an old
+        version of table in the table definition cache which is possible
+        when someone changes the table version directly in the cache
+        without acquiring a metadata lock (e.g. this can happen during
+        "rolling" FLUSH TABLE(S)).
+        Release our reference to share, wait until old version of
+        share goes away and then try to get new version of table share.
+      */
+      MDL_deadlock_handler mdl_deadlock_handler(ot_ctx);
+      bool wait_result;
+
       release_table_share(share);
       mysql_mutex_unlock(&LOCK_open);
-      (void) ot_ctx->request_backoff_action(Open_table_context::OT_WAIT_TDC,
-                                            NULL);
+
+      thd->push_internal_handler(&mdl_deadlock_handler);
+      wait_result= tdc_wait_for_old_version(thd, table_list->db,
+                                            table_list->table_name,
+                                            ot_ctx->get_timeout(),
+                                            mdl_ticket->get_deadlock_weight());
+      thd->pop_internal_handler();
+
+      if (wait_result)
+        DBUG_RETURN(TRUE);
+
+      goto retry_share;
+    }
+
+    if (thd->open_tables && thd->open_tables->s->version != share->version)
+    {
+      /*
+        If the version changes while we're opening the tables,
+        we have to back off, close all the tables opened-so-far,
+        and try to reopen them. Note: refresh_version is currently
+        changed only during FLUSH TABLES.
+      */
+      release_table_share(share);
+      mysql_mutex_unlock(&LOCK_open);
+      (void)ot_ctx->request_backoff_action(Open_table_context::OT_REOPEN_TABLES,
+                                           NULL);
       DBUG_RETURN(TRUE);
     }
   }
@@ -3425,7 +3530,7 @@ Locked_tables_list::reopen_tables(THD *t
   PRE-CONDITION(S)
 
     share is non-NULL
-    The LOCK_open mutex is locked
+    The LOCK_open mutex is locked.
 
   POST-CONDITION(S)
 
@@ -3814,7 +3919,7 @@ request_backoff_action(enum_open_table_a
                        TABLE_LIST *table)
 {
   /*
-    A back off action may be one of the three kinds:
+    A back off action may be one of three kinds:
 
     * We met a broken table that needs repair, or a table that
       is not present on this MySQL server and needs re-discovery.
@@ -3823,27 +3928,47 @@ request_backoff_action(enum_open_table_a
       locks is very deadlock-prone. If this is a multi- statement
       transaction that holds metadata locks for completed
       statements, we don't do it, and report an error instead.
+      The action type in this case is OT_DISCOVER or OT_REPAIR.
     * Our attempt to acquire an MDL lock lead to a deadlock,
       detected by the MDL deadlock detector. The current
       session was chosen a victim. If this is a multi-statement
-      transaction that holds metadata locks for completed statements,
-      restarting locking for the current statement may lead
-      to a livelock. Thus, again, if m_has_locks is set,
+      transaction that holds metadata locks taken by completed
+      statements, restarting locking for the current statement
+      may lead to a livelock. Releasing locks of completed
+      statements cannot be done as it would lead to a violation
+      of ACID. Thus, again, if m_has_locks is set,
       we report an error. Otherwise, when there are no metadata
       locks other than which belong to this statement, we can
       try to recover from error by releasing all locks and
       restarting the pre-locking.
-    * Finally, we could have met a TABLE_SHARE with old version.
-      Again, if this is a first statement in a transaction we can
-      close all tables, release all metadata locks and wait for
-      the old version to go away. Otherwise, waiting with MDL locks
-      may lead to criss-cross wait between this connection and a
-      connection that has an open table and waits on a metadata lock,
-      i.e. to a deadlock.
-      Since there is no way to detect such a deadlock, we prevent
-      it by reporting an error.
+      Similarly, a deadlock error can occur when the
+      pre-locking process met a TABLE_SHARE that is being
+      flushed, and unsuccessfully waited for the flush to
+      complete. A deadlock in this case can happen, e.g.,
+      when our session is holding a metadata lock that
+      is being waited on by a session which is using
+      the table which is being flushed. The only way
+      to recover from this error is, again, to close all
+      open tables, release all locks, and retry pre-locking.
+      Action type name is OT_REOPEN_TABLES. Re-trying
+      while holding some locks may lead to a livelock,
+      and thus we don't do it.
+    * Finally, this session has open TABLEs from different
+      "generations" of the table cache. This can happen, e.g.,
+      when, after this session has successfully opened one
+      table used for a statement, FLUSH TABLES interfered and
+      expelled another table used in it. FLUSH TABLES then
+      blocks and waits on the table already opened by this
+      statement.
+      We detect this situation by ensuring that table cache
+      version of all tables used in a statement is the same.
+      If it isn't, all tables need to be reopened.
+      Note, that we can always perform a reopen in this case,
+      even if we already have metadata locks, since we don't
+      keep tables open between statements and a livelock
+      is not possible.
   */
-  if (m_has_locks)
+  if (action_arg != OT_REOPEN_TABLES && m_has_locks)
   {
     my_error(ER_LOCK_DEADLOCK, MYF(0));
     return TRUE;
@@ -3889,11 +4014,9 @@ recover_from_failed_open(THD *thd)
   /* Execute the action. */
   switch (m_action)
   {
-    case OT_MDL_CONFLICT:
+    case OT_BACKOFF_AND_RETRY:
       break;
-    case OT_WAIT_TDC:
-      result= tdc_wait_for_old_versions(thd, &m_mdl_requests, get_timeout());
-      DBUG_ASSERT(thd->mysys_var->current_mutex == NULL);
+    case OT_REOPEN_TABLES:
       break;
     case OT_DISCOVER:
       {
@@ -3929,8 +4052,6 @@ recover_from_failed_open(THD *thd)
     default:
       DBUG_ASSERT(0);
   }
-  /* Remove all old requests, they will be re-added. */
-  m_mdl_requests.empty();
   /*
     Reset the pointers to conflicting MDL request and the
     TABLE_LIST element, set when we need auto-discovery or repair,
@@ -4051,8 +4172,6 @@ open_and_process_routine(THD *thd, Query
       if (rt != (Sroutine_hash_entry*)prelocking_ctx->sroutines_list.first ||
           mdl_type != MDL_key::PROCEDURE)
       {
-        ot_ctx->add_request(&rt->mdl_request);
-
         /*
           Since we acquire only shared lock on routines we don't
           need to care about global intention exclusive locks.
@@ -4729,6 +4848,8 @@ restart:
         }
         goto err;
       }
+
+      DEBUG_SYNC(thd, "open_tables_after_open_and_process_table");
     }
 
     /*
@@ -8597,17 +8718,6 @@ bool mysql_notify_thread_having_shared_l
     }
     mysql_mutex_unlock(&in_use->LOCK_thd_data);
   }
-  /*
-    Wake up threads waiting in tdc_wait_for_old_versions().
-    Normally such threads would already get blocked
-    in MDL subsystem, when trying to acquire a shared lock.
-    But in case a thread has an open HANDLER statement,
-    (and thus already grabbed a metadata lock), it gets
-    blocked only too late -- at the table cache level.
-    Starting from 5.5, this could also easily happen in
-    a multi-statement transaction.
-  */
-  broadcast_refresh();
   return signalled;
 }
 
@@ -8688,6 +8798,13 @@ void tdc_remove_table(THD *thd, enum_tdc
       /*
         Set share's version to zero in order to ensure that it gets
         automatically deleted once it is no longer referenced.
+
+        Note that code in TABLE_SHARE::wait_for_old_version() assumes
+        that marking share as old and removal of its unused tables
+        and of the share itself from TDC happens atomically under
+        protection of LOCK_open, or, putting it another way, that
+        TDC does not contain old shares which don't have any tables
+        used.
       */
       share->version= 0;
 
@@ -8703,85 +8820,6 @@ void tdc_remove_table(THD *thd, enum_tdc
 }
 
 
-/**
-   Wait until there are no old versions of tables in the table
-   definition cache for the metadata locks that we try to acquire.
-
-   @param thd      Thread context
-   @param context  Metadata locking context with locks.
-   @param timeout  Seconds to wait before reporting ER_LOCK_WAIT_TIMEOUT.
-*/
-
-static bool
-tdc_wait_for_old_versions(THD *thd, MDL_request_list *mdl_requests,
-                          ulong timeout)
-{
-  TABLE_SHARE *share;
-  const char *old_msg;
-  MDL_request *mdl_request;
-  struct timespec abstime;
-  set_timespec(abstime, timeout);
-  int wait_result= 0;
-
-  while (!thd->killed)
-  {
-    /*
-      We have to get rid of HANDLERs which are open by this thread
-      and have old TABLE versions. Otherwise we might get a deadlock
-      in situation when we are waiting for an old TABLE object which
-      corresponds to a HANDLER open by another session. And this
-      other session waits for our HANDLER object to get closed.
-
-      TODO: We should also investigate in which situations we have
-            to broadcast on COND_refresh because of this.
-    */
-    mysql_ha_flush(thd);
-
-    mysql_mutex_lock(&LOCK_open);
-
-    MDL_request_list::Iterator it(*mdl_requests);
-    while ((mdl_request= it++))
-    {
-      /* Skip requests on non-TDC objects. */
-      if (mdl_request->key.mdl_namespace() != MDL_key::TABLE)
-        continue;
-
-      if ((share= get_cached_table_share(mdl_request->key.db_name(),
-                                         mdl_request->key.name())) &&
-          share->needs_reopen())
-        break;
-    }
-    if (!mdl_request)
-    {
-      /*
-        Reset wait_result here in case this was the final check
-        after getting a timeout from mysql_cond_timedwait().
-      */
-      wait_result= 0;
-      mysql_mutex_unlock(&LOCK_open);
-      break;
-    }
-    if (wait_result == ETIMEDOUT || wait_result == ETIME)
-    {
-      /*
-        Test for timeout here instead of right after mysql_cond_timedwait().
-        This allows for a final iteration and a final check before reporting
-        ER_LOCK_WAIT_TIMEOUT.
-      */
-      mysql_mutex_unlock(&LOCK_open);
-      my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0));
-      break;
-    }
-    old_msg= thd->enter_cond(&COND_refresh, &LOCK_open,
-                             "Waiting for table flush");
-    wait_result= mysql_cond_timedwait(&COND_refresh, &LOCK_open, &abstime);
-    /* LOCK_open mutex is unlocked by THD::exit_cond() as side-effect. */
-    thd->exit_cond(old_msg);
-  }
-  return thd->killed || wait_result == ETIMEDOUT || wait_result == ETIME;
-}
-
-
 int setup_ftfuncs(SELECT_LEX *select_lex)
 {
   List_iterator<Item_func_match> li(*(select_lex->ftfunc_list)),

=== modified file 'sql/sql_base.h'
--- a/sql/sql_base.h	2010-08-09 18:33:47 +0000
+++ b/sql/sql_base.h	2010-08-12 13:50:23 +0000
@@ -70,6 +70,7 @@ enum enum_tdc_remove_table_type {TDC_RT_
 #define RTFC_CHECK_KILLED_FLAG      0x0004
 
 bool check_dup(const char *db, const char *name, TABLE_LIST *tables);
+extern mysql_mutex_t LOCK_open;
 bool table_cache_init(void);
 void table_cache_free(void);
 bool table_def_init(void);
@@ -226,7 +227,8 @@ TABLE *open_performance_schema_table(THD
                                      Open_tables_state *backup);
 void close_performance_schema_table(THD *thd, Open_tables_state *backup);
 
-bool close_cached_tables(THD *thd, TABLE_LIST *tables, bool wait_for_refresh);
+bool close_cached_tables(THD *thd, TABLE_LIST *tables,
+                         bool wait_for_refresh, ulong timeout);
 bool close_cached_connection_tables(THD *thd, bool wait_for_refresh,
                                     LEX_STRING *connect_string);
 void close_all_tables_for_name(THD *thd, TABLE_SHARE *share,
@@ -426,8 +428,8 @@ public:
   enum enum_open_table_action
   {
     OT_NO_ACTION= 0,
-    OT_MDL_CONFLICT,
-    OT_WAIT_TDC,
+    OT_BACKOFF_AND_RETRY,
+    OT_REOPEN_TABLES,
     OT_DISCOVER,
     OT_REPAIR
   };
@@ -437,9 +439,6 @@ public:
   bool request_backoff_action(enum_open_table_action action_arg,
                               TABLE_LIST *table);
 
-  void add_request(MDL_request *request)
-  { m_mdl_requests.push_front(request); }
-
   bool can_recover_from_failed_open() const
   { return m_action != OT_NO_ACTION; }
 
@@ -461,8 +460,6 @@ public:
 
   uint get_flags() const { return m_flags; }
 private:
-  /** List of requests for all locks taken so far. Used for waiting on locks. */
-  MDL_request_list m_mdl_requests;
   /**
     For OT_DISCOVER and OT_REPAIR actions, the table list element for
     the table which definition should be re-discovered or which

=== modified file 'sql/sql_handler.cc'
--- a/sql/sql_handler.cc	2010-07-27 10:25:53 +0000
+++ b/sql/sql_handler.cc	2010-08-12 13:50:23 +0000
@@ -934,7 +934,7 @@ void mysql_ha_flush(THD *thd)
         ((hash_tables->table->mdl_ticket &&
          hash_tables->table->mdl_ticket->has_pending_conflicting_lock()) ||
          (!hash_tables->table->s->tmp_table &&
-          hash_tables->table->s->needs_reopen())))
+          hash_tables->table->s->has_old_version())))
       mysql_ha_close_table(thd, hash_tables);
   }
 

=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc	2010-08-09 18:33:47 +0000
+++ b/sql/sql_insert.cc	2010-08-12 13:50:23 +0000
@@ -2705,7 +2705,7 @@ bool Delayed_insert::handle_inserts(void
 
   thd_proc_info(&thd, "insert");
   max_rows= delayed_insert_limit;
-  if (thd.killed || table->s->needs_reopen())
+  if (thd.killed || table->s->has_old_version())
   {
     thd.killed= THD::KILL_CONNECTION;
     max_rows= ULONG_MAX;                     // Do as much as possible

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2010-08-09 18:33:47 +0000
+++ b/sql/sql_parse.cc	2010-08-12 13:50:23 +0000
@@ -1756,6 +1756,7 @@ static bool flush_tables_with_read_lock(
 {
   Lock_tables_prelocking_strategy lock_tables_prelocking_strategy;
   TABLE_LIST *table_list;
+  MDL_request_list mdl_requests;
 
   /*
     This is called from SQLCOM_FLUSH, the transaction has
@@ -1774,22 +1775,26 @@ static bool flush_tables_with_read_lock(
   }
 
   /*
-    @todo: Since lock_table_names() acquires a global IX
-    lock, this actually waits for a GRL in another connection.
-    We are thus introducing an incompatibility.
-    Do nothing for now, since not taking a global IX violates
-    current internal MDL asserts, fix after discussing with
-    Dmitry.
+    Acquire SNW locks on tables to be flushed. We can't use
+    lock_table_names() here as this call will also acquire global IX
+    and database-scope IX locks on the tables, and this will make
+    this statement incompatible with FLUSH TABLES WITH READ LOCK.
   */
-  if (lock_table_names(thd, all_tables, 0, thd->variables.lock_wait_timeout,
-                       MYSQL_OPEN_SKIP_TEMPORARY))
+  for (table_list= all_tables; table_list;
+       table_list= table_list->next_global)
+    mdl_requests.push_front(&table_list->mdl_request);
+
+  if (thd->mdl_context.acquire_locks(&mdl_requests,
+                                     thd->variables.lock_wait_timeout))
     goto error;
 
+  DEBUG_SYNC(thd,"flush_tables_with_read_lock_after_acquire_locks");
+
   for (table_list= all_tables; table_list;
        table_list= table_list->next_global)
   {
-    /* Remove the table from cache. */
-    tdc_remove_table(thd, TDC_RT_REMOVE_ALL,
+    /* Request removal of table from cache. */
+    tdc_remove_table(thd, TDC_RT_REMOVE_UNUSED,
                      table_list->db,
                      table_list->table_name, FALSE);
 
@@ -1798,6 +1803,11 @@ static bool flush_tables_with_read_lock(
     table_list->open_type= OT_BASE_ONLY;      /* Ignore temporary tables. */
   }
 
+  /*
+    Before opening and locking tables the below call also waits
+    for old shares to go away, so the fact that we don't pass
+    MYSQL_LOCK_IGNORE_FLUSH flag to it is important.
+  */
   if  (open_and_lock_tables(thd, all_tables, FALSE,
                             MYSQL_OPEN_HAS_MDL_LOCK,
                             &lock_tables_prelocking_strategy) ||
@@ -1808,17 +1818,11 @@ static bool flush_tables_with_read_lock(
   thd->variables.option_bits|= OPTION_TABLE_LOCK;
 
   /*
-    Downgrade the exclusive locks.
-    Use MDL_SHARED_NO_WRITE as the intended
-    post effect of this call is identical
-    to LOCK TABLES <...> READ, and we didn't use
-    thd->in_lock_talbes and thd->sql_command= SQLCOM_LOCK_TABLES
-    hacks to enter the LTM.
-    @todo: release the global IX lock here!!!
+    We don't downgrade MDL_SHARED_NO_WRITE here as the intended
+    post effect of this call is identical to LOCK TABLES <...> READ,
+    and we didn't use thd->in_lock_tables and
+    thd->sql_command= SQLCOM_LOCK_TABLES hacks to enter the LTM.
   */
-  for (table_list= all_tables; table_list;
-       table_list= table_list->next_global)
-    table_list->mdl_request.ticket->downgrade_exclusive_lock(MDL_SHARED_NO_WRITE);
 
   return FALSE;
 
@@ -6852,10 +6856,11 @@ bool reload_acl_and_cache(THD *thd, ulon
       tmp_write_to_binlog= 0;
       if (thd->global_read_lock.lock_global_read_lock(thd))
 	return 1;                               // Killed
-      if (close_cached_tables(thd, tables, (options & REFRESH_FAST) ?
-                              FALSE : TRUE))
-          result= 1;
-      
+      if (close_cached_tables(thd, tables,
+                              ((options & REFRESH_FAST) ?  FALSE : TRUE),
+                              thd->variables.lock_wait_timeout))
+        result= 1;
+
       if (thd->global_read_lock.make_global_read_lock_block_commit(thd)) // Killed
       {
         /* Don't leave things in a half-locked state */
@@ -6892,8 +6897,10 @@ bool reload_acl_and_cache(THD *thd, ulon
         }
       }
 
-      if (close_cached_tables(thd, tables, (options & REFRESH_FAST) ?
-                              FALSE : TRUE))
+      if (close_cached_tables(thd, tables,
+                              ((options & REFRESH_FAST) ?  FALSE : TRUE),
+                              (thd ? thd->variables.lock_wait_timeout :
+                               LONG_TIMEOUT)))
         result= 1;
     }
     my_dbopt_cleanup();

=== modified file 'sql/sql_show.cc'
--- a/sql/sql_show.cc	2010-08-09 18:33:47 +0000
+++ b/sql/sql_show.cc	2010-08-12 13:50:23 +0000
@@ -3273,8 +3273,8 @@ static int fill_schema_table_from_frm(TH
 
   /*
     TODO: investigate if in this particular situation we can get by
-          simply obtaining internal lock of data-dictionary (ATM it
-          is LOCK_open) instead of obtaning full-blown metadata lock.
+          simply obtaining internal lock of the data-dictionary
+          instead of obtaining full-blown metadata lock.
   */
   if (try_acquire_high_prio_shared_mdl_lock(thd, &table_list, can_deadlock))
   {

=== modified file 'sql/sql_yacc.yy'
--- a/sql/sql_yacc.yy	2010-07-29 12:32:11 +0000
+++ b/sql/sql_yacc.yy	2010-08-12 13:50:23 +0000
@@ -11202,9 +11202,8 @@ opt_with_read_lock:
           {
             TABLE_LIST *tables= Lex->query_tables;
             Lex->type|= REFRESH_READ_LOCK;
-            /* We acquire an X lock currently and then downgrade. */
             for (; tables; tables= tables->next_global)
-              tables->mdl_request.set_type(MDL_EXCLUSIVE);
+              tables->mdl_request.set_type(MDL_SHARED_NO_WRITE);
           }
         ;
 

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-08-09 18:33:47 +0000
+++ b/sql/sys_vars.cc	2010-08-12 13:50:23 +0000
@@ -1492,7 +1492,8 @@ static bool fix_read_only(sys_var *self,
     can cause to wait on a read lock, it's required for the client application
     to unlock everything, and acceptable for the server to wait on all locks.
   */
-  if ((result= close_cached_tables(thd, NULL, TRUE)))
+  if ((result= close_cached_tables(thd, NULL, TRUE,
+                                   thd->variables.lock_wait_timeout)))
     goto end_with_read_lock;
 
   if ((result= thd->global_read_lock.make_global_read_lock_block_commit(thd)))

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2010-07-29 12:32:11 +0000
+++ b/sql/table.cc	2010-08-12 13:50:23 +0000
@@ -34,6 +34,7 @@
 #include <m_ctype.h>
 #include "my_md5.h"
 #include "sql_select.h"
+#include "mdl.h"                 // MDL_wait_for_graph_visitor
 
 /* INFORMATION_SCHEMA name */
 LEX_STRING INFORMATION_SCHEMA_NAME= {C_STRING_WITH_LEN("information_schema")};
@@ -325,6 +326,7 @@ TABLE_SHARE *alloc_table_share(TABLE_LIS
 
     share->used_tables.empty();
     share->free_tables.empty();
+    share->m_flush_tickets.empty();
 
     memcpy((char*) &share->mem_root, (char*) &mem_root, sizeof(mem_root));
     mysql_mutex_init(key_TABLE_SHARE_LOCK_ha_data,
@@ -389,52 +391,92 @@ void init_tmp_table_share(THD *thd, TABL
 
   share->used_tables.empty();
   share->free_tables.empty();
+  share->m_flush_tickets.empty();
 
   DBUG_VOID_RETURN;
 }
 
 
+/**
+  Release resources (plugins) used by the share and free its memory.
+  TABLE_SHARE is self-contained -- it's stored in its own MEM_ROOT.
+  Free this MEM_ROOT.
+*/
+
+void TABLE_SHARE::destroy()
+{
+  uint idx;
+  KEY *info_it;
+
+  /* The mutex is initialized only for shares that are part of the TDC */
+  if (tmp_table == NO_TMP_TABLE)
+    mysql_mutex_destroy(&LOCK_ha_data);
+  my_hash_free(&name_hash);
+
+  plugin_unlock(NULL, db_plugin);
+  db_plugin= NULL;
+
+  /* Release fulltext parsers */
+  info_it= key_info;
+  for (idx= keys; idx; idx--, info_it++)
+  {
+    if (info_it->flags & HA_USES_PARSER)
+    {
+      plugin_unlock(NULL, info_it->parser);
+      info_it->flags= 0;
+    }
+  }
+  /*
+    Make a copy since the share is allocated in its own root,
+    and free_root() updates its argument after freeing the memory.
+  */
+  MEM_ROOT own_root= mem_root;
+  free_root(&own_root, MYF(0));
+}
+
 /*
   Free table share and memory used by it
 
   SYNOPSIS
     free_table_share()
     share		Table share
-
-  NOTES
-    share->mutex must be locked when we come here if it's not a temp table
 */
 
 void free_table_share(TABLE_SHARE *share)
 {
-  MEM_ROOT mem_root;
-  uint idx;
-  KEY *key_info;
   DBUG_ENTER("free_table_share");
   DBUG_PRINT("enter", ("table: %s.%s", share->db.str, share->table_name.str));
   DBUG_ASSERT(share->ref_count == 0);
 
-  /* The mutex is initialized only for shares that are part of the TDC */
-  if (share->tmp_table == NO_TMP_TABLE)
-    mysql_mutex_destroy(&share->LOCK_ha_data);
-  my_hash_free(&share->name_hash);
-
-  plugin_unlock(NULL, share->db_plugin);
-  share->db_plugin= NULL;
-
-  /* Release fulltext parsers */
-  key_info= share->key_info;
-  for (idx= share->keys; idx; idx--, key_info++)
+  if (share->m_flush_tickets.is_empty())
   {
-    if (key_info->flags & HA_USES_PARSER)
-    {
-      plugin_unlock(NULL, key_info->parser);
-      key_info->flags= 0;
-    }
+    /*
+      No threads are waiting for this share to be flushed (the
+      share is not old, is for a temporary table, or just nobody
+      happens to be waiting for it). Destroy it.
+    */
+    share->destroy();
+  }
+  else
+  {
+    Wait_for_flush_list::Iterator it(share->m_flush_tickets);
+    Wait_for_flush *ticket;
+    /*
+      We're about to iterate over a list that is used
+      concurrently. Make sure this never happens without a lock.
+    */
+    mysql_mutex_assert_owner(&LOCK_open);
+
+    while ((ticket= it++))
+      (void) ticket->get_ctx()->m_wait.set_status(MDL_wait::GRANTED);
+    /*
+      If there are threads waiting for this share to be flushed,
+      the last one to receive the notification will destroy the
+      share. At this point the share is removed from the table
+      definition cache, so it is OK to proceed here without waiting
+      for this thread to do the work.
+    */
   }
-  /* We must copy mem_root from share because share is allocated through it */
-  memcpy((char*) &mem_root, (char*) &share->mem_root, sizeof(mem_root));
-  free_root(&mem_root, MYF(0));                 // Free's share
   DBUG_VOID_RETURN;
 }
 
@@ -2995,6 +3037,192 @@ Table_check_intact::check(TABLE *table, 
 }
 
 
+/**
+  Traverse portion of wait-for graph which is reachable through edge
+  represented by this flush ticket in search for deadlocks.
+
+  @retval TRUE  A deadlock is found. A victim is remembered
+                by the visitor.
+  @retval FALSE Success, no deadlocks.
+*/
+
+bool Wait_for_flush::accept_visitor(MDL_wait_for_graph_visitor *gvisitor)
+{
+  return m_share->visit_subgraph(this, gvisitor);
+}
+
+
+uint Wait_for_flush::get_deadlock_weight() const
+{
+  return m_deadlock_weight;
+}
+
+
+/**
+  Traverse portion of wait-for graph which is reachable through this
+  table share in search for deadlocks.
+
+  @param wait_for_flush  Ticket representing wait for this share.
+  @param gvisitor        Deadlock detection visitor.
+
+  @retval TRUE  A deadlock is found. A victim is remembered
+                by the visitor.
+  @retval FALSE No deadlocks, it's OK to begin wait.
+*/
+
+bool TABLE_SHARE::visit_subgraph(Wait_for_flush *wait_for_flush,
+                                 MDL_wait_for_graph_visitor *gvisitor)
+{
+  TABLE *table;
+  MDL_context *src_ctx= wait_for_flush->get_ctx();
+  bool result= TRUE;
+
+  /*
+    To protect used_tables list from being concurrently modified
+    while we are iterating through it we acquire LOCK_open.
+    This does not introduce deadlocks in the deadlock detector
+    because we won't try to acquire LOCK_open while
+    holding a write-lock on MDL_lock::m_rwlock.
+  */
+  if (gvisitor->m_lock_open_count++ == 0)
+    mysql_mutex_lock(&LOCK_open);
+
+  I_P_List_iterator <TABLE, TABLE_share> tables_it(used_tables);
+
+  /*
+    In case of multiple searches running in parallel, avoid going
+    over the same loop twice and shortcut the search.
+    Do it after taking the lock to weed out unnecessary races.
+  */
+  if (src_ctx->m_wait.get_status() != MDL_wait::EMPTY)
+  {
+    result= FALSE;
+    goto end;
+  }
+
+  if (gvisitor->enter_node(src_ctx))
+    goto end;
+
+  while ((table= tables_it++))
+  {
+    if (gvisitor->inspect_edge(&table->in_use->mdl_context))
+    {
+      goto end_leave_node;
+    }
+  }
+
+  tables_it.rewind();
+  while ((table= tables_it++))
+  {
+    if (table->in_use->mdl_context.visit_subgraph(gvisitor))
+    {
+      goto end_leave_node;
+    }
+  }
+
+  result= FALSE;
+
+end_leave_node:
+  gvisitor->leave_node(src_ctx);
+
+end:
+  if (gvisitor->m_lock_open_count-- == 1)
+    mysql_mutex_unlock(&LOCK_open);
+
+  return result;
+}
+
+
+/**
+  Wait until the subject share is removed from the table
+  definition cache and make sure it's destroyed.
+
+  @param thd             Thread context of the thread which is going to wait.
+  @param abstime         Timeout for waiting as absolute time value.
+  @param deadlock_weight Weight of this wait for deadlock detector.
+
+  @pre LOCK_open is locked, the share is used (has
+       non-zero reference count), is marked for flush and
+       this connection does not reference the share.
+       LOCK_open will be unlocked temporarily during execution.
+
+  @retval FALSE - Success.
+  @retval TRUE  - Error (OOM, deadlock, timeout, etc...).
+*/
+
+bool TABLE_SHARE::wait_for_old_version(THD *thd, struct timespec *abstime,
+                                       uint deadlock_weight)
+{
+  MDL_context *mdl_context= &thd->mdl_context;
+  Wait_for_flush ticket(mdl_context, this, deadlock_weight);
+  MDL_wait::enum_wait_status wait_status;
+
+  mysql_mutex_assert_owner(&LOCK_open);
+  /*
+    We should enter this method only when share's version is not
+    up to date and the share is referenced. Otherwise our
+    thread will never be woken up from wait.
+  */
+  DBUG_ASSERT(version != refresh_version && ref_count != 0);
+
+  m_flush_tickets.push_front(&ticket);
+
+  mdl_context->m_wait.reset_status();
+
+  mysql_mutex_unlock(&LOCK_open);
+
+  mdl_context->will_wait_for(&ticket);
+
+  mdl_context->find_deadlock();
+
+  wait_status= mdl_context->m_wait.timed_wait(thd, abstime, TRUE,
+                                              "Waiting for table flush");
+
+  mdl_context->done_waiting_for();
+
+  mysql_mutex_lock(&LOCK_open);
+
+  m_flush_tickets.remove(&ticket);
+
+  if (m_flush_tickets.is_empty() && ref_count == 0)
+  {
+    /*
+      If our thread was the last one using the share,
+      we must destroy it here.
+    */
+    destroy();
+  }
+
+  /*
+    In cases when our wait was aborted by KILL statement,
+    a deadlock or a timeout, the share might still be referenced,
+    so we don't delete it. Note, that we can't determine this
+    condition by checking wait_status alone, since, for example,
+    a timeout can happen after all references to the table share
+    were released, but before the share is removed from the
+    cache and we receive the notification. This is why
+    we first destroy the share, and then look at
+    wait_status.
+  */
+  switch (wait_status)
+  {
+  case MDL_wait::GRANTED:
+    return FALSE;
+  case MDL_wait::VICTIM:
+    my_error(ER_LOCK_DEADLOCK, MYF(0));
+    return TRUE;
+  case MDL_wait::TIMEOUT:
+    my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0));
+    return TRUE;
+  case MDL_wait::KILLED:
+    return TRUE;
+  default:
+    DBUG_ASSERT(0);
+    return TRUE;
+  }
+}
+
+
 /*
   Create Item_field for each column in the table.
 

=== modified file 'sql/table.h'
--- a/sql/table.h	2010-07-29 03:24:35 +0000
+++ b/sql/table.h	2010-08-12 13:50:23 +0000
@@ -508,7 +508,46 @@ public:
 };
 
 
-/*
+/**
+  Class representing the fact that some thread waits for table
+  share to be flushed. Is used to represent information about
+  such waits in MDL deadlock detector.
+*/
+
+class Wait_for_flush : public MDL_wait_for_subgraph
+{
+  MDL_context *m_ctx;
+  TABLE_SHARE *m_share;
+  uint m_deadlock_weight;
+public:
+  Wait_for_flush(MDL_context *ctx_arg, TABLE_SHARE *share_arg,
+               uint deadlock_weight_arg)
+    : m_ctx(ctx_arg), m_share(share_arg),
+      m_deadlock_weight(deadlock_weight_arg)
+  {}
+
+  MDL_context *get_ctx() const { return m_ctx; }
+
+  virtual bool accept_visitor(MDL_wait_for_graph_visitor *dvisitor);
+
+  virtual uint get_deadlock_weight() const;
+
+  /**
+    Pointers for participating in the list of waiters for table share.
+  */
+  Wait_for_flush *next_in_share;
+  Wait_for_flush **prev_in_share;
+};
+
+
+typedef I_P_List <Wait_for_flush,
+                  I_P_List_adapter<Wait_for_flush,
+                                   &Wait_for_flush::next_in_share,
+                                   &Wait_for_flush::prev_in_share> >
+                 Wait_for_flush_list;
+
+
+/**
   This structure is shared between different table objects. There is one
   instance of table share per one table in the database.
 */
@@ -662,6 +701,11 @@ struct TABLE_SHARE
   /** Instrumentation for this table share. */
   PSI_table_share *m_psi;
 
+  /**
+    List of tickets representing threads waiting for the share to be flushed.
+  */
+  Wait_for_flush_list m_flush_tickets;
+
   /*
     Set share's table cache key and update its db and table name appropriately.
 
@@ -731,10 +775,8 @@ struct TABLE_SHARE
   }
 
 
-  /*
-    Must all TABLEs be reopened?
-  */
-  inline bool needs_reopen() const
+  /** Is this table share being expelled from the table definition cache?  */
+  inline bool has_old_version() const
   {
     return version != refresh_version;
   }
@@ -837,6 +879,13 @@ struct TABLE_SHARE
     return (tmp_table == SYSTEM_TMP_TABLE || is_view) ? 0 : table_map_id;
   }
 
+  bool visit_subgraph(Wait_for_flush *waiting_ticket,
+                      MDL_wait_for_graph_visitor *gvisitor);
+
+  bool wait_for_old_version(THD *thd, struct timespec *abstime,
+                            uint deadlock_weight);
+  /** Release resources and free memory occupied by the table share. */
+  void destroy();
 };
 
 
@@ -1084,9 +1133,7 @@ public:
     read_set= &def_read_set;
     write_set= &def_write_set;
   }
-  /*
-    Is this instance of the table should be reopen?
-  */
+  /** Should this instance of the table be reopened? */
   inline bool needs_reopen()
   { return !db_stat || m_needs_reopen; }
 


Attachment: [text/bzr-bundle] bzr/kostja@sun.com-20100812135023-7wpcdtenxd05xr35.bundle
Thread
bzr commit into mysql-5.5-bugfixing branch (kostja:3101) Bug#52044Konstantin Osipov12 Aug