List:Commits« Previous MessageNext Message »
From:tomas Date:May 3 2006 7:17pm
Subject:bk commit into 5.1 tree (tomas:1.2373)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2373 06/05/03 21:17:05 tomas@stripped +5 -0
  Merge tulin@stripped:/home/bk/mysql-5.1-new
  into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new

  mysql-test/mysql-test-run.pl
    1.113 06/05/03 21:16:55 tomas@stripped +1 -7
    manual merge

  support-files/mysql.spec.sh
    1.140 06/05/03 21:09:44 tomas@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster_binlog.cc
    1.49 06/05/03 21:09:44 tomas@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster.cc
    1.299 06/05/03 21:09:42 tomas@stripped +0 -0
    Auto merged

  mysql-test/t/disabled.def
    1.149 06/05/03 21:09:41 tomas@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-new/RESYNC

--- 1.139/support-files/mysql.spec.sh	2006-04-26 15:16:17 +02:00
+++ 1.140/support-files/mysql.spec.sh	2006-05-03 21:09:44 +02:00
@@ -388,22 +388,12 @@
 mv $RBR/%{_libdir}/mysql/*.so* $RBR/%{_libdir}/
 
 # install "mysqld-debug" and "mysqld-max"
-if test -f $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-debug-%{mysql_version}/sql/.libs/mysqld
-then
-  install -m 755 $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-debug-%{mysql_version}/sql/.libs/mysqld \
-                 $RBR%{_sbindir}/mysqld-debug
-else
-  install -m 755 $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-debug-%{mysql_version}/sql/mysqld \
+./libtool --mode=execute install -m 755 \
+                 $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-debug-%{mysql_version}/sql/mysqld \
                  $RBR%{_sbindir}/mysqld-debug
-fi
-if test -f $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-max-%{mysql_version}/sql/.libs/mysqld
-then
-  install -m 755 $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-max-%{mysql_version}/sql/.libs/mysqld \
+./libtool --mode=execute install -m 755 \
+                 $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-max-%{mysql_version}/sql/mysqld \
                  $RBR%{_sbindir}/mysqld-max
-else
-  install -m 755 $RPM_BUILD_DIR/mysql-%{mysql_version}/mysql-max-%{mysql_version}/sql/mysqld \
-                 $RBR%{_sbindir}/mysqld-max
-fi
 
 # install saved perror binary with NDB support (BUG#13740)
 install -m 755 $MBD/extra/perror $RBR%{_bindir}/perror
@@ -473,9 +463,12 @@
 # owns all database files.
 chown -R %{mysqld_user}:%{mysqld_group} $mysql_datadir
 
-# Initiate databases
+# Initiate databases if needed
 %{_bindir}/mysql_install_db --rpm --user=%{mysqld_user}
 
+# Upgrade databases if needed
+%{_bindir}/mysql_upgrade --user=%{mysqld_user}
+
 # Change permissions again to fix any new files.
 chown -R %{mysqld_user}:%{mysqld_group} $mysql_datadir
 
@@ -541,6 +534,7 @@
 %doc %attr(644, root, man) %{_mandir}/man1/mysqld_multi.1*
 %doc %attr(644, root, man) %{_mandir}/man1/mysqld_safe.1*
 %doc %attr(644, root, man) %{_mandir}/man1/mysql_fix_privilege_tables.1*
+%doc %attr(644, root, man) %{_mandir}/man1/mysql_upgrade.1*
 %doc %attr(644, root, man) %{_mandir}/man1/mysqlhotcopy.1*
 %doc %attr(644, root, man) %{_mandir}/man1/mysqlmanager.1*
 %doc %attr(644, root, man) %{_mandir}/man1/mysql.server.1*
@@ -552,30 +546,31 @@
 %ghost %config(noreplace,missingok) %{_sysconfdir}/my.cnf
 %ghost %config(noreplace,missingok) %{_sysconfdir}/mysqlmanager.passwd
 
-%attr(755, root, root) %{_bindir}/myisamchk
+%attr(755, root, root) %{_bindir}/my_print_defaults
 %attr(755, root, root) %{_bindir}/myisam_ftdump
+%attr(755, root, root) %{_bindir}/myisamchk
 %attr(755, root, root) %{_bindir}/myisamlog
 %attr(755, root, root) %{_bindir}/myisampack
-%attr(755, root, root) %{_bindir}/my_print_defaults
-%attr(755, root, root) %{_bindir}/mysqlbug
 %attr(755, root, root) %{_bindir}/mysql_convert_table_format
 %attr(755, root, root) %{_bindir}/mysql_create_system_tables
-%attr(755, root, root) %{_bindir}/mysqld_multi
-%attr(755, root, root) %{_bindir}/mysqld_safe
 %attr(755, root, root) %{_bindir}/mysql_explain_log
 %attr(755, root, root) %{_bindir}/mysql_fix_extensions
 %attr(755, root, root) %{_bindir}/mysql_fix_privilege_tables
-%attr(755, root, root) %{_bindir}/mysqlhotcopy
 %attr(755, root, root) %{_bindir}/mysql_install_db
 %attr(755, root, root) %{_bindir}/mysql_secure_installation
 %attr(755, root, root) %{_bindir}/mysql_setpermission
-%attr(755, root, root) %{_bindir}/mysqltest
 %attr(755, root, root) %{_bindir}/mysql_tzinfo_to_sql
+%attr(755, root, root) %{_bindir}/mysql_upgrade
 %attr(755, root, root) %{_bindir}/mysql_zap
+%attr(755, root, root) %{_bindir}/mysqlbug
+%attr(755, root, root) %{_bindir}/mysqld_multi
+%attr(755, root, root) %{_bindir}/mysqld_safe
+%attr(755, root, root) %{_bindir}/mysqlhotcopy
+%attr(755, root, root) %{_bindir}/mysqltest
 %attr(755, root, root) %{_bindir}/perror
 %attr(755, root, root) %{_bindir}/replace
-%attr(755, root, root) %{_bindir}/resolveip
 %attr(755, root, root) %{_bindir}/resolve_stack_dump
+%attr(755, root, root) %{_bindir}/resolveip
 %attr(755, root, root) %{_bindir}/safe_mysqld
 
 %attr(755, root, root) %{_sbindir}/mysqld
@@ -688,6 +683,14 @@
 # itself - note that they must be ordered by date (important when
 # merging BK trees)
 %changelog 
+* Mon May 01 2006 Kent Boortz <kent@stripped>
+
+- Use "./libtool --mode=execute" instead of searching for the
+  executable in current directory and ".libs".
+
+* Fri Apr 28 2006 Kent Boortz <kent@stripped>
+
+- Install and run "mysql_upgrade"
 
 * Wed Apr 12 2006 Jim Winstead <jimw@stripped>
 
@@ -697,7 +700,6 @@
 * Tue Apr 11 2006 Jim Winstead <jimw@stripped>
 
 - Remove old mysqltestmanager and related programs
-
 * Sat Apr 01 2006 Kent Boortz <kent@stripped>
 
 - Set $LDFLAGS from $MYSQL_BUILD_LDFLAGS

--- 1.112/mysql-test/mysql-test-run.pl	2006-05-01 09:14:28 +02:00
+++ 1.113/mysql-test/mysql-test-run.pl	2006-05-03 21:16:55 +02:00
@@ -1857,7 +1857,7 @@
     }
   }
 
-  if ( ndbcluster_install_slave() )
+  if ( $use_slaves and ndbcluster_install_slave() )
   {
     if ( $opt_force)
     {
@@ -2212,13 +2212,6 @@
       {
 	# Test needs cluster, start an extra mysqld connected to cluster
         # First wait for first mysql server to have created ndb system tables ok
-	if ( ! sleep_until_file_created("$master->[0]->{'path_myddir'}/cluster/apply_status.ndb",
-					$master->[0]->{'start_timeout'},
-					$master->[0]->{'pid'}))
-	{
-          report_failure_and_restart($tinfo);
-          return;
-	}
         mtr_tofile($master->[1]->{'path_myerr'},"CURRENT_TEST: $tname\n");
         $master->[1]->{'pid'}=
           mysqld_start('master',1,$tinfo->{'master_opt'},[],

--- 1.148/mysql-test/t/disabled.def	2006-04-27 15:31:18 +02:00
+++ 1.149/mysql-test/t/disabled.def	2006-05-03 21:09:41 +02:00
@@ -15,6 +15,7 @@
 events_scheduling        : BUG#19170 2006-04-26 andrey  Test case of 19170 fails on some platforms. Has to be checked.
 ndb_autodiscover         : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
 ndb_autodiscover2        : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog
+ndb_binlog_discover      : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown
 #ndb_cache2               : BUG#18597 2006-03-28 brian simultaneous drop table and ndb statistics update triggers node failure
 #ndb_cache_multi2         : BUG#18597 2006-04-10 kent  simultaneous drop table and ndb statistics update triggers node failure
 partition_03ndb          : BUG#16385 2006-03-24 mikael Partitions: crash when updating a range partitioned NDB table
@@ -32,7 +33,6 @@
 rpl_row_func003		 : BUG#19074 2006-13-04 andrei  test failed
 rpl_row_inexist_tbl      : BUG#18948 2006-03-09 mats    Disabled since patch makes this test wait forever
 rpl_sp                   : BUG#16456 2006-02-16 jmiller
-mysqldump                : BUG#18078 2006-03-10 lars
 udf                      : BUG#18564 2006-03-27 ian     (Permission by Brian)
 
 # the below testcase have been reworked to avoid the bug, test contains comment, keep bug open

--- 1.298/sql/ha_ndbcluster.cc	2006-05-02 13:10:55 +02:00
+++ 1.299/sql/ha_ndbcluster.cc	2006-05-03 21:09:42 +02:00
@@ -147,7 +147,7 @@
 #endif
 static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
 
-static int ndb_get_table_statistics(Ndb*, const char *, 
+static int ndb_get_table_statistics(Ndb*, const NDBTAB *, 
                                     struct Ndb_statistics *);
 
 
@@ -437,7 +437,7 @@
     Ndb *ndb= get_ndb();
     ndb->setDatabaseName(m_dbname);
     struct Ndb_statistics stat;
-    if (ndb_get_table_statistics(ndb, m_tabname, &stat) == 0){
+    if (ndb_get_table_statistics(ndb, m_table, &stat) == 0){
       mean_rec_length= stat.row_size;
       data_file_length= stat.fragment_memory;
       info->records= stat.row_count;
@@ -485,92 +485,6 @@
   DBUG_VOID_RETURN;
 }
 
-/*
-  Take care of the error that occured in NDB
-
-  RETURN
-    0   No error
-    #   The mapped error code
-*/
-
-int ha_ndbcluster::invalidate_dictionary_cache(bool global, const NDBTAB *ndbtab)
-{
-  NDBDICT *dict= get_ndb()->getDictionary();
-  DBUG_ENTER("invalidate_dictionary_cache");
-  DBUG_PRINT("info", ("m_tabname: %s  global: %d", m_tabname, global));
-
-  if (global)
-  {
-#ifdef HAVE_NDB_BINLOG
-    if (current_thd != injector_thd)
-    {
-      char key[FN_REFLEN];
-      build_table_filename(key, sizeof(key), m_dbname, m_tabname, "");
-      DBUG_PRINT("info", ("Getting ndbcluster mutex"));
-      pthread_mutex_lock(&ndbcluster_mutex);
-      NDB_SHARE *ndb_share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
-                                                    (byte*) key, strlen(key));
-      // Only binlog_thread is allowed to globally invalidate a table
-      if (ndb_share && ndb_share->op)
-      {
-        pthread_mutex_unlock(&ndbcluster_mutex);
-        DBUG_PRINT("info", ("Released ndbcluster mutex"));
-        DBUG_RETURN(1);
-      }
-      pthread_mutex_unlock(&ndbcluster_mutex);
-      DBUG_PRINT("info", ("Released ndbcluster mutex"));
-    }
-#endif
-    if (!ndbtab)
-    {
-      ndbtab= dict->getTable(m_tabname);
-      if (!ndbtab)
-        DBUG_RETURN(1);
-    }
-    dict->invalidateTable(ndbtab);
-    table_share->version= 0L;			/* Free when thread is ready */
-  }
-  else if (ndbtab)
-    dict->removeCachedTable(ndbtab);
-  else
-    dict->removeCachedTable(m_tabname);
-
-  /* Invalidate indexes */
-  for (uint i= 0; i < table_share->keys; i++)
-  {
-    NDBINDEX *index = (NDBINDEX *) m_index[i].index;
-    NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
-    if (!index && !unique_index)
-      continue;
-    NDB_INDEX_TYPE idx_type= m_index[i].type;
-
-    switch (idx_type) {
-    case PRIMARY_KEY_ORDERED_INDEX:
-    case ORDERED_INDEX:
-      if (global)
-        dict->invalidateIndex(index->getName(), m_tabname);
-      else
-        dict->removeCachedIndex(index->getName(), m_tabname);
-      break;
-    case UNIQUE_ORDERED_INDEX:
-      if (global)
-        dict->invalidateIndex(index->getName(), m_tabname);
-      else
-        dict->removeCachedIndex(index->getName(), m_tabname);
-    case UNIQUE_INDEX:
-      if (global)
-        dict->invalidateIndex(unique_index->getName(), m_tabname);
-      else
-        dict->removeCachedIndex(unique_index->getName(), m_tabname);
-      break;
-    case PRIMARY_KEY_INDEX:
-    case UNDEFINED_INDEX:
-      break;
-    }
-  }
-  DBUG_RETURN(0);
-}
-
 int ha_ndbcluster::ndb_err(NdbTransaction *trans)
 {
   int res;
@@ -581,32 +495,14 @@
   switch (err.classification) {
   case NdbError::SchemaError:
   {
+    // TODO perhaps we need to do more here, invalidate also in the cache
+    m_table->setStatusInvalid();
     /* Close other open handlers not used by any thread */
     TABLE_LIST table_list;
     bzero((char*) &table_list,sizeof(table_list));
     table_list.db= m_dbname;
     table_list.alias= table_list.table_name= m_tabname;
     close_cached_tables(current_thd, 0, &table_list);
-
-    invalidate_dictionary_cache(TRUE, m_table);
-
-    if (err.code==284)
-    {
-      /*
-         Check if the table is _really_ gone or if the table has
-         been alterend and thus changed table id
-       */
-      NDBDICT *dict= get_ndb()->getDictionary();
-      DBUG_PRINT("info", ("Check if table %s is really gone", m_tabname));
-      if (!(dict->getTable(m_tabname)))
-      {
-        err= dict->getNdbError();
-        DBUG_PRINT("info", ("Table not found, error: %d", err.code));
-        if (err.code != 709 && err.code != 723)
-          DBUG_RETURN(1);
-      }
-      DBUG_PRINT("info", ("Table exists but must have changed"));
-    }
     break;
   }
   default:
@@ -1052,73 +948,59 @@
   NDBDICT *dict= ndb->getDictionary();
   const NDBTAB *tab;
   int error;
-  bool invalidating_ndb_table= FALSE;
   DBUG_ENTER("get_metadata");
   DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
 
-  do {
-    const void *data, *pack_data;
-    uint length, pack_length;
+  DBUG_ASSERT(m_table == NULL);
+  DBUG_ASSERT(m_table_info == NULL);
 
-    if (!(tab= dict->getTable(m_tabname)))
-      ERR_RETURN(dict->getNdbError());
-    // Check if thread has stale local cache
-    if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-    {
-      invalidate_dictionary_cache(FALSE, tab);
-      if (!(tab= dict->getTable(m_tabname)))
-         ERR_RETURN(dict->getNdbError());
-      DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
-    }
-    /*
-      Compare FrmData in NDB with frm file from disk.
-    */
-    error= 0;
-    if (readfrm(path, &data, &length) ||
-        packfrm(data, length, &pack_data, &pack_length))
-    {
-      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-      my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
-      DBUG_RETURN(1);
-    }
+  const void *data, *pack_data;
+  uint length, pack_length;
+
+  /*
+    Compare FrmData in NDB with frm file from disk.
+  */
+  error= 0;
+  if (readfrm(path, &data, &length) ||
+      packfrm(data, length, &pack_data, &pack_length))
+  {
+    my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+    my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+    DBUG_RETURN(1);
+  }
     
-    if (get_ndb_share_state(m_share) != NSS_ALTERED 
-        && cmp_frm(tab, pack_data, pack_length))
-    {
-      if (!invalidating_ndb_table)
-      {
-        DBUG_PRINT("info", ("Invalidating table"));
-        invalidate_dictionary_cache(TRUE, tab);
-        invalidating_ndb_table= TRUE;
-      }
-      else
-      {
-        DBUG_PRINT("error", 
-                   ("metadata, pack_length: %d  getFrmLength: %d  memcmp: %d",
-                    pack_length, tab->getFrmLength(),
-                    memcmp(pack_data, tab->getFrmData(), pack_length)));
-        DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
-        DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
-        error= HA_ERR_TABLE_DEF_CHANGED;
-        invalidating_ndb_table= FALSE;
-      }
-    }
-    else
-    {
-      invalidating_ndb_table= FALSE;
-    }
-    my_free((char*)data, MYF(0));
-    my_free((char*)pack_data, MYF(0));
-  } while (invalidating_ndb_table);
+  Ndb_table_guard ndbtab_g(dict, m_tabname);
+  if (!(tab= ndbtab_g.get_table()))
+    ERR_RETURN(dict->getNdbError());
+
+  if (get_ndb_share_state(m_share) != NSS_ALTERED 
+      && cmp_frm(tab, pack_data, pack_length))
+  {
+    DBUG_PRINT("error", 
+               ("metadata, pack_length: %d  getFrmLength: %d  memcmp: %d",
+                pack_length, tab->getFrmLength(),
+                memcmp(pack_data, tab->getFrmData(), pack_length)));
+    DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+    DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+    error= HA_ERR_TABLE_DEF_CHANGED;
+  }
+  my_free((char*)data, MYF(0));
+  my_free((char*)pack_data, MYF(0));
 
   if (error)
-    DBUG_RETURN(error);
-  
-  m_table_version= tab->getObjectVersion();
-  m_table= tab; 
-  m_table_info= NULL; // Set in external lock
-  
-  DBUG_RETURN(open_indexes(ndb, table, FALSE));
+    goto err;
+
+  DBUG_PRINT("info", ("fetched table %s", tab->getName()));
+  m_table= tab;
+  if ((error= open_indexes(ndb, table, FALSE)) == 0)
+  {
+    ndbtab_g.release();
+    DBUG_RETURN(0);
+  }
+err:
+  ndbtab_g.invalidate();
+  m_table= NULL;
+  DBUG_RETURN(error);
 }
 
 static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
@@ -1155,36 +1037,6 @@
   DBUG_RETURN(0);
 }
 
