List:Internals« Previous MessageNext Message »
From:msvensson Date:January 4 2006 4:07pm
Subject:bk commit into 5.0 tree (msvensson:1.1990) BUG#13228
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of msvensson. When msvensson does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1990 06/01/04 16:06:52 msvensson@neptunus.(none) +19 -0
  Bug #13228  	open table cache not flushed when table schema changed
   - Rework open/create
   - Remove Ndb_local_table_info and use new thd_ndb_share instead
   - Close open handlers which are not in use when table schema version error detected.

  mysql-test/t/ndb_count.test
    1.1 06/01/04 16:06:06 msvensson@neptunus.(none) +141 -0
    New BitKeeper file ``mysql-test/t/ndb_count.test''

  mysql-test/t/ndb_alter_table2.test
    1.1 06/01/04 16:06:06 msvensson@neptunus.(none) +84 -0
    New BitKeeper file ``mysql-test/t/ndb_alter_table2.test''

  mysql-test/r/ndb_count.result
    1.1 06/01/04 16:06:06 msvensson@neptunus.(none) +150 -0
    New BitKeeper file ``mysql-test/r/ndb_count.result''

  mysql-test/t/ndb_count.test
    1.0 06/01/04 16:06:06 msvensson@neptunus.(none) +0 -0
    BitKeeper file
/home/msvensson/mysql/bug13228/my50-bug13228/mysql-test/t/ndb_count.test

  mysql-test/t/ndb_alter_table2.test
    1.0 06/01/04 16:06:06 msvensson@neptunus.(none) +0 -0
    BitKeeper file
/home/msvensson/mysql/bug13228/my50-bug13228/mysql-test/t/ndb_alter_table2.test

  mysql-test/r/ndb_count.result
    1.0 06/01/04 16:06:06 msvensson@neptunus.(none) +0 -0
    BitKeeper file
/home/msvensson/mysql/bug13228/my50-bug13228/mysql-test/r/ndb_count.result

  mysql-test/r/ndb_alter_table2.result
    1.1 06/01/04 16:06:05 msvensson@neptunus.(none) +42 -0
    New BitKeeper file ``mysql-test/r/ndb_alter_table2.result''

  sql/ha_ndbcluster.h
    1.94 06/01/04 16:06:05 msvensson@neptunus.(none) +39 -36
    Add thd_ndb_share struct
    Add comments 
    Make functions that should not touch ha_ndbcluster member variables static

  sql/ha_ndbcluster.cc
    1.222 06/01/04 16:06:05 msvensson@neptunus.(none) +1059 -1020
    Rework create and open
     - Separate functionality for opening a handle for data acess and  
       when it's only used for metadata operations. 
     - Add comments to functions.
    Comment and rework external_lock
    Use getGlobalTable and getGlobalIndex to make sure the m_table pointer is kept until
relaseGlobalTable/Index is called.
    Implement hash list in thd_ndb for sharing between open tables with same name in one
thd.

  ndb/src/ndbapi/NdbDictionaryImpl.hpp
    1.34 06/01/04 16:06:05 msvensson@neptunus.(none) +78 -39
    Add functions for getting and releasing table and indexes from global cache 
    Remove Ndb_local_table_info

  ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.85 06/01/04 16:06:05 msvensson@neptunus.(none) +66 -104
    Add functions for getting and releasing table and indexes from global cache 
    Remove Ndb_local_table_info

  ndb/src/ndbapi/NdbDictionary.cpp
    1.36 06/01/04 16:06:05 msvensson@neptunus.(none) +35 -8
    Remove Ndb_local_table_info

  ndb/src/ndbapi/Ndb.cpp
    1.55 06/01/04 16:06:05 msvensson@neptunus.(none) +6 -8
    Remove Ndb_local_table_info

  ndb/src/ndbapi/DictCache.hpp
    1.11 06/01/04 16:06:05 msvensson@neptunus.(none) +4 -14
    Remove Ndb_local_table_info

  ndb/src/ndbapi/DictCache.cpp
    1.17 06/01/04 16:06:05 msvensson@neptunus.(none) +53 -115
    Remove Ndb_local_table_info
    Collapse drop and release into one function.

  ndb/src/kernel/blocks/dbdict/Dbdict.hpp
    1.17 06/01/04 16:06:05 msvensson@neptunus.(none) +4 -1
    Broadcast ALTER_TABLE_REP when table has been dropped

  ndb/src/kernel/blocks/dbdict/Dbdict.cpp
    1.59 06/01/04 16:06:05 msvensson@neptunus.(none) +26 -5
    Broadcast ALTER_TABLE_REP when table has been dropped

  ndb/include/ndbapi/NdbDictionary.hpp
    1.52 06/01/04 16:06:05 msvensson@neptunus.(none) +31 -5
    Add new functions for retrieving "global" table and index handles.

  mysql-test/t/ndb_multi.test
    1.6 06/01/04 16:06:05 msvensson@neptunus.(none) +1 -1
    Update expected error, use MySQL builtin.

  mysql-test/r/ndb_alter_table2.result
    1.0 06/01/04 16:06:05 msvensson@neptunus.(none) +0 -0
    BitKeeper file
/home/msvensson/mysql/bug13228/my50-bug13228/mysql-test/r/ndb_alter_table2.result

  mysql-test/t/ndb_alter_table.test
    1.30 06/01/04 16:06:04 msvensson@neptunus.(none) +1 -3
    Will work as long as no check is made if table version is ok, when taken out from open
table cache.

  mysql-test/r/ndb_multi.result
    1.4 06/01/04 16:06:04 msvensson@neptunus.(none) +1 -1
    Update error message

  mysql-test/r/ndb_alter_table.result
    1.32 06/01/04 16:06:04 msvensson@neptunus.(none) +2 -1
    Will work as long as no check is made if table version is ok, when taken out from open
table cache.

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	msvensson
# Host:	neptunus.(none)
# Root:	/home/msvensson/mysql/bug13228/my50-bug13228

--- 1.31/mysql-test/r/ndb_alter_table.result	2005-11-17 15:34:16 +01:00
+++ 1.32/mysql-test/r/ndb_alter_table.result	2006-01-04 16:06:04 +01:00
@@ -192,7 +192,8 @@
 insert into t3 values (1);
 commit;
 select * from t3;
-ERROR HY000: Can't lock file (errno: 155)
+a
+1
 select * from t4;
 a
 1

--- 1.29/mysql-test/t/ndb_alter_table.test	2005-11-18 15:54:45 +01:00
+++ 1.30/mysql-test/t/ndb_alter_table.test	2006-01-04 16:06:04 +01:00
@@ -197,10 +197,8 @@
 # This should work as transaction is ongoing...
 delete from t3;
 insert into t3 values (1);
-commit; 
+commit;
 
-# This should fail as its a new transaction
---error 1015
 select * from t3;
 select * from t4;
 drop table t4;

--- 1.51/ndb/include/ndbapi/NdbDictionary.hpp	2005-08-18 14:09:05 +02:00
+++ 1.52/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-04 16:06:05 +01:00
@@ -1171,6 +1171,36 @@
     const Table * getTable(const char * name) const;
 
     /**
+     * Get table with given name and increase reference counter,
+     * thus keeping the Table until application calls releaseGlobalTable
+     * @param name   Name of table to get
+     * @return table if successful otherwise NULL.
+     */
+    const Table * getGlobalTable(const char * name) const ;
+
+    /**
+     * Release table, decrease reference counter
+     * @param tab Table to release
+     */
+    void releaseGlobalTable(const Table *tab) const;
+
+    /**
+     * Get index with given name and increase reference counter,
+     * thus keeping the Index until application calls releaseGlobalIndex
+     * @param name  Name of table to get
+     * @return index if successful otherwise NULL.
+     */
+    const Index * getGlobalIndex(const char * indexName,
+				 const char * tableName) const;
+
+    /**
+     * Release index, decrease reference counter
+     * @param idx - Index to release
+     */
+    void releaseGlobalIndex(const Index *idx) const;
+
+
+    /**
      * Get index with given name, NULL if undefined
      * @param indexName  Name of index to get.
      * @param tableName  Name of table that index belongs to.
@@ -1323,12 +1353,8 @@
     Dictionary(NdbDictionaryImpl&);
     const Table * getIndexTable(const char * indexName, 
 				const char * tableName) const;
-  public:
-#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
-    const Table * getTable(const char * name, void **data) const;
-    void set_local_table_data_size(unsigned sz);
-#endif
   };
+
 };
 
 class NdbOut& operator <<(class NdbOut& out, const
NdbDictionary::Column& col);

--- 1.58/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2005-10-17 09:29:36 +02:00
+++ 1.59/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2006-01-04 16:06:05 +01:00
@@ -3886,7 +3886,6 @@
   sendSignal(coordinatorRef, GSN_ALTER_TAB_CONF, signal, 
 	       AlterTabConf::SignalLength, JBB);
 
-
   {
     ApiBroadcastRep* api= (ApiBroadcastRep*)signal->getDataPtrSend();
     api->gsn = GSN_ALTER_TABLE_REP;
@@ -3896,12 +3895,12 @@
     rep->tableId = tabPtr.p->tableId;
     rep->tableVersion = alter_table_dec_schema_version(tabPtr.p->tableVersion);
     rep->changeType = AlterTableRep::CT_ALTERED;
-    
+
     LinearSectionPtr ptr[3];
     ptr[0].p = (Uint32*)alterTabPtr.p->previousTableName;
     ptr[0].sz = (sizeof(alterTabPtr.p->previousTableName) + 3) >> 2;
-    
-    sendSignal(QMGR_REF, GSN_API_BROADCAST_REP, signal, 
+
+    sendSignal(QMGR_REF, GSN_API_BROADCAST_REP, signal,
 	       ApiBroadcastRep::SignalLength + AlterTableRep::SignalLength,
 	       JBB, ptr,1);
   }
@@ -5421,7 +5420,9 @@
   dropTabPtr.p->m_coordinatorRef = reference();
   dropTabPtr.p->m_coordinatorData.m_gsn = GSN_PREP_DROP_TAB_REQ;
   dropTabPtr.p->m_coordinatorData.m_block = 0;
-  
+  strncpy(dropTabPtr.p->dropTableName,
+	  tablePtr.p->tableName, MAX_TAB_NAME_SIZE);
+
   Mutex mutex(signal, c_mutexMgr, dropTabPtr.p->m_define_backup_mutex);
   Callback c = { safe_cast(&Dbdict::dropTable_backup_mutex_locked),
 		 dropTabPtr.p->key};
@@ -5676,6 +5677,26 @@
   Uint32 ref = dropTabPtr.p->m_request.senderRef;
   sendSignal(ref, GSN_DROP_TABLE_CONF, signal, 
 	     DropTableConf::SignalLength, JBB);
+
+  // Inform all API's that table has been dropped
+  {
+    ApiBroadcastRep* api= (ApiBroadcastRep*)signal->getDataPtrSend();
+    api->gsn = GSN_ALTER_TABLE_REP;
+    api->minVersion = MAKE_VERSION(4,1,15);
+
+    AlterTableRep* rep = (AlterTableRep*)api->theData;
+    rep->tableId = dropTabPtr.p->m_request.tableId;
+    rep->tableVersion = dropTabPtr.p->m_request.tableVersion;
+    rep->changeType = AlterTableRep::CT_DROPPED;
+
+    LinearSectionPtr ptr[3];
+    ptr[0].p = (Uint32*)dropTabPtr.p->dropTableName;
+    ptr[0].sz = (sizeof(dropTabPtr.p->dropTableName) + 3) >> 2;
+
+    sendSignal(QMGR_REF, GSN_API_BROADCAST_REP, signal,
+	       ApiBroadcastRep::SignalLength + AlterTableRep::SignalLength,
+	       JBB, ptr,1);
+  }
 
   c_opDropTable.release(dropTabPtr);
   c_blockState = BS_IDLE;

--- 1.16/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2005-09-15 15:00:32 +02:00
+++ 1.17/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2006-01-04 16:06:05 +01:00
@@ -927,7 +927,10 @@
     void setErrorCode(Uint32 c){ if(m_errorCode == 0) m_errorCode = c;}
 
     MutexHandle2<BACKUP_DEFINE_MUTEX> m_define_backup_mutex;
-    
+
+    /* Used when broadcasting ALTER_TABLE_REP to API's */
+    char dropTableName[MAX_TAB_NAME_SIZE];
+
     /**
      * When sending stuff around
      */

--- 1.16/ndb/src/ndbapi/DictCache.cpp	2005-11-22 10:34:33 +01:00
+++ 1.17/ndb/src/ndbapi/DictCache.cpp	2006-01-04 16:06:05 +01:00
@@ -24,32 +24,7 @@
 static NdbTableImpl f_invalid_table;
 static NdbTableImpl f_altered_table;
 
-Ndb_local_table_info *
-Ndb_local_table_info::create(NdbTableImpl *table_impl, Uint32 sz)
-{
-  Uint32 tot_size= sizeof(Ndb_local_table_info) - sizeof(Uint64)
-    + ((sz+7) & ~7); // round to Uint64
-  void *data= malloc(tot_size);
-  if (data == 0)
-    return 0;
-  memset(data, 0, tot_size);
-  new (data) Ndb_local_table_info(table_impl);
-  return (Ndb_local_table_info *) data;
-}
 
-void Ndb_local_table_info::destroy(Ndb_local_table_info *info)
-{
-  free((void *)info);
-}
-
-Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
-{
-  m_table_impl= table_impl;
-}
-
-Ndb_local_table_info::~Ndb_local_table_info()
-{
-}
 
 LocalDictCache::LocalDictCache(){
   m_tableHash.createHashTable();
@@ -59,24 +34,22 @@
   m_tableHash.releaseHashTable();
 }
 
