List:Commits« Previous MessageNext Message »
From:lzhou Date:August 15 2007 3:23pm
Subject:bk commit into 5.1 tree (lzhou:1.2528)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of zhl. When zhl does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-08-15 15:22:52+00:00, lzhou@dev3-63.(none) +3 -0
  Merge dev3-63.(none):/home/zhl/mysql/mysql-5.0/bug29674
  into  dev3-63.(none):/home/zhl/mysql/mysql-5.1/bug29674
  MERGE: 1.1810.2964.2

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-08-15 15:22:38+00:00, lzhou@dev3-63.(none) +0 -10
    Revert the change in parseTableInfo for modified only in 5.0
    MERGE: 1.74.35.2

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-08-15 15:10:10+00:00, lzhou@dev3-63.(none) +0 -0
    Merge rename: ndb/src/ndbapi/NdbDictionaryImpl.cpp -> storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2007-08-15 15:22:38+00:00, lzhou@dev3-63.(none) +0 -2
    Revert declare of pareTableInfo for 5.1
    MERGE: 1.30.14.2

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2007-08-15 15:10:10+00:00, lzhou@dev3-63.(none) +0 -0
    Merge rename: ndb/src/ndbapi/NdbDictionaryImpl.hpp -> storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp

  storage/ndb/tools/restore/Restore.cpp@stripped, 2007-08-15 15:22:38+00:00, lzhou@dev3-63.(none) +0 -32
    Revert change in 5.1
    MERGE: 1.22.12.2

  storage/ndb/tools/restore/Restore.cpp@stripped, 2007-08-15 15:10:10+00:00, lzhou@dev3-63.(none) +0 -0
    Merge rename: ndb/tools/restore/Restore.cpp -> storage/ndb/tools/restore/Restore.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	lzhou
# Host:	dev3-63.(none)
# Root:	/home/zhl/mysql/mysql-5.1/bug29674/RESYNC

--- 1.22.12.1/ndb/tools/restore/Restore.cpp	2007-08-15 15:23:13 +00:00
+++ 1.52/storage/ndb/tools/restore/Restore.cpp	2007-08-15 15:23:13 +00:00
@@ -15,6 +15,7 @@
 
 #include "Restore.hpp"
 #include <NdbTCP.h>
+#include <NdbMem.h>
 #include <OutputStream.hpp>
 #include <Bitmask.hpp>
 
@@ -22,7 +23,10 @@
 #include <trigger_definitions.h>
 #include <SimpleProperties.hpp>
 #include <signaldata/DictTabInfo.hpp>
+#include <ndb_limits.h>
+#include <NdbAutoPtr.hpp>
 
+#include "../../../../sql/ha_ndbcluster_tables.h"
 extern NdbRecordPrintFormat g_ndbrecord_print_format;
 
 Uint16 Twiddle16(Uint16 in); // Byte shift 16-bit data
@@ -156,27 +160,137 @@ RestoreMetaData::readMetaTableList() {
 bool
 RestoreMetaData::readMetaTableDesc() {
   
-  Uint32 sectionInfo[2];
+  Uint32 sectionInfo[3];
   
   // Read section header 
-  if (buffer_read(&sectionInfo, sizeof(sectionInfo), 1) != 1){
+  Uint32 sz = sizeof(sectionInfo) >> 2;
+  if (m_fileHeader.NdbVersion < NDBD_ROWID_VERSION)
+  {
+    sz = 2;
+    sectionInfo[2] = htonl(DictTabInfo::UserTable);
+  }
+  if (buffer_read(&sectionInfo, 4*sz, 1) != 1){
     err << "readMetaTableDesc read header error" << endl;
     return false;
   } // if
   sectionInfo[0] = ntohl(sectionInfo[0]);
   sectionInfo[1] = ntohl(sectionInfo[1]);
+  sectionInfo[2] = ntohl(sectionInfo[2]);
   
   assert(sectionInfo[0] == BackupFormat::TABLE_DESCRIPTION);
   
   // Read dictTabInfo buffer
-  const Uint32 len = (sectionInfo[1] - 2);
+  const Uint32 len = (sectionInfo[1] - sz);
   void *ptr;
   if (buffer_get_ptr(&ptr, 4, len) != len){
     err << "readMetaTableDesc read error" << endl;
     return false;
   } // if
   
-  return parseTableDescriptor((Uint32*)ptr, len);	     
+  int errcode = 0;
+  DictObject obj = { sectionInfo[2], 0 };
+  switch(obj.m_objType){
+  case DictTabInfo::SystemTable:
+  case DictTabInfo::UserTable:
+  case DictTabInfo::UniqueHashIndex:
+  case DictTabInfo::OrderedIndex:
+    return parseTableDescriptor((Uint32*)ptr, len);	     
+    break;
+  case DictTabInfo::Tablespace:
+  {
+    NdbDictionary::Tablespace * dst = new NdbDictionary::Tablespace;
+    errcode = 
+      NdbDictInterface::parseFilegroupInfo(NdbTablespaceImpl::getImpl(* dst), 
+					   (Uint32*)ptr, len);
+    if (errcode)
+      delete dst;
+    obj.m_objPtr = dst;
+    debug << hex << obj.m_objPtr << " " 
+	   << dec << dst->getObjectId() << " " << dst->getName() << endl;
+    break;
+  }
+  case DictTabInfo::LogfileGroup:
+  {
+    NdbDictionary::LogfileGroup * dst = new NdbDictionary::LogfileGroup;
+    errcode = 
+      NdbDictInterface::parseFilegroupInfo(NdbLogfileGroupImpl::getImpl(* dst),
+					   (Uint32*)ptr, len);
+    if (errcode)
+      delete dst;
+    obj.m_objPtr = dst;
+    debug << hex << obj.m_objPtr << " " 
+	   << dec << dst->getObjectId() << " " << dst->getName() << endl;
+    break;
+  }
+  case DictTabInfo::Datafile:
+  {
+    NdbDictionary::Datafile * dst = new NdbDictionary::Datafile;
+    errcode = 
+      NdbDictInterface::parseFileInfo(NdbDatafileImpl::getImpl(* dst), 
+				      (Uint32*)ptr, len);
+    if (errcode)
+      delete dst;
+    obj.m_objPtr = dst;
+    debug << hex << obj.m_objPtr << " "
+	   << dec << dst->getObjectId() << " " << dst->getPath() << endl;
+    break;
+  }
+  case DictTabInfo::Undofile:
+  {
+    NdbDictionary::Undofile * dst = new NdbDictionary::Undofile;
+    errcode = 
+      NdbDictInterface::parseFileInfo(NdbUndofileImpl::getImpl(* dst), 
+				      (Uint32*)ptr, len);
+    if (errcode)
+      delete dst;
+    obj.m_objPtr = dst;
+    debug << hex << obj.m_objPtr << " " 
+	   << dec << dst->getObjectId() << " " << dst->getPath() << endl;
+    break;
+  }
+  default:
+    err << "Unsupported table type!! " << sectionInfo[2] << endl;
+    return false;
+  }
+  if (errcode)
+  {
+    err << "Unable to parse dict info..." 
+	<< sectionInfo[2] << " " << errcode << endl;
+    return false;
+  }
+
+  /**
+   * DD objects need to be sorted...
+   */
+  for(Uint32 i = 0; i<m_objects.size(); i++)
+  {
+    switch(sectionInfo[2]){
+    case DictTabInfo::Tablespace:
+      if (DictTabInfo::isFile(m_objects[i].m_objType))
+      {
+	m_objects.push(obj, i);
+	goto end;
+      }
+      break;
+    case DictTabInfo::LogfileGroup:
+    {
+      if (DictTabInfo::isFile(m_objects[i].m_objType) ||
+	  m_objects[i].m_objType == DictTabInfo::Tablespace)
+      {
+	m_objects.push(obj, i);
+	goto end;
+      }
+      break;
+    }
+    default:
+      m_objects.push_back(obj);
+      goto end;
+    }
+  }
+  m_objects.push_back(obj);
+  
+end:
+  return true;
 }
 
 bool
@@ -191,7 +305,18 @@ RestoreMetaData::markSysTables()
         strcmp(tableName, "SYSTAB_0") == 0 ||
         strcmp(tableName, "NDB$EVENTS_0") == 0 ||
         strcmp(tableName, "sys/def/SYSTAB_0") == 0 ||
-        strcmp(tableName, "sys/def/NDB$EVENTS_0") == 0)
+        strcmp(tableName, "sys/def/NDB$EVENTS_0") == 0 ||
+        /*
+          The following is for old MySQL versions,
+           before we changed the database name of the tables from
+           "cluster_replication" -> "cluster" -> "mysql"
+        */
+        strcmp(tableName, "cluster_replication/def/" OLD_NDB_APPLY_TABLE) == 0 ||
+        strcmp(tableName, OLD_NDB_REP_DB "/def/" OLD_NDB_APPLY_TABLE) == 0 ||
+        strcmp(tableName, OLD_NDB_REP_DB "/def/" OLD_NDB_SCHEMA_TABLE) == 0 ||
+        strcmp(tableName, NDB_REP_DB "/def/" NDB_APPLY_TABLE) == 0 ||
+        strcmp(tableName, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE)== 0 )
+
       table->isSysTable = true;
   }
   for (i = 0; i < getNoOfTables(); i++) {
@@ -319,17 +444,15 @@ TableS::~TableS()
     delete allAttributesDesc[i];
 }
 