-int ha_ndbcluster::table_changed(const void *pack_frm_data, uint pack_frm_len)
-{
-  Ndb *ndb;
-  NDBDICT *dict;
-  const NDBTAB *orig_tab;
-  NdbDictionary::Table new_tab;
-  int result;
-  DBUG_ENTER("ha_ndbcluster::table_changed");
-  DBUG_PRINT("info", ("Modifying frm for table %s", m_tabname));
-  if (check_ndb_connection())
-    DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
-                                                                             
-  ndb= get_ndb();
-  dict= ndb->getDictionary();
-  if (!(orig_tab= dict->getTable(m_tabname)))
-    ERR_RETURN(dict->getNdbError());
-  // Check if thread has stale local cache
-  if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-  {
-    dict->removeCachedTable(orig_tab);
-    if (!(orig_tab= dict->getTable(m_tabname)))
-      ERR_RETURN(dict->getNdbError());
-  }
-  new_tab= *orig_tab;
-  new_tab.setFrm(pack_frm_data, pack_frm_len);
-  if (dict->alterTable(new_tab) != 0)
-    ERR_RETURN(dict->getNdbError());
-  DBUG_RETURN(0);
-}
-
 /*
   Create all the indexes for a table.
   If any index should fail to be created,
@@ -1252,7 +1104,7 @@
     const NDBINDEX *index;
     do
     {
-      index= dict->getIndex(index_name, m_tabname);
+      index= dict->getIndexGlobal(index_name, *m_table);
       if (!index)
         ERR_RETURN(dict->getNdbError());
       DBUG_PRINT("info", ("index: 0x%x  id: %d  version: %d.%d  status: %d",
@@ -1261,14 +1113,11 @@
                           index->getObjectVersion() & 0xFFFFFF,
                           index->getObjectVersion() >> 24,
                           index->getObjectStatus()));
-      if (index->getObjectStatus() != NdbDictionary::Object::Retrieved)
-      {
-        dict->removeCachedIndex(index);
-        continue;
-      }
+      DBUG_ASSERT(index->getObjectStatus() ==
+                  NdbDictionary::Object::Retrieved);
       break;
     } while (1);
-    m_index[index_no].index= (void *) index;
+    m_index[index_no].index= index;
     // ordered index - add stats
     NDB_INDEX_DATA& d=m_index[index_no];
     delete d.index_stat;
@@ -1299,7 +1148,7 @@
     const NDBINDEX *index;
     do
     {
-      index= dict->getIndex(unique_index_name, m_tabname);
+      index= dict->getIndexGlobal(unique_index_name, *m_table);
       if (!index)
         ERR_RETURN(dict->getNdbError());
       DBUG_PRINT("info", ("index: 0x%x  id: %d  version: %d.%d  status: %d",
@@ -1308,14 +1157,11 @@
                           index->getObjectVersion() & 0xFFFFFF,
                           index->getObjectVersion() >> 24,
                           index->getObjectStatus()));
-      if (index->getObjectStatus() != NdbDictionary::Object::Retrieved)
-      {
-        dict->removeCachedIndex(index);
-        continue;
-      }
+      DBUG_ASSERT(index->getObjectStatus() ==
+                  NdbDictionary::Object::Retrieved);
       break;
     } while (1);
-    m_index[index_no].unique_index= (void *) index;
+    m_index[index_no].unique_index= index;
     error= fix_unique_index_attr_order(m_index[index_no], index, key_info);
   }
   if (!error)
@@ -1346,7 +1192,27 @@
       else
         break;
   }
-  
+
+  if (error && !ignore_error)
+  {
+    while (i > 0)
+    {
+      i--;
+      if (m_index[i].index)
+      {
+         dict->removeIndexGlobal(*m_index[i].index, 1);
+         m_index[i].index= NULL;
+      }
+      if (m_index[i].unique_index)
+      {
+         dict->removeIndexGlobal(*m_index[i].unique_index, 1);
+         m_index[i].unique_index= NULL;
+      }
+    }
+  }
+
+  DBUG_ASSERT(error == 0 || error == 4243);
+
   DBUG_RETURN(error);
 }
 
@@ -1406,26 +1272,32 @@
     m_index[i].type= idx_type;
     if (m_index[i].status == TO_BE_DROPPED)
     {
-      NdbDictionary::Index *index= 
-        (NdbDictionary::Index *) m_index[i].index;
-      NdbDictionary::Index *unique_index= 
-        (NdbDictionary::Index *) m_index[i].unique_index;
+      const NdbDictionary::Index *index= m_index[i].index;
+      const NdbDictionary::Index *unique_index= m_index[i].unique_index;
       
       if (index)
       {
         index_name= index->getName();
         DBUG_PRINT("info", ("Dropping index %u: %s", i, index_name));  
         // Drop ordered index from ndb
-        error= drop_ndb_index(index_name);
+        error= dict->dropIndexGlobal(*index);
+        if (!error)
+        {
+          dict->removeIndexGlobal(*index, 1);
+          m_index[i].index= NULL;
+        }
       }
-      if (!error)
-        m_index[i].index= NULL;
       if (!error && unique_index)
       {
         index_name= unique_index->getName();
         DBUG_PRINT("info", ("Dropping unique index %u: %s", i, index_name));
         // Drop unique index from ndb
-        error= drop_ndb_index(index_name);
+        error= dict->dropIndexGlobal(*unique_index);
+        if (!error)
+        {
+          dict->removeIndexGlobal(*unique_index, 1);
+          m_index[i].unique_index= NULL;
+        }
       }
       if (error)
         DBUG_RETURN(error);
@@ -1482,30 +1354,57 @@
   DBUG_RETURN(0);
 }
 
-void ha_ndbcluster::release_metadata()
+void ha_ndbcluster::release_metadata(THD *thd, Ndb *ndb)
 {
   uint i;
 
   DBUG_ENTER("release_metadata");
   DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
 
-  m_table= NULL;
+  NDBDICT *dict= ndb->getDictionary();
+  int invalidate_indexes= 0;
+  if (thd && thd->lex && thd->lex->sql_command == SQLCOM_FLUSH)
+  {
+    invalidate_indexes = 1;
+  }
+  if (m_table != NULL)
+  {
+    if (m_table->getObjectStatus() == NdbDictionary::Object::Invalid)
+      invalidate_indexes= 1;
+    dict->removeTableGlobal(*m_table, invalidate_indexes);
+  }
+  // TODO investigate
+  DBUG_ASSERT(m_table_info == NULL);
   m_table_info= NULL;
 
   // Release index list 
   for (i= 0; i < MAX_KEY; i++)
   {
-    m_index[i].unique_index= NULL;      
-    m_index[i].index= NULL;      
+    if (m_index[i].unique_index)
+    {
+      DBUG_ASSERT(m_table != NULL);
+      dict->removeIndexGlobal(*m_index[i].unique_index, invalidate_indexes);
+      m_index[i].unique_index= NULL;
+    }
+    if (m_index[i].index)
+    {
+      DBUG_ASSERT(m_table != NULL);
+      dict->removeIndexGlobal(*m_index[i].index, invalidate_indexes);
+      m_index[i].index= NULL;
+    }
     if (m_index[i].unique_index_attrid_map)
     {
       my_free((char *)m_index[i].unique_index_attrid_map, MYF(0));
       m_index[i].unique_index_attrid_map= NULL;
     }
-    delete m_index[i].index_stat;
-    m_index[i].index_stat=NULL;
+    if (m_index[i].index_stat)
+    {
+      delete m_index[i].index_stat;
+      m_index[i].index_stat= NULL;
+    }
   }
 
+  m_table= NULL;
   DBUG_VOID_RETURN;
 }
 
@@ -1977,11 +1876,10 @@
     {
       // A unique index is defined on table
       NdbIndexOperation *iop;
-      NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
+      const NDBINDEX *unique_index = m_index[i].unique_index;
       key_part= key_info->key_part;
       end= key_part + key_info->key_parts;
-      if (!(iop= trans->getNdbIndexOperation(unique_index,
-                                             (const NDBTAB *) m_table)) ||
+      if (!(iop= trans->getNdbIndexOperation(unique_index, m_table)) ||
           iop->readTuple(lm) != 0)
         ERR_RETURN(trans->getNdbError());
 
@@ -2030,9 +1928,8 @@
   
   NdbOperation::LockMode lm=
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
-  if (!(op= trans->getNdbIndexOperation((NDBINDEX *) 
-                                        m_index[active_index].unique_index, 
-                                        (const NDBTAB *) m_table)) ||
+  if (!(op= trans->getNdbIndexOperation(m_index[active_index].unique_index, 
+                                        m_table)) ||
       op->readTuple(lm) != 0)
     ERR_RETURN(trans->getNdbError());
   
@@ -2371,9 +2268,8 @@
     restart= FALSE;
     NdbOperation::LockMode lm=
       (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
-    if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
-                                              m_index[active_index].index, 
-                                              (const NDBTAB *) m_table)) ||
+    if (!(op= trans->getNdbIndexScanOperation(m_index[active_index].index, 
+                                              m_table)) ||
         op->readTuples(lm, 0, parallelism, sorted, descending))
       ERR_RETURN(trans->getNdbError());
     if (m_use_partition_function && part_spec != NULL &&
@@ -2443,7 +2339,7 @@
 
   NdbOperation::LockMode lm=
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
-  if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+  if (!(op=trans->getNdbScanOperation(m_table)) ||
       op->readTuples(lm, 0, parallelism))
     ERR_RETURN(trans->getNdbError());
   m_active_cursor= op;
@@ -2545,7 +2441,7 @@
   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
     table->timestamp_field->set_time();
 
-  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
+  if (!(op= trans->getNdbOperation(m_table)))
     ERR_RETURN(trans->getNdbError());
 
   res= (m_use_write) ? op->writeTuple() :op->insertTuple(); 
@@ -2571,7 +2467,7 @@
     Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT;
     uint retries= NDB_AUTO_INCREMENT_RETRIES;
     do {
-      auto_value= ndb->getAutoIncrementValue((const NDBTAB *) m_table);
+      auto_value= ndb->getAutoIncrementValue(m_table);
     } while (auto_value == NDB_FAILED_AUTO_INCREMENT && 
              --retries &&
              ndb->getNdbError().status == NdbError::TemporaryError);
@@ -2672,7 +2568,7 @@
     DBUG_PRINT("info", 
                ("Trying to set next auto increment value to %lu",
                 (ulong) next_val));
-    if (ndb->setAutoIncrementValue((const NDBTAB *) m_table, next_val, TRUE))
+    if (ndb->setAutoIncrementValue(m_table, next_val, TRUE))
       DBUG_PRINT("info", 
                  ("Setting next auto increment value to %u", next_val));  
   }
@@ -2822,7 +2718,7 @@
   }
   else
   {  
-    if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
+    if (!(op= trans->getNdbOperation(m_table)) ||
         op->updateTuple() != 0)
       ERR_RETURN(trans->getNdbError());  
     
@@ -2931,7 +2827,7 @@
   else
   {
     
-    if (!(op=trans->getNdbOperation((const NDBTAB *) m_table)) || 
+    if (!(op=trans->getNdbOperation(m_table)) || 
         op->deleteTuple() != 0)
       ERR_RETURN(trans->getNdbError());
     
@@ -3084,7 +2980,7 @@
   {
     // Table with hidden primary key
     int hidden_no= table_share->fields;
-    const NDBTAB *tab= (const NDBTAB *) m_table;
+    const NDBTAB *tab= m_table;
     const NDBCOL *hidden_col= tab->getColumn(hidden_no);
     const NdbRecAttr* rec= m_value[hidden_no].rec;
     DBUG_ASSERT(rec);
@@ -3560,7 +3456,7 @@
       key_length= ref_length;
 #ifndef DBUG_OFF
     int hidden_no= table->s->fields;
-    const NDBTAB *tab= (const NDBTAB *) m_table;  
+    const NDBTAB *tab= m_table;  
     const NDBCOL *hidden_col= tab->getColumn(hidden_no);
     DBUG_ASSERT(hidden_col->getPrimaryKey() && 
                 hidden_col->getAutoIncrement() &&
@@ -3606,7 +3502,7 @@
       ndb->setDatabaseName(m_dbname);
       struct Ndb_statistics stat;
       if (current_thd->variables.ndb_use_exact_count &&
-          ndb_get_table_statistics(ndb, m_tabname, &stat) == 0)
+          ndb_get_table_statistics(ndb, m_table, &stat) == 0)
       {
         mean_rec_length= stat.row_size;
         data_file_length= stat.fragment_memory;
@@ -3637,7 +3533,7 @@
       Ndb *ndb= get_ndb();
       
       auto_increment_value= 
-        ndb->readAutoIncrementValue((const NDBTAB *) m_table);
+        ndb->readAutoIncrementValue(m_table);
     }
   }
   DBUG_VOID_RETURN;
@@ -3721,7 +3617,7 @@
 void ha_ndbcluster::start_bulk_insert(ha_rows rows)
 {
   int bytes, batch;
-  const NDBTAB *tab= (const NDBTAB *) m_table;    
+  const NDBTAB *tab= m_table;    
 
   DBUG_ENTER("start_bulk_insert");
   DBUG_PRINT("enter", ("rows: %d", (int)rows));
@@ -4000,41 +3896,8 @@
     // Start of transaction
     m_rows_changed= 0;
     m_ops_pending= 0;
-    {
-      NDBDICT *dict= ndb->getDictionary();
-      const NDBTAB *tab;
-      if (!(tab= dict->getTable(m_tabname)))
-        ERR_RETURN(dict->getNdbError());
-      DBUG_PRINT("info", ("Table schema version: %d", 
-                          tab->getObjectVersion()));
-      // Check if thread has stale local cache
-      // New transaction must not use old tables... (trans != 0)
-      // Running might...
-      if ((trans && tab->getObjectStatus() != NdbDictionary::Object::Retrieved)
-	  || tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-      {
-        invalidate_dictionary_cache(FALSE, tab);
-        if (!(tab= dict->getTable(m_tabname)))
-          ERR_RETURN(dict->getNdbError());
-        DBUG_PRINT("info", ("Table schema version: %d", 
-                            tab->getObjectVersion()));
-      }
-      if (m_table_version < tab->getObjectVersion())
-      {
-        /*
-          The table has been altered, caller has to retry
-        */
-        NdbError err= ndb->getNdbError(NDB_INVALID_SCHEMA_OBJECT);
-        DBUG_RETURN(ndb_to_mysql_error(&err));
-      }
-      if (m_table != (void *)tab)
-      {
-        m_table= tab;
-        m_table_version = tab->getObjectVersion();
-        if (!(my_errno= open_indexes(ndb, table, FALSE)))
-          DBUG_RETURN(my_errno);
-      }
-    }
+
+    // TODO remove double pointers...
     m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
     m_table_info= &m_thd_ndb_share->stat;
   }