-Ndb_local_table_info *
-LocalDictCache::get(const char * name){
+NdbTableImpl *
+LocalDictCache::get(const char *name){
   const Uint32 len = strlen(name);
   return m_tableHash.getData(name, len);
 }
 
 void
-LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
-  const Uint32 id = tab_info->m_table_impl->m_tableId;
-
-  m_tableHash.insertKey(name, strlen(name), id, tab_info);
+LocalDictCache::put(const char *name, NdbTableImpl *impl){
+  const Uint32 id = impl->m_tableId;
+  m_tableHash.insertKey(name, strlen(name), id, impl);
 }
 
 void
 LocalDictCache::drop(const char * name){
-  Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
-  DBUG_ASSERT(info != 0);
-  Ndb_local_table_info::destroy(info);
+  NdbTableImpl *impl= m_tableHash.deleteKey(name, strlen(name));
+  DBUG_ASSERT(impl != 0);
 }
 
 #ifndef DBUG_OFF
@@ -86,27 +59,26 @@
   DBUG_ENTER("LocalDictCache::printCache");
   DBUG_LOCK_FILE;
 
-  NdbElement_t<Ndb_local_table_info> * curr = m_tableHash.getNext(0);
+  NdbElement_t<NdbTableImpl> * curr = m_tableHash.getNext(0);
   while(curr != 0){
     fprintf(DBUG_FILE, "* %s len: %d, hash: %d, lk: %d\n",
             (char*)curr->str, curr->len, curr->hash, curr->localkey1);
     if (curr->theData){
-      Ndb_local_table_info *info = curr->theData;
-        NdbTableImpl *impl= info->m_table_impl;
-        if(impl != 0)
-          fprintf(DBUG_FILE, "     tabId: %d, name: %s, ver: %d, status: %s\n",
-                  impl->m_tableId, impl->m_internalName.c_str(),
-                  impl->m_version,
-                  impl->m_status ==
-                  NdbDictionary::Object::New ? "New" :
-                  (impl->m_status ==
-                   NdbDictionary::Object::Changed ? "Changed" :
-                   (impl->m_status ==
-                    NdbDictionary::Object::Retrieved ? "Retrieved" :
-                    (impl->m_status ==
-                     NdbDictionary::Object::Invalid ? "Invalid" :
-                     (impl->m_status ==
-                      NdbDictionary::Object::Altered ? "Altered" : "?" )))));
+      NdbTableImpl *impl = curr->theData;
+      if(impl != 0)
+	fprintf(DBUG_FILE, "     tabId: %d, name: %s, ver: %d, status: %s\n",
+		impl->m_tableId, impl->m_internalName.c_str(),
+		impl->m_version,
+		impl->m_status ==
+		NdbDictionary::Object::New ? "New" :
+		(impl->m_status ==
+		 NdbDictionary::Object::Changed ? "Changed" :
+		 (impl->m_status ==
+		  NdbDictionary::Object::Retrieved ? "Retrieved" :
+		  (impl->m_status ==
+		   NdbDictionary::Object::Invalid ? "Invalid" :
+		   (impl->m_status ==
+		    NdbDictionary::Object::Altered ? "Altered" : "?" )))));
     }
     curr = m_tableHash.getNext(curr);
   }
@@ -168,10 +140,12 @@
     switch(ver->m_status){
     case OK:
       ver->m_refCount++;
+      DBUG_PRINT("info", ("Found valid table"));
       DBUG_EXECUTE("global", printCache(););
       DBUG_RETURN(ver->m_impl);
     case DROPPED:
       retreive = true; // Break loop
+      DBUG_PRINT("info", ("Dropped, break loop"));
       break;
     case RETREIVING:
       NdbCondition_WaitTimeout(m_waitForTableCondition, m_mutex, waitTime);
@@ -188,6 +162,7 @@
   tmp.m_status = RETREIVING;
   tmp.m_refCount = 1; // The one retreiving it
   versions->push_back(tmp);
+  DBUG_PRINT("info", ("Retrieving table..."));
   DBUG_EXECUTE("global", printCache(););
   DBUG_RETURN(0);
 }
@@ -260,68 +235,21 @@
 GlobalDictCache::drop(NdbTableImpl * tab)
 {
   DBUG_ENTER("GlobalDictCache::drop");
-  DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
-
-  unsigned i;
-  const Uint32 len = strlen(tab->m_internalName.c_str());
-  Vector<TableVersion> * vers =
-    m_tableHash.getData(tab->m_internalName.c_str(), len);
-  if(vers == 0){
-    // Should always tried to retreive it first
-    // and thus there should be a record
-    abort();
-  }
-
-  const Uint32 sz = vers->size();
-  if(sz == 0){
-    // Should always tried to retreive it first
-    // and thus there should be a record
-    abort();
-  }
-
-  for(i = 0; i < sz; i++){
-    TableVersion & ver = (* vers)[i];
-    if(ver.m_impl == tab){
-      if(ver.m_refCount == 0 || ver.m_status == RETREIVING ||
-	 ver.m_version != tab->m_version){
-	DBUG_PRINT("info", ("Dropping with refCount=%d status=%d impl=%p",
-                            ver.m_refCount, ver.m_status, ver.m_impl));
-	break;
-      }
-      DBUG_PRINT("info", ("Found table to drop, i: %d, name: %s",
-                          i, ver.m_impl->m_internalName.c_str()));
-      ver.m_refCount--;
-      ver.m_status = DROPPED;
-      if(ver.m_refCount == 0){
-        DBUG_PRINT("info", ("refCount is zero, deleting m_impl"))
-	delete ver.m_impl;
-	vers->erase(i);
-      }
-      DBUG_EXECUTE("global", printCache(););
-      DBUG_VOID_RETURN;
-    }
-  }
-
-  for(i = 0; i<sz; i++){
-    TableVersion & ver = (* vers)[i];
-    DBUG_PRINT("info", ("%d: version: %d refCount: %d status: %d impl: %p",
-                        i, ver.m_version, ver.m_refCount,
-                        ver.m_status, ver.m_impl));
-  }
-
-  abort();
+  DBUG_RETURN(release(tab, true));
 }
 
 void
-GlobalDictCache::release(NdbTableImpl * tab)
+GlobalDictCache::release(NdbTableImpl * tab, bool mark_as_dropped)
 {
+  unsigned i;
+  const char* internal_name= tab->m_internalName.c_str();
+  const Uint32 len = strlen(internal_name);
+
   DBUG_ENTER("GlobalDictCache::release");
-  DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
+  DBUG_PRINT("enter", ("internal_name: %s", internal_name));
 
-  unsigned i;
-  const Uint32 len = strlen(tab->m_internalName.c_str());
   Vector<TableVersion> * vers =
-    m_tableHash.getData(tab->m_internalName.c_str(), len);
+    m_tableHash.getData(internal_name, len);
   if(vers == 0){
     // Should always tried to retreive it first
     // and thus there should be a record
@@ -339,13 +267,22 @@
     TableVersion & ver = (* vers)[i];
     if(ver.m_impl == tab){
       if(ver.m_refCount == 0 || ver.m_status == RETREIVING ||
-	 ver.m_version != tab->m_version){
-	DBUG_PRINT("info", ("Releasing with refCount=%d status=%d impl=%p",
-                            ver.m_refCount, ver.m_status, ver.m_impl));
+	 ver.m_version != tab->m_version)
 	break;
-      }
 
+      DBUG_PRINT("info", ("Found table i: %d, name: %s",
+                          i, ver.m_impl->m_internalName.c_str()));
       ver.m_refCount--;
+
+      if (mark_as_dropped)
+	ver.m_status = DROPPED;
+
+      if (ver.m_refCount == 0){
+        DBUG_PRINT("info", ("refCount is zero, deleting m_impl"));
+	delete ver.m_impl;
+	vers->erase(i);
+      }
+
       DBUG_EXECUTE("global", printCache(););
       DBUG_VOID_RETURN;
     }
@@ -361,6 +298,7 @@
   abort();
 }
 
+
 void
 GlobalDictCache::alter_table_rep(const char * name,
 				 Uint32 tableId,
@@ -423,18 +361,18 @@
   DBUG_LOCK_FILE;
   NdbElement_t<Vector<TableVersion> > * curr = m_tableHash.getNext(0);
   while(curr != 0){
-    fprintf(DBUG_FILE, "* %s len: %d, hash: %d, lk: %d\n",
+    fprintf(DBUG_FILE, "* %s len: %d, hash: %u, lk: %d\n",
             (char*)curr->str, curr->len, curr->hash, curr->localkey1);
     if (curr->theData){
       Vector<TableVersion> * vers = curr->theData;
       const unsigned sz = vers->size();
       for(unsigned i = 0; i<sz ; i++){
         TableVersion tv= (*vers)[i];
-        fprintf(DBUG_FILE, " + %d ver: %d, refCount: %d, status: %s\n",
+        fprintf(DBUG_FILE, " - %d: ver: %d, refCount: %d, status: %s\n",
                 i, tv.m_version, tv.m_refCount,
-                tv.m_status == 0 ? "OK" :
-                (tv.m_status == 1 ? "DROPPED" :
-                 (tv.m_status == 2 ? "RETREIVING" : "?"))) ;
+                tv.m_status == OK ? "OK" :
+                (tv.m_status == DROPPED ? "DROPPED" :
+                 (tv.m_status == RETREIVING ? "RETREIVING" : "?")));
         NdbTableImpl *impl= tv.m_impl;
         if(impl != 0)
           fprintf(DBUG_FILE, "     tabId: %d, name: %s, ver: %d, status: %s\n",

--- 1.10/ndb/src/ndbapi/DictCache.hpp	2005-11-22 10:34:33 +01:00
+++ 1.11/ndb/src/ndbapi/DictCache.hpp	2006-01-04 16:06:05 +01:00
@@ -28,16 +28,6 @@
 #include <NdbCondition.h>
 #include "NdbLinHash.hpp"
 
-class Ndb_local_table_info {
-public:
-  static Ndb_local_table_info *create(NdbTableImpl *table_impl, Uint32 sz=0);
-  static void destroy(Ndb_local_table_info *);
-  NdbTableImpl *m_table_impl;
-  Uint64 m_local_data[1]; // Must be last member. Used to access extra space.
-private:
-  Ndb_local_table_info(NdbTableImpl *table_impl);
-  ~Ndb_local_table_info();
-};
 
 /**
  * A non thread safe dict cache
@@ -47,12 +37,12 @@
   LocalDictCache();
   ~LocalDictCache();
 
-  Ndb_local_table_info * get(const char * name);
+  NdbTableImpl *get(const char * name);
 
-  void put(const char * name, Ndb_local_table_info *);
+  void put(const char * name, NdbTableImpl *);
   void drop(const char * name);
 
-  NdbLinHash<Ndb_local_table_info> m_tableHash; // On name
+  NdbLinHash<NdbTableImpl> m_tableHash; // On name
 
 private:
 #ifndef DBUG_OFF
@@ -72,7 +62,7 @@
 
   NdbTableImpl* put(const char * name, NdbTableImpl *);
   void drop(NdbTableImpl *);
-  void release(NdbTableImpl *);
+  void release(NdbTableImpl *, bool mark_as_dropped=false);
 
   void alter_table_rep(const char * name,
 		       Uint32 tableId, Uint32 tableVersion, bool altered);

--- 1.54/ndb/src/ndbapi/Ndb.cpp	2005-06-09 16:13:21 +02:00
+++ 1.55/ndb/src/ndbapi/Ndb.cpp	2006-01-04 16:06:05 +01:00
@@ -766,11 +766,10 @@
   DBUG_ENTER("getAutoIncrementValue");
   BaseString internal_tabname(internalize_table_name(aTableName));
 
-  Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
-  if (info == 0)
+  NdbTableImpl *impl= theDictionary->getTableImpl(internal_tabname, false);
+  if (impl == 0)
     DBUG_RETURN(~(Uint64)0);
-  const NdbTableImpl *table= info->m_table_impl;
+  const NdbTableImpl *table= impl;
   Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
   DBUG_PRINT("info", ("value %ul", (ulong) tupleId));
   DBUG_RETURN(tupleId);
@@ -858,13 +857,12 @@
   DBUG_ENTER("setAutoIncrementValue");
   BaseString internal_tabname(internalize_table_name(aTableName));
 
-  Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
-  if (info == 0) {
+  NdbTableImpl *impl= theDictionary->getTableImpl(internal_tabname, false);
+  if (impl == 0) {
     theError= theDictionary->getNdbError();
     DBUG_RETURN(false);
   }
-  const NdbTableImpl* table= info->m_table_impl;
+  const NdbTableImpl* table= impl;
   DBUG_RETURN(setTupleIdInNdb(table->m_tableId, val, increase));
 }
 

--- 1.35/ndb/src/ndbapi/NdbDictionary.cpp	2005-06-02 14:32:00 +02:00
+++ 1.36/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-04 16:06:05 +01:00
@@ -751,24 +751,30 @@
   return m_impl.alterTable(NdbTableImpl::getImpl(t));
 }
 
-const NdbDictionary::Table * 
-NdbDictionary::Dictionary::getTable(const char * name, void **data) const
+const NdbDictionary::Table *
+NdbDictionary::Dictionary::getTable(const char * name) const
 {
-  NdbTableImpl * t = m_impl.getTable(name, data);
+  NdbTableImpl * t = m_impl.getTable(name);
   if(t)
     return t->m_facade;
   return 0;
 }
 
-void NdbDictionary::Dictionary::set_local_table_data_size(unsigned sz)
+const NdbDictionary::Table *
+NdbDictionary::Dictionary::getGlobalTable(const char * name) const
 {
-  m_impl.m_local_table_data_size= sz;
+  NdbTableImpl * t = m_impl.getGlobalTable(name);
+  if(t)
+    return t->m_facade;
+  return 0;
 }
 
-const NdbDictionary::Table * 
-NdbDictionary::Dictionary::getTable(const char * name) const
+void NdbDictionary::Dictionary::releaseGlobalTable(const Table *tab) const
 {
-  return getTable(name, 0);
+  DBUG_ENTER("releaseGlobalTable");
+  if (tab)
+    m_impl.removeGlobalCachedObject(*((NdbTableImpl*)tab));
+  DBUG_VOID_RETURN;
 }
 
 void
@@ -808,6 +814,27 @@
   if(i)
     return i->m_facade;
   return 0;
+}
+
+const NdbDictionary::Index *
+NdbDictionary::Dictionary::getGlobalIndex(const char * indexName,
+                                          const char * tableName) const
+{
+  NdbIndexImpl * i = m_impl.getIndex(indexName, tableName, true);
+  if(i)
+    return i->m_facade;
+  return 0;
+}
+
+void NdbDictionary::Dictionary::releaseGlobalIndex(const Index *idx) const
+{
+  DBUG_ENTER("releaseGlobalTable");
+  if(idx)
+  {
+    assert(((NdbIndexImpl*)idx)->m_table != 0);
+    m_impl.removeGlobalCachedObject(* ((NdbIndexImpl*)idx)->m_table);
+  }
+  DBUG_VOID_RETURN;
 }
 
 void

--- 1.84/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2005-08-18 14:24:56 +02:00
+++ 1.85/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-04 16:06:05 +01:00
@@ -684,7 +684,6 @@
     m_ndb(ndb)
 {
   m_globalHash = 0;
-  m_local_table_data_size= 0;
 }
 
 NdbDictionaryImpl::NdbDictionaryImpl(Ndb &ndb,
@@ -695,27 +694,26 @@
     m_ndb(ndb)
 {
   m_globalHash = 0;
-  m_local_table_data_size= 0;
 }
 
 static int f_dictionary_count = 0;
 
 NdbDictionaryImpl::~NdbDictionaryImpl()
 {
-  NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0);
+  NdbElement_t<NdbTableImpl> * curr = m_localHash.m_tableHash.getNext(0);
   if(m_globalHash){
     while(curr != 0){
       m_globalHash->lock();
-      m_globalHash->release(curr->theData->m_table_impl);
-      Ndb_local_table_info::destroy(curr->theData);
+      NdbTableImpl* impl= curr->theData;
+      m_globalHash->release(impl);
       m_globalHash->unlock();
-      
+
       curr = m_localHash.m_tableHash.getNext(curr);
     }
-    
+
     m_globalHash->lock();
     if(--f_dictionary_count == 0){
-      delete NdbDictionary::Column::FRAGMENT; 
+      delete NdbDictionary::Column::FRAGMENT;
       delete NdbDictionary::Column::FRAGMENT_MEMORY;
       delete NdbDictionary::Column::ROW_COUNT;
       delete NdbDictionary::Column::COMMIT_COUNT;
@@ -734,50 +732,35 @@
   }
 }
 
-Ndb_local_table_info *
-NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
+NdbTableImpl *
+NdbDictionaryImpl::fetchTableImpl(const BaseString& internalTableName)
 {
   NdbTableImpl *impl;
 
   m_globalHash->lock();
-  impl = m_globalHash->get(internalTableName.c_str());
+  impl= m_globalHash->get(internalTableName.c_str());
   m_globalHash->unlock();
 
   if (impl == 0){
-    impl = m_receiver.getTable(internalTableName,
-			       m_ndb.usingFullyQualifiedNames());
+    impl= m_receiver.getTable(internalTableName,
+			      m_ndb.usingFullyQualifiedNames());
     m_globalHash->lock();
     m_globalHash->put(internalTableName.c_str(), impl);
     m_globalHash->unlock();
-    
+
     if(impl == 0){
       return 0;
     }
   }
 
-  Ndb_local_table_info *info=
-    Ndb_local_table_info::create(impl, m_local_table_data_size);
-
-  m_localHash.put(internalTableName.c_str(), info);
+  m_localHash.put(internalTableName.c_str(), impl);
 
   m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
   m_ndb.theLastTupleId[impl->getTableId()]  = ~0;
-  
-  return info;
-}
 
-#if 0
-bool
-NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
-{
-  if(tf != 0){
-    m_globalHash = &tf->m_globalDictCache;
-    return m_receiver.setTransporter(tf);
-  }
-  
-  return false;
+  return impl;
 }
-#endif
+
 
 bool
 NdbDictionaryImpl::setTransporter(class Ndb* ndb, 
@@ -815,32 +798,6 @@
   return getTable(m_ndb.externalizeTableName(internalName.c_str()));
 }
 
-#if 0
-bool
-NdbDictInterface::setTransporter(class TransporterFacade * tf)
-{
-  if(tf == 0)
-    return false;
-  
-  Guard g(tf->theMutexPtr);
-  
-  m_blockNumber = tf->open(this,
-			   execSignal,
-			   execNodeStatus);
-  
-  if ( m_blockNumber == -1 ) {
-    m_error.code= 4105;
-    return false; // no more free blocknumbers
-  }//if
-  Uint32 theNode = tf->ownId();
-  m_reference = numberToRef(m_blockNumber, theNode);
-  m_transporter = tf;
-  m_waiter.m_mutex = tf->theMutexPtr;
-
-  return true;
-}
-#endif
-
 bool
 NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
 {
@@ -1458,13 +1415,12 @@
   if (t.m_noOfBlobs == 0)
     return 0;
   // update table def from DICT
-  Ndb_local_table_info *info=
-    get_local_table_info(t.m_internalName,false);
-  if (info == NULL) {
+  NdbTableImpl *impl= getTableImpl(t.m_internalName, false);
+  if (impl == NULL) {
     m_error.code= 709;
     return -1;
   }
-  if (createBlobTables(*(info->m_table_impl)) != 0) {
+  if (createBlobTables(*(impl)) != 0) {
     int save_code = m_error.code;
     (void)dropTable(t);
     m_error.code= save_code;
@@ -1485,12 +1441,11 @@
     if (createTable(bt) != 0)
       return -1;
     // Save BLOB table handle
-    Ndb_local_table_info *info=
-      get_local_table_info(bt.m_internalName, false);
-    if (info == 0) {
+    NdbTableImpl *impl= getTableImpl(bt.m_internalName, false);
+    if (impl == 0) {
       return -1;
     }
-    c.m_blobTable = info->m_table_impl;
+    c.m_blobTable = impl;
   }
   
   return 0;
@@ -1534,8 +1489,8 @@
   const char * originalInternalName = internalName.c_str();
 
   DBUG_ENTER("NdbDictionaryImpl::alterTable");
-  Ndb_local_table_info * local = 0;
-  if((local= get_local_table_info(originalInternalName, false)) == 0)
+  NdbTableImpl *local = getTableImpl(originalInternalName, false);
+  if(local == NULL)
   {
     m_error.code = 709;
     DBUG_RETURN(-1);
@@ -1546,8 +1501,8 @@
   if(ret == 0){
     // Remove cached information and let it be refreshed at next access
     m_globalHash->lock();
-    local->m_table_impl->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(local->m_table_impl);
+    local->m_status = NdbDictionary::Object::Invalid;
+    m_globalHash->drop(local);
     m_globalHash->unlock();
     m_localHash.drop(originalInternalName);
   }
@@ -2020,13 +1975,22 @@
 {
   const char * internalTableName = impl.m_internalName.c_str();
 
-  m_localHash.drop(internalTableName);  
+  m_localHash.drop(internalTableName);
+  return 0;
+}
+
+int
+NdbDictionaryImpl::removeGlobalCachedObject(NdbTableImpl & impl)
+{
+  const char * internalTableName = impl.m_internalName.c_str();
+
   m_globalHash->lock();
   m_globalHash->release(&impl);
   m_globalHash->unlock();
   return 0;
 }
 
+
 /*****************************************************************
  * Get index info
  */
@@ -2034,13 +1998,12 @@
 NdbDictionaryImpl::getIndexImpl(const char * externalName,
 				const BaseString& internalName)
 {
-  Ndb_local_table_info * info = get_local_table_info(internalName,
-						     false);
-  if(info == 0){
+  NdbTableImpl *impl = getTableImpl(internalName, false);
+  if(impl == 0){
     m_error.code = 4243;
     return 0;
   }
-  NdbTableImpl * tab = info->m_table_impl;
+  NdbTableImpl * tab = impl;
 
   if(tab->m_indexType == NdbDictionary::Index::Undefined){
     // Not an index
@@ -2286,40 +2249,39 @@
 }
 
 int
-NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl, const char * tableName)
+NdbDictionaryImpl::dropIndex(NdbIndexImpl & index_impl, const char * tableName)
 {
-  const char * indexName = impl.getName();
-  if (tableName || m_ndb.usingFullyQualifiedNames()) {
-    NdbTableImpl * timpl = impl.m_table;
-    
-    if (timpl == 0) {
-      m_error.code = 709;
-      return -1;
-    }
-
-    const BaseString internalIndexName((tableName)
-      ?
-      m_ndb.internalize_index_name(getTable(tableName), indexName)
-      :
-      m_ndb.internalize_table_name(indexName)); // Index is also a table
+  const char * indexName = index_impl.getName();
+  DBUG_ENTER("NdbDictionaryImpl::dropIndex");
+  DBUG_PRINT("enter", ("indexName: %s", indexName));
 
-    if(impl.m_status == NdbDictionary::Object::New){
-      return dropIndex(indexName, tableName);
-    }
+  if (!(tableName || m_ndb.usingFullyQualifiedNames()))
+  {
+    m_error.code = 4243;
+    DBUG_RETURN(-1);
+  }
 
-    int ret = m_receiver.dropIndex(impl, *timpl);
-    if(ret == 0){
-      m_localHash.drop(internalIndexName.c_str());
-      m_globalHash->lock();
-      impl.m_table->m_status = NdbDictionary::Object::Invalid;
-      m_globalHash->drop(impl.m_table);
-      m_globalHash->unlock();
-    }
-    return ret;
+  NdbTableImpl * index_tab_impl = index_impl.m_table;
+  if (index_tab_impl == 0) {
+    m_error.code = 709;
+    DBUG_RETURN(-1);
   }
 
-  m_error.code = 4243;
-  return -1;
+  /* If the table only exists in in this NdbApi */
+  if(index_impl.m_status == NdbDictionary::Object::New)
+    DBUG_RETURN(dropIndex(indexName, tableName));
+
+  /* Drop the index from NDB */
+  int ret = m_receiver.dropIndex(index_impl, *index_tab_impl);
+  if(ret == 0){
+    /* Drop from NDB suceeded, release in local and global cache */
+    m_localHash.drop(index_impl.m_internalName.c_str());
+    m_globalHash->lock();
+    index_tab_impl->m_status = NdbDictionary::Object::Invalid;
+    m_globalHash->drop(index_tab_impl);
+    m_globalHash->unlock();
+  }
+  DBUG_RETURN(ret);
 }
 
 int

--- 1.33/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2005-06-02 14:32:00 +02:00
+++ 1.34/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-04 16:06:05 +01:00
@@ -258,7 +258,7 @@
     m_transporter= NULL;
   }
   ~NdbDictInterface();
-  
+
   bool setTransporter(class Ndb * ndb, class TransporterFacade * tf);
   bool setTransporter(class TransporterFacade * tf);
   
@@ -387,6 +387,7 @@
   int dropBlobTables(NdbTableImpl &);
   int invalidateObject(NdbTableImpl &);
   int removeCachedObject(NdbTableImpl &);
+  int removeGlobalCachedObject(NdbTableImpl &);
 
   int createIndex(NdbIndexImpl &ix);
   int dropIndex(const char * indexName, 
@@ -396,7 +397,7 @@
 			       NdbTableImpl * table);
 
   int createEvent(NdbEventImpl &);
-  int dropEvent(const char * eventName);
+  int dropEvent(const char *eventName);
 
   int executeSubscribeEvent(NdbEventImpl &);
   int stopSubscribeEvent(NdbEventImpl &);
@@ -404,17 +405,19 @@
   int listObjects(List& list, NdbDictionary::Object::Type type);
   int listIndexes(List& list, Uint32 indexId);
 
-  NdbTableImpl * getTable(const char * tableName, void **data= 0);
-  Ndb_local_table_info* get_local_table_info(
-    const BaseString& internalTableName, bool do_add_blob_tables);
-  NdbIndexImpl * getIndex(const char * indexName,
-			  const char * tableName);
-  NdbEventImpl * getEvent(const char * eventName);
-  NdbEventImpl * getEventImpl(const char * internalName);
+  NdbTableImpl *getTable(const char *tableName);
+  NdbTableImpl *getGlobalTable(const char *tableName);
+  NdbTableImpl *getTableImpl(const BaseString& internalTableName,
+			     bool do_add_blob_tables,
+			     bool global= false);
+  NdbIndexImpl *getIndex(const char *indexName,
+			  const char *tableName,
+			  bool global= false);
+  NdbEventImpl *getEvent(const char *eventName);
+  NdbEventImpl *getEventImpl(const char *internalName);
   
   const NdbError & getNdbError() const;
   NdbError m_error;
-  Uint32 m_local_table_data_size;
 
   LocalDictCache m_localHash;
   GlobalDictCache * m_globalHash;
@@ -428,7 +431,7 @@
 private:
   NdbIndexImpl * getIndexImpl(const char * name,
                               const BaseString& internalName);
-  Ndb_local_table_info * fetchGlobalTableImpl(const BaseString& internalName);
+  NdbTableImpl* fetchTableImpl(const BaseString& internalName);
 };
 
 inline
@@ -637,59 +640,95 @@
 
 inline
 NdbTableImpl *
-NdbDictionaryImpl::getTable(const char * table_name, void **data)
+NdbDictionaryImpl::getTable(const char * table_name)
 {
+  DBUG_ENTER("NdbDictionaryImpl::getTable");
   const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
-  Ndb_local_table_info *info=
-    get_local_table_info(internal_tabname, true);
-  if (info == 0)
-    return 0;
+  NdbTableImpl *impl= getTableImpl(internal_tabname, true);
+  if (impl == 0)
+    DBUG_RETURN(0);
 
-  if (data)
-    *data= info->m_local_data;
-
-  return info->m_table_impl;
+  DBUG_RETURN(impl);
 }
 
+
 inline
-Ndb_local_table_info * 
-NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName,
-					bool do_add_blob_tables)
+NdbTableImpl *
+NdbDictionaryImpl::getTableImpl(const BaseString& internalTableName,
+			   bool do_add_blob_tables,
+			   bool global)
 {
-  Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
-  if (info == 0) {
-    info= fetchGlobalTableImpl(internalTableName);
-    if (info == 0) {
-      return 0;
+  NdbTableImpl *impl;
+  DBUG_ENTER("NdbDictionaryImpl::getTableImpl");
+  do
+  {
+    impl= m_localHash.get(internalTableName.c_str());
+    if (impl == 0) {
+      impl= fetchTableImpl(internalTableName);
+      if (impl == 0) {
+        DBUG_RETURN(0);
+      }
+    }
+    else if (global)
+    {
+      if (impl->m_status != NdbDictionary::Object::Retrieved)
+      {
+        m_globalHash->lock();
+        m_globalHash->release(impl);
+        m_globalHash->unlock();
+        m_localHash.drop(internalTableName.c_str());
+        impl= 0;
+      }
     }
+  } while(impl == 0);
+
+  if (global) {
+    DBUG_PRINT("info", ("inc refcounter in global cache"));
+    m_globalHash->lock();
+    m_globalHash->get(internalTableName.c_str());
+    m_globalHash->unlock();
   }
-  if (do_add_blob_tables && info->m_table_impl->m_noOfBlobs)
-    addBlobTables(*(info->m_table_impl));
-  
-  return info; // autoincrement already initialized
+
+  if (do_add_blob_tables && impl->m_noOfBlobs)
+    addBlobTables(*(impl));
+
+  DBUG_RETURN(impl); // autoincrement already initialized
+}
+
+inline
+NdbTableImpl *
+NdbDictionaryImpl::getGlobalTable(const char * table_name)
+{
+  const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
+  NdbTableImpl *impl=
+    getTableImpl(internal_tabname, true, true);
+  if (impl == 0)
+    return 0;
+
+  return impl;
 }
 
 inline
-NdbIndexImpl * 
+NdbIndexImpl *
 NdbDictionaryImpl::getIndex(const char * index_name,
-			    const char * table_name)
+			    const char * table_name,
+			    bool global)
 {
   if (table_name || m_ndb.usingFullyQualifiedNames())
   {
     const BaseString internal_indexname(
       (table_name)
       ?
-      m_ndb.internalize_index_name(getTable(table_name), index_name)
+      m_ndb.internalize_index_name(getTable(table_name), index_name)...
       :
       m_ndb.internalize_table_name(index_name)); // Index is also a table
 
     if (internal_indexname.length())
     {
-      Ndb_local_table_info * info=
-        get_local_table_info(internal_indexname, false);
-      if (info)
+      NdbTableImpl *impl= getTableImpl(internal_indexname, false, global);
+      if (impl)
       {
-	NdbTableImpl * tab= info->m_table_impl;
+	NdbTableImpl *tab= impl;
         if (tab->m_index == 0)
           tab->m_index= getIndexImpl(index_name, internal_indexname);
         if (tab->m_index != 0)

--- 1.221/sql/ha_ndbcluster.cc	2005-11-18 09:54:38 +01:00
+++ 1.222/sql/ha_ndbcluster.cc	2006-01-04 16:06:05 +01:00
@@ -44,6 +44,8 @@
 static const int max_transactions= 2;
 
 static const char *ha_ndb_ext=".ndb";
+static const char* unique_suffix= "$unique";
+
 
 static int ndbcluster_close_connection(THD *thd);
 static int ndbcluster_commit(THD *thd, bool all);
@@ -78,8 +80,6 @@
 #define NDB_FAILED_AUTO_INCREMENT ~(Uint64)0
 #define NDB_AUTO_INCREMENT_RETRIES 10
 
-#define NDB_INVALID_SCHEMA_OBJECT 241
-
 #define ERR_PRINT(err) \
   DBUG_PRINT("error", ("%d  message: %s", err.code, err.message))
 
@@ -107,17 +107,20 @@
 // Table lock handling
 static HASH ndbcluster_open_tables;
 
-static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
-                                my_bool not_used __attribute__((unused)));
-static NDB_SHARE *get_share(const char *table_name);
-static void free_share(NDB_SHARE *share);
+static byte *ndb_share_get_key(NDB_SHARE *, uint *, my_bool);
+static NDB_SHARE *get_share(const char *);
+static void free_share(NDB_SHARE *);
+
+static byte* thd_share_get_key(THD_NDB_SHARE *, uint *, my_bool);
+static THD_NDB_SHARE *get_thd_share(Thd_ndb*, const char *);
+static void free_thd_share(THD_NDB_SHARE *);
 
 static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
 static int unpackfrm(const void **data, uint *len,
                      const void* pack_data);
 
-static int ndb_get_table_statistics(Ndb*, const char *, 
-                                    struct Ndb_statistics *);
+static int ndb_get_table_statistics(Ndb*, const NDBTAB *,
+				    struct Ndb_statistics *);
 
 // Util thread variables
 static pthread_t ndb_util_thread;
@@ -170,6 +173,7 @@
   {NullS, NullS, SHOW_LONG}
 };
 
+
 /*
   Error handling functions
 */
@@ -205,6 +209,7 @@
   { 832, HA_ERR_RECORD_FILE_FULL, 1 },
 
   { 284, HA_ERR_TABLE_DEF_CHANGED, 0 },
+  { 241, HA_ERR_TABLE_DEF_CHANGED, 0 },
 
   { 0, 1, 0 },
 
@@ -229,6 +234,33 @@
 }
 
 
+/*
+  Get tab andb db name from path
+
+  SYNOPSIS
+  split_path
+  path    - path to split
+  dbname  - buffer for name of db
+  tabname - buffer for name  of tab
+*/
+
+static void split_path(const char *path,
+		       char *dbname, char* tabname)
+{
+  DBUG_ENTER("split_path");
+  DBUG_PRINT("enter", ("path: %s", path));
+  char buf[FN_REFLEN];
+  uint length= dirname_part(buf, path);
+  buf[length-1]= 0;
+  strncpy(tabname, path+length, FN_HEADLEN);
+  *fn_ext(tabname)= 0;
+
+  strncpy(dbname, buf+dirname_length(buf), FN_HEADLEN);
+
+  DBUG_PRINT("exit", ("dbname: %s, tabname: %s", dbname, tabname));
+  DBUG_VOID_RETURN;
+}
+
 
 inline
 int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
@@ -282,21 +314,35 @@
                         h->m_force_send);
 }
 
+
+static byte* thd_share_get_key(THD_NDB_SHARE *thd_share, uint *length,
+			       my_bool not_used __attribute__((unused)))
+{
+  *length=thd_share->table_name_length;
+  return (byte*) thd_share->table_name;
+}
+
+
 /*
   Place holder for ha_ndbcluster thread specific data
 */
+
 Thd_ndb::Thd_ndb()
 {
   ndb= new Ndb(g_ndb_cluster_connection, "");
   lock_count= 0;
-  count= 0;
   all= NULL;
   stmt= NULL;
   error= 0;
+  (void) hash_init(&shared_tables,system_charset_info,32,0,0,
+                   (hash_get_key) thd_share_get_key,
+		   (hash_free_key)free_thd_share,0);
 }
 
+
 Thd_ndb::~Thd_ndb()
 {
+  hash_free(&shared_tables);
   if (ndb)
   {
 #ifndef DBUG_OFF
@@ -317,206 +363,178 @@
   changed_tables.empty();
 }
 
-inline
-Thd_ndb *
-get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
 
-inline
-void
-set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb;
}
+inline Thd_ndb *get_thd_ndb(THD *thd)
+{
+  return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot];
+}
+
+
+inline void set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
+{
+  thd->ha_data[ndbcluster_hton.slot]= thd_ndb;
+}
+
 
 inline
-Ndb *ha_ndbcluster::get_ndb()
+Ndb *ha_ndbcluster::get_ndb(THD *thd)
 {
-  return get_thd_ndb(current_thd)->ndb;
+  return get_thd_ndb(thd)->ndb;
 }
 
-/*
- * manage uncommitted insert/deletes during transactio to get records correct
- */
 
-struct Ndb_local_table_statistics {
-  int no_uncommitted_rows_count;
-  ulong last_count;
-  ha_rows records;
-};
 
-void ha_ndbcluster::set_rec_per_key()
+Thd_ndb* ha_ndbcluster::seize_thd_ndb()
 {
-  DBUG_ENTER("ha_ndbcluster::get_status_const");
-  for (uint i=0 ; i < table->s->keys ; i++)
+  Thd_ndb *thd_ndb;
+  DBUG_ENTER("seize_thd_ndb");
+
+  thd_ndb= new Thd_ndb();
+  if (thd_ndb->ndb->init(max_transactions) != 0)
   {
-    table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
+    ERR_PRINT(thd_ndb->ndb->getNdbError());
+    delete thd_ndb;
+    thd_ndb= NULL;
   }
-  DBUG_VOID_RETURN;
+  DBUG_RETURN(thd_ndb);
 }
 
-void ha_ndbcluster::records_update()
+
+/*
+  If this thread already has a Thd_ndb object allocated
+  in current THD, reuse it. Otherwise
+  seize a Thd_ndb object, assign it to current THD and use it.
+*/
+
+Thd_ndb *check_thd_ndb_in_thd(THD* thd)
 {
-  if (m_ha_not_exact_count)
-    return;
-  DBUG_ENTER("ha_ndbcluster::records_update");
-  struct Ndb_local_table_statistics *info= 
-    (struct Ndb_local_table_statistics *)m_table_info;
-  DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
-                      ((const NDBTAB *)m_table)->getTableId(),
-                      info->no_uncommitted_rows_count));
-  //  if (info->records == ~(ha_rows)0)
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  if (!thd_ndb)
   {
-    Ndb *ndb= get_ndb();
-    struct Ndb_statistics stat;
-    if (ndb_get_table_statistics(ndb, m_tabname, &stat) == 0){
-      mean_rec_length= stat.row_size;
-      data_file_length= stat.fragment_memory;
-      info->records= stat.row_count;
-    }
+    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+      return NULL;
+    set_thd_ndb(thd, thd_ndb);
   }
+  return thd_ndb;
+}
+
+Ndb* check_ndb_in_thd(THD* thd)
+{
+  Thd_ndb *thd_ndb ;
+  if (!(thd_ndb= check_thd_ndb_in_thd(thd)))
+    return NULL;
+  return thd_ndb->ndb;
+}
+
+
+/*
+  Allocate and init a new Ndb object
+*/
+
+Ndb* get_new_ndb(const char* dbname)
+{
+  Ndb* ndb= new Ndb(g_ndb_cluster_connection, dbname);
+  if (ndb->init() != 0)
   {
-    THD *thd= current_thd;
-    if (get_thd_ndb(thd)->error)
-      info->no_uncommitted_rows_count= 0;
+    delete ndb;
+    return NULL;
   }
-  records= info->records+ info->no_uncommitted_rows_count;
-  DBUG_VOID_RETURN;
+  return ndb;
 }
 
-void ha_ndbcluster::no_uncommitted_rows_execute_failure()
+
+int ndbcluster_close_connection(THD *thd)
 {
-  if (m_ha_not_exact_count)
-    return;
-  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
-  get_thd_ndb(current_thd)->error= 1;
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  DBUG_ENTER("ndbcluster_close_connection");
+
+  if (thd_ndb)
+  {
+    delete thd_ndb;
+    set_thd_ndb(thd, NULL); // not strictly required but does not hurt either
+  }
+  DBUG_RETURN(0);
+}
+
+
+void ha_ndbcluster::set_rec_per_key()
+{
+  DBUG_ENTER("ha_ndbcluster::set_rec_per_key");
+  for (uint i=0 ; i < table->s->keys ; i++)
+    table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
   DBUG_VOID_RETURN;
 }
 
-void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
+
+/*
+  Reset uncommitted_rows of all tables this
+ */
+void Thd_ndb::uncommitted_rows_init()
 {
-  if (m_ha_not_exact_count)
-    return;
-  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
-  struct Ndb_local_table_statistics *info= 
-    (struct Ndb_local_table_statistics *)m_table_info;
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  if (info->last_count != thd_ndb->count)
+  uint i;
+  DBUG_ENTER("Thd_ndb::uncommitted_rows_init");
+
+  /* Loop through all tables and reset commit count */
+  for (i= 0 ; i < shared_tables.records ; i++)
   {
-    info->last_count= thd_ndb->count;
-    info->no_uncommitted_rows_count= 0;
-    info->records= ~(ha_rows)0;
-    DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
-                        ((const NDBTAB *)m_table)->getTableId(),
-                        info->no_uncommitted_rows_count));
+    THD_NDB_SHARE* thd_share= (THD_NDB_SHARE*)hash_element(&shared_tables, i);
+    thd_share->uncommitted_rows= 0;
   }
   DBUG_VOID_RETURN;
 }
 
-void ha_ndbcluster::no_uncommitted_rows_update(int c)
+
+void ha_ndbcluster::uncommitted_rows_execute_failure()
 {
   if (m_ha_not_exact_count)
     return;
-  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
-  struct Ndb_local_table_statistics *info=
-    (struct Ndb_local_table_statistics *)m_table_info;
-  info->no_uncommitted_rows_count+= c;
-  DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
-                      ((const NDBTAB *)m_table)->getTableId(),
-                      info->no_uncommitted_rows_count));
+  DBUG_ENTER("ha_ndbcluster::uncommitted_rows_execute_failure");
+  DBUG_ASSERT(m_thd_share);
+  m_thd_share->uncommitted_rows= 0;
   DBUG_VOID_RETURN;
 }
 