+
 // Parse dictTabInfo buffer and pushback to to vector storage 
 bool
 RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len)
 {
   NdbTableImpl* tableImpl = 0;
-  int ret = 0;
-  if(!m_hostByteOrder)
-      ret = NdbDictInterface::parseTableInfo(&tableImpl, data, len, false, false);
-  else
-      ret = NdbDictInterface::parseTableInfo(&tableImpl, data, len, false);
-
+  int ret = NdbDictInterface::parseTableInfo(&tableImpl, data, len, false,
+                                             m_fileHeader.NdbVersion);
+  
   if (ret != 0) {
     err << "parseTableInfo " << " failed" << endl;
     return false;
@@ -338,7 +461,6 @@ RestoreMetaData::parseTableDescriptor(co
     return false;
 
   debug << "parseTableInfo " << tableImpl->getName() << " done" << endl;
-
   TableS * table = new TableS(m_fileHeader.NdbVersion, tableImpl);
   if(table == NULL) {
     return false;
@@ -442,7 +564,9 @@ RestoreDataIterator::getNextTuple(int  &
     res = -1;
     return NULL;
   }
-  
+ 
+  //if (m_currentTable->getTableId() >= 2) { for (uint ii=0; ii<dataLenBytes; ii+=4) ndbout << "*" << hex << *(Uint32*)( (char*)_buf_ptr+ii ); ndbout << endl; }
+
   Uint32 *buf_ptr = (Uint32*)_buf_ptr, *ptr = buf_ptr;
   ptr += m_currentTable->m_nullBitmaskSize;
   Uint32 i;
@@ -482,10 +606,8 @@ RestoreDataIterator::getNextTuple(int  &
     attr_data->void_value = ptr;
     attr_data->size = 4*sz;
 
-    if(!m_hostByteOrder
-        && attr_desc->m_column->getType() == NdbDictionary::Column::Timestamp)
-      attr_data->u_int32_value[0] = Twiddle32(attr_data->u_int32_value[0]);
-
+    //if (m_currentTable->getTableId() >= 2) { ndbout << "fix i=" << i << " off=" << ptr-buf_ptr << " attrId=" << attrId << endl; }
+    
     if(!Twiddle(attr_desc, attr_data))
       {
 	res = -1;
@@ -495,13 +617,28 @@ RestoreDataIterator::getNextTuple(int  &
     ptr += sz;
   }
 
+  // init to NULL
   for(i = 0; i < m_currentTable->m_variableAttribs.size(); i++){
     const Uint32 attrId = m_currentTable->m_variableAttribs[i]->attrId;
 
     AttributeData * attr_data = m_tuple.getData(attrId);
+
+    attr_data->null = true;
+    attr_data->void_value = NULL;
+  }
+
+  while (ptr + 2 < buf_ptr + dataLength) {
+    typedef BackupFormat::DataFile::VariableData VarData;
+    VarData * data = (VarData *)ptr;
+    Uint32 sz = ntohl(data->Sz);
+    Uint32 attrId = ntohl(data->Id); // column_no
+
+    AttributeData * attr_data = m_tuple.getData(attrId);
     const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);
     
-    if(attr_desc->m_column->getNullable()){
+    // just a reminder - remove when backwards compat implemented
+    if(m_currentTable->backupVersion < MAKE_VERSION(5,1,3) && 
+       attr_desc->m_column->getNullable()){
       const Uint32 ind = attr_desc->m_nullBitIndex;
       if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize, 
 			  buf_ptr,ind)){
@@ -511,55 +648,33 @@ RestoreDataIterator::getNextTuple(int  &
       }
     }
 
-    assert(ptr < buf_ptr + dataLength);
-
-    typedef BackupFormat::DataFile::VariableData VarData;
-    VarData * data = (VarData *)ptr;
-    Uint32 sz = ntohl(data->Sz);
-    Uint32 id = ntohl(data->Id);
-    assert(id == attrId);
+    if (m_currentTable->backupVersion < MAKE_VERSION(5,1,3))
+    {
+      sz *= 4;
+    }
     
     attr_data->null = false;
     attr_data->void_value = &data->Data[0];
-    attr_data->size = sz*4;
+    attr_data->size = sz;
+
+    //if (m_currentTable->getTableId() >= 2) { ndbout << "var off=" << ptr-buf_ptr << " attrId=" << attrId << endl; }
 
     /**
      * Compute array size
      */
-    const Uint32 arraySize = (4 * sz) / (attr_desc->size / 8);
-    assert(arraySize >= attr_desc->arraySize);
-    
-    //convert the length of blob(v1) and text(v1)
-    if(!m_hostByteOrder
-        && (attr_desc->m_column->getType() == NdbDictionary::Column::Blob
-           || attr_desc->m_column->getType() == NdbDictionary::Column::Text))
-    {
-      char* p = (char*)&attr_data->u_int64_value[0];
-      Uint64 x;
-      memcpy(&x, p, sizeof(Uint64));
-      x = Twiddle64(x);
-      memcpy(p, &x, sizeof(Uint64));
-    }
-
-    if(!m_hostByteOrder
-        && attr_desc->m_column->getType() == NdbDictionary::Column::Datetime)
-    {
-      char* p = (char*)&attr_data->u_int64_value[0];
-      Uint64 x;
-      memcpy(&x, p, sizeof(Uint64));
-      x = Twiddle64(x);
-      memcpy(p, &x, sizeof(Uint64));
-    }
-
+    const Uint32 arraySize = sz / (attr_desc->size / 8);
+    assert(arraySize <= attr_desc->arraySize);
     if(!Twiddle(attr_desc, attr_data, attr_desc->arraySize))
       {
 	res = -1;
 	return NULL;
       }
-
-    ptr += (sz + 2);
+    
+    ptr += ((sz + 3) >> 2) + 2;
   }
 
+  assert(ptr == buf_ptr + dataLength);
+
   m_count ++;  
   res = 0;
   return &m_tuple;
@@ -752,19 +867,38 @@ BackupFile::validateFooter(){
   return true;
 }
 
-bool RestoreDataIterator::readFragmentHeader(int & ret)
+bool RestoreDataIterator::readFragmentHeader(int & ret, Uint32 *fragmentId)
 {
   BackupFormat::DataFile::FragmentHeader Header;
   
   debug << "RestoreDataIterator::getNextFragment" << endl;
   
-  if (buffer_read(&Header, sizeof(Header), 1) != 1){
+  while (1)
+  {
+    /* read first part of header */
+    if (buffer_read(&Header, 8, 1) != 1)
+    {
+      ret = 0;
+      return false;
+    } // if
+
+    /* skip if EMPTY_ENTRY */
+    Header.SectionType  = ntohl(Header.SectionType);
+    Header.SectionLength  = ntohl(Header.SectionLength);
+    if (Header.SectionType == BackupFormat::EMPTY_ENTRY)
+    {
+      void *tmp;
+      buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
+      continue;
+    }
+    break;
+  }
+  /* read rest of header */
+  if (buffer_read(((char*)&Header)+8, sizeof(Header)-8, 1) != 1)
+  {
     ret = 0;
     return false;
-  } // if
-  
-  Header.SectionType  = ntohl(Header.SectionType);
-  Header.SectionLength  = ntohl(Header.SectionLength);
+  }
   Header.TableId  = ntohl(Header.TableId);
   Header.FragmentNo  = ntohl(Header.FragmentNo);
   Header.ChecksumType  = ntohl(Header.ChecksumType);
@@ -795,7 +929,7 @@ bool RestoreDataIterator::readFragmentHe
   
   m_count = 0;
   ret = 0;
-
+  *fragmentId = Header.FragmentNo;
   return true;
 } // RestoreDataIterator::getNextFragment
 
@@ -848,16 +982,20 @@ void TableS::createAttr(NdbDictionary::C
     return;
   }
   
-  if(!d->m_column->getNullable())
+  if (d->m_column->getArrayType() == NDB_ARRAYTYPE_FIXED &&
+      ! d->m_column->getNullable())
   {
     m_fixedAttribs.push_back(d);
     return;
   }
 
-  /* Nullable attr*/
-  d->m_nullBitIndex = m_noOfNullable; 
-  m_noOfNullable++;
-  m_nullBitmaskSize = (m_noOfNullable + 31) / 32;
+  // just a reminder - does not solve backwards compat
+  if (backupVersion < MAKE_VERSION(5,1,3))
+  {
+    d->m_nullBitIndex = m_noOfNullable; 
+    m_noOfNullable++;
+    m_nullBitmaskSize = (m_noOfNullable + 31) / 32;
+  }
   m_variableAttribs.push_back(d);
 } // TableS::createAttr
 
@@ -914,12 +1052,15 @@ RestoreLogIterator::RestoreLogIterator(c
 const LogEntry *
 RestoreLogIterator::getNextLogEntry(int & res) {
   // Read record length
-  typedef BackupFormat::LogFile::LogEntry LogE;
-
-  LogE * logE= 0;
-  Uint32 len= ~0;
   const Uint32 stopGCP = m_metaData.getStopGCP();
+  Uint32 tableId;
+  Uint32 triggerEvent;
+  Uint32 frag_id;
+  Uint32 *attr_data;
+  Uint32 attr_data_len;
   do {
+    Uint32 len;
+    Uint32 *logEntryPtr;
     if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
       res= -1;
       return 0;
@@ -927,7 +1068,7 @@ RestoreLogIterator::getNextLogEntry(int 
     len= ntohl(len);
 
     Uint32 data_len = sizeof(Uint32) + len*4;
-    if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) {
+    if (buffer_get_ptr((void **)(&logEntryPtr), 1, data_len) != data_len) {
       res= -2;
       return 0;
     }
@@ -937,20 +1078,46 @@ RestoreLogIterator::getNextLogEntry(int 
       return 0;
     }
 
-    logE->TableId= ntohl(logE->TableId);
-    logE->TriggerEvent= ntohl(logE->TriggerEvent);
-    
-    const bool hasGcp= (logE->TriggerEvent & 0x10000) != 0;
-    logE->TriggerEvent &= 0xFFFF;
+    if (unlikely(m_metaData.getFileHeader().NdbVersion < NDBD_FRAGID_VERSION))
+    {
+      /*
+        FragId was introduced in LogEntry in version
+        5.1.6
+        We set FragId to 0 in older versions (these versions
+        do not support restore of user defined partitioned
+        tables.
+      */
+      typedef BackupFormat::LogFile::LogEntry_no_fragid LogE_no_fragid;
+      LogE_no_fragid * logE_no_fragid= (LogE_no_fragid *)logEntryPtr;
+      tableId= ntohl(logE_no_fragid->TableId);
+      triggerEvent= ntohl(logE_no_fragid->TriggerEvent);
+      frag_id= 0;
+      attr_data= &logE_no_fragid->Data[0];
+      attr_data_len= len - ((offsetof(LogE_no_fragid, Data) >> 2) - 1);
+    }
+    else /* normal case */
+    {
+      typedef BackupFormat::LogFile::LogEntry LogE;
+      LogE * logE= (LogE *)logEntryPtr;
+      tableId= ntohl(logE->TableId);
+      triggerEvent= ntohl(logE->TriggerEvent);
+      frag_id= ntohl(logE->FragId);
+      attr_data= &logE->Data[0];
+      attr_data_len= len - ((offsetof(LogE, Data) >> 2) - 1);
+    }
     
+    const bool hasGcp= (triggerEvent & 0x10000) != 0;
+    triggerEvent &= 0xFFFF;
+
     if(hasGcp){
-      len--;
-      m_last_gci = ntohl(logE->Data[len-2]);
+      // last attr_data is gci info
+      attr_data_len--;
+      m_last_gci = ntohl(*(attr_data + attr_data_len));
     }
   } while(m_last_gci > stopGCP + 1);
-  
-  m_logEntry.m_table = m_metaData.getTable(logE->TableId);
-  switch(logE->TriggerEvent){
+
+  m_logEntry.m_table = m_metaData.getTable(tableId);
+  switch(triggerEvent){
   case TriggerEvent::TE_INSERT:
     m_logEntry.m_type = LogEntry::LE_INSERT;
     break;
@@ -968,9 +1135,10 @@ RestoreLogIterator::getNextLogEntry(int 
   const TableS * tab = m_logEntry.m_table;
   m_logEntry.clear();
 
-  AttributeHeader * ah = (AttributeHeader *)&logE->Data[0];
-  AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 2];
+  AttributeHeader * ah = (AttributeHeader *)attr_data;
+  AttributeHeader *end = (AttributeHeader *)(attr_data + attr_data_len);
   AttributeS *  attr;
+  m_logEntry.m_frag_id = frag_id;
   while(ah < end){
     attr= m_logEntry.add_attr();
     if(attr == NULL) {
@@ -979,6 +1147,9 @@ RestoreLogIterator::getNextLogEntry(int 
       return 0;
     }
 
+    if(unlikely(!m_hostByteOrder))
+      *(Uint32*)ah = Twiddle32(*(Uint32*)ah);
+
     attr->Desc = (* tab)[ah->getAttributeId()];
     assert(attr->Desc != 0);
 
@@ -1014,7 +1185,7 @@ operator<<(NdbOut& ndbout, const Attribu
   
   NdbRecAttr tmprec(0);
   tmprec.setup(desc.m_column, 0);
-  tmprec.receive_data((Uint32*)data.void_value, (data.size+3)/4);
+  tmprec.receive_data((Uint32*)data.void_value, data.size);
   ndbrecattr_print_formatted(ndbout, tmprec, g_ndbrecord_print_format);
 
   return ndbout;
@@ -1080,4 +1251,4 @@ template class Vector<TableS*>;
 template class Vector<AttributeS*>;
 template class Vector<AttributeDesc*>;
 template class Vector<FragmentInfo*>;
-
+template class Vector<DictObject>;

--- 1.74.35.1/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-08-15 15:23:13 +00:00
+++ 1.172/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-08-15 15:23:13 +00:00
@@ -28,17 +28,47 @@
 #include <signaldata/AlterTable.hpp>
 #include <signaldata/DropIndx.hpp>
 #include <signaldata/ListTables.hpp>
+#include <signaldata/DropFilegroup.hpp>
+#include <signaldata/CreateFilegroup.hpp>
+#include <signaldata/WaitGCP.hpp>
 #include <SimpleProperties.hpp>
 #include <Bitmask.hpp>
 #include <AttributeList.hpp>
+#include <NdbEventOperation.hpp>
+#include "NdbEventOperationImpl.hpp"
 #include <NdbBlob.hpp>
 #include "NdbBlobImpl.hpp"
 #include <AttributeHeader.hpp>
 #include <my_sys.h>
+#include <NdbEnv.h>
+#include <NdbMem.h>
+#include <ndb_version.h>
 
 #define DEBUG_PRINT 0
 #define INCOMPATIBLE_VERSION -2
 
+#define DICT_WAITFOR_TIMEOUT (7*24*60*60*1000)
+
+#define ERR_RETURN(a,b) \
+{\
+   DBUG_PRINT("exit", ("error %d  return %d", (a).code, b));\
+   DBUG_RETURN(b);\
+}
+
+int ndb_dictionary_is_mysqld = 0;
+
+bool
+is_ndb_blob_table(const char* name, Uint32* ptab_id, Uint32* pcol_no)
+{
+  return DictTabInfo::isBlobTableName(name, ptab_id, pcol_no);
+}
+
+bool
+is_ndb_blob_table(const NdbTableImpl* t)
+{
+  return is_ndb_blob_table(t->m_internalName.c_str());
+}
+
 //#define EVENT_DEBUG
 
 /**
@@ -47,18 +77,26 @@
 NdbColumnImpl::NdbColumnImpl()
   : NdbDictionary::Column(* this), m_attrId(-1), m_facade(this)
 {
+  DBUG_ENTER("NdbColumnImpl::NdbColumnImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbColumnImpl::NdbColumnImpl(NdbDictionary::Column & f)
   : NdbDictionary::Column(* this), m_attrId(-1), m_facade(&f)
 {
+  DBUG_ENTER("NdbColumnImpl::NdbColumnImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbColumnImpl&
 NdbColumnImpl::operator=(const NdbColumnImpl& col)
 {
+  DBUG_ENTER("NdbColumnImpl::operator=");
+  DBUG_PRINT("info", ("this: %p  &col: %p", this, &col));
   m_attrId = col.m_attrId;
   m_name = col.m_name;
   m_type = col.m_type;
@@ -74,11 +112,20 @@ NdbColumnImpl::operator=(const NdbColumn
   m_defaultValue = col.m_defaultValue;
   m_attrSize = col.m_attrSize; 
   m_arraySize = col.m_arraySize;
+  m_arrayType = col.m_arrayType;
+  m_storageType = col.m_storageType;
   m_keyInfoPos = col.m_keyInfoPos;
-  m_blobTable = col.m_blobTable;
+  if (col.m_blobTable == NULL)
+    m_blobTable = NULL;
+  else {
+    if (m_blobTable == NULL)
+      m_blobTable = new NdbTableImpl();
+    m_blobTable->assign(*col.m_blobTable);
+  }
+  m_column_no = col.m_column_no;
   // Do not copy m_facade !!
 
-  return *this;
+  DBUG_RETURN(*this);
 }
 
 void
@@ -105,6 +152,7 @@ NdbColumnImpl::init(Type t)
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Olddecimal:
   case Olddecimalunsigned:
@@ -114,34 +162,57 @@ NdbColumnImpl::init(Type t)
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Char:
+    m_precision = 0;
+    m_scale = 0;
+    m_length = 1;
+    m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
+    break;
   case Varchar:
     m_precision = 0;
     m_scale = 0;
     m_length = 1;
     m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_SHORT_VAR;
     break;
   case Binary:
+    m_precision = 0;
+    m_scale = 0;
+    m_length = 1;
+    m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
+    break;
   case Varbinary:
+    m_precision = 0;
+    m_scale = 0;
+    m_length = 1;
+    m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_SHORT_VAR;
+    break;
   case Datetime:
   case Date:
     m_precision = 0;
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Blob:
     m_precision = 256;
     m_scale = 8000;
     m_length = 4;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Text:
     m_precision = 256;
     m_scale = 8000;
     m_length = 4;
     m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Time:
   case Year:
@@ -150,24 +221,28 @@ NdbColumnImpl::init(Type t)
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Bit:
     m_precision = 0;
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Longvarchar:
     m_precision = 0;
     m_scale = 0;
     m_length = 1; // legal
     m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_MEDIUM_VAR;
     break;
   case Longvarbinary:
     m_precision = 0;
     m_scale = 0;
     m_length = 1; // legal
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_MEDIUM_VAR;
     break;
   default:
   case Undefined:
@@ -184,16 +259,28 @@ NdbColumnImpl::init(Type t)
   m_autoIncrement = false;
   m_autoIncrementInitialValue = 1;
   m_blobTable = NULL;
+  m_storageType = NDB_STORAGETYPE_MEMORY;
+#ifdef VM_TRACE
+  if(NdbEnv_GetEnv("NDB_DEFAULT_DISK", (char *)0, 0))
+    m_storageType = NDB_STORAGETYPE_DISK;
+#endif
 }
 
 NdbColumnImpl::~NdbColumnImpl()
 {
+  DBUG_ENTER("NdbColumnImpl::~NdbColumnImpl");
+  DBUG_PRINT("info", ("this: %p", this));
+  if (m_blobTable != NULL)
+    delete m_blobTable;
+  m_blobTable = NULL;
+  DBUG_VOID_RETURN;
 }
 
 bool
 NdbColumnImpl::equal(const NdbColumnImpl& col) const 
 {
   DBUG_ENTER("NdbColumnImpl::equal");
+  DBUG_PRINT("info", ("this: %p  &col: %p", this, &col));
   if(strcmp(m_name.c_str(), col.m_name.c_str()) != 0){
     DBUG_RETURN(false);
   }
@@ -206,13 +293,11 @@ NdbColumnImpl::equal(const NdbColumnImpl
   if(m_nullable != col.m_nullable){
     DBUG_RETURN(false);
   }
-#ifdef ndb_dictionary_dkey_fixed
-  if(m_pk){
-    if(m_distributionKey != col.m_distributionKey){
+  if (m_pk) {
+    if (m_distributionKey != col.m_distributionKey) {
       DBUG_RETURN(false);
     }
   }
-#endif
   if (m_precision != col.m_precision ||
       m_scale != col.m_scale ||
       m_length != col.m_length ||
@@ -226,11 +311,15 @@ NdbColumnImpl::equal(const NdbColumnImpl
     DBUG_RETURN(false);
   }
 
+  if (m_arrayType != col.m_arrayType || m_storageType != col.m_storageType){
+    DBUG_RETURN(false);
+  }
+
   DBUG_RETURN(true);
 }
 
 NdbDictionary::Column *
-NdbColumnImpl::create_psuedo(const char * name){
+NdbColumnImpl::create_pseudo(const char * name){
   NdbDictionary::Column * col = new NdbDictionary::Column();
   col->setName(name);
   if(!strcmp(name, "NDB$FRAGMENT")){
@@ -238,9 +327,14 @@ NdbColumnImpl::create_psuedo(const char 
     col->m_impl.m_attrId = AttributeHeader::FRAGMENT;
     col->m_impl.m_attrSize = 4;
     col->m_impl.m_arraySize = 1;
-  } else if(!strcmp(name, "NDB$FRAGMENT_MEMORY")){
+  } else if(!strcmp(name, "NDB$FRAGMENT_FIXED_MEMORY")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::FRAGMENT_FIXED_MEMORY;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$FRAGMENT_VARSIZED_MEMORY")){
     col->setType(NdbDictionary::Column::Bigunsigned);
-    col->m_impl.m_attrId = AttributeHeader::FRAGMENT_MEMORY;
+    col->m_impl.m_attrId = AttributeHeader::FRAGMENT_VARSIZED_MEMORY;
     col->m_impl.m_attrSize = 8;
     col->m_impl.m_arraySize = 1;
   } else if(!strcmp(name, "NDB$ROW_COUNT")){
@@ -263,9 +357,41 @@ NdbColumnImpl::create_psuedo(const char 
     col->m_impl.m_attrId = AttributeHeader::RANGE_NO;
     col->m_impl.m_attrSize = 4;
     col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$DISK_REF")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::DISK_REF;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$RECORDS_IN_RANGE")){
+    col->setType(NdbDictionary::Column::Unsigned);
+    col->m_impl.m_attrId = AttributeHeader::RECORDS_IN_RANGE;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 4;
+  } else if(!strcmp(name, "NDB$ROWID")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::ROWID;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 2;
+  } else if(!strcmp(name, "NDB$ROW_GCI")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::ROW_GCI;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+    col->m_impl.m_nullable = true;
+  } else if(!strcmp(name, "NDB$ANY_VALUE")){
+    col->setType(NdbDictionary::Column::Unsigned);
+    col->m_impl.m_attrId = AttributeHeader::ANY_VALUE;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$COPY_ROWID")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::COPY_ROWID;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 2;
   } else {
     abort();
   }
+  col->m_impl.m_storageType = NDB_STORAGETYPE_MEMORY;
   return col;
 }
 
@@ -274,50 +400,87 @@ NdbColumnImpl::create_psuedo(const char 
  */
 
 NdbTableImpl::NdbTableImpl()
-  : NdbDictionary::Table(* this), m_facade(this)
+  : NdbDictionary::Table(* this), 
+    NdbDictObjectImpl(NdbDictionary::Object::UserTable), m_facade(this)
 {
+  DBUG_ENTER("NdbTableImpl::NdbTableImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbTableImpl::NdbTableImpl(NdbDictionary::Table & f)
-  : NdbDictionary::Table(* this), m_facade(&f)
+  : NdbDictionary::Table(* this), 
+    NdbDictObjectImpl(NdbDictionary::Object::UserTable), m_facade(&f)
 {
+  DBUG_ENTER("NdbTableImpl::NdbTableImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbTableImpl::~NdbTableImpl()
 {
+  DBUG_ENTER("NdbTableImpl::~NdbTableImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   if (m_index != 0) {
     delete m_index;
     m_index = 0;
   }
   for (unsigned i = 0; i < m_columns.size(); i++)
-    delete m_columns[i];  
+    delete m_columns[i];
+  DBUG_VOID_RETURN;
 }
 
 void
 NdbTableImpl::init(){
   m_changeMask= 0;
-  m_tableId= RNIL;
+  m_id= RNIL;
+  m_version = ~0;
+  m_status = NdbDictionary::Object::Invalid;
+  m_type = NdbDictionary::Object::TypeUndefined;
+  m_primaryTableId= RNIL;
+  m_internalName.clear();
+  m_externalName.clear();
+  m_newExternalName.clear();
+  m_mysqlName.clear();
   m_frm.clear();
+  m_newFrm.clear();
+  m_ts_name.clear();
+  m_new_ts_name.clear();
+  m_ts.clear();
+  m_new_ts.clear();
+  m_fd.clear();
+  m_new_fd.clear();
+  m_range.clear();
+  m_new_range.clear();
   m_fragmentType= NdbDictionary::Object::FragAllSmall;
   m_hashValueMask= 0;
   m_hashpointerValue= 0;
+  m_linear_flag= true;
+  m_primaryTable.clear();
+  m_default_no_part_flag = 1;
   m_logging= true;
+  m_temporary = false;
+  m_row_gci = true;
+  m_row_checksum = true;
+  m_force_var_part = false;
   m_kvalue= 6;
   m_minLoadFactor= 78;
   m_maxLoadFactor= 80;
   m_keyLenInWords= 0;
   m_fragmentCount= 0;
-  m_dictionary= NULL;
   m_index= NULL;
-  m_indexType= NdbDictionary::Index::Undefined;
+  m_indexType= NdbDictionary::Object::TypeUndefined;
   m_noOfKeys= 0;
   m_noOfDistributionKeys= 0;
   m_noOfBlobs= 0;
   m_replicaCount= 0;
   m_min_rows = 0;
   m_max_rows = 0;
+  m_tablespace_name.clear();
+  m_tablespace_id = ~0;
+  m_tablespace_version = ~0;
   m_single_user_mode = 0;
 }
 
@@ -328,56 +491,190 @@ NdbTableImpl::equal(const NdbTableImpl& 
   if ((m_internalName.c_str() == NULL) || 
       (strcmp(m_internalName.c_str(), "") == 0) ||
       (obj.m_internalName.c_str() == NULL) || 
-      (strcmp(obj.m_internalName.c_str(), "") == 0)) {
+      (strcmp(obj.m_internalName.c_str(), "") == 0))
+  {
     // Shallow equal
-    if(strcmp(getName(), obj.getName()) != 0){
+    if(strcmp(getName(), obj.getName()) != 0)
+    {
       DBUG_PRINT("info",("name %s != %s",getName(),obj.getName()));
       DBUG_RETURN(false);    
     }
-  } else 
+  }
+  else
+  {
     // Deep equal
-    if(strcmp(m_internalName.c_str(), obj.m_internalName.c_str()) != 0){
+    if(strcmp(m_internalName.c_str(), obj.m_internalName.c_str()) != 0)
     {
       DBUG_PRINT("info",("m_internalName %s != %s",
 			 m_internalName.c_str(),obj.m_internalName.c_str()));
       DBUG_RETURN(false);
     }
   }
-  if(m_fragmentType != obj.m_fragmentType){
-    DBUG_PRINT("info",("m_fragmentType %d != %d",m_fragmentType,obj.m_fragmentType));
+  if (m_frm.length() != obj.m_frm.length() ||
+      (memcmp(m_frm.get_data(), obj.m_frm.get_data(), m_frm.length())))
+  {
+    DBUG_PRINT("info",("m_frm not equal"));
+    DBUG_RETURN(false);
+  }
+  if (m_fd.length() != obj.m_fd.length() ||
+      (memcmp(m_fd.get_data(), obj.m_fd.get_data(), m_fd.length())))
+  {
+    DBUG_PRINT("info",("m_fd not equal"));
+    DBUG_RETURN(false);
+  }
+  if (m_ts.length() != obj.m_ts.length() ||
+      (memcmp(m_ts.get_data(), obj.m_ts.get_data(), m_ts.length())))
+  {
+    DBUG_PRINT("info",("m_ts not equal"));
+    DBUG_RETURN(false);
+  }
+  if (m_range.length() != obj.m_range.length() ||
+      (memcmp(m_range.get_data(), obj.m_range.get_data(), m_range.length())))
+  {
+    DBUG_PRINT("info",("m_range not equal"));
+    DBUG_RETURN(false);
+  }
+  if(m_fragmentType != obj.m_fragmentType)
+  {
+    DBUG_PRINT("info",("m_fragmentType %d != %d",m_fragmentType,
+                        obj.m_fragmentType));
     DBUG_RETURN(false);
   }
-  if(m_columns.size() != obj.m_columns.size()){
-    DBUG_PRINT("info",("m_columns.size %d != %d",m_columns.size(),obj.m_columns.size()));
+  if(m_columns.size() != obj.m_columns.size())
+  {
+    DBUG_PRINT("info",("m_columns.size %d != %d",m_columns.size(),
+                       obj.m_columns.size()));
     DBUG_RETURN(false);
   }
 
-  for(unsigned i = 0; i<obj.m_columns.size(); i++){
-    if(!m_columns[i]->equal(* obj.m_columns[i])){
+  for(unsigned i = 0; i<obj.m_columns.size(); i++)
+  {
+    if(!m_columns[i]->equal(* obj.m_columns[i]))
+    {
       DBUG_PRINT("info",("m_columns [%d] != [%d]",i,i));
       DBUG_RETURN(false);
     }
   }
   
-  if(m_logging != obj.m_logging){
+  if(m_linear_flag != obj.m_linear_flag)
+  {
+    DBUG_PRINT("info",("m_linear_flag %d != %d",m_linear_flag,
+                        obj.m_linear_flag));
+    DBUG_RETURN(false);
+  }
+
+  if(m_max_rows != obj.m_max_rows)
+  {
+    DBUG_PRINT("info",("m_max_rows %d != %d",(int32)m_max_rows,
+                       (int32)obj.m_max_rows));
+    DBUG_RETURN(false);
+  }
+
+  if(m_default_no_part_flag != obj.m_default_no_part_flag)
+  {
+    DBUG_PRINT("info",("m_default_no_part_flag %d != %d",m_default_no_part_flag,
+                        obj.m_default_no_part_flag));
+    DBUG_RETURN(false);
+  }
+
+  if(m_logging != obj.m_logging)
+  {
     DBUG_PRINT("info",("m_logging %d != %d",m_logging,obj.m_logging));
     DBUG_RETURN(false);
   }
 
-  if(m_kvalue != obj.m_kvalue){
+  if(m_temporary != obj.m_temporary)
+  {
+    DBUG_PRINT("info",("m_temporary %d != %d",m_temporary,obj.m_temporary));
+    DBUG_RETURN(false);
+  }
+
+  if(m_row_gci != obj.m_row_gci)
+  {
+    DBUG_PRINT("info",("m_row_gci %d != %d",m_row_gci,obj.m_row_gci));
+    DBUG_RETURN(false);
+  }
+
+  if(m_row_checksum != obj.m_row_checksum)
+  {
+    DBUG_PRINT("info",("m_row_checksum %d != %d",m_row_checksum,
+                        obj.m_row_checksum));
+    DBUG_RETURN(false);
+  }
+
+  if(m_kvalue != obj.m_kvalue)
+  {
     DBUG_PRINT("info",("m_kvalue %d != %d",m_kvalue,obj.m_kvalue));
     DBUG_RETURN(false);
   }
 
-  if(m_minLoadFactor != obj.m_minLoadFactor){
-    DBUG_PRINT("info",("m_minLoadFactor %d != %d",m_minLoadFactor,obj.m_minLoadFactor));
+  if(m_minLoadFactor != obj.m_minLoadFactor)
+  {
+    DBUG_PRINT("info",("m_minLoadFactor %d != %d",m_minLoadFactor,
+                        obj.m_minLoadFactor));
+    DBUG_RETURN(false);
+  }
+
+  if(m_maxLoadFactor != obj.m_maxLoadFactor)
+  {
+    DBUG_PRINT("info",("m_maxLoadFactor %d != %d",m_maxLoadFactor,
+                        obj.m_maxLoadFactor));
+    DBUG_RETURN(false);
+  }
+
+  if(m_tablespace_id != obj.m_tablespace_id)
+  {
+    DBUG_PRINT("info",("m_tablespace_id %d != %d",m_tablespace_id,
+                        obj.m_tablespace_id));
+    DBUG_RETURN(false);
+  }
+
+  if(m_tablespace_version != obj.m_tablespace_version)
+  {
+    DBUG_PRINT("info",("m_tablespace_version %d != %d",m_tablespace_version,
+                        obj.m_tablespace_version));
+    DBUG_RETURN(false);
+  }
+
+  if(m_id != obj.m_id)
+  {
+    DBUG_PRINT("info",("m_id %d != %d",m_id,obj.m_id));
+    DBUG_RETURN(false);
+  }
+
+  if(m_version != obj.m_version)
+  {
+    DBUG_PRINT("info",("m_version %d != %d",m_version,obj.m_version));
     DBUG_RETURN(false);
   }
 
-  if(m_maxLoadFactor != obj.m_maxLoadFactor){
-    DBUG_PRINT("info",("m_maxLoadFactor %d != %d",m_maxLoadFactor,obj.m_maxLoadFactor));
+  if(m_type != obj.m_type)
+  {
+    DBUG_PRINT("info",("m_type %d != %d",m_type,obj.m_type));
     DBUG_RETURN(false);
   }
+
+  if (m_type == NdbDictionary::Object::UniqueHashIndex ||
+      m_type == NdbDictionary::Object::OrderedIndex)
+  {
+    if(m_primaryTableId != obj.m_primaryTableId)
+    {
+      DBUG_PRINT("info",("m_primaryTableId %d != %d",m_primaryTableId,
+                 obj.m_primaryTableId));
+      DBUG_RETURN(false);
+    }
+    if (m_indexType != obj.m_indexType)
+    {
+      DBUG_PRINT("info",("m_indexType %d != %d",m_indexType,obj.m_indexType));
+      DBUG_RETURN(false);
+    }
+    if(strcmp(m_primaryTable.c_str(), obj.m_primaryTable.c_str()) != 0)
+    {
+      DBUG_PRINT("info",("m_primaryTable %s != %s",
+			 m_primaryTable.c_str(),obj.m_primaryTable.c_str()));
+      DBUG_RETURN(false);
+    }
+  }
   
   if(m_single_user_mode != obj.m_single_user_mode)
   {
@@ -387,24 +684,51 @@ NdbTableImpl::equal(const NdbTableImpl& 
     DBUG_RETURN(false);
   }
 
-   DBUG_RETURN(true);
+  DBUG_RETURN(true);
 }
 
 int
 NdbTableImpl::assign(const NdbTableImpl& org)
 {
-  m_tableId = org.m_tableId;
+  DBUG_ENTER("NdbColumnImpl::assign");
+  DBUG_PRINT("info", ("this: %p  &org: %p", this, &org));
+  /* m_changeMask intentionally not copied */
+  m_primaryTableId = org.m_primaryTableId;
   if (!m_internalName.assign(org.m_internalName) ||
-      !m_externalName.assign(org.m_externalName) ||
-      !m_newExternalName.assign(org.m_newExternalName) ||
-      m_frm.assign(org.m_frm.get_data(), org.m_frm.length()))
+      updateMysqlName())
   {
     return -1;
   }
-  m_fragmentType = org.m_fragmentType;
-  m_fragmentCount = org.m_fragmentCount;
+  // If the name has been explicitly set, use that name
+  // otherwise use the fetched name
+  if (!org.m_newExternalName.empty())
+    m_externalName.assign(org.m_newExternalName);
+  else
+    m_externalName.assign(org.m_externalName);
+  m_frm.assign(org.m_frm.get_data(), org.m_frm.length());
+  m_ts_name.assign(org.m_ts_name.get_data(), org.m_ts_name.length());
+  m_new_ts_name.assign(org.m_new_ts_name.get_data(),
+                       org.m_new_ts_name.length());
+  m_ts.assign(org.m_ts.get_data(), org.m_ts.length());
+  m_new_ts.assign(org.m_new_ts.get_data(), org.m_new_ts.length());
+  m_fd.assign(org.m_fd.get_data(), org.m_fd.length());
+  m_new_fd.assign(org.m_new_fd.get_data(), org.m_new_fd.length());
+  m_range.assign(org.m_range.get_data(), org.m_range.length());
+  m_new_range.assign(org.m_new_range.get_data(), org.m_new_range.length());
 
-  for(unsigned i = 0; i<org.m_columns.size(); i++){
+  m_fragmentType = org.m_fragmentType;
+  /*
+    m_columnHashMask, m_columnHash, m_hashValueMask, m_hashpointerValue
+    is state calculated by computeAggregates and buildColumnHash
+  */
+  unsigned i;
+  for(i = 0; i < m_columns.size(); i++)
+  {
+    delete m_columns[i];
+  }
+  m_columns.clear();
+  for(i = 0; i < org.m_columns.size(); i++)
+  {
     NdbColumnImpl * col = new NdbColumnImpl();
     if (col == NULL)
     {
@@ -420,28 +744,47 @@ NdbTableImpl::assign(const NdbTableImpl&
     }
   }
 
+  m_fragments = org.m_fragments;
+
+  m_linear_flag = org.m_linear_flag;
+  m_max_rows = org.m_max_rows;
+  m_default_no_part_flag = org.m_default_no_part_flag;
   m_logging = org.m_logging;
+  m_temporary = org.m_temporary;
+  m_row_gci = org.m_row_gci;
+  m_row_checksum = org.m_row_checksum;
+  m_force_var_part = org.m_force_var_part;
   m_kvalue = org.m_kvalue;
   m_minLoadFactor = org.m_minLoadFactor;
   m_maxLoadFactor = org.m_maxLoadFactor;
+  m_keyLenInWords = org.m_keyLenInWords;
+  m_fragmentCount = org.m_fragmentCount;
+  
   m_single_user_mode = org.m_single_user_mode;
 
   if (m_index != 0)
     delete m_index;
   m_index = org.m_index;
-  
-  m_noOfDistributionKeys = org.m_noOfDistributionKeys;
+ 
+  m_primaryTable = org.m_primaryTable;
+  m_indexType = org.m_indexType;
+
   m_noOfKeys = org.m_noOfKeys;
-  m_keyLenInWords = org.m_keyLenInWords;
+  m_noOfDistributionKeys = org.m_noOfDistributionKeys;
   m_noOfBlobs = org.m_noOfBlobs;
+  m_replicaCount = org.m_replicaCount;
 
+  m_id = org.m_id;
   m_version = org.m_version;
   m_status = org.m_status;
 
   m_max_rows = org.m_max_rows;
   m_min_rows = org.m_min_rows;
 
-  return 0;
+  m_tablespace_name = org.m_tablespace_name;
+  m_tablespace_id= org.m_tablespace_id;
+  m_tablespace_version = org.m_tablespace_version;
+  DBUG_RETURN(0);
 }
 
 int NdbTableImpl::setName(const char * name)
@@ -458,11 +801,214 @@ NdbTableImpl::getName() const
     return m_newExternalName.c_str();
 }
 
+void
+NdbTableImpl::computeAggregates()
+{
+  m_noOfKeys = 0;
+  m_keyLenInWords = 0;
+  m_noOfDistributionKeys = 0;
+  m_noOfBlobs = 0;
+  m_noOfDiskColumns = 0;
+  Uint32 i, n;
+  for (i = 0; i < m_columns.size(); i++) {
+    NdbColumnImpl* col = m_columns[i];
+    if (col->m_pk) {
+      m_noOfKeys++;
+      m_keyLenInWords += (col->m_attrSize * col->m_arraySize + 3) / 4;
+    }
+    if (col->m_distributionKey)
+      m_noOfDistributionKeys++; // XXX check PK
+    
+    if (col->getBlobType())
+      m_noOfBlobs++;
+
+    if (col->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
+      m_noOfDiskColumns++;
+    
+    col->m_keyInfoPos = ~0;
+  }
+  if (m_noOfDistributionKeys == m_noOfKeys) {
+    // all is none!
+    m_noOfDistributionKeys = 0;
+  }
+
+  if (m_noOfDistributionKeys == 0) 
+  {
+    // none is all!
+    for (i = 0, n = m_noOfKeys; n != 0; i++) {
+      NdbColumnImpl* col = m_columns[i];
+      if (col->m_pk) {
+        col->m_distributionKey = true;
+        n--;
+      }
+    }
+  }
+  
+  Uint32 keyInfoPos = 0;
+  for (i = 0, n = m_noOfKeys; n != 0; i++) {
+    NdbColumnImpl* col = m_columns[i];
+    if (col->m_pk) {
+      col->m_keyInfoPos = keyInfoPos++;
+      n--;
+    }
+  }
+}
+
+// TODO add error checks
+// TODO use these internally at create and retrieve
+int
+NdbTableImpl::aggregate(NdbError& error)
+{
+  computeAggregates();
+  return 0;
+}
+int
+NdbTableImpl::validate(NdbError& error)
+{
+  if (aggregate(error) == -1)
+    return -1;
+  return 0;
+}
+
+const void*
+NdbTableImpl::getTablespaceNames() const
+{
+  if (m_new_ts_name.empty())
+    return m_ts_name.get_data();
+  else
+    return m_new_ts_name.get_data();
+}
+
+Uint32
+NdbTableImpl::getTablespaceNamesLen() const
+{
+  if (m_new_ts_name.empty())
+    return m_ts_name.length();
+  else
+    return m_new_ts_name.length();
+}
+
+int NdbTableImpl::setTablespaceNames(const void *data, Uint32 len)
+{
+  return !m_new_ts_name.assign(data, len);
+}
+
+void NdbTableImpl::setFragmentCount(Uint32 count)
+{
+  m_fragmentCount= count;
+}
+
+Uint32 NdbTableImpl::getFragmentCount() const
+{
+  return m_fragmentCount;
+}
+
+int NdbTableImpl::setFrm(const void* data, Uint32 len)
+{
+  return m_newFrm.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getFrmData() const
+{
+  if (m_newFrm.empty())
+    return m_frm.get_data();
+  else
+    return m_newFrm.get_data();
+}
+
+Uint32
+NdbTableImpl::getFrmLength() const 
+{
+  if (m_newFrm.empty())
+    return m_frm.length();
+  else
+    return m_newFrm.length();
+}
+
+int NdbTableImpl::setFragmentData(const void* data, Uint32 len)
+{
+  return m_new_fd.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getFragmentData() const
+{
+  if (m_new_fd.empty())
+    return m_fd.get_data();
+  else
+    return m_new_fd.get_data();
+}
+
+Uint32
+NdbTableImpl::getFragmentDataLen() const 
+{
+  if (m_new_fd.empty())
+    return m_fd.length();
+  else
+    return m_new_fd.length();
+}
+
+int NdbTableImpl::setTablespaceData(const void* data, Uint32 len)
+{
+  return !m_new_ts.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getTablespaceData() const
+{
+  if (m_new_ts.empty())
+    return m_ts.get_data();
+  else
+    return m_new_ts.get_data();
+}
+
+Uint32
+NdbTableImpl::getTablespaceDataLen() const 
+{
+  if (m_new_ts.empty())
+    return m_ts.length();
+  else
+    return m_new_ts.length();
+}
+
+int NdbTableImpl::setRangeListData(const void* data, Uint32 len)
+{
+  return m_new_range.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getRangeListData() const
+{
+  if (m_new_range.empty())
+    return m_range.get_data();
+  else
+    return m_new_range.get_data();
+}
+
+Uint32
+NdbTableImpl::getRangeListDataLen() const 
+{
+  if (m_new_range.empty())
+    return m_range.length();
+  else
+    return m_new_range.length();
+}
+
+int
+NdbTableImpl::updateMysqlName()
+{
+  Vector<BaseString> v;
+  if (m_internalName.split(v,"/") == 3)
+  {
+    return !m_mysqlName.assfmt("%s/%s",v[0].c_str(),v[2].c_str());
+  }
+  return !m_mysqlName.assign("");
+}
 
 int
 NdbTableImpl::buildColumnHash(){
   const Uint32 size = m_columns.size();
-
   int i;
   for(i = 31; i >= 0; i--){
     if(((1 << i) & size) != 0){
@@ -546,46 +1092,111 @@ NdbTableImpl::buildColumnHash(){
 Uint32
 NdbTableImpl::get_nodes(Uint32 hashValue, const Uint16 ** nodes) const
 {
-  if(m_replicaCount > 0)
+  Uint32 fragmentId;
+  if(m_replicaCount == 0)
+    return 0;
+  switch (m_fragmentType)
   {
-    Uint32 fragmentId = hashValue & m_hashValueMask;
-    if(fragmentId < m_hashpointerValue) 
+    case NdbDictionary::Object::FragAllSmall:
+    case NdbDictionary::Object::FragAllMedium:
+    case NdbDictionary::Object::FragAllLarge:
+    case NdbDictionary::Object::FragSingle:
+    case NdbDictionary::Object::DistrKeyLin:
     {
-      fragmentId = hashValue & ((m_hashValueMask << 1) + 1);
+      fragmentId = hashValue & m_hashValueMask;
+      if(fragmentId < m_hashpointerValue) 
+        fragmentId = hashValue & ((m_hashValueMask << 1) + 1);
+      break;
     }
-    Uint32 pos = fragmentId * m_replicaCount;
-    if(pos + m_replicaCount <= m_fragments.size())
+    case NdbDictionary::Object::DistrKeyHash:
     {
-      * nodes = m_fragments.getBase()+pos;
-      return m_replicaCount;
+      fragmentId = hashValue % m_fragmentCount;
+      break;
     }
+    default:
+      return 0;
+  }
+  Uint32 pos = fragmentId * m_replicaCount;
+  if (pos + m_replicaCount <= m_fragments.size())
+  {
+    *nodes = m_fragments.getBase()+pos;
+    return m_replicaCount;
   }
   return 0;
 }
+
+int
+NdbDictionary::Table::checkColumns(const Uint32* map, Uint32 len) const
+{
+  int ret = 0;
+  Uint32 colCnt = m_impl.m_columns.size();
+  if (map == 0)
+  {
+    ret |= 1;
+    ret |= (m_impl.m_noOfDiskColumns) ? 2 : 0;
+    ret |= (colCnt > m_impl.m_noOfDiskColumns) ? 4 : 0;
+    return ret;
+  }
+
+  NdbColumnImpl** cols = m_impl.m_columns.getBase();
+  const char * ptr = reinterpret_cast<const char*>(map);
+  const char * end = ptr + len;
+  Uint32 no = 0;
+  while (ptr < end)
+  {
+    Uint32 val = (Uint32)* ptr;
+    Uint32 idx = 1;
+    for (Uint32 i = 0; i<8; i++)
+    {
+      if (val & idx)
+      {
+	if (cols[no]->getPrimaryKey())
+	  ret |= 1;
+	else
+	{
+	  if (cols[no]->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
+	    ret |= 2;
+	  else
+	    ret |= 4;
+	}
+      }
+      no ++;
+      idx *= 2; 
+      if (no == colCnt)
+	return ret;
+    }
+    
+    ptr++;
+  }
+  return ret;
+}
+
+
   
 /**
  * NdbIndexImpl
  */
 
 NdbIndexImpl::NdbIndexImpl() : 
-  NdbDictionary::Index(* this), 
-  m_facade(this)
+  NdbDictionary::Index(* this),
+  NdbDictObjectImpl(NdbDictionary::Object::OrderedIndex), m_facade(this)
 {
   init();
 }
 
 NdbIndexImpl::NdbIndexImpl(NdbDictionary::Index & f) : 
   NdbDictionary::Index(* this), 
-  m_facade(&f)
+  NdbDictObjectImpl(NdbDictionary::Object::OrderedIndex), m_facade(&f)
 {
   init();
 }
 
 void NdbIndexImpl::init()
 {
-  m_indexId= RNIL;
-  m_type= NdbDictionary::Index::Undefined;
+  m_id= RNIL;
+  m_type= NdbDictionary::Object::TypeUndefined;
   m_logging= true;
+  m_temporary= false;
   m_table= NULL;
 }
 
@@ -624,6 +1235,167 @@ NdbIndexImpl::getIndexTable() const
 }
 
 /**
+ * NdbEventImpl
+ */
+
+NdbEventImpl::NdbEventImpl() : 
+  NdbDictionary::Event(* this),
+  NdbDictObjectImpl(NdbDictionary::Object::TypeUndefined), m_facade(this)
+{
+  DBUG_ENTER("NdbEventImpl::NdbEventImpl");
+  DBUG_PRINT("info", ("this: %p", this));
+  init();
+  DBUG_VOID_RETURN;
+}
+
+NdbEventImpl::NdbEventImpl(NdbDictionary::Event & f) : 
+  NdbDictionary::Event(* this),
+  NdbDictObjectImpl(NdbDictionary::Object::TypeUndefined), m_facade(&f)
+{
+  DBUG_ENTER("NdbEventImpl::NdbEventImpl");
+  DBUG_PRINT("info", ("this: %p", this));
+  init();
+  DBUG_VOID_RETURN;
+}
+
+void NdbEventImpl::init()
+{
+  m_eventId= RNIL;
+  m_eventKey= RNIL;
+  mi_type= 0;
+  m_dur= NdbDictionary::Event::ED_UNDEFINED;
+  m_mergeEvents = false;
+  m_tableImpl= NULL;
+  m_rep= NdbDictionary::Event::ER_UPDATED;
+}
+
+NdbEventImpl::~NdbEventImpl()
+{
+  DBUG_ENTER("NdbEventImpl::~NdbEventImpl");
+  DBUG_PRINT("info", ("this: %p", this));
+  for (unsigned i = 0; i < m_columns.size(); i++)
+    delete  m_columns[i];
+  if (m_tableImpl)
+    delete m_tableImpl;
+  DBUG_VOID_RETURN;
+}
+
+int NdbEventImpl::setName(const char * name)
+{
+  return !m_name.assign(name);
+}
+
+const char *NdbEventImpl::getName() const
+{
+  return m_name.c_str();
+}
+
+int
+NdbEventImpl::setTable(const NdbDictionary::Table& table)
+{
+  setTable(&NdbTableImpl::getImpl(table));
+  return !m_tableName.assign(m_tableImpl->getName());
+}
+
+void 
+NdbEventImpl::setTable(NdbTableImpl *tableImpl)
+{
+  DBUG_ENTER("NdbEventImpl::setTable");
+  DBUG_PRINT("info", ("this: %p  tableImpl: %p", this, tableImpl));
+  DBUG_ASSERT(tableImpl->m_status != NdbDictionary::Object::Invalid);
+  if (!m_tableImpl) 
+    m_tableImpl = new NdbTableImpl();
+  // Copy table, since event might be accessed from different threads
+  m_tableImpl->assign(*tableImpl);
+  DBUG_VOID_RETURN;
+}
+
+const NdbDictionary::Table *
+NdbEventImpl::getTable() const
+{
+  if (m_tableImpl) 
+    return m_tableImpl->m_facade;
+  else
+    return NULL;
+}
+
+int
+NdbEventImpl::setTable(const char * table)
+{
+  return !m_tableName.assign(table);
+}
+
+const char *
+NdbEventImpl::getTableName() const
+{
+  return m_tableName.c_str();
+}
+
+void
+NdbEventImpl::addTableEvent(const NdbDictionary::Event::TableEvent t =  NdbDictionary::Event::TE_ALL)
+{
+  mi_type |= (unsigned)t;
+}
+
+bool
+NdbEventImpl::getTableEvent(const NdbDictionary::Event::TableEvent t) const
+{
+  return (mi_type & (unsigned)t) == (unsigned)t;
+}
+
+void
+NdbEventImpl::setDurability(NdbDictionary::Event::EventDurability d)
+{
+  m_dur = d;
+}
+
+NdbDictionary::Event::EventDurability
+NdbEventImpl::getDurability() const
+{
+  return m_dur;
+}
+
+void
+NdbEventImpl::setReport(NdbDictionary::Event::EventReport r)
+{
+  m_rep = r;
+}
+
+NdbDictionary::Event::EventReport
+NdbEventImpl::getReport() const
+{
+  return m_rep;
+}
+
+int NdbEventImpl::getNoOfEventColumns() const
+{
+  return m_attrIds.size() + m_columns.size();
+}
+
+const NdbDictionary::Column *
+NdbEventImpl::getEventColumn(unsigned no) const
+{
+  if (m_columns.size())
+  {
+    if (no < m_columns.size())
+    {
+      return m_columns[no];
+    }
+  }
+  else if (m_attrIds.size())
+  {
+    if (no < m_attrIds.size())
+    {
+      NdbTableImpl* tab= m_tableImpl;
+      if (tab == 0)
+        return 0;
+      return tab->getColumn(m_attrIds[no]);
+    }
+  }
+  return 0;
+}
+
+/**
  * NdbDictionaryImpl
  */
 
@@ -648,8 +1420,6 @@ NdbDictionaryImpl::NdbDictionaryImpl(Ndb
   m_local_table_data_size= 0;
 }
 
-static int f_dictionary_count = 0;
-
 NdbDictionaryImpl::~NdbDictionaryImpl()
 {
   NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0);
@@ -662,58 +1432,132 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
       
       curr = m_localHash.m_tableHash.getNext(curr);
     }
-    
-    m_globalHash->lock();
-    if(--f_dictionary_count == 0){
-      delete NdbDictionary::Column::FRAGMENT; 
-      delete NdbDictionary::Column::FRAGMENT_MEMORY;
-      delete NdbDictionary::Column::ROW_COUNT;
-      delete NdbDictionary::Column::COMMIT_COUNT;
-      delete NdbDictionary::Column::ROW_SIZE;
-      delete NdbDictionary::Column::RANGE_NO;
-      NdbDictionary::Column::FRAGMENT= 0;
-      NdbDictionary::Column::FRAGMENT_MEMORY= 0;
-      NdbDictionary::Column::ROW_COUNT= 0;
-      NdbDictionary::Column::COMMIT_COUNT= 0;
-      NdbDictionary::Column::ROW_SIZE= 0;
-      NdbDictionary::Column::RANGE_NO= 0;
-    }
-    m_globalHash->unlock();
   } else {
     assert(curr == 0);
   }
 }
 
-Ndb_local_table_info *
-NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
+NdbTableImpl *
+NdbDictionaryImpl::fetchGlobalTableImplRef(const GlobalCacheInitObject &obj)
 {
+  DBUG_ENTER("fetchGlobalTableImplRef");
   NdbTableImpl *impl;
   int error= 0;
 
   m_globalHash->lock();
-  impl = m_globalHash->get(internalTableName.c_str(), &error);
+  impl = m_globalHash->get(obj.m_name.c_str(), &error);
   m_globalHash->unlock();
 
   if (impl == 0){
     if (error == 0)
-      impl = m_receiver.getTable(internalTableName,
+      impl = m_receiver.getTable(obj.m_name.c_str(),
                                  m_ndb.usingFullyQualifiedNames());
     else
       m_error.code = 4000;
+    if (impl != 0 && obj.init(*impl))
+    {
+      delete impl;
+      impl = 0;
+    }
     m_globalHash->lock();
-    m_globalHash->put(internalTableName.c_str(), impl);
+    m_globalHash->put(obj.m_name.c_str(), impl);
     m_globalHash->unlock();
-    
-    if(impl == 0){
-      return 0;
-    }
   }
 
+  DBUG_RETURN(impl);
+}
+
+void
+NdbDictionaryImpl::putTable(NdbTableImpl *impl)
+{
+  NdbTableImpl *old;
+
+  int ret = getBlobTables(*impl);
+  int error = 0;
+  assert(ret == 0);
+
+  m_globalHash->lock();
+  if ((old= m_globalHash->get(impl->m_internalName.c_str(), &error)))
+  {
+    m_globalHash->alter_table_rep(old->m_internalName.c_str(),
+                                  impl->m_id,
+                                  impl->m_version,
+                                  FALSE);
+  }
+  m_globalHash->put(impl->m_internalName.c_str(), impl);
+  m_globalHash->unlock();
   Ndb_local_table_info *info=
     Ndb_local_table_info::create(impl, m_local_table_data_size);
+  
+  m_localHash.put(impl->m_internalName.c_str(), info);
+}
+
+int
+NdbDictionaryImpl::getBlobTables(NdbTableImpl &t)
+{
+  unsigned n= t.m_noOfBlobs;
+  DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
+  // optimized for blob column being the last one
+  // and not looking for more than one if not neccessary
+  for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
+    i--;
+    NdbColumnImpl & c = *t.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    n--;
+    // retrieve blob table def from DICT - by-pass cache
+    char btname[NdbBlobImpl::BlobTableNameSize];
+    NdbBlob::getBlobTableName(btname, &t, &c);
+    BaseString btname_internal = m_ndb.internalize_table_name(btname);
+    NdbTableImpl* bt =
+      m_receiver.getTable(btname_internal, m_ndb.usingFullyQualifiedNames());
+    if (bt == NULL)
+      DBUG_RETURN(-1);
 
-  m_localHash.put(internalTableName.c_str(), info);
-  return info;
+    // TODO check primary id/version when returned by DICT
+
+    // the blob column owns the blob table
+    assert(c.m_blobTable == NULL);
+    c.m_blobTable = bt;
+  }
+  DBUG_RETURN(0); 
+}
+
+NdbTableImpl*
+NdbDictionaryImpl::getBlobTable(const NdbTableImpl& tab, uint col_no)
+{
+  if (col_no < tab.m_columns.size()) {
+    NdbColumnImpl* col = tab.m_columns[col_no];
+    if (col != NULL) {
+      NdbTableImpl* bt = col->m_blobTable;
+      if (bt != NULL)
+        return bt;
+      else
+        m_error.code = 4273; // No blob table..
+    } else
+      m_error.code = 4249; // Invalid table..
+  } else
+    m_error.code = 4318; // Invalid attribute..
+  return NULL;
+}
+
+NdbTableImpl*
+NdbDictionaryImpl::getBlobTable(uint tab_id, uint col_no)
+{
+  DBUG_ENTER("NdbDictionaryImpl::getBlobTable");
+  DBUG_PRINT("enter", ("tab_id: %u col_no %u", tab_id, col_no));
+
+  NdbTableImpl* tab = m_receiver.getTable(tab_id,
+                                          m_ndb.usingFullyQualifiedNames());
+  if (tab == NULL)
+    DBUG_RETURN(NULL);
+  Ndb_local_table_info* info =
+    get_local_table_info(tab->m_internalName);
+  delete tab;
+  if (info == NULL)
+    DBUG_RETURN(NULL);
+  NdbTableImpl* bt = getBlobTable(*info->m_table_impl, col_no);
+  DBUG_RETURN(bt);
 }
 
 #if 0
@@ -735,22 +1579,6 @@ NdbDictionaryImpl::setTransporter(class 
 {
   m_globalHash = &tf->m_globalDictCache;
   if(m_receiver.setTransporter(ndb, tf)){
-    m_globalHash->lock();
-    if(f_dictionary_count++ == 0){
-      NdbDictionary::Column::FRAGMENT= 
-	NdbColumnImpl::create_psuedo("NDB$FRAGMENT");
-      NdbDictionary::Column::FRAGMENT_MEMORY= 
-	NdbColumnImpl::create_psuedo("NDB$FRAGMENT_MEMORY");
-      NdbDictionary::Column::ROW_COUNT= 
-	NdbColumnImpl::create_psuedo("NDB$ROW_COUNT");
-      NdbDictionary::Column::COMMIT_COUNT= 
-	NdbColumnImpl::create_psuedo("NDB$COMMIT_COUNT");
-      NdbDictionary::Column::ROW_SIZE=
-	NdbColumnImpl::create_psuedo("NDB$ROW_SIZE");
-      NdbDictionary::Column::RANGE_NO= 
-	NdbColumnImpl::create_psuedo("NDB$RANGE_NO");
-    }
-    m_globalHash->unlock();
     return true;
   }
   return false;
@@ -760,9 +1588,21 @@ NdbTableImpl *
 NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index,
 				 NdbTableImpl * table)
 {
+  const char *current_db= m_ndb.getDatabaseName();
+  NdbTableImpl *index_table;
   const BaseString internalName(
     m_ndb.internalize_index_name(table, index->getName()));
-  return getTable(m_ndb.externalizeTableName(internalName.c_str()));
+  // Get index table in system database
+  m_ndb.setDatabaseName(NDB_SYSTEM_DATABASE);
+  index_table= getTable(m_ndb.externalizeTableName(internalName.c_str()));
+  m_ndb.setDatabaseName(current_db);
+  if (!index_table)
+  {
+    // Index table not found
+    // Try geting index table in current database (old format)
+    index_table= getTable(m_ndb.externalizeTableName(internalName.c_str()));    
+  }
+  return index_table;
 }
 
 #if 0
@@ -850,9 +1690,63 @@ NdbDictInterface::execSignal(void* dictI
   case GSN_DROP_INDX_CONF:
     tmp->execDROP_INDX_CONF(signal, ptr);
     break;
+  case GSN_CREATE_EVNT_REF:
+    tmp->execCREATE_EVNT_REF(signal, ptr);
+    break;
+  case GSN_CREATE_EVNT_CONF:
+    tmp->execCREATE_EVNT_CONF(signal, ptr);
+    break;
+  case GSN_SUB_START_CONF:
+    tmp->execSUB_START_CONF(signal, ptr);
+    break;
+  case GSN_SUB_START_REF:
+    tmp->execSUB_START_REF(signal, ptr);
+    break;
+  case GSN_SUB_STOP_CONF:
+    tmp->execSUB_STOP_CONF(signal, ptr);
+    break;
+  case GSN_SUB_STOP_REF:
+    tmp->execSUB_STOP_REF(signal, ptr);
+    break;
+  case GSN_DROP_EVNT_REF:
+    tmp->execDROP_EVNT_REF(signal, ptr);
+    break;
+  case GSN_DROP_EVNT_CONF:
+    tmp->execDROP_EVNT_CONF(signal, ptr);
+    break;
   case GSN_LIST_TABLES_CONF:
     tmp->execLIST_TABLES_CONF(signal, ptr);
     break;
+  case GSN_CREATE_FILEGROUP_REF:
+    tmp->execCREATE_FILEGROUP_REF(signal, ptr);
+    break;
+  case GSN_CREATE_FILEGROUP_CONF:
+    tmp->execCREATE_FILEGROUP_CONF(signal, ptr);
+    break;
+  case GSN_CREATE_FILE_REF:
+    tmp->execCREATE_FILE_REF(signal, ptr);
+    break;
+  case GSN_CREATE_FILE_CONF:
+    tmp->execCREATE_FILE_CONF(signal, ptr);
+    break;
+  case GSN_DROP_FILEGROUP_REF:
+    tmp->execDROP_FILEGROUP_REF(signal, ptr);
+    break;
+  case GSN_DROP_FILEGROUP_CONF:
+    tmp->execDROP_FILEGROUP_CONF(signal, ptr);
+    break;
+  case GSN_DROP_FILE_REF:
+    tmp->execDROP_FILE_REF(signal, ptr);
+    break;
+  case GSN_DROP_FILE_CONF:
+    tmp->execDROP_FILE_CONF(signal, ptr);
+    break;
+  case GSN_WAIT_GCP_CONF:
+    tmp->execWAIT_GCP_CONF(signal, ptr);
+    break;
+  case GSN_WAIT_GCP_REF:
+    tmp->execWAIT_GCP_REF(signal, ptr);
+    break;
   default:
     abort();
   }
@@ -874,73 +1768,57 @@ NdbDictInterface::execNodeStatus(void* d
 }
 
 int
-NdbDictInterface::dictSignal(NdbApiSignal* signal, 
-			     LinearSectionPtr ptr[3],int noLSP,
-			     const int useMasterNodeId,
-			     const Uint32 RETRIES,
-			     const WaitSignalType wst,
-			     const int theWait,
-			     const int *errcodes,
-			     const int noerrcodes,
-			     const int temporaryMask)
+NdbDictInterface::dictSignal(NdbApiSignal* sig, 
+			     LinearSectionPtr ptr[3], int secs,
+			     int node_specification,
+			     WaitSignalType wst,
+			     int timeout, Uint32 RETRIES,
+			     const int *errcodes, int temporaryMask)
 {
   DBUG_ENTER("NdbDictInterface::dictSignal");
-  DBUG_PRINT("enter", ("useMasterNodeId: %d", useMasterNodeId));
+  DBUG_PRINT("enter", ("useMasterNodeId: %d", node_specification));
   for(Uint32 i = 0; i<RETRIES; i++){
-    //if (useMasterNodeId == 0)
     m_buffer.clear();
 
     // Protected area
-    m_transporter->lock_mutex();
-    Uint32 aNodeId;
-    if (useMasterNodeId) {
-      if ((m_masterNodeId == 0) ||
-	  (!m_transporter->get_node_alive(m_masterNodeId))) {
-	m_masterNodeId = m_transporter->get_an_alive_node();
-      }//if
-      aNodeId = m_masterNodeId;
-    } else {
-      aNodeId = m_transporter->get_an_alive_node();
+    /*
+      The PollGuard has an implicit call of unlock_and_signal through the
+      ~PollGuard method. This method is called implicitly by the compiler
+      in all places where the object is out of context due to a return,
+      break, continue or simply end of statement block
+    */
+    PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
+    Uint32 node;
+    switch(node_specification){
+    case 0:
+      node = (m_transporter->get_node_alive(m_masterNodeId) ? m_masterNodeId :
+	      (m_masterNodeId = m_transporter->get_an_alive_node()));
+      break;
+    case -1:
+      node = m_transporter->get_an_alive_node();
+      break;
+    default:
+      node = node_specification;
     }
-    if(aNodeId == 0){
+    DBUG_PRINT("info", ("node %d", node));
+    if(node == 0){
       m_error.code= 4009;
-      m_transporter->unlock_mutex();
       DBUG_RETURN(-1);
     }
-    {
-      int r;
-      if (ptr) {
-#ifdef EVENT_DEBUG
-	printf("Long signal %d ptr", noLSP);
-	for (int q=0;q<noLSP;q++) {
-	  printf(" sz %d", ptr[q].sz);
-	}
-	printf("\n");
-#endif
-	r = m_transporter->sendFragmentedSignal(signal, aNodeId, ptr, noLSP);
-      } else {
-#ifdef EVENT_DEBUG
-	printf("Short signal\n");
-#endif
-	r = m_transporter->sendSignal(signal, aNodeId);
-      }
-      if(r != 0){
-        m_error.code= 4007;
-	m_transporter->unlock_mutex();
-	continue;
-      }
-    }
+    int res = (ptr ? 
+	       m_transporter->sendFragmentedSignal(sig, node, ptr, secs):
+	       m_transporter->sendSignal(sig, node));
+    if(res != 0){
+      DBUG_PRINT("info", ("dictSignal failed to send signal"));
+      m_error.code = 4007;
+      continue;
+    }    
     
     m_error.code= 0;
-    
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = wst;
-
-    m_waiter.wait(theWait);
-    m_transporter->unlock_mutex();    
+    int ret_val= poll_guard.wait_n_unlock(timeout, node, wst);
     // End of Protected area  
     
-    if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
+    if(ret_val == 0 && m_error.code == 0){
       // Normal return
       DBUG_RETURN(0);
     }
@@ -948,37 +1826,45 @@ NdbDictInterface::dictSignal(NdbApiSigna
     /**
      * Handle error codes
      */
-    if(m_waiter.m_state == WAIT_NODE_FAILURE)
+    if(ret_val == -2) //WAIT_NODE_FAILURE
     {
       m_error.code = 4013;
       continue;
     }
-
     if(m_waiter.m_state == WST_WAIT_TIMEOUT)
     {
+      DBUG_PRINT("info", ("dictSignal caught time-out"));
       m_error.code = 4008;
       DBUG_RETURN(-1);
     }
     
-    if ( (temporaryMask & m_error.code) != 0 ) {
+    if ( temporaryMask == -1)
+    {
+      const NdbError &error= getNdbError();
+      if (error.status ==  NdbError::TemporaryError)
+	continue;
+    }
+    else if ( (temporaryMask & m_error.code) != 0 ) {
       continue;
     }
-    if (errcodes) {
-      int doContinue = 0;
-      for (int j=0; j < noerrcodes; j++)
-	if(m_error.code == errcodes[j]) {
-	  doContinue = 1;
+    DBUG_PRINT("info", ("dictSignal caught error= %d", m_error.code));
+    
+    if(m_error.code && errcodes)
+    {
+      int j;
+      for(j = 0; errcodes[j] ; j++){
+	if(m_error.code == errcodes[j]){
 	  break;
 	}
-      if (doContinue)
+      }
+      if(errcodes[j]) // Accepted error code
 	continue;
     }
-
-    DBUG_RETURN(-1);
+    break;
   }
   DBUG_RETURN(-1);
 }
-#if 0
+
 /*
   Get dictionary information for a table using table id as reference
 
@@ -989,8 +1875,8 @@ NdbTableImpl *
 NdbDictInterface::getTable(int tableId, bool fullyQualifiedNames)
 {
   NdbApiSignal tSignal(m_reference);
-  GetTabInfoReq* const req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
-
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+  
   req->senderRef = m_reference;
   req->senderData = 0;
   req->requestType =
@@ -1002,8 +1888,6 @@ NdbDictInterface::getTable(int tableId, 
 
   return getTable(&tSignal, 0, 0, fullyQualifiedNames);
 }
-#endif
-
 
 /*
   Get dictionary information for a table using table name as the reference
@@ -1062,29 +1946,30 @@ NdbDictInterface::getTable(class NdbApiS
 			   LinearSectionPtr ptr[3],
 			   Uint32 noOfSections, bool fullyQualifiedNames)
 {
-  int errCodes[] = {GetTabInfoRef::Busy };
-
-  int r = dictSignal(signal,ptr,noOfSections,
-		     0/*do not use masternode id*/,
-		     100,
+  int errCodes[] = {GetTabInfoRef::Busy, 0 };
+  int r = dictSignal(signal, ptr, noOfSections,
+		     -1, // any node
 		     WAIT_GET_TAB_INFO_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes, 1);
-  if (r) return 0;
+		     DICT_WAITFOR_TIMEOUT, 100, errCodes);
 
+  if (r)
+    return 0;
+  
   NdbTableImpl * rt = 0;
-  m_error.code= parseTableInfo(&rt, 
-			       (Uint32*)m_buffer.get_data(), 
-			       m_buffer.length() / 4, fullyQualifiedNames);
-  if (rt != 0)
+  m_error.code = parseTableInfo(&rt, 
+				(Uint32*)m_buffer.get_data(), 
+  				m_buffer.length() / 4, 
+				fullyQualifiedNames);
+  if(rt)
   {
     if (rt->buildColumnHash())
     {
       m_error.code = 4000;
       delete rt;
       return NULL;
-    }
+     }
   }
+  
   return rt;
 }
 
@@ -1123,8 +2008,9 @@ void
 NdbDictInterface::execGET_TABINFO_REF(NdbApiSignal * signal,
 				      LinearSectionPtr ptr[3])
 {
-  const GetTabInfoRef* ref = CAST_CONSTPTR(GetTabInfoRef, signal->getDataPtr());
-
+  const GetTabInfoRef* ref = CAST_CONSTPTR(GetTabInfoRef, 
+					   signal->getDataPtr());
+  
   m_error.code= ref->errorCode;
   m_waiter.signal(NO_WAIT);
 }
@@ -1172,6 +2058,9 @@ fragmentTypeMapping[] = {
   { DictTabInfo::AllNodesMediumTable, NdbDictionary::Object::FragAllMedium },
   { DictTabInfo::AllNodesLargeTable,  NdbDictionary::Object::FragAllLarge },
   { DictTabInfo::SingleFragment,      NdbDictionary::Object::FragSingle },
+  { DictTabInfo::DistrKeyHash,      NdbDictionary::Object::DistrKeyHash },
+  { DictTabInfo::DistrKeyLin,      NdbDictionary::Object::DistrKeyLin },
+  { DictTabInfo::UserDefined,      NdbDictionary::Object::UserDefined },
   { -1, -1 }
 };
 
@@ -1186,6 +2075,10 @@ objectTypeMapping[] = {
   { DictTabInfo::IndexTrigger,       NdbDictionary::Object::IndexTrigger },
   { DictTabInfo::SubscriptionTrigger,NdbDictionary::Object::SubscriptionTrigger },
   { DictTabInfo::ReadOnlyConstraint ,NdbDictionary::Object::ReadOnlyConstraint },
+  { DictTabInfo::Tablespace,         NdbDictionary::Object::Tablespace },
+  { DictTabInfo::LogfileGroup,       NdbDictionary::Object::LogfileGroup },
+  { DictTabInfo::Datafile,           NdbDictionary::Object::Datafile },
+  { DictTabInfo::Undofile,           NdbDictionary::Object::Undofile },
   { -1, -1 }
 };
 
@@ -1204,7 +2097,7 @@ objectStateMapping[] = {
 static const
 ApiKernelMapping
 objectStoreMapping[] = {
-  { DictTabInfo::StoreTemporary,     NdbDictionary::Object::StoreTemporary },
+  { DictTabInfo::StoreNotLogged,     NdbDictionary::Object::StoreNotLogged },
   { DictTabInfo::StorePermanent,     NdbDictionary::Object::StorePermanent },
   { -1, -1 }
 };
@@ -1221,74 +2114,96 @@ int
 NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
 				 const Uint32 * data, Uint32 len,
 				 bool fullyQualifiedNames,
-				 bool hostByteOrder)
+                                 Uint32 version)
 {
-  DBUG_ENTER("NdbDictInterface::parseTableInfo");
-
   SimplePropertiesLinearReader it(data, len);
-  DictTabInfo::Table tableDesc; tableDesc.init();
+  DictTabInfo::Table *tableDesc;
   SimpleProperties::UnpackStatus s;
-  s = SimpleProperties::unpack(it, &tableDesc, 
+  DBUG_ENTER("NdbDictInterface::parseTableInfo");
+
+  tableDesc = (DictTabInfo::Table*)NdbMem_Allocate(sizeof(DictTabInfo::Table));
+  if (!tableDesc)
+  {
+    DBUG_RETURN(4000);
+  }
+  tableDesc->init();
+  s = SimpleProperties::unpack(it, tableDesc, 
 			       DictTabInfo::TableMapping, 
 			       DictTabInfo::TableMappingSize, 
 			       true, true);
   
   if(s != SimpleProperties::Break){
+    NdbMem_Free((void*)tableDesc);
     DBUG_RETURN(703);
   }
-  const char * internalName = tableDesc.TableName;
+  const char * internalName = tableDesc->TableName;
   const char * externalName = Ndb::externalizeTableName(internalName, fullyQualifiedNames);
 
   NdbTableImpl * impl = new NdbTableImpl();
-  impl->m_tableId = tableDesc.TableId;
-  impl->m_version = tableDesc.TableVersion;
+  impl->m_id = tableDesc->TableId;
+  impl->m_version = tableDesc->TableVersion;
   impl->m_status = NdbDictionary::Object::Retrieved;
   if (!impl->m_internalName.assign(internalName) ||
+      impl->updateMysqlName() ||
       !impl->m_externalName.assign(externalName) ||
-      impl->m_frm.assign(tableDesc.FrmData, tableDesc.FrmLen))
+      impl->m_frm.assign(tableDesc->FrmData, tableDesc->FrmLen) ||
+      impl->m_fd.assign(tableDesc->FragmentData, tableDesc->FragmentDataLen) ||
+      impl->m_range.assign(tableDesc->RangeListData, tableDesc->RangeListDataLen))
   {
     DBUG_RETURN(4000);
   }
+  impl->m_fragmentCount = tableDesc->FragmentCount;
+
+  /*
+    We specifically don't get tablespace data and range/list arrays here
+    since those are known by the MySQL Server through analysing the
+    frm file.
+    Fragment Data contains the real node group mapping and the fragment
+    identities used for each fragment. At the moment we have no need for
+    this.
+    Frm file is needed for autodiscovery.
+  */
   
   impl->m_fragmentType = (NdbDictionary::Object::FragmentType)
-    getApiConstant(tableDesc.FragmentType, 
+    getApiConstant(tableDesc->FragmentType, 
 		   fragmentTypeMapping, 
 		   (Uint32)NdbDictionary::Object::FragUndefined);
   
-  Uint64 max_rows = ((Uint64)tableDesc.MaxRowsHigh) << 32;
-  max_rows += tableDesc.MaxRowsLow;
+  Uint64 max_rows = ((Uint64)tableDesc->MaxRowsHigh) << 32;
+  max_rows += tableDesc->MaxRowsLow;
   impl->m_max_rows = max_rows;
-  Uint64 min_rows = ((Uint64)tableDesc.MinRowsHigh) << 32;
-  min_rows += tableDesc.MinRowsLow;
+  Uint64 min_rows = ((Uint64)tableDesc->MinRowsHigh) << 32;
+  min_rows += tableDesc->MinRowsLow;
   impl->m_min_rows = min_rows;
-  impl->m_logging = tableDesc.TableLoggedFlag;
-  impl->m_kvalue = tableDesc.TableKValue;
-  impl->m_minLoadFactor = tableDesc.MinLoadFactor;
-  impl->m_maxLoadFactor = tableDesc.MaxLoadFactor;
-  impl->m_single_user_mode = tableDesc.SingleUserMode;
+  impl->m_default_no_part_flag = tableDesc->DefaultNoPartFlag;
+  impl->m_linear_flag = tableDesc->LinearHashFlag;
+  impl->m_logging = tableDesc->TableLoggedFlag;
+  impl->m_temporary = tableDesc->TableTemporaryFlag;
+  impl->m_row_gci = tableDesc->RowGCIFlag;
+  impl->m_row_checksum = tableDesc->RowChecksumFlag;
+  impl->m_force_var_part = tableDesc->ForceVarPartFlag;
+  impl->m_kvalue = tableDesc->TableKValue;
+  impl->m_minLoadFactor = tableDesc->MinLoadFactor;
+  impl->m_maxLoadFactor = tableDesc->MaxLoadFactor;
+  impl->m_single_user_mode = tableDesc->SingleUserMode;
 
-  impl->m_indexType = (NdbDictionary::Index::Type)
-    getApiConstant(tableDesc.TableType,
+  impl->m_indexType = (NdbDictionary::Object::Type)
+    getApiConstant(tableDesc->TableType,
 		   indexTypeMapping,
-		   NdbDictionary::Index::Undefined);
+		   NdbDictionary::Object::TypeUndefined);
   
-  if(impl->m_indexType == NdbDictionary::Index::Undefined){
+  if(impl->m_indexType == NdbDictionary::Object::TypeUndefined){
   } else {
     const char * externalPrimary = 
-      Ndb::externalizeTableName(tableDesc.PrimaryTable, fullyQualifiedNames);
+      Ndb::externalizeTableName(tableDesc->PrimaryTable, fullyQualifiedNames);
     if (!impl->m_primaryTable.assign(externalPrimary))
     {
       DBUG_RETURN(4000);
     }
   }
   
-  Uint32 keyInfoPos = 0;
-  Uint32 keyCount = 0;
-  Uint32 blobCount = 0;
-  Uint32 distKeys = 0;
-  
   Uint32 i;
-  for(i = 0; i < tableDesc.NoOfAttributes; i++) {
+  for(i = 0; i < tableDesc->NoOfAttributes; i++) {
     DictTabInfo::Attribute attrDesc; attrDesc.init();
     s = SimpleProperties::unpack(it, 
 				 &attrDesc, 
@@ -1297,6 +2212,7 @@ NdbDictInterface::parseTableInfo(NdbTabl
 				 true, true);
     if(s != SimpleProperties::Break){
       delete impl;
+      NdbMem_Free((void*)tableDesc);
       DBUG_RETURN(703);
     }
     
@@ -1308,6 +2224,7 @@ NdbDictInterface::parseTableInfo(NdbTabl
     if (! attrDesc.translateExtType()) {
       delete col;
       delete impl;
+      NdbMem_Free((void*)tableDesc);
       DBUG_RETURN(703);
     }
     col->m_type = (NdbDictionary::Column::Type)attrDesc.AttributeExtType;
@@ -1320,6 +2237,7 @@ NdbDictInterface::parseTableInfo(NdbTabl
     if (col->getCharType() != (cs_number != 0)) {
       delete col;
       delete impl;
+      NdbMem_Free((void*)tableDesc);
       DBUG_RETURN(703);
     }
     if (col->getCharType()) {
@@ -1327,21 +2245,24 @@ NdbDictInterface::parseTableInfo(NdbTabl
       if (col->m_cs == NULL) {
         delete col;
         delete impl;
+        NdbMem_Free((void*)tableDesc);
         DBUG_RETURN(743);
       }
     }
     col->m_attrSize = (1 << attrDesc.AttributeSize) / 8;
     col->m_arraySize = attrDesc.AttributeArraySize;
+    col->m_arrayType = attrDesc.AttributeArrayType;
     if(attrDesc.AttributeSize == 0)
     {
       col->m_attrSize = 4;
       col->m_arraySize = (attrDesc.AttributeArraySize + 31) >> 5;
     }
+    col->m_storageType = attrDesc.AttributeStorageType;
     
     col->m_pk = attrDesc.AttributeKeyFlag;
-    col->m_distributionKey = attrDesc.AttributeDKey;
+    col->m_distributionKey = (attrDesc.AttributeDKey != 0);
     col->m_nullable = attrDesc.AttributeNullableFlag;
-    col->m_autoIncrement = (attrDesc.AttributeAutoIncrement ? true : false);
+    col->m_autoIncrement = (attrDesc.AttributeAutoIncrement != 0);
     col->m_autoIncrementInitialValue = ~0;
     if (!col->m_defaultValue.assign(attrDesc.AttributeDefaultValue))
     {
@@ -1350,54 +2271,32 @@ NdbDictInterface::parseTableInfo(NdbTabl
       DBUG_RETURN(4000);
     }
 
-    if(attrDesc.AttributeKeyFlag){
-      col->m_keyInfoPos = keyInfoPos + 1;
-      keyInfoPos += ((col->m_attrSize * col->m_arraySize + 3) / 4);
-      keyCount++;
-      
-      if(attrDesc.AttributeDKey)
-	distKeys++;
-    } else {
-      col->m_keyInfoPos = 0;
-    }
-    if (col->getBlobType())
-      blobCount++;
-    NdbColumnImpl * null = 0;
-    impl->m_columns.fill(attrDesc.AttributeId, null);
-    if(impl->m_columns[attrDesc.AttributeId] != 0){
-      delete col;
-      delete impl;
-      DBUG_RETURN(703);
-    }
-    impl->m_columns[attrDesc.AttributeId] = col;
+    col->m_column_no = impl->m_columns.size();
+    impl->m_columns.push_back(col);
     it.next();
   }
 
-  impl->m_noOfKeys = keyCount;
-  impl->m_keyLenInWords = keyInfoPos;
-  impl->m_noOfBlobs = blobCount;
-  impl->m_noOfDistributionKeys = distKeys;
+  impl->computeAggregates();
 
-  if(tableDesc.FragmentDataLen > 0)
+  if(tableDesc->ReplicaDataLen > 0)
   {
-    Uint16 replicaCount = tableDesc.FragmentData[0];
-    Uint16 fragCount = tableDesc.FragmentData[1];
-
-    if(hostByteOrder == false)
-    {
-      replicaCount = ((replicaCount & 0xFF00) >> 8) |((replicaCount & 0x00FF) << 8);
-      fragCount = ((fragCount & 0xFF00) >> 8) |((fragCount & 0x00FF) << 8);
-    }
+    Uint16 replicaCount = ntohs(tableDesc->ReplicaData[0]);
+    Uint16 fragCount = ntohs(tableDesc->ReplicaData[1]);
 
     impl->m_replicaCount = replicaCount;
     impl->m_fragmentCount = fragCount;
-
-    for(i = 0; i<(fragCount*replicaCount); i++)
+    DBUG_PRINT("info", ("replicaCount=%x , fragCount=%x",replicaCount,fragCount));
+    Uint32 pos = 2;
+    for(i = 0; i < (Uint32) fragCount;i++)
     {
-      if (impl->m_fragments.push_back(tableDesc.FragmentData[i+2]))
+      pos++; // skip logpart
+      for (Uint32 j = 0; j<(Uint32)replicaCount; j++)
       {
-        delete impl;
-        DBUG_RETURN(4000);
+	if (impl->m_fragments.push_back(ntohs(tableDesc->ReplicaData[pos++])))
+	{
+          delete impl;
+          DBUG_RETURN(4000);
+        }
       }
     }
 
@@ -1410,23 +2309,26 @@ NdbDictInterface::parseTableInfo(NdbTabl
   }
   else
   {
-    impl->m_fragmentCount = tableDesc.FragmentCount;
+    impl->m_fragmentCount = tableDesc->FragmentCount;
     impl->m_replicaCount = 0;
     impl->m_hashValueMask = 0;
     impl->m_hashpointerValue = 0;
   }
 
-  if(distKeys == 0)
-  {
-    for(i = 0; i < tableDesc.NoOfAttributes; i++)
-    {
-      if(impl->m_columns[i]->getPrimaryKey())
-	impl->m_columns[i]->m_distributionKey = true;
-    }
-  }
+  impl->m_tablespace_id = tableDesc->TablespaceId;
+  impl->m_tablespace_version = tableDesc->TablespaceVersion;
   
   * ret = impl;
 
+  NdbMem_Free((void*)tableDesc);
+  if (version < MAKE_VERSION(5,1,3))
+  {
+    ;
+  } 
+  else
+  {
+    DBUG_ASSERT(impl->m_fragmentCount > 0);
+  }
   DBUG_RETURN(0);
 }
 
@@ -1436,79 +2338,114 @@ NdbDictInterface::parseTableInfo(NdbTabl
 int
 NdbDictionaryImpl::createTable(NdbTableImpl &t)
 { 
+  DBUG_ENTER("NdbDictionaryImpl::createTable");
+
+  // if the new name has not been set, use the copied name
+  if (t.m_newExternalName.empty())
+  {
+    if (!t.m_newExternalName.assign(t.m_externalName))
+    {
+      m_error.code= 4000;
+      DBUG_RETURN(-1);
+    }
+  }
+  // create table
   if (m_receiver.createTable(m_ndb, t) != 0)
-    return -1;
-  if (t.m_noOfBlobs == 0)
-    return 0;
-  // update table def from DICT
-  Ndb_local_table_info *info=
-    get_local_table_info(t.m_internalName,false);
-  if (info == NULL) {
-    m_error.code= 709;
-    return -1;
+    DBUG_RETURN(-1);
+  Uint32* data = (Uint32*)m_receiver.m_buffer.get_data();
+  t.m_id = data[0];
+  t.m_version = data[1];
+
+  // update table def from DICT - by-pass cache
+  NdbTableImpl* t2 =
+    m_receiver.getTable(t.m_internalName, m_ndb.usingFullyQualifiedNames());
+
+  // check if we got back same table
+  if (t2 == NULL) {
+    DBUG_PRINT("info", ("table %s dropped by another thread", 
+                        t.m_internalName.c_str()));
+    m_error.code = 283;
+    DBUG_RETURN(-1);
+  }
+  if (t.m_id != t2->m_id || t.m_version != t2->m_version) {
+    DBUG_PRINT("info", ("table %s re-created by another thread",
+                        t.m_internalName.c_str()));
+    m_error.code = 283;
+    delete t2;
+    DBUG_RETURN(-1);
+  }
+
+  // auto-increment - use "t" because initial value is not in DICT
+  {
+    bool autoIncrement = false;
+    Uint64 initialValue = 0;
+    for (Uint32 i = 0; i < t.m_columns.size(); i++) {
+      const NdbColumnImpl* c = t.m_columns[i];
+      assert(c != NULL);
+      if (c->m_autoIncrement) {
+        if (autoIncrement) {
+          m_error.code = 4335;
+          delete t2;
+          DBUG_RETURN(-1);
+        }
+        autoIncrement = true;
+        initialValue = c->m_autoIncrementInitialValue;
+      }
+    }
+    if (autoIncrement) {
+      // XXX unlikely race condition - t.m_id may no longer be same table
+      // the tuple id range is not used on input
+      Ndb::TupleIdRange range;
+      if (m_ndb.setTupleIdInNdb(&t, range, initialValue, false) == -1) {
+        assert(m_ndb.theError.code != 0);
+        m_error.code = m_ndb.theError.code;
+        delete t2;
+        DBUG_RETURN(-1);
+      }
+    }
   }
-  if (createBlobTables(*(info->m_table_impl)) != 0) {
+
+  // blob tables - use "t2" to get values set by kernel
+  if (t2->m_noOfBlobs != 0 && createBlobTables(t, *t2) != 0) {
     int save_code = m_error.code;
-    (void)dropTable(t);
-    m_error.code= save_code;
-    return -1;
+    (void)dropTableGlobal(*t2);
+    m_error.code = save_code;
+    delete t2;
+    DBUG_RETURN(-1);
   }
-  return 0;
+
+  // not entered in cache
+  delete t2;
+  DBUG_RETURN(0);
 }
 
 int
-NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
+NdbDictionaryImpl::createBlobTables(NdbTableImpl& orig, NdbTableImpl &t)
 {
+  DBUG_ENTER("NdbDictionaryImpl::createBlobTables");
   for (unsigned i = 0; i < t.m_columns.size(); i++) {
     NdbColumnImpl & c = *t.m_columns[i];
     if (! c.getBlobType() || c.getPartSize() == 0)
       continue;
     NdbTableImpl bt;
     NdbBlob::getBlobTable(bt, &t, &c);
-    if (createTable(bt) != 0)
-      return -1;
-    // Save BLOB table handle
-    Ndb_local_table_info *info=
-      get_local_table_info(bt.m_internalName, false);
-    if (info == 0) {
-      return -1;
-    }
-    c.m_blobTable = info->m_table_impl;
-  }
-  
-  return 0;
-}
-
-int
-NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
-{
-  unsigned n= t.m_noOfBlobs;
-  // optimized for blob column being the last one
-  // and not looking for more than one if not neccessary
-  for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
-    i--;
-    NdbColumnImpl & c = *t.m_columns[i];
-    if (! c.getBlobType() || c.getPartSize() == 0)
-      continue;
-    n--;
-    char btname[NdbBlobImpl::BlobTableNameSize];
-    NdbBlob::getBlobTableName(btname, &t, &c);
-    // Save BLOB table handle
-    NdbTableImpl * cachedBlobTable = getTable(btname);
-    if (cachedBlobTable == 0) {
-      return -1;
+    NdbDictionary::Column::StorageType 
+      d = NdbDictionary::Column::StorageTypeDisk;
+    if (orig.m_columns[i]->getStorageType() == d)
+      bt.getColumn("DATA")->setStorageType(d);
+    if (createTable(bt) != 0) {
+      DBUG_RETURN(-1);
     }
-    c.m_blobTable = cachedBlobTable;
   }
-  
-  return 0;
+  DBUG_RETURN(0); 
 }
 
 int 
 NdbDictInterface::createTable(Ndb & ndb,
 			      NdbTableImpl & impl)
 {
-  return createOrAlterTable(ndb, impl, false);
+  DBUG_ENTER("NdbDictInterface::createTable");
+  DBUG_RETURN(createOrAlterTable(ndb, impl, false));
 }
 
 int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
@@ -1518,30 +2455,43 @@ int NdbDictionaryImpl::alterTable(NdbTab
 
   DBUG_ENTER("NdbDictionaryImpl::alterTable");
   Ndb_local_table_info * local = 0;
-  if((local= get_local_table_info(originalInternalName, false)) == 0)
+  if((local= get_local_table_info(originalInternalName)) == 0)
   {
     m_error.code = 709;
     DBUG_RETURN(-1);
   }
 
   // Alter the table
-  int ret = m_receiver.alterTable(m_ndb, impl);
-  if(ret == 0){
-    // Remove cached information and let it be refreshed at next access
+  int ret = alterTableGlobal(*local->m_table_impl, impl);
+  if(ret == 0)
+  {
     m_globalHash->lock();
-    local->m_table_impl->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(local->m_table_impl);
+    m_globalHash->release(local->m_table_impl, 1);
     m_globalHash->unlock();
     m_localHash.drop(originalInternalName);
   }
   DBUG_RETURN(ret);
 }
 
+int NdbDictionaryImpl::alterTableGlobal(NdbTableImpl &old_impl,
+                                        NdbTableImpl &impl)
+{
+  DBUG_ENTER("NdbDictionaryImpl::alterTableGlobal");
+  // Alter the table
+  int ret = m_receiver.alterTable(m_ndb, impl);
+  old_impl.m_status = NdbDictionary::Object::Invalid;
+  if(ret == 0){
+    DBUG_RETURN(ret);
+  }
+  ERR_RETURN(getNdbError(), ret);
+}
+
 int 
 NdbDictInterface::alterTable(Ndb & ndb,
 			      NdbTableImpl & impl)
 {
-  return createOrAlterTable(ndb, impl, true);
+  DBUG_ENTER("NdbDictInterface::alterTable");
+  DBUG_RETURN(createOrAlterTable(ndb, impl, true));
 }
 
 int 
@@ -1549,8 +2499,12 @@ NdbDictInterface::createOrAlterTable(Ndb
 				     NdbTableImpl & impl,
 				     bool alter)
 {
-  DBUG_ENTER("NdbDictInterface::createOrAlterTable");
   unsigned i, err;
+  char *ts_names[MAX_NDB_PARTITIONS];
+  DBUG_ENTER("NdbDictInterface::createOrAlterTable");
+
+  impl.computeAggregates();
+
   if((unsigned)impl.getNoOfPrimaryKeys() > NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY){
     m_error.code= 4317;
     DBUG_RETURN(-1);
@@ -1560,16 +2514,102 @@ NdbDictInterface::createOrAlterTable(Ndb
     m_error.code= 4318;
     DBUG_RETURN(-1);
   }
-
+  
+  // Check if any changes for alter table
+  
+  // Name change
   if (!impl.m_newExternalName.empty()) {
+    if (alter)
+    {
+      AlterTableReq::setNameFlag(impl.m_changeMask, true);
+    }
     if (!impl.m_externalName.assign(impl.m_newExternalName))
     {
       m_error.code= 4000;
       DBUG_RETURN(-1);
     }
-    AlterTableReq::setNameFlag(impl.m_changeMask, true);
+    impl.m_newExternalName.clear();
+  }
+  // Definition change (frm)
+  if (!impl.m_newFrm.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setFrmFlag(impl.m_changeMask, true);
+    }
+    if (impl.m_frm.assign(impl.m_newFrm.get_data(), impl.m_newFrm.length()))
+    {
+      m_error.code= 4000;
+      DBUG_RETURN(-1);
+    }
+    impl.m_newFrm.clear();
+  }
+  // Change FragmentData (fragment identity, state, tablespace id)
+  if (!impl.m_new_fd.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setFragDataFlag(impl.m_changeMask, true);
+    }
+    if (impl.m_fd.assign(impl.m_new_fd.get_data(), impl.m_new_fd.length()))
+    {
+      m_error.code= 4000;
+      DBUG_RETURN(-1);
+    }
+    impl.m_new_fd.clear();
+  }
+  // Change Tablespace Name Data
+  if (!impl.m_new_ts_name.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setTsNameFlag(impl.m_changeMask, true);
+    }
+    if (impl.m_ts_name.assign(impl.m_new_ts_name.get_data(),
+                              impl.m_new_ts_name.length()))
+    {
+      m_error.code= 4000;
+      DBUG_RETURN(-1);
+    }
+    impl.m_new_ts_name.clear();
+  }
+  // Change Range/List Data
+  if (!impl.m_new_range.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setRangeListFlag(impl.m_changeMask, true);
+    }
+    if (impl.m_range.assign(impl.m_new_range.get_data(),
+                            impl.m_new_range.length()))
+    {
+      m_error.code= 4000;
+      DBUG_RETURN(-1);
+    }
+    impl.m_new_range.clear();
+  }
+  // Change Tablespace Data
+  if (!impl.m_new_ts.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setTsFlag(impl.m_changeMask, true);
+    }
+    if (impl.m_ts.assign(impl.m_new_ts.get_data(),
+                         impl.m_new_ts.length()))
+    {
+      m_error.code= 4000;
+      DBUG_RETURN(-1);
+    }
+    impl.m_new_ts.clear();
   }
 
+
+  /*
+     TODO RONM: Here I need to insert checks for fragment array and
+     range or list array
+  */
+  
   //validate();
   //aggregate();
 
@@ -1580,67 +2620,175 @@ NdbDictInterface::createOrAlterTable(Ndb
     m_error.code= 4000;
     DBUG_RETURN(-1);
   }
-  UtilBufferWriter w(m_buffer);
-  DictTabInfo::Table tmpTab; tmpTab.init();
-  BaseString::snprintf(tmpTab.TableName,
-	   sizeof(tmpTab.TableName),
+  impl.updateMysqlName();
+  DictTabInfo::Table *tmpTab;
+
+  tmpTab = (DictTabInfo::Table*)NdbMem_Allocate(sizeof(DictTabInfo::Table));
+  if (!tmpTab)
+  {
+    m_error.code = 4000;
+    DBUG_RETURN(-1);
+  }
+  tmpTab->init();
+  BaseString::snprintf(tmpTab->TableName,
+	   sizeof(tmpTab->TableName),
 	   internalName.c_str());
 
-  bool haveAutoIncrement = false;
-  Uint64 autoIncrementValue = 0;
   Uint32 distKeys= 0;
-  for(i = 0; i<sz; i++){
+  for(i = 0; i<sz; i++) {
     const NdbColumnImpl * col = impl.m_columns[i];
-    if(col == 0)
-      continue;
-    if (col->m_autoIncrement) {
-      if (haveAutoIncrement) {
-        m_error.code= 4335;
-        DBUG_RETURN(-1);
-      }
-      haveAutoIncrement = true;
-      autoIncrementValue = col->m_autoIncrementInitialValue;
+    if (col == NULL) {
+      m_error.code = 4272;
+      NdbMem_Free((void*)tmpTab);
+      DBUG_RETURN(-1);
     }
     if (col->m_distributionKey)
+    {
       distKeys++;
+    }
   }
+  if (distKeys == impl.m_noOfKeys)
+    distKeys= 0;
+  impl.m_noOfDistributionKeys= distKeys;
+
 
   // Check max length of frm data
   if (impl.m_frm.length() > MAX_FRM_DATA_SIZE){
     m_error.code= 1229;
+    NdbMem_Free((void*)tmpTab);
     DBUG_RETURN(-1);
   }
-  tmpTab.FrmLen = impl.m_frm.length();
-  memcpy(tmpTab.FrmData, impl.m_frm.get_data(), impl.m_frm.length());
+  /*
+    TODO RONM: This needs to change to dynamic arrays instead
+    Frm Data, FragmentData, TablespaceData, RangeListData, TsNameData
+  */
+  tmpTab->FrmLen = impl.m_frm.length();
+  memcpy(tmpTab->FrmData, impl.m_frm.get_data(), impl.m_frm.length());
+
+  tmpTab->FragmentDataLen = impl.m_fd.length();
+  memcpy(tmpTab->FragmentData, impl.m_fd.get_data(), impl.m_fd.length());
+
+  tmpTab->TablespaceDataLen = impl.m_ts.length();
+  memcpy(tmpTab->TablespaceData, impl.m_ts.get_data(), impl.m_ts.length());
+
+  tmpTab->RangeListDataLen = impl.m_range.length();
+  memcpy(tmpTab->RangeListData, impl.m_range.get_data(),
+         impl.m_range.length());
+
+  memcpy(ts_names, impl.m_ts_name.get_data(),
+         impl.m_ts_name.length());
+
+  tmpTab->FragmentCount= impl.m_fragmentCount;
+  tmpTab->TableLoggedFlag = impl.m_logging;
+  tmpTab->TableTemporaryFlag = impl.m_temporary;
+  tmpTab->RowGCIFlag = impl.m_row_gci;
+  tmpTab->RowChecksumFlag = impl.m_row_checksum;
+  tmpTab->TableKValue = impl.m_kvalue;
+  tmpTab->MinLoadFactor = impl.m_minLoadFactor;
+  tmpTab->MaxLoadFactor = impl.m_maxLoadFactor;
+  tmpTab->TableType = DictTabInfo::UserTable;
+  tmpTab->PrimaryTableId = impl.m_primaryTableId;
+  tmpTab->NoOfAttributes = sz;
+  tmpTab->MaxRowsHigh = (Uint32)(impl.m_max_rows >> 32);
+  tmpTab->MaxRowsLow = (Uint32)(impl.m_max_rows & 0xFFFFFFFF);
+  tmpTab->MinRowsHigh = (Uint32)(impl.m_min_rows >> 32);
+  tmpTab->MinRowsLow = (Uint32)(impl.m_min_rows & 0xFFFFFFFF);
+  tmpTab->DefaultNoPartFlag = impl.m_default_no_part_flag;
+  tmpTab->LinearHashFlag = impl.m_linear_flag;
+  tmpTab->SingleUserMode = impl.m_single_user_mode;
+  tmpTab->ForceVarPartFlag = impl.m_force_var_part;
 
-  tmpTab.TableLoggedFlag = impl.m_logging;
-  tmpTab.TableKValue = impl.m_kvalue;
-  tmpTab.MinLoadFactor = impl.m_minLoadFactor;
-  tmpTab.MaxLoadFactor = impl.m_maxLoadFactor;
-  tmpTab.TableType = DictTabInfo::UserTable;
-  tmpTab.NoOfAttributes = sz;
-  tmpTab.MaxRowsHigh = (Uint32)(impl.m_max_rows >> 32);
-  tmpTab.MaxRowsLow = (Uint32)(impl.m_max_rows & 0xFFFFFFFF);
-  tmpTab.MinRowsHigh = (Uint32)(impl.m_min_rows >> 32);
-  tmpTab.MinRowsLow = (Uint32)(impl.m_min_rows & 0xFFFFFFFF);
-
-  tmpTab.SingleUserMode = impl.m_single_user_mode;
-
-  tmpTab.FragmentType = getKernelConstant(impl.m_fragmentType,
-					  fragmentTypeMapping,
-					  DictTabInfo::AllNodesSmallTable);
-  tmpTab.TableVersion = rand();
+  if (impl.m_ts_name.length())
+  {
+    char **ts_name_ptr= (char**)ts_names;
+    i= 0;
+    do
+    {
+      NdbTablespaceImpl tmp;
+      if (*ts_name_ptr)
+      {
+        if(get_filegroup(tmp, NdbDictionary::Object::Tablespace, 
+                         (const char*)*ts_name_ptr) == 0)
+        {
+          tmpTab->TablespaceData[2*i] = tmp.m_id;
+          tmpTab->TablespaceData[2*i + 1] = tmp.m_version;
+        }
+        else
+        { 
+          NdbMem_Free((void*)tmpTab);
+          DBUG_RETURN(-1);
+        }
+      }
+      else
+      {
+        /*
+          No tablespace used, set tablespace id to NULL
+        */
+        tmpTab->TablespaceData[2*i] = RNIL;
+        tmpTab->TablespaceData[2*i + 1] = 0;
+      }
+      ts_name_ptr++;
+    } while (++i < tmpTab->FragmentCount);
+    tmpTab->TablespaceDataLen= 4*i;
+  }
 
+  tmpTab->FragmentType = getKernelConstant(impl.m_fragmentType,
+ 					   fragmentTypeMapping,
+					   DictTabInfo::AllNodesSmallTable);
+  tmpTab->TableVersion = rand();
+
+  const char *tablespace_name= impl.m_tablespace_name.c_str();
+loop:
+  if(impl.m_tablespace_id != ~(Uint32)0)
+  {
+    tmpTab->TablespaceId = impl.m_tablespace_id;
+    tmpTab->TablespaceVersion = impl.m_tablespace_version;
+  }
+  else if(strlen(tablespace_name))
+  {
+    NdbTablespaceImpl tmp;
+    if(get_filegroup(tmp, NdbDictionary::Object::Tablespace, 
+		     tablespace_name) == 0)
+    {
+      tmpTab->TablespaceId = tmp.m_id;
+      tmpTab->TablespaceVersion = tmp.m_version;
+    }
+    else 
+    {
+      // error set by get filegroup
+      if (m_error.code == 723)
+	m_error.code = 755;
+      
+      NdbMem_Free((void*)tmpTab);
+      DBUG_RETURN(-1);
+    }
+  } 
+  else
+  {
+    for(i = 0; i<sz; i++)
+    {
+      if(impl.m_columns[i]->m_storageType == NDB_STORAGETYPE_DISK)
+      {
+	tablespace_name = "DEFAULT-TS";
+	goto loop;
+      }
+    }
+  }
+  
+  UtilBufferWriter w(m_buffer);
   SimpleProperties::UnpackStatus s;
   s = SimpleProperties::pack(w, 
-			     &tmpTab,
+			     tmpTab,
 			     DictTabInfo::TableMapping, 
 			     DictTabInfo::TableMappingSize, true);
   
   if(s != SimpleProperties::Eof){
     abort();
   }
+  NdbMem_Free((void*)tmpTab);
   
+  DBUG_PRINT("info",("impl.m_noOfDistributionKeys: %d impl.m_noOfKeys: %d distKeys: %d",
+		     impl.m_noOfDistributionKeys, impl.m_noOfKeys, distKeys));
   if (distKeys == impl.m_noOfKeys)
     distKeys= 0;
   impl.m_noOfDistributionKeys= distKeys;
@@ -1650,10 +2798,12 @@ NdbDictInterface::createOrAlterTable(Ndb
     if(col == 0)
       continue;
     
+    DBUG_PRINT("info",("column: %s(%d) col->m_distributionKey: %d",
+		       col->m_name.c_str(), i, col->m_distributionKey));
     DictTabInfo::Attribute tmpAttr; tmpAttr.init();
     BaseString::snprintf(tmpAttr.AttributeName, sizeof(tmpAttr.AttributeName), 
 	     col->m_name.c_str());
-    tmpAttr.AttributeId = i;
+    tmpAttr.AttributeId = col->m_attrId;
     tmpAttr.AttributeKeyFlag = col->m_pk;
     tmpAttr.AttributeNullableFlag = col->m_nullable;
     tmpAttr.AttributeDKey = distKeys ? col->m_distributionKey : 0;
@@ -1662,7 +2812,19 @@ NdbDictInterface::createOrAlterTable(Ndb
     tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF);
     tmpAttr.AttributeExtScale = col->m_scale;
     tmpAttr.AttributeExtLength = col->m_length;
+    if(col->m_storageType == NDB_STORAGETYPE_DISK)
+      tmpAttr.AttributeArrayType = NDB_ARRAYTYPE_FIXED;
+    else
+      tmpAttr.AttributeArrayType = col->m_arrayType;
+
+    if(col->m_pk)
+      tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      
+    else
+      tmpAttr.AttributeStorageType = col->m_storageType;
 
+    if(col->getBlobType())
+      tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      
+    
     // check type and compute attribute size and array size
     if (! tmpAttr.translateExtType()) {
       m_error.code= 703;
@@ -1682,8 +2844,14 @@ NdbDictInterface::createOrAlterTable(Ndb
     }
     // distribution key not supported for Char attribute
     if (distKeys && col->m_distributionKey && col->m_cs != NULL) {
-      m_error.code= 745;
-      DBUG_RETURN(-1);
+      // we can allow this for non-var char where strxfrm does nothing
+      if (col->m_type == NdbDictionary::Column::Char &&
+          (col->m_cs->state & MY_CS_BINSORT))
+        ;
+      else {
+        m_error.code= 745;
+        DBUG_RETURN(-1);
+      }
     }
     // charset in upper half of precision
     if (col->getCharType()) {
@@ -1701,137 +2869,89 @@ NdbDictInterface::createOrAlterTable(Ndb
     w.add(DictTabInfo::AttributeEnd, 1);
   }
 
-  NdbApiSignal tSignal(m_reference);
-  tSignal.theReceiversBlockNumber = DBDICT;
+  int ret;
   
   LinearSectionPtr ptr[1];
   ptr[0].p = (Uint32*)m_buffer.get_data();
   ptr[0].sz = m_buffer.length() / 4;
-  int ret;
-  if (alter)
-  {
-    AlterTableReq * const req = 
-      CAST_PTR(AlterTableReq, tSignal.getDataPtrSend());
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  if (alter) {
+    tSignal.theVerId_signalNumber   = GSN_ALTER_TABLE_REQ;
+    tSignal.theLength = AlterTableReq::SignalLength;
+
+    AlterTableReq * req = CAST_PTR(AlterTableReq, tSignal.getDataPtrSend());
     
     req->senderRef = m_reference;
     req->senderData = 0;
     req->changeMask = impl.m_changeMask;
-    req->tableId = impl.m_tableId;
+    req->tableId = impl.m_id;
     req->tableVersion = impl.m_version;;
-    tSignal.theVerId_signalNumber   = GSN_ALTER_TABLE_REQ;
-    tSignal.theLength = AlterTableReq::SignalLength;
-    ret= alterTable(&tSignal, ptr);
-  }
-  else
-  {
-    CreateTableReq * const req = 
-      CAST_PTR(CreateTableReq, tSignal.getDataPtrSend());
+
+    int errCodes[] = { AlterTableRef::NotMaster, AlterTableRef::Busy, 0 };
+    ret = dictSignal(&tSignal, ptr, 1,
+		     0, // master
+		     WAIT_ALTER_TAB_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
     
-    req->senderRef = m_reference;
-    req->senderData = 0;
+    if(m_error.code == AlterTableRef::InvalidTableVersion) {
+      // Clear caches and try again
+      DBUG_RETURN(INCOMPATIBLE_VERSION);
+    }
+  } else {
     tSignal.theVerId_signalNumber   = GSN_CREATE_TABLE_REQ;
     tSignal.theLength = CreateTableReq::SignalLength;
-    ret= createTable(&tSignal, ptr);
-
-    if (ret)
-      DBUG_RETURN(ret);
 
-    if (haveAutoIncrement) {
-      if (ndb.setAutoIncrementValue(impl.m_externalName.c_str(),
-				    autoIncrementValue, false) == -1) {
-        DBUG_ASSERT(ndb.theError.code != 0);
-        m_error= ndb.theError;
-	ret = -1;
-      }
-    }
+    CreateTableReq * req = CAST_PTR(CreateTableReq, tSignal.getDataPtrSend());
+    req->senderRef = m_reference;
+    req->senderData = 0;
+    int errCodes[] = { CreateTableRef::Busy, CreateTableRef::NotMaster, 0 };
+    ret = dictSignal(&tSignal, ptr, 1,
+		     0, // master node
+		     WAIT_CREATE_INDX_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
   }
+  
   DBUG_RETURN(ret);
 }
 
-int
-NdbDictInterface::createTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-#if DEBUG_PRINT
-  ndbout_c("BufferLen = %d", ptr[0].sz);
-  SimplePropertiesLinearReader r(ptr[0].p, ptr[0].sz);
-  r.printAll(ndbout);
-#endif
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] = 
-     {CreateTableRef::Busy,
-      CreateTableRef::NotMaster};
-  return dictSignal(signal,ptr,1,
-		    1/*use masternode id*/,
-		    100,
-		    WAIT_CREATE_INDX_REQ,
-		    WAITFOR_RESPONSE_TIMEOUT,
-		    errCodes,noErrCodes);
-}
-
-
 void
 NdbDictInterface::execCREATE_TABLE_CONF(NdbApiSignal * signal,
 					LinearSectionPtr ptr[3])
 {
-#if 0
   const CreateTableConf* const conf=
     CAST_CONSTPTR(CreateTableConf, signal->getDataPtr());
-  Uint32 tableId= conf->tableId;
-  Uint32 tableVersion= conf->tableVersion;
-#endif
+  m_buffer.grow(4 * 2); // 2 words
+  Uint32* data = (Uint32*)m_buffer.get_data();
+  data[0] = conf->tableId;
+  data[1] = conf->tableVersion;
   m_waiter.signal(NO_WAIT);  
 }
 
 void
-NdbDictInterface::execCREATE_TABLE_REF(NdbApiSignal * signal,
+NdbDictInterface::execCREATE_TABLE_REF(NdbApiSignal * sig,
 				       LinearSectionPtr ptr[3])
 {
-  const CreateTableRef* const ref=
-    CAST_CONSTPTR(CreateTableRef, signal->getDataPtr());
+  const CreateTableRef* ref = CAST_CONSTPTR(CreateTableRef, sig->getDataPtr());
   m_error.code= ref->errorCode;
   m_masterNodeId = ref->masterNodeId;
   m_waiter.signal(NO_WAIT);  
 }
 
-int
-NdbDictInterface::alterTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-#if DEBUG_PRINT
-  ndbout_c("BufferLen = %d", ptr[0].sz);
-  SimplePropertiesLinearReader r(ptr[0].p, ptr[0].sz);
-  r.printAll(ndbout);
-#endif
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] =
-    {AlterTableRef::NotMaster,
-     AlterTableRef::Busy};
-  int r = dictSignal(signal,ptr,1,
-		     1/*use masternode id*/,
-		     100,WAIT_ALTER_TAB_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes, noErrCodes);
-  if(m_error.code == AlterTableRef::InvalidTableVersion) {
-    // Clear caches and try again
-    return INCOMPATIBLE_VERSION;
-  }
-
-  return r;
-}
-
 void
 NdbDictInterface::execALTER_TABLE_CONF(NdbApiSignal * signal,
                                        LinearSectionPtr ptr[3])
 {
-  //AlterTableConf* const conf = CAST_CONSTPTR(AlterTableConf, signal->getDataPtr());
   m_waiter.signal(NO_WAIT);
 }
 
 void
-NdbDictInterface::execALTER_TABLE_REF(NdbApiSignal * signal,
+NdbDictInterface::execALTER_TABLE_REF(NdbApiSignal * sig,
 				      LinearSectionPtr ptr[3])
 {
-  const AlterTableRef * const ref = 
-    CAST_CONSTPTR(AlterTableRef, signal->getDataPtr());
+  const AlterTableRef * ref = CAST_CONSTPTR(AlterTableRef, sig->getDataPtr());
   m_error.code= ref->errorCode;
   m_masterNodeId = ref->masterNodeId;
   m_waiter.signal(NO_WAIT);
@@ -1845,6 +2965,7 @@ NdbDictionaryImpl::dropTable(const char 
 {
   DBUG_ENTER("NdbDictionaryImpl::dropTable");
   DBUG_PRINT("enter",("name: %s", name));
+  ASSERT_NOT_MYSQLD;
   NdbTableImpl * tab = getTable(name);
   if(tab == 0){
     DBUG_RETURN(-1);
@@ -1854,16 +2975,14 @@ NdbDictionaryImpl::dropTable(const char 
   // we must clear the cache and try again
   if (ret == INCOMPATIBLE_VERSION) {
     const BaseString internalTableName(m_ndb.internalize_table_name(name));
-
     DBUG_PRINT("info",("INCOMPATIBLE_VERSION internal_name: %s", internalTableName.c_str()));
     m_localHash.drop(internalTableName.c_str());
     m_globalHash->lock();
-    tab->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(tab);
+    m_globalHash->release(tab, 1);
     m_globalHash->unlock();
     DBUG_RETURN(dropTable(name));
   }
-
+  
   DBUG_RETURN(ret);
 }
 
@@ -1876,13 +2995,14 @@ NdbDictionaryImpl::dropTable(NdbTableImp
     return dropTable(name);
   }
 
-  if (impl.m_indexType != NdbDictionary::Index::Undefined) {
+  if (impl.m_indexType != NdbDictionary::Object::TypeUndefined)
+  {
     m_receiver.m_error.code= 1228;
     return -1;
   }
 
   List list;
-  if ((res = listIndexes(list, impl.m_tableId)) == -1){
+  if ((res = listIndexes(list, impl.m_id)) == -1){
     return -1;
   }
   for (unsigned i = 0; i < list.count; i++) {
@@ -1900,14 +3020,13 @@ NdbDictionaryImpl::dropTable(NdbTableImp
   }
   
   int ret = m_receiver.dropTable(impl);  
-  if(ret == 0 || m_error.code == 709){
+  if(ret == 0 || m_error.code == 709 || m_error.code == 723){
     const char * internalTableName = impl.m_internalName.c_str();
 
     
     m_localHash.drop(internalTableName);
     m_globalHash->lock();
-    impl.m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(&impl);
+    m_globalHash->release(&impl, 1);
     m_globalHash->unlock();
 
     return 0;
@@ -1917,6 +3036,49 @@ NdbDictionaryImpl::dropTable(NdbTableImp
 }
 
 int
+NdbDictionaryImpl::dropTableGlobal(NdbTableImpl & impl)
+{
+  int res;
+  DBUG_ENTER("NdbDictionaryImpl::dropTableGlobal");
+  DBUG_ASSERT(impl.m_status != NdbDictionary::Object::New);
+  DBUG_ASSERT(impl.m_indexType == NdbDictionary::Object::TypeUndefined);
+
+  List list;
+  if ((res = listIndexes(list, impl.m_id)) == -1){
+    ERR_RETURN(getNdbError(), -1);
+  }
+  for (unsigned i = 0; i < list.count; i++) {
+    const List::Element& element = list.elements[i];
+    NdbIndexImpl *idx= getIndexGlobal(element.name, impl);
+    if (idx == NULL)
+    {
+      ERR_RETURN(getNdbError(), -1);
+    }
+    if ((res = dropIndexGlobal(*idx)) == -1)
+    {
+      releaseIndexGlobal(*idx, 1);
+      ERR_RETURN(getNdbError(), -1);
+    }
+    releaseIndexGlobal(*idx, 1);
+  }
+  
+  if (impl.m_noOfBlobs != 0) {
+    if (dropBlobTables(impl) != 0){
+      ERR_RETURN(getNdbError(), -1);
+    }
+  }
+  
+  int ret = m_receiver.dropTable(impl);  
+  impl.m_status = NdbDictionary::Object::Invalid;
+  if(ret == 0 || m_error.code == 709 || m_error.code == 723)
+  {
+    DBUG_RETURN(0);
+  }
+  
+  ERR_RETURN(getNdbError(), ret);
+}
+
+int
 NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
 {
   DBUG_ENTER("NdbDictionaryImpl::dropBlobTables");
@@ -1924,15 +3086,21 @@ NdbDictionaryImpl::dropBlobTables(NdbTab
     NdbColumnImpl & c = *t.m_columns[i];
     if (! c.getBlobType() || c.getPartSize() == 0)
       continue;
-    char btname[NdbBlobImpl::BlobTableNameSize];
-    NdbBlob::getBlobTableName(btname, &t, &c);
-    if (dropTable(btname) != 0) {
-      if (m_error.code != 709){
-	DBUG_PRINT("exit",("error %u - exiting",m_error.code));
-        DBUG_RETURN(-1);
-      }
-      DBUG_PRINT("info",("error %u - continuing",m_error.code));
+    NdbTableImpl* bt = c.m_blobTable;
+    if (bt == NULL) {
+      DBUG_PRINT("info", ("col %s: blob table pointer is NULL",
+                          c.m_name.c_str()));
+      continue; // "force" mode on
+    }
+    // drop directly - by-pass cache
+    int ret = m_receiver.dropTable(*c.m_blobTable);
+    if (ret != 0) {
+      DBUG_PRINT("info", ("col %s: blob table %s: error %d",
+                 c.m_name.c_str(), bt->m_internalName.c_str(), m_error.code));
+      if (! (ret == 709 || ret == 723)) // "force" mode on
+        ERR_RETURN(getNdbError(), -1);
     }
+    // leave c.m_blobTable defined
   }
   DBUG_RETURN(0);
 }
@@ -1945,28 +3113,21 @@ NdbDictInterface::dropTable(const NdbTab
   tSignal.theVerId_signalNumber   = GSN_DROP_TABLE_REQ;
   tSignal.theLength = DropTableReq::SignalLength;
   
-  DropTableReq * const req = CAST_PTR(DropTableReq, tSignal.getDataPtrSend());
+  DropTableReq * req = CAST_PTR(DropTableReq, tSignal.getDataPtrSend());
   req->senderRef = m_reference;
   req->senderData = 0;
-  req->tableId = impl.m_tableId;
+  req->tableId = impl.m_id;
   req->tableVersion = impl.m_version;
 
-  return dropTable(&tSignal, 0);
-}
-
-int
-NdbDictInterface::dropTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-  const int noErrCodes = 3;
-  int errCodes[noErrCodes] =
-         {DropTableRef::NoDropTableRecordAvailable,
-          DropTableRef::NotMaster,
-          DropTableRef::Busy};
-  int r = dictSignal(signal,NULL,0,
-		     1/*use masternode id*/,
-		     100,WAIT_DROP_TAB_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes, noErrCodes);
+  int errCodes[] =
+    { DropTableRef::NoDropTableRecordAvailable,
+      DropTableRef::NotMaster,
+      DropTableRef::Busy, 0 };
+  int r = dictSignal(&tSignal, 0, 0,
+		     0, // master
+		     WAIT_DROP_TAB_REQ, 
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
   if(m_error.code == DropTableRef::InvalidTableVersion) {
     // Clear caches and try again
     return INCOMPATIBLE_VERSION;
@@ -1990,7 +3151,7 @@ NdbDictInterface::execDROP_TABLE_REF(Ndb
 				      LinearSectionPtr ptr[3])
 {
   DBUG_ENTER("NdbDictInterface::execDROP_TABLE_REF");
-  const DropTableRef* const ref = CAST_CONSTPTR(DropTableRef, signal->getDataPtr());
+  const DropTableRef* ref = CAST_CONSTPTR(DropTableRef, signal->getDataPtr());
   m_error.code= ref->errorCode;
   m_masterNodeId = ref->masterNodeId;
   m_waiter.signal(NO_WAIT);  
@@ -2003,10 +3164,10 @@ NdbDictionaryImpl::invalidateObject(NdbT
   const char * internalTableName = impl.m_internalName.c_str();
   DBUG_ENTER("NdbDictionaryImpl::invalidateObject");
   DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
+
   m_localHash.drop(internalTableName);
   m_globalHash->lock();
-  impl.m_status = NdbDictionary::Object::Invalid;
-  m_globalHash->drop(&impl);
+  m_globalHash->release(&impl, 1);
   m_globalHash->unlock();
   DBUG_RETURN(0);
 }
@@ -2015,67 +3176,22 @@ int
 NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
 {
   const char * internalTableName = impl.m_internalName.c_str();
+  DBUG_ENTER("NdbDictionaryImpl::removeCachedObject");
+  DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
 
   m_localHash.drop(internalTableName);  
   m_globalHash->lock();
   m_globalHash->release(&impl);
   m_globalHash->unlock();
-  return 0;
-}
-
-/*****************************************************************
- * Get index info
- */
-NdbIndexImpl*
-NdbDictionaryImpl::getIndexImpl(const char * externalName,
-				const BaseString& internalName)
-{
-  Ndb_local_table_info * info = get_local_table_info(internalName,
-						     false);
-  if(info == 0){
-    m_error.code = 4243;
-    return 0;
-  }
-  NdbTableImpl * tab = info->m_table_impl;
-
-  if(tab->m_indexType == NdbDictionary::Index::Undefined){
-    // Not an index
-    m_error.code = 4243;
-    return 0;
-  }
-
-  NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
-  if(prim == 0){
-    m_error.code = 4243;
-    return 0;
-  }
-
-  /**
-   * Create index impl
-   */
-  NdbIndexImpl* idx;
-  if(NdbDictInterface::create_index_obj_from_table(&idx, tab, prim) == 0){
-    idx->m_table = tab;
-    if (!idx->m_externalName.assign(externalName) ||
-        !idx->m_internalName.assign(internalName))
-    {
-      delete idx;
-      m_error.code = 4000;
-      return 0;
-    }
-    // TODO Assign idx to tab->m_index
-    // Don't do it right now since assign can't asign a table with index
-    // tab->m_index = idx;
-    return idx;
-  }
-  m_error.code = 4000;
-  return 0;
+  DBUG_RETURN(0);
 }
 
 int
 NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
 					      NdbTableImpl* tab,
-					      const NdbTableImpl* prim){
+					      const NdbTableImpl* prim)
+{
+  DBUG_ENTER("NdbDictInterface::create_index_obj_from_table");
   NdbIndexImpl *idx = new NdbIndexImpl();
   if (idx == NULL)
   {
@@ -2084,7 +3200,7 @@ NdbDictInterface::create_index_obj_from_
   }
   idx->m_version = tab->m_version;
   idx->m_status = tab->m_status;
-  idx->m_indexId = tab->m_tableId;
+  idx->m_id = tab->m_id;
   if (!idx->m_externalName.assign(tab->getName()) ||
       !idx->m_tableName.assign(prim->m_externalName))
   {
@@ -2092,8 +3208,9 @@ NdbDictInterface::create_index_obj_from_
     errno = ENOMEM;
     return -1;
   }
-  NdbDictionary::Index::Type type = idx->m_type = tab->m_indexType;
+  NdbDictionary::Object::Type type = idx->m_type = tab->m_indexType;
   idx->m_logging = tab->m_logging;
+  idx->m_temporary = tab->m_temporary;
   // skip last attribute (NDB$PK or NDB$TNODE)
 
   const Uint32 distKeys = prim->m_noOfDistributionKeys;
@@ -2129,7 +3246,7 @@ NdbDictInterface::create_index_obj_from_
     idx->m_key_ids[key_id] = i;
     col->m_keyInfoPos = key_id;
 
-    if(type == NdbDictionary::Index::OrderedIndex && 
+    if(type == NdbDictionary::Object::OrderedIndex && 
        (primCol->m_distributionKey ||
 	(distKeys == 0 && primCol->getPrimaryKey())))
     {
@@ -2148,8 +3265,12 @@ NdbDictInterface::create_index_obj_from_
       tab->m_columns[i]->m_distributionKey = 0;
   }
 
+  idx->m_table_id = prim->getObjectId();
+  idx->m_table_version = prim->getObjectVersion();
+  
   * dst = idx;
-  return 0;
+  DBUG_PRINT("exit", ("m_id: %d  m_version: %d", idx->m_id, idx->m_version));
+  DBUG_RETURN(0);
 }
 
 /*****************************************************************
@@ -2158,6 +3279,7 @@ NdbDictInterface::create_index_obj_from_
 int
 NdbDictionaryImpl::createIndex(NdbIndexImpl &ix)
 {
+  ASSERT_NOT_MYSQLD;
   NdbTableImpl* tab = getTable(ix.getTable());
   if(tab == 0){
     m_error.code = 4249;
@@ -2167,9 +3289,15 @@ NdbDictionaryImpl::createIndex(NdbIndexI
   return m_receiver.createIndex(m_ndb, ix, * tab);
 }
 
+int
+NdbDictionaryImpl::createIndex(NdbIndexImpl &ix, NdbTableImpl &tab)
+{
+  return m_receiver.createIndex(m_ndb, ix, tab);
+}
+
 int 
 NdbDictInterface::createIndex(Ndb & ndb,
-			      NdbIndexImpl & impl, 
+			      const NdbIndexImpl & impl, 
 			      const NdbTableImpl & table)
 {
   //validate();
@@ -2183,14 +3311,9 @@ NdbDictInterface::createIndex(Ndb & ndb,
   }
   const BaseString internalName(
     ndb.internalize_index_name(&table, impl.getName()));
-  if (!impl.m_internalName.assign(internalName))
-  {
-    m_error.code = 4000;
-    return -1;
-  }
-
   w.add(DictTabInfo::TableName, internalName.c_str());
   w.add(DictTabInfo::TableLoggedFlag, impl.m_logging);
+  w.add(DictTabInfo::TableTemporaryFlag, impl.m_temporary);
 
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
@@ -2213,7 +3336,7 @@ NdbDictInterface::createIndex(Ndb & ndb,
   }
   req->setIndexType((DictTabInfo::TableType) it);
   
-  req->setTableId(table.m_tableId);
+  req->setTableId(table.m_id);
   req->setOnline(true);
   AttributeList attributeList;
   attributeList.sz = impl.m_columns.size();
@@ -2224,7 +3347,7 @@ NdbDictInterface::createIndex(Ndb & ndb,
       m_error.code = 4247;
       return -1;
     }
-    // Copy column definition
+    // Copy column definition  XXX must be wrong, overwrites
     *impl.m_columns[i] = *col;
 
     // index key type check
@@ -2237,44 +3360,35 @@ NdbDictInterface::createIndex(Ndb & ndb,
       m_error.code = err;
       return -1;
     }
-    attributeList.id[i] = col->m_attrId;
+    // API uses external column number to talk to DICT
+    attributeList.id[i] = col->m_column_no;
   }
   LinearSectionPtr ptr[2];
   ptr[0].p = (Uint32*)&attributeList;
   ptr[0].sz = 1 + attributeList.sz;
   ptr[1].p = (Uint32*)m_buffer.get_data();
   ptr[1].sz = m_buffer.length() >> 2;                //BUG?
-  return createIndex(&tSignal, ptr);
-}
 
-int
-NdbDictInterface::createIndex(NdbApiSignal* signal, 
-			      LinearSectionPtr ptr[3])
-{
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] = {CreateIndxRef::Busy, CreateIndxRef::NotMaster};
-  return dictSignal(signal,ptr,2,
-		    1 /*use masternode id*/,
-		    100,
+  int errCodes[] = { CreateIndxRef::Busy, CreateIndxRef::NotMaster, 0 };
+  return dictSignal(&tSignal, ptr, 2,
+		    0, // master
 		    WAIT_CREATE_INDX_REQ,
-		    -1,
-		    errCodes,noErrCodes);
+		    DICT_WAITFOR_TIMEOUT, 100,
+		    errCodes);
 }
 
 void
 NdbDictInterface::execCREATE_INDX_CONF(NdbApiSignal * signal,
 				       LinearSectionPtr ptr[3])
 {
-  //CreateTableConf* const conf = CAST_CONSTPTR(CreateTableConf, signal->getDataPtr());
-  
   m_waiter.signal(NO_WAIT);  
 }
 
 void
-NdbDictInterface::execCREATE_INDX_REF(NdbApiSignal * signal,
+NdbDictInterface::execCREATE_INDX_REF(NdbApiSignal * sig,
 				      LinearSectionPtr ptr[3])
 {
-  const CreateIndxRef* const ref = CAST_CONSTPTR(CreateIndxRef, signal->getDataPtr());
+  const CreateIndxRef* ref = CAST_CONSTPTR(CreateIndxRef, sig->getDataPtr());
   m_error.code = ref->getErrorCode();
   if(m_error.code == ref->NotMaster)
     m_masterNodeId= ref->masterNodeId;
@@ -2288,12 +3402,13 @@ int
 NdbDictionaryImpl::dropIndex(const char * indexName, 
 			     const char * tableName)
 {
+  ASSERT_NOT_MYSQLD;
   NdbIndexImpl * idx = getIndex(indexName, tableName);
   if (idx == 0) {
     m_error.code = 4243;
     return -1;
   }
-  int ret = dropIndex(*idx);
+  int ret = dropIndex(*idx, tableName);
   // If index stored in cache is incompatible with the one in the kernel
   // we must clear the cache and try again
   if (ret == INCOMPATIBLE_VERSION) {
@@ -2305,8 +3420,7 @@ NdbDictionaryImpl::dropIndex(const char 
 
     m_localHash.drop(internalIndexName.c_str());
     m_globalHash->lock();
-    idx->m_table->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(idx->m_table);
+    m_globalHash->release(idx->m_table, 1);
     m_globalHash->unlock();
     return dropIndex(indexName, tableName);
   }
@@ -2315,29 +3429,62 @@ NdbDictionaryImpl::dropIndex(const char 
 }
 
 int
-NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl)
+NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl, const char * tableName)
 {
+  const char * indexName = impl.getName();
+  if (tableName || m_ndb.usingFullyQualifiedNames()) {
     NdbTableImpl * timpl = impl.m_table;
     
     if (timpl == 0) {
       m_error.code = 709;
       return -1;
     }
-    int ret = m_receiver.dropIndex(impl, *timpl);
-    if(ret == 0){
-      m_localHash.drop(timpl->m_internalName.c_str());
+
+    const BaseString internalIndexName((tableName)
+      ?
+      m_ndb.internalize_index_name(getTable(tableName), indexName)
+      :
+      m_ndb.internalize_table_name(indexName)); // Index is also a table
+
+    if(impl.m_status == NdbDictionary::Object::New){
+      return dropIndex(indexName, tableName);
+    }
+
+    int ret= dropIndexGlobal(impl);
+    if (ret == 0)
+    {
       m_globalHash->lock();
-      timpl->m_status = NdbDictionary::Object::Invalid;
-      m_globalHash->drop(timpl);
+      m_globalHash->release(impl.m_table, 1);
       m_globalHash->unlock();
+      m_localHash.drop(internalIndexName.c_str());
     }
     return ret;
+  }
+
+  m_error.code = 4243;
+  return -1;
+}
+
+int
+NdbDictionaryImpl::dropIndexGlobal(NdbIndexImpl & impl)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropIndexGlobal");
+  int ret = m_receiver.dropIndex(impl, *impl.m_table);
+  impl.m_status = NdbDictionary::Object::Invalid;
+  if(ret == 0)
+  {
+    DBUG_RETURN(0);
+  }
+  ERR_RETURN(getNdbError(), ret);
 }
 
 int
 NdbDictInterface::dropIndex(const NdbIndexImpl & impl, 
 			    const NdbTableImpl & timpl)
 {
+  DBUG_ENTER("NdbDictInterface::dropIndex");
+  DBUG_PRINT("enter", ("indexId: %d  indexVersion: %d",
+                       timpl.m_id, timpl.m_version));
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
   tSignal.theVerId_signalNumber   = GSN_DROP_INDX_REQ;
@@ -2348,28 +3495,20 @@ NdbDictInterface::dropIndex(const NdbInd
   req->setConnectionPtr(0);
   req->setRequestType(DropIndxReq::RT_USER);
   req->setTableId(~0);  // DICT overwrites
-  req->setIndexId(timpl.m_tableId);
+  req->setIndexId(timpl.m_id);
   req->setIndexVersion(timpl.m_version);
 
-  return dropIndex(&tSignal, 0);
-}
-
-int
-NdbDictInterface::dropIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] = {DropIndxRef::Busy, DropIndxRef::NotMaster};
-  int r = dictSignal(signal,NULL,0,
-		     1/*Use masternode id*/,
-		     100,
+  int errCodes[] = { DropIndxRef::Busy, DropIndxRef::NotMaster, 0 };
+  int r = dictSignal(&tSignal, 0, 0,
+		     0, // master
 		     WAIT_DROP_INDX_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes,noErrCodes);
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
   if(m_error.code == DropIndxRef::InvalidIndexVersion) {
     // Clear caches and try again
-    return INCOMPATIBLE_VERSION;
+    ERR_RETURN(m_error, INCOMPATIBLE_VERSION);
   }
-  return r;
+  ERR_RETURN(m_error, r);
 }
 
 void
@@ -2383,7 +3522,7 @@ void
 NdbDictInterface::execDROP_INDX_REF(NdbApiSignal * signal,
 				      LinearSectionPtr ptr[3])
 {
-  const DropIndxRef* const ref = CAST_CONSTPTR(DropIndxRef, signal->getDataPtr());
+  const DropIndxRef* ref = CAST_CONSTPTR(DropIndxRef, signal->getDataPtr());
   m_error.code = ref->getErrorCode();
   if(m_error.code == ref->NotMaster)
     m_masterNodeId= ref->masterNodeId;
@@ -2391,6 +3530,696 @@ NdbDictInterface::execDROP_INDX_REF(NdbA
 }
 
 /*****************************************************************
+ * Create event
+ */
+
+int
+NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::createEvent");
+  int i;
+  NdbTableImpl* tab= evnt.m_tableImpl;
+  if (tab == 0)
+  {
+    tab= getTable(evnt.getTableName());
+    if(tab == 0){
+      DBUG_PRINT("info",("NdbDictionaryImpl::createEvent: table not found: %s",
+			 evnt.getTableName()));
+      ERR_RETURN(getNdbError(), -1);
+    }
+    evnt.setTable(tab);
+  }
+
+  DBUG_PRINT("info",("Table: id: %d version: %d", tab->m_id, tab->m_version));
+
+  NdbTableImpl &table = *evnt.m_tableImpl;
+
+  int attributeList_sz = evnt.m_attrIds.size();
+
+  for (i = 0; i < attributeList_sz; i++) {
+    NdbColumnImpl *col_impl = table.getColumn(evnt.m_attrIds[i]);
+    if (col_impl) {
+      evnt.m_facade->addColumn(*(col_impl->m_facade));
+    } else {
+      ndbout_c("Attr id %u in table %s not found", evnt.m_attrIds[i],
+	       evnt.getTableName());
+      m_error.code= 4713;
+      ERR_RETURN(getNdbError(), -1);
+    }
+  }
+
+  evnt.m_attrIds.clear();
+
+  attributeList_sz = evnt.m_columns.size();
+
+  DBUG_PRINT("info",("Event on tableId=%d, tableVersion=%d, event name %s, no of columns %d",
+		     table.m_id, table.m_version,
+		     evnt.m_name.c_str(),
+		     evnt.m_columns.size()));
+
+  int pk_count = 0;
+  evnt.m_attrListBitmask.clear();
+
+  for(i = 0; i<attributeList_sz; i++){
+    const NdbColumnImpl* col = 
+      table.getColumn(evnt.m_columns[i]->m_name.c_str());
+    if(col == 0){
+      m_error.code= 4247;
+      ERR_RETURN(getNdbError(), -1);
+    }
+    // Copy column definition
+    *evnt.m_columns[i] = *col;
+    
+    if(col->m_pk){
+      pk_count++;
+    }
+    
+    evnt.m_attrListBitmask.set(col->m_attrId);
+  }
+  
+  // Sort index attributes according to primary table (using insertion sort)
+  for(i = 1; i < attributeList_sz; i++) {
+    NdbColumnImpl* temp = evnt.m_columns[i];
+    unsigned int j = i;
+    while((j > 0) && (evnt.m_columns[j - 1]->m_attrId > temp->m_attrId)) {
+      evnt.m_columns[j] = evnt.m_columns[j - 1];
+      j--;
+    }
+    evnt.m_columns[j] = temp;
+  }
+  // Check for illegal duplicate attributes
+  for(i = 1; i<attributeList_sz; i++) {
+    if (evnt.m_columns[i-1]->m_attrId == evnt.m_columns[i]->m_attrId) {
+      m_error.code= 4258;
+      ERR_RETURN(getNdbError(), -1);
+    }
+  }
+  
+#ifdef EVENT_DEBUG
+  char buf[128] = {0};
+  evnt.m_attrListBitmask.getText(buf);
+  ndbout_c("createEvent: mask = %s", buf);
+#endif
+
+  // NdbDictInterface m_receiver;
+  if (m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */) != 0)
+    ERR_RETURN(getNdbError(), -1);
+
+  // Create blob events
+  if (evnt.m_mergeEvents && createBlobEvents(evnt) != 0) {
+    int save_code = m_error.code;
+    (void)dropEvent(evnt.m_name.c_str());
+    m_error.code = save_code;
+    ERR_RETURN(getNdbError(), -1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictionaryImpl::createBlobEvents(NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::createBlobEvents");
+  NdbTableImpl& t = *evnt.m_tableImpl;
+  Uint32 n = t.m_noOfBlobs;
+  Uint32 i;
+  for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+    NdbColumnImpl & c = *evnt.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    n--;
+    NdbEventImpl blob_evnt;
+    NdbBlob::getBlobEvent(blob_evnt, &evnt, &c);
+    if (createEvent(blob_evnt) != 0)
+      ERR_RETURN(getNdbError(), -1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictInterface::createEvent(class Ndb & ndb,
+			      NdbEventImpl & evnt,
+			      int getFlag)
+{
+  DBUG_ENTER("NdbDictInterface::createEvent");
+  DBUG_PRINT("enter",("getFlag=%d",getFlag));
+
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_CREATE_EVNT_REQ;
+  if (getFlag)
+    tSignal.theLength = CreateEvntReq::SignalLengthGet;
+  else
+    tSignal.theLength = CreateEvntReq::SignalLengthCreate;
+
+  CreateEvntReq * const req = CAST_PTR(CreateEvntReq, tSignal.getDataPtrSend());
+  
+  req->setUserRef(m_reference);
+  req->setUserData(0);
+
+  if (getFlag) {
+    // getting event from Dictionary
+    req->setRequestType(CreateEvntReq::RT_USER_GET);
+  } else {
+    DBUG_PRINT("info",("tableId: %u tableVersion: %u",
+		       evnt.m_tableImpl->m_id, 
+                       evnt.m_tableImpl->m_version));
+    // creating event in Dictionary
+    req->setRequestType(CreateEvntReq::RT_USER_CREATE);
+    req->setTableId(evnt.m_tableImpl->m_id);
+    req->setTableVersion(evnt.m_tableImpl->m_version);
+    req->setAttrListBitmask(evnt.m_attrListBitmask);
+    req->setEventType(evnt.mi_type);
+    req->clearFlags();
+    if (evnt.m_rep & NdbDictionary::Event::ER_ALL)
+      req->setReportAll();
+    if (evnt.m_rep & NdbDictionary::Event::ER_SUBSCRIBE)
+      req->setReportSubscribe();
+  }
+
+  UtilBufferWriter w(m_buffer);
+
+  const size_t len = strlen(evnt.m_name.c_str()) + 1;
+  if(len > MAX_TAB_NAME_SIZE) {
+    m_error.code= 4241;
+    ERR_RETURN(getNdbError(), -1);
+  }
+
+  w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
+
+  if (getFlag == 0)
+  {
+    const BaseString internal_tabname(
+      ndb.internalize_table_name(evnt.m_tableName.c_str()));
+    w.add(SimpleProperties::StringValue,
+	 internal_tabname.c_str());
+  }
+
+  LinearSectionPtr ptr[1];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = (m_buffer.length()+3) >> 2;
+
+  int ret = dictSignal(&tSignal,ptr, 1,
+		       0, // master
+		       WAIT_CREATE_INDX_REQ,
+		       DICT_WAITFOR_TIMEOUT, 100,
+		       0, -1);
+
+  if (ret) {
+    ERR_RETURN(getNdbError(), ret);
+  }
+  
+  char *dataPtr = (char *)m_buffer.get_data();
+  unsigned int lenCreateEvntConf = *((unsigned int *)dataPtr);
+  dataPtr += sizeof(lenCreateEvntConf);
+  CreateEvntConf const * evntConf = (CreateEvntConf *)dataPtr;
+  dataPtr += lenCreateEvntConf;
+  
+  //  NdbEventImpl *evntImpl = (NdbEventImpl *)evntConf->getUserData();
+
+  evnt.m_eventId = evntConf->getEventId();
+  evnt.m_eventKey = evntConf->getEventKey();
+  evnt.m_table_id = evntConf->getTableId();
+  evnt.m_table_version = evntConf->getTableVersion();
+
+  if (getFlag) {
+    evnt.m_attrListBitmask = evntConf->getAttrListBitmask();
+    evnt.mi_type           = evntConf->getEventType();
+    evnt.setTable(dataPtr);
+  } else {
+    if ((Uint32) evnt.m_tableImpl->m_id         != evntConf->getTableId() ||
+	evnt.m_tableImpl->m_version    != evntConf->getTableVersion() ||
+	//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
+	evnt.mi_type           != evntConf->getEventType()) {
+      ndbout_c("ERROR*************");
+      ERR_RETURN(getNdbError(), 1);
+    }
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictionaryImpl::executeSubscribeEvent(NdbEventOperationImpl & ev_op)
+{
+  // NdbDictInterface m_receiver;
+  return m_receiver.executeSubscribeEvent(m_ndb, ev_op);
+}
+
+int
+NdbDictInterface::executeSubscribeEvent(class Ndb & ndb,
+					NdbEventOperationImpl & ev_op)
+{
+  DBUG_ENTER("NdbDictInterface::executeSubscribeEvent");
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_SUB_START_REQ;
+  tSignal.theLength = SubStartReq::SignalLength2;
+  
+  SubStartReq * req = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
+
+  req->subscriptionId   = ev_op.m_eventImpl->m_eventId;
+  req->subscriptionKey  = ev_op.m_eventImpl->m_eventKey;
+  req->part             = SubscriptionData::TableData;
+  req->subscriberData   = ev_op.m_oid;
+  req->subscriberRef    = m_reference;
+
+  DBUG_PRINT("info",("GSN_SUB_START_REQ subscriptionId=%d,subscriptionKey=%d,"
+		     "subscriberData=%d",req->subscriptionId,
+		     req->subscriptionKey,req->subscriberData));
+
+  DBUG_RETURN(dictSignal(&tSignal,NULL,0,
+			 0 /*use masternode id*/,
+			 WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
+			 -1, 100,
+			 0, -1));
+}
+
+int
+NdbDictionaryImpl::stopSubscribeEvent(NdbEventOperationImpl & ev_op)
+{
+  // NdbDictInterface m_receiver;
+  return m_receiver.stopSubscribeEvent(m_ndb, ev_op);
+}
+
+int
+NdbDictInterface::stopSubscribeEvent(class Ndb & ndb,
+				     NdbEventOperationImpl & ev_op)
+{
+  DBUG_ENTER("NdbDictInterface::stopSubscribeEvent");
+
+  NdbApiSignal tSignal(m_reference);
+  //  tSignal.theReceiversBlockNumber = SUMA;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_SUB_STOP_REQ;
+  tSignal.theLength = SubStopReq::SignalLength;
+  
+  SubStopReq * req = CAST_PTR(SubStopReq, tSignal.getDataPtrSend());
+
+  req->subscriptionId  = ev_op.m_eventImpl->m_eventId;
+  req->subscriptionKey = ev_op.m_eventImpl->m_eventKey;
+  req->subscriberData  = ev_op.m_oid;
+  req->part            = (Uint32) SubscriptionData::TableData;
+  req->subscriberRef   = m_reference;
+
+  DBUG_PRINT("info",("GSN_SUB_STOP_REQ subscriptionId=%d,subscriptionKey=%d,"
+		     "subscriberData=%d",req->subscriptionId,
+		     req->subscriptionKey,req->subscriberData));
+
+  DBUG_RETURN(dictSignal(&tSignal,NULL,0,
+			 0 /*use masternode id*/,
+			 WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/,
+			 -1, 100,
+			 0, -1));
+}
+
+NdbEventImpl * 
+NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
+{
+  DBUG_ENTER("NdbDictionaryImpl::getEvent");
+  DBUG_PRINT("enter",("eventName= %s", eventName));
+
+  NdbEventImpl *ev =  new NdbEventImpl();
+  if (ev == NULL) {
+    DBUG_RETURN(NULL);
+  }
+
+  ev->setName(eventName);
+
+  int ret = m_receiver.createEvent(m_ndb, *ev, 1 /* getFlag set */);
+
+  if (ret) {
+    delete ev;
+    DBUG_RETURN(NULL);
+  }
+
+  // We only have the table name with internal name
+  DBUG_PRINT("info",("table %s", ev->getTableName()));
+  if (tab == NULL)
+  {
+    tab= fetchGlobalTableImplRef(InitTable(this, ev->getTableName()));
+    if (tab == 0)
+    {
+      DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
+      delete ev;
+      DBUG_RETURN(NULL);
+    }
+    if ((tab->m_status != NdbDictionary::Object::Retrieved) ||
+        ((Uint32) tab->m_id != ev->m_table_id) ||
+        (table_version_major(tab->m_version) !=
+         table_version_major(ev->m_table_version)))
+    {
+      DBUG_PRINT("info", ("mismatch on verison in cache"));
+      releaseTableGlobal(*tab, 1);
+      tab= fetchGlobalTableImplRef(InitTable(this, ev->getTableName()));
+      if (tab == 0)
+      {
+        DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
+        delete ev;
+        DBUG_RETURN(NULL);
+      }
+    }
+    ev->setTable(tab);
+    releaseTableGlobal(*tab, 0);
+  }
+  else
+    ev->setTable(tab);
+  tab = 0;
+
+  ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));  
+  // get the columns from the attrListBitmask
+  NdbTableImpl &table = *ev->m_tableImpl;
+  AttributeMask & mask = ev->m_attrListBitmask;
+  unsigned attributeList_sz = mask.count();
+
+  DBUG_PRINT("info",("Table: id: %d version: %d", 
+                     table.m_id, table.m_version));
+
+  if ((Uint32) table.m_id != ev->m_table_id ||
+      table_version_major(table.m_version) !=
+      table_version_major(ev->m_table_version))
+  {
+    m_error.code = 241;
+    delete ev;
+    DBUG_RETURN(NULL);
+  }
+#ifndef DBUG_OFF
+  char buf[128] = {0};
+  mask.getText(buf);
+  DBUG_PRINT("info",("attributeList_sz= %d, mask= %s", 
+                     attributeList_sz, buf));
+#endif
+
+  
+  if ( attributeList_sz > (uint) table.getNoOfColumns() )
+  {
+    m_error.code = 241;
+    DBUG_PRINT("error",("Invalid version, too many columns"));
+    delete ev;
+    DBUG_RETURN(NULL);
+  }
+
+  assert( (int)attributeList_sz <= table.getNoOfColumns() );
+  for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
+    if ( id >= (uint) table.getNoOfColumns())
+    {
+      m_error.code = 241;
+      DBUG_PRINT("error",("Invalid version, column %d out of range", id));
+      delete ev;
+      DBUG_RETURN(NULL);
+    }
+    if (!mask.get(id))
+      continue;
+
+    const NdbColumnImpl* col = table.getColumn(id);
+    DBUG_PRINT("info",("column %d %s", id, col->getName()));
+    NdbColumnImpl* new_col = new NdbColumnImpl;
+    // Copy column definition
+    *new_col = *col;
+    ev->m_columns.push_back(new_col);
+  }
+  DBUG_RETURN(ev);
+}
+
+// ev is main event and has been retrieved previously
+NdbEventImpl *
+NdbDictionaryImpl::getBlobEvent(const NdbEventImpl& ev, uint col_no)
+{
+  DBUG_ENTER("NdbDictionaryImpl::getBlobEvent");
+  DBUG_PRINT("enter", ("ev=%s col=%u", ev.m_name.c_str(), col_no));
+
+  NdbTableImpl* tab = ev.m_tableImpl;
+  assert(tab != NULL && col_no < tab->m_columns.size());
+  NdbColumnImpl* col = tab->m_columns[col_no];
+  assert(col != NULL && col->getBlobType() && col->getPartSize() != 0);
+  NdbTableImpl* blob_tab = col->m_blobTable;
+  assert(blob_tab != NULL);
+  char bename[MAX_TAB_NAME_SIZE];
+  NdbBlob::getBlobEventName(bename, &ev, col);
+
+  NdbEventImpl* blob_ev = getEvent(bename, blob_tab);
+  DBUG_RETURN(blob_ev);
+}
+
+void
+NdbDictInterface::execCREATE_EVNT_CONF(NdbApiSignal * signal,
+				       LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execCREATE_EVNT_CONF");
+
+  m_buffer.clear();
+  unsigned int len = signal->getLength() << 2;
+  m_buffer.append((char *)&len, sizeof(len));
+  m_buffer.append(signal->getDataPtr(), len);
+
+  if (signal->m_noOfSections > 0) {
+    m_buffer.append((char *)ptr[0].p, strlen((char *)ptr[0].p)+1);
+  }
+
+  const CreateEvntConf * const createEvntConf=
+    CAST_CONSTPTR(CreateEvntConf, signal->getDataPtr());
+
+  Uint32 subscriptionId = createEvntConf->getEventId();
+  Uint32 subscriptionKey = createEvntConf->getEventKey();
+
+  DBUG_PRINT("info",("nodeid=%d,subscriptionId=%d,subscriptionKey=%d",
+		     refToNode(signal->theSendersBlockRef),
+		     subscriptionId,subscriptionKey));
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbDictInterface::execCREATE_EVNT_REF(NdbApiSignal * signal,
+				      LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execCREATE_EVNT_REF");
+
+  const CreateEvntRef* const ref=
+    CAST_CONSTPTR(CreateEvntRef, signal->getDataPtr());
+  m_error.code= ref->getErrorCode();
+  DBUG_PRINT("error",("error=%d,line=%d,node=%d",ref->getErrorCode(),
+		      ref->getErrorLine(),ref->getErrorNode()));
+  if (m_error.code == CreateEvntRef::NotMaster)
+    m_masterNodeId = ref->getMasterNode();
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbDictInterface::execSUB_STOP_CONF(NdbApiSignal * signal,
+				      LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execSUB_STOP_CONF");
+  const SubStopConf * const subStopConf=
+    CAST_CONSTPTR(SubStopConf, signal->getDataPtr());
+
+  Uint32 subscriptionId = subStopConf->subscriptionId;
+  Uint32 subscriptionKey = subStopConf->subscriptionKey;
+  Uint32 subscriberData = subStopConf->subscriberData;
+
+  DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d",
+		     subscriptionId,subscriptionKey,subscriberData));
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbDictInterface::execSUB_STOP_REF(NdbApiSignal * signal,
+				     LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execSUB_STOP_REF");
+  const SubStopRef * const subStopRef=
+    CAST_CONSTPTR(SubStopRef, signal->getDataPtr());
+
+  Uint32 subscriptionId = subStopRef->subscriptionId;
+  Uint32 subscriptionKey = subStopRef->subscriptionKey;
+  Uint32 subscriberData = subStopRef->subscriberData;
+  m_error.code= subStopRef->errorCode;
+
+  DBUG_PRINT("error",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d,error=%d",
+		      subscriptionId,subscriptionKey,subscriberData,m_error.code));
+  if (m_error.code == SubStopRef::NotMaster)
+    m_masterNodeId = subStopRef->m_masterNodeId;
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbDictInterface::execSUB_START_CONF(NdbApiSignal * signal,
+				     LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execSUB_START_CONF");
+  const SubStartConf * const subStartConf=
+    CAST_CONSTPTR(SubStartConf, signal->getDataPtr());
+
+  Uint32 subscriptionId = subStartConf->subscriptionId;
+  Uint32 subscriptionKey = subStartConf->subscriptionKey;
+  SubscriptionData::Part part = 
+    (SubscriptionData::Part)subStartConf->part;
+  Uint32 subscriberData = subStartConf->subscriberData;
+
+  switch(part) {
+  case SubscriptionData::MetaData: {
+    DBUG_PRINT("error",("SubscriptionData::MetaData"));
+    m_error.code= 1;
+    break;
+  }
+  case SubscriptionData::TableData: {
+    DBUG_PRINT("info",("SubscriptionData::TableData"));
+    break;
+  }
+  default: {
+    DBUG_PRINT("error",("wrong data"));
+    m_error.code= 2;
+    break;
+  }
+  }
+  DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d",
+		     subscriptionId,subscriptionKey,subscriberData));
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal,
+				    LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execSUB_START_REF");
+  const SubStartRef * const subStartRef=
+    CAST_CONSTPTR(SubStartRef, signal->getDataPtr());
+  m_error.code= subStartRef->errorCode;
+  if (m_error.code == SubStartRef::NotMaster)
+    m_masterNodeId = subStartRef->m_masterNodeId;
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+/*****************************************************************
+ * Drop event
+ */
+int 
+NdbDictionaryImpl::dropEvent(const char * eventName)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropEvent");
+  DBUG_PRINT("info", ("name=%s", eventName));
+
+  NdbEventImpl *evnt = getEvent(eventName); // allocated
+  if (evnt == NULL) {
+    if (m_error.code != 723 && // no such table
+        m_error.code != 241)   // invalid table
+      DBUG_RETURN(-1);
+    DBUG_PRINT("info", ("no table err=%d, drop by name alone", m_error.code));
+    evnt = new NdbEventImpl();
+    evnt->setName(eventName);
+  }
+  int ret = dropEvent(*evnt);
+  delete evnt;  
+  DBUG_RETURN(ret);
+}
+
+int
+NdbDictionaryImpl::dropEvent(const NdbEventImpl& evnt)
+{
+  if (dropBlobEvents(evnt) != 0)
+    return -1;
+  if (m_receiver.dropEvent(evnt) != 0)
+    return -1;
+  return 0;
+}
+
+int
+NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropBlobEvents");
+  if (evnt.m_tableImpl != 0) {
+    const NdbTableImpl& t = *evnt.m_tableImpl;
+    Uint32 n = t.m_noOfBlobs;
+    Uint32 i;
+    for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+      const NdbColumnImpl& c = *evnt.m_columns[i];
+      if (! c.getBlobType() || c.getPartSize() == 0)
+        continue;
+      n--;
+      NdbEventImpl* blob_evnt = getBlobEvent(evnt, i);
+      if (blob_evnt == NULL)
+        continue;
+      (void)dropEvent(*blob_evnt);
+      delete blob_evnt;
+    }
+  } else {
+    // loop over MAX_ATTRIBUTES_IN_TABLE ...
+    Uint32 i;
+    DBUG_PRINT("info", ("missing table definition, looping over "
+                        "MAX_ATTRIBUTES_IN_TABLE(%d)",
+                        MAX_ATTRIBUTES_IN_TABLE));
+    for (i = 0; i < MAX_ATTRIBUTES_IN_TABLE; i++) {
+      char bename[MAX_TAB_NAME_SIZE];
+      // XXX should get name from NdbBlob
+      sprintf(bename, "NDB$BLOBEVENT_%s_%u", evnt.getName(), i);
+      NdbEventImpl* bevnt = new NdbEventImpl();
+      bevnt->setName(bename);
+      (void)m_receiver.dropEvent(*bevnt);
+      delete bevnt;
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictInterface::dropEvent(const NdbEventImpl &evnt)
+{
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_DROP_EVNT_REQ;
+  tSignal.theLength = DropEvntReq::SignalLength;
+  
+  DropEvntReq * const req = CAST_PTR(DropEvntReq, tSignal.getDataPtrSend());
+
+  req->setUserRef(m_reference);
+  req->setUserData(0);
+
+  UtilBufferWriter w(m_buffer);
+
+  w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
+
+  LinearSectionPtr ptr[1];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = (m_buffer.length()+3) >> 2;
+
+  return dictSignal(&tSignal,ptr, 1,
+		    0 /*use masternode id*/,
+		    WAIT_CREATE_INDX_REQ,
+		    -1, 100,
+		    0, -1);
+}
+
+void
+NdbDictInterface::execDROP_EVNT_CONF(NdbApiSignal * signal,
+				     LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execDROP_EVNT_CONF");
+  m_waiter.signal(NO_WAIT);  
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbDictInterface::execDROP_EVNT_REF(NdbApiSignal * signal,
+				    LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("NdbDictInterface::execDROP_EVNT_REF");
+  const DropEvntRef* const ref=
+    CAST_CONSTPTR(DropEvntRef, signal->getDataPtr());
+  m_error.code= ref->getErrorCode();
+
+  DBUG_PRINT("info",("ErrorCode=%u Errorline=%u ErrorNode=%u",
+	     ref->getErrorCode(), ref->getErrorLine(), ref->getErrorNode()));
+  if (m_error.code == DropEvntRef::NotMaster)
+    m_masterNodeId = ref->getMasterNode();
+  m_waiter.signal(NO_WAIT);
+  DBUG_VOID_RETURN;
+}
+
+/*****************************************************************
  * List objects or indexes
  */
 int
@@ -2468,6 +4297,7 @@ NdbDictInterface::listObjects(NdbDiction
       getApiConstant(ListTablesConf::getTableState(d), objectStateMapping, 0);
     element.store = (NdbDictionary::Object::Store)
       getApiConstant(ListTablesConf::getTableStore(d), objectStoreMapping, 0);
+    element.temp = ListTablesConf::getTableTemp(d);
     // table or index name
     Uint32 n = (data[pos++] + 3) >> 2;
     BaseString databaseName;
@@ -2553,26 +4383,28 @@ NdbDictInterface::listObjects(NdbApiSign
   for (Uint32 i = 0; i < RETRIES; i++) {
     m_buffer.clear();
     // begin protected
-    m_transporter->lock_mutex();
+    /*
+      The PollGuard has an implicit call of unlock_and_signal through the
+      ~PollGuard method. This method is called implicitly by the compiler
+      in all places where the object is out of context due to a return,
+      break, continue or simply end of statement block
+    */
+    PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
     Uint16 aNodeId = m_transporter->get_an_alive_node();
     if (aNodeId == 0) {
       m_error.code= 4009;
-      m_transporter->unlock_mutex();
       return -1;
     }
     if (m_transporter->sendSignal(signal, aNodeId) != 0) {
-      m_transporter->unlock_mutex();
       continue;
     }
     m_error.code= 0;
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = WAIT_LIST_TABLES_CONF;
-    m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
-    m_transporter->unlock_mutex();    
+    int ret_val= poll_guard.wait_n_unlock(DICT_WAITFOR_TIMEOUT,
+                                          aNodeId, WAIT_LIST_TABLES_CONF);
     // end protected
-    if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
+    if (ret_val == 0 && m_error.code == 0)
       return 0;
-    if (m_waiter.m_state == WAIT_NODE_FAILURE)
+    if (ret_val == -2) //WAIT_NODE_FAILURE
       continue;
     return -1;
   }
@@ -2595,6 +4427,857 @@ NdbDictInterface::execLIST_TABLES_CONF(N
   }
 }
 
+int
+NdbDictionaryImpl::forceGCPWait()
+{
+  return m_receiver.forceGCPWait();
+}
+
+int
+NdbDictInterface::forceGCPWait()
+{
+  NdbApiSignal tSignal(m_reference);
+  WaitGCPReq* const req = CAST_PTR(WaitGCPReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType = WaitGCPReq::CompleteForceStart;
+  tSignal.theReceiversBlockNumber = DBDIH;
+  tSignal.theVerId_signalNumber = GSN_WAIT_GCP_REQ;
+  tSignal.theLength = WaitGCPReq::SignalLength;
+
+  const Uint32 RETRIES = 100;
+  for (Uint32 i = 0; i < RETRIES; i++)
+  {
+    m_transporter->lock_mutex();
+    Uint16 aNodeId = m_transporter->get_an_alive_node();
+    if (aNodeId == 0) {
+      m_error.code= 4009;
+      m_transporter->unlock_mutex();
+      return -1;
+    }
+    if (m_transporter->sendSignal(&tSignal, aNodeId) != 0) {
+      m_transporter->unlock_mutex();
+      continue;
+    }
+    m_error.code= 0;
+    m_waiter.m_node = aNodeId;
+    m_waiter.m_state = WAIT_LIST_TABLES_CONF;
+    m_waiter.wait(DICT_WAITFOR_TIMEOUT);
+    m_transporter->unlock_mutex();    
+    return 0;
+  }
+  return -1;
+}
+
+void
+NdbDictInterface::execWAIT_GCP_CONF(NdbApiSignal* signal,
+				    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);
+}
+
+void
+NdbDictInterface::execWAIT_GCP_REF(NdbApiSignal* signal,
+				    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);
+}
+
+NdbFilegroupImpl::NdbFilegroupImpl(NdbDictionary::Object::Type t)
+  : NdbDictObjectImpl(t)
+{
+  m_extent_size = 0;
+  m_undo_buffer_size = 0;
+  m_logfile_group_id = ~0;
+  m_logfile_group_version = ~0;
+}
+
+NdbTablespaceImpl::NdbTablespaceImpl() : 
+  NdbDictionary::Tablespace(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::Tablespace), m_facade(this)
+{
+}
+
+NdbTablespaceImpl::NdbTablespaceImpl(NdbDictionary::Tablespace & f) : 
+  NdbDictionary::Tablespace(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::Tablespace), m_facade(&f)
+{
+}
+
+NdbTablespaceImpl::~NdbTablespaceImpl(){
+}
+
+int
+NdbTablespaceImpl::assign(const NdbTablespaceImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  if (!m_name.assign(org.m_name))
+    return -1;
+  m_grow_spec = org.m_grow_spec;
+  m_extent_size = org.m_extent_size;
+  m_undo_free_words = org.m_undo_free_words;
+  m_logfile_group_id = org.m_logfile_group_id;
+  m_logfile_group_version = org.m_logfile_group_version;
+  if (!m_logfile_group_name.assign(org.m_logfile_group_name))
+    return -1;
+  m_undo_free_words = org.m_undo_free_words;
+  return 0;
+}
+
+NdbLogfileGroupImpl::NdbLogfileGroupImpl() : 
+  NdbDictionary::LogfileGroup(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::LogfileGroup), m_facade(this)
+{
+}
+
+NdbLogfileGroupImpl::NdbLogfileGroupImpl(NdbDictionary::LogfileGroup & f) : 
+  NdbDictionary::LogfileGroup(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::LogfileGroup), m_facade(&f)
+{
+}
+
+NdbLogfileGroupImpl::~NdbLogfileGroupImpl(){
+}
+
+int
+NdbLogfileGroupImpl::assign(const NdbLogfileGroupImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  if (!m_name.assign(org.m_name))
+    return -1;
+  m_grow_spec = org.m_grow_spec;
+  m_extent_size = org.m_extent_size;
+  m_undo_free_words = org.m_undo_free_words;
+  m_logfile_group_id = org.m_logfile_group_id;
+  m_logfile_group_version = org.m_logfile_group_version;
+  if (!m_logfile_group_name.assign(org.m_logfile_group_name))
+    return -1;
+  m_undo_free_words = org.m_undo_free_words;
+  return 0;
+}
+
+NdbFileImpl::NdbFileImpl(NdbDictionary::Object::Type t)
+  : NdbDictObjectImpl(t)
+{
+  m_size = 0;
+  m_free = 0;
+  m_filegroup_id = ~0;
+  m_filegroup_version = ~0;
+}
+
+NdbDatafileImpl::NdbDatafileImpl() : 
+  NdbDictionary::Datafile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Datafile), m_facade(this)
+{
+}
+
+NdbDatafileImpl::NdbDatafileImpl(NdbDictionary::Datafile & f) : 
+  NdbDictionary::Datafile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Datafile), m_facade(&f)
+{
+}
+
+NdbDatafileImpl::~NdbDatafileImpl(){
+}
+
+int
+NdbDatafileImpl::assign(const NdbDatafileImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_size = org.m_size;
+  m_free = org.m_free;
+  m_filegroup_id = org.m_filegroup_id;
+  m_filegroup_version = org.m_filegroup_version;
+  if (!m_path.assign(org.m_path) ||
+      !m_filegroup_name.assign(org.m_filegroup_name))
+    return -1;
+  return 0;
+}
+
+NdbUndofileImpl::NdbUndofileImpl() : 
+  NdbDictionary::Undofile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Undofile), m_facade(this)
+{
+}
+
+NdbUndofileImpl::NdbUndofileImpl(NdbDictionary::Undofile & f) : 
+  NdbDictionary::Undofile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Undofile), m_facade(&f)
+{
+}
+
+NdbUndofileImpl::~NdbUndofileImpl(){
+}
+
+int
+NdbUndofileImpl::assign(const NdbUndofileImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_size = org.m_size;
+  m_free = org.m_free;
+  m_filegroup_id = org.m_filegroup_id;
+  m_filegroup_version = org.m_filegroup_version;
+  if (!m_path.assign(org.m_path) ||
+      !m_filegroup_name.assign(org.m_filegroup_name))
+    return 4000;
+  return 0;
+}
+
+int 
+NdbDictionaryImpl::createDatafile(const NdbDatafileImpl & file, 
+				  bool force,
+				  NdbDictObjectImpl* obj)
+  
+{
+  DBUG_ENTER("NdbDictionaryImpl::createDatafile");
+  NdbFilegroupImpl tmp(NdbDictionary::Object::Tablespace);
+  if(file.m_filegroup_version != ~(Uint32)0){
+    tmp.m_id = file.m_filegroup_id;
+    tmp.m_version = file.m_filegroup_version;
+    DBUG_RETURN(m_receiver.create_file(file, tmp, force, obj));
+  }
+  
+  
+  if(m_receiver.get_filegroup(tmp, NdbDictionary::Object::Tablespace,
+			      file.m_filegroup_name.c_str()) == 0){
+    DBUG_RETURN(m_receiver.create_file(file, tmp, force, obj));
+  }
+  DBUG_RETURN(-1); 
+}
+
+int
+NdbDictionaryImpl::dropDatafile(const NdbDatafileImpl & file){
+  return m_receiver.drop_file(file);
+}
+
+int
+NdbDictionaryImpl::createUndofile(const NdbUndofileImpl & file, 
+				  bool force,
+				  NdbDictObjectImpl* obj)
+{
+  DBUG_ENTER("NdbDictionaryImpl::createUndofile");
+  NdbFilegroupImpl tmp(NdbDictionary::Object::LogfileGroup);
+  if(file.m_filegroup_version != ~(Uint32)0){
+    tmp.m_id = file.m_filegroup_id;
+    tmp.m_version = file.m_filegroup_version;
+    DBUG_RETURN(m_receiver.create_file(file, tmp, force, obj));
+  }
+  
+  
+  if(m_receiver.get_filegroup(tmp, NdbDictionary::Object::LogfileGroup,
+			      file.m_filegroup_name.c_str()) == 0){
+    DBUG_RETURN(m_receiver.create_file(file, tmp, force, obj));
+  }
+  DBUG_PRINT("info", ("Failed to find filegroup"));
+  DBUG_RETURN(-1);
+}
+
+int
+NdbDictionaryImpl::dropUndofile(const NdbUndofileImpl & file)
+{
+  return m_receiver.drop_file(file);
+}
+
+int
+NdbDictionaryImpl::createTablespace(const NdbTablespaceImpl & fg,
+				    NdbDictObjectImpl* obj)
+{
+  return m_receiver.create_filegroup(fg, obj);
+}
+
+int
+NdbDictionaryImpl::dropTablespace(const NdbTablespaceImpl & fg)
+{
+  return m_receiver.drop_filegroup(fg);
+}
+
+int
+NdbDictionaryImpl::createLogfileGroup(const NdbLogfileGroupImpl & fg,
+				      NdbDictObjectImpl* obj)
+{
+  return m_receiver.create_filegroup(fg, obj);
+}
+
+int
+NdbDictionaryImpl::dropLogfileGroup(const NdbLogfileGroupImpl & fg)
+{
+  return m_receiver.drop_filegroup(fg);
+}
+
+int
+NdbDictInterface::create_file(const NdbFileImpl & file,
+			      const NdbFilegroupImpl & group,
+			      bool overwrite,
+			      NdbDictObjectImpl* obj)
+{
+  DBUG_ENTER("NdbDictInterface::create_file"); 
+  UtilBufferWriter w(m_buffer);
+  DictFilegroupInfo::File f; f.init();
+  snprintf(f.FileName, sizeof(f.FileName), file.m_path.c_str());
+  f.FileType = file.m_type;
+  f.FilegroupId = group.m_id;
+  f.FilegroupVersion = group.m_version;
+  f.FileSizeHi = (file.m_size >> 32);
+  f.FileSizeLo = (file.m_size & 0xFFFFFFFF);
+  
+  SimpleProperties::UnpackStatus s;
+  s = SimpleProperties::pack(w, 
+			     &f,
+			     DictFilegroupInfo::FileMapping, 
+			     DictFilegroupInfo::FileMappingSize, true);
+  
+  if(s != SimpleProperties::Eof){
+    abort();
+  }
+  
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_CREATE_FILE_REQ;
+  tSignal.theLength = CreateFileReq::SignalLength;
+  
+  CreateFileReq* req = CAST_PTR(CreateFileReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->objType = file.m_type;
+  req->requestInfo = 0;
+  if (overwrite)
+    req->requestInfo |= CreateFileReq::ForceCreateFile;
+  
+  LinearSectionPtr ptr[3];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = m_buffer.length() / 4;
+
+  int err[] = { CreateFileRef::Busy, CreateFileRef::NotMaster, 0};
+  /*
+    Send signal without time-out since creating files can take a very long
+    time if the file is very big.
+  */
+  int ret = dictSignal(&tSignal, ptr, 1,
+		       0, // master
+		       WAIT_CREATE_INDX_REQ,
+		       -1, 100,
+		       err);
+
+  if (ret == 0 && obj)
+  {
+    Uint32* data = (Uint32*)m_buffer.get_data();
+    obj->m_id = data[0];
+    obj->m_version = data[1];
+  }
+
+  DBUG_RETURN(ret);
+}
+
+void
+NdbDictInterface::execCREATE_FILE_CONF(NdbApiSignal * signal,
+				       LinearSectionPtr ptr[3])
+{
+  const CreateFileConf* conf=
+    CAST_CONSTPTR(CreateFileConf, signal->getDataPtr());
+  m_buffer.grow(4 * 2); // 2 words
+  Uint32* data = (Uint32*)m_buffer.get_data();
+  data[0] = conf->fileId;
+  data[1] = conf->fileVersion;
+  
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execCREATE_FILE_REF(NdbApiSignal * signal,
+				      LinearSectionPtr ptr[3])
+{
+  const CreateFileRef* ref = 
+    CAST_CONSTPTR(CreateFileRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+int
+NdbDictInterface::drop_file(const NdbFileImpl & file)
+{
+  DBUG_ENTER("NdbDictInterface::drop_file");
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_DROP_FILE_REQ;
+  tSignal.theLength = DropFileReq::SignalLength;
+  
+  DropFileReq* req = CAST_PTR(DropFileReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->file_id = file.m_id;
+  req->file_version = file.m_version;
+
+  int err[] = { DropFileRef::Busy, DropFileRef::NotMaster, 0};
+  DBUG_RETURN(dictSignal(&tSignal, 0, 0,
+	                 0, // master
+		         WAIT_CREATE_INDX_REQ,
+		         DICT_WAITFOR_TIMEOUT, 100,
+		         err));
+}
+
+void
+NdbDictInterface::execDROP_FILE_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execDROP_FILE_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const DropFileRef* ref = 
+    CAST_CONSTPTR(DropFileRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+int
+NdbDictInterface::create_filegroup(const NdbFilegroupImpl & group,
+				   NdbDictObjectImpl* obj)
+{
+  DBUG_ENTER("NdbDictInterface::create_filegroup");
+  UtilBufferWriter w(m_buffer);
+  DictFilegroupInfo::Filegroup fg; fg.init();
+  snprintf(fg.FilegroupName, sizeof(fg.FilegroupName), group.m_name.c_str());
+  switch(group.m_type){
+  case NdbDictionary::Object::Tablespace:
+  {
+    fg.FilegroupType = DictTabInfo::Tablespace;
+    //fg.TS_DataGrow = group.m_grow_spec;
+    fg.TS_ExtentSize = group.m_extent_size;
+
+    if(group.m_logfile_group_version != ~(Uint32)0)
+    {
+      fg.TS_LogfileGroupId = group.m_logfile_group_id;
+      fg.TS_LogfileGroupVersion = group.m_logfile_group_version;
+    }
+    else 
+    {
+      NdbLogfileGroupImpl tmp;
+      if(get_filegroup(tmp, NdbDictionary::Object::LogfileGroup, 
+		       group.m_logfile_group_name.c_str()) == 0)
+      {
+	fg.TS_LogfileGroupId = tmp.m_id;
+	fg.TS_LogfileGroupVersion = tmp.m_version;
+      }
+      else // error set by get filegroup
+      {
+	DBUG_RETURN(-1);
+      }
+    }
+  }
+  break;
+  case NdbDictionary::Object::LogfileGroup:
+    fg.LF_UndoBufferSize = group.m_undo_buffer_size;
+    fg.FilegroupType = DictTabInfo::LogfileGroup;
+    //fg.LF_UndoGrow = group.m_grow_spec;
+    break;
+  default:
+    abort();
+    DBUG_RETURN(-1);
+  };
+  
+  SimpleProperties::UnpackStatus s;
+  s = SimpleProperties::pack(w, 
+			     &fg,
+			     DictFilegroupInfo::Mapping, 
+			     DictFilegroupInfo::MappingSize, true);
+  
+  if(s != SimpleProperties::Eof){
+    abort();
+  }
+  
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_CREATE_FILEGROUP_REQ;
+  tSignal.theLength = CreateFilegroupReq::SignalLength;
+  
+  CreateFilegroupReq* req = 
+    CAST_PTR(CreateFilegroupReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->objType = fg.FilegroupType;
+  
+  LinearSectionPtr ptr[3];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = m_buffer.length() / 4;
+
+  int err[] = { CreateFilegroupRef::Busy, CreateFilegroupRef::NotMaster, 0};
+  int ret = dictSignal(&tSignal, ptr, 1,
+		       0, // master
+		       WAIT_CREATE_INDX_REQ,
+		       DICT_WAITFOR_TIMEOUT, 100,
+		       err);
+  
+  if (ret == 0 && obj)
+  {
+    Uint32* data = (Uint32*)m_buffer.get_data();
+    obj->m_id = data[0];
+    obj->m_version = data[1];
+  }
+  
+  DBUG_RETURN(ret);
+}
+
+void
+NdbDictInterface::execCREATE_FILEGROUP_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  const CreateFilegroupConf* conf=
+    CAST_CONSTPTR(CreateFilegroupConf, signal->getDataPtr());
+  m_buffer.grow(4 * 2); // 2 words
+  Uint32* data = (Uint32*)m_buffer.get_data();
+  data[0] = conf->filegroupId;
+  data[1] = conf->filegroupVersion;
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execCREATE_FILEGROUP_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const CreateFilegroupRef* ref = 
+    CAST_CONSTPTR(CreateFilegroupRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+int
+NdbDictInterface::drop_filegroup(const NdbFilegroupImpl & group)
+{
+  DBUG_ENTER("NdbDictInterface::drop_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_DROP_FILEGROUP_REQ;
+  tSignal.theLength = DropFilegroupReq::SignalLength;
+  
+  DropFilegroupReq* req = CAST_PTR(DropFilegroupReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->filegroup_id = group.m_id;
+  req->filegroup_version = group.m_version;
+  
+  int err[] = { DropFilegroupRef::Busy, DropFilegroupRef::NotMaster, 0};
+  DBUG_RETURN(dictSignal(&tSignal, 0, 0,
+                         0, // master
+		         WAIT_CREATE_INDX_REQ,
+		         DICT_WAITFOR_TIMEOUT, 100,
+		         err));
+}
+
+void
+NdbDictInterface::execDROP_FILEGROUP_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execDROP_FILEGROUP_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const DropFilegroupRef* ref = 
+    CAST_CONSTPTR(DropFilegroupRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+
+int
+NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
+				NdbDictionary::Object::Type type,
+				const char * name){
+  DBUG_ENTER("NdbDictInterface::get_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  size_t strLen = strlen(name) + 1;
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType = 
+    GetTabInfoReq::RequestByName | GetTabInfoReq::LongSignalConf;
+  req->tableNameLen = strLen;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  LinearSectionPtr ptr[1];
+  ptr[0].p  = (Uint32*)name;
+  ptr[0].sz = (strLen + 3)/4;
+  
+#ifndef IGNORE_VALGRIND_WARNINGS
+  if (strLen & 3)
+  {
+    Uint32 pad = 0;
+    m_buffer.clear();
+    m_buffer.append(name, strLen);
+    m_buffer.append(&pad, 4);
+    ptr[0].p = (Uint32*)m_buffer.get_data();
+  }
+#endif
+  
+  int r = dictSignal(&tSignal, ptr, 1,
+		     -1, // any node
+		     WAIT_GET_TAB_INFO_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100);
+  if (r)
+  {
+    dst.m_id = -1;
+    dst.m_version = ~0;
+    
+    DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFilegroupInfo(dst,
+				    (Uint32*)m_buffer.get_data(),
+				    m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed parseFilegroupInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == NdbDictionary::Object::Tablespace)
+  {
+    NdbDictionary::LogfileGroup tmp;
+    get_filegroup(NdbLogfileGroupImpl::getImpl(tmp),
+		  NdbDictionary::Object::LogfileGroup,
+		  dst.m_logfile_group_id);
+    if (!dst.m_logfile_group_name.assign(tmp.getName()))
+      DBUG_RETURN(m_error.code = 4000);
+  }
+  
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_filegroup failed no such filegroup"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
+NdbDictInterface::parseFilegroupInfo(NdbFilegroupImpl &dst,
+				     const Uint32 * data, Uint32 len)
+  
+{
+  SimplePropertiesLinearReader it(data, len);
+
+  SimpleProperties::UnpackStatus status;
+  DictFilegroupInfo::Filegroup fg; fg.init();
+  status = SimpleProperties::unpack(it, &fg, 
+				    DictFilegroupInfo::Mapping, 
+				    DictFilegroupInfo::MappingSize, 
+				    true, true);
+  
+  if(status != SimpleProperties::Eof){
+    return CreateFilegroupRef::InvalidFormat;
+  }
+
+  dst.m_id = fg.FilegroupId;
+  dst.m_version = fg.FilegroupVersion;
+  dst.m_type = (NdbDictionary::Object::Type)fg.FilegroupType;
+  dst.m_status = NdbDictionary::Object::Retrieved;
+  
+  if (!dst.m_name.assign(fg.FilegroupName))
+    return 4000;
+  dst.m_extent_size = fg.TS_ExtentSize;
+  dst.m_undo_buffer_size = fg.LF_UndoBufferSize;
+  dst.m_logfile_group_id = fg.TS_LogfileGroupId;
+  dst.m_logfile_group_version = fg.TS_LogfileGroupVersion;
+  dst.m_undo_free_words= ((Uint64)fg.LF_UndoFreeWordsHi << 32)
+    | (fg.LF_UndoFreeWordsLo);
+
+  return 0;
+}
+
+int
+NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
+				NdbDictionary::Object::Type type,
+				Uint32 id){
+  DBUG_ENTER("NdbDictInterface::get_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType =
+    GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+  req->tableId = id;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  int r = dictSignal(&tSignal, NULL, 1,
+		     -1, // any node
+		     WAIT_GET_TAB_INFO_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100);
+  if (r)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFilegroupInfo(dst,
+				    (Uint32*)m_buffer.get_data(),
+				    m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed parseFilegroupInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_filegroup failed no such filegroup"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
+NdbDictInterface::get_file(NdbFileImpl & dst,
+			   NdbDictionary::Object::Type type,
+			   int node,
+			   const char * name){
+  DBUG_ENTER("NdbDictInterface::get_file");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  size_t strLen = strlen(name) + 1;
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType =
+    GetTabInfoReq::RequestByName | GetTabInfoReq::LongSignalConf;
+  req->tableNameLen = strLen;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  LinearSectionPtr ptr[1];
+  ptr[0].p  = (Uint32*)name;
+  ptr[0].sz = (strLen + 3)/4;
+  
+#ifndef IGNORE_VALGRIND_WARNINGS
+  if (strLen & 3)
+  {
+    Uint32 pad = 0;
+    m_buffer.clear();
+    m_buffer.append(name, strLen);
+    m_buffer.append(&pad, 4);
+    ptr[0].p = (Uint32*)m_buffer.get_data();
+  }
+#endif
+  
+  int r = dictSignal(&tSignal, ptr, 1,
+		     node,
+		     WAIT_GET_TAB_INFO_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100);
+  if (r)
+  {
+    DBUG_PRINT("info", ("get_file failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFileInfo(dst,
+			       (Uint32*)m_buffer.get_data(),
+			       m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_file failed parseFileInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == NdbDictionary::Object::Undofile)
+  {
+    NdbDictionary::LogfileGroup tmp;
+    get_filegroup(NdbLogfileGroupImpl::getImpl(tmp),
+		  NdbDictionary::Object::LogfileGroup,
+		  dst.m_filegroup_id);
+    if (!dst.m_filegroup_name.assign(tmp.getName()))
+      DBUG_RETURN(m_error.code = 4000);
+  }
+  else if(dst.m_type == NdbDictionary::Object::Datafile)
+  {
+    NdbDictionary::Tablespace tmp;
+    get_filegroup(NdbTablespaceImpl::getImpl(tmp),
+		  NdbDictionary::Object::Tablespace,
+		  dst.m_filegroup_id);
+    if (!dst.m_filegroup_name.assign(tmp.getName()))
+      DBUG_RETURN(m_error.code = 4000);
+    dst.m_free *= tmp.getExtentSize();
+  }
+  else
+    dst.m_filegroup_name.assign("Not Yet Implemented");
+  
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_file failed no such file"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
+NdbDictInterface::parseFileInfo(NdbFileImpl &dst,
+				const Uint32 * data, Uint32 len)
+{
+  SimplePropertiesLinearReader it(data, len);
+
+  SimpleProperties::UnpackStatus status;
+  DictFilegroupInfo::File f; f.init();
+  status = SimpleProperties::unpack(it, &f,
+				    DictFilegroupInfo::FileMapping,
+				    DictFilegroupInfo::FileMappingSize,
+				    true, true);
+
+  if(status != SimpleProperties::Eof){
+    return CreateFilegroupRef::InvalidFormat;
+  }
+
+  dst.m_type= (NdbDictionary::Object::Type)f.FileType;
+  dst.m_id= f.FileId;
+  dst.m_version = f.FileVersion;
+
+  dst.m_size= ((Uint64)f.FileSizeHi << 32) | (f.FileSizeLo);
+  if (!dst.m_path.assign(f.FileName))
+    return 4000;
+
+  dst.m_filegroup_id= f.FilegroupId;
+  dst.m_filegroup_version= f.FilegroupVersion;
+  dst.m_free=  f.FileFreeExtents;
+  return 0;
+}
+
 template class Vector<int>;
 template class Vector<Uint16>;
 template class Vector<Uint32>;
@@ -2602,3 +5285,16 @@ template class Vector<Vector<Uint32> >;
 template class Vector<NdbTableImpl*>;
 template class Vector<NdbColumnImpl*>;
 
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_FIXED_MEMORY = 0;
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0;
+const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0;
+const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROWID = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_GCI = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ANY_VALUE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::COPY_ROWID = 0;

--- 1.30.14.1/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2007-08-15 15:23:13 +00:00
+++ 1.76/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2007-08-15 15:23:13 +00:00
@@ -29,15 +29,37 @@
 #include "NdbWaiter.hpp"
 #include "DictCache.hpp"
 
+bool
+is_ndb_blob_table(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
+bool
+is_ndb_blob_table(const class NdbTableImpl* t);
+
+extern int ndb_dictionary_is_mysqld;
+#define ASSERT_NOT_MYSQLD assert(ndb_dictionary_is_mysqld == 0)
+
 class NdbDictObjectImpl {
 public:
+  int m_id;
   Uint32 m_version;
+  NdbDictionary::Object::Type m_type;
   NdbDictionary::Object::Status m_status;
   
   bool change();
+  
+  static NdbDictObjectImpl & getImpl(NdbDictionary::ObjectId & t) { 
+    return t.m_impl;
+  }
+  static const NdbDictObjectImpl & getImpl(const NdbDictionary::ObjectId & t){
+    return t.m_impl;
+  }
+  
 protected:
-  NdbDictObjectImpl() :
+  friend class NdbDictionary::ObjectId;
+
+  NdbDictObjectImpl(NdbDictionary::Object::Type type) :
+    m_type(type),
     m_status(NdbDictionary::Object::New) {
+    m_id = -1;
   }
 };
 
@@ -58,6 +80,7 @@ public:
   int m_precision;
   int m_scale;
   int m_length;
+  int m_column_no;
   CHARSET_INFO * m_cs;          // not const in MySQL
   
   bool m_pk;
@@ -72,7 +95,13 @@ public:
    * Internal types and sizes, and aggregates
    */
   Uint32 m_attrSize;            // element size (size when arraySize==1)
-  Uint32 m_arraySize;           // length or length+2 for Var* types
+  Uint32 m_arraySize;           // length or maxlength+1/2 for Var* types
+  Uint32 m_arrayType;           // NDB_ARRAYTYPE_FIXED or _VAR
+  Uint32 m_storageType;         // NDB_STORAGETYPE_MEMORY or _DISK
+  /*
+   * NdbTableImpl: if m_pk, 0-based index of key in m_attrId order
+   * NdbIndexImpl: m_column_no of primary table column
+   */
   Uint32 m_keyInfoPos;
   // TODO: use bits in attr desc 2
   bool getInterpretableType() const ;
@@ -89,10 +118,9 @@ public:
   static const NdbColumnImpl & getImpl(const NdbDictionary::Column & t);
   NdbDictionary::Column * m_facade;
 
-  static NdbDictionary::Column * create_psuedo(const char *);
+  static NdbDictionary::Column * create_pseudo(const char *);
 
   // Get total length in bytes, used by NdbOperation
-  // backported from 5.1
   bool get_var_length(const void* value, Uint32& len) const;
 };
 
@@ -105,13 +133,46 @@ public:
   void init();
   int setName(const char * name);
   const char * getName() const;
+  void setFragmentCount(Uint32 count);
+  Uint32 getFragmentCount() const;
+  int setFrm(const void* data, Uint32 len);
+  const void * getFrmData() const;
+  Uint32 getFrmLength() const;
+  int setFragmentData(const void* data, Uint32 len);
+  const void * getFragmentData() const;
+  Uint32 getFragmentDataLen() const;
+  int setTablespaceNames(const void* data, Uint32 len);
+  Uint32 getTablespaceNamesLen() const;
+  const void * getTablespaceNames() const;
+  int setTablespaceData(const void* data, Uint32 len);
+  const void * getTablespaceData() const;
+  Uint32 getTablespaceDataLen() const;
+  int setRangeListData(const void* data, Uint32 len);
+  const void * getRangeListData() const;
+  Uint32 getRangeListDataLen() const;
+
+  const char * getMysqlName() const;
+  int updateMysqlName();
+
+  int aggregate(NdbError& error);
+  int validate(NdbError& error);
 
   Uint32 m_changeMask;
-  Uint32 m_tableId;
+  Uint32 m_primaryTableId;
   BaseString m_internalName;
   BaseString m_externalName;
+  BaseString m_mysqlName;
   BaseString m_newExternalName; // Used for alter table
   UtilBuffer m_frm; 
+  UtilBuffer m_newFrm;       // Used for alter table
+  UtilBuffer m_ts_name;      //Tablespace Names
+  UtilBuffer m_new_ts_name;  //Tablespace Names
+  UtilBuffer m_ts;           //TablespaceData
+  UtilBuffer m_new_ts;       //TablespaceData
+  UtilBuffer m_fd;           //FragmentData
+  UtilBuffer m_new_fd;       //FragmentData
+  UtilBuffer m_range;        //Range Or List Array
+  UtilBuffer m_new_range;    //Range Or List Array
   NdbDictionary::Object::FragmentType m_fragmentType;
 
   /**
@@ -120,6 +181,7 @@ public:
   Uint32 m_columnHashMask;
   Vector<Uint32> m_columnHash;
   Vector<NdbColumnImpl *> m_columns;
+  void computeAggregates();
   int buildColumnHash(); 
 
   /**
@@ -131,8 +193,13 @@ public:
 
   Uint64 m_max_rows;
   Uint64 m_min_rows;
-
+  Uint32 m_default_no_part_flag;
+  bool m_linear_flag;
   bool m_logging;
+  bool m_temporary;
+  bool m_row_gci;
+  bool m_row_checksum;
+  bool m_force_var_part;
   int m_kvalue;
   int m_minLoadFactor;
   int m_maxLoadFactor;
@@ -140,7 +207,6 @@ public:
   Uint16 m_fragmentCount;
   Uint8 m_single_user_mode;
 
-  NdbDictionaryImpl * m_dictionary;
   NdbIndexImpl * m_index;
   NdbColumnImpl * getColumn(unsigned attrId);
   NdbColumnImpl * getColumn(const char * name);
@@ -151,15 +217,16 @@ public:
    * Index only stuff
    */
   BaseString m_primaryTable;
-  NdbDictionary::Index::Type m_indexType;
+  NdbDictionary::Object::Type m_indexType;
 
   /**
    * Aggregates
    */
   Uint8 m_noOfKeys;
+  // if all pk = dk then this is zero!
   Uint8 m_noOfDistributionKeys;
   Uint8 m_noOfBlobs;
-  
+  Uint8 m_noOfDiskColumns;
   Uint8 m_replicaCount;
 
   /**
@@ -176,6 +243,13 @@ public:
    * Return count
    */
   Uint32 get_nodes(Uint32 hashValue, const Uint16** nodes) const ;
+
+  /**
+   * Disk stuff
+   */
+  BaseString m_tablespace_name;
+  Uint32 m_tablespace_id;
+  Uint32 m_tablespace_version;
 };
 
 class NdbIndexImpl : public NdbDictionary::Index, public NdbDictObjectImpl {
@@ -191,15 +265,16 @@ public:
   const char * getTable() const;
   const NdbTableImpl * getIndexTable() const;
 
-  Uint32 m_indexId;
   BaseString m_internalName;
   BaseString m_externalName;
   BaseString m_tableName;
+  Uint32 m_table_id;
+  Uint32 m_table_version;
   Vector<NdbColumnImpl *> m_columns;
   Vector<int> m_key_ids;
-  NdbDictionary::Index::Type m_type;
 
   bool m_logging;
+  bool m_temporary;
   
   NdbTableImpl * m_table;
   
@@ -208,6 +283,147 @@ public:
   NdbDictionary::Index * m_facade;
 };
 
+class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl {
+  friend class NdbDictInterface;
+  friend class NdbDictionaryImpl;
+  friend class NdbEventOperation;
+  friend class NdbEventOperationImpl;
+  friend class NdbEventBuffer;
+  friend class EventBufData_hash;
+  friend class NdbBlob;
+public:
+  NdbEventImpl();
+  NdbEventImpl(NdbDictionary::Event &);
+  ~NdbEventImpl();
+
+  void init();
+  int setName(const char * name);
+  const char * getName() const;
+  int setTable(const NdbDictionary::Table& table);
+  const NdbDictionary::Table * getTable() const;
+  int setTable(const char * table);
+  const char * getTableName() const;
+  void addTableEvent(const NdbDictionary::Event::TableEvent t);
+  bool getTableEvent(const NdbDictionary::Event::TableEvent t) const;
+  void setDurability(NdbDictionary::Event::EventDurability d);
+  NdbDictionary::Event::EventDurability  getDurability() const;
+  void setReport(NdbDictionary::Event::EventReport r);
+  NdbDictionary::Event::EventReport  getReport() const;
+  int getNoOfEventColumns() const;
+  const NdbDictionary::Column * getEventColumn(unsigned no) const;
+
+  void print() {
+    ndbout_c("NdbEventImpl: id=%d, key=%d",
+	     m_eventId,
+	     m_eventKey);
+  };
+
+  Uint32 m_eventId;
+  Uint32 m_eventKey;
+  AttributeMask m_attrListBitmask;
+  Uint32 m_table_id;
+  Uint32 m_table_version;
+  BaseString m_name;
+  Uint32 mi_type;
+  NdbDictionary::Event::EventDurability m_dur;
+  NdbDictionary::Event::EventReport m_rep;
+  bool m_mergeEvents;
+
+  BaseString m_tableName;
+  Vector<NdbColumnImpl *> m_columns;
+  Vector<unsigned> m_attrIds;
+
+  static NdbEventImpl & getImpl(NdbDictionary::Event & t);
+  static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
+  NdbDictionary::Event * m_facade;
+private:
+  NdbTableImpl *m_tableImpl;
+  void setTable(NdbTableImpl *tableImpl);
+};
+
+struct NdbFilegroupImpl : public NdbDictObjectImpl {
+  NdbFilegroupImpl(NdbDictionary::Object::Type t);
+
+  BaseString m_name;
+  NdbDictionary::AutoGrowSpecification m_grow_spec;
+
+  union {
+    Uint32 m_extent_size;
+    Uint32 m_undo_buffer_size;
+  };
+
+  BaseString m_logfile_group_name;
+  Uint32 m_logfile_group_id;
+  Uint32 m_logfile_group_version;
+  Uint64 m_undo_free_words;
+};
+
+class NdbTablespaceImpl : public NdbDictionary::Tablespace, 
+			  public NdbFilegroupImpl {
+public:
+  NdbTablespaceImpl();
+  NdbTablespaceImpl(NdbDictionary::Tablespace &);
+  ~NdbTablespaceImpl();
+
+  int assign(const NdbTablespaceImpl&);
+
+  static NdbTablespaceImpl & getImpl(NdbDictionary::Tablespace & t);
+  static const NdbTablespaceImpl & getImpl(const NdbDictionary::Tablespace &);
+  NdbDictionary::Tablespace * m_facade;
+};
+
+class NdbLogfileGroupImpl : public NdbDictionary::LogfileGroup, 
+			    public NdbFilegroupImpl {
+public:
+  NdbLogfileGroupImpl();
+  NdbLogfileGroupImpl(NdbDictionary::LogfileGroup &);
+  ~NdbLogfileGroupImpl();
+
+  int assign(const NdbLogfileGroupImpl&);
+
+  static NdbLogfileGroupImpl & getImpl(NdbDictionary::LogfileGroup & t);
+  static const NdbLogfileGroupImpl& getImpl(const 
+					    NdbDictionary::LogfileGroup&);
+  NdbDictionary::LogfileGroup * m_facade;
+};
+
+struct NdbFileImpl : public NdbDictObjectImpl {
+  NdbFileImpl(NdbDictionary::Object::Type t);
+
+  Uint64 m_size;
+  Uint32 m_free;
+  BaseString m_path;
+  BaseString m_filegroup_name;
+  Uint32 m_filegroup_id;
+  Uint32 m_filegroup_version;
+};
+
+class NdbDatafileImpl : public NdbDictionary::Datafile, public NdbFileImpl {
+public:
+  NdbDatafileImpl();
+  NdbDatafileImpl(NdbDictionary::Datafile &);
+  ~NdbDatafileImpl();
+
+  int assign(const NdbDatafileImpl&);
+
+  static NdbDatafileImpl & getImpl(NdbDictionary::Datafile & t);
+  static const NdbDatafileImpl & getImpl(const NdbDictionary::Datafile & t);
+  NdbDictionary::Datafile * m_facade;
+};
+
+class NdbUndofileImpl : public NdbDictionary::Undofile, public NdbFileImpl {
+public:
+  NdbUndofileImpl();
+  NdbUndofileImpl(NdbDictionary::Undofile &);
+  ~NdbUndofileImpl();
+
+  int assign(const NdbUndofileImpl&);
+
+  static NdbUndofileImpl & getImpl(NdbDictionary::Undofile & t);
+  static const NdbUndofileImpl & getImpl(const NdbDictionary::Undofile & t);
+  NdbDictionary::Undofile * m_facade;
+};
+
 class NdbDictInterface {
 public:
   NdbDictInterface(NdbError& err) : m_error(err) {
@@ -221,54 +437,66 @@ public:
   bool setTransporter(class TransporterFacade * tf);
   
   // To abstract the stuff thats made in all create/drop/lists below
-  int
-  dictSignal(NdbApiSignal* signal, 
-	     LinearSectionPtr ptr[3], int noLPTR,
-	     const int useMasterNodeId,
-	     const Uint32 RETRIES,
-	     const WaitSignalType wst,
-	     const int theWait,
-	     const int *errcodes,
-	     const int noerrcodes,
-	     const int temporaryMask = 0);
+  int dictSignal(NdbApiSignal* signal, LinearSectionPtr ptr[3], int secs,
+		 int nodeId, // -1 any, 0 = master, >1 = specified
+		 WaitSignalType wst,
+		 int timeout, Uint32 RETRIES,
+		 const int *errcodes = 0, int temporaryMask = 0);
 
   int createOrAlterTable(class Ndb & ndb, NdbTableImpl &, bool alter);
 
   int createTable(class Ndb & ndb, NdbTableImpl &);
-  int createTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-
   int alterTable(class Ndb & ndb, NdbTableImpl &);
-  int alterTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-
-  int createIndex(class Ndb & ndb,
-		  NdbIndexImpl &, 
-		  const NdbTableImpl &);
-  int createIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-  
   int dropTable(const NdbTableImpl &);
-  int dropTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
 
+  int createIndex(class Ndb & ndb, const NdbIndexImpl &, const NdbTableImpl &);
   int dropIndex(const NdbIndexImpl &, const NdbTableImpl &);
-  int dropIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
+  
+  int createEvent(class Ndb & ndb, NdbEventImpl &, int getFlag);
+  int dropEvent(const NdbEventImpl &);
+  int dropEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP);
 
+  int executeSubscribeEvent(class Ndb & ndb, NdbEventOperationImpl &);
+  int stopSubscribeEvent(class Ndb & ndb, NdbEventOperationImpl &);
+  
   int listObjects(NdbDictionary::Dictionary::List& list, Uint32 requestData, bool fullyQualifiedNames);
   int listObjects(NdbApiSignal* signal);
   
-/*  NdbTableImpl * getTable(int tableId, bool fullyQualifiedNames); */
+  NdbTableImpl * getTable(int tableId, bool fullyQualifiedNames);
   NdbTableImpl * getTable(const BaseString& name, bool fullyQualifiedNames);
   NdbTableImpl * getTable(class NdbApiSignal * signal, 
 			  LinearSectionPtr ptr[3],
 			  Uint32 noOfSections, bool fullyQualifiedNames);
 
+  int forceGCPWait();
+
   static int parseTableInfo(NdbTableImpl ** dst, 
 			    const Uint32 * data, Uint32 len,
 			    bool fullyQualifiedNames,
-			    bool hostByteOrder = true);
+                            Uint32 version= 0xFFFFFFFF);
+
+  static int parseFileInfo(NdbFileImpl &dst,
+			   const Uint32 * data, Uint32 len);
+
+  static int parseFilegroupInfo(NdbFilegroupImpl &dst,
+				const Uint32 * data, Uint32 len);
+  
+  int create_file(const NdbFileImpl &, const NdbFilegroupImpl&, 
+		  bool overwrite, NdbDictObjectImpl*);
+  int drop_file(const NdbFileImpl &);
+  int create_filegroup(const NdbFilegroupImpl &, NdbDictObjectImpl*);
+  int drop_filegroup(const NdbFilegroupImpl &);
+  
+  int get_filegroup(NdbFilegroupImpl&, NdbDictionary::Object::Type, Uint32);
+  int get_filegroup(NdbFilegroupImpl&,NdbDictionary::Object::Type,const char*);
+  int get_file(NdbFileImpl&, NdbDictionary::Object::Type, int, int);
+  int get_file(NdbFileImpl&, NdbDictionary::Object::Type, int, const char *);
   
   static int create_index_obj_from_table(NdbIndexImpl ** dst, 
 					 NdbTableImpl* index_table,
 					 const NdbTableImpl* primary_table);
   
+  const NdbError &getNdbError() const;  
   NdbError & m_error;
 private:
   Uint32 m_reference;
@@ -278,6 +506,7 @@ private:
   class TransporterFacade * m_transporter;
   
   friend class Ndb;
+  friend class NdbDictionaryImpl;
   static void execSignal(void* dictImpl, 
 			 class NdbApiSignal* signal, 
 			 struct LinearSectionPtr ptr[3]);
@@ -297,14 +526,53 @@ private:
   void execDROP_INDX_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execDROP_INDX_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
 
+  void execCREATE_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execCREATE_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execSUB_START_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execSUB_START_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execSUB_STOP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execSUB_STOP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execDROP_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execDROP_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+
   void execDROP_TABLE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execDROP_TABLE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execLIST_TABLES_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
 
+  void execCREATE_FILE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execCREATE_FILE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  
+  void execCREATE_FILEGROUP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execCREATE_FILEGROUP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+
+  void execDROP_FILE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execDROP_FILE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  
+  void execDROP_FILEGROUP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execDROP_FILEGROUP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  
+  void execWAIT_GCP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execWAIT_GCP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+
   Uint32 m_fragmentId;
   UtilBuffer m_buffer;
 };
 
+class NdbDictionaryImpl;
+class GlobalCacheInitObject
+{
+public:
+  NdbDictionaryImpl *m_dict;
+  const BaseString &m_name;
+  GlobalCacheInitObject(NdbDictionaryImpl *dict,
+                        const BaseString &name) :
+    m_dict(dict),
+    m_name(name)
+  {}
+  virtual ~GlobalCacheInitObject() {}
+  virtual int init(NdbTableImpl &tab) const = 0;
+};
+
 class NdbDictionaryImpl : public NdbDictionary::Dictionary {
 public:
   NdbDictionaryImpl(Ndb &ndb);
@@ -315,8 +583,7 @@ public:
   bool setTransporter(class TransporterFacade * tf);
 
   int createTable(NdbTableImpl &t);
-  int createBlobTables(NdbTableImpl &);
-  int addBlobTables(NdbTableImpl &);
+  int createBlobTables(NdbTableImpl& org, NdbTableImpl& created);
   int alterTable(NdbTableImpl &t);
   int dropTable(const char * name);
   int dropTable(NdbTableImpl &);
@@ -325,22 +592,60 @@ public:
   int removeCachedObject(NdbTableImpl &);
 
   int createIndex(NdbIndexImpl &ix);
+  int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab);
   int dropIndex(const char * indexName, 
 		const char * tableName);
-  int dropIndex(NdbIndexImpl &);
+  int dropIndex(NdbIndexImpl &, const char * tableName);
   NdbTableImpl * getIndexTable(NdbIndexImpl * index, 
 			       NdbTableImpl * table);
 
+  int createEvent(NdbEventImpl &);
+  int createBlobEvents(NdbEventImpl &);
+  int dropEvent(const char * eventName);
+  int dropEvent(const NdbEventImpl &);
+  int dropBlobEvents(const NdbEventImpl &);
+
+  int executeSubscribeEvent(NdbEventOperationImpl &);
+  int stopSubscribeEvent(NdbEventOperationImpl &);
+
+  int forceGCPWait();
+
   int listObjects(List& list, NdbDictionary::Object::Type type);
   int listIndexes(List& list, Uint32 indexId);
 
+  NdbTableImpl * getTableGlobal(const char * tableName);
+  NdbIndexImpl * getIndexGlobal(const char * indexName,
+                                NdbTableImpl &ndbtab);
+  int alterTableGlobal(NdbTableImpl &orig_impl, NdbTableImpl &impl);
+  int dropTableGlobal(NdbTableImpl &);
+  int dropIndexGlobal(NdbIndexImpl & impl);
+  int releaseTableGlobal(NdbTableImpl & impl, int invalidate);
+  int releaseIndexGlobal(NdbIndexImpl & impl, int invalidate);
+
   NdbTableImpl * getTable(const char * tableName, void **data= 0);
-  Ndb_local_table_info* get_local_table_info(
-    const BaseString& internalTableName, bool do_add_blob_tables);
+  NdbTableImpl * getBlobTable(const NdbTableImpl&, uint col_no);
+  NdbTableImpl * getBlobTable(uint tab_id, uint col_no);
+  void putTable(NdbTableImpl *impl);
+  int getBlobTables(NdbTableImpl &);
+  Ndb_local_table_info*
+    get_local_table_info(const BaseString& internalTableName);
   NdbIndexImpl * getIndex(const char * indexName,
 			  const char * tableName);
-  NdbIndexImpl * getIndex(const char * indexName,
-			  NdbTableImpl * table);
+  NdbIndexImpl * getIndex(const char * indexName, const NdbTableImpl& prim);
+  NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL);
+  NdbEventImpl * getBlobEvent(const NdbEventImpl& ev, uint col_no);
+  NdbEventImpl * getEventImpl(const char * internalName);
+
+  int createDatafile(const NdbDatafileImpl &, bool force, NdbDictObjectImpl*);
+  int dropDatafile(const NdbDatafileImpl &);
+  int createUndofile(const NdbUndofileImpl &, bool force, NdbDictObjectImpl*);
+  int dropUndofile(const NdbUndofileImpl &);
+
+  int createTablespace(const NdbTablespaceImpl &, NdbDictObjectImpl*);
+  int dropTablespace(const NdbTablespaceImpl &);
+
+  int createLogfileGroup(const NdbLogfileGroupImpl &, NdbDictObjectImpl*);
+  int dropLogfileGroup(const NdbLogfileGroupImpl &);
   
   const NdbError & getNdbError() const;
   NdbError m_error;
@@ -355,13 +660,30 @@ public:
 
   NdbDictInterface m_receiver;
   Ndb & m_ndb;
-private:
+
+  NdbIndexImpl* getIndexImpl(const char * externalName,
+                             const BaseString& internalName,
+                             NdbTableImpl &tab,
+                             NdbTableImpl &prim);
   NdbIndexImpl * getIndexImpl(const char * name,
                               const BaseString& internalName);
-  Ndb_local_table_info * fetchGlobalTableImpl(const BaseString& internalName);
+private:
+  NdbTableImpl * fetchGlobalTableImplRef(const GlobalCacheInitObject &obj);
 };
 
 inline
+NdbEventImpl &
+NdbEventImpl::getImpl(const NdbDictionary::Event & t){
+  return t.m_impl;
+}
+
+inline
+NdbEventImpl &
+NdbEventImpl::getImpl(NdbDictionary::Event & t){
+  return t.m_impl;
+}
+
+inline
 NdbColumnImpl &
 NdbColumnImpl::getImpl(NdbDictionary::Column & t){
   return t.m_impl;
@@ -412,13 +734,11 @@ bool
 NdbColumnImpl::get_var_length(const void* value, Uint32& len) const
 {
   Uint32 max_len = m_attrSize * m_arraySize;
-  switch (m_type) {
-  case NdbDictionary::Column::Varchar:
-  case NdbDictionary::Column::Varbinary:
+  switch (m_arrayType) {
+  case NDB_ARRAYTYPE_SHORT_VAR:
     len = 1 + *((Uint8*)value);
     break;
-  case NdbDictionary::Column::Longvarchar:
-  case NdbDictionary::Column::Longvarbinary:
+  case NDB_ARRAYTYPE_MEDIUM_VAR:
     len = 2 + uint2korr((char*)value);
     break;
   default:
@@ -450,6 +770,13 @@ NdbTableImpl::getColumn(unsigned attrId)
 }
 
 inline
+const char *
+NdbTableImpl::getMysqlName() const
+{
+  return m_mysqlName.c_str();
+}
+
+inline
 Uint32
 Hash( const char* str ){
   Uint32 h = 0;
@@ -574,81 +901,320 @@ NdbDictionaryImpl::getImpl(const NdbDict
  * Inline:d getters
  */
 
+class InitTable : public GlobalCacheInitObject
+{
+public:
+  InitTable(NdbDictionaryImpl *dict,
+            const BaseString &name) :
+    GlobalCacheInitObject(dict, name)
+  {}
+  int init(NdbTableImpl &tab) const
+  {
+    return m_dict->getBlobTables(tab);
+  }
+};
+
+inline
+NdbTableImpl *
+NdbDictionaryImpl::getTableGlobal(const char * table_name)
+{
+  const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
+  return fetchGlobalTableImplRef(InitTable(this, internal_tabname));
+}
+
 inline
 NdbTableImpl *
 NdbDictionaryImpl::getTable(const char * table_name, void **data)
 {
+  DBUG_ENTER("NdbDictionaryImpl::getTable");
+  DBUG_PRINT("enter", ("table: %s", table_name));
+
+  if (unlikely(strchr(table_name, '$') != 0)) {
+    Uint32 tab_id, col_no;
+    if (is_ndb_blob_table(table_name, &tab_id, &col_no)) {
+      NdbTableImpl* t = getBlobTable(tab_id, col_no);
+      DBUG_RETURN(t);
+    }
+  }
+
   const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
   Ndb_local_table_info *info=
-    get_local_table_info(internal_tabname, true);
+    get_local_table_info(internal_tabname);
   if (info == 0)
-    return 0;
-
+    DBUG_RETURN(0);
   if (data)
     *data= info->m_local_data;
-
-  return info->m_table_impl;
+  DBUG_RETURN(info->m_table_impl);
 }
 
 inline
 Ndb_local_table_info * 
-NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName,
-					bool do_add_blob_tables)
+NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
 {
+  DBUG_ENTER("NdbDictionaryImpl::get_local_table_info");
+  DBUG_PRINT("enter", ("table: %s", internalTableName.c_str()));
+
   Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
-  if (info == 0) {
-    info= fetchGlobalTableImpl(internalTableName);
-    if (info == 0) {
-      return 0;
+  if (info == 0)
+  {
+    NdbTableImpl *tab=
+      fetchGlobalTableImplRef(InitTable(this, internalTableName));
+    if (tab)
+    {
+      info= Ndb_local_table_info::create(tab, m_local_table_data_size);
+      if (info)
+      {
+        m_localHash.put(internalTableName.c_str(), info);
+      }
     }
   }
-  if (do_add_blob_tables && info->m_table_impl->m_noOfBlobs)
-    addBlobTables(*(info->m_table_impl));
-  
-  return info; // autoincrement already initialized
+  DBUG_RETURN(info); // autoincrement already initialized
 }
 
+class InitIndex : public GlobalCacheInitObject
+{
+public:
+  const char *m_index_name;
+  const NdbTableImpl &m_prim;
+
+  InitIndex(const BaseString &internal_indexname,
+	    const char *index_name,
+	    const NdbTableImpl &prim) :
+    GlobalCacheInitObject(0, internal_indexname),
+    m_index_name(index_name),
+    m_prim(prim)
+    {}
+  
+  int init(NdbTableImpl &tab) const {
+    DBUG_ENTER("InitIndex::init");
+    DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
+    /**
+     * Create index impl
+     */
+    NdbIndexImpl* idx;
+    if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &m_prim) == 0)
+    {
+      idx->m_table = &tab;
+      if (!idx->m_externalName.assign(m_index_name) ||
+          !idx->m_internalName.assign(m_name))
+        DBUG_RETURN(4000);
+      tab.m_index = idx;
+      DBUG_RETURN(0);
+    }
+    DBUG_RETURN(1);
+  }
+};
 
 inline
 NdbIndexImpl * 
-NdbDictionaryImpl::getIndex(const char * index_name,
-			    const char * table_name)
+NdbDictionaryImpl::getIndexGlobal(const char * index_name,
+                                  NdbTableImpl &ndbtab)
 {
-  return getIndex(index_name, (table_name) ? getTable(table_name) : NULL);
+  DBUG_ENTER("NdbDictionaryImpl::getIndexGlobal");
+  const BaseString
+    internal_indexname(m_ndb.internalize_index_name(&ndbtab, index_name));
+  int retry= 2;
+
+  while (retry)
+  {
+    NdbTableImpl *tab=
+      fetchGlobalTableImplRef(InitIndex(internal_indexname,
+					index_name, ndbtab));
+    if (tab)
+    {
+      // tab->m_index sould be set. otherwise tab == 0
+      NdbIndexImpl *idx= tab->m_index;
+      if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
+          idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
+      {
+        releaseIndexGlobal(*idx, 1);
+        retry--;
+        continue;
+      }
+      DBUG_RETURN(idx);
+    }
+    break;
+  }
+  {
+    // Index not found, try old format
+    const BaseString
+      old_internal_indexname(m_ndb.old_internalize_index_name(&ndbtab, 
+							      index_name));
+    retry= 2;
+    while (retry)
+    {
+      NdbTableImpl *tab=
+	fetchGlobalTableImplRef(InitIndex(old_internal_indexname,
+					  index_name, ndbtab));
+      if (tab)
+      {
+	// tab->m_index sould be set. otherwise tab == 0
+	NdbIndexImpl *idx= tab->m_index;
+	if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
+	    idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
+	{
+	  releaseIndexGlobal(*idx, 1);
+	  retry--;
+	  continue;
+	}
+	DBUG_RETURN(idx);
+      }
+      break;
+    }
+  }
+  m_error.code= 4243;
+  DBUG_RETURN(0);
+}
+
+inline int
+NdbDictionaryImpl::releaseTableGlobal(NdbTableImpl & impl, int invalidate)
+{
+  DBUG_ENTER("NdbDictionaryImpl::releaseTableGlobal");
+  DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
+  m_globalHash->lock();
+  m_globalHash->release(&impl, invalidate);
+  m_globalHash->unlock();
+  DBUG_RETURN(0);
+}
+
+inline int
+NdbDictionaryImpl::releaseIndexGlobal(NdbIndexImpl & impl, int invalidate)
+{
+  DBUG_ENTER("NdbDictionaryImpl::releaseIndexGlobal");
+  DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
+  m_globalHash->lock();
+  m_globalHash->release(impl.m_table, invalidate);
+  m_globalHash->unlock();
+  DBUG_RETURN(0);
 }
 
 inline
 NdbIndexImpl * 
 NdbDictionaryImpl::getIndex(const char * index_name,
-			    NdbTableImpl * table)
+			    const char * table_name)
 {
-  if (table || m_ndb.usingFullyQualifiedNames())
+  if (table_name == 0)
   {
-    const BaseString internal_indexname(
-      (table)
-      ?
-      m_ndb.internalize_index_name(table, index_name)
-      :
-      m_ndb.internalize_table_name(index_name)); // Index is also a table
+    assert(0);
+    m_error.code= 4243;
+    return 0;
+  }
+  
+  
+  NdbTableImpl* prim = getTable(table_name);
+  if (prim == 0)
+  {
+    m_error.code= 4243;
+    return 0;
+  }
 
-    if (internal_indexname.length())
-    {
-      Ndb_local_table_info * info=
-        get_local_table_info(internal_indexname, false);
-      if (info)
-      {
-	NdbTableImpl * tab= info->m_table_impl;
-        if (tab->m_index == 0)
-          tab->m_index= getIndexImpl(index_name, internal_indexname);
-        if (tab->m_index != 0)
-          tab->m_index->m_table= tab;
-        return tab->m_index;
-      }
-    }
+  return getIndex(index_name, *prim);
+}
+
+inline
+NdbIndexImpl * 
+NdbDictionaryImpl::getIndex(const char* index_name,
+			    const NdbTableImpl& prim)
+{
+
+  const BaseString
+    internal_indexname(m_ndb.internalize_index_name(&prim, index_name));
+
+  Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
+  NdbTableImpl *tab;
+  if (info == 0)
+  {
+    tab= fetchGlobalTableImplRef(InitIndex(internal_indexname,
+					   index_name,
+					   prim));
+    if (!tab)
+      goto retry;
+
+    info= Ndb_local_table_info::create(tab, 0);
+    if (!info)
+      goto retry;
+    m_localHash.put(internal_indexname.c_str(), info);
   }
+  else
+    tab= info->m_table_impl;
+  
+  return tab->m_index;
+
+retry:
+  // Index not found, try fetching it from current database
+  const BaseString
+    old_internal_indexname(m_ndb.old_internalize_index_name(&prim, index_name));
 
+  info= m_localHash.get(old_internal_indexname.c_str());
+  if (info == 0)
+  {
+    tab= fetchGlobalTableImplRef(InitIndex(old_internal_indexname,
+					   index_name,
+					   prim));
+    if (!tab)
+      goto err;
+    
+    info= Ndb_local_table_info::create(tab, 0);
+    if (!info)
+      goto err;
+    m_localHash.put(old_internal_indexname.c_str(), info);
+  }
+  else
+    tab= info->m_table_impl;
+  
+  return tab->m_index;
+  
+err:
   m_error.code= 4243;
   return 0;
+}
+
+inline
+NdbTablespaceImpl &
+NdbTablespaceImpl::getImpl(NdbDictionary::Tablespace & t){
+  return t.m_impl;
+}
+
+inline
+const NdbTablespaceImpl &
+NdbTablespaceImpl::getImpl(const NdbDictionary::Tablespace & t){
+  return t.m_impl;
+}
+
+inline
+NdbLogfileGroupImpl &
+NdbLogfileGroupImpl::getImpl(NdbDictionary::LogfileGroup & t){
+  return t.m_impl;
+}
+
+inline
+const NdbLogfileGroupImpl &
+NdbLogfileGroupImpl::getImpl(const NdbDictionary::LogfileGroup & t){
+  return t.m_impl;
+}
+
+inline
+NdbDatafileImpl &
+NdbDatafileImpl::getImpl(NdbDictionary::Datafile & t){
+  return t.m_impl;
+}
+
+inline
+const NdbDatafileImpl &
+NdbDatafileImpl::getImpl(const NdbDictionary::Datafile & t){
+  return t.m_impl;
+}
+
+inline
+NdbUndofileImpl &
+NdbUndofileImpl::getImpl(NdbDictionary::Undofile & t){
+  return t.m_impl;
+}
+
+inline
+const NdbUndofileImpl &
+NdbUndofileImpl::getImpl(const NdbDictionary::Undofile & t){
+  return t.m_impl;
 }
 
 #endif
Thread
bk commit into 5.1 tree (lzhou:1.2528)lzhou15 Aug