@@ -4679,6 +4542,14 @@
     my_errno= ndb_to_mysql_error(&err);
     DBUG_RETURN(my_errno);
   }
+
+  Ndb_table_guard ndbtab_g(dict, m_tabname);
+  // temporary set m_table during create
+  // reset at return
+  m_table= ndbtab_g.get_table();
+  // TODO check also that we have the same frm...
+  DBUG_ASSERT(m_table != 0);
+
   DBUG_PRINT("info", ("Table %s/%s created successfully", 
                       m_dbname, m_tabname));
 
@@ -4693,7 +4564,10 @@
       Failed to create an index,
       drop the table (and all it's indexes)
     */
-    drop_ndb_table();
+    if (dict->dropTableGlobal(*m_table) == 0)
+    {
+      m_table = 0;
+    }
   }
 
 #ifdef HAVE_NDB_BINLOG
@@ -4723,7 +4597,6 @@
 
     while (!IS_TMP_PREFIX(m_tabname))
     {
-      const NDBTAB *t= dict->getTable(m_tabname);
       String event_name(INJECTOR_EVENT_LEN);
       ndb_rep_event_name(&event_name,m_dbname,m_tabname);
       int do_event_op= ndb_binlog_running;
@@ -4737,14 +4610,14 @@
         Always create an event for the table, as other mysql servers
         expect it to be there.
       */
-      if (!ndbcluster_create_event(ndb, t, event_name.c_ptr(), share,
+      if (!ndbcluster_create_event(ndb, m_table, event_name.c_ptr(), share,
                                    share && do_event_op /* push warning */))
       {
         if (ndb_extra_logging)
           sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
                                 event_name.c_ptr());
         if (share && do_event_op &&
-            ndbcluster_create_event_ops(share, t, event_name.c_ptr()))
+            ndbcluster_create_event_ops(share, m_table, event_name.c_ptr()))
         {
           sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
                           " Event: %s", name);
@@ -4760,13 +4633,15 @@
       ndbcluster_log_schema_op(current_thd, share,
                                current_thd->query, current_thd->query_length,
                                share->db, share->table_name,
-                               0, 0,
+                               m_table->getObjectId(),
+                               m_table->getObjectVersion(),
                                SOT_CREATE_TABLE);
       break;
     }
   }
 #endif /* HAVE_NDB_BINLOG */
 