-void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
+
+void ha_ndbcluster::uncommitted_rows_init(Thd_ndb *thd_ndb)
 {
   if (m_ha_not_exact_count)
     return;
-  DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  thd_ndb->count++;
-  thd_ndb->error= 0;
-  DBUG_VOID_RETURN;
+  return thd_ndb->uncommitted_rows_init();
 }
 
-/*
-  Take care of the error that occured in NDB
-
-  RETURN
-    0   No error
-    #   The mapped error code
-*/
 
-void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+void ha_ndbcluster::uncommitted_rows_update(int c)
 {
-  NDBDICT *dict= get_ndb()->getDictionary();
-  DBUG_ENTER("invalidate_dictionary_cache");
-  DBUG_PRINT("info", ("invalidating %s", m_tabname));
-
-  if (global)
-  {
-    const NDBTAB *tab= dict->getTable(m_tabname);
-    if (!tab)
-      DBUG_VOID_RETURN;
-    if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-    {
-      // Global cache has already been invalidated
-      dict->removeCachedTable(m_tabname);
-      global= FALSE;
-    }
-    else
-      dict->invalidateTable(m_tabname);
-  }
-  else
-    dict->removeCachedTable(m_tabname);
-  table->s->version=0L;			/* Free when thread is ready */
-  /* Invalidate indexes */
-  for (uint i= 0; i < table->s->keys; i++)
-  {
-    NDBINDEX *index = (NDBINDEX *) m_index[i].index;
-    NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
-    NDB_INDEX_TYPE idx_type= m_index[i].type;
-
-    switch (idx_type) {
-    case PRIMARY_KEY_ORDERED_INDEX:
-    case ORDERED_INDEX:
-      if (global)
-        dict->invalidateIndex(index->getName(), m_tabname);
-      else
-        dict->removeCachedIndex(index->getName(), m_tabname);
-      break;
-    case UNIQUE_ORDERED_INDEX:
-      if (global)
-        dict->invalidateIndex(index->getName(), m_tabname);
-      else
-        dict->removeCachedIndex(index->getName(), m_tabname);
-    case UNIQUE_INDEX:
-      if (global)
-        dict->invalidateIndex(unique_index->getName(), m_tabname);
-      else
-        dict->removeCachedIndex(unique_index->getName(), m_tabname);
-      break;
-    case PRIMARY_KEY_INDEX:
-    case UNDEFINED_INDEX:
-      break;
-    }
-  }
+  if (m_ha_not_exact_count)
+    return;
+  DBUG_ENTER("ha_ndbcluster::uncommitted_rows_update");
+  DBUG_ASSERT(m_thd_share);
+  m_thd_share->uncommitted_rows+= c;
+  DBUG_PRINT("info", ("uncommitted_rows: %d",
+                      m_thd_share->uncommitted_rows));
   DBUG_VOID_RETURN;
 }
 
