List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:October 2 2008 10:28pm
Subject:bzr push into mysql-5.1 branch (jonas:2876)
View as plain text  
 2876 Jonas Oreland	2008-10-03 [merge]
      merge 63 to 64
added:
  mysql-test/suite/ndb/r/ndb_discover_db.result
  mysql-test/suite/ndb/r/ndb_discover_db2.result
  mysql-test/suite/ndb/t/ndb_discover_db.test
  mysql-test/suite/ndb/t/ndb_discover_db2-master.opt
  mysql-test/suite/ndb/t/ndb_discover_db2.test
  sql/ha_ndbcluster_lock_ext.h
modified:
  mysql-test/mysql-test-run.pl
  sql/Makefile.am
  sql/ha_ndbcluster.cc
  sql/ha_ndbcluster.h
  sql/ha_ndbcluster_binlog.cc
  sql/ha_ndbcluster_binlog.h
  sql/ha_ndbcluster_connection.cc
  sql/ha_ndbcluster_connection.h
  sql/handler.cc
  sql/handler.h
  sql/mysqld.cc
  sql/sql_db.cc
  sql/sql_delete.cc
  sql/sql_parse.cc
  sql/sql_rename.cc
  sql/sql_table.cc

=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl	2008-09-17 13:23:21 +0000
+++ b/mysql-test/mysql-test-run.pl	2008-10-02 22:28:22 +0000
@@ -4010,7 +4010,6 @@ sub mysqld_arguments ($$$$) {
       }
       if ( $mysql_version_id >= 50100 )
       {
-	mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
         if ( ! $glob_use_embedded_server )
         {
           mtr_add_arg($args, "%s--ndb-log-orig", $prefix);
@@ -4093,7 +4092,6 @@ sub mysqld_arguments ($$$$) {
       }
       if ( $mysql_version_id >= 50100 )
       {
-	mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
         if ( ! $glob_use_embedded_server )
         {
           mtr_add_arg($args, "%s--ndb-log-orig", $prefix);

=== added file 'mysql-test/suite/ndb/r/ndb_discover_db.result'
--- a/mysql-test/suite/ndb/r/ndb_discover_db.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_discover_db.result	2008-09-30 09:14:44 +0000
@@ -0,0 +1,7 @@
+drop database if exists discover_db;
+drop database if exists discover_db_2;
+create database discover_db;
+create table discover_db.t1 (a int key, b int) engine ndb;
+create database discover_db_2;
+alter database discover_db_2 character set binary;
+create table discover_db_2.t1 (a int key, b int) engine ndb;

=== added file 'mysql-test/suite/ndb/r/ndb_discover_db2.result'
--- a/mysql-test/suite/ndb/r/ndb_discover_db2.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_discover_db2.result	2008-09-30 09:14:44 +0000
@@ -0,0 +1,28 @@
+show create database discover_db;
+Database	Create Database
+discover_db	CREATE DATABASE `discover_db` /*!40100 DEFAULT CHARACTER SET latin1 */
+show create database discover_db_2;
+Database	Create Database
+discover_db_2	CREATE DATABASE `discover_db_2` /*!40100 DEFAULT CHARACTER SET binary */
+reset master;
+insert into discover_db.t1 values (1,1);
+show binlog events from <binlog_start>;
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+master-bin1.000001	#	Query	102	#	BEGIN
+master-bin1.000001	#	Table_map	102	#	table_id: # (discover_db.t1)
+master-bin1.000001	#	Table_map	102	#	table_id: # (mysql.ndb_apply_status)
+master-bin1.000001	#	Write_rows	102	#	table_id: #
+master-bin1.000001	#	Write_rows	102	#	table_id: # flags: STMT_END_F
+master-bin1.000001	#	Query	102	#	COMMIT
+reset master;
+insert into discover_db_2.t1 values (1,1);
+show binlog events from <binlog_start>;
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+master-bin1.000001	#	Query	102	#	BEGIN
+master-bin1.000001	#	Table_map	102	#	table_id: # (discover_db_2.t1)
+master-bin1.000001	#	Table_map	102	#	table_id: # (mysql.ndb_apply_status)
+master-bin1.000001	#	Write_rows	102	#	table_id: #
+master-bin1.000001	#	Write_rows	102	#	table_id: # flags: STMT_END_F
+master-bin1.000001	#	Query	102	#	COMMIT
+drop database discover_db;
+drop database discover_db_2;

=== added file 'mysql-test/suite/ndb/t/ndb_discover_db.test'
--- a/mysql-test/suite/ndb/t/ndb_discover_db.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db.test	2008-09-30 09:14:44 +0000
@@ -0,0 +1,33 @@
+-- source include/have_multi_ndb.inc
+-- source include/have_binlog_format_mixed_or_row.inc
+
+-- disable_warnings
+drop database if exists discover_db;
+drop database if exists discover_db_2;
+-- enable_warnings
+
+#
+# Prepare for testing database discovery by creating
+# databases, and removing them on one mysqld
+# The discovery happens in ndb_discover_db2.test
+#
+
+# check that created database is discovered
+create database discover_db;
+create table discover_db.t1 (a int key, b int) engine ndb;
+
+# check that altered database is discovered
+create database discover_db_2;
+alter database discover_db_2 character set binary;
+create table discover_db_2.t1 (a int key, b int) engine ndb;
+
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/t1.frm
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/t1.ndb
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db/db.opt
+-- rmdir $MYSQLTEST_VARDIR/master1-data/discover_db
+
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/t1.frm
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/t1.ndb
+-- remove_file $MYSQLTEST_VARDIR/master1-data/discover_db_2/db.opt
+-- rmdir $MYSQLTEST_VARDIR/master1-data/discover_db_2
+

=== added file 'mysql-test/suite/ndb/t/ndb_discover_db2-master.opt'
--- a/mysql-test/suite/ndb/t/ndb_discover_db2-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db2-master.opt	2008-09-30 09:14:44 +0000
@@ -0,0 +1 @@
+--skip-external-locking

=== added file 'mysql-test/suite/ndb/t/ndb_discover_db2.test'
--- a/mysql-test/suite/ndb/t/ndb_discover_db2.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_discover_db2.test	2008-09-30 09:14:44 +0000
@@ -0,0 +1,21 @@
+-- source include/have_multi_ndb.inc
+-- source include/have_binlog_format_mixed_or_row.inc
+
+#
+# When this test started there no database on disk for server2
+# Check that table has been discovered correctly, and that the
+# binlog is updated correctly
+#
+
+-- connection server2
+show create database discover_db;
+show create database discover_db_2;
+reset master;
+insert into discover_db.t1 values (1,1);
+--source include/show_binlog_events2.inc
+reset master;
+insert into discover_db_2.t1 values (1,1);
+--source include/show_binlog_events2.inc
+
+drop database discover_db;
+drop database discover_db_2;

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2008-09-01 12:28:57 +0000
+++ b/sql/Makefile.am	2008-10-02 08:19:14 +0000
@@ -60,6 +60,7 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndbcluster.h ha_ndbcluster_cond.h \
 			ha_ndbcluster_binlog.h ha_ndbcluster_tables.h \
 			ha_ndbcluster_connection.h ha_ndbcluster_connection.h \
+			ha_ndbcluster_lock_ext.h \
 			ha_partition.h rpl_constants.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
 			rpl_reporting.h \

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2008-10-01 04:35:20 +0000
+++ b/sql/ha_ndbcluster.cc	2008-10-02 22:28:22 +0000
@@ -68,12 +68,22 @@ TYPELIB ndb_distribution_typelib= { arra
 const char *opt_ndb_distribution= ndb_distribution_names[ND_KEYHASH];
 enum ndb_distribution opt_ndb_distribution_id= ND_KEYHASH;
 
+/*
+  Provided for testing purposes to be able to run full test suite
+  with --ndbcluster option without getting warnings about cluster
+  not being connected
+*/
+my_bool ndbcluster_silent= 0;
+
 // Default value for parallelism
 static const int parallelism= 0;
 
-// Default value for max number of transactions
-// createable against NDB from this handler
-static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
+/*
+  Default value for max number of transactions createable against NDB from
+  the handler. Should really be 2 but there is a transaction to much allocated
+  when lock table is used, and one extra to used for global schema lock.
+*/
+static const int max_transactions= 4;
 
 static uint ndbcluster_partition_flags();
 static int ndbcluster_init(void *);
@@ -155,7 +165,7 @@ HASH ndbcluster_open_tables;
 static uchar *ndbcluster_get_key(NDB_SHARE *share, size_t *length,
                                 my_bool not_used __attribute__((unused)));
 
-static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*,
+static int ndb_get_table_statistics(THD *thd, ha_ndbcluster*, bool, Ndb*,
                                     const NdbRecord *, struct Ndb_statistics *);
 
 THD *injector_thd= 0;
@@ -617,6 +627,9 @@ Thd_ndb::Thd_ndb()
   m_max_violation_count= 0;
   m_old_violation_count= 0;
   m_conflict_fn_usage_count= 0;
+  global_schema_lock_trans= NULL;
+  global_schema_lock_count= 0;
+  global_schema_lock_error= 0;
   init_alloc_root(&m_batch_mem_root, BATCH_FLUSH_SIZE/4, 0);
 }
 
@@ -624,19 +637,6 @@ Thd_ndb::~Thd_ndb()
 {
   if (ndb)
   {
-#ifndef DBUG_OFF
-    Ndb::Free_list_usage tmp;
-    tmp.m_name= 0;
-    while (ndb->get_free_list_usage(&tmp))
-    {
-      uint leaked= (uint) tmp.m_created - tmp.m_free;
-      if (leaked)
-        fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
-                leaked, tmp.m_name,
-                (leaked == 1)?"":"'s",
-                (leaked == 1)?"has":"have");
-    }
-#endif
     delete ndb;
     ndb= NULL;
   }
@@ -3181,9 +3181,9 @@ int ha_ndbcluster::ndb_write_row(uchar *
     for (;;)
     {
       Ndb_tuple_id_range_guard g(m_share);
-      if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1) == -1)
+      if (ndb->getAutoIncrementValue(m_table, g.range, auto_value, 1000) == -1)
       {
-	if (--retries &&
+	if (--retries && !thd->killed &&
 	    ndb->getNdbError().status == NdbError::TemporaryError)
 	{
 	  do_retry_sleep(retry_sleep);
@@ -5307,6 +5307,11 @@ int ha_ndbcluster::start_statement(THD *
     /* This is currently dead code in wait for implementation in NDB */
     /* lockThisTable(); */
     DBUG_PRINT("info", ("Locking the table..." ));
+#ifdef NOT_YET
+    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 0,
+                        "Table only locked locally in this mysqld", "NDB");
+#endif
   }
   DBUG_RETURN(0);
 }
@@ -6308,6 +6313,12 @@ int ha_ndbcluster::create(const char *na
     DBUG_RETURN(my_errno);
   }
 
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+
+  if (!((thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP) ||
+        ndbcluster_has_global_schema_lock(thd_ndb)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::create"));
   /*
     Don't allow table creation unless
     schema distribution table is setup
@@ -7037,6 +7048,10 @@ int ha_ndbcluster::rename_table(const ch
   if (check_ndb_connection(thd))
     DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
 
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::rename_table"));
+
   Ndb *ndb= get_ndb(thd);
   ndb->setDatabaseName(old_dbname);
   dict= ndb->getDictionary();
@@ -7418,7 +7433,6 @@ int ha_ndbcluster::delete_table(const ch
   if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
-    DBUG_ASSERT(ndb_schema_share);
     error= HA_ERR_NO_CONNECTION;
     goto err;
   }
@@ -7430,6 +7444,11 @@ int ha_ndbcluster::delete_table(const ch
   }
 
   ndb= get_ndb(thd);
+
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::delete_table"));
+
   /*
     Drop table in ndb.
     If it was already gone it might have been dropped
@@ -7484,7 +7503,7 @@ void ha_ndbcluster::get_auto_increment(u
         ndb->readAutoIncrementValue(m_table, g.range, auto_value) ||
         ndb->getAutoIncrementValue(m_table, g.range, auto_value, cache_size, increment, offset))
     {
-      if (--retries &&
+      if (--retries && !thd->killed &&
           ndb->getNdbError().status == NdbError::TemporaryError)
       {
         do_retry_sleep(retry_sleep);
@@ -8191,7 +8210,10 @@ err:
                              share->key, share->use_count));
     free_share(&share);
   }
-  if (ndb_error.code)
+  /*
+    ndbcluster_silent - avoid "cluster disconnected error"
+  */
+  if (ndb_error.code && (!ndbcluster_silent || ndb_error.code != 4009))
   {
     ERR_RETURN(ndb_error);
   }
@@ -8215,7 +8237,15 @@ int ndbcluster_table_exists_in_engine(ha
   NDBDICT* dict= ndb->getDictionary();
   NdbDictionary::Dictionary::List list;
   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+  {
+    /*
+      ndbcluster_silent
+      - avoid "cluster failure" warning if cluster is not connected
+    */
+    if (ndbcluster_silent && dict->getNdbError().code == 4009)
+      DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
     ERR_RETURN(dict->getNdbError());
+  }
   for (uint i= 0 ; i < list.count ; i++)
   {
     NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
@@ -8315,7 +8345,6 @@ static void ndbcluster_drop_database(han
   if (!ndb_schema_share)
   {
     DBUG_PRINT("info", ("Schema distribution table not setup"));
-    DBUG_ASSERT(ndb_schema_share);
     DBUG_VOID_RETURN;
   }
   ndbcluster_drop_database_impl(thd, path);
@@ -8492,6 +8521,10 @@ int ndbcluster_find_files(handlerton *ht
   if (dir)
     DBUG_RETURN(0); // Discover of databases not yet supported
 
+  Ndbcluster_global_schema_lock_guard ndbcluster_global_schema_lock_guard(thd);
+  if (ndbcluster_global_schema_lock_guard.lock())
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
   // List tables in NDB
   NDBDICT *dict= ndb->getDictionary();
   if (dict->listObjects(list, 
@@ -8777,9 +8810,7 @@ static int ndbcluster_init(void *p)
     h->alter_partition_flags=
       ndbcluster_alter_partition_flags;             /* Alter table flags */
     h->fill_files_table= ndbcluster_fill_files_table;
-#ifdef HAVE_NDB_BINLOG
     ndbcluster_binlog_init_handlerton();
-#endif
     h->flags=            HTON_CAN_RECREATE | HTON_TEMPORARY_NOT_SUPPORTED;
     h->discover=         ndbcluster_discover;
     h->find_files= ndbcluster_find_files;
@@ -8880,22 +8911,6 @@ static int ndbcluster_end(handlerton *ht
   }
   hash_free(&ndbcluster_open_tables);
 
-  if (g_ndb)
-  {
-#ifndef DBUG_OFF
-    Ndb::Free_list_usage tmp;
-    tmp.m_name= 0;
-    while (g_ndb->get_free_list_usage(&tmp))
-    {
-      uint leaked= (uint) tmp.m_created - tmp.m_free;
-      if (leaked)
-        fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
-                leaked, tmp.m_name,
-                (leaked == 1)?"":"'s",
-                (leaked == 1)?"has":"have");
-    }
-#endif
-  }
   ndbcluster_disconnect();
 
   // cleanup ndb interface
@@ -9264,7 +9279,7 @@ uint ndb_get_commitcount(THD *thd, char 
   {
     Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
     if (ndbtab_g.get_table() == 0
-        || ndb_get_table_statistics(NULL, 
+        || ndb_get_table_statistics(thd, NULL, 
                                     FALSE, 
                                     ndb, 
                                     ndbtab_g.get_table()->getDefaultRecord(),
@@ -9954,7 +9969,7 @@ int ha_ndbcluster::update_stats(THD *thd
     {
       DBUG_RETURN(my_errno= HA_ERR_OUT_OF_MEM);
     }
-    if (int err= ndb_get_table_statistics(this, TRUE, ndb,
+    if (int err= ndb_get_table_statistics(thd, this, TRUE, ndb,
                                           m_ndb_record, &stat))
     {
       DBUG_RETURN(err);
@@ -9992,7 +10007,7 @@ int ha_ndbcluster::update_stats(THD *thd
 
 static 
 int
-ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
+ndb_get_table_statistics(THD *thd, ha_ndbcluster* file, bool report_error, Ndb* ndb,
                          const NdbRecord *record,
                          struct Ndb_statistics * ndbstat)
 {
@@ -10147,7 +10162,8 @@ retry:
       ndb->closeTransaction(pTrans);
       pTrans= NULL;
     }
-    if (error.status == NdbError::TemporaryError && retries--)
+    if (error.status == NdbError::TemporaryError &&
+        retries-- && !thd->killed)
     {
       do_retry_sleep(retry_sleep);
       continue;
@@ -11049,7 +11065,7 @@ pthread_handler_t ndb_util_thread_func(v
         }
         Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table_name);
         if (ndbtab_g.get_table() &&
-            ndb_get_table_statistics(NULL, FALSE, ndb,
+            ndb_get_table_statistics(thd, NULL, FALSE, ndb,
                                      ndbtab_g.get_table()->getDefaultRecord(), 
                                      &stat) == 0)
         {
@@ -11785,6 +11801,10 @@ int ha_ndbcluster::alter_table_phase1(TH
   adding=  adding | HA_ADD_INDEX | HA_ADD_UNIQUE_INDEX;
   dropping= dropping | HA_DROP_INDEX | HA_DROP_UNIQUE_INDEX;
 
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::alter_table_phase1"));
+
   if (!(alter_data= new NDB_ALTER_DATA(dict, m_table)))
     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
   old_tab= alter_data->old_table;
@@ -12007,6 +12027,10 @@ int ha_ndbcluster::alter_table_phase2(TH
   DBUG_ENTER("alter_table_phase2");
   dropping= dropping  | HA_DROP_INDEX | HA_DROP_UNIQUE_INDEX;
 
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::alter_table_phase2"));
+
   if ((*alter_flags & dropping).is_set())
   {
     /* Tell the handler to finally drop the indexes. */
@@ -12059,6 +12083,10 @@ int ha_ndbcluster::alter_table_phase3(TH
 {
   DBUG_ENTER("alter_table_phase3");
 
+  if (!ndbcluster_has_global_schema_lock(get_thd_ndb(thd)))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ha_ndbcluster::alter_table_phase3"));
+
   const char *db= table->s->db.str;
   const char *name= table->s->table_name.str;
   /*

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2008-08-12 09:47:08 +0000
+++ b/sql/ha_ndbcluster.h	2008-10-02 22:28:22 +0000
@@ -263,7 +263,12 @@ typedef enum ndb_query_state_bits {
 
 enum THD_NDB_OPTIONS
 {
-  TNO_NO_LOG_SCHEMA_OP= 1 << 0
+  TNO_NO_LOG_SCHEMA_OP=  1 << 0,
+  /*
+    In participating mysqld, do not try to acquire global schema
+    lock, as one other mysqld already has the lock.
+  */
+  TNO_NO_LOCK_SCHEMA_OP= 1 << 1
 };
 
 enum THD_NDB_TRANS_OPTIONS
@@ -322,6 +327,10 @@ class Thd_ndb 
   uint m_max_violation_count;
   uint m_old_violation_count;
   uint m_conflict_fn_usage_count;
+
+  NdbTransaction *global_schema_lock_trans;
+  uint global_schema_lock_count;
+  uint global_schema_lock_error;
 };
 
 int ndbcluster_commit(handlerton *hton, THD *thd, bool all);

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2008-10-01 04:35:20 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2008-10-02 22:28:22 +0000
@@ -38,6 +38,8 @@ extern my_bool opt_ndb_log_orig;
 extern my_bool opt_ndb_log_update_as_write;
 extern my_bool opt_ndb_log_updated_only;
 
+extern my_bool ndbcluster_silent;
+
 /*
   defines for cluster replication table names
 */
@@ -144,7 +146,6 @@ static void ndb_free_schema_object(NDB_S
 /*
   Helper functions
 */
-
 static bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
 static bool ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
 
@@ -474,8 +475,14 @@ static void ndbcluster_binlog_wait(THD *
   if (ndb_binlog_running)
   {
     DBUG_ENTER("ndbcluster_binlog_wait");
-    const char *save_info= thd ? thd->proc_info : 0;
     ulonglong wait_epoch= ndb_get_latest_trans_gci();
+    /*
+      cluster not connected or no transactions done
+      so nothing to wait for
+    */
+    if (!wait_epoch)
+      DBUG_VOID_RETURN;
+    const char *save_info= thd ? thd->proc_info : 0;
     int count= 30;
     if (thd)
       thd->proc_info= "Waiting for ndbcluster binlog update to "
@@ -698,13 +705,21 @@ static void ndbcluster_reset_slave(THD *
   char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
   run_query(thd, buf, end, NULL, TRUE);
   if (thd->main_da.is_error() &&
-      thd->main_da.sql_errno() == ER_NO_SUCH_TABLE)
+      ((thd->main_da.sql_errno() == ER_NO_SUCH_TABLE) ||
+       (thd->main_da.sql_errno() == ER_OPEN_AS_READONLY && ndbcluster_silent)))
   {
     /*
       If table does not exist ignore the error as it
       is a consistant behavior
     */
     thd->main_da.reset_diagnostics_area();
+    /*
+      ndbcluster_silent
+      - avoid "no table mysql.ndb_apply_status" warning - ER_NO_SUCH_TABLE
+      - avoid "mysql.ndb_apply_status read only" warning - ER_OPEN_AS_READONLY
+    */
+    if (ndbcluster_silent)
+      mysql_reset_errors(thd, 1);
   }
 
   DBUG_VOID_RETURN;
@@ -725,14 +740,128 @@ static bool ndbcluster_flush_logs(handle
   return FALSE;
 }
 
+/*
+  Global schema lock across mysql servers
+*/
+int ndbcluster_has_global_schema_lock(Thd_ndb *thd_ndb)
+{
+  if (thd_ndb->global_schema_lock_trans)
+  {
+    thd_ndb->global_schema_lock_trans->refresh();
+    return 1;
+  }
+  return 0;
+}
+
+int ndbcluster_no_global_schema_lock_abort(THD *thd, const char *msg)
+{
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  if (thd_ndb && thd_ndb->global_schema_lock_error != 0)
+    return HA_ERR_NO_CONNECTION;
+  sql_print_error("NDB: programming error, no lock taken while running "
+                  "query %s. Message: %s", thd->query, msg);
+  abort();
+}
+
+#include "ha_ndbcluster_lock_ext.h"
+
+static int ndbcluster_global_schema_lock(THD *thd,
+                                         int report_cluster_disconnected)
+{
+  Ndb *ndb= check_ndb_in_thd(thd);
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  NdbError ndb_error;
+  thd_ndb->global_schema_lock_error= 0;
+  if (thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP)
+    return 0;
+  DBUG_ENTER("ndbcluster_global_schema_lock");
+  DBUG_PRINT("enter", ("query: %s", thd->query));
+  if (thd_ndb->global_schema_lock_trans)
+  {
+    thd_ndb->global_schema_lock_trans->refresh();
+    thd_ndb->global_schema_lock_count++;
+    DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+                        thd_ndb->global_schema_lock_count));
+    DBUG_RETURN(0);
+  }
+  DBUG_ASSERT(thd_ndb->global_schema_lock_count == 0);
+  thd_ndb->global_schema_lock_count= 0;
+
+  if ((thd_ndb->global_schema_lock_trans=
+       ndbcluster_global_schema_lock_ext(ndb, ndb_error)) != NULL)
+  {
+    thd_ndb->global_schema_lock_count= 1;
+    DBUG_RETURN(0);
+  }
+
+  /*
+    ndbcluster_silent - avoid "cluster disconnected error"
+  */
+  if (ndbcluster_silent)
+    report_cluster_disconnected= 0;
+  if (ndb_error.code != 4009 || report_cluster_disconnected)
+  {
+    sql_print_warning("NDB: Could not acquire global schema lock (%d)%s",
+                      ndb_error.code, ndb_error.message);
+    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                        ndb_error.code, ndb_error.message,
+                        "NDB. Could not acquire global schema lock");
+  }
+  thd_ndb->global_schema_lock_error= ndb_error.code;
+  DBUG_RETURN(-1);
+}
+static int ndbcluster_global_schema_unlock(THD *thd)
+{
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  DBUG_ASSERT(thd_ndb != 0);
+  if (thd_ndb)
+    thd_ndb->global_schema_lock_error= 0;
+  if (thd_ndb == 0 || (thd_ndb->options & TNO_NO_LOCK_SCHEMA_OP))
+    return 0;
+  Ndb *ndb= thd_ndb->ndb;
+  DBUG_ASSERT(ndb != NULL);
+  if (ndb == NULL)
+    return 0;
+  DBUG_ENTER("ndbcluster_global_schema_unlock");
+  NdbTransaction *trans= thd_ndb->global_schema_lock_trans;
+  if (trans)
+  {
+    thd_ndb->global_schema_lock_count--;
+    if (thd_ndb->global_schema_lock_count != 0)
+    {
+      DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+                          thd_ndb->global_schema_lock_count));
+      DBUG_RETURN(0);
+    }
+    thd_ndb->global_schema_lock_trans= NULL;
+    NdbError ndb_error;
+    if (ndbcluster_global_schema_unlock_ext(ndb, trans, ndb_error))
+    {
+      sql_print_warning("NDB: Releasing global schema lock (%d)%s",
+                        ndb_error.code, ndb_error.message);
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          ndb_error.code,
+                          ndb_error.message,
+                          "ndb. Releasing global schema lock");
+      DBUG_RETURN(-1);
+    }
+  }
+  DBUG_PRINT("exit", ("global_schema_lock_count: %d",
+                      thd_ndb->global_schema_lock_count));
+  DBUG_RETURN(0);
+}
+
 static int ndbcluster_binlog_func(handlerton *hton, THD *thd, 
                                   enum_binlog_func fn, 
                                   void *arg)
 {
+  DBUG_ENTER("ndbcluster_binlog_func");
   switch(fn)
   {
   case BFN_RESET_LOGS:
-    return ndbcluster_reset_logs(thd);
+    DBUG_RETURN(ndbcluster_reset_logs(thd));
   case BFN_RESET_SLAVE:
     ndbcluster_reset_slave(thd);
     break;
@@ -745,8 +874,34 @@ static int ndbcluster_binlog_func(handle
   case BFN_BINLOG_PURGE_FILE:
     ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
     break;
+  case BFN_GLOBAL_SCHEMA_LOCK:
+    ndbcluster_global_schema_lock(thd, 1);
+    break;
+  case BFN_GLOBAL_SCHEMA_UNLOCK:
+    ndbcluster_global_schema_unlock(thd);
+    break;
   }
-  return 0;
+  DBUG_RETURN(0);
+}
+Ndbcluster_global_schema_lock_guard::Ndbcluster_global_schema_lock_guard(THD *thd)
+  : m_thd(thd), m_lock(0)
+{
+}
+Ndbcluster_global_schema_lock_guard::~Ndbcluster_global_schema_lock_guard()
+{
+  if (m_lock)
+    ndbcluster_global_schema_unlock(m_thd);
+}
+int Ndbcluster_global_schema_lock_guard::lock()
+{
+  m_lock= !ndbcluster_global_schema_lock(m_thd, 0);
+  return !m_lock;
+}
+void Ndbcluster_global_schema_lock_guard::unlock()
+{
+  if (m_lock)
+    ndbcluster_global_schema_unlock(m_thd);
+  m_lock= 0;
 }
 
 void ndbcluster_binlog_init_handlerton()
@@ -954,8 +1109,185 @@ static int ndbcluster_create_schema_tabl
   DBUG_RETURN(0);
 }
 
+class Thd_ndb_options_guard
+{
+public:
+  Thd_ndb_options_guard(Thd_ndb *thd_ndb)
+    : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
+  ~Thd_ndb_options_guard() { m_val= m_save_val; }
+  void set(uint32 flag) { m_val|= flag; }
+private:
+  uint32 &m_val;
+  uint32 m_save_val;
+};
+
+/*
+  Ndb has no representation of the database schema objects.
+  The mysql.ndb_schema table contains the latest schema operations
+  done via a mysqld, and thus reflects databases created/dropped/altered
+  while a mysqld was disconnected.  This function tries to recover
+  the correct state w.r.t created databases using the information in
+  that table.
+
+  Function should only be called while ndbcluster_global_schema_lock
+  is held, to ensure that ndb_schema table is not being updated while
+  scanning.
+*/
+static int ndbcluster_find_all_databases(THD *thd)
+{
+  Ndb *ndb= check_ndb_in_thd(thd);
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+  NDBDICT *dict= ndb->getDictionary();
+  NdbTransaction *trans= NULL;
+  NdbError ndb_error;
+  int retries= 100;
+  int retry_sleep= 30; /* 30 milliseconds, transaction */
+  DBUG_ENTER("ndbcluster_find_all_databases");
+  /* Ensure that we have the right lock */
+  if (!ndbcluster_has_global_schema_lock(thd_ndb))
+    DBUG_RETURN(ndbcluster_no_global_schema_lock_abort
+                (thd, "ndbcluster_find_all_databases"));
+  ndb->setDatabaseName(NDB_REP_DB);
+  thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
+  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+  while (1)
+  {
+    char db_buffer[FN_REFLEN];
+    char *db= db_buffer+1;
+    char name[FN_REFLEN];
+    char query[64000];
+    Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+    const NDBTAB *ndbtab= ndbtab_g.get_table();
+    NdbScanOperation *op;
+    NdbBlob *query_blob_handle;
+    int r= 0;
+    if (ndbtab == NULL)
+    {
+      ndb_error= dict->getNdbError();
+      goto error;
+    }
+    trans= ndb->startTransaction();
+    if (trans == NULL)
+    {
+      ndb_error= ndb->getNdbError();
+      goto error;
+    }
+    op= trans->getNdbScanOperation(ndbtab);
+    if (op == NULL)
+      abort();
+    op->readTuples(NdbScanOperation::LM_Read,
+                   NdbScanOperation::SF_TupScan, 1);
+    
+    r|= op->getValue("db", db_buffer) == NULL;
+    r|= op->getValue("name", name) == NULL;
+    r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
+    r|= query_blob_handle->getValue(query, sizeof(query));
+
+    if (r)
+      abort();
+
+    if (trans->execute(NdbTransaction::NoCommit))
+    {
+      ndb_error= trans->getNdbError();
+      goto error;
+    }
+
+    while ((r= op->nextResult()) == 0)
+    {
+      unsigned db_len= db_buffer[0];
+      unsigned name_len= name[0];
+      /*
+        name_len == 0 means no table name, hence the row
+        is for a database
+      */
+      if (db_len > 0 && name_len == 0)
+      {
+        /* database found */
+        Uint64 query_length= 0;
+        if (query_blob_handle->getLength(query_length))
+          abort();
+        db[db_len]= 0;
+        query[query_length]= 0;
+        build_table_filename(name, sizeof(name), db, "", "", 0);
+        int database_exists= !my_access(name, F_OK);
+        if (strncasecmp("CREATE", query, 6) == 0)
+        {
+          /* Database should exist */
+          if (!database_exists)
+          {
+            /* create missing database */
+            sql_print_information("NDB: Discovered missing database '%s'", db);
+            const int no_print_error[1]= {0};
+            run_query(thd, query, query + query_length,
+                      no_print_error,    /* print error */
+                      TRUE);   /* don't binlog the query */
+            /* always reset here */
+            thd->main_da.reset_diagnostics_area();
+          }
+        }
+        else if (strncasecmp("ALTER", query, 5) == 0)
+        {
+          /* Database should exist */
+          if (!database_exists)
+          {
+            /* create missing database */
+            sql_print_information("NDB: Discovered missing database '%s'", db);
+            const int no_print_error[1]= {0};
+            name_len= snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
+            run_query(thd, name, name + name_len,
+                      no_print_error,    /* print error */
+                      TRUE);   /* don't binlog the query */
+            thd->main_da.reset_diagnostics_area();
+            run_query(thd, query, query + query_length,
+                      no_print_error,    /* print error */
+                      TRUE);   /* don't binlog the query */
+            /* always reset here */
+            thd->main_da.reset_diagnostics_area();
+          }
+        }
+        else if (strncasecmp("DROP", query, 4) == 0)
+        {
+          /* Database should not exist */
+          if (database_exists)
+          {
+            /* drop missing database */
+            sql_print_information("NDB: Discovered reamining database '%s'", db);
+          }
+        }
+      }
+    }
+    if (r == -1)
+    {
+      ndb_error= op->getNdbError();
+      goto error;
+    }
+    ndb->closeTransaction(trans);
+    trans= NULL;
+    DBUG_RETURN(0); // success
+  error:
+    if (trans)
+    {
+      ndb->closeTransaction(trans);
+      trans= NULL;
+    }
+    if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
+    {
+      if (retries--)
+      {
+        do_retry_sleep(retry_sleep);
+        continue; // retry
+      }
+    }
+    DBUG_RETURN(1); // not temp error or too many retries
+  }
+}
+
 int ndbcluster_setup_binlog_table_shares(THD *thd)
 {
+  Ndbcluster_global_schema_lock_guard global_schema_lock_guard(thd);
+  if (global_schema_lock_guard.lock())
+    return 1;
   if (!ndb_schema_share &&
       ndbcluster_check_ndb_schema_share() == 0)
   {
@@ -984,6 +1316,11 @@ int ndbcluster_setup_binlog_table_shares
     }
   }
 
+  if (ndbcluster_find_all_databases(thd))
+  {
+    return 1;
+  }
+
   if (!ndbcluster_find_all_files(thd))
   {
     pthread_mutex_lock(&LOCK_open);
@@ -1170,7 +1507,8 @@ ndbcluster_update_slock(THD *thd,
 
   if (ndbtab == 0)
   {
-    abort();
+    if (dict->getNdbError().code != 4009)
+      abort();
     DBUG_RETURN(0);
   }
 
@@ -1255,7 +1593,7 @@ ndbcluster_update_slock(THD *thd,
   err:
     const NdbError *this_error= trans ?
       &trans->getNdbError() : &ndb->getNdbError();
-    if (this_error->status == NdbError::TemporaryError)
+    if (this_error->status == NdbError::TemporaryError && !thd->killed)
     {
       if (retries--)
       {
@@ -1604,7 +1942,7 @@ int ndbcluster_log_schema_op(THD *thd,
 err:
     const NdbError *this_error= trans ?
       &trans->getNdbError() : &ndb->getNdbError();
-    if (this_error->status == NdbError::TemporaryError)
+    if (this_error->status == NdbError::TemporaryError && !thd->killed)
     {
       if (retries--)
       {
@@ -1868,6 +2206,8 @@ ndb_binlog_thread_handle_schema_event(TH
     if (ev_type == NDBEVENT::TE_UPDATE ||
         ev_type == NDBEVENT::TE_INSERT)
     {
+      Thd_ndb *thd_ndb= get_thd_ndb(thd);
+      Thd_ndb_options_guard thd_ndb_options(thd_ndb);
       Cluster_schema *schema= (Cluster_schema *)
         sql_alloc(sizeof(Cluster_schema));
       MY_BITMAP slock;
@@ -1940,6 +2280,7 @@ ndb_binlog_thread_handle_schema_event(TH
           }
           if (! ndbcluster_check_if_local_table(schema->db, schema->name))
           {
+            thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
             const int no_print_error[1]=
               {ER_BAD_TABLE_ERROR}; /* ignore missing table */
             run_query(thd, schema->query,
@@ -2001,6 +2342,7 @@ ndb_binlog_thread_handle_schema_event(TH
           break;
         // fall through
         case SOT_CREATE_TABLE:
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           pthread_mutex_lock(&LOCK_open);
           if (ndbcluster_check_if_local_table(schema->db, schema->name))
           {
@@ -2028,6 +2370,7 @@ ndb_binlog_thread_handle_schema_event(TH
           break;
         case SOT_DROP_DB:
           /* Drop the database locally if it only contains ndb tables */
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
           {
             const int no_print_error[1]= {0};
@@ -2059,6 +2402,7 @@ ndb_binlog_thread_handle_schema_event(TH
           /* fall through */
         case SOT_ALTER_DB:
         {
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           const int no_print_error[1]= {0};
           run_query(thd, schema->query,
                     schema->query + schema->query_length,
@@ -2217,8 +2561,10 @@ ndb_binlog_thread_handle_schema_event_po
     return;
   DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
   Cluster_schema *schema;
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
   while ((schema= post_epoch_log_list->pop()))
   {
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
     DBUG_PRINT("info",
                ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                 schema->db, schema->name,
@@ -2366,6 +2712,7 @@ ndb_binlog_thread_handle_schema_event_po
             free_share(&share);
             share= 0;
           }
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           pthread_mutex_lock(&LOCK_open);
           if (ndbcluster_check_if_local_table(schema->db, schema->name))
           {
@@ -2545,6 +2892,7 @@ ndb_binlog_thread_handle_schema_event_po
             free_share(&share);
             share= 0;
           }
+          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
           pthread_mutex_lock(&LOCK_open);
           if (ndbcluster_check_if_local_table(schema->db, schema->name))
           {
@@ -4044,7 +4392,7 @@ ndbcluster_create_event_ops(THD *thd, ND
       op->setCustomData(NULL);
       ndb->dropEventOperation(op);
       pthread_mutex_unlock(&injector_mutex);
-      if (retries)
+      if (retries && !thd->killed)
       {
         do_retry_sleep(retry_sleep);
         continue;

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2008-10-01 04:35:20 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2008-10-02 22:28:22 +0000
@@ -290,8 +290,17 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
 
 Ndb* check_ndb_in_thd(THD* thd);
 
-/* perform random sleep in the range milli_sleep to 2*milli_sleep */
-inline void do_retry_sleep(unsigned milli_sleep)
+int ndbcluster_has_global_schema_lock(Thd_ndb *thd_ndb);
+int ndbcluster_no_global_schema_lock_abort(THD *thd, const char *msg);
+
+class Ndbcluster_global_schema_lock_guard
 {
-  my_sleep(1000*(milli_sleep + 5*(rand()%(milli_sleep/5))));
-}
+public:
+  Ndbcluster_global_schema_lock_guard(THD *thd);
+  ~Ndbcluster_global_schema_lock_guard();
+  int lock();
+  void unlock();
+private:
+  THD *m_thd;
+  int m_lock;
+};

=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc	2008-08-25 08:43:30 +0000
+++ b/sql/ha_ndbcluster_connection.cc	2008-10-02 22:28:22 +0000
@@ -94,7 +94,9 @@ int ndbcluster_connect(int (*connect_cal
   {
     if (NdbTick_CurrentMillisecond() > end_time)
       break;
-    sleep(1);
+    do_retry_sleep(100);
+    if (abort_loop)
+      goto ndbcluster_connect_error;
   }
 
   {

=== modified file 'sql/ha_ndbcluster_connection.h'
--- a/sql/ha_ndbcluster_connection.h	2008-04-09 13:52:09 +0000
+++ b/sql/ha_ndbcluster_connection.h	2008-10-02 15:52:29 +0000
@@ -23,3 +23,9 @@ int ndb_has_node_id(uint id);
 
 /* options from from mysqld.cc */
 extern ulong opt_ndb_cluster_connection_pool;
+
+/* perform random sleep in the range milli_sleep to 2*milli_sleep */
+inline void do_retry_sleep(unsigned milli_sleep)
+{
+  my_sleep(1000*(milli_sleep + 5*(rand()%(milli_sleep/5))));
+}

=== added file 'sql/ha_ndbcluster_lock_ext.h'
--- a/sql/ha_ndbcluster_lock_ext.h	1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndbcluster_lock_ext.h	2008-09-30 09:14:44 +0000
@@ -0,0 +1,91 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+/*
+  These functions are shared with ndb_restore so that the creating of
+  tables through ndb_restore is syncronized correctly with the mysqld's
+
+  The lock/unlock functions use the BACKUP_SEQUENCE row in SYSTAB_0
+*/
+static NdbTransaction *
+ndbcluster_global_schema_lock_ext(Ndb *ndb, NdbError &ndb_error)
+{
+  ndb->setDatabaseName("sys");
+  ndb->setDatabaseSchemaName("def");
+  NdbDictionary::Dictionary *dict= ndb->getDictionary();
+  Ndb_table_guard ndbtab_g(dict, "SYSTAB_0");
+  const NdbDictionary::Table *ndbtab;
+  NdbOperation *op;
+  NdbTransaction *trans= NULL;
+  int retries= 100;
+  int retry_sleep= 50; /* 50 milliseconds, transaction */
+  if (!(ndbtab= ndbtab_g.get_table()))
+  {
+    ndb_error= dict->getNdbError();
+    goto error_handler;
+  }
+  while (1)
+  {
+    trans= ndb->startTransaction();
+    if (trans == NULL)
+    {
+      ndb_error= ndb->getNdbError();
+      goto error_handler;
+    }
+
+    op= trans->getNdbOperation(ndbtab);
+    op->readTuple(NdbOperation::LM_Exclusive);
+    op->equal("SYSKEY_0", NDB_BACKUP_SEQUENCE);
+
+    if (trans->execute(NdbTransaction::NoCommit) == 0)
+      break;
+
+    if (trans->getNdbError().status == NdbError::TemporaryError)
+    {
+      if (retries--)
+      {
+        ndb->closeTransaction(trans);
+        trans= NULL;
+        do_retry_sleep(retry_sleep);
+        continue; // retry
+      }
+    }
+    ndb_error= trans->getNdbError();
+    goto error_handler;
+  }
+  return trans;
+
+ error_handler:
+  if (trans)
+  {
+    ndb->closeTransaction(trans);
+  }
+  return NULL;
+}
+
+static int
+ndbcluster_global_schema_unlock_ext(Ndb *ndb, NdbTransaction *trans,
+                                    NdbError &ndb_error)
+{
+  if (trans->execute(NdbTransaction::Commit))
+  {
+    ndb_error= trans->getNdbError();
+    ndb->closeTransaction(trans);
+    return -1;
+  }
+  ndb->closeTransaction(trans);
+  return 0;
+}

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2008-09-02 16:31:03 +0000
+++ b/sql/handler.cc	2008-10-02 06:43:17 +0000
@@ -3757,7 +3757,7 @@ int ha_table_exists_in_engine(THD* thd, 
   DBUG_RETURN(args.err);
 }
 
-#ifdef HAVE_NDB_BINLOG
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 /*
   TODO: change this into a dynamic struct
   List<handlerton> does not work as
@@ -3812,6 +3812,7 @@ static my_bool binlog_func_foreach(THD *
   return FALSE;
 }
 
+#ifdef HAVE_NDB_BINLOG
 int ha_reset_logs(THD *thd)
 {
   binlog_func_st bfn= {BFN_RESET_LOGS, 0};
@@ -3851,6 +3852,25 @@ int ha_binlog_index_purge_file(THD *thd,
     return 1;
   return 0;
 }
+#endif
+
+int ha_global_schema_lock(THD *thd)
+{
+  binlog_func_st bfn= {BFN_GLOBAL_SCHEMA_LOCK, 0};
+  binlog_func_foreach(thd, &bfn);
+  if (thd->main_da.is_error())
+    return 1;
+  return 0;
+}
+
+int ha_global_schema_unlock(THD *thd)
+{
+  binlog_func_st bfn= {BFN_GLOBAL_SCHEMA_UNLOCK, 0};
+  binlog_func_foreach(thd, &bfn);
+  if (thd->main_da.is_error())
+    return 1;
+  return 0;
+}
 
 struct binlog_log_query_st
 {

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2008-09-02 16:31:03 +0000
+++ b/sql/handler.h	2008-10-02 06:43:17 +0000
@@ -324,7 +324,9 @@ enum enum_binlog_func {
   BFN_RESET_SLAVE=       2,
   BFN_BINLOG_WAIT=       3,
   BFN_BINLOG_END=        4,
-  BFN_BINLOG_PURGE_FILE= 5
+  BFN_BINLOG_PURGE_FILE= 5,
+  BFN_GLOBAL_SCHEMA_LOCK=  6,
+  BFN_GLOBAL_SCHEMA_UNLOCK=7
 };
 
 enum enum_binlog_command {
@@ -2204,16 +2206,48 @@ void trans_register_ha(THD *thd, bool al
 #define trans_need_2pc(thd, all)                   ((total_ha_2pc > 1) && \
         !((all ? &thd->transaction.all : &thd->transaction.stmt)->no_2pc))
 
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 #ifdef HAVE_NDB_BINLOG
 int ha_reset_logs(THD *thd);
 int ha_binlog_index_purge_file(THD *thd, const char *file);
 int ha_reset_slave(THD *thd);
+void ha_binlog_wait(THD *thd);
+int ha_binlog_end(THD *thd);
+#else
+inline int ha_int_dummy() { return 0; }
+#define ha_reset_logs(a) ha_int_dummy()
+#define ha_binlog_index_purge_file(a,b) ha_int_dummy()
+#define ha_reset_slave(a) ha_int_dummy()
+#define ha_binlog_wait(a) do {} while (0)
+#define ha_binlog_end(a)  do {} while (0)
+#endif
 void ha_binlog_log_query(THD *thd, handlerton *db_type,
                          enum_binlog_command binlog_command,
                          const char *query, uint query_length,
                          const char *db, const char *table_name);
-void ha_binlog_wait(THD *thd);
-int ha_binlog_end(THD *thd);
+int ha_global_schema_lock(THD *thd);
+int ha_global_schema_unlock(THD *thd);
+class Ha_global_schema_lock_guard
+{
+public:
+  Ha_global_schema_lock_guard(THD *thd)
+    : m_thd(thd), m_lock(0)
+  {
+  }
+  ~Ha_global_schema_lock_guard()
+  {
+    if (m_lock)
+      ha_global_schema_unlock(m_thd);
+  }
+  int lock()
+  {
+    m_lock= !ha_global_schema_lock(m_thd);
+    return !m_lock;
+  }
+private:
+  THD *m_thd;
+  int m_lock;
+};
 #else
 inline int ha_int_dummy() { return 0; }
 #define ha_reset_logs(a) ha_int_dummy()
@@ -2222,4 +2256,13 @@ inline int ha_int_dummy() { return 0; }
 #define ha_binlog_log_query(a,b,c,d,e,f,g) do {} while (0)
 #define ha_binlog_wait(a) do {} while (0)
 #define ha_binlog_end(a)  do {} while (0)
+#define ha_global_schema_lock(a) ha_int_dummy()
+#define ha_global_schema_unlock(a) ha_int_dummy()
+class Ha_global_schema_lock_guard
+{
+public:
+  Ha_global_schema_lock_guard(THD *thd) {}
+  ~Ha_global_schema_lock_guard() {}
+  int lock() { return 0; }
+};
 #endif

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-10-01 10:06:50 +0000
+++ b/sql/mysqld.cc	2008-10-02 22:28:22 +0000
@@ -5970,7 +5970,7 @@ master-ssl",
    "Turn on more logging in the error log.",
    (uchar**) &ndb_extra_logging,
    (uchar**) &ndb_extra_logging,
-   0, GET_ULONG, OPT_ARG, 0, 0, 0, 0, 0, 0},
+   0, GET_ULONG, OPT_ARG, 1, 0, 0, 0, 0, 0},
 #ifdef HAVE_NDB_BINLOG
   {"ndb-report-thresh-binlog-epoch-slip", OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
    "Threshold on number of epochs to be behind before reporting binlog status. "

=== modified file 'sql/sql_db.cc'
--- a/sql/sql_db.cc	2008-08-27 08:40:11 +0000
+++ b/sql/sql_db.cc	2008-10-02 06:39:01 +0000
@@ -616,6 +616,7 @@ int mysql_create_db(THD *thd, char *db, 
   MY_STAT stat_info;
   uint create_options= create_info ? create_info->options : 0;
   uint path_len;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_db");
 
   /* do not create 'information_schema' db */
@@ -643,6 +644,8 @@ int mysql_create_db(THD *thd, char *db, 
     goto exit2;
   }
 
+  global_schema_lock_guard.lock();
+
   VOID(pthread_mutex_lock(&LOCK_mysql_create_db));
 
   /* Check directory */
@@ -767,6 +770,7 @@ bool mysql_alter_db(THD *thd, const char
   char path[FN_REFLEN+16];
   long result=1;
   int error= 0;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_alter_db");
 
   /*
@@ -784,6 +788,8 @@ bool mysql_alter_db(THD *thd, const char
   if ((error=wait_if_global_read_lock(thd,0,1)))
     goto exit2;
 
+  global_schema_lock_guard.lock();
+
   VOID(pthread_mutex_lock(&LOCK_mysql_create_db));
 
   /* 
@@ -861,6 +867,7 @@ bool mysql_rm_db(THD *thd,char *db,bool 
   MY_DIR *dirp;
   uint length;
   TABLE_LIST* dropped_tables= 0;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_rm_db");
 
   /*
@@ -881,6 +888,8 @@ bool mysql_rm_db(THD *thd,char *db,bool 
     goto exit2;
   }
 
+  global_schema_lock_guard.lock();
+
   VOID(pthread_mutex_lock(&LOCK_mysql_create_db));
 
   length= build_table_filename(path, sizeof(path), db, "", "", 0);

=== modified file 'sql/sql_delete.cc'
--- a/sql/sql_delete.cc	2008-09-02 16:31:03 +0000
+++ b/sql/sql_delete.cc	2008-09-30 10:42:32 +0000
@@ -989,6 +989,7 @@ bool mysql_truncate(THD *thd, TABLE_LIST
   TABLE *table;
   bool error;
   uint path_length;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_truncate");
 
   bzero((char*) &create_info,sizeof(create_info));
@@ -1039,6 +1040,8 @@ bool mysql_truncate(THD *thd, TABLE_LIST
                                       HTON_CAN_RECREATE))
       goto trunc_by_del;
 
+    if (table_type == DB_TYPE_NDBCLUSTER)
+      global_schema_lock_guard.lock();
     if (lock_and_wait_for_table_name(thd, table_list))
       DBUG_RETURN(TRUE);
   }

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2008-09-01 12:28:57 +0000
+++ b/sql/sql_parse.cc	2008-10-02 06:39:01 +0000
@@ -2487,6 +2487,7 @@ mysql_execute_command(THD *thd)
     if (select_lex->item_list.elements)		// With select
     {
       select_result *result;
+      Ha_global_schema_lock_guard global_schema_lock_guard(thd);
 
       select_lex->options|= SELECT_NO_UNLOCK;
       unit->set_limit(select_lex);
@@ -2508,6 +2509,8 @@ mysql_execute_command(THD *thd)
       {
         lex->link_first_table_back(create_table, link_to_local);
         create_table->create= TRUE;
+        if (!thd->locked_tables)
+          global_schema_lock_guard.lock();
       }
 
       if (!(res= open_and_lock_tables(thd, lex->query_tables)))

=== modified file 'sql/sql_rename.cc'
--- a/sql/sql_rename.cc	2008-02-19 12:45:21 +0000
+++ b/sql/sql_rename.cc	2008-10-02 06:39:01 +0000
@@ -37,6 +37,7 @@ bool mysql_rename_tables(THD *thd, TABLE
   TABLE_LIST *ren_table= 0;
   int to_table;
   char *rename_log_table[2]= {NULL, NULL};
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_rename_tables");
 
   /*
@@ -133,6 +134,13 @@ bool mysql_rename_tables(THD *thd, TABLE
     }
   }
 
+  /*
+    Rename table may clash with parallell or existing ndb table
+    thus a global shema lock is needed to ensure global serialization.
+    Moreover we do not know here what table type the tables have.
+  */
+  global_schema_lock_guard.lock();
+
   pthread_mutex_lock(&LOCK_open);
   if (lock_table_names_exclusively(thd, table_list))
   {

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2008-09-03 13:53:09 +0000
+++ b/sql/sql_table.cc	2008-10-02 22:28:22 +0000
@@ -1563,6 +1563,15 @@ int mysql_rm_table_part2(THD *thd, TABLE
 
   mysql_ha_rm_tables(thd, tables, FALSE);
 
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+  /*
+    Here we have no way of knowing if an ndb table is one of
+    the ones that should be dropped, so we take the global
+    schema lock to be safe.
+  */
+  if (!drop_temporary)
+    global_schema_lock_guard.lock();
+
   pthread_mutex_lock(&LOCK_open);
 
   /*
@@ -3608,8 +3617,12 @@ bool mysql_create_table(THD *thd, const 
 {
   TABLE *name_lock= 0;
   bool result;
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_table");
 
+  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+    global_schema_lock_guard.lock();
+
   /* Wait for any database locks */
   pthread_mutex_lock(&LOCK_lock_db);
   while (!thd->killed &&
@@ -4831,9 +4844,13 @@ bool mysql_create_like_table(THD* thd, T
 #ifdef WITH_PARTITION_STORAGE_ENGINE
   char tmp_path[FN_REFLEN];
 #endif
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
   DBUG_ENTER("mysql_create_like_table");
 
 
+  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+    global_schema_lock_guard.lock();
+
   /*
     By opening source table we guarantee that it exists and no concurrent
     DDL operation will mess with it. Later we also take an exclusive
@@ -6569,6 +6586,11 @@ bool mysql_alter_table(THD *thd,char *ne
 
     if (wait_if_global_read_lock(thd,0,1))
       DBUG_RETURN(TRUE);
+
+    Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+    if (table_type == DB_TYPE_NDBCLUSTER)
+      global_schema_lock_guard.lock();
+
     VOID(pthread_mutex_lock(&LOCK_open));
     if (lock_table_names(thd, table_list))
     {
@@ -6595,6 +6617,21 @@ view_err:
     DBUG_RETURN(error);
   }
 
+  Ha_global_schema_lock_guard global_schema_lock_guard(thd);
+  if (table_type == DB_TYPE_NDBCLUSTER ||
+      (create_info->db_type && create_info->db_type->db_type == DB_TYPE_NDBCLUSTER))
+  {
+    /*
+      To avoid deadlock in this situation
+    */
+    if (thd->locked_tables)
+    {
+      my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
+                 ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
+      DBUG_RETURN(TRUE);
+    }
+    global_schema_lock_guard.lock();
+  }
   if (!(table= open_n_lock_single_table(thd, table_list, TL_WRITE_ALLOW_READ)))
     DBUG_RETURN(TRUE);
   table->use_all_columns();

Thread
bzr push into mysql-5.1 branch (jonas:2876) Jonas Oreland3 Oct