+  m_table= 0;
   DBUG_RETURN(my_errno);
 }
 
@@ -4796,10 +4671,15 @@
   NDBDICT *dict= ndb->getDictionary();
   if (!info->frm_only)
     DBUG_RETURN(0); // Must be a create, ignore since frm is saved in create
+
+  // TODO handle this
+  DBUG_ASSERT(m_table != 0);
+
   set_dbname(file);
   set_tabname(file);
+  Ndb_table_guard ndbtab_g(dict, m_tabname);
   DBUG_PRINT("info", ("m_dbname: %s, m_tabname: %s", m_dbname, m_tabname));
-  if (!(tab= dict->getTable(m_tabname)))
+  if (!(tab= ndbtab_g.get_table()))
     DBUG_RETURN(0); // Unkown table, must be temporary table
 
   DBUG_ASSERT(get_ndb_share_state(m_share) == NSS_ALTERED);
@@ -4815,7 +4695,12 @@
   {
     DBUG_PRINT("info", ("Table %s has changed, altering frm in ndb", 
                         m_tabname));
-    error= table_changed(pack_data, pack_length);
+    NdbDictionary::Table new_tab= *tab;
+    new_tab.setFrm(pack_data, pack_length);
+    if (dict->alterTableGlobal(*tab, new_tab))
+    {
+      error= ndb_to_mysql_error(&dict->getNdbError());
+    }
     my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
     my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
   }
@@ -4918,7 +4803,7 @@
     ndb_index.addColumnName(field->field_name);
   }
   
-  if (dict->createIndex(ndb_index))
+  if (dict->createIndex(ndb_index, *m_table))
     ERR_RETURN(dict->getNdbError());
 
   // Success
@@ -4972,18 +4857,6 @@
 }
 
 /*
-  Drop an index in ndb
- */
-int ha_ndbcluster::drop_ndb_index(const char *name)
-{
-  DBUG_ENTER("ha_ndbcluster::drop_index");
-  DBUG_PRINT("enter", ("name: %s ", name));
-  Ndb *ndb= get_ndb();
-  NdbDictionary::Dictionary *dict= ndb->getDictionary();
-  DBUG_RETURN(dict->dropIndex(name, m_tabname));
-} 
-
-/*
   Mark one or several indexes for deletion. and
   renumber the remaining indexes
 */