+
 int ha_ndbcluster::ndb_err(NdbTransaction *trans)
 {
   int res;
   NdbError err= trans->getNdbError();
   DBUG_ENTER("ndb_err");
-  
+ 
   ERR_PRINT(err);
   switch (err.classification) {
   case NdbError::SchemaError:
-    invalidate_dictionary_cache(TRUE);
-
-    if (err.code==284)
+    // Indicate that this handler instance needs to be closed
+    table->s->version= 0L;
+    if (err.code == 284)
     {
       /*
-         Check if the table is _really_ gone or if the table has
-         been alterend and thus changed table id
+	Check if the table is _really_ gone or if the table has
+	been alterend and thus changed table id
        */
       NDBDICT *dict= get_ndb()->getDictionary();
+      dict->invalidateTable(m_tabname);
       DBUG_PRINT("info", ("Check if table %s is really gone", m_tabname));
       if (!(dict->getTable(m_tabname)))
       {
@@ -525,8 +543,9 @@
         if (err.code != 709)
           DBUG_RETURN(1);
       }
-      DBUG_PRINT("info", ("Table exists but must have changed"));
     }
+    /* Close open unused tables */
+    close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
     break;
   default:
     break;
@@ -903,92 +922,187 @@
 
 
 /*
-  Get metadata for this table from NDB 
+  Decode the type of an index from information
+  provided in table object
+*/
+
+static NDB_INDEX_TYPE get_index_type_from_table(TABLE* table, uint inx)
+{
+  bool is_hash_index=  (table->key_info[inx].algorithm == HA_KEY_ALG_HASH);
+  if (inx == table->s->primary_key)
+
+    return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
+
+  return ((table->key_info[inx].flags & HA_NOSAME) ?
+          (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) :
+          ORDERED_INDEX);
+}
+
+
+/*
+  Check that .frm file on disk is up to date
+
+  SYNOPSIS
+  check_frm_file_up2date
+  path - path of table
+
+  RETURN VALUES
+  1 - Could not read or pack frm data
+  2 - Frm file differed
 
-  IMPLEMENTATION
-    - check that frm-file on disk is equal to frm-file
-      of table accessed in NDB
 */
 
-int ha_ndbcluster::get_metadata(const char *path)
+static int check_frm_file_up2date(const char* path,
+				  const NDBTAB* tab)
+{
+  const void *data, *pack_data;
+  uint length, pack_length;
+
+  DBUG_ENTER("check_frm_file_up2date");
+
+  if (readfrm(path, &data, &length) ||
+      packfrm(data, length, &pack_data, &pack_length))
+  {
+    my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+    my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+    DBUG_RETURN(1);
+  }
+
+  if ((pack_length != tab->getFrmLength()) ||
+      (memcmp(pack_data, tab->getFrmData(), pack_length)))
+  {
+    DBUG_PRINT("error",
+	       ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+		pack_length, tab->getFrmLength(),
+		memcmp(pack_data, tab->getFrmData(), pack_length)));
+    DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+    DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+    my_free((char*)data, MYF(0));
+    my_free((char*)pack_data, MYF(0));
+
+    DBUG_RETURN(2);
+  }
+  my_free((char*)data, MYF(0));
+  my_free((char*)pack_data, MYF(0));
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Open handle to table and indexes in  NDB
+
+  SYNOPSIS
+  get_table_and_index_handles
+    path - path of table to open
+
+
+  RETURN_VALUE
+    0 if successful
+  > 0 if table could not be opened
+
+*/
+
+int ha_ndbcluster::get_table_and_index_handles(const char *path)
 {
-  Ndb *ndb= get_ndb();
-  NDBDICT *dict= ndb->getDictionary();
-  const NDBTAB *tab;
   int error;
-  bool invalidating_ndb_table= FALSE;
+  const NDBTAB *tab;
+  Ndb *ndb;
+  NDBDICT *dict;
+  DBUG_ASSERT(!m_table); // Check table is not yet open
+  DBUG_ASSERT(!m_thd); // Check handler is not locked to a thread
+  DBUG_ENTER("get_table_and_index_handles");
+  DBUG_PRINT("enter", ("path: %s", path));
 
-  DBUG_ENTER("get_metadata");
-  DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
+  /* Get Ndb object to use */
+  if(!(ndb= check_ndb_in_thd(current_thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  ndb->setDatabaseName(m_dbname);
+  dict= ndb->getDictionary();
 
-  do {
-    const void *data, *pack_data;
-    uint length, pack_length;
+  /* Get handle to table and inc ref counter */
+  if (!(tab= dict->getGlobalTable(m_tabname)))
+    ERR_RETURN(dict->getNdbError());
 
-    if (!(tab= dict->getTable(m_tabname)))
-      ERR_RETURN(dict->getNdbError());
-    // Check if thread has stale local cache
-    if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-    {
-      invalidate_dictionary_cache(FALSE);
-      if (!(tab= dict->getTable(m_tabname)))
-         ERR_RETURN(dict->getNdbError());
-      DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
-    }
-    /*
-      Compare FrmData in NDB with frm file from disk.
-    */
-    error= 0;
-    if (readfrm(path, &data, &length) ||
-        packfrm(data, length, &pack_data, &pack_length))
-    {
-      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-      my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
-      DBUG_RETURN(1);
-    }
-    
-    if ((pack_length != tab->getFrmLength()) || 
-        (memcmp(pack_data, tab->getFrmData(), pack_length)))
+  /* Check frm file against the table fetched from NDB */
+  if ((error= check_frm_file_up2date(path, tab)))
+  {
+    dict->releaseGlobalTable(tab);
+    DBUG_RETURN(error);
+  }
+
+  /* Build list of indexes */
+  if ((error= build_index_list(ndb)))
+  {
+    dict->releaseGlobalTable(tab);
+    DBUG_RETURN(error);
+  }
+
+  /* Save handle to the now open table */
+  m_table= (void *)tab;
+
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Release resources setup in get_table_and_index_handles
+   - release handle to table
+   - release handle to all indexes
+*/
+
+void ha_ndbcluster::release_table_and_index_handles()
+{
+  uint i;
+  Ndb* ndb;
+  NDBDICT* dict;
+  THD* thd;
+  DBUG_ENTER("release_table_and_index_handles");
+
+  /* Get Ndb object to use */
+  if ((thd= current_thd))
+  {
+    if(!(ndb= check_ndb_in_thd(thd)))
     {
-      if (!invalidating_ndb_table)
-      {
-        DBUG_PRINT("info", ("Invalidating table"));
-        invalidate_dictionary_cache(TRUE);
-        invalidating_ndb_table= TRUE;
-      }
-      else
-      {
-        DBUG_PRINT("error", 
-                   ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", 
-                    pack_length, tab->getFrmLength(),
-                    memcmp(pack_data, tab->getFrmData(), pack_length)));      
-        DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
-        DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
-        error= 3;
-        invalidating_ndb_table= FALSE;
-      }
+      DBUG_ASSERT(false); // Crash in debug mode
+      DBUG_VOID_RETURN;
     }
-    else
+  }
+  else
+  {
+    /* No thd available, use global ndb object */
+    ndb= g_ndb;
+  }
+  dict= ndb->getDictionary();
+
+  /* Release index list */
+  for (i= 0; i < MAX_KEY; i++)
+  {
+    if (m_index[i].unique_index)
+      dict->releaseGlobalIndex((NDBINDEX*)m_index[i].unique_index);
+    if (m_index[i].index)
+      dict->releaseGlobalIndex((NDBINDEX*)m_index[i].index);
+
+    m_index[i].unique_index= NULL;
+    m_index[i].index= NULL;
+    if (m_index[i].unique_index_attrid_map)
     {
-      invalidating_ndb_table= FALSE;
+      my_free((char *)m_index[i].unique_index_attrid_map, MYF(0));
+      m_index[i].unique_index_attrid_map= NULL;
     }
-    my_free((char*)data, MYF(0));
-    my_free((char*)pack_data, MYF(0));
-  } while (invalidating_ndb_table);
+  }
 
-  if (error)
-    DBUG_RETURN(error);
-  
-  m_table_version= tab->getObjectVersion();
-  m_table= (void *)tab; 
-  m_table_info= NULL; // Set in external lock
-  
-  DBUG_RETURN(build_index_list(ndb, table, ILBP_OPEN));
+  /* Release table handle */
+  if (m_table)
+    dict->releaseGlobalTable((NDBTAB*)m_table);
+  m_table= NULL;
+
+  DBUG_VOID_RETURN;
 }
 
-static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
-                                       const NDBINDEX *index,
-                                       KEY *key_info)
+
+static void fix_unique_index_attr_order(NDB_INDEX_DATA &data,
+					const NDBINDEX *index,
+					KEY *key_info)
 {
   DBUG_ENTER("fix_unique_index_attr_order");
   unsigned sz= index->getNoOfIndexColumns();
@@ -1000,11 +1114,11 @@
   KEY_PART_INFO* key_part= key_info->key_part;
   KEY_PART_INFO* end= key_part+key_info->key_parts;
   DBUG_ASSERT(key_info->key_parts == sz);
-  for (unsigned i= 0; key_part != end; key_part++, i++) 
+  for (unsigned i= 0; key_part != end; key_part++, i++)
   {
     const char *field_name= key_part->field->field_name;
 #ifndef DBUG_OFF
-   data.unique_index_attrid_map[i]= 255;
+    data.unique_index_attrid_map[i]= 255;
 #endif
     for (unsigned j= 0; j < sz; j++)
     {
@@ -1017,149 +1131,72 @@
     }
     DBUG_ASSERT(data.unique_index_attrid_map[i] != 255);
   }
-  DBUG_RETURN(0);
+  DBUG_VOID_RETURN;
 }
 
-int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
+
+/*
+  Build list of handles to all indexes for this table
+  Open handle to table and indexes in  NDB
+
+  SYNOPSIS
+  build_index_list
+    ndb - pointer to current Ndb object
+
+
+  RETURN_VALUE
+    0 if successful
+  > 0 if an error occured while opening an index
+
+*/
+
+int ha_ndbcluster::build_index_list(Ndb* ndb)
 {
   uint i;
-  int error= 0;
-  const char *index_name;
   char unique_index_name[FN_LEN];
-  static const char* unique_suffix= "$unique";
-  KEY* key_info= tab->key_info;
-  const char **key_name= tab->s->keynames.type_names;
-  NDBDICT *dict= ndb->getDictionary();
+  KEY* key_info= table->key_info;
+  const char **key_name= table->s->keynames.type_names;
+  NDBDICT* dict= ndb->getDictionary();
   DBUG_ENTER("ha_ndbcluster::build_index_list");
-  
-  // Save information about all known indexes
-  for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
+
+  /* Save information about all known indexes */
+  for (i= 0; i < table->s->keys; i++, key_info++, key_name++)
   {
-    index_name= *key_name;
-    NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
+    const char* index_name= *key_name;
+    NDB_INDEX_TYPE idx_type= get_index_type_from_table(table, i);
     m_index[i].type= idx_type;
+    DBUG_PRINT("info", ("index_name: %s, idx_type: %d", index_name, idx_type));
+
+    /* Format a name for the uniqe index */
     if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
-    {
       strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
-      DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d",
-                          unique_index_name, i));
-    }
-    // Create secondary indexes if in create phase
-    if (phase == ILBP_CREATE)
-    {
-      DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));      
-      switch (idx_type){
-        
-      case PRIMARY_KEY_INDEX:
-        // Do nothing, already created
-        break;
-      case PRIMARY_KEY_ORDERED_INDEX:
-        error= create_ordered_index(index_name, key_info);
-        break;
-      case UNIQUE_ORDERED_INDEX:
-        if (!(error= create_ordered_index(index_name, key_info)))
-          error= create_unique_index(unique_index_name, key_info);
-        break;
-      case UNIQUE_INDEX:
-        if (!(error= check_index_fields_not_null(i)))
-          error= create_unique_index(unique_index_name, key_info);
-        break;
-      case ORDERED_INDEX:
-        error= create_ordered_index(index_name, key_info);
-        break;
-      default:
-        DBUG_ASSERT(FALSE);
-        break;
-      }
-      if (error)
-      {
-        DBUG_PRINT("error", ("Failed to create index %u", i));
-        drop_table();
-        break;
-      }
-    }
-    // Add handles to index objects
+
+
     if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
     {
       DBUG_PRINT("info", ("Get handle to index %s", index_name));
-      const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
-      if (!index) DBUG_RETURN(1);
+      const NDBINDEX *index= dict->getGlobalIndex(index_name,
+						  m_tabname);
+      if (!index)
+	ERR_RETURN(dict->getNdbError());
       m_index[i].index= (void *) index;
     }
+
     if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
     {
-      DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
-      const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
-      if (!index) DBUG_RETURN(1);
+      DBUG_PRINT("info", ("Get handle to unique_index %s",
+			  unique_index_name));
+      const NDBINDEX *index= dict->getGlobalIndex(unique_index_name,
+						  m_tabname);
+      if (!index)
+	ERR_RETURN(dict->getNdbError());
       m_index[i].unique_index= (void *) index;
-      error= fix_unique_index_attr_order(m_index[i], index, key_info);
+      fix_unique_index_attr_order(m_index[i], index, key_info);
     }
   }
-  
-  DBUG_RETURN(error);
-}
-
-
-/*
-  Decode the type of an index from information 
-  provided in table object
-*/
-NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
-{
-  bool is_hash_index=  (table->key_info[inx].algorithm == HA_KEY_ALG_HASH);
-  if (inx == table->s->primary_key)
-    return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
-
-  return ((table->key_info[inx].flags & HA_NOSAME) ? 
-          (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) :
-          ORDERED_INDEX);
-} 
-
-int ha_ndbcluster::check_index_fields_not_null(uint inx)
-{
-  KEY* key_info= table->key_info + inx;
-  KEY_PART_INFO* key_part= key_info->key_part;
-  KEY_PART_INFO* end= key_part+key_info->key_parts;
-  DBUG_ENTER("ha_ndbcluster::check_index_fields_not_null");
-  
-  for (; key_part != end; key_part++) 
-    {
-      Field* field= key_part->field;
-      if (field->maybe_null())
-      {
-        my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
-                        MYF(0),field->field_name);
-        DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
-      }
-    }
-  
   DBUG_RETURN(0);
 }
 
-void ha_ndbcluster::release_metadata()
-{
-  uint i;
-
-  DBUG_ENTER("release_metadata");
-  DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
-
-  m_table= NULL;
-  m_table_info= NULL;
-
-  // Release index list 
-  for (i= 0; i < MAX_KEY; i++)
-  {
-    m_index[i].unique_index= NULL;      
-    m_index[i].index= NULL;      
-    if (m_index[i].unique_index_attrid_map)
-    {
-      my_free((char *)m_index[i].unique_index_attrid_map, MYF(0));
-      m_index[i].unique_index_attrid_map= NULL;
-    }
-  }
-
-  DBUG_VOID_RETURN;
-}
 
 int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
 {
@@ -1224,12 +1261,12 @@
 */
 
 inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part,
-                                        bool all_parts) const 
-{ 
+                                        bool all_parts) const
+{
   DBUG_ENTER("ha_ndbcluster::index_flags");
   DBUG_PRINT("info", ("idx_no: %d", idx_no));
-  DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
-  DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)] | 
+  DBUG_ASSERT(get_index_type_from_table(table, idx_no) < index_flags_size);
+  DBUG_RETURN(index_type_flags[get_index_type_from_table(table, idx_no)] |
               HA_KEY_SCAN_NOT_ROR);
 }
 
@@ -2014,7 +2051,7 @@
     Find out how this is detected!
   */
   m_rows_inserted++;
-  no_uncommitted_rows_update(1);
+  uncommitted_rows_update(1);
   m_bulk_insert_not_flushed= TRUE;
   if ((m_rows_to_insert == (ha_rows) 1) || 
       ((m_rows_inserted % m_bulk_insert_rows) == 0) ||
@@ -2032,7 +2069,7 @@
       if (execute_no_commit(this,trans) != 0)
       {
         m_skip_auto_increment= TRUE;
-        no_uncommitted_rows_execute_failure();
+        uncommitted_rows_execute_failure();
         DBUG_RETURN(ndb_err(trans));
       }
     }
@@ -2041,7 +2078,7 @@
       if (execute_commit(this,trans) != 0)
       {
         m_skip_auto_increment= TRUE;
-        no_uncommitted_rows_execute_failure();
+        uncommitted_rows_execute_failure();
         DBUG_RETURN(ndb_err(trans));
       }
       if (trans->restart() != 0)
@@ -2231,7 +2268,7 @@
 
   // Execute update operation
   if (!cursor && execute_no_commit(this,trans) != 0) {
-    no_uncommitted_rows_execute_failure();
+    uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
   
@@ -2268,7 +2305,7 @@
       ERR_RETURN(trans->getNdbError());     
     m_ops_pending++;
 
-    no_uncommitted_rows_update(-1);
+    uncommitted_rows_update(-1);
 
     if (!m_primary_key_update)
       // If deleting from cursor, NoCommit will be handled in next_result
@@ -2281,7 +2318,7 @@
         op->deleteTuple() != 0)
       ERR_RETURN(trans->getNdbError());
     