@@ -5051,16 +4924,14 @@
   Ndb *ndb= get_ndb();
   ndb->setDatabaseName(old_dbname);
   dict= ndb->getDictionary();
-  if (!(orig_tab= dict->getTable(m_tabname)))
+  Ndb_table_guard ndbtab_g(dict, m_tabname);
+  if (!(orig_tab= ndbtab_g.get_table()))
     ERR_RETURN(dict->getNdbError());
-  // Check if thread has stale local cache
-  if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-  {
-    dict->removeCachedTable(orig_tab);
-    if (!(orig_tab= dict->getTable(m_tabname)))
-      ERR_RETURN(dict->getNdbError());
-  }
+
 #ifdef HAVE_NDB_BINLOG
+  int ndb_table_id= orig_tab->getObjectId();
+  int ndb_table_version= orig_tab->getObjectVersion();
+
   NDB_SHARE *share= get_share(from, 0, false);
   if (share)
   {
@@ -5068,13 +4939,15 @@
     DBUG_ASSERT(r == 0);
   }
 #endif
-  m_table= orig_tab;
   // Change current database to that of target table
   set_dbname(to);
   ndb->setDatabaseName(m_dbname);
 
-  if ((result= alter_table_name(new_tabname)))
+  NdbDictionary::Table new_tab= *orig_tab;
+  new_tab.setName(new_tabname);
+  if (dict->alterTableGlobal(*orig_tab, new_tab) != 0)
   {
+    NdbError ndb_error= dict->getNdbError();
 #ifdef HAVE_NDB_BINLOG
     if (share)
     {
@@ -5083,7 +4956,7 @@
       free_share(&share);
     }
 #endif
-    DBUG_RETURN(result);
+    ERR_RETURN(ndb_error);
   }
   
   // Rename .ndb file
@@ -5117,7 +4990,8 @@
     /* always create an event for the table */
     String event_name(INJECTOR_EVENT_LEN);
     ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0);
-    const NDBTAB *ndbtab= dict->getTable(new_tabname);
+    Ndb_table_guard ndbtab_g2(dict, new_tabname);
+    const NDBTAB *ndbtab= ndbtab_g2.get_table();
 
     if (!ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share,
                                  share && ndb_binlog_running /* push warning */))
@@ -5140,10 +5014,10 @@
     if (!is_old_table_tmpfile)
       ndbcluster_log_schema_op(current_thd, share,
                                current_thd->query, current_thd->query_length,
-                               m_dbname, new_tabname,
-                               0, 0,
+                               old_dbname, m_tabname,
+                               ndb_table_id, ndb_table_version,
                                SOT_RENAME_TABLE,
-                               old_dbname, m_tabname);
+                               m_dbname, new_tabname);
   }
   if (share)
     free_share(&share);
@@ -5154,30 +5028,6 @@
 
 
 /*
-  Rename a table in NDB Cluster using alter table
- */
-
-int ha_ndbcluster::alter_table_name(const char *to)
-{
-  Ndb *ndb= get_ndb();
-  NDBDICT *dict= ndb->getDictionary();
-  const NDBTAB *orig_tab= (const NDBTAB *) m_table;
-  DBUG_ENTER("alter_table_name");
-  DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to));
-
-  NdbDictionary::Table new_tab= *orig_tab;
-  new_tab.setName(to);
-  if (dict->alterTable(new_tab) != 0)
-    ERR_RETURN(dict->getNdbError());
-
-  m_table= NULL;
-  m_table_info= NULL;
-                                                                             
-  DBUG_RETURN(0);
-}
-
-
-/*
   Delete table from NDB Cluster
 
  */
@@ -5193,6 +5043,8 @@
   DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
   NDBDICT *dict= ndb->getDictionary();
 #ifdef HAVE_NDB_BINLOG
+  int ndb_table_id= 0;
+  int ndb_table_version= 0;
   /*
     Don't allow drop table unless
     schema distribution table is setup
@@ -5208,14 +5060,45 @@
   /* Drop the table from NDB */
   
   int res;
-  if (h)
+  if (h && h->m_table)
   {
-    res= h->drop_ndb_table();
+    if (dict->dropTableGlobal(*h->m_table))
+      res= ndb_to_mysql_error(&dict->getNdbError());
+#ifdef HAVE_NDB_BINLOG
+    if (res == 0)
+    {
+      ndb_table_id= h->m_table->getObjectId();
+      ndb_table_version= h->m_table->getObjectVersion();
+    }
+#endif
+    h->release_metadata(current_thd, ndb);
   }
   else
   {
     ndb->setDatabaseName(db);
-    res= dict->dropTable(table_name);
+    while (1)
+    {
+      Ndb_table_guard ndbtab_g(dict, table_name);
+      if (ndbtab_g.get_table())
+      {
+        if (dict->dropTableGlobal(*ndbtab_g.get_table()) == 0)
+        {
+#ifdef HAVE_NDB_BINLOG
+          ndb_table_id= ndbtab_g.get_table()->getObjectId();
+          ndb_table_version= ndbtab_g.get_table()->getObjectVersion();
+#endif
+          res= 0;
+        }
+        else if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT)
+        {
+          ndbtab_g.invalidate();
+          continue;
+        }
+      }
+      else
+        res= ndb_to_mysql_error(&dict->getNdbError());
+      break;
+    }
   }
 
   if (res)
@@ -5257,7 +5140,7 @@
     ndbcluster_log_schema_op(current_thd, share,
                              current_thd->query, current_thd->query_length,
                              share->db, share->table_name,
-                             0, 0,
+                             ndb_table_id, ndb_table_version,
                              SOT_DROP_TABLE);
   }
   else if (table_dropped && share && share->op) /* ndbcluster_log_schema_op
@@ -5321,24 +5204,6 @@
 }
 
 
-/*
-  Drop table in NDB Cluster
- */
-
-int ha_ndbcluster::drop_ndb_table()
-{
-  Ndb *ndb= get_ndb();
-  NdbDictionary::Dictionary *dict= ndb->getDictionary();
-
-  DBUG_ENTER("intern_drop_table");
-  DBUG_PRINT("enter", ("Deleting %s", m_tabname));
-  release_metadata();
-  if (dict->dropTable(m_tabname))
-    ERR_RETURN(dict->getNdbError());
-  DBUG_RETURN(0);
-}
-
-
 ulonglong ha_ndbcluster::get_auto_increment()
 {  
   int cache_size;
@@ -5362,8 +5227,8 @@
   do {
     auto_value=
       (m_skip_auto_increment) ? 
-      ndb->readAutoIncrementValue((const NDBTAB *) m_table)
-      : ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size);
+      ndb->readAutoIncrementValue(m_table)
+      : ndb->getAutoIncrementValue(m_table, cache_size);
   } while (auto_value == NDB_FAILED_AUTO_INCREMENT && 
            --retries &&
            ndb->getNdbError().status == NdbError::TemporaryError);
@@ -5397,7 +5262,6 @@
   m_active_trans(NULL),
   m_active_cursor(NULL),
   m_table(NULL),
-  m_table_version(-1),
   m_table_info(NULL),
   m_table_flags(HA_NDBCLUSTER_TABLE_FLAGS),
   m_share(0),
@@ -5443,19 +5307,31 @@
 }
 
 