-    no_uncommitted_rows_update(-1);
+    uncommitted_rows_update(-1);
     
     if (table->s->primary_key == MAX_KEY) 
     {
@@ -2304,7 +2341,7 @@
 
   // Execute delete operation
   if (execute_no_commit(this,trans) != 0) {
-    no_uncommitted_rows_execute_failure();
+    uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
   DBUG_RETURN(0);
@@ -2732,7 +2769,7 @@
     */
     DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
     if (execute_no_commit(this,trans) != 0) {
-      no_uncommitted_rows_execute_failure();
+      uncommitted_rows_execute_failure();
       DBUG_RETURN(ndb_err(trans));
     }
     m_ops_pending= 0;
@@ -2840,33 +2877,45 @@
 
 void ha_ndbcluster::info(uint flag)
 {
+  DBUG_ASSERT(m_table); // Check table is open
   DBUG_ENTER("info");
   DBUG_PRINT("enter", ("flag: %d", flag));
-  
-  if (flag & HA_STATUS_POS)
-    DBUG_PRINT("info", ("HA_STATUS_POS"));
-  if (flag & HA_STATUS_NO_LOCK)
-    DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
-  if (flag & HA_STATUS_TIME)
-    DBUG_PRINT("info", ("HA_STATUS_TIME"));
+
   if (flag & HA_STATUS_VARIABLE)
   {
     DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
-    if (m_table_info)
+    if (m_thd)
     {
+      /* This handler is in use by a thd */
+      DBUG_ASSERT(m_thd_share);
       if (m_ha_not_exact_count)
         records= 100;
       else
-        records_update();
+      {
+	Ndb* ndb= get_ndb(m_thd);
+	struct Ndb_statistics stat;
+	if (ndb_get_table_statistics(ndb, (const NDBTAB*)m_table, &stat))
+	{
+	  /* Failed to get table statistics */
+	  records= (ha_rows)~0;
+	  DBUG_VOID_RETURN;
+	}
+	DBUG_PRINT("info", ("uncommitted_rows: %d",
+			    m_thd_share->uncommitted_rows));
+	mean_rec_length= stat.row_size;
+	data_file_length= stat.fragment_memory;
+	records= stat.row_count + m_thd_share->uncommitted_rows;
+      }
     }
     else
     {
-      if ((my_errno= check_ndb_connection()))
-        DBUG_VOID_RETURN;
-      Ndb *ndb= get_ndb();
+      THD* thd= current_thd;
+      Ndb *ndb;
+      if (!(ndb= check_ndb_in_thd(thd)))
+	DBUG_VOID_RETURN;
       struct Ndb_statistics stat;
-      if (current_thd->variables.ndb_use_exact_count &&
-          ndb_get_table_statistics(ndb, m_tabname, &stat) == 0)
+      if (thd->variables.ndb_use_exact_count &&
+          ndb_get_table_statistics(ndb, (const NDBTAB *)m_table, &stat) == 0)
       {
         mean_rec_length= stat.row_size;
         data_file_length= stat.fragment_memory;
@@ -2879,24 +2928,20 @@
       }
     }
   }
-  if (flag & HA_STATUS_CONST)
-  {
-    DBUG_PRINT("info", ("HA_STATUS_CONST"));
-    set_rec_per_key();
-  }
   if (flag & HA_STATUS_ERRKEY)
   {
     DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
     errkey= m_dupkey;
   }
+
   if (flag & HA_STATUS_AUTO)
   {
     DBUG_PRINT("info", ("HA_STATUS_AUTO"));
     if (m_table)
     {
       Ndb *ndb= get_ndb();
-      
-      auto_increment_value= 
+
+      auto_increment_value=
         ndb->readAutoIncrementValue((const NDBTAB *) m_table);
     }
   }
@@ -3098,7 +3143,7 @@
                         (int) m_rows_inserted, (int) m_bulk_insert_rows)); 
     m_bulk_insert_not_flushed= FALSE;
     if (execute_no_commit(this,trans) != 0) {
-      no_uncommitted_rows_execute_failure();
+      uncommitted_rows_execute_failure();
       my_errno= error= ndb_err(trans);
     }
   }
@@ -3185,176 +3230,132 @@
   DBUG_RETURN(to);
 }
 
-#ifndef DBUG_OFF
-#define PRINT_OPTION_FLAGS(t) { \
-      if (t->options & OPTION_NOT_AUTOCOMMIT) \
-        DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \
-      if (t->options & OPTION_BEGIN) \
-        DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \
-      if (t->options & OPTION_TABLE_LOCK) \
-        DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \
-}
-#else
-#define PRINT_OPTION_FLAGS(t)
-#endif
-
 
 /*
-  As MySQL will execute an external lock for every new table it uses
-  we can use this to start the transactions.
-  If we are in auto_commit mode we just need to start a transaction
-  for the statement, this will be stored in thd_ndb.stmt.
-  If not, we have to start a master transaction if there doesn't exist
-  one from before, this will be stored in thd_ndb.all
- 
-  When a table lock is held one transaction will be started which holds
-  the table lock and for each statement a hupp transaction will be started  
-  If we are locking the table then:
-  - save the NdbDictionary::Table for easy access
-  - save reference to table statistics
-  - refresh list of the indexes for the table if needed (if altered)
- */
+
+  Function is executed for every new handler the thd want's to
+  use or stop use
+
+  It will either:
+    Lock an open handler to a thd, start transaction and connect handler to
+    that transaction
+
+      or
+
+    Unlock a previously locked handler from thd and disconnect it from
+    transaction
+
+  SYNOPSIS
+  ha_ndbcluster::external_lock
+  thd            - thread handle
+  lock_type      - indicates wether to lock or unlock
+
+  RETURN_VALUE
+  0 if successful
+  > 0 if table could not be locked/unlocked
+
+*/
 
 int ha_ndbcluster::external_lock(THD *thd, int lock_type)
 {
-  int error=0;
-  NdbTransaction* trans= NULL;
-
+  Thd_ndb *thd_ndb;
   DBUG_ENTER("external_lock");
-  /*
-    Check that this handler instance has a connection
-    set up to the Ndb object of thd
-   */
-  if (check_ndb_connection(thd))
-    DBUG_RETURN(1);
-
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb= thd_ndb->ndb;
+  DBUG_ASSERT(m_table); // Check that handler has opened table
 
-  DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d",
-                       thd, thd_ndb, thd_ndb->lock_count));
+  if (!(thd_ndb= check_thd_ndb_in_thd(thd)))
+    DBUG_RETURN(1);
 
   if (lock_type != F_UNLCK)
   {
-    DBUG_PRINT("info", ("lock_type != F_UNLCK"));
-    if (!thd_ndb->lock_count++)
+    /* This handler is now connected to thd */
+    DBUG_PRINT("info", ("Connect this handler to thd"));
+    m_thd= thd;
+
+    if (!(m_thd_share= get_thd_share(thd_ndb, table->s->path)))
+      DBUG_RETURN(2);
+
+    if (!thd_ndb->lock_count++ && !thd_ndb->all)
     {
-      PRINT_OPTION_FLAGS(thd);
-      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) 
+      /*
+	This is the first time this thd locks a handler
+	and there is no "master" trans started
+      */
+      DBUG_PRINT("info", ("Start of transaction"));
+
+      Ndb* ndb;
+      if (!(ndb= get_ndb(thd)))
+	DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
       {
-        // Autocommit transaction
+        /* Start autocommit transaction */
         DBUG_ASSERT(!thd_ndb->stmt);
-        DBUG_PRINT("trans",("Starting transaction stmt"));      
+        DBUG_PRINT("info", ("Starting autocommit transaction"));
 
-        trans= ndb->startTransaction();
-        if (trans == NULL)
+	NdbTransaction* trans;
+        if (!(trans= ndb->startTransaction()))
           ERR_RETURN(ndb->getNdbError());
-        no_uncommitted_rows_reset(thd);
         thd_ndb->stmt= trans;
         trans_register_ha(thd, FALSE, &ndbcluster_hton);
-      } 
-      else 
-      { 
-        if (!thd_ndb->all)
-        {
-          // Not autocommit transaction
-          // A "master" transaction ha not been started yet
-          DBUG_PRINT("trans",("starting transaction, all"));
-          
-          trans= ndb->startTransaction();
-          if (trans == NULL)
-            ERR_RETURN(ndb->getNdbError());
-          no_uncommitted_rows_reset(thd);
-          thd_ndb->all= trans; 
-          trans_register_ha(thd, TRUE, &ndbcluster_hton);
-
-          /*
-            If this is the start of a LOCK TABLE, a table look 
-            should be taken on the table in NDB
-           
-            Check if it should be read or write lock
-           */
-          if (thd->options & (OPTION_TABLE_LOCK))
-          {
-            //lockThisTable();
-            DBUG_PRINT("info", ("Locking the table..." ));
-          }
 
-        }
       }
+      else
+      {
+	/* Start "master" transaction  */
+	DBUG_ASSERT(!thd_ndb->all);
+	DBUG_PRINT("info", ("Starting transaction"));
+
+	NdbTransaction* trans;
+	if (!(trans= ndb->startTransaction()))
+	  ERR_RETURN(ndb->getNdbError());
+	thd_ndb->all= trans;
+	trans_register_ha(thd, TRUE, &ndbcluster_hton);
+      }
+
+      uncommitted_rows_init(thd_ndb);
+
     }
     /*
       This is the place to make sure this handler instance
       has a started transaction.
-     
-      The transaction is started by the first handler on which 
+
+      The transaction is started by the first handler on which
       MySQL Server calls external lock
-     
-      Other handlers in the same stmt or transaction should use 
-      the same NDB transaction. This is done by setting up the m_active_trans
-      pointer to point to the NDB transaction. 
+
+      Other handlers in the same stmt or transaction should use
+      the same transaction. This is done by setting the m_active_trans
+      pointer to point to the NDB transaction.
      */
+    m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
+    DBUG_ASSERT(m_active_trans);
 
-    // store thread specific data first to set the right context
+    /* store thread specific data to set the right context */
     m_force_send=          thd->variables.ndb_force_send;
     m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