+int ha_ndbcluster::ha_initialise()
+{
+  DBUG_ENTER("ha_ndbcluster::ha_initialise");
+  if (check_ndb_in_thd(current_thd))
+  {
+    DBUG_RETURN(FALSE);
+  }
+  DBUG_RETURN(TRUE);
+}
+
 /*
   Destructor for NDB Cluster table handler
  */
 
 ha_ndbcluster::~ha_ndbcluster() 
 {
+  THD *thd= current_thd;
+  Ndb *ndb= thd ? check_ndb_in_thd(thd) : g_ndb;
   DBUG_ENTER("~ha_ndbcluster");
 
   if (m_share)
   {
     free_share(&m_share);
   }
-  release_metadata();
+  release_metadata(thd, ndb);
   my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
   m_blobs_buffer= 0;
 
@@ -5570,10 +5446,12 @@
 
 int ha_ndbcluster::close(void)
 {
-  DBUG_ENTER("close");  
+  DBUG_ENTER("close");
+  THD *thd= current_thd;
+  Ndb *ndb= thd ? check_ndb_in_thd(thd) : g_ndb;
   free_share(&m_share);
   m_share= 0;
-  release_metadata();
+  release_metadata(thd, ndb);
   DBUG_RETURN(0);
 }
 
@@ -5661,9 +5539,10 @@
 int ndbcluster_discover(THD* thd, const char *db, const char *name,
                         const void** frmblob, uint* frmlen)
 {
+  int error= 0;
+  NdbError ndb_error;
   uint len;
   const void* data;
-  const NDBTAB* tab;
   Ndb* ndb;
   char key[FN_REFLEN];
   DBUG_ENTER("ndbcluster_discover");
@@ -5673,7 +5552,6 @@
     DBUG_RETURN(HA_ERR_NO_CONNECTION);  
   ndb->setDatabaseName(db);
   NDBDICT* dict= ndb->getDictionary();
-  dict->invalidateTable(name);
   build_table_filename(key, sizeof(key), db, name, "");
   NDB_SHARE *share= get_share(key, 0, false);
   if (share && get_ndb_share_state(share) == NSS_ALTERED)
@@ -5682,21 +5560,22 @@
     if (readfrm(key, &data, &len))
     {
       DBUG_PRINT("error", ("Could not read frm"));
-      if (share)
-        free_share(&share);
-      DBUG_RETURN(1);
+      error= 1;
+      goto err;
     }
   }
   else
   {
-    if (!(tab= dict->getTable(name)))
-    {    
+    Ndb_table_guard ndbtab_g(dict, name);
+    const NDBTAB *tab= ndbtab_g.get_table();
+    if (!tab)
+    {
       const NdbError err= dict->getNdbError();
-      if (share)
-        free_share(&share);
       if (err.code == 709 || err.code == 723)
-        DBUG_RETURN(-1);
-      ERR_RETURN(err);
+        error= -1;
+      else
+        ndb_error= err;
+      goto err;
     }
     DBUG_PRINT("info", ("Found table %s", tab->getName()));
     
@@ -5704,17 +5583,15 @@
     if (len == 0 || tab->getFrmData() == NULL)
     {
       DBUG_PRINT("error", ("No frm data found."));
-      if (share)
-        free_share(&share);
-      DBUG_RETURN(1);
+      error= 1;
+      goto err;
     }
     
     if (unpackfrm(&data, &len, tab->getFrmData()))
     {
       DBUG_PRINT("error", ("Could not unpack table"));
-      if (share)
-        free_share(&share);
-      DBUG_RETURN(1);
+      error= 1;
+      goto err;
     }
   }
 
@@ -5725,6 +5602,14 @@
     free_share(&share);
 
   DBUG_RETURN(0);
+err:
+  if (share)
+    free_share(&share);
+  if (ndb_error.code)
+  {
+    ERR_RETURN(ndb_error);
+  }
+  DBUG_RETURN(error);
 }
 
 /*
@@ -5732,29 +5617,32 @@
 
  */
 
-int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name)
+int ndbcluster_table_exists_in_engine(THD* thd, const char *db,
+                                      const char *name)
 {
-  const NDBTAB* tab;
   Ndb* ndb;
   DBUG_ENTER("ndbcluster_table_exists_in_engine");
   DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
 
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
-  ndb->setDatabaseName(db);
 
   NDBDICT* dict= ndb->getDictionary();
-  dict->invalidateTable(name);
-  if (!(tab= dict->getTable(name)))
+  NdbDictionary::Dictionary::List list;
+  if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+    ERR_RETURN(dict->getNdbError());
+  for (int i= 0 ; i < list.count ; i++)
   {
-    const NdbError err= dict->getNdbError();
-    if (err.code == 709 || err.code == 723)
-      DBUG_RETURN(0);
-    ERR_RETURN(err);
+    NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
+    if (my_strcasecmp(system_charset_info, elmt.database, db))
+      continue;
+    if (my_strcasecmp(system_charset_info, elmt.name, name))
+      continue;
+    // table found
+    DBUG_PRINT("info", ("Found table"));
+    DBUG_RETURN(1);
   }
-
-  DBUG_PRINT("info", ("Found table %s", tab->getName()));
-  DBUG_RETURN(1);
+  DBUG_RETURN(0);
 }
 
 
@@ -5908,9 +5796,9 @@
       }
 
       ndb->setDatabaseName(elmt.database);
-      const NDBTAB *ndbtab;
-
-      if (!(ndbtab= dict->getTable(elmt.name)))
+      Ndb_table_guard ndbtab_g(dict, elmt.name);
+      const NDBTAB *ndbtab= ndbtab_g.get_table();
+      if (!ndbtab)
       {
         if (retries == 0)
           sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
@@ -6190,11 +6078,13 @@
   return 0;
 }
 
+extern int ndb_dictionary_is_mysqld;
 static bool ndbcluster_init()
 {
   int res;
   DBUG_ENTER("ndbcluster_init");
 
+  ndb_dictionary_is_mysqld= 1;
   if (have_ndbcluster != SHOW_OPTION_YES)
     goto ndbcluster_init_error;
 
@@ -6326,6 +6216,24 @@
   if (!ndbcluster_inited)
     DBUG_RETURN(0);
 
+#ifdef HAVE_NDB_BINLOG
+  {
+    pthread_mutex_lock(&ndbcluster_mutex);
+    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+    {
+      NDB_SHARE *share=
+        (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
+#ifndef DBUG_OFF
+      fprintf(stderr, "NDB: table share %s with use_count %d not freed\n",
+              share->key, share->use_count);
+#endif
+      real_free_share(&share);
+    }
+    pthread_mutex_unlock(&ndbcluster_mutex);
+  }
+#endif
+  hash_free(&ndbcluster_open_tables);
+
   if (g_ndb)
   {
 #ifndef DBUG_OFF
@@ -6347,23 +6255,6 @@
   delete g_ndb_cluster_connection;
   g_ndb_cluster_connection= NULL;
 
-#ifdef HAVE_NDB_BINLOG
-  {
-    pthread_mutex_lock(&ndbcluster_mutex);
-    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
-    {
-      NDB_SHARE *share=
-        (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
-#ifndef DBUG_OFF
-      fprintf(stderr, "NDB: table share %s with use_count %d not freed\n",
-              share->key, share->use_count);
-#endif
-      real_free_share(&share);
-    }
-    pthread_mutex_unlock(&ndbcluster_mutex);
-  }
-#endif
-  hash_free(&ndbcluster_open_tables);
   pthread_mutex_destroy(&ndbcluster_mutex);
   pthread_mutex_destroy(&LOCK_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_thread);
@@ -6524,7 +6415,7 @@
     m_index[inx].index_stat != NULL)
   {
     NDB_INDEX_DATA& d=m_index[inx];
-    NDBINDEX* index=(NDBINDEX*)d.index;
+    const NDBINDEX* index= d.index;
     Ndb* ndb=get_ndb();
     NdbTransaction* trans=NULL;
     NdbIndexScanOperation* op=NULL;
@@ -6544,7 +6435,7 @@
       else
       {
         Ndb_statistics stat;
-        if ((res=ndb_get_table_statistics(ndb, m_tabname, &stat)) != 0)
+        if ((res=ndb_get_table_statistics(ndb, m_table, &stat)) != 0)
           break;
         table_rows=stat.row_count;
         DBUG_PRINT("info", ("use db row_count: %llu", table_rows));
@@ -6705,10 +6596,14 @@
   pthread_mutex_unlock(&share->mutex);
 
   struct Ndb_statistics stat;
-  if (ndb_get_table_statistics(ndb, tabname, &stat))
   {
-    free_share(&share);
-    DBUG_RETURN(1);
+    Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
+    if (ndbtab_g.get_table() == 0
+        || ndb_get_table_statistics(ndb, ndbtab_g.get_table(), &stat))
+    {
+      free_share(&share);
+      DBUG_RETURN(1);
+    }
   }
 
   pthread_mutex_lock(&share->mutex);
@@ -6922,7 +6817,11 @@
   ++share->use_count;
   pthread_mutex_unlock(&ndbcluster_mutex);
 
-  close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
+  TABLE_LIST table_list;
+  bzero((char*) &table_list,sizeof(table_list));
+  table_list.db= share->db;
+  table_list.alias= table_list.table_name= share->table_name;
+  close_cached_tables(current_thd, 0, &table_list, TRUE);
 
   pthread_mutex_lock(&ndbcluster_mutex);
   if (!--share->use_count)
@@ -7268,15 +7167,17 @@
 
 static 
 int
-ndb_get_table_statistics(Ndb* ndb, const char * table,
+ndb_get_table_statistics(Ndb* ndb, const NDBTAB *ndbtab,
                          struct Ndb_statistics * ndbstat)
 {
   DBUG_ENTER("ndb_get_table_statistics");
-  DBUG_PRINT("enter", ("table: %s", table));
+  DBUG_PRINT("enter", ("table: %s", ndbtab->getName()));
   NdbTransaction* pTrans;
   int retries= 10;
   int retry_sleep= 30 * 1000; /* 30 milliseconds */
 
+  DBUG_ASSERT(ndbtab != 0);
+
   do
   {
     pTrans= ndb->startTransaction();
@@ -7291,7 +7192,7 @@
       ERR_RETURN(ndb->getNdbError());
     }
 
-    NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
+    NdbScanOperation* pOp= pTrans->getNdbScanOperation(ndbtab);
     if (pOp == NULL)
       break;
     
@@ -7452,9 +7353,9 @@
   byte *end_of_buffer= (byte*)buffer->buffer_end;
   NdbOperation::LockMode lm= 
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
-  const NDBTAB *tab= (const NDBTAB *) m_table;
-  const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
-  const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index; 
+  const NDBTAB *tab= m_table;
+  const NDBINDEX *unique_idx= m_index[active_index].unique_index;
+  const NDBINDEX *idx= m_index[active_index].index; 
   const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
   NdbIndexScanOperation* scanOp= 0;
   for (; multi_range_curr<multi_range_end && curr+reclength <= end_of_buffer; 
@@ -7803,11 +7704,8 @@
 
   ndb->setDatabaseName(m_dbname);
   NDBDICT* dict= ndb->getDictionary();
-  const NDBTAB* tab;
-  if (!(tab= dict->getTable(m_tabname)))
-  {
-    return((char*)comment);
-  }
+  const NDBTAB* tab= m_table;
+  DBUG_ASSERT(tab != NULL);
 
   char *str;
   const char *fmt="%s%snumber_of_replicas: %d";
@@ -7993,18 +7891,22 @@
       lock= share->commit_count_lock;
       pthread_mutex_unlock(&share->mutex);
 
-      if (ndb_get_table_statistics(ndb, share->table_name, &stat) == 0)
       {
-        DBUG_PRINT("ndb_util_thread",
-                   ("Table: %s, commit_count: %llu, rows: %llu",
-                    share->key, stat.commit_count, stat.row_count));
-      }
-      else
-      {
-        DBUG_PRINT("ndb_util_thread",
-                   ("Error: Could not get commit count for table %s",
-                    share->key));
-        stat.commit_count= 0;
+        Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table_name);
+        if (ndbtab_g.get_table() &&
+            ndb_get_table_statistics(ndb, ndbtab_g.get_table(), &stat) == 0)
+        {
+          DBUG_PRINT("ndb_util_thread",
+                     ("Table: %s, commit_count: %llu, rows: %llu",
+                      share->key, stat.commit_count, stat.row_count));
+        }
+        else
+        {
+          DBUG_PRINT("ndb_util_thread",
+                     ("Error: Could not get commit count for table %s",
+                      share->key));
+          stat.commit_count= 0;
+        }
       }
 
       pthread_mutex_lock(&share->mutex);
@@ -9396,12 +9298,8 @@
   NdbError ndberr;
   Uint32 id;
   ndb->setDatabaseName(m_dbname);
-  const NDBTAB *ndbtab= ndbdict->getTable(m_tabname);
-  if (ndbtab == 0)
-  {
-    ndberr= ndbdict->getNdbError();
-    goto err;
-  }
+  const NDBTAB *ndbtab= m_table;
+  DBUG_ASSERT(ndbtab != NULL);
   if (!ndbtab->getTablespace(&id))
   {
     return 0;
@@ -10066,17 +9964,10 @@
     }
     ndb= get_ndb();
     ndb->setDatabaseName(m_dbname);
-    dict= ndb->getDictionary();
-    if (!(tab= dict->getTable(m_tabname)))
+    Ndb_table_guard ndbtab_g(dict= ndb->getDictionary(), m_tabname);
+    if (!ndbtab_g.get_table())
       ERR_BREAK(dict->getNdbError(), err);
-    // Check if thread has stale local cache
-    if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-    {
-      invalidate_dictionary_cache(FALSE, tab);
-      if (!(tab= dict->getTable(m_tabname)))
-         ERR_BREAK(dict->getNdbError(), err);
-    }
-    *no_parts= tab->getFragmentCount();
+    *no_parts= ndbtab_g.get_table()->getFragmentCount();
     DBUG_RETURN(FALSE);
   } while (1);
 

--- 1.48/sql/ha_ndbcluster_binlog.cc	2006-04-24 22:28:18 +02:00
+++ 1.49/sql/ha_ndbcluster_binlog.cc	2006-05-03 21:09:44 +02:00
@@ -987,7 +987,7 @@
                              uint32 ndb_table_id,
                              uint32 ndb_table_version,
                              enum SCHEMA_OP_TYPE type,
-                             const char *old_db, const char *old_table_name)
+                             const char *new_db, const char *new_table_name)
 {
   DBUG_ENTER("ndbcluster_log_schema_op");
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -1027,8 +1027,8 @@
     /* redo the rename table query as is may contain several tables */
     query= tmp_buf2;
     query_length= (uint) (strxmov(tmp_buf2, "rename table `",
-                                  old_db, ".", old_table_name, "` to `",
-                                  db, ".", table_name, "`", NullS) - tmp_buf2);
+                                  db, ".", table_name, "` to `",
+                                  new_db, ".", new_table_name, "`", NullS) - tmp_buf2);
     type_str= "rename table";
     break;
   case SOT_CREATE_TABLE:
@@ -1068,6 +1068,8 @@
   Uint64 epoch= 0;
   MY_BITMAP schema_subscribers;
   uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
+  uint32 bitbuf_e[sizeof(bitbuf)];
+  bzero((char *)bitbuf_e, sizeof(bitbuf_e));
   {
     int i, updated= 0;
     int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
@@ -1111,7 +1113,8 @@
   char tmp_buf[FN_REFLEN];
   NDBDICT *dict= ndb->getDictionary();
   ndb->setDatabaseName(NDB_REP_DB);
-  const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
+  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+  const NDBTAB *ndbtab= ndbtab_g.get_table();
   NdbTransaction *trans= 0;
   int retries= 100;
   const NDBCOL *col[SCHEMA_SIZE];
@@ -1142,8 +1145,13 @@
 
   while (1)
   {
+    const char *log_db= db;
+    const char *log_tab= table_name;
+    const char *log_subscribers= (char*)schema_subscribers.bitmap;
+    uint32 log_type= (uint32)type;
     if ((trans= ndb->startTransaction()) == 0)
       goto err;
+    while (1)
     {
       NdbOperation *op= 0;
       int r= 0;
@@ -1153,17 +1161,17 @@
       DBUG_ASSERT(r == 0);
       
       /* db */
-      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
+      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
       r|= op->equal(SCHEMA_DB_I, tmp_buf);
       DBUG_ASSERT(r == 0);
       /* name */
-      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
-                       strlen(table_name));
+      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
+                       strlen(log_tab));
       r|= op->equal(SCHEMA_NAME_I, tmp_buf);
       DBUG_ASSERT(r == 0);
       /* slock */
       DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
-      r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap);
+      r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
       DBUG_ASSERT(r == 0);
       /* query */
       {
@@ -1187,8 +1195,17 @@
       r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
       DBUG_ASSERT(r == 0);
       /* type */
-      r|= op->setValue(SCHEMA_TYPE_I, (uint32)type);
+      r|= op->setValue(SCHEMA_TYPE_I, log_type);
       DBUG_ASSERT(r == 0);
+      if (log_db != new_db && new_db && new_table_name)
+      {
+        log_db= new_db;
+        log_tab= new_table_name;
+        log_subscribers= (const char *)bitbuf_e; // no ack expected on this
+        log_type= (uint32)SOT_RENAME_TABLE_NEW;
+        continue;
+      }
+      break;
     }
     if (trans->execute(NdbTransaction::Commit) == 0)
     {
@@ -1307,7 +1324,8 @@
   char tmp_buf[FN_REFLEN];
   NDBDICT *dict= ndb->getDictionary();
   ndb->setDatabaseName(NDB_REP_DB);
-  const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
+  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+  const NDBTAB *ndbtab= ndbtab_g.get_table();
   NdbTransaction *trans= 0;
   int retries= 100;
   const NDBCOL *col[SCHEMA_SIZE];
@@ -1453,31 +1471,28 @@
   {
     if (pOp->tableFrmChanged())
     {
+      DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
       is_online_alter_table= TRUE;
     }
     else
     {
+      DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
       DBUG_ASSERT(pOp->tableNameChanged());
       is_rename_table= TRUE;
     }
   }
 
-  /*
-    Refresh local dictionary cache by
-    invalidating table and all it's indexes
-  */
-  ndb->setDatabaseName(dbname);
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  DBUG_ASSERT(thd_ndb != NULL);
-  Ndb* old_ndb= thd_ndb->ndb;
-  thd_ndb->ndb= ndb;
-  ha_ndbcluster table_handler(table_share);
-  (void)strxmov(table_handler.m_dbname, dbname, NullS);
-  (void)strxmov(table_handler.m_tabname, tabname, NullS);
-  table_handler.open_indexes(ndb, table, TRUE);
-  table_handler.invalidate_dictionary_cache(TRUE, 0);
-  thd_ndb->ndb= old_ndb;
-  
+  {
+    ndb->setDatabaseName(dbname);
+    Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
+    const NDBTAB *ev_tab= pOp->getTable();
+    const NDBTAB *cache_tab= ndbtab_g.get_table();
+    if (cache_tab &&
+        cache_tab->getObjectId() == ev_tab->getObjectId() &&
+        cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
+      ndbtab_g.invalidate();
+  }
+
   /*
     Refresh local frm file and dictionary cache if
     remote on-line alter table
@@ -1506,7 +1521,8 @@
       DBUG_DUMP("frm", (char*)altered_table->getFrmData(), 
                 altered_table->getFrmLength());
       pthread_mutex_lock(&LOCK_open);
-      const NDBTAB *old= dict->getTable(tabname);
+      Ndb_table_guard ndbtab_g(dict, tabname);
+      const NDBTAB *old= ndbtab_g.get_table();
       if (!old &&
           old->getObjectVersion() != altered_table->getObjectVersion())
         dict->putTable(altered_table);
@@ -1518,7 +1534,13 @@
                               dbname, tabname, error);
       }
       ndbcluster_binlog_close_table(thd, share);
-      close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
+
+      TABLE_LIST table_list;
+      bzero((char*) &table_list,sizeof(table_list));
+      table_list.db= (char *)dbname;
+      table_list.alias= table_list.table_name= (char *)tabname;
+      close_cached_tables(thd, 0, &table_list, TRUE);
+
       if ((error= ndbcluster_binlog_open_table(thd, share, 
                                                table_share, table)))
         sql_print_information("NDB: Failed to re-open table %s.%s",
@@ -1546,26 +1568,22 @@
                             share_prefix, share->table->s->db.str,
                             share->table->s->table_name.str,
                             share->key);
+    {
+      ndb->setDatabaseName(share->table->s->db.str);
+      Ndb_table_guard ndbtab_g(ndb->getDictionary(),
+                               share->table->s->table_name.str);
+      const NDBTAB *ev_tab= pOp->getTable();
+      const NDBTAB *cache_tab= ndbtab_g.get_table();
+      if (cache_tab &&
+          cache_tab->getObjectId() == ev_tab->getObjectId() &&
+          cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
+        ndbtab_g.invalidate();
+    }
     /* do the rename of the table in the share */
     share->table->s->db.str= share->db;
     share->table->s->db.length= strlen(share->db);
     share->table->s->table_name.str= share->table_name;
     share->table->s->table_name.length= strlen(share->table_name);
-    /*
-      Refresh local dictionary cache by invalidating any 
-      old table with same name and all it's indexes
-    */
-    ndb->setDatabaseName(dbname);
-    Thd_ndb *thd_ndb= get_thd_ndb(thd);
-    DBUG_ASSERT(thd_ndb != NULL);
-    Ndb* old_ndb= thd_ndb->ndb;
-    thd_ndb->ndb= ndb;
-    ha_ndbcluster table_handler(table_share);
-    table_handler.set_dbname(share->key);
-    table_handler.set_tabname(share->key);
-    table_handler.open_indexes(ndb, table, TRUE);
-    table_handler.invalidate_dictionary_cache(TRUE, 0);
-    thd_ndb->ndb= old_ndb;
   }
   DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
   if (share->op_old == pOp)
@@ -1583,14 +1601,19 @@
   if (is_remote_change && share && share->state != NSS_DROPPED)
   {
     DBUG_PRINT("info", ("remote change"));
+    share->state= NSS_DROPPED;
     if (share->use_count != 1)
       do_close_cached_tables= TRUE;
-    share->state= NSS_DROPPED;
-    free_share(&share, TRUE);
+    else
+    {
+      free_share(&share, TRUE);
+      share= 0;
+    }
   }
+  else
+    share= 0;
   pthread_mutex_unlock(&ndbcluster_mutex);
 
-  share= 0;
   pOp->setCustomData(0);
 
   pthread_mutex_lock(&injector_mutex);
@@ -1599,7 +1622,14 @@
   pthread_mutex_unlock(&injector_mutex);
 
   if (do_close_cached_tables)
-    close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
+  {
+    TABLE_LIST table_list;
+    bzero((char*) &table_list,sizeof(table_list));
+    table_list.db= (char *)dbname;
+    table_list.alias= table_list.table_name= (char *)tabname;
+    close_cached_tables(thd, 0, &table_list);
+    free_share(&share);
+  }
   DBUG_RETURN(0);
 }
 
@@ -1631,53 +1661,27 @@
       if (schema->node_id != node_id)
       {
         int log_query= 0, post_epoch_unlock= 0;
-        DBUG_PRINT("info", ("log query_length: %d  query: '%s'",
-                            schema->query_length, schema->query));
+        DBUG_PRINT("info",
+                   ("%s.%s: log query_length: %d  query: '%s'  type: %d",
+                    schema->db, schema->name,
+                    schema->query_length, schema->query,
+                    schema->type));
         char key[FN_REFLEN];
         build_table_filename(key, sizeof(key), schema->db, schema->name, "");
-        NDB_SHARE *share= get_share(key, 0, false, false);
         switch ((enum SCHEMA_OP_TYPE)schema->type)
         {
         case SOT_DROP_TABLE:
-          /* binlog dropping table after any table operations */
-          if (share && share->op)
-          {
-            post_epoch_log_list->push_back(schema, mem_root);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-          }
-          /* table is either ignored or logging is postponed to later */
-          log_query= 0;
-          break;
+          // fall through
         case SOT_RENAME_TABLE:
-          if (share && share->op)
-          {
-            post_epoch_log_list->push_back(schema, mem_root);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-            break; /* discovery will be handled by binlog */
-          }
-          goto sot_create_table;
+          // fall through
+        case SOT_RENAME_TABLE_NEW:
+          // fall through
         case SOT_ALTER_TABLE:
-          if (share && share->op)
-          {
-            post_epoch_log_list->push_back(schema, mem_root);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-            break; /* discovery will be handled by binlog */
-          }
-          goto sot_create_table;
+          post_epoch_log_list->push_back(schema, mem_root);
+          /* acknowledge this query _after_ epoch completion */
+          post_epoch_unlock= 1;
+          break;
         case SOT_CREATE_TABLE:
-      sot_create_table:
-          /*
-            we need to free any share here as command below
-            may need to call handle_trailing_share
-          */
-          if (share)
-          {
-            free_share(&share);
-            share= 0;
-          }
           pthread_mutex_lock(&LOCK_open);
           if (ndb_create_table_from_engine(thd, schema->db, schema->name))
           {
@@ -1695,12 +1699,9 @@
                     TRUE,    /* print error */
                     TRUE);   /* don't binlog the query */
           /* binlog dropping database after any table operations */
-          if (ndb_binlog_running)
-          {
-            post_epoch_log_list->push_back(schema, mem_root);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-          }
+          post_epoch_log_list->push_back(schema, mem_root);
+          /* acknowledge this query _after_ epoch completion */
+          post_epoch_unlock= 1;
           break;
         case SOT_CREATE_DB:
           /* fall through */
@@ -1727,8 +1728,6 @@
             pthread_mutex_unlock(&ndb_schema_object->mutex);
             pthread_cond_signal(&injector_cond);
           }
-          if (share)
-            free_share(&share, TRUE);
           pthread_mutex_unlock(&ndbcluster_mutex);
           DBUG_RETURN(0);
         }
@@ -1737,11 +1736,6 @@
           log_query= 1;
           break;
         }
-        if (share)
-        {
-          free_share(&share);
-          share= 0;
-        }
         if (log_query && ndb_binlog_running)
         {
           char *thd_db_save= thd->db;
@@ -1865,36 +1859,81 @@
                                                  List<Cluster_schema>
                                                  *post_epoch_unlock_list)
 {
+  if (post_epoch_log_list->elements == 0)
+    return;
   DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
   Cluster_schema *schema;
   while ((schema= post_epoch_log_list->pop()))
   {
-    DBUG_PRINT("info", ("log query_length: %d  query: '%s'",
-                        schema->query_length, schema->query));
+    DBUG_PRINT("info",
+               ("%s.%s: log query_length: %d  query: '%s'  type: %d",
+                schema->db, schema->name,
+                schema->query_length, schema->query,
+                schema->type));
+    int log_query= 0;
     {
       char key[FN_REFLEN];
       build_table_filename(key, sizeof(key), schema->db, schema->name, "");
       NDB_SHARE *share= get_share(key, 0, false, false);
-      switch ((enum SCHEMA_OP_TYPE)schema->type)
+      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
+      switch (schema_type)
       {
       case SOT_DROP_DB:
-      case SOT_DROP_TABLE:
+        log_query= 1;
         break;
+      case SOT_DROP_TABLE:
+        // invalidation already handled by binlog thread
+        if (share && share->op)
+        {
+          log_query= 1;
+          break;
+        }
+        // fall through
       case SOT_RENAME_TABLE:
+        // fall through
       case SOT_ALTER_TABLE:
-        if (share && share->op)
+        // invalidation already handled by binlog thread
+        if (!share || !share->op)
         {
-          break; /* discovery handled by binlog */
+          {
+            injector_ndb->setDatabaseName(schema->db);
+            Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
+                                     schema->name);
+            ndbtab_g.invalidate();
+          }
+          TABLE_LIST table_list;
+          bzero((char*) &table_list,sizeof(table_list));
+          table_list.db= schema->db;
+          table_list.alias= table_list.table_name= schema->name;
+          close_cached_tables(thd, 0, &table_list, FALSE);
         }
-        pthread_mutex_lock(&LOCK_open);
-        if (ndb_create_table_from_engine(thd, schema->db, schema->name))
+        if (schema_type != SOT_ALTER_TABLE)
+          break;
+        // fall through
+      case SOT_RENAME_TABLE_NEW:
+        log_query= 1;
+        if (ndb_binlog_running)
         {
-          sql_print_error("Could not discover table '%s.%s' from "
-                          "binlog schema event '%s' from node %d",
-                          schema->db, schema->name, schema->query,
-                          schema->node_id);
+          /*
+            we need to free any share here as command below
+            may need to call handle_trailing_share
+          */
+          if (share)
+          {
+            free_share(&share);
+            share= 0;
+          }
+          pthread_mutex_lock(&LOCK_open);
+          if (ndb_create_table_from_engine(thd, schema->db, schema->name))
+          {
+            sql_print_error("Could not discover table '%s.%s' from "
+                            "binlog schema event '%s' from node %d",
+                            schema->db, schema->name, schema->query,
+                            schema->node_id);
+          }
+          pthread_mutex_unlock(&LOCK_open);
         }
-        pthread_mutex_unlock(&LOCK_open);
+        break;
       default:
         DBUG_ASSERT(false);
       }
@@ -1904,6 +1943,7 @@
         share= 0;
       }
     }
+    if (ndb_binlog_running && log_query)
     {
       char *thd_db_save= thd->db;
       thd->db= schema->db;
@@ -2187,7 +2227,8 @@
     ndb->setDatabaseName(db);
 
     NDBDICT *dict= ndb->getDictionary();
-    const NDBTAB *ndbtab= dict->getTable(table_name);
+    Ndb_table_guard ndbtab_g(dict, table_name);
+    const NDBTAB *ndbtab= ndbtab_g.get_table();
     if (ndbtab == 0)
     {
       if (ndb_extra_logging)
@@ -2202,7 +2243,8 @@
       event should have been created by someone else,
       but let's make sure, and create if it doesn't exist
     */
-    if (!dict->getEvent(event_name.c_ptr()))
+    const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
+    if (!ev)
     {
       if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
       {
@@ -2217,9 +2259,12 @@
                               event_name.c_ptr());
     }
     else
+    {
+      delete ev;
       if (ndb_extra_logging)
         sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
                               event_name.c_ptr());
+    }
 
     /*
       create the event operations for receiving logging events
@@ -2329,8 +2374,10 @@
       try retrieving the event, if table version/id matches, we will get
       a valid event.  Otherwise we have a trailing event from before
     */
-    if (dict->getEvent(event_name))
+    const NDBEVENT *ev;
+    if ((ev= dict->getEvent(event_name)))
     {
+      delete ev;
       DBUG_RETURN(0);
     }
 
Thread
bk commit into 5.1 tree (tomas:1.2373)tomas3 May