-    m_autoincrement_prefetch= 
+    m_autoincrement_prefetch=
       (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
     if (!thd->transaction.on)
       m_transaction_on= FALSE;
     else
       m_transaction_on= thd->variables.ndb_use_transactions;
 
-    m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
-    DBUG_ASSERT(m_active_trans);
-    // Start of transaction
+    /* Init transaction "scope" variables */
     m_rows_changed= 0;
     m_retrieve_all_fields= FALSE;
     m_retrieve_primary_key= FALSE;
     m_ops_pending= 0;
-    {
-      NDBDICT *dict= ndb->getDictionary();
-      const NDBTAB *tab;
-      void *tab_info;
-      if (!(tab= dict->getTable(m_tabname, &tab_info)))
-        ERR_RETURN(dict->getNdbError());
-      DBUG_PRINT("info", ("Table schema version: %d", 
-                          tab->getObjectVersion()));
-      // Check if thread has stale local cache
-      // New transaction must not use old tables... (trans != 0)
-      // Running might...
-      if ((trans && tab->getObjectStatus() !=
NdbDictionary::Object::Retrieved)
-	  || tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-      {
-        invalidate_dictionary_cache(FALSE);
-        if (!(tab= dict->getTable(m_tabname, &tab_info)))
-          ERR_RETURN(dict->getNdbError());
-        DBUG_PRINT("info", ("Table schema version: %d", 
-                            tab->getObjectVersion()));
-      }
-      if (m_table != (void *)tab)
-      {
-        m_table= (void *)tab;
-        m_table_version = tab->getObjectVersion();
-      }
-      else if (m_table_version < tab->getObjectVersion())
-      {
-        /*
-          The table has been altered, caller has to retry
-        */
-        DBUG_RETURN(my_errno= HA_ERR_TABLE_DEF_CHANGED);
-      }
-      m_table_info= tab_info;
-    }
-    no_uncommitted_rows_init(thd);
+
   }
   else
   {
-    DBUG_PRINT("info", ("lock_type == F_UNLCK"));
+    /* Remove this handler from transaction */
+    DBUG_PRINT("info", ("Unlock this handler from thd"));
 
+    DBUG_ASSERT(m_thd); // Check that handler is locked
+
+    /* Util thread is active and rows has changed*/
     if (ndb_cache_check_time && m_rows_changed)
     {
-      DBUG_PRINT("info", ("Rows has changed and util thread is running"));
       if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
       {
         DBUG_PRINT("info", ("Add share to list of tables to be invalidated"));
@@ -3371,9 +3372,10 @@
 
     if (!--thd_ndb->lock_count)
     {
-      DBUG_PRINT("trans", ("Last external_lock"));
-      PRINT_OPTION_FLAGS(thd);
-
+      /*
+	This is the last time during statement that
+	this thd unlocks a handler
+      */
       if (thd_ndb->stmt)
       {
         /*
@@ -3381,41 +3383,42 @@
           This happens if the thread didn't update any rows
           We must in this case close the transaction to release resources
         */
+	Ndb* ndb;
+	if (!(ndb= get_ndb(thd)))
+	  DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
         DBUG_PRINT("trans",("ending non-updating transaction"));
         ndb->closeTransaction(m_active_trans);
         thd_ndb->stmt= NULL;
       }
     }
-    m_table_info= NULL;
 
-    /*
-      This is the place to make sure this handler instance
-      no longer are connected to the active transaction.
+    /* Disconnect this handler from the active transaction.*/
+    m_active_trans= NULL;
 
-      And since the handler is no longer part of the transaction 
-      it can't have open cursors, ops or blobs pending.
-    */
-    m_active_trans= NULL;    
+    /* Disconnect from thd */
+    m_thd= NULL;
 
-    if (m_active_cursor)
-      DBUG_PRINT("warning", ("m_active_cursor != NULL"));
-    m_active_cursor= NULL;
+    /* Disconnect from thd_hare */
+    m_thd_share= NULL;
 
-    if (m_multi_cursor)
-      DBUG_PRINT("warning", ("m_multi_cursor != NULL"));
+    /*
+      Since the handler is no longer part of the transaction
+      it can't have open cursors, ops or blobs pending
+    */
+    DBUG_ASSERT(m_active_cursor == NULL);
+    m_active_cursor= NULL;
+    DBUG_ASSERT(m_multi_cursor == NULL);
     m_multi_cursor= NULL;
-    
-    if (m_blobs_pending)
-      DBUG_PRINT("warning", ("blobs_pending != 0"));
-    m_blobs_pending= 0;
-    
-    if (m_ops_pending)
-      DBUG_PRINT("warning", ("ops_pending != 0L"));
+    DBUG_ASSERT(m_blobs_pending == FALSE );
+    m_blobs_pending= FALSE;
+    DBUG_ASSERT(m_ops_pending == 0);
     m_ops_pending= 0;
   }
-  DBUG_RETURN(error);
+  DBUG_RETURN(0);
 }
 
+
 /*
   Start a transaction for running a statement if one is not
   already running in a transaction. This will be the case in
@@ -3428,7 +3431,6 @@
 {
   int error=0;
   DBUG_ENTER("start_stmt");
-  PRINT_OPTION_FLAGS(thd);
 
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   NdbTransaction *trans= (thd_ndb->stmt)?thd_ndb->stmt:thd_ndb->all;
@@ -3438,17 +3440,18 @@
     trans= ndb->startTransaction();
     if (trans == NULL)
       ERR_RETURN(ndb->getNdbError());
-    no_uncommitted_rows_reset(thd);
     thd_ndb->stmt= trans;
     trans_register_ha(thd, FALSE, &ndbcluster_hton);
+
+    uncommitted_rows_init(thd_ndb);
   }
   m_active_trans= trans;
 
   // Start of statement
   m_retrieve_all_fields= FALSE;
   m_retrieve_primary_key= FALSE;
-  m_ops_pending= 0;    
-  
+  m_ops_pending= 0;
+
   DBUG_RETURN(error);
 }
 
@@ -3546,7 +3549,6 @@
 /*
   Define NDB column based on Field.
   Returns 0 or mysql error code.
-  Not member of ha_ndbcluster because NDBCOL cannot be declared.
 
   MySQL text types with character set "binary" are mapped to true
   NDB binary types without a character set.  This may change.
@@ -3821,10 +3823,170 @@
   return 0;
 }
 
+
 /*
-  Create a table in NDB Cluster
+  Return error when trying to create an index
+  on a nullable column
+*/
+
+static int check_index_fields_not_null(TABLE* table, uint inx)
+{
+  KEY* key_info= table->key_info + inx;
+  KEY_PART_INFO* key_part= key_info->key_part;
+  KEY_PART_INFO* end= key_part+key_info->key_parts;
+  DBUG_ENTER("check_index_fields_not_null");
+
+  for (; key_part != end; key_part++)
+  {
+    Field* field= key_part->field;
+    if (field->maybe_null())
+    {
+      my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
+		      MYF(0),field->field_name);
+      DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Create an index in NDB Cluster
  */
 
+static int create_index(Ndb *ndb, TABLE *form,
+			const char *name, KEY *key_info,
+			bool unique)
+{
+  NdbDictionary::Dictionary *dict= ndb->getDictionary();
+  KEY_PART_INFO *key_part= key_info->key_part;
+  KEY_PART_INFO *end= key_part + key_info->key_parts;
+
+  DBUG_ENTER("create_index");
+  DBUG_PRINT("enter", ("name: %s, unique: %d", name, unique));
+
+  NdbDictionary::Index ndb_index(name);
+  if (unique)
+    ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
+  else
+  {
+    ndb_index.setType(NdbDictionary::Index::OrderedIndex);
+    // TODO Only temporary ordered indexes supported
+    ndb_index.setLogging(FALSE);
+  }
+  ndb_index.setTable(form->s->table_name);
+
+  for (; key_part != end; key_part++)
+  {
+    Field *field= key_part->field;
+    DBUG_PRINT("info", ("attr: %s", field->field_name));
+    ndb_index.addColumnName(field->field_name);
+  }
+  if (dict->createIndex(ndb_index))
+    ERR_RETURN(dict->getNdbError());
+
+  // Success
+  DBUG_PRINT("info", ("Created index %s", name));
+  DBUG_RETURN(0);
+}
+
+static int create_ordered_index(Ndb* ndb, TABLE* form,
+				const char *name, KEY *key_info)
+{
+  return create_index(ndb, form, name, key_info, FALSE);
+}
+
+
+static int create_unique_index(Ndb* ndb, TABLE* form,
+			       const char *name, KEY *key_info)
+{
+  return create_index(ndb, form, name, key_info, TRUE);
+}
+
+
+/*
+  Create all indexes associated with this table
+*/
+
+static int create_indexes(Ndb *ndb, TABLE *form)
+{
+  uint i;
+  int error= 0;
+  const char *index_name;
+  char unique_index_name[FN_LEN];
+  KEY* key_info= form->key_info;
+  const char **key_name= form->s->keynames.type_names;
+  DBUG_ENTER("create_indexes");
+
+  for (i= 0; i < form->s->keys; i++, key_info++, key_name++)
+  {
+    index_name= *key_name;
+    NDB_INDEX_TYPE idx_type= get_index_type_from_table(form, i);
+    if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
+      strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
+
+    DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
+    switch (idx_type){
+      case PRIMARY_KEY_INDEX:
+        // Do nothing, already created
+        break;
+      case PRIMARY_KEY_ORDERED_INDEX:
+        error= create_ordered_index(ndb, form, index_name, key_info);
+        break;
+      case UNIQUE_ORDERED_INDEX:
+        if (!(error= create_ordered_index(ndb, form, index_name, key_info)))
+          error= create_unique_index(ndb, form, unique_index_name, key_info);
+        break;
+      case UNIQUE_INDEX:
+        if (!(error= check_index_fields_not_null(form, i)))
+          error= create_unique_index(ndb, form, unique_index_name, key_info);
+        break;
+      case ORDERED_INDEX:
+        error= create_ordered_index(ndb, form, index_name, key_info);
+        break;
+      default:
+        DBUG_ASSERT(FALSE);
+        break;
+    }
+    if (error)
+    {
+      DBUG_PRINT("error", ("Failed to create index %u", i));
+      DBUG_RETURN(error);
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+/*
+  Create a .ndb file to serve as a placeholder indicating
+  that the table with this name is a ndb table
+*/
+
+static int write_ndb_file(const char* path)
+{
+  File file;
+  bool error=1;
+  char file_name[FN_REFLEN];
+
+  DBUG_ENTER("write_ndb_file");
+  DBUG_PRINT("enter", ("path: %s", path));
+
+  fn_format(file_name, path, "", ha_ndb_ext, MY_REPLACE_EXT);
+  if ((file= my_create(file_name,
+		       CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
+  {
+    /* It's an empty file, no content is written to the file */
+    error=0;
+    my_close(file,MYF(0));
+  }
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Calculate and set fragmentation
+*/
+
 static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
 {
   if (form->s->max_rows == (ha_rows) 0) /* default setting, don't set fragmentation
*/
@@ -3871,65 +4033,70 @@
   }
 }
 
-int ha_ndbcluster::create(const char *name, 
-                          TABLE *form, 
-                          HA_CREATE_INFO *info)
+
+/*
+  Create a table in NDB Cluster
+ */
+
+static int ha_ndbcluster_create(const char *path,
+				TABLE *form,
+				HA_CREATE_INFO *info)
 {
+  int error;
   NDBTAB tab;
   NDBCOL col;
   uint pack_length, length, i, pk_length= 0;
   const void *data, *pack_data;
-  char name2[FN_HEADLEN];
   bool create_from_engine= test(info->table_options &
                                 HA_OPTION_CREATE_FROM_ENGINE);
-   
-  DBUG_ENTER("ha_ndbcluster::create");
-  DBUG_PRINT("enter", ("name: %s", name));
-  fn_format(name2, name, "", "",2);       // Remove the .frm extension
-  set_dbname(name2);
-  set_tabname(name2);    
+
+  DBUG_ENTER("ha_ndbcluster_create");
+  DBUG_PRINT("enter", ("path: %s", path));
 
   if (create_from_engine)
   {
     /*
-      Table alreay exists in NDB and frm file has been created by 
+      Table alreay exists in NDB and frm file has been created by
       caller.
       Do Ndb specific stuff, such as create a .ndb file
     */
-    my_errno= write_ndb_file();
-    DBUG_RETURN(my_errno);
+    DBUG_RETURN(write_ndb_file(path));
   }
 
-  DBUG_PRINT("table", ("name: %s", m_tabname));  
-  tab.setName(m_tabname);
-  tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));    
-   
-  // Save frm data for this table
-  if (readfrm(name, &data, &length))
+  char dbname[FN_HEADLEN], tabname[FN_HEADLEN];
+  split_path(path, dbname, tabname);
+
+  DBUG_PRINT("table", ("name: %s", tabname));
+  tab.setName(tabname);
+  tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
+
+  /* Save frm data for this table */
+  if (readfrm(path, &data, &length))
     DBUG_RETURN(1);
   if (packfrm(data, length, &pack_data, &pack_length))
     DBUG_RETURN(2);
-  
+
   DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
-  tab.setFrm(pack_data, pack_length);      
+  tab.setFrm(pack_data, pack_length);
   my_free((char*)data, MYF(0));
   my_free((char*)pack_data, MYF(0));
-  
-  for (i= 0; i < form->s->fields; i++) 
+
+  /* Create columns */
+  for (i= 0; i < form->s->fields; i++)
   {
     Field *field= form->field[i];
-    DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", 
+    DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
                         field->field_name, field->real_type(),
                         field->pack_length()));
-    if ((my_errno= create_ndb_column(col, field, info)))
-      DBUG_RETURN(my_errno);
+    if ((error= create_ndb_column(col, field, info)))
+      DBUG_RETURN(error);
     tab.addColumn(col);
     if (col.getPrimaryKey())
       pk_length += (field->pack_length() + 3) / 4;
   }
-  
-  // No primary key, create shadow key as 64 bit, auto increment  
-  if (form->s->primary_key == MAX_KEY) 
+
+  /* No primary key, create shadow key as 64 bit, auto increment  */
+  if (form->s->primary_key == MAX_KEY)
   {
     DBUG_PRINT("info", ("Generating shadow key"));
     col.setName("$PK");
@@ -3941,9 +4108,9 @@
     tab.addColumn(col);
     pk_length += 2;
   }
-  
-  // Make sure that blob tables don't have to big part size
-  for (i= 0; i < form->s->fields; i++) 
+
+  /* Make sure that blob tables don't have to big part size */
+  for (i= 0; i < form->s->fields; i++)
   {
     /**
      * The extra +7 concists
@@ -3952,13 +4119,13 @@
      */
     switch (form->field[i]->real_type()) {
     case MYSQL_TYPE_GEOMETRY:
-    case MYSQL_TYPE_BLOB:    
-    case MYSQL_TYPE_MEDIUM_BLOB:   
-    case MYSQL_TYPE_LONG_BLOB: 
+    case MYSQL_TYPE_BLOB:
+    case MYSQL_TYPE_MEDIUM_BLOB:
+    case MYSQL_TYPE_LONG_BLOB:
     {
       NdbDictionary::Column * col= tab.getColumn(i);
       int size= pk_length + (col->getPartSize()+3)/4 + 7;
-      if (size > NDB_MAX_TUPLE_SIZE_IN_WORDS && 
+      if (size > NDB_MAX_TUPLE_SIZE_IN_WORDS &&
          (pk_length+7) < NDB_MAX_TUPLE_SIZE_IN_WORDS)
       {
         size= NDB_MAX_TUPLE_SIZE_IN_WORDS - pk_length - 7;
@@ -3968,7 +4135,7 @@
        * If size > NDB_MAX and pk_length+7 >= NDB_MAX
        *   then the table can't be created anyway, so skip
        *   changing part size, and have error later
-       */ 
+       */
     }
     default:
       break;
@@ -3977,88 +4144,38 @@
 
   ndb_set_fragmentation(tab, form, pk_length);
 
-  if ((my_errno= check_ndb_connection()))
-    DBUG_RETURN(my_errno);
-  
-  // Create the table in NDB     
-  Ndb *ndb= get_ndb();
+  Ndb* ndb;
+  if (!(ndb= check_ndb_in_thd(current_thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  /* Create the table in NDB */
   NDBDICT *dict= ndb->getDictionary();
-  if (dict->createTable(tab) != 0) 
+  ndb->setDatabaseName(dbname);
+  if (dict->createTable(tab) != 0)
   {
     const NdbError err= dict->getNdbError();
     ERR_PRINT(err);
-    my_errno= ndb_to_mysql_error(&err);
-    DBUG_RETURN(my_errno);
+    DBUG_RETURN(ndb_to_mysql_error(&err));
   }
-  DBUG_PRINT("info", ("Table %s/%s created successfully", 
-                      m_dbname, m_tabname));
+  DBUG_PRINT("info", ("Table %s/%s created successfully",
+                      dbname, tabname));
 
-  // Create secondary indexes
-  my_errno= build_index_list(ndb, form, ILBP_CREATE);
-
-  if (!my_errno)
-    my_errno= write_ndb_file();
-
-  DBUG_RETURN(my_errno);
-}
-
-
-int ha_ndbcluster::create_ordered_index(const char *name, 
-                                        KEY *key_info)
-{
-  DBUG_ENTER("ha_ndbcluster::create_ordered_index");
-  DBUG_RETURN(create_index(name, key_info, FALSE));
-}
-
-int ha_ndbcluster::create_unique_index(const char *name, 
-                                       KEY *key_info)
-{
+  /* Create indexes */
+  if ((error= create_indexes(ndb, form)))
+  {
+    /* Try to cleanup by dropping the table in NDB */
+    dict->dropTable(tabname);
+    DBUG_RETURN(error);
+  }
 
-  DBUG_ENTER("ha_ndbcluster::create_unique_index");
-  DBUG_RETURN(create_index(name, key_info, TRUE));
+  DBUG_RETURN(write_ndb_file(path));
 }
 
 
-/*
-  Create an index in NDB Cluster
- */
-
-int ha_ndbcluster::create_index(const char *name, 
-                                KEY *key_info,
-                                bool unique)
+int ha_ndbcluster::create(const char *path, TABLE *form,
+                          HA_CREATE_INFO *info)
 {
-  Ndb *ndb= get_ndb();
-  NdbDictionary::Dictionary *dict= ndb->getDictionary();
-  KEY_PART_INFO *key_part= key_info->key_part;
-  KEY_PART_INFO *end= key_part + key_info->key_parts;
-  
-  DBUG_ENTER("ha_ndbcluster::create_index");
-  DBUG_PRINT("enter", ("name: %s ", name));
-
-  NdbDictionary::Index ndb_index(name);
-  if (unique)
-    ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
-  else 
-  {
-    ndb_index.setType(NdbDictionary::Index::OrderedIndex);
-    // TODO Only temporary ordered indexes supported
-    ndb_index.setLogging(FALSE); 
-  }
-  ndb_index.setTable(m_tabname);
-
-  for (; key_part != end; key_part++) 
-  {
-    Field *field= key_part->field;
-    DBUG_PRINT("info", ("attr: %s", field->field_name));
-    ndb_index.addColumnName(field->field_name);
-  }
-  
-  if (dict->createIndex(ndb_index))
-    ERR_RETURN(dict->getNdbError());
-
-  // Success
-  DBUG_PRINT("info", ("Created index %s", name));
-  DBUG_RETURN(0);  
+  return ha_ndbcluster_create(path, form, info);
 }
 
 
@@ -4066,109 +4183,85 @@
   Rename a table in NDB Cluster
 */
 
-int ha_ndbcluster::rename_table(const char *from, const char *to)
+static int ha_ndbcluster_rename_table(const char *from, const char *to)
 {
+
+  Ndb* ndb;
   NDBDICT *dict;
-  char new_tabname[FN_HEADLEN];
+  char dbname[FN_HEADLEN], tabname[FN_HEADLEN];
   const NDBTAB *orig_tab;
-  int result;
 
-  DBUG_ENTER("ha_ndbcluster::rename_table");
+  DBUG_ENTER("ha_ndbcluster_rename_table");
   DBUG_PRINT("info", ("Renaming %s to %s", from, to));
-  set_dbname(from);
-  set_tabname(from);
-  set_tabname(to, new_tabname);
 
-  if (check_ndb_connection())
-    DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
-
-  Ndb *ndb= get_ndb();
+  if (!(ndb= check_ndb_in_thd(current_thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   dict= ndb->getDictionary();
-  if (!(orig_tab= dict->getTable(m_tabname)))
-    ERR_RETURN(dict->getNdbError());
-  // Check if thread has stale local cache
-  if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
-  {
-    dict->removeCachedTable(m_tabname);
-    if (!(orig_tab= dict->getTable(m_tabname)))
-      ERR_RETURN(dict->getNdbError());
-  }
-  m_table= (void *)orig_tab;
-  // Change current database to that of target table
-  set_dbname(to);
-  ndb->setDatabaseName(m_dbname);
-  if (!(result= alter_table_name(new_tabname)))
-  {
-    // Rename .ndb file
-    result= handler::rename_table(from, to);
-  }
-
-  DBUG_RETURN(result);
-}
-
-
-/*
-  Rename a table in NDB Cluster using alter table
- */
 
-int ha_ndbcluster::alter_table_name(const char *to)
-{
-  Ndb *ndb= get_ndb();
-  NDBDICT *dict= ndb->getDictionary();
-  const NDBTAB *orig_tab= (const NDBTAB *) m_table;
-  DBUG_ENTER("alter_table_name_table");
+  /* Get old table */
+  split_path(from, dbname, tabname);
+  ndb->setDatabaseName(dbname);
+  if (!(orig_tab= dict->getTable(tabname)))
+    ERR_RETURN(dict->getNdbError());
 
-  NdbDictionary::Table new_tab= *orig_tab;
-  new_tab.setName(to);
+  /* Change name of table */
+  split_path(to, dbname, tabname);
+  ndb->setDatabaseName(dbname);
+  NDBTAB new_tab= *orig_tab;  // Copy constructor invoked!
+  new_tab.setName(tabname);
   if (dict->alterTable(new_tab) != 0)
     ERR_RETURN(dict->getNdbError());
 
-  m_table= NULL;
-  m_table_info= NULL;
-                                                                             
   DBUG_RETURN(0);
 }
 
+int ha_ndbcluster::rename_table(const char *from, const char *to)
+{
+  int error;
+  if ((error= ha_ndbcluster_rename_table(from, to)))
+    return error;
+
+  // Rename .ndb file
+  return handler::rename_table(from, to);
+}
+
 
 /*
   Delete table from NDB Cluster
+*/
 
- */
-
-int ha_ndbcluster::delete_table(const char *name)
+static int ha_ndbcluster_delete_table(const char *path)
 {
-  DBUG_ENTER("ha_ndbcluster::delete_table");
-  DBUG_PRINT("enter", ("name: %s", name));
-  set_dbname(name);
-  set_tabname(name);
+  Ndb* ndb;
+  char dbname[FN_HEADLEN], tabname[FN_HEADLEN];
+
+  DBUG_ENTER("ha_ndbcluster_delete_table");
+  DBUG_PRINT("enter", ("path: %s", path));
+  split_path(path, dbname, tabname);
 
-  if (check_ndb_connection())
+  if (!(ndb= check_ndb_in_thd(current_thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
-  /* Call ancestor function to delete .ndb file */
-  handler::delete_table(name);
-  
   /* Drop the table from NDB */
-  DBUG_RETURN(drop_table());
-}
+  NdbDictionary::Dictionary *dict= ndb->getDictionary();
+  ndb->setDatabaseName(dbname);
+  if (dict->dropTable(tabname))
+    ERR_RETURN(dict->getNdbError());
 
+  DBUG_RETURN(0);
+}
 
-/*
-  Drop table in NDB Cluster
- */
 
-int ha_ndbcluster::drop_table()
+int ha_ndbcluster::delete_table(const char *path)
 {
-  Ndb *ndb= get_ndb();
-  NdbDictionary::Dictionary *dict= ndb->getDictionary();
-
-  DBUG_ENTER("drop_table");
-  DBUG_PRINT("enter", ("Deleting %s", m_tabname));
 
-  release_metadata();
-  if (dict->dropTable(m_tabname))
-    ERR_RETURN(dict->getNdbError());
-  DBUG_RETURN(0);
+  int error= ha_ndbcluster_delete_table(path);
+  if (error == 0 || error == HA_ERR_NO_SUCH_TABLE)
+  {
+    /* Call ancestor function to delete .ndb file */
+    handler::delete_table(path);
+  }
+  return error;
 }
 
 
@@ -4212,24 +4305,18 @@
 
 
 /*
-  Constructor for the NDB Cluster table handler 
- */
+  Constructor for the NDB Cluster table handler
+*/
 
 ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
   handler(&ndbcluster_hton, table_arg),
   m_active_trans(NULL),
   m_active_cursor(NULL),
   m_table(NULL),
-  m_table_version(-1),
-  m_table_info(NULL),
-  m_table_flags(HA_REC_NOT_IN_SEQ |
-                HA_NULL_IN_KEY |
-                HA_AUTO_PART_KEY |
-                HA_NO_PREFIX_CHAR_KEYS |
-                HA_NEED_READ_RANGE_BUFFER |
-                HA_CAN_GEOMETRY |
-                HA_CAN_BIT_FIELD),
+  m_tabname(NULL),
+  m_dbname(NULL),
   m_share(0),
+  m_thd_share(NULL),
   m_use_write(FALSE),
   m_ignore_dup_key(FALSE),
   m_primary_key_update(FALSE),
@@ -4242,7 +4329,7 @@
   m_bulk_insert_not_flushed(FALSE),
   m_ops_pending(0),
   m_skip_auto_increment(TRUE),
-  m_blobs_pending(0),
+  m_blobs_pending(FALSE),
   m_blobs_buffer(0),
   m_blobs_buffer_size(0),
   m_dupkey((uint) -1),
@@ -4251,14 +4338,12 @@
   m_autoincrement_prefetch((ha_rows) 32),
   m_transaction_on(TRUE),
   m_cond_stack(NULL),
-  m_multi_cursor(NULL)
+  m_multi_cursor(NULL),
+  m_thd(NULL)
 {
   int i;
- 
-  DBUG_ENTER("ha_ndbcluster");
 
-  m_tabname[0]= '\0';
-  m_dbname[0]= '\0';
+  DBUG_ENTER("ha_ndbcluster");
 
   records= ~(ha_rows)0; // uninitialized
   block_size= 1024;
@@ -4270,6 +4355,7 @@
     m_index[i].index= NULL;
     m_index[i].unique_index_attrid_map= NULL;
   }
+  m_dbname_buf[0]= 0;
 
   DBUG_VOID_RETURN;
 }
@@ -4277,26 +4363,18 @@
 
 /*
   Destructor for NDB Cluster table handler
- */
+*/
 
-ha_ndbcluster::~ha_ndbcluster() 
+ha_ndbcluster::~ha_ndbcluster()
 {
   DBUG_ENTER("~ha_ndbcluster");
+  DBUG_ASSERT(!m_table); // Check table is not open
 
   if (m_share)
     free_share(m_share);
-  release_metadata();
   my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
   m_blobs_buffer= 0;
 
-  // Check for open cursor/transaction
-  if (m_active_cursor) {
-  }
-  DBUG_ASSERT(m_active_cursor == NULL);
-  if (m_active_trans) {
-  }
-  DBUG_ASSERT(m_active_trans == NULL);
-
   // Discard the condition stack
   DBUG_PRINT("info", ("Clearing condition stack"));
   cond_clear();
@@ -4307,145 +4385,118 @@
 
 
 /*
-  Open a table for further use
-  - fetch metadata for this table from NDB
+  Open a table for DML operations
   - check that table exists
+  - get handle to table
+  - get handles to indexes of table
+
+  SYNOPSIS
+  ha_ndbcluster::open
+  path           - path of table to open
+  mode           - Not used
+  test_if_locked - Not used
+
+  RETURN_VALUE
+  0 if successful
+  > 0 if table could not be opened
+
+  NOTE This has actually _nothing_ to do with thd, it only opens the table
+
 */
 
-int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
+int ha_ndbcluster::open(const char *path, int mode, uint test_if_locked)
 {
-  int res;
-  KEY *key;
-  DBUG_ENTER("open");
-  DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
-                       name, mode, test_if_locked));
-  
-  // Setup ref_length to make room for the whole 
-  // primary key to be written in the ref variable
-  
-  if (table->s->primary_key != MAX_KEY) 
+  int error;
+  DBUG_ENTER("ha_ndbcluster::open");
+  DBUG_PRINT("enter", ("path: %s mode: %d",
+                       path, mode));
+  DBUG_ASSERT(!m_table); // Check table is closed
+
+  /*
+    Get dbname from path
+    Should be removed when table->s->db contains the db name
+  */
+  char buf[FN_HEADLEN];
+  split_path(path, m_dbname_buf, buf);
+  m_dbname= m_dbname_buf;
+
+  /* Set up m_tabname */
+  m_tabname= table->s->table_name;
+
+  /*
+    Setup ref_length to make room for the whole
+    primary key to be written in the ref variable
+  */
+  if (table->s->primary_key != MAX_KEY)
   {
-    key= table->key_info+table->s->primary_key;
+    KEY *key= table->key_info + table->s->primary_key;
     ref_length= key->key_length;
     DBUG_PRINT("info", (" ref_length: %d", ref_length));
   }
-  // Init table lock structure 
-  if (!(m_share=get_share(name)))
+
+  /* Init table lock structure */
+  if (!(m_share=get_share(path)))
     DBUG_RETURN(1);
   thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
-  
-  set_dbname(name);
-  set_tabname(name);
-  
-  if (check_ndb_connection()) {
-    free_share(m_share); m_share= 0;
-    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  if ((error= get_table_and_index_handles(path)))
+  {
+    release_table_and_index_handles();
+    free_share(m_share);
+    m_share= 0;
+    m_tabname= NULL;
+    DBUG_RETURN(error);
   }
-  
-  res= get_metadata(name);
-  if (!res)
-    info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
 
-  DBUG_RETURN(res);
+  set_rec_per_key();
+
+  DBUG_PRINT("exit", ("Table %s opened sucessfully", path));
+  DBUG_RETURN(0);
 }
 
 
 /*
   Close the table
   - release resources setup by open()
+
+  NOTE! close will not be called if open fails
  */
 
 int ha_ndbcluster::close(void)
 {
-  DBUG_ENTER("close");  
-  free_share(m_share); m_share= 0;
-  release_metadata();
+  DBUG_ENTER("ha_ndbcluster::close");
+  DBUG_PRINT("enter", ("table: %s", m_tabname));
+  DBUG_ASSERT(!m_thd); // Check handler is not locked
+  DBUG_ASSERT(m_table); // Check table is open
+  release_table_and_index_handles();
+  free_share(m_share);
+  m_share= 0;
+  m_tabname= NULL;
+  DBUG_ASSERT(!m_table); // Check table is closed
   DBUG_RETURN(0);
 }
 
 
-Thd_ndb* ha_ndbcluster::seize_thd_ndb()
-{
-  Thd_ndb *thd_ndb;
-  DBUG_ENTER("seize_thd_ndb");
-
-  thd_ndb= new Thd_ndb();
-  thd_ndb->ndb->getDictionary()->set_local_table_data_size(
-    sizeof(Ndb_local_table_statistics)
-    );
-  if (thd_ndb->ndb->init(max_transactions) != 0)
-  {
-    ERR_PRINT(thd_ndb->ndb->getNdbError());
-    /*
-      TODO 
-      Alt.1 If init fails because to many allocated Ndb 
-      wait on condition for a Ndb object to be released.
-      Alt.2 Seize/release from pool, wait until next release 
-    */
-    delete thd_ndb;
-    thd_ndb= NULL;
-  }
-  DBUG_RETURN(thd_ndb);
-}
-
-
-void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
-{
-  DBUG_ENTER("release_thd_ndb");
-  delete thd_ndb;
-  DBUG_VOID_RETURN;
-}
-
-
 /*
-  If this thread already has a Thd_ndb object allocated
-  in current THD, reuse it. Otherwise
-  seize a Thd_ndb object, assign it to current THD and use it.
- 
-*/
-
-Ndb* check_ndb_in_thd(THD* thd)
-{
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  if (!thd_ndb)
-  {
-    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
-      return NULL;
-    set_thd_ndb(thd, thd_ndb);
-  }
-  return thd_ndb->ndb;
-}
-
+  Try to discover one table from NDB, if table exists
+  and has a frm in NDB, return the frm data to caller
 
+  SYNOPSIS
+  ndbcluster_discover
+    thd - thd handle
+    db - database name
+    name - table name
+    frmblob - pointer where to store the frm fetched from NDB
+    frmlen - length of frm fetched from NDB
 
-int ha_ndbcluster::check_ndb_connection(THD* thd)
-{
-  Ndb *ndb;
-  DBUG_ENTER("check_ndb_connection");
-  
-  if (!(ndb= check_ndb_in_thd(thd)))
-    DBUG_RETURN(HA_ERR_NO_CONNECTION);
-  ndb->setDatabaseName(m_dbname);
-  DBUG_RETURN(0);
-}
-
-
-int ndbcluster_close_connection(THD *thd)
-{
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  DBUG_ENTER("ndbcluster_close_connection");
-  if (thd_ndb)
-  {
-    ha_ndbcluster::release_thd_ndb(thd_ndb);
-    set_thd_ndb(thd, NULL); // not strictly required but does not hurt either
-  }
-  DBUG_RETURN(0);
-}
-
+  NOTE
+    The data returned in frmblob should be freed by caller
 
-/*
-  Try to discover one table from NDB
- */
+  RETURN_VALUE
+  -1 : Table did not exists
+   0 : Table created ok
+   > 0 : Error, table existed but could not be discovered
+*/
 
 int ndbcluster_discover(THD* thd, const char *db, const char *name,
                         const void** frmblob, uint* frmlen)
@@ -4455,31 +4506,32 @@
   const NDBTAB* tab;
   Ndb* ndb;
   DBUG_ENTER("ndbcluster_discover");
-  DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); 
+  DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
 
+  /* Get ndb object */
   if (!(ndb= check_ndb_in_thd(thd)))
-    DBUG_RETURN(HA_ERR_NO_CONNECTION);  
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   ndb->setDatabaseName(db);
 
+  /* Check if table exists */
   NDBDICT* dict= ndb->getDictionary();
-  dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
   dict->invalidateTable(name);
   if (!(tab= dict->getTable(name)))
-  {    
+  {
     const NdbError err= dict->getNdbError();
     if (err.code == 709)
       DBUG_RETURN(-1);
     ERR_RETURN(err);
   }
   DBUG_PRINT("info", ("Found table %s", tab->getName()));
-  
-  len= tab->getFrmLength();  
+
+  len= tab->getFrmLength();
   if (len == 0 || tab->getFrmData() == NULL)
   {
     DBUG_PRINT("error", ("No frm data found."));
     DBUG_RETURN(1);
   }
-  
+
   if (unpackfrm(&data, &len, tab->getFrmData()))
   {
     DBUG_PRINT("error", ("Could not unpack table"));
@@ -4488,28 +4540,42 @@
 
   *frmlen= len;
   *frmblob= data;
-  
+
   DBUG_RETURN(0);
 }
 
+
 /*
   Check if a table exists in NDB
 
- */
+  SYNOPSIS
+  ndbcluster_table_exists_in_engine
+    thd - thd handle
+    db - database name
+    name - table name
+
+  RETURN_VALUE
+    0 - table didn't exist
+    1 - table existed
+   >1 - an error occured
+
+*/
 
-int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name)
+int ndbcluster_table_exists_in_engine(THD* thd, const char *db, 
+				      const char *name)
 {
   const NDBTAB* tab;
   Ndb* ndb;
   DBUG_ENTER("ndbcluster_table_exists_in_engine");
   DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
 
+  /* Get Ndb object to use */
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
   ndb->setDatabaseName(db);
 
+  /* Look in dictionary of Ndb for the table */
   NDBDICT* dict= ndb->getDictionary();
-  dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
   dict->invalidateTable(name);
   if (!(tab= dict->getTable(name)))
   {
@@ -4524,15 +4590,6 @@
 }
 
 
-
-extern "C" byte* tables_get_key(const char *entry, uint *length,
-                                my_bool not_used __attribute__((unused)))
-{
-  *length= strlen(entry);
-  return (byte*) entry;
-}
-
-
 /*
   Drop a database in NDB Cluster
  */
@@ -4541,19 +4598,20 @@
 {
   DBUG_ENTER("ndbcluster_drop_database");
   THD *thd= current_thd;
-  char dbname[FN_HEADLEN];
   Ndb* ndb;
   NdbDictionary::Dictionary::List list;
   uint i;
   char *tabname;
+  char buf[FN_HEADLEN], dbname[FN_HEADLEN];
   List<char> drop_list;
   int ret= 0;
-  ha_ndbcluster::set_dbname(path, (char *)&dbname);
+
+  split_path(path, dbname, buf);
   DBUG_PRINT("enter", ("db: %s", dbname));
-  
+
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
-  
+
   // List tables in NDB
   NDBDICT *dict= ndb->getDictionary();
   if (dict->listObjects(list, 
@@ -4588,19 +4646,30 @@
   DBUG_RETURN(ret);      
 }
 
+/*
+  Util function for ndbcluster_find_files, used when looking up a
+  table by name in hash list
+*/
+extern "C" byte* tables_get_key(const char *entry, uint *length,
+                                my_bool not_used __attribute__((unused)))
+{
+  *length= strlen(entry);
+  return (byte*) entry;
+}
+
 
 int ndbcluster_find_files(THD *thd,const char *db,const char *path,
                           const char *wild, bool dir, List<char> *files)
 {
-  DBUG_ENTER("ndbcluster_find_files");
-  DBUG_PRINT("enter", ("db: %s", db));
-  { // extra bracket to avoid gcc 2.95.3 warning
   uint i;
   Ndb* ndb;
   char name[FN_REFLEN];
   HASH ndb_tables, ok_tables;
   NdbDictionary::Dictionary::List list;
 
+  DBUG_ENTER("ndbcluster_find_files");
+  DBUG_PRINT("enter", ("db: %s", db));
+
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(HA_ERR_NO_CONNECTION);
 
@@ -4742,7 +4811,7 @@
   
   hash_free(&ok_tables);
   hash_free(&ndb_tables);
-  } // extra bracket to avoid gcc 2.95.3 warning
+
   DBUG_RETURN(0);    
 }
 
@@ -4781,18 +4850,12 @@
   g_ndb_cluster_connection->set_optimized_node_selection
     (opt_ndb_optimized_node_selection);
 
-  // Create a Ndb object to open the connection  to NDB
-  if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
+  /* Create a global Ndb object to open and _keep_ the connection to NDB */
+  if (!(g_ndb= get_new_ndb("sys")))
   {
     DBUG_PRINT("error", ("failed to create global ndb object"));
     goto ndbcluster_init_error;
   }
- 
g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
-  if (g_ndb->init() != 0)
-  {
-    ERR_PRINT (g_ndb->getNdbError());
-    goto ndbcluster_init_error;
-  }
 
   if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0)
   {
@@ -4828,7 +4891,7 @@
   }
   
   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
-                   (hash_get_key) ndbcluster_get_key,0,0);
+                   (hash_get_key) ndb_share_get_key,0,0);
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
   pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -4906,6 +4969,7 @@
   pthread_mutex_destroy(&LOCK_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_thread);
   ndbcluster_inited= 0;
+
   DBUG_RETURN(0);
 }
 
@@ -4927,86 +4991,6 @@
   DBUG_VOID_RETURN;
 }
 
-/**
- * Set a given location from full pathname to database name
- *
- */
-void ha_ndbcluster::set_dbname(const char *path_name, char *dbname)
-{
-  char *end, *ptr;
-  
-  /* Scan name from the end */
-  ptr= strend(path_name)-1;
-  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
-    ptr--;
-  }
-  ptr--;
-  end= ptr;
-  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
-    ptr--;
-  }
-  uint name_len= end - ptr;
-  memcpy(dbname, ptr + 1, name_len);
-  dbname[name_len]= '\0';
-#ifdef __WIN__
-  /* Put to lower case */
-  
-  ptr= dbname;
-  
-  while (*ptr != '\0') {
-    *ptr= tolower(*ptr);
-    ptr++;
-  }
-#endif
-}
-
-/*
-  Set m_dbname from full pathname to table file
- */
-
-void ha_ndbcluster::set_dbname(const char *path_name)
-{
-  set_dbname(path_name, m_dbname);
-}
-
-/**
- * Set a given location from full pathname to table file
- *
- */
-void
-ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
-{
-  char *end, *ptr;
-  
-  /* Scan name from the end */
-  end= strend(path_name)-1;
-  ptr= end;
-  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
-    ptr--;
-  }
-  uint name_len= end - ptr;
-  memcpy(tabname, ptr + 1, end - ptr);
-  tabname[name_len]= '\0';
-#ifdef __WIN__
-  /* Put to lower case */
-  ptr= tabname;
-  
-  while (*ptr != '\0') {
-    *ptr= tolower(*ptr);
-    ptr++;
-  }
-#endif
-}
-
-/*
-  Set m_tabname from full pathname to table file 
- */
-
-void ha_ndbcluster::set_tabname(const char *path_name)
-{
-  set_tabname(path_name, m_tabname);
-}
-
 
 ha_rows 
 ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
@@ -5033,13 +5017,29 @@
   DBUG_RETURN(10); /* Good guess when you don't know anything */
 }
 
+
+/*
+  Return bitfield describing the capabilities of this handler
+*/
+
 ulong ha_ndbcluster::table_flags(void) const
 {
+  ulong table_flags=
+    HA_REC_NOT_IN_SEQ |
+    HA_NULL_IN_KEY |
+    HA_AUTO_PART_KEY |
+    HA_NO_PREFIX_CHAR_KEYS |
+    HA_NEED_READ_RANGE_BUFFER |
+    HA_CAN_GEOMETRY |
+    HA_CAN_BIT_FIELD;
+
   if (m_ha_not_exact_count)
-    return m_table_flags | HA_NOT_EXACT_COUNT;
+    return table_flags | HA_NOT_EXACT_COUNT;
   else
-    return m_table_flags;
+    return table_flags;
 }
+
+
 const char * ha_ndbcluster::table_type() const 
 {
   return("ndbcluster");
@@ -5093,7 +5093,7 @@
 }
 
 
-uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
+uint ndb_get_commitcount(THD *thd, const char *dbname, const char *tabname,
                          Uint64 *commit_count)
 {
   DBUG_ENTER("ndb_get_commitcount");
@@ -5133,11 +5133,12 @@
   if (!(ndb= check_ndb_in_thd(thd)))
     DBUG_RETURN(1);
   ndb->setDatabaseName(dbname);
+  const NDBTAB* tab= ndb->getDictionary()->getTable(tabname);
   uint lock= share->commit_count_lock;
   pthread_mutex_unlock(&share->mutex);
 
   struct Ndb_statistics stat;
-  if (ndb_get_table_statistics(ndb, tabname, &stat))
+  if (tab && ndb_get_table_statistics(ndb, tab, &stat))
   {
     free_share(share);
     DBUG_RETURN(1);
@@ -5301,8 +5302,8 @@
   data we want to or can share.
  */
 
-static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
-                                my_bool not_used __attribute__((unused)))
+static byte* ndb_share_get_key(NDB_SHARE *share,uint *length,
+			       my_bool not_used __attribute__((unused)))
 {
   *length=share->table_name_length;
   return (byte*) share->table_name;
@@ -5317,7 +5318,7 @@
                                        (byte*) table_name,
                                        length)))
   {
-    if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+    if ((share=(NDB_SHARE*) my_malloc(sizeof(*share)+length+1,
                                        MYF(MY_WME | MY_ZEROFILL))))
     {
       share->table_name_length=length;
@@ -5357,7 +5358,7 @@
   pthread_mutex_lock(&ndbcluster_mutex);
   if (!--share->use_count)
   {
-     hash_delete(&ndbcluster_open_tables, (byte*) share);
+    hash_delete(&ndbcluster_open_tables, (byte*) share);
     thr_lock_delete(&share->lock);
     pthread_mutex_destroy(&share->mutex);
     my_free((gptr) share, MYF(0));
@@ -5366,6 +5367,52 @@
 }
 
 
+static THD_NDB_SHARE* get_thd_share(Thd_ndb *thd_ndb,
+				    const char *table_name)
+{
+  THD_NDB_SHARE *thd_share;
+  uint length=(uint) strlen(table_name);
+  DBUG_ENTER("get_thd_share");
+  DBUG_PRINT("enter", ("table_name: %s", table_name));
+  if (!(thd_share= (THD_NDB_SHARE*)hash_search(&thd_ndb->shared_tables,
+						   (byte*) table_name,
+						   length)))
+  {
+    if ((thd_share= (THD_NDB_SHARE *)my_malloc(sizeof(*thd_share)+length+1,
+					       MYF(MY_WME | MY_ZEROFILL))))
+    {
+      thd_share->table_name_length=length;
+      thd_share->table_name=(char*) (thd_share+1);
+      strmov(thd_share->table_name, table_name);
+      if (my_hash_insert(&thd_ndb->shared_tables,
+			 (byte*) thd_share))
+      {
+        my_free((gptr) thd_share, 0);
+	DBUG_RETURN(0);
+      }
+      thd_share->uncommitted_rows= 0;
+    }
+    else
+    {
+      DBUG_PRINT("error", ("Failed to alloc THD_NDB_SHARE"));
+      pthread_mutex_unlock(&ndbcluster_mutex);
+      DBUG_RETURN(0);
+    }
+  }
+
+  DBUG_PRINT("thd_share",
+	     ("table_name: %s, length: %d uncommitted_rows: %d",
+	      thd_share->table_name, thd_share->table_name_length,
+	      thd_share->uncommitted_rows));
+  DBUG_RETURN(thd_share);
+}
+
+
+static void free_thd_share(THD_NDB_SHARE* thd_share)
+{
+  DBUG_ASSERT(thd_share);
+  my_free((gptr) thd_share, MYF(0));
+}
 
 /*
   Internal representation of the frm blob
@@ -5464,43 +5511,43 @@
    DBUG_RETURN(0);
 }
 
-static 
+static
 int
-ndb_get_table_statistics(Ndb* ndb, const char * table,
+ndb_get_table_statistics(Ndb* ndb, const NDBTAB* table,
                          struct Ndb_statistics * ndbstat)
 {
   DBUG_ENTER("ndb_get_table_statistics");
-  DBUG_PRINT("enter", ("table: %s", table));
+  DBUG_PRINT("enter", ("table: %s", table->getName()));
   NdbTransaction* pTrans= ndb->startTransaction();
-  do 
+  do
   {
     if (pTrans == NULL)
       break;
-      
+
     NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
     if (pOp == NULL)
       break;
-    
+
     if (pOp->readTuples(NdbOperation::LM_CommittedRead))
       break;
-    
+
     int check= pOp->interpret_exit_last_row();
     if (check == -1)
       break;
-    
+
     Uint64 rows, commits, mem;
     Uint32 size;
     pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
     pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
     pOp->getValue(NdbDictionary::Column::ROW_SIZE, (char*)&size);
     pOp->getValue(NdbDictionary::Column::FRAGMENT_MEMORY, (char*)&mem);
-    
+
     check= pTrans->execute(NdbTransaction::NoCommit,
                            NdbTransaction::AbortOnError,
                            TRUE);
     if (check == -1)
       break;
-    
+
     Uint32 count= 0;
     Uint64 sum_rows= 0;
     Uint64 sum_commits= 0;
@@ -5515,7 +5562,7 @@
       sum_mem+= mem;
       count++;
     }
-    
+
     if (check == -1)
       break;
 
@@ -5542,31 +5589,6 @@
   DBUG_RETURN(-1);
 }
 
-/*
-  Create a .ndb file to serve as a placeholder indicating 
-  that the table with this name is a ndb table
-*/
-
-int ha_ndbcluster::write_ndb_file()
-{
-  File file;
-  bool error=1;
-  char path[FN_REFLEN];
-  
-  DBUG_ENTER("write_ndb_file");
-  DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
-
-  (void)strxnmov(path, FN_REFLEN, 
-                 mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
-
-  if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
-  {
-    // It's an empty file
-    error=0;
-    my_close(file,MYF(0));
-  }
-  DBUG_RETURN(error);
-}
 
 int
 ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
@@ -5964,6 +5986,7 @@
 {
   THD *thd; /* needs to be first for thread_stack */
   Ndb* ndb;
+  NDBDICT* dict;
   struct timespec abstime;
 
   my_thread_init();
@@ -5972,17 +5995,32 @@
 
   thd= new THD; /* note that contructor of THD uses DBUG_ */
   THD_CHECK_SENTRY(thd);
-  ndb= new Ndb(g_ndb_cluster_connection, "");
 
   pthread_detach_this_thread();
   ndb_util_thread= pthread_self();
 
   thd->thread_stack= (char*)&thd; /* remember where our stack is */
-  if (thd->store_globals() && (ndb->init() != -1))
+  if (thd->store_globals())
   {
     thd->cleanup();
     delete thd;
+    DBUG_RETURN(NULL);
+  }
+
+  /* Get Ndb object for this thread */
+  if (!(ndb= get_new_ndb("")))
+  {
+    thd->cleanup();
+    delete thd;
+    DBUG_RETURN(NULL);
+  }
+
+  /* Get pointer to dictionary of Ndb */
+  if (!(dict= ndb->getDictionary()))
+  {
     delete ndb;
+    thd->cleanup();
+    delete thd;
     DBUG_RETURN(NULL);
   }
 
@@ -6016,7 +6054,7 @@
     for (uint i= 0; i < ndbcluster_open_tables.records; i++)
     {
       share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
-      share->use_count++; /* Make sure the table can't be closed */
+      share->use_count++; /* Make sure the share is not deallocated */
       DBUG_PRINT("ndb_util_thread",
                  ("Found open table[%d]: %s, use_count: %d",
                   i, share->table_name, share->use_count));
@@ -6044,6 +6082,7 @@
 
       /* Contact NDB to get commit count for table */
       ndb->setDatabaseName(db);
+      const NDBTAB* tab= dict->getTable(tabname);
       struct Ndb_statistics stat;
 
       uint lock;
@@ -6051,7 +6090,7 @@
       lock= share->commit_count_lock;
       pthread_mutex_unlock(&share->mutex);
 
-      if (ndb_get_table_statistics(ndb, tabname, &stat) == 0)
+      if (ndb_get_table_statistics(ndb, tab, &stat) == 0)
       {
         DBUG_PRINT("ndb_util_thread",
                    ("Table: %s, commit_count: %llu, rows: %llu",
@@ -6099,9 +6138,9 @@
     }
   }
 
+  delete ndb;
   thd->cleanup();
   delete thd;
-  delete ndb;
   DBUG_PRINT("exit", ("ndb_util_thread"));
   my_thread_end();
   pthread_exit(0);

--- 1.93/sql/ha_ndbcluster.h	2005-10-04 06:08:55 +02:00
+++ 1.94/sql/ha_ndbcluster.h	2006-01-04 16:06:05 +01:00
@@ -56,6 +56,7 @@
   unsigned char *unique_index_attrid_map;
 } NDB_INDEX_DATA;
 
+/* Structure shared between all ndb handlers with same table name */
 typedef struct st_ndbcluster_share {
   THR_LOCK lock;
   pthread_mutex_t mutex;
@@ -65,6 +66,16 @@
   ulonglong commit_count;
 } NDB_SHARE;
 
+/* Structure shared between all ndb handlers with same table name in thd
+  - manages uncommitted insert/deletes during transaction to
+    get record count correct
+*/
+typedef struct st_thd_ndb_share {
+  ha_rows uncommitted_rows;
+  char *table_name;
+  uint table_name_length;
+} THD_NDB_SHARE;
+
 typedef enum ndb_item_type {
   NDB_VALUE = 0,   // Qualified more with Item::Type
   NDB_FIELD = 1,   // Qualified from table definition
@@ -433,24 +444,29 @@
   Ndb_rewrite_context *rewrite_stack;
 };
 
+
 /*
   Place holder for ha_ndbcluster thread specific data
 */
 
-class Thd_ndb 
+class Thd_ndb
 {
  public:
   Thd_ndb();
   ~Thd_ndb();
   Ndb *ndb;
-  ulong count;
   uint lock_count;
   NdbTransaction *all;
   NdbTransaction *stmt;
   int error;
+  /* List of tables changed during transaction */
   List<NDB_SHARE> changed_tables;
+  /* Hash list for THD_NDB_SHARE's, see definition od THD_NDB_SHARE above */
+  HASH shared_tables;
+  void uncommitted_rows_init();
 };
 
+
 class ha_ndbcluster: public handler
 {
  public:
@@ -528,11 +544,7 @@
   int end_bulk_insert();
 
   static Thd_ndb* seize_thd_ndb();
-  static void release_thd_ndb(Thd_ndb* thd_ndb);
  
-static void set_dbname(const char *pathname, char *dbname);
-static void set_tabname(const char *pathname, char *tabname);
-
   /*
     Condition pushdown
   */
@@ -578,19 +590,11 @@
                                      qc_engine_callback *engine_callback,
                                      ulonglong *engine_data);
 private:
-  int alter_table_name(const char *to);
-  int drop_table();
-  int create_index(const char *name, KEY *key_info, bool unique);
-  int create_ordered_index(const char *name, KEY *key_info);
-  int create_unique_index(const char *name, KEY *key_info);
   int initialize_autoincrement(const void *table);
-  enum ILBP {ILBP_CREATE = 0, ILBP_OPEN = 1}; // Index List Build Phase
-  int build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase);
-  int get_metadata(const char* path);
-  void release_metadata();
+  int build_index_list(Ndb* ndb);
+  int get_table_and_index_handles(const char* path);
+  void release_table_and_index_handles();
   NDB_INDEX_TYPE get_index_type(uint idx_no) const;
-  NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
-  int check_index_fields_not_null(uint index_no);
 
   int pk_read(const byte *key, uint key_len, byte *buf);
   int complemented_pk_read(const byte *old_data, byte *new_data);
@@ -611,9 +615,6 @@
   void unpack_record(byte *buf);
   int get_ndb_lock_type(enum thr_lock_type type);
 
-  void set_dbname(const char *pathname);
-  void set_tabname(const char *pathname);
-
   bool set_hidden_key(NdbOperation*,
                       uint fieldnr, const byte* field_ptr);
   int set_ndb_key(NdbOperation*, Field *field,
@@ -630,22 +631,17 @@
   void print_results();
 
   ulonglong get_auto_increment();
-  void invalidate_dictionary_cache(bool global);
   int ndb_err(NdbTransaction*);
   bool uses_blob_value(bool all_fields);
 
   char *update_table_comment(const char * comment);
 
-  int write_ndb_file();
-
   int check_ndb_connection(THD* thd= current_thd);
 
   void set_rec_per_key();
-  void records_update();
-  void no_uncommitted_rows_execute_failure();
-  void no_uncommitted_rows_update(int);
-  void no_uncommitted_rows_init(THD *);
-  void no_uncommitted_rows_reset(THD *);
+  void uncommitted_rows_execute_failure();
+  void uncommitted_rows_update(int);
+  void uncommitted_rows_init(Thd_ndb *);
 
   /*
     Condition pushdown
@@ -665,18 +661,22 @@
   friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*);
   friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*);
 
+  // The NDB transaction this handler is part of
   NdbTransaction *m_active_trans;
+  // Pointer to cursor this handler has opened in NDB
   NdbScanOperation *m_active_cursor;
+  // Pointer to the table this handler has opened in NDB
   void *m_table;
-  int m_table_version;
-  void *m_table_info;
-  char m_dbname[FN_HEADLEN];
-  //char m_schemaname[FN_HEADLEN];
-  char m_tabname[FN_HEADLEN];
-  ulong m_table_flags;
+  // Pointer to and info about the indexes this handler has opened in NDB
+  NDB_INDEX_DATA  m_index[MAX_KEY];
+  const char* m_tabname;  // Points to table->s->table_name
+  const char* m_dbname;
+  char m_dbname_buf[FN_HEADLEN];
   THR_LOCK_DATA m_lock;
+  // Shared between all tables with same name
   NDB_SHARE *m_share;
-  NDB_INDEX_DATA  m_index[MAX_KEY];
+  // Shared between all tables with same name in thd
+  THD_NDB_SHARE *m_thd_share;
   // NdbRecAttr has no reference to blob
   typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
   NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
@@ -710,8 +710,11 @@
   const NdbOperation *m_current_multi_operation;
   NdbIndexScanOperation *m_multi_cursor;
   byte *m_multi_range_cursor_result_ptr;
+  THD* m_thd; // Thd that handler is locked to
+
   int setup_recattr(const NdbRecAttr*);
-  Ndb *get_ndb();
+  Ndb *get_ndb(THD *thd= current_thd);
+
 };
 
 extern struct show_var_st ndb_status_variables[];

--- 1.3/mysql-test/r/ndb_multi.result	2005-04-27 18:17:18 +02:00
+++ 1.4/mysql-test/r/ndb_multi.result	2006-01-04 16:06:04 +01:00
@@ -29,7 +29,7 @@
 create table t1 (a int) engine=ndbcluster;
 insert into t1 value (2);
 select * from t1;
-ERROR HY000: Got error 241 'Invalid schema object version' from ndbcluster
+ERROR HY000: Table definition has changed, please retry transaction
 select * from t1;
 a
 2

--- 1.5/mysql-test/t/ndb_multi.test	2005-07-28 02:21:45 +02:00
+++ 1.6/mysql-test/t/ndb_multi.test	2006-01-04 16:06:05 +01:00
@@ -38,7 +38,7 @@
 insert into t1 value (2);
 connection server1;
 # Currently a retry is required remotely
---error 1296
+--error 1412
 select * from t1;
 select * from t1;
 
--- New file ---
+++ mysql-test/r/ndb_alter_table2.result	06/01/04 16:06:05
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (
a INT NOT NULL PRIMARY KEY,
b INT NOT NULL
) ENGINE=ndbcluster;
BEGIN;
INSERT INTO t1 VALUES (9410,9412);
BEGIN;
INSERT INTO t1 VALUES (9411,9412);
BEGIN;
INSERT INTO t1 VALUES (9412,9412);
BEGIN;
INSERT INTO t1 VALUES (9413,9412);
BEGIN;
INSERT INTO t1 VALUES (9414,9412);
BEGIN;
INSERT INTO t1 VALUES (9415,9412);
ROLLBACK;
ROLLBACK;
ROLLBACK;
ROLLBACK;
ROLLBACK;
ROLLBACK;
drop table t1;
CREATE TABLE t1 (
a INT NOT NULL PRIMARY KEY,
b INT NOT NULL,
c INT NOT NULL
) ENGINE=ndbcluster;
select * from t1;
ERROR HY000: Table definition has changed, please retry transaction
select * from t1;
a	b	c
select * from t1;
a	b	c
select * from t1;
a	b	c
select * from t1;
a	b	c
select * from t1;
a	b	c
drop table t1;

--- New file ---
+++ mysql-test/r/ndb_count.result	06/01/04 16:06:06
drop table if exists t1, t2;
create table t1 (
a int not null primary key
) engine=ndbcluster;
create table t2 (
a int not null primary key
) engine=ndbcluster;
insert into t2 values(3);
insert into t1 values(1);
insert into t1 values(2);
select count(*) from t1;
count(*)
2
select count(*) from t2;
count(*)
1
begin;
insert into t2 values(6);
insert into t1 values(7);
insert into t1 values(8);
select count(*) from t1;
count(*)
4
select count(*) from t2;
count(*)
2
commit;
begin;
insert into t2 values(9);
insert into t1 values(10);
insert into t1 values(11);
select count(*) from t1;
count(*)
6
select count(*) from t2;
count(*)
3
commit;
begin;
insert into t2 values(14);
insert into t1 values(15);
insert into t1 values(16);
select count(*) from t2;
count(*)
4
delete from t2 where a=14;
select count(*) from t2;
count(*)
3
delete from t2 where a=9;
select count(*) from t2;
count(*)
2
commit;
begin;
insert into t2 values(12);
insert into t1 values(13);
insert into t1 values(4);
select count(*) from t1;
count(*)
10
select count(*) from t2;
count(*)
3
rollback;
begin;
insert into t2 values(23);
insert into t1 values(24),(25),(28);
select count(*) from t1;
count(*)
11
select count(*) from t2;
count(*)
3
commit;
select count(*) from t1;
count(*)
11
select count(*) from t2;
count(*)
3
delete from t1;
delete from t2;
Multiple handlers from same thread
insert into t2 values(1);
insert into t2 values(3);
begin;
insert into t2 values(2);
insert into t2 select a+100 from t2;
insert into t2 select count(*)+200 from t2;
select count(*) from t2;
count(*)
7
commit;
Multiple threads
BEGIN;
INSERT INTO t1 VALUES (101);
select count(*) from t1;
count(*)
1
BEGIN;
INSERT INTO t1 VALUES (102);
BEGIN;
INSERT INTO t1 VALUES (103);
select count(*) from t1;
count(*)
1
BEGIN;
INSERT INTO t1 VALUES (104);
select count(*) from t1;
count(*)
1
BEGIN;
INSERT INTO t1 VALUES (105);
select count(*) from t1;
count(*)
1
BEGIN;
INSERT INTO t1 VALUES (106), (107);
select count(*) from t1;
count(*)
2
commit;
select count(*) from t1;
count(*)
2
select count(*) from t1;
count(*)
2
ROLLBACK;
commit;
ROLLBACK;
select count(*) from t1;
count(*)
3
commit;
select count(*) from t1;
count(*)
3
select count(*) from t1;
count(*)
5
ROLLBACK;
select count(*) from t1;
count(*)
3
select count(*) from t1;
count(*)
3
drop table t1, t2;

--- New file ---
+++ mysql-test/t/ndb_alter_table2.test	06/01/04 16:06:06
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
-- source include/not_embedded.inc

--disable_warnings
DROP TABLE IF EXISTS t1;
--enable_warnings

connect (con1,localhost,root,,test);
connect (con2,localhost,root,,test);
connect (con3,localhost,root,,test);
connect (con4,localhost,root,,test);
connect (con5,localhost,root,,test);
connect (con6,localhost,root,,test);

CREATE TABLE t1 (
  a INT NOT NULL PRIMARY KEY,
  b INT NOT NULL
) ENGINE=ndbcluster;

connection con1;
BEGIN;
INSERT INTO t1 VALUES (9410,9412);
connection con2;
BEGIN;
--send
INSERT INTO t1 VALUES (9411,9412);
connection con3;
BEGIN;
--send
INSERT INTO t1 VALUES (9412,9412);
connection con4;
BEGIN;
--send
INSERT INTO t1 VALUES (9413,9412);
connection con5;
BEGIN;
--send
INSERT INTO t1 VALUES (9414,9412);
connection con6;
BEGIN;
--send
INSERT INTO t1 VALUES (9415,9412);
connection con1;
sleep 1;

ROLLBACK;
connection con2;
reap;
ROLLBACK;
connection con3;
reap;
ROLLBACK;
connection con4;
reap;
ROLLBACK;
connection con5;
reap;
ROLLBACK;
connection con6;
reap;
ROLLBACK;

connection server2;

drop table t1;
CREATE TABLE t1 (
  a INT NOT NULL PRIMARY KEY,
  b INT NOT NULL,
  c INT NOT NULL
) ENGINE=ndbcluster;

connection server1;

--error 1412
select * from t1;
# FLUSH TABLES;
select * from t1;
select * from t1;
select * from t1;
select * from t1;
select * from t1;

drop table t1;

--- New file ---
+++ mysql-test/t/ndb_count.test	06/01/04 16:06:06
-- source include/have_ndb.inc
-- source include/not_embedded.inc

--disable_warnings
drop table if exists t1, t2;
--enable_warnings


create table t1 (
  a int not null primary key
) engine=ndbcluster;

create table t2 (
  a int not null primary key
) engine=ndbcluster;


insert into t2 values(3);
insert into t1 values(1);
insert into t1 values(2);
select count(*) from t1;
select count(*) from t2;


begin;
insert into t2 values(6);
insert into t1 values(7);
insert into t1 values(8);
select count(*) from t1;
select count(*) from t2;
commit;

begin;
insert into t2 values(9);
insert into t1 values(10);
insert into t1 values(11);
select count(*) from t1;
select count(*) from t2;
commit;

begin;
insert into t2 values(14);
insert into t1 values(15);
insert into t1 values(16);
select count(*) from t2;
delete from t2 where a=14;
select count(*) from t2;
delete from t2 where a=9;
select count(*) from t2;
commit;

begin;
insert into t2 values(12);
insert into t1 values(13);
insert into t1 values(4);
select count(*) from t1;
select count(*) from t2;
rollback;

begin;
insert into t2 values(23);
insert into t1 values(24),(25),(28);
select count(*) from t1;
select count(*) from t2;
commit;

select count(*) from t1;
select count(*) from t2;

delete from t1;
delete from t2;

# Multiple handlers from same thread
echo Multiple handlers from same thread;

insert into t2 values(1);
insert into t2 values(3);
begin;
insert into t2 values(2);
insert into t2 select a+100 from t2;
insert into t2 select count(*)+200 from t2;
select count(*) from t2;
commit;


# Multiple threads
echo Multiple threads;

connect (con1,localhost,root,,test);
connect (con2,localhost,root,,test);
connect (con3,localhost,root,,test);
connect (con4,localhost,root,,test);
connect (con5,localhost,root,,test);
connect (con6,localhost,root,,test);

connection con1;
BEGIN;
INSERT INTO t1 VALUES (101);
select count(*) from t1;
connection con2;
BEGIN;
INSERT INTO t1 VALUES (102);
connection con3;
BEGIN;
INSERT INTO t1 VALUES (103);
select count(*) from t1;
connection con4;
BEGIN;
INSERT INTO t1 VALUES (104);
select count(*) from t1;
connection con5;
BEGIN;
INSERT INTO t1 VALUES (105);
select count(*) from t1;
connection con6;
BEGIN;
INSERT INTO t1 VALUES (106), (107);
select count(*) from t1;
connection con1;
sleep 1;

commit;
connection con2;
select count(*) from t1;
select count(*) from t1;
ROLLBACK;
connection con3;
commit;
connection con4;
ROLLBACK;
connection con5;
select count(*) from t1;
commit;
select count(*) from t1;
connection con6;
select count(*) from t1;
ROLLBACK;
select count(*) from t1;
select count(*) from t1;

drop table t1, t2;

Thread
bk commit into 5.0 tree (msvensson:1.1990) BUG#13228msvensson4 Jan