MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:Martin Skold Date:May 24 2006 10:57am
Subject:bk commit into 5.1 tree (mskold:1.2181)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2181 06/05/24 12:57:04 mskold@stripped +5 -0
  Merge mysql.com:/home/marty/MySQL/mysql-5.0
  into  mysql.com:/home/marty/MySQL/mysql-5.1-new

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
    1.60 06/05/24 12:56:58 mskold@stripped +1 -16
    merge (using local)

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.134 06/05/24 12:56:58 mskold@stripped +19 -5
    merge (using local)

  sql/ha_ndbcluster.cc
    1.308 06/05/24 12:56:58 mskold@stripped +1 -40
    merge (using local)

  storage/ndb/src/ndbapi/NdbDictionary.cpp
    1.57 06/05/24 11:52:51 mskold@stripped +0 -0
    Auto merged

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.76 06/05/24 11:52:51 mskold@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
    1.30.5.3 06/05/24 11:52:51 mskold@stripped +0 -0
    Merge rename: ndb/src/ndbapi/NdbDictionaryImpl.hpp -> storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.74.17.2 06/05/24 11:52:51 mskold@stripped +0 -0
    Merge rename: ndb/src/ndbapi/NdbDictionaryImpl.cpp -> storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp

  storage/ndb/src/ndbapi/NdbDictionary.cpp
    1.34.3.3 06/05/24 11:52:51 mskold@stripped +0 -0
    Merge rename: ndb/src/ndbapi/NdbDictionary.cpp -> storage/ndb/src/ndbapi/NdbDictionary.cpp

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.48.6.3 06/05/24 11:52:51 mskold@stripped +0 -0
    Merge rename: ndb/include/ndbapi/NdbDictionary.hpp -> storage/ndb/include/ndbapi/NdbDictionary.hpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mskold
# Host:	linux.site
# Root:	/home/marty/MySQL/mysql-5.1-new/RESYNC

--- 1.48.6.2/ndb/include/ndbapi/NdbDictionary.hpp	2006-05-24 10:57:27 +02:00
+++ 1.76/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-05-24 11:52:51 +02:00
@@ -96,7 +96,9 @@
      * Get version of object
      */
     virtual int getObjectVersion() const = 0;
-
+    
+    virtual int getObjectId() const = 0;
+    
     /**
      * Object type
      */
@@ -109,7 +111,11 @@
       HashIndexTrigger = 7,   ///< Index maintenance, internal
       IndexTrigger = 8,       ///< Index maintenance, internal
       SubscriptionTrigger = 9,///< Backup or replication, internal
-      ReadOnlyConstraint = 10 ///< Trigger, internal
+      ReadOnlyConstraint = 10,///< Trigger, internal
+      Tablespace = 20,        ///< Tablespace
+      LogfileGroup = 21,      ///< Logfile group
+      Datafile = 22,          ///< Datafile
+      Undofile = 23           ///< Undofile
     };
 
     /**
@@ -150,12 +156,17 @@
       FragSingle = 1,         ///< Only one fragment
       FragAllSmall = 2,       ///< One fragment per node, default
       FragAllMedium = 3,      ///< two fragments per node
-      FragAllLarge = 4        ///< Four fragments per node.
+      FragAllLarge = 4,       ///< Four fragments per node.
+      DistrKeyHash = 5,
+      DistrKeyLin = 6,
+      UserDefined = 7
     };
   };
 
   class Table; // forward declaration
-  
+  class Tablespace; // forward declaration
+//  class NdbEventOperation; // forward declaration
+
   /**
    * @class Column
    * @brief Represents a column in an NDB Cluster table
@@ -212,6 +223,34 @@
       Timestamp = NDB_TYPE_TIMESTAMP  ///< Unix time
     };
 
+    /*
+     * Array type specifies internal attribute format.
+     *
+     * - ArrayTypeFixed is stored as fixed number of bytes.  This type
+     *   is fastest to access but can waste space.
+     *
+     * - ArrayTypeVar is stored as variable number of bytes with a fixed
+     *   overhead of 2 bytes.
+     *
+     * Default is ArrayTypeVar for Var* types and ArrayTypeFixed for
+     * others.  The default is normally ok.
+     */
+    enum ArrayType {
+      ArrayTypeFixed = NDB_ARRAYTYPE_FIXED,          // 0 length bytes
+      ArrayTypeShortVar = NDB_ARRAYTYPE_SHORT_VAR,   // 1 length bytes
+      ArrayTypeMediumVar = NDB_ARRAYTYPE_MEDIUM_VAR // 2 length bytes
+    };
+
+    /*
+     * Storage type specifies whether attribute is stored in memory or
+     * on disk.  Default is memory.  Disk attributes are potentially
+     * much slower to access and cannot be indexed in version 5.1.
+     */
+    enum StorageType {
+      StorageTypeMemory = NDB_STORAGETYPE_MEMORY,
+      StorageTypeDisk = NDB_STORAGETYPE_DISK
+    };
+
     /** 
      * @name General 
      * @{
@@ -238,6 +277,10 @@
      */
     int getColumnNo() const;
 
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    int getAttrId() const;
+#endif
+
     /**
      * Check if column is equal to some other column
      * @param  column  Column to compare with
@@ -331,6 +374,9 @@
     inline bool getDistributionKey() const { return getPartitionKey(); };
 #endif
 
+    ArrayType getArrayType() const;
+    StorageType getStorageType() const;
+
     /** @} *******************************************************************/
 
 
@@ -439,6 +485,9 @@
     { setPartitionKey(enable); };
 #endif
 
+    void setArrayType(ArrayType type);
+    void setStorageType(StorageType type);
+
     /** @} *******************************************************************/
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
@@ -456,6 +505,10 @@
     static const Column * COMMIT_COUNT;
     static const Column * ROW_SIZE;
     static const Column * RANGE_NO;
+    static const Column * DISK_REF;
+    static const Column * RECORDS_IN_RANGE;
+    static const Column * ROWID;
+    static const Column * ROW_GCI;
     
     int getSizeInBytes() const;
 #endif
@@ -619,6 +672,24 @@
     const void* getFrmData() const;
     Uint32 getFrmLength() const;
 
+    /**
+     * Get Fragment Data (id, state and node group)
+     */
+    const void *getFragmentData() const;
+    Uint32 getFragmentDataLen() const;
+
+    /**
+     * Get Range or List Array (value, partition)
+     */
+    const void *getRangeListData() const;
+    Uint32 getRangeListDataLen() const;
+
+    /**
+     * Get Tablespace Data (id, version)
+     */
+    const void *getTablespaceData() const;
+    Uint32 getTablespaceDataLen() const;
+
     /** @} *******************************************************************/
 
     /** 
@@ -665,7 +736,23 @@
      * @see NdbDictionary::Table::getLogging.
      */
     void setLogging(bool); 
-   
+  
+    /**
+     * Set/Get Linear Hash Flag
+     */ 
+    void setLinearFlag(Uint32 flag);
+    bool getLinearFlag() const;
+
+    /**
+     * Set fragment count
+     */
+    void setFragmentCount(Uint32);
+
+    /**
+     * Get fragment count
+     */
+    Uint32 getFragmentCount() const;
+
     /**
      * Set fragmentation type
      */
@@ -697,6 +784,11 @@
      */
     void setMaxLoadFactor(int);
 
+    void setTablespace(const char * name);
+    void setTablespace(const class Tablespace &);
+    const char * getTablespace() const;
+    bool getTablespace(Uint32 *id= 0, Uint32 *version= 0) const;
+
     /**
      * Get table object type
      */
@@ -706,6 +798,7 @@
      * Get object status
      */
     virtual Object::Status getObjectStatus() const;
+    void setStatusInvalid() const;
 
     /**
      * Get object version
@@ -713,18 +806,77 @@
     virtual int getObjectVersion() const;
 
     /**
+     * Set/Get Maximum number of rows in table (only used to calculate
+     * number of partitions).
+     */
+    void setMaxRows(Uint64 maxRows);
+    Uint64 getMaxRows() const;
+
+    /**
+     * Set/Get indicator if default number of partitions is used in table.
+     */
+    void setDefaultNoPartitionsFlag(Uint32 indicator);
+    Uint32 getDefaultNoPartitionsFlag() const;
+   
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
+    /**
      * Set frm file to store with this table
      */ 
     void setFrm(const void* data, Uint32 len);
 
     /**
+     * Set array of fragment information containing
+     * Fragment Identity
+     * Node group identity
+     * Fragment State
+     */
+    void setFragmentData(const void* data, Uint32 len);
+
+    /**
+     * Set/Get tablespace names per fragment
+     */
+    void setTablespaceNames(const void* data, Uint32 len);
+    const void *getTablespaceNames();
+    Uint32 getTablespaceNamesLen() const;
+
+    /**
+     * Set tablespace information per fragment
+     * Contains a tablespace id and a tablespace version
+     */
+    void setTablespaceData(const void* data, Uint32 len);
+
+    /**
+     * Set array of information mapping range values and list values
+     * to fragments. This is essentially a sorted map consisting of
+     * pairs of value, fragment identity. For range partitions there is
+     * one pair per fragment. For list partitions it could be any number
+     * of pairs, at least as many as there are fragments.
+     */
+    void setRangeListData(const void* data, Uint32 len);
+
+    /**
      * Set table object type
      */
     void setObjectType(Object::Type type);
 
     /** @} *******************************************************************/
 
+    /**
+     * 
+     */
+    void setRowGCIIndicator(bool value);
+    bool getRowGCIIndicator() const;
+
+    void setRowChecksumIndicator(bool value);
+    bool getRowChecksumIndicator() const;
+ 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    const char *getMysqlName() const;
+
     void setStoredTable(bool x) { setLogging(x); }
     bool getStoredTable() const { return getLogging(); }
 
@@ -736,7 +888,10 @@
 
   private:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    friend class Ndb;
+    friend class NdbDictionaryImpl;
     friend class NdbTableImpl;
+    friend class NdbEventOperationImpl;
 #endif
     class NdbTableImpl & m_impl;
     Table(NdbTableImpl&);
@@ -825,6 +980,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
     /** @} *******************************************************************/
 
     /** 
@@ -918,6 +1078,7 @@
   private:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     friend class NdbIndexImpl;
+    friend class NdbIndexStat;
 #endif
     class NdbIndexImpl & m_impl;
     Index(NdbIndexImpl&);
@@ -936,12 +1097,45 @@
     /** TableEvent must match 1 << TriggerEvent */
 #endif
     enum TableEvent { 
-      TE_INSERT=1, ///< Insert event on table
-      TE_DELETE=2, ///< Delete event on table
-      TE_UPDATE=4, ///< Update event on table
-      TE_ALL=7     ///< Any/all event on table (not relevant when 
-                   ///< events are received)
+      TE_INSERT      =1<<0, ///< Insert event on table
+      TE_DELETE      =1<<1, ///< Delete event on table
+      TE_UPDATE      =1<<2, ///< Update event on table
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+      TE_SCAN        =1<<3, ///< Scan event on table
+      TE_FIRST_NON_DATA_EVENT =1<<4,
+#endif
+      TE_DROP        =1<<4, ///< Drop of table
+      TE_ALTER       =1<<5, ///< Alter of table
+      TE_CREATE      =1<<6, ///< Create of table
+      TE_GCP_COMPLETE=1<<7, ///< GCP is complete
+      TE_CLUSTER_FAILURE=1<<8, ///< Cluster is unavailable
+      TE_STOP        =1<<9, ///< Stop of event operation
+      TE_NODE_FAILURE=1<<10, ///< Node failed
+      TE_SUBSCRIBE   =1<<11, ///< Node subscribes
+      TE_UNSUBSCRIBE =1<<12, ///< Node unsubscribes
+      TE_ALL=0xFFFF         ///< Any/all event on table (not relevant when 
+                            ///< events are received)
+    };
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    enum _TableEvent { 
+      _TE_INSERT=0,
+      _TE_DELETE=1,
+      _TE_UPDATE=2,
+      _TE_SCAN=3,
+      _TE_FIRST_NON_DATA_EVENT=4,
+      _TE_DROP=4,
+      _TE_ALTER=5,
+      _TE_CREATE=6,
+      _TE_GCP_COMPLETE=7,
+      _TE_CLUSTER_FAILURE=8,
+      _TE_STOP=9,
+      _TE_NODE_FAILURE=10,
+      _TE_SUBSCRIBE=11,
+      _TE_UNSUBSCRIBE=12,
+      _TE_NUL=13, // internal (e.g. INS o DEL within same GCI)
+      _TE_ACTIVE=14 // internal (node becomes active)
     };
+#endif
     /**
      *  Specifies the durability of an event
      * (future version may supply other types)
@@ -968,6 +1162,15 @@
     };
 
     /**
+     * Specifies reporting options for table events
+     */
+    enum EventReport {
+      ER_UPDATED = 0,
+      ER_ALL = 1, // except not-updated blob inlines
+      ER_SUBSCRIBE = 2
+    };
+
+    /**
      *  Constructor
      *  @param  name  Name of event
      */
@@ -988,6 +1191,12 @@
      */
     const char *getName() const;
     /**
+     * Get table that the event is defined on
+     *
+     * @return pointer to table or NULL if no table has been defined
+     */
+    const NdbDictionary::Table * getTable() const;
+    /**
      * Define table on which events should be detected
      *
      * @note calling this method will default to detection
@@ -1015,6 +1224,10 @@
      */
     void addTableEvent(const TableEvent te);
     /**
+     * Check if a specific table event will be detected
+     */
+    bool getTableEvent(const TableEvent te) const;
+    /**
      * Set durability of the event
      */
     void setDurability(EventDurability);
@@ -1022,6 +1235,14 @@
      * Get durability of the event
      */
     EventDurability getDurability() const;
+    /**
+     * Set report option of the event
+     */
+    void setReport(EventReport);
+    /**
+     * Get report option of the event
+     */
+    EventReport getReport() const;
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     void addColumn(const Column &c);
 #endif
@@ -1060,6 +1281,29 @@
     int getNoOfEventColumns() const;
 
     /**
+     * Get a specific column in the event
+     */
+    const Column * getEventColumn(unsigned no) const;
+
+    /**
+     * The merge events flag is false by default.  Setting it true
+     * implies that events are merged in following ways:
+     *
+     * - for given NdbEventOperation associated with this event,
+     *   events on same PK within same GCI are merged into single event
+     *
+     * - a blob table event is created for each blob attribute
+     *   and blob events are handled as part of main table events
+     *
+     * - blob post/pre data from the blob part events can be read
+     *   via NdbBlob methods as a single value
+     *
+     * NOTE: Currently this flag is not inherited by NdbEventOperation
+     * and must be set on NdbEventOperation explicitly.
+     */
+    void mergeEvents(bool flag);
+
+    /**
      * Get object status
      */
     virtual Object::Status getObjectStatus() const;
@@ -1069,6 +1313,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     void print();
 #endif
@@ -1082,6 +1331,187 @@
     Event(NdbEventImpl&);
   };
 
+  struct AutoGrowSpecification {
+    Uint32 min_free;
+    Uint64 max_size;
+    Uint64 file_size;
+    const char * filename_pattern;
+  };
+
+  /**
+   * @class LogfileGroup
+   */
+  class LogfileGroup : public Object {
+  public:
+    LogfileGroup();
+    LogfileGroup(const LogfileGroup&);
+    virtual ~LogfileGroup();
+
+    void setName(const char * name);
+    const char* getName() const;
+
+    void setUndoBufferSize(Uint32 sz);
+    Uint32 getUndoBufferSize() const;
+    
+    void setAutoGrowSpecification(const AutoGrowSpecification&);
+    const AutoGrowSpecification& getAutoGrowSpecification() const;
+
+    Uint64 getUndoFreeWords() const;
+
+    /**
+     * Get object status
+     */
+    virtual Object::Status getObjectStatus() const;
+
+    /**
+     * Get object version
+     */
+    virtual int getObjectVersion() const;
+
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
+  private:
+    friend class NdbDictionaryImpl;
+    friend class NdbLogfileGroupImpl;
+    class NdbLogfileGroupImpl & m_impl;
+    LogfileGroup(NdbLogfileGroupImpl&);
+  };
+
+  /**
+   * @class Tablespace
+   */
+  class Tablespace : public Object {
+  public:
+    Tablespace();
+    Tablespace(const Tablespace&);
+    virtual ~Tablespace();
+
+    void setName(const char * name);
+    const char* getName() const;
+
+    void setExtentSize(Uint32 sz);
+    Uint32 getExtentSize() const;
+
+    void setAutoGrowSpecification(const AutoGrowSpecification&);
+    const AutoGrowSpecification& getAutoGrowSpecification() const;
+
+    void setDefaultLogfileGroup(const char * name);
+    void setDefaultLogfileGroup(const class LogfileGroup&);
+
+    const char * getDefaultLogfileGroup() const;
+    Uint32 getDefaultLogfileGroupId() const;
+    
+    /**
+     * Get object status
+     */
+    virtual Object::Status getObjectStatus() const;
+
+    /**
+     * Get object version
+     */
+    virtual int getObjectVersion() const;
+
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
+  private:
+    friend class NdbTablespaceImpl;
+    class NdbTablespaceImpl & m_impl;
+    Tablespace(NdbTablespaceImpl&);
+  };
+
+  class Datafile : public Object {
+  public:
+    Datafile();
+    Datafile(const Datafile&);
+    virtual ~Datafile();
+
+    void setPath(const char * name);
+    const char* getPath() const;
+  
+    void setSize(Uint64);
+    Uint64 getSize() const;
+    Uint64 getFree() const;
+
+    void setTablespace(const char * name);
+    void setTablespace(const class Tablespace &);
+    const char * getTablespace() const;
+    Uint32 getTablespaceId() const;
+
+    void setNode(Uint32 nodeId);
+    Uint32 getNode() const;
+
+    Uint32 getFileNo() const;
+
+    /**
+     * Get object status
+     */
+    virtual Object::Status getObjectStatus() const;
+
+    /**
+     * Get object version
+     */
+    virtual int getObjectVersion() const;
+
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
+  private:
+    friend class NdbDatafileImpl;
+    class NdbDatafileImpl & m_impl;
+    Datafile(NdbDatafileImpl&);
+  };
+
+  class Undofile : public Object {
+  public:
+    Undofile();
+    Undofile(const Undofile&);
+    virtual ~Undofile();
+
+    void setPath(const char * path);
+    const char* getPath() const;
+  
+    void setSize(Uint64);
+    Uint64 getSize() const;
+
+    void setLogfileGroup(const char * name);
+    void setLogfileGroup(const class LogfileGroup &);
+    const char * getLogfileGroup() const;
+    Uint32 getLogfileGroupId() const;
+
+    void setNode(Uint32 nodeId);
+    Uint32 getNode() const;
+
+    Uint32 getFileNo() const;
+
+    /**
+     * Get object status
+     */
+    virtual Object::Status getObjectStatus() const;
+
+    /**
+     * Get object version
+     */
+    virtual int getObjectVersion() const;
+
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
+  private:
+    friend class NdbUndofileImpl;
+    class NdbUndofileImpl & m_impl;
+    Undofile(NdbUndofileImpl&);
+  };
+
   /**
    * @class Dictionary
    * @brief Dictionary for defining and retreiving meta data
@@ -1173,6 +1603,20 @@
      */
     const Table * getTable(const char * name) const;
 
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    /**
+     * Given main table, get blob table.
+     */
+    const Table * getBlobTable(const Table *, const char * col_name);
+    const Table * getBlobTable(const Table *, Uint32 col_no);
+
+    /*
+     * Save a table definition in dictionary cache
+     * @param table Object to put into cache
+     */
+    void putTable(const Table * table);
+#endif
+
     /**
      * Get index with given name, NULL if undefined
      * @param indexName  Name of index to get.
@@ -1301,6 +1745,7 @@
      * @return 0 if successful otherwise -1.
      */
     int createIndex(const Index &index);
+    int createIndex(const Index &index, const Table &table);
 
     /**
      * Drop index with given name
@@ -1320,15 +1765,48 @@
 
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    void removeCachedTable(const Table *table);
+    void removeCachedIndex(const Index *index);
+    void invalidateTable(const Table *table);
     /**
      * Invalidate cached index object
      */
     void invalidateIndex(const char * indexName,
                          const char * tableName);
+    void invalidateIndex(const Index *index);
+    /**
+     * Force gcp and wait for gcp complete
+     */
+    int forceGCPWait();
 #endif
 
     /** @} *******************************************************************/
 
+    /** @} *******************************************************************/
+    /** 
+     * @name Disk data objects
+     * @{
+     */
+    
+    int createLogfileGroup(const LogfileGroup &);
+    int dropLogfileGroup(const LogfileGroup&);
+    LogfileGroup getLogfileGroup(const char * name);
+
+    int createTablespace(const Tablespace &);
+    int dropTablespace(const Tablespace&);
+    Tablespace getTablespace(const char * name);
+    Tablespace getTablespace(Uint32 tablespaceId);
+
+    int createDatafile(const Datafile &, bool overwrite_existing = false);
+    int dropDatafile(const Datafile&);
+    Datafile getDatafile(Uint32 node, const char * path);
+    
+    int createUndofile(const Undofile &, bool overwrite_existing = false);
+    int dropUndofile(const Undofile&);
+    Undofile getUndofile(Uint32 node, const char * path);
+    
+    /** @} *******************************************************************/
+    
   protected:
     Dictionary(Ndb & ndb);
     ~Dictionary();
@@ -1347,6 +1825,15 @@
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     const Table * getTable(const char * name, void **data) const;
     void set_local_table_data_size(unsigned sz);
+
+    const Index * getIndexGlobal(const char * indexName,
+                                 const Table &ndbtab) const;
+    const Table * getTableGlobal(const char * tableName) const;
+    int alterTableGlobal(const Table &f, const Table &t);
+    int dropTableGlobal(const Table &ndbtab);
+    int dropIndexGlobal(const Index &index);
+    int removeIndexGlobal(const Index &ndbidx, int invalidate) const;
+    int removeTableGlobal(const Table &ndbtab, int invalidate) const;
 #endif
   };
 };

--- 1.34.3.2/ndb/src/ndbapi/NdbDictionary.cpp	2006-05-24 10:57:27 +02:00
+++ 1.57/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-05-24 11:52:51 +02:00
@@ -178,12 +178,12 @@
 
 void 
 NdbDictionary::Column::setPartitionKey(bool val){
-  m_impl.m_distributionKey = val;
+  m_impl.m_distributionKey = (val ? 2 : 0);
 }
 
 bool 
 NdbDictionary::Column::getPartitionKey() const{
-  return m_impl.m_distributionKey;
+  return (bool)m_impl.m_distributionKey;
 }
 
 const NdbDictionary::Table * 
@@ -223,7 +223,12 @@
 
 int
 NdbDictionary::Column::getColumnNo() const {
-  return m_impl.m_attrId;
+  return m_impl.m_column_no;
+}
+
+int
+NdbDictionary::Column::getAttrId() const {
+  return m_impl.m_attrId;;
 }
 
 bool
@@ -237,6 +242,30 @@
   return m_impl.m_attrSize * m_impl.m_arraySize;
 }
 
+void
+NdbDictionary::Column::setArrayType(ArrayType type)
+{
+  m_impl.m_arrayType = type;
+}
+
+NdbDictionary::Column::ArrayType
+NdbDictionary::Column::getArrayType() const
+{
+  return (ArrayType)m_impl.m_arrayType;
+}
+
+void
+NdbDictionary::Column::setStorageType(StorageType type)
+{
+  m_impl.m_storageType = type;
+}
+
+NdbDictionary::Column::StorageType
+NdbDictionary::Column::getStorageType() const
+{
+  return (StorageType)m_impl.m_storageType;
+}
+
 /*****************************************************************
  * Table facade
  */
@@ -247,8 +276,7 @@
 }
 
 NdbDictionary::Table::Table(const NdbDictionary::Table & org)
-  : NdbDictionary::Object(),
-    m_impl(* new NdbTableImpl(* this))
+  : Object(org), m_impl(* new NdbTableImpl(* this))
 {
   m_impl.assign(org.m_impl);
 }
@@ -284,9 +312,14 @@
   return m_impl.getName();
 }
 
+const char *
+NdbDictionary::Table::getMysqlName() const {
+  return m_impl.getMysqlName();
+}
+
 int
 NdbDictionary::Table::getTableId() const {
-  return m_impl.m_tableId;
+  return m_impl.m_id;
 }
 
 void 
@@ -294,12 +327,7 @@
   NdbColumnImpl* col = new NdbColumnImpl;
   (* col) = NdbColumnImpl::getImpl(c);
   m_impl.m_columns.push_back(col);
-  if(c.getPrimaryKey()){
-    m_impl.m_noOfKeys++;
-  }
-  if (col->getBlobType()) {
-    m_impl.m_noOfBlobs++;
-  }
+  m_impl.computeAggregates();
   m_impl.buildColumnHash();
 }
 
@@ -385,6 +413,30 @@
   return m_impl.m_noOfKeys;
 }
 
+void
+NdbDictionary::Table::setMaxRows(Uint64 maxRows)
+{
+  m_impl.m_max_rows = maxRows;
+}
+
+Uint64
+NdbDictionary::Table::getMaxRows() const
+{
+  return m_impl.m_max_rows;
+}
+
+void
+NdbDictionary::Table::setDefaultNoPartitionsFlag(Uint32 flag)
+{
+  m_impl.m_default_no_part_flag = flag;;
+}
+
+Uint32
+NdbDictionary::Table::getDefaultNoPartitionsFlag() const
+{
+  return m_impl.m_default_no_part_flag;
+}
+
 const char*
 NdbDictionary::Table::getPrimaryKey(int no) const {
   int count = 0;
@@ -399,17 +451,107 @@
 
 const void* 
 NdbDictionary::Table::getFrmData() const {
-  return m_impl.m_frm.get_data();
+  return m_impl.getFrmData();
 }
 
 Uint32
 NdbDictionary::Table::getFrmLength() const {
-  return m_impl.m_frm.length();
+  return m_impl.getFrmLength();
+}
+
+void
+NdbDictionary::Table::setTablespaceNames(const void *data, Uint32 len)
+{
+  m_impl.setTablespaceNames(data, len);
+}
+
+const void*
+NdbDictionary::Table::getTablespaceNames()
+{
+  return m_impl.getTablespaceNames();
+}
+
+Uint32
+NdbDictionary::Table::getTablespaceNamesLen() const
+{
+  return m_impl.getTablespaceNamesLen();
+}
+
+void
+NdbDictionary::Table::setLinearFlag(Uint32 flag)
+{
+  m_impl.m_linear_flag = flag;
+}
+
+bool
+NdbDictionary::Table::getLinearFlag() const
+{
+  return m_impl.m_linear_flag;
+}
+
+void
+NdbDictionary::Table::setFragmentCount(Uint32 count)
+{
+  m_impl.setFragmentCount(count);
+}
+
+Uint32
+NdbDictionary::Table::getFragmentCount() const
+{
+  return m_impl.getFragmentCount();
 }
 
 void
 NdbDictionary::Table::setFrm(const void* data, Uint32 len){
-  m_impl.m_frm.assign(data, len);
+  m_impl.setFrm(data, len);
+}
+
+const void* 
+NdbDictionary::Table::getFragmentData() const {
+  return m_impl.getFragmentData();
+}
+
+Uint32
+NdbDictionary::Table::getFragmentDataLen() const {
+  return m_impl.getFragmentDataLen();
+}
+
+void
+NdbDictionary::Table::setFragmentData(const void* data, Uint32 len)
+{
+  m_impl.setFragmentData(data, len);
+}
+
+const void* 
+NdbDictionary::Table::getTablespaceData() const {
+  return m_impl.getTablespaceData();
+}
+
+Uint32
+NdbDictionary::Table::getTablespaceDataLen() const {
+  return m_impl.getTablespaceDataLen();
+}
+
+void
+NdbDictionary::Table::setTablespaceData(const void* data, Uint32 len)
+{
+  m_impl.setTablespaceData(data, len);
+}
+
+const void* 
+NdbDictionary::Table::getRangeListData() const {
+  return m_impl.getRangeListData();
+}
+
+Uint32
+NdbDictionary::Table::getRangeListDataLen() const {
+  return m_impl.getRangeListDataLen();
+}
+
+void
+NdbDictionary::Table::setRangeListData(const void* data, Uint32 len)
+{
+  m_impl.setRangeListData(data, len);
 }
 
 NdbDictionary::Object::Status
@@ -417,11 +559,21 @@
   return m_impl.m_status;
 }
 
+void
+NdbDictionary::Table::setStatusInvalid() const {
+  m_impl.m_status = NdbDictionary::Object::Invalid;
+}
+
 int 
 NdbDictionary::Table::getObjectVersion() const {
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Table::getObjectId() const {
+  return m_impl.m_id;
+}
+
 bool
 NdbDictionary::Table::equal(const NdbDictionary::Table & col) const {
   return m_impl.equal(col.m_impl);
@@ -453,6 +605,53 @@
   return pNdb->getDictionary()->createTable(* this);
 }
 
+bool
+NdbDictionary::Table::getTablespace(Uint32 *id, Uint32 *version) const 
+{
+  if (m_impl.m_tablespace_id == RNIL)
+    return false;
+  if (id)
+    *id= m_impl.m_tablespace_id;
+  if (version)
+    *version= m_impl.m_version;
+  return true;
+}
+
+void 
+NdbDictionary::Table::setTablespace(const char * name){
+  m_impl.m_tablespace_id = ~0;
+  m_impl.m_tablespace_version = ~0;
+  m_impl.m_tablespace_name.assign(name);
+}
+
+void 
+NdbDictionary::Table::setTablespace(const NdbDictionary::Tablespace & ts){
+  m_impl.m_tablespace_id = NdbTablespaceImpl::getImpl(ts).m_id;
+  m_impl.m_tablespace_version = ts.getObjectVersion();
+  m_impl.m_tablespace_name.assign(ts.getName());
+}
+
+void
+NdbDictionary::Table::setRowChecksumIndicator(bool val){
+  m_impl.m_row_checksum = val;
+}
+
+bool 
+NdbDictionary::Table::getRowChecksumIndicator() const {
+  return m_impl.m_row_checksum;
+}
+
+void
+NdbDictionary::Table::setRowGCIIndicator(bool val){
+  m_impl.m_row_gci = val;
+}
+
+bool 
+NdbDictionary::Table::getRowGCIIndicator() const {
+  return m_impl.m_row_gci;
+}
+
+
 /*****************************************************************
  * Index facade
  */
@@ -558,12 +757,12 @@
 
 void
 NdbDictionary::Index::setType(NdbDictionary::Index::Type t){
-  m_impl.m_type = t;
+  m_impl.m_type = (NdbDictionary::Object::Type)t;
 }
 
 NdbDictionary::Index::Type
 NdbDictionary::Index::getType() const {
-  return m_impl.m_type;
+  return (NdbDictionary::Index::Type)m_impl.m_type;
 }
 
 void
@@ -578,14 +777,20 @@
 
 NdbDictionary::Object::Status
 NdbDictionary::Index::getObjectStatus() const {
-  return m_impl.m_status;
+  return m_impl.m_table->m_status;
 }
 
 int 
 NdbDictionary::Index::getObjectVersion() const {
-  return m_impl.m_version;
+  return m_impl.m_table->m_version;
+}
+
+int 
+NdbDictionary::Index::getObjectId() const {
+  return m_impl.m_table->m_id;
 }
 
+
 /*****************************************************************
  * Event facade
  */
@@ -633,6 +838,12 @@
   m_impl.setTable(table);
 }
 
+const NdbDictionary::Table *
+NdbDictionary::Event::getTable() const
+{
+  return m_impl.getTable();
+}
+
 void 
 NdbDictionary::Event::setTable(const char * table)
 {
@@ -651,6 +862,12 @@
   m_impl.addTableEvent(t);
 }
 
+bool
+NdbDictionary::Event::getTableEvent(const TableEvent t) const
+{
+  return m_impl.getTableEvent(t);
+}
+
 void
 NdbDictionary::Event::setDurability(EventDurability d)
 {
@@ -664,6 +881,18 @@
 }
 
 void
+NdbDictionary::Event::setReport(EventReport r)
+{
+  m_impl.setReport(r);
+}
+
+NdbDictionary::Event::EventReport
+NdbDictionary::Event::getReport() const
+{
+  return m_impl.getReport();
+}
+
+void
 NdbDictionary::Event::addColumn(const Column & c){
   NdbColumnImpl* col = new NdbColumnImpl;
   (* col) = NdbColumnImpl::getImpl(c);
@@ -695,6 +924,17 @@
   return m_impl.getNoOfEventColumns();
 }
 
+const NdbDictionary::Column *
+NdbDictionary::Event::getEventColumn(unsigned no) const
+{
+  return m_impl.getEventColumn(no);
+}
+
+void NdbDictionary::Event::mergeEvents(bool flag)
+{
+  m_impl.m_mergeEvents = flag;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::Event::getObjectStatus() const
 {
@@ -707,12 +947,364 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Event::getObjectId() const {
+  return m_impl.m_id;
+}
+
 void NdbDictionary::Event::print()
 {
   m_impl.print();
 }
 
 /*****************************************************************
+ * Tablespace facade
+ */
+NdbDictionary::Tablespace::Tablespace()
+  : m_impl(* new NdbTablespaceImpl(* this))
+{
+}
+
+NdbDictionary::Tablespace::Tablespace(NdbTablespaceImpl & impl)
+  : m_impl(impl) 
+{
+}
+
+NdbDictionary::Tablespace::Tablespace(const NdbDictionary::Tablespace & org)
+  : Object(org), m_impl(* new NdbTablespaceImpl(* this))
+{
+  m_impl.assign(org.m_impl);
+}
+
+NdbDictionary::Tablespace::~Tablespace(){
+  NdbTablespaceImpl * tmp = &m_impl;  
+  if(this != tmp){
+    delete tmp;
+  }
+}
+
+void 
+NdbDictionary::Tablespace::setName(const char * name){
+  m_impl.m_name.assign(name);
+}
+
+const char * 
+NdbDictionary::Tablespace::getName() const {
+  return m_impl.m_name.c_str();
+}
+
+void
+NdbDictionary::Tablespace::setAutoGrowSpecification
+(const NdbDictionary::AutoGrowSpecification& spec){
+  m_impl.m_grow_spec = spec;
+}
+const NdbDictionary::AutoGrowSpecification& 
+NdbDictionary::Tablespace::getAutoGrowSpecification() const {
+  return m_impl.m_grow_spec;
+}
+
+void
+NdbDictionary::Tablespace::setExtentSize(Uint32 sz){
+  m_impl.m_extent_size = sz;
+}
+
+Uint32
+NdbDictionary::Tablespace::getExtentSize() const {
+  return m_impl.m_extent_size;
+}
+
+void
+NdbDictionary::Tablespace::setDefaultLogfileGroup(const char * name){
+  m_impl.m_logfile_group_id = ~0;
+  m_impl.m_logfile_group_version = ~0;
+  m_impl.m_logfile_group_name.assign(name);
+}
+
+void
+NdbDictionary::Tablespace::setDefaultLogfileGroup
+(const NdbDictionary::LogfileGroup & lg){
+  m_impl.m_logfile_group_id = NdbLogfileGroupImpl::getImpl(lg).m_id;
+  m_impl.m_logfile_group_version = lg.getObjectVersion();
+  m_impl.m_logfile_group_name.assign(lg.getName());
+}
+
+const char * 
+NdbDictionary::Tablespace::getDefaultLogfileGroup() const {
+  return m_impl.m_logfile_group_name.c_str();
+}
+
+Uint32
+NdbDictionary::Tablespace::getDefaultLogfileGroupId() const {
+  return m_impl.m_logfile_group_id;
+}
+
+NdbDictionary::Object::Status
+NdbDictionary::Tablespace::getObjectStatus() const {
+  return m_impl.m_status;
+}
+
+int 
+NdbDictionary::Tablespace::getObjectVersion() const {
+  return m_impl.m_version;
+}
+
+int 
+NdbDictionary::Tablespace::getObjectId() const {
+  return m_impl.m_id;
+}
+
+/*****************************************************************
+ * LogfileGroup facade
+ */
+NdbDictionary::LogfileGroup::LogfileGroup()
+  : m_impl(* new NdbLogfileGroupImpl(* this))
+{
+}
+
+NdbDictionary::LogfileGroup::LogfileGroup(NdbLogfileGroupImpl & impl)
+  : m_impl(impl) 
+{
+}
+
+NdbDictionary::LogfileGroup::LogfileGroup(const NdbDictionary::LogfileGroup & org)
+  : Object(org), m_impl(* new NdbLogfileGroupImpl(* this)) 
+{
+  m_impl.assign(org.m_impl);
+}
+
+NdbDictionary::LogfileGroup::~LogfileGroup(){
+  NdbLogfileGroupImpl * tmp = &m_impl;  
+  if(this != tmp){
+    delete tmp;
+  }
+}
+
+void 
+NdbDictionary::LogfileGroup::setName(const char * name){
+  m_impl.m_name.assign(name);
+}
+
+const char * 
+NdbDictionary::LogfileGroup::getName() const {
+  return m_impl.m_name.c_str();
+}
+
+void
+NdbDictionary::LogfileGroup::setUndoBufferSize(Uint32 sz){
+  m_impl.m_undo_buffer_size = sz;
+}
+
+Uint32
+NdbDictionary::LogfileGroup::getUndoBufferSize() const {
+  return m_impl.m_undo_buffer_size;
+}
+
+void
+NdbDictionary::LogfileGroup::setAutoGrowSpecification
+(const NdbDictionary::AutoGrowSpecification& spec){
+  m_impl.m_grow_spec = spec;
+}
+const NdbDictionary::AutoGrowSpecification& 
+NdbDictionary::LogfileGroup::getAutoGrowSpecification() const {
+  return m_impl.m_grow_spec;
+}
+
+Uint64 NdbDictionary::LogfileGroup::getUndoFreeWords() const {
+  return m_impl.m_undo_free_words;
+}
+
+NdbDictionary::Object::Status
+NdbDictionary::LogfileGroup::getObjectStatus() const {
+  return m_impl.m_status;
+}
+
+int 
+NdbDictionary::LogfileGroup::getObjectVersion() const {
+  return m_impl.m_version;
+}
+
+int 
+NdbDictionary::LogfileGroup::getObjectId() const {
+  return m_impl.m_id;
+}
+
+/*****************************************************************
+ * Datafile facade
+ */
+NdbDictionary::Datafile::Datafile()
+  : m_impl(* new NdbDatafileImpl(* this))
+{
+}
+
+NdbDictionary::Datafile::Datafile(NdbDatafileImpl & impl)
+  : m_impl(impl) 
+{
+}
+
+NdbDictionary::Datafile::Datafile(const NdbDictionary::Datafile & org)
+  : Object(org), m_impl(* new NdbDatafileImpl(* this)) 
+{
+  m_impl.assign(org.m_impl);
+}
+
+NdbDictionary::Datafile::~Datafile(){
+  NdbDatafileImpl * tmp = &m_impl;  
+  if(this != tmp){
+    delete tmp;
+  }
+}
+
+void 
+NdbDictionary::Datafile::setPath(const char * path){
+  m_impl.m_path.assign(path);
+}
+
+const char * 
+NdbDictionary::Datafile::getPath() const {
+  return m_impl.m_path.c_str();
+}
+
+void 
+NdbDictionary::Datafile::setSize(Uint64 sz){
+  m_impl.m_size = sz;
+}
+
+Uint64
+NdbDictionary::Datafile::getSize() const {
+  return m_impl.m_size;
+}
+
+Uint64
+NdbDictionary::Datafile::getFree() const {
+  return m_impl.m_free;
+}
+
+void 
+NdbDictionary::Datafile::setTablespace(const char * tablespace){
+  m_impl.m_filegroup_id = ~0;
+  m_impl.m_filegroup_version = ~0;
+  m_impl.m_filegroup_name.assign(tablespace);
+}
+
+void 
+NdbDictionary::Datafile::setTablespace(const NdbDictionary::Tablespace & ts){
+  m_impl.m_filegroup_id = NdbTablespaceImpl::getImpl(ts).m_id;
+  m_impl.m_filegroup_version = ts.getObjectVersion();
+  m_impl.m_filegroup_name.assign(ts.getName());
+}
+
+const char *
+NdbDictionary::Datafile::getTablespace() const {
+  return m_impl.m_filegroup_name.c_str();
+}
+
+Uint32
+NdbDictionary::Datafile::getTablespaceId() const {
+  return m_impl.m_filegroup_id;
+}
+
+NdbDictionary::Object::Status
+NdbDictionary::Datafile::getObjectStatus() const {
+  return m_impl.m_status;
+}
+
+int 
+NdbDictionary::Datafile::getObjectVersion() const {
+  return m_impl.m_version;
+}
+
+int 
+NdbDictionary::Datafile::getObjectId() const {
+  return m_impl.m_id;
+}
+
+/*****************************************************************
+ * Undofile facade
+ */
+NdbDictionary::Undofile::Undofile()
+  : m_impl(* new NdbUndofileImpl(* this))
+{
+}
+
+NdbDictionary::Undofile::Undofile(NdbUndofileImpl & impl)
+  : m_impl(impl) 
+{
+}
+
+NdbDictionary::Undofile::Undofile(const NdbDictionary::Undofile & org)
+  : Object(org), m_impl(* new NdbUndofileImpl(* this))
+{
+  m_impl.assign(org.m_impl);
+}
+
+NdbDictionary::Undofile::~Undofile(){
+  NdbUndofileImpl * tmp = &m_impl;  
+  if(this != tmp){
+    delete tmp;
+  }
+}
+
+void 
+NdbDictionary::Undofile::setPath(const char * path){
+  m_impl.m_path.assign(path);
+}
+
+const char * 
+NdbDictionary::Undofile::getPath() const {
+  return m_impl.m_path.c_str();
+}
+
+void 
+NdbDictionary::Undofile::setSize(Uint64 sz){
+  m_impl.m_size = sz;
+}
+
+Uint64
+NdbDictionary::Undofile::getSize() const {
+  return m_impl.m_size;
+}
+
+void 
+NdbDictionary::Undofile::setLogfileGroup(const char * logfileGroup){
+  m_impl.m_filegroup_id = ~0;
+  m_impl.m_filegroup_version = ~0;
+  m_impl.m_filegroup_name.assign(logfileGroup);
+}
+
+void 
+NdbDictionary::Undofile::setLogfileGroup
+(const NdbDictionary::LogfileGroup & ts){
+  m_impl.m_filegroup_id = NdbLogfileGroupImpl::getImpl(ts).m_id;
+  m_impl.m_filegroup_version = ts.getObjectVersion();
+  m_impl.m_filegroup_name.assign(ts.getName());
+}
+
+const char *
+NdbDictionary::Undofile::getLogfileGroup() const {
+  return m_impl.m_filegroup_name.c_str();
+}
+
+Uint32
+NdbDictionary::Undofile::getLogfileGroupId() const {
+  return m_impl.m_filegroup_id;
+}
+
+NdbDictionary::Object::Status
+NdbDictionary::Undofile::getObjectStatus() const {
+  return m_impl.m_status;
+}
+
+int 
+NdbDictionary::Undofile::getObjectVersion() const {
+  return m_impl.m_version;
+}
+
+int 
+NdbDictionary::Undofile::getObjectId() const {
+  return m_impl.m_id;
+}
+
+/*****************************************************************
  * Dictionary facade
  */
 NdbDictionary::Dictionary::Dictionary(Ndb & ndb)
@@ -732,8 +1324,10 @@
 }
 
 int 
-NdbDictionary::Dictionary::createTable(const Table & t){
-  return m_impl.createTable(NdbTableImpl::getImpl(t));
+NdbDictionary::Dictionary::createTable(const Table & t)
+{
+  DBUG_ENTER("NdbDictionary::Dictionary::createTable");
+  DBUG_RETURN(m_impl.createTable(NdbTableImpl::getImpl(t)));
 }
 
 int
@@ -742,6 +1336,11 @@
 }
 
 int
+NdbDictionary::Dictionary::dropTableGlobal(const Table & t){
+  return m_impl.dropTableGlobal(NdbTableImpl::getImpl(t));
+}
+
+int
 NdbDictionary::Dictionary::dropTable(const char * name){
   return m_impl.dropTable(name);
 }
@@ -751,6 +1350,14 @@
   return m_impl.alterTable(NdbTableImpl::getImpl(t));
 }
 
+int
+NdbDictionary::Dictionary::alterTableGlobal(const Table & f,
+                                            const Table & t)
+{
+  return m_impl.alterTableGlobal(NdbTableImpl::getImpl(f),
+                                 NdbTableImpl::getImpl(t));
+}
+
 const NdbDictionary::Table * 
 NdbDictionary::Dictionary::getTable(const char * name, void **data) const
 {
@@ -760,6 +1367,47 @@
   return 0;
 }
 
+const NdbDictionary::Index * 
+NdbDictionary::Dictionary::getIndexGlobal(const char * indexName,
+                                          const Table &ndbtab) const
+{
+  NdbIndexImpl * i = m_impl.getIndexGlobal(indexName,
+                                           NdbTableImpl::getImpl(ndbtab));
+  if(i)
+    return i->m_facade;
+  return 0;
+}
+
+const NdbDictionary::Table * 
+NdbDictionary::Dictionary::getTableGlobal(const char * name) const
+{
+  NdbTableImpl * t = m_impl.getTableGlobal(name);
+  if(t)
+    return t->m_facade;
+  return 0;
+}
+
+int
+NdbDictionary::Dictionary::removeIndexGlobal(const Index &ndbidx,
+                                             int invalidate) const
+{
+  return m_impl.releaseIndexGlobal(NdbIndexImpl::getImpl(ndbidx), invalidate);
+}
+
+int
+NdbDictionary::Dictionary::removeTableGlobal(const Table &ndbtab,
+                                             int invalidate) const
+{
+  return m_impl.releaseTableGlobal(NdbTableImpl::getImpl(ndbtab), invalidate);
+}
+
+void NdbDictionary::Dictionary::putTable(const NdbDictionary::Table * table)
+{
+ NdbDictionary::Table  *copy_table = new NdbDictionary::Table;
+  *copy_table = *table;
+  m_impl.putTable(&NdbTableImpl::getImpl(*copy_table));
+}
+
 void NdbDictionary::Dictionary::set_local_table_data_size(unsigned sz)
 {
   m_impl.m_local_table_data_size= sz;
@@ -771,6 +1419,25 @@
   return getTable(name, 0);
 }
 
+const NdbDictionary::Table *
+NdbDictionary::Dictionary::getBlobTable(const NdbDictionary::Table* table,
+                                        const char* col_name)
+{
+  const NdbDictionary::Column* col = table->getColumn(col_name);
+  if (col == NULL) {
+    m_impl.m_error.code = 4318;
+    return NULL;
+  }
+  return getBlobTable(table, col->getColumnNo());
+}
+
+const NdbDictionary::Table *
+NdbDictionary::Dictionary::getBlobTable(const NdbDictionary::Table* table,
+                                        Uint32 col_no)
+{
+  return m_impl.getBlobTable(NdbTableImpl::getImpl(*table), col_no);
+}
+
 void
 NdbDictionary::Dictionary::invalidateTable(const char * name){
   DBUG_ENTER("NdbDictionaryImpl::invalidateTable");
@@ -781,18 +1448,37 @@
 }
 
 void
+NdbDictionary::Dictionary::invalidateTable(const Table *table){
+  NdbTableImpl &t = NdbTableImpl::getImpl(*table);
+  m_impl.invalidateObject(t);
+}
+
+void
 NdbDictionary::Dictionary::removeCachedTable(const char * name){
   NdbTableImpl * t = m_impl.getTable(name);
   if(t)
     m_impl.removeCachedObject(* t);
 }
 
+void
+NdbDictionary::Dictionary::removeCachedTable(const Table *table){
+  NdbTableImpl &t = NdbTableImpl::getImpl(*table);
+  m_impl.removeCachedObject(t);
+}
+
 int
 NdbDictionary::Dictionary::createIndex(const Index & ind)
 {
   return m_impl.createIndex(NdbIndexImpl::getImpl(ind));
 }
 
+int
+NdbDictionary::Dictionary::createIndex(const Index & ind, const Table & tab)
+{
+  return m_impl.createIndex(NdbIndexImpl::getImpl(ind),
+                            NdbTableImpl::getImpl(tab));
+}
+
 int 
 NdbDictionary::Dictionary::dropIndex(const char * indexName,
 				     const char * tableName)
@@ -801,6 +1487,12 @@
 }
 
 int 
+NdbDictionary::Dictionary::dropIndexGlobal(const Index &ind)
+{
+  return m_impl.dropIndexGlobal(NdbIndexImpl::getImpl(ind));
+}
+
+int 
 NdbDictionary::Dictionary::dropIndex(const Index & ind)
 {
   return m_impl.dropIndex(NdbIndexImpl::getImpl(ind));
@@ -827,6 +1519,15 @@
 }
 
 void
+NdbDictionary::Dictionary::invalidateIndex(const Index *index){
+  DBUG_ENTER("NdbDictionary::Dictionary::invalidateIndex");
+  NdbIndexImpl &i = NdbIndexImpl::getImpl(*index);
+  assert(i.m_table != 0);
+  m_impl.invalidateObject(* i.m_table);
+  DBUG_VOID_RETURN;
+}
+
+void
 NdbDictionary::Dictionary::invalidateIndex(const char * indexName,
                                            const char * tableName){
   DBUG_ENTER("NdbDictionaryImpl::invalidateIndex");
@@ -838,6 +1539,21 @@
   DBUG_VOID_RETURN;
 }
 
+int
+NdbDictionary::Dictionary::forceGCPWait()
+{
+  return m_impl.forceGCPWait();
+}
+
+void
+NdbDictionary::Dictionary::removeCachedIndex(const Index *index){
+  DBUG_ENTER("NdbDictionary::Dictionary::removeCachedIndex");
+  NdbIndexImpl &i = NdbIndexImpl::getImpl(*index);
+  assert(i.m_table != 0);
+  m_impl.removeCachedObject(* i.m_table);
+  DBUG_VOID_RETURN;
+}
+
 void
 NdbDictionary::Dictionary::removeCachedIndex(const char * indexName,
 					     const char * tableName){
@@ -1049,6 +1765,7 @@
       break;
     }
   }
+
   if (col.getPrimaryKey())
     out << " PRIMARY KEY";
   else if (! col.getNullable())
@@ -1059,12 +1776,116 @@
   if(col.getDistributionKey())
     out << " DISTRIBUTION KEY";
 
+  switch (col.getArrayType()) {
+  case NDB_ARRAYTYPE_FIXED:
+    out << " AT=FIXED";
+    break;
+  case NDB_ARRAYTYPE_SHORT_VAR:
+    out << " AT=SHORT_VAR";
+    break;
+  case NDB_ARRAYTYPE_MEDIUM_VAR:
+    out << " AT=MEDIUM_VAR";
+    break;
+  default:
+    out << " AT=" << (int)col.getArrayType() << "?";
+    break;
+  }
+
+  switch (col.getStorageType()) {
+  case NDB_STORAGETYPE_MEMORY:
+    out << " ST=MEMORY";
+    break;
+  case NDB_STORAGETYPE_DISK:
+    out << " ST=DISK";
+    break;
+  default:
+    out << " ST=" << (int)col.getStorageType() << "?";
+    break;
+  }
+
   return out;
 }
 
-const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
-const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_MEMORY = 0;
-const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
-const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
-const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0;
-const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0;
+int
+NdbDictionary::Dictionary::createLogfileGroup(const LogfileGroup & lg){
+  return m_impl.createLogfileGroup(NdbLogfileGroupImpl::getImpl(lg));
+}
+
+int
+NdbDictionary::Dictionary::dropLogfileGroup(const LogfileGroup & lg){
+  return m_impl.dropLogfileGroup(NdbLogfileGroupImpl::getImpl(lg));
+}
+
+NdbDictionary::LogfileGroup
+NdbDictionary::Dictionary::getLogfileGroup(const char * name){
+  NdbDictionary::LogfileGroup tmp;
+  m_impl.m_receiver.get_filegroup(NdbLogfileGroupImpl::getImpl(tmp), 
+				  NdbDictionary::Object::LogfileGroup, name);
+  return tmp;
+}
+
+int
+NdbDictionary::Dictionary::createTablespace(const Tablespace & lg){
+  return m_impl.createTablespace(NdbTablespaceImpl::getImpl(lg));
+}
+
+int
+NdbDictionary::Dictionary::dropTablespace(const Tablespace & lg){
+  return m_impl.dropTablespace(NdbTablespaceImpl::getImpl(lg));
+}
+
+NdbDictionary::Tablespace
+NdbDictionary::Dictionary::getTablespace(const char * name){
+  NdbDictionary::Tablespace tmp;
+  m_impl.m_receiver.get_filegroup(NdbTablespaceImpl::getImpl(tmp), 
+				  NdbDictionary::Object::Tablespace, name);
+  return tmp;
+}
+
+NdbDictionary::Tablespace
+NdbDictionary::Dictionary::getTablespace(Uint32 tablespaceId){
+  NdbDictionary::Tablespace tmp;
+  m_impl.m_receiver.get_filegroup(NdbTablespaceImpl::getImpl(tmp), 
+				  NdbDictionary::Object::Tablespace,
+                                  tablespaceId);
+  return tmp;
+}
+
+int
+NdbDictionary::Dictionary::createDatafile(const Datafile & df, bool force){
+  return m_impl.createDatafile(NdbDatafileImpl::getImpl(df), force);
+}
+
+int
+NdbDictionary::Dictionary::dropDatafile(const Datafile& df){
+  return m_impl.dropDatafile(NdbDatafileImpl::getImpl(df));
+}
+
+NdbDictionary::Datafile
+NdbDictionary::Dictionary::getDatafile(Uint32 node, const char * path){
+  NdbDictionary::Datafile tmp;
+  m_impl.m_receiver.get_file(NdbDatafileImpl::getImpl(tmp),
+			     NdbDictionary::Object::Datafile,
+			     node ? (int)node : -1, path);
+  return tmp;
+}
+
+int
+NdbDictionary::Dictionary::createUndofile(const Undofile & df, bool force){
+  return m_impl.createUndofile(NdbUndofileImpl::getImpl(df), force);
+}
+
+int
+NdbDictionary::Dictionary::dropUndofile(const Undofile& df){
+  return m_impl.dropUndofile(NdbUndofileImpl::getImpl(df));
+}
+
+NdbDictionary::Undofile
+NdbDictionary::Dictionary::getUndofile(Uint32 node, const char * path){
+  NdbDictionary::Undofile tmp;
+  m_impl.m_receiver.get_file(NdbUndofileImpl::getImpl(tmp),
+			     NdbDictionary::Object::Undofile,
+			     node ? (int)node : -1, path);
+  return tmp;
+}
+

--- 1.74.17.1/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-05-22 15:16:31 +02:00
+++ 1.134/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-05-24 12:56:58 +02:00
@@ -29,6 +29,9 @@
 #include <signaldata/AlterTable.hpp>
 #include <signaldata/DropIndx.hpp>
 #include <signaldata/ListTables.hpp>
+#include <signaldata/DropFilegroup.hpp>
+#include <signaldata/CreateFilegroup.hpp>
+#include <signaldata/WaitGCP.hpp>
 #include <SimpleProperties.hpp>
 #include <Bitmask.hpp>
 #include <AttributeList.hpp>
@@ -38,10 +41,36 @@
 #include "NdbBlobImpl.hpp"
 #include <AttributeHeader.hpp>
 #include <my_sys.h>
+#include <NdbEnv.h>
+#include <NdbMem.h>
+#include <ndb_version.h>
 
 #define DEBUG_PRINT 0
 #define INCOMPATIBLE_VERSION -2
 
+#define DICT_WAITFOR_TIMEOUT (7*24*60*60*1000)
+
+#define ERR_RETURN(a,b) \
+{\
+   DBUG_PRINT("exit", ("error %d", (a).code));\
+   DBUG_RETURN(b);\
+}
+
+extern Uint64 g_latest_trans_gci;
+int ndb_dictionary_is_mysqld = 0;
+
+bool
+is_ndb_blob_table(const char* name, Uint32* ptab_id, Uint32* pcol_no)
+{
+  return DictTabInfo::isBlobTableName(name, ptab_id, pcol_no);
+}
+
+bool
+is_ndb_blob_table(const NdbTableImpl* t)
+{
+  return is_ndb_blob_table(t->m_internalName.c_str());
+}
+
 //#define EVENT_DEBUG
 
 /**
@@ -50,18 +79,26 @@
 NdbColumnImpl::NdbColumnImpl()
   : NdbDictionary::Column(* this), m_attrId(-1), m_facade(this)
 {
+  DBUG_ENTER("NdbColumnImpl::NdbColumnImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbColumnImpl::NdbColumnImpl(NdbDictionary::Column & f)
   : NdbDictionary::Column(* this), m_attrId(-1), m_facade(&f)
 {
+  DBUG_ENTER("NdbColumnImpl::NdbColumnImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbColumnImpl&
 NdbColumnImpl::operator=(const NdbColumnImpl& col)
 {
+  DBUG_ENTER("NdbColumnImpl::operator=");
+  DBUG_PRINT("info", ("this: %p  &col: %p", this, &col));
   m_attrId = col.m_attrId;
   m_name = col.m_name;
   m_type = col.m_type;
@@ -77,11 +114,20 @@
   m_defaultValue = col.m_defaultValue;
   m_attrSize = col.m_attrSize; 
   m_arraySize = col.m_arraySize;
+  m_arrayType = col.m_arrayType;
+  m_storageType = col.m_storageType;
   m_keyInfoPos = col.m_keyInfoPos;
-  m_blobTable = col.m_blobTable;
+  if (col.m_blobTable == NULL)
+    m_blobTable = NULL;
+  else {
+    if (m_blobTable == NULL)
+      m_blobTable = new NdbTableImpl();
+    m_blobTable->assign(*col.m_blobTable);
+  }
+  m_column_no = col.m_column_no;
   // Do not copy m_facade !!
 
-  return *this;
+  DBUG_RETURN(*this);
 }
 
 void
@@ -108,6 +154,7 @@
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Olddecimal:
   case Olddecimalunsigned:
@@ -117,34 +164,57 @@
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Char:
+    m_precision = 0;
+    m_scale = 0;
+    m_length = 1;
+    m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
+    break;
   case Varchar:
     m_precision = 0;
     m_scale = 0;
     m_length = 1;
     m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_SHORT_VAR;
     break;
   case Binary:
+    m_precision = 0;
+    m_scale = 0;
+    m_length = 1;
+    m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
+    break;
   case Varbinary:
+    m_precision = 0;
+    m_scale = 0;
+    m_length = 1;
+    m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_SHORT_VAR;
+    break;
   case Datetime:
   case Date:
     m_precision = 0;
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Blob:
     m_precision = 256;
     m_scale = 8000;
     m_length = 4;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Text:
     m_precision = 256;
     m_scale = 8000;
     m_length = 4;
     m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Time:
   case Year:
@@ -153,24 +223,28 @@
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Bit:
     m_precision = 0;
     m_scale = 0;
     m_length = 1;
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_FIXED;
     break;
   case Longvarchar:
     m_precision = 0;
     m_scale = 0;
     m_length = 1; // legal
     m_cs = default_cs;
+    m_arrayType = NDB_ARRAYTYPE_MEDIUM_VAR;
     break;
   case Longvarbinary:
     m_precision = 0;
     m_scale = 0;
     m_length = 1; // legal
     m_cs = NULL;
+    m_arrayType = NDB_ARRAYTYPE_MEDIUM_VAR;
     break;
   default:
   case Undefined:
@@ -187,16 +261,28 @@
   m_autoIncrement = false;
   m_autoIncrementInitialValue = 1;
   m_blobTable = NULL;
+  m_storageType = NDB_STORAGETYPE_MEMORY;
+#ifdef VM_TRACE
+  if(NdbEnv_GetEnv("NDB_DEFAULT_DISK", (char *)0, 0))
+    m_storageType = NDB_STORAGETYPE_DISK;
+#endif
 }
 
 NdbColumnImpl::~NdbColumnImpl()
 {
+  DBUG_ENTER("NdbColumnImpl::~NdbColumnImpl");
+  DBUG_PRINT("info", ("this: %p", this));
+  if (m_blobTable != NULL)
+    delete m_blobTable;
+  m_blobTable = NULL;
+  DBUG_VOID_RETURN;
 }
 
 bool
 NdbColumnImpl::equal(const NdbColumnImpl& col) const 
 {
   DBUG_ENTER("NdbColumnImpl::equal");
+  DBUG_PRINT("info", ("this: %p  &col: %p", this, &col));
   if(strcmp(m_name.c_str(), col.m_name.c_str()) != 0){
     DBUG_RETURN(false);
   }
@@ -209,13 +295,11 @@
   if(m_nullable != col.m_nullable){
     DBUG_RETURN(false);
   }
-#ifdef ndb_dictionary_dkey_fixed
-  if(m_pk){
-    if(m_distributionKey != col.m_distributionKey){
+  if (m_pk) {
+    if ((bool)m_distributionKey != (bool)col.m_distributionKey) {
       DBUG_RETURN(false);
     }
   }
-#endif
   if (m_precision != col.m_precision ||
       m_scale != col.m_scale ||
       m_length != col.m_length ||
@@ -229,11 +313,15 @@
     DBUG_RETURN(false);
   }
 
+  if (m_arrayType != col.m_arrayType || m_storageType != col.m_storageType){
+    DBUG_RETURN(false);
+  }
+
   DBUG_RETURN(true);
 }
 
 NdbDictionary::Column *
-NdbColumnImpl::create_psuedo(const char * name){
+NdbColumnImpl::create_pseudo(const char * name){
   NdbDictionary::Column * col = new NdbDictionary::Column();
   col->setName(name);
   if(!strcmp(name, "NDB$FRAGMENT")){
@@ -266,9 +354,31 @@
     col->m_impl.m_attrId = AttributeHeader::RANGE_NO;
     col->m_impl.m_attrSize = 4;
     col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$DISK_REF")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::DISK_REF;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$RECORDS_IN_RANGE")){
+    col->setType(NdbDictionary::Column::Unsigned);
+    col->m_impl.m_attrId = AttributeHeader::RECORDS_IN_RANGE;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 4;
+  } else if(!strcmp(name, "NDB$ROWID")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::ROWID;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 2;
+  } else if(!strcmp(name, "NDB$ROW_GCI")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::ROW_GCI;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+    col->m_impl.m_nullable = true;
   } else {
     abort();
   }
+  col->m_impl.m_storageType = NDB_STORAGETYPE_MEMORY;
   return col;
 }
 
@@ -277,48 +387,84 @@
  */
 
 NdbTableImpl::NdbTableImpl()
-  : NdbDictionary::Table(* this), m_facade(this)
+  : NdbDictionary::Table(* this), 
+    NdbDictObjectImpl(NdbDictionary::Object::UserTable), m_facade(this)
 {
+  DBUG_ENTER("NdbTableImpl::NdbTableImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbTableImpl::NdbTableImpl(NdbDictionary::Table & f)
-  : NdbDictionary::Table(* this), m_facade(&f)
+  : NdbDictionary::Table(* this), 
+    NdbDictObjectImpl(NdbDictionary::Object::UserTable), m_facade(&f)
 {
+  DBUG_ENTER("NdbTableImpl::NdbTableImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbTableImpl::~NdbTableImpl()
 {
+  DBUG_ENTER("NdbTableImpl::~NdbTableImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   if (m_index != 0) {
     delete m_index;
     m_index = 0;
   }
   for (unsigned i = 0; i < m_columns.size(); i++)
-    delete m_columns[i];  
+    delete m_columns[i];
+  DBUG_VOID_RETURN;
 }
 
 void
 NdbTableImpl::init(){
   m_changeMask= 0;
-  m_tableId= RNIL;
+  m_id= RNIL;
+  m_version = ~0;
+  m_status = NdbDictionary::Object::Invalid;
+  m_type = NdbDictionary::Object::TypeUndefined;
+  m_primaryTableId= RNIL;
+  m_internalName.clear();
+  m_externalName.clear();
+  m_newExternalName.clear();
+  m_mysqlName.clear();
   m_frm.clear();
+  m_newFrm.clear();
+  m_ts_name.clear();
+  m_new_ts_name.clear();
+  m_ts.clear();
+  m_new_ts.clear();
+  m_fd.clear();
+  m_new_fd.clear();
+  m_range.clear();
+  m_new_range.clear();
   m_fragmentType= NdbDictionary::Object::FragAllSmall;
   m_hashValueMask= 0;
   m_hashpointerValue= 0;
+  m_linear_flag= true;
+  m_primaryTable.clear();
+  m_max_rows = 0;
+  m_default_no_part_flag = 1;
   m_logging= true;
+  m_row_gci = true;
+  m_row_checksum = true;
   m_kvalue= 6;
   m_minLoadFactor= 78;
   m_maxLoadFactor= 80;
   m_keyLenInWords= 0;
   m_fragmentCount= 0;
-  m_dictionary= NULL;
   m_index= NULL;
-  m_indexType= NdbDictionary::Index::Undefined;
+  m_indexType= NdbDictionary::Object::TypeUndefined;
   m_noOfKeys= 0;
   m_noOfDistributionKeys= 0;
   m_noOfBlobs= 0;
   m_replicaCount= 0;
+  m_tablespace_name.clear();
+  m_tablespace_id = ~0;
+  m_tablespace_version = ~0;
 }
 
 bool
@@ -328,94 +474,266 @@
   if ((m_internalName.c_str() == NULL) || 
       (strcmp(m_internalName.c_str(), "") == 0) ||
       (obj.m_internalName.c_str() == NULL) || 
-      (strcmp(obj.m_internalName.c_str(), "") == 0)) {
+      (strcmp(obj.m_internalName.c_str(), "") == 0))
+  {
     // Shallow equal
-    if(strcmp(getName(), obj.getName()) != 0){
+    if(strcmp(getName(), obj.getName()) != 0)
+    {
       DBUG_PRINT("info",("name %s != %s",getName(),obj.getName()));
       DBUG_RETURN(false);    
     }
-  } else 
+  }
+  else
+  {
     // Deep equal
-    if(strcmp(m_internalName.c_str(), obj.m_internalName.c_str()) != 0){
+    if(strcmp(m_internalName.c_str(), obj.m_internalName.c_str()) != 0)
     {
       DBUG_PRINT("info",("m_internalName %s != %s",
 			 m_internalName.c_str(),obj.m_internalName.c_str()));
       DBUG_RETURN(false);
     }
   }
-  if(m_fragmentType != obj.m_fragmentType){
-    DBUG_PRINT("info",("m_fragmentType %d != %d",m_fragmentType,obj.m_fragmentType));
+  if (m_frm.length() != obj.m_frm.length() ||
+      (memcmp(m_frm.get_data(), obj.m_frm.get_data(), m_frm.length())))
+  {
+    DBUG_PRINT("info",("m_frm not equal"));
+    DBUG_RETURN(false);
+  }
+  if (m_fd.length() != obj.m_fd.length() ||
+      (memcmp(m_fd.get_data(), obj.m_fd.get_data(), m_fd.length())))
+  {
+    DBUG_PRINT("info",("m_fd not equal"));
+    DBUG_RETURN(false);
+  }
+  if (m_ts.length() != obj.m_ts.length() ||
+      (memcmp(m_ts.get_data(), obj.m_ts.get_data(), m_ts.length())))
+  {
+    DBUG_PRINT("info",("m_ts not equal"));
+    DBUG_RETURN(false);
+  }
+  if (m_range.length() != obj.m_range.length() ||
+      (memcmp(m_range.get_data(), obj.m_range.get_data(), m_range.length())))
+  {
+    DBUG_PRINT("info",("m_range not equal"));
     DBUG_RETURN(false);
   }
-  if(m_columns.size() != obj.m_columns.size()){
-    DBUG_PRINT("info",("m_columns.size %d != %d",m_columns.size(),obj.m_columns.size()));
+  if(m_fragmentType != obj.m_fragmentType)
+  {
+    DBUG_PRINT("info",("m_fragmentType %d != %d",m_fragmentType,
+                        obj.m_fragmentType));
+    DBUG_RETURN(false);
+  }
+  if(m_columns.size() != obj.m_columns.size())
+  {
+    DBUG_PRINT("info",("m_columns.size %d != %d",m_columns.size(),
+                       obj.m_columns.size()));
     DBUG_RETURN(false);
   }
 
-  for(unsigned i = 0; i<obj.m_columns.size(); i++){
-    if(!m_columns[i]->equal(* obj.m_columns[i])){
+  for(unsigned i = 0; i<obj.m_columns.size(); i++)
+  {
+    if(!m_columns[i]->equal(* obj.m_columns[i]))
+    {
       DBUG_PRINT("info",("m_columns [%d] != [%d]",i,i));
       DBUG_RETURN(false);
     }
   }
   
-  if(m_logging != obj.m_logging){
+  if(m_linear_flag != obj.m_linear_flag)
+  {
+    DBUG_PRINT("info",("m_linear_flag %d != %d",m_linear_flag,
+                        obj.m_linear_flag));
+    DBUG_RETURN(false);
+  }
+
+  if(m_max_rows != obj.m_max_rows)
+  {
+    DBUG_PRINT("info",("m_max_rows %d != %d",(int32)m_max_rows,
+                       (int32)obj.m_max_rows));
+    DBUG_RETURN(false);
+  }
+
+  if(m_default_no_part_flag != obj.m_default_no_part_flag)
+  {
+    DBUG_PRINT("info",("m_default_no_part_flag %d != %d",m_default_no_part_flag,
+                        obj.m_default_no_part_flag));
+    DBUG_RETURN(false);
+  }
+
+  if(m_logging != obj.m_logging)
+  {
     DBUG_PRINT("info",("m_logging %d != %d",m_logging,obj.m_logging));
     DBUG_RETURN(false);
   }
 
-  if(m_kvalue != obj.m_kvalue){
+  if(m_row_gci != obj.m_row_gci)
+  {
+    DBUG_PRINT("info",("m_row_gci %d != %d",m_row_gci,obj.m_row_gci));
+    DBUG_RETURN(false);
+  }
+
+  if(m_row_checksum != obj.m_row_checksum)
+  {
+    DBUG_PRINT("info",("m_row_checksum %d != %d",m_row_checksum,
+                        obj.m_row_checksum));
+    DBUG_RETURN(false);
+  }
+
+  if(m_kvalue != obj.m_kvalue)
+  {
     DBUG_PRINT("info",("m_kvalue %d != %d",m_kvalue,obj.m_kvalue));
     DBUG_RETURN(false);
   }
 
-  if(m_minLoadFactor != obj.m_minLoadFactor){
-    DBUG_PRINT("info",("m_minLoadFactor %d != %d",m_minLoadFactor,obj.m_minLoadFactor));
+  if(m_minLoadFactor != obj.m_minLoadFactor)
+  {
+    DBUG_PRINT("info",("m_minLoadFactor %d != %d",m_minLoadFactor,
+                        obj.m_minLoadFactor));
+    DBUG_RETURN(false);
+  }
+
+  if(m_maxLoadFactor != obj.m_maxLoadFactor)
+  {
+    DBUG_PRINT("info",("m_maxLoadFactor %d != %d",m_maxLoadFactor,
+                        obj.m_maxLoadFactor));
     DBUG_RETURN(false);
   }
 
-  if(m_maxLoadFactor != obj.m_maxLoadFactor){
-    DBUG_PRINT("info",("m_maxLoadFactor %d != %d",m_maxLoadFactor,obj.m_maxLoadFactor));
+  if(m_tablespace_id != obj.m_tablespace_id)
+  {
+    DBUG_PRINT("info",("m_tablespace_id %d != %d",m_tablespace_id,
+                        obj.m_tablespace_id));
     DBUG_RETURN(false);
   }
-  
-   DBUG_RETURN(true);
+
+  if(m_tablespace_version != obj.m_tablespace_version)
+  {
+    DBUG_PRINT("info",("m_tablespace_version %d != %d",m_tablespace_version,
+                        obj.m_tablespace_version));
+    DBUG_RETURN(false);
+  }
+
+  if(m_id != obj.m_id)
+  {
+    DBUG_PRINT("info",("m_id %d != %d",m_id,obj.m_id));
+    DBUG_RETURN(false);
+  }
+
+  if(m_version != obj.m_version)
+  {
+    DBUG_PRINT("info",("m_version %d != %d",m_version,obj.m_version));
+    DBUG_RETURN(false);
+  }
+
+  if(m_type != obj.m_type)
+  {
+    DBUG_PRINT("info",("m_type %d != %d",m_type,obj.m_type));
+    DBUG_RETURN(false);
+  }
+
+  if (m_type == NdbDictionary::Object::UniqueHashIndex ||
+      m_type == NdbDictionary::Object::OrderedIndex)
+  {
+    if(m_primaryTableId != obj.m_primaryTableId)
+    {
+      DBUG_PRINT("info",("m_primaryTableId %d != %d",m_primaryTableId,
+                 obj.m_primaryTableId));
+      DBUG_RETURN(false);
+    }
+    if (m_indexType != obj.m_indexType)
+    {
+      DBUG_PRINT("info",("m_indexType %d != %d",m_indexType,obj.m_indexType));
+      DBUG_RETURN(false);
+    }
+    if(strcmp(m_primaryTable.c_str(), obj.m_primaryTable.c_str()) != 0)
+    {
+      DBUG_PRINT("info",("m_primaryTable %s != %s",
+			 m_primaryTable.c_str(),obj.m_primaryTable.c_str()));
+      DBUG_RETURN(false);
+    }
+  }
+  DBUG_RETURN(true);
 }
 
 void
 NdbTableImpl::assign(const NdbTableImpl& org)
 {
-  m_tableId = org.m_tableId;
+  DBUG_ENTER("NdbColumnImpl::assign");
+  DBUG_PRINT("info", ("this: %p  &org: %p", this, &org));
+  /* m_changeMask intentionally not copied */
+  m_primaryTableId = org.m_primaryTableId;
   m_internalName.assign(org.m_internalName);
-  m_externalName.assign(org.m_externalName);
-  m_newExternalName.assign(org.m_newExternalName);
+  updateMysqlName();
+  // If the name has been explicitly set, use that name
+  // otherwise use the fetched name
+  if (!org.m_newExternalName.empty())
+    m_externalName.assign(org.m_newExternalName);
+  else
+    m_externalName.assign(org.m_externalName);
   m_frm.assign(org.m_frm.get_data(), org.m_frm.length());
-  m_fragmentType = org.m_fragmentType;
-  m_fragmentCount = org.m_fragmentCount;
+  m_ts_name.assign(org.m_ts_name.get_data(), org.m_ts_name.length());
+  m_new_ts_name.assign(org.m_new_ts_name.get_data(),
+                       org.m_new_ts_name.length());
+  m_ts.assign(org.m_ts.get_data(), org.m_ts.length());
+  m_new_ts.assign(org.m_new_ts.get_data(), org.m_new_ts.length());
+  m_fd.assign(org.m_fd.get_data(), org.m_fd.length());
+  m_new_fd.assign(org.m_new_fd.get_data(), org.m_new_fd.length());
+  m_range.assign(org.m_range.get_data(), org.m_range.length());
+  m_new_range.assign(org.m_new_range.get_data(), org.m_new_range.length());
 
-  for(unsigned i = 0; i<org.m_columns.size(); i++){
+  m_fragmentType = org.m_fragmentType;
+  /*
+    m_columnHashMask, m_columnHash, m_hashValueMask, m_hashpointerValue
+    is state calculated by computeAggregates and buildColumnHash
+  */
+  unsigned i;
+  for(i = 0; i < m_columns.size(); i++)
+  {
+    delete m_columns[i];
+  }
+  m_columns.clear();
+  for(i = 0; i < org.m_columns.size(); i++)
+  {
     NdbColumnImpl * col = new NdbColumnImpl();
     const NdbColumnImpl * iorg = org.m_columns[i];
     (* col) = (* iorg);
     m_columns.push_back(col);
   }
 
+  m_fragments = org.m_fragments;
+
+  m_linear_flag = org.m_linear_flag;
+  m_max_rows = org.m_max_rows;
+  m_default_no_part_flag = org.m_default_no_part_flag;
   m_logging = org.m_logging;
+  m_row_gci = org.m_row_gci;
+  m_row_checksum = org.m_row_checksum;
   m_kvalue = org.m_kvalue;
   m_minLoadFactor = org.m_minLoadFactor;
   m_maxLoadFactor = org.m_maxLoadFactor;
+  m_keyLenInWords = org.m_keyLenInWords;
+  m_fragmentCount = org.m_fragmentCount;
   
   if (m_index != 0)
     delete m_index;
   m_index = org.m_index;
-  
-  m_noOfDistributionKeys = org.m_noOfDistributionKeys;
+ 
+  m_primaryTable = org.m_primaryTable;
+  m_indexType = org.m_indexType;
+
   m_noOfKeys = org.m_noOfKeys;
-  m_keyLenInWords = org.m_keyLenInWords;
+  m_noOfDistributionKeys = org.m_noOfDistributionKeys;
   m_noOfBlobs = org.m_noOfBlobs;
+  m_replicaCount = org.m_replicaCount;
 
+  m_id = org.m_id;
   m_version = org.m_version;
   m_status = org.m_status;
+
+  m_tablespace_name = org.m_tablespace_name;
+  m_tablespace_id= org.m_tablespace_id;
+  m_tablespace_version = org.m_tablespace_version;
+  DBUG_VOID_RETURN;
 }
 
 void NdbTableImpl::setName(const char * name)
@@ -432,11 +750,206 @@
     return m_newExternalName.c_str();
 }
 
+void
+NdbTableImpl::computeAggregates()
+{
+  m_noOfKeys = 0;
+  m_keyLenInWords = 0;
+  m_noOfDistributionKeys = 0;
+  m_noOfBlobs = 0;
+  Uint32 i, n;
+  for (i = 0; i < m_columns.size(); i++) {
+    NdbColumnImpl* col = m_columns[i];
+    if (col->m_pk) {
+      m_noOfKeys++;
+      m_keyLenInWords += (col->m_attrSize * col->m_arraySize + 3) / 4;
+    }
+    if (col->m_distributionKey == 2)    // set by user
+      m_noOfDistributionKeys++;
+    
+    if (col->getBlobType())
+      m_noOfBlobs++;
+    col->m_keyInfoPos = ~0;
+  }
+  if (m_noOfDistributionKeys == m_noOfKeys) {
+    // all is none!
+    m_noOfDistributionKeys = 0;
+  }
+
+  if (m_noOfDistributionKeys == 0) 
+  {
+    // none is all!
+    for (i = 0, n = m_noOfKeys; n != 0; i++) {
+      NdbColumnImpl* col = m_columns[i];
+      if (col->m_pk) {
+        col->m_distributionKey = true;  // set by us
+        n--;
+      }
+    }
+  }
+  else 
+  {
+    for (i = 0, n = m_noOfKeys; n != 0; i++) {
+      NdbColumnImpl* col = m_columns[i];
+      if (col->m_pk)
+      {
+	if(col->m_distributionKey == 1)
+	  col->m_distributionKey = 0;  
+        n--;
+      }
+    }
+  }
+  
+  Uint32 keyInfoPos = 0;
+  for (i = 0, n = m_noOfKeys; n != 0; i++) {
+    NdbColumnImpl* col = m_columns[i];
+    if (col->m_pk) {
+      col->m_keyInfoPos = keyInfoPos++;
+      n--;
+    }
+  }
+}
+
+const void*
+NdbTableImpl::getTablespaceNames() const
+{
+  if (m_new_ts_name.empty())
+    return m_ts_name.get_data();
+  else
+    return m_new_ts_name.get_data();
+}
+
+Uint32
+NdbTableImpl::getTablespaceNamesLen() const
+{
+  if (m_new_ts_name.empty())
+    return m_ts_name.length();
+  else
+    return m_new_ts_name.length();
+}
+
+void NdbTableImpl::setTablespaceNames(const void *data, Uint32 len)
+{
+  m_new_ts_name.assign(data, len);
+}
+
+void NdbTableImpl::setFragmentCount(Uint32 count)
+{
+  m_fragmentCount= count;
+}
+
+Uint32 NdbTableImpl::getFragmentCount() const
+{
+  return m_fragmentCount;
+}
+
+void NdbTableImpl::setFrm(const void* data, Uint32 len)
+{
+  m_newFrm.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getFrmData() const
+{
+  if (m_newFrm.empty())
+    return m_frm.get_data();
+  else
+    return m_newFrm.get_data();
+}
+
+Uint32
+NdbTableImpl::getFrmLength() const 
+{
+  if (m_newFrm.empty())
+    return m_frm.length();
+  else
+    return m_newFrm.length();
+}
+
+void NdbTableImpl::setFragmentData(const void* data, Uint32 len)
+{
+  m_new_fd.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getFragmentData() const
+{
+  if (m_new_fd.empty())
+    return m_fd.get_data();
+  else
+    return m_new_fd.get_data();
+}
+
+Uint32
+NdbTableImpl::getFragmentDataLen() const 
+{
+  if (m_new_fd.empty())
+    return m_fd.length();
+  else
+    return m_new_fd.length();
+}
+
+void NdbTableImpl::setTablespaceData(const void* data, Uint32 len)
+{
+  m_new_ts.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getTablespaceData() const
+{
+  if (m_new_ts.empty())
+    return m_ts.get_data();
+  else
+    return m_new_ts.get_data();
+}
+
+Uint32
+NdbTableImpl::getTablespaceDataLen() const 
+{
+  if (m_new_ts.empty())
+    return m_ts.length();
+  else
+    return m_new_ts.length();
+}
+
+void NdbTableImpl::setRangeListData(const void* data, Uint32 len)
+{
+  m_new_range.assign(data, len);
+}
+
+const void * 
+NdbTableImpl::getRangeListData() const
+{
+  if (m_new_range.empty())
+    return m_range.get_data();
+  else
+    return m_new_range.get_data();
+}
+
+Uint32
+NdbTableImpl::getRangeListDataLen() const 
+{
+  if (m_new_range.empty())
+    return m_range.length();
+  else
+    return m_new_range.length();
+}
+
+void
+NdbTableImpl::updateMysqlName()
+{
+  Vector<BaseString> v;
+  if (m_internalName.split(v,"/") == 3)
+  {
+    m_mysqlName.assfmt("%s/%s",v[0].c_str(),v[2].c_str());
+    return;
+  }
+  m_mysqlName.assign("");
+}
 
 void
 NdbTableImpl::buildColumnHash(){
   const Uint32 size = m_columns.size();
-
   int i;
   for(i = 31; i >= 0; i--){
     if(((1 << i) & size) != 0){
@@ -503,19 +1016,35 @@
 Uint32
 NdbTableImpl::get_nodes(Uint32 hashValue, const Uint16 ** nodes) const
 {
-  if(m_replicaCount > 0)
+  Uint32 fragmentId;
+  if(m_replicaCount == 0)
+    return 0;
+  switch (m_fragmentType)
   {
-    Uint32 fragmentId = hashValue & m_hashValueMask;
-    if(fragmentId < m_hashpointerValue) 
+    case NdbDictionary::Object::FragAllSmall:
+    case NdbDictionary::Object::FragAllMedium:
+    case NdbDictionary::Object::FragAllLarge:
+    case NdbDictionary::Object::FragSingle:
+    case NdbDictionary::Object::DistrKeyLin:
     {
-      fragmentId = hashValue & ((m_hashValueMask << 1) + 1);
+      fragmentId = hashValue & m_hashValueMask;
+      if(fragmentId < m_hashpointerValue) 
+        fragmentId = hashValue & ((m_hashValueMask << 1) + 1);
+      break;
     }
-    Uint32 pos = fragmentId * m_replicaCount;
-    if(pos + m_replicaCount <= m_fragments.size())
+    case NdbDictionary::Object::DistrKeyHash:
     {
-      * nodes = m_fragments.getBase()+pos;
-      return m_replicaCount;
+      fragmentId = hashValue % m_fragmentCount;
+      break;
     }
+    default:
+      return 0;
+  }
+  Uint32 pos = fragmentId * m_replicaCount;
+  if (pos + m_replicaCount <= m_fragments.size())
+  {
+    *nodes = m_fragments.getBase()+pos;
+    return m_replicaCount;
   }
   return 0;
 }
@@ -525,23 +1054,23 @@
  */
 
 NdbIndexImpl::NdbIndexImpl() : 
-  NdbDictionary::Index(* this), 
-  m_facade(this)
+  NdbDictionary::Index(* this),
+  NdbDictObjectImpl(NdbDictionary::Object::OrderedIndex), m_facade(this)
 {
   init();
 }
 
 NdbIndexImpl::NdbIndexImpl(NdbDictionary::Index & f) : 
   NdbDictionary::Index(* this), 
-  m_facade(&f)
+  NdbDictObjectImpl(NdbDictionary::Object::OrderedIndex), m_facade(&f)
 {
   init();
 }
 
 void NdbIndexImpl::init()
 {
-  m_indexId= RNIL;
-  m_type= NdbDictionary::Index::Undefined;
+  m_id= RNIL;
+  m_type= NdbDictionary::Object::TypeUndefined;
   m_logging= true;
   m_table= NULL;
 }
@@ -586,54 +1115,86 @@
 
 NdbEventImpl::NdbEventImpl() : 
   NdbDictionary::Event(* this),
-  m_facade(this)
+  NdbDictObjectImpl(NdbDictionary::Object::TypeUndefined), m_facade(this)
 {
+  DBUG_ENTER("NdbEventImpl::NdbEventImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 NdbEventImpl::NdbEventImpl(NdbDictionary::Event & f) : 
   NdbDictionary::Event(* this),
-  m_facade(&f)
+  NdbDictObjectImpl(NdbDictionary::Object::TypeUndefined), m_facade(&f)
 {
+  DBUG_ENTER("NdbEventImpl::NdbEventImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   init();
+  DBUG_VOID_RETURN;
 }
 
 void NdbEventImpl::init()
 {
   m_eventId= RNIL;
   m_eventKey= RNIL;
-  m_tableId= RNIL;
   mi_type= 0;
   m_dur= NdbDictionary::Event::ED_UNDEFINED;
+  m_mergeEvents = false;
   m_tableImpl= NULL;
-  m_bufferId= RNIL;
-  eventOp= NULL;
+  m_rep= NdbDictionary::Event::ER_UPDATED;
 }
 
 NdbEventImpl::~NdbEventImpl()
 {
+  DBUG_ENTER("NdbEventImpl::~NdbEventImpl");
+  DBUG_PRINT("info", ("this: %p", this));
   for (unsigned i = 0; i < m_columns.size(); i++)
     delete  m_columns[i];
+  if (m_tableImpl)
+    delete m_tableImpl;
+  DBUG_VOID_RETURN;
 }
 
 void NdbEventImpl::setName(const char * name)
 {
-  m_externalName.assign(name);
+  m_name.assign(name);
 }
 
 const char *NdbEventImpl::getName() const
 {
-  return m_externalName.c_str();
+  return m_name.c_str();
 }
 
 void 
 NdbEventImpl::setTable(const NdbDictionary::Table& table)
 {
-  m_tableImpl= &NdbTableImpl::getImpl(table);
+  setTable(&NdbTableImpl::getImpl(table));
   m_tableName.assign(m_tableImpl->getName());
 }
 
 void 
+NdbEventImpl::setTable(NdbTableImpl *tableImpl)
+{
+  DBUG_ENTER("NdbEventImpl::setTable");
+  DBUG_PRINT("info", ("this: %p  tableImpl: %p", this, tableImpl));
+  DBUG_ASSERT(tableImpl->m_status != NdbDictionary::Object::Invalid);
+  if (!m_tableImpl) 
+    m_tableImpl = new NdbTableImpl();
+  // Copy table, since event might be accessed from different threads
+  m_tableImpl->assign(*tableImpl);
+  DBUG_VOID_RETURN;
+}
+
+const NdbDictionary::Table *
+NdbEventImpl::getTable() const
+{
+  if (m_tableImpl) 
+    return m_tableImpl->m_facade;
+  else
+    return NULL;
+}
+
+void 
 NdbEventImpl::setTable(const char * table)
 {
   m_tableName.assign(table);
@@ -648,12 +1209,13 @@
 void
 NdbEventImpl::addTableEvent(const NdbDictionary::Event::TableEvent t =  NdbDictionary::Event::TE_ALL)
 {
-  switch (t) {
-  case NdbDictionary::Event::TE_INSERT : mi_type |= 1; break;
-  case NdbDictionary::Event::TE_DELETE : mi_type |= 2; break;
-  case NdbDictionary::Event::TE_UPDATE : mi_type |= 4; break;
-  default: mi_type = 4 | 2 | 1; // all types
-  }
+  mi_type |= (unsigned)t;
+}
+
+bool
+NdbEventImpl::getTableEvent(const NdbDictionary::Event::TableEvent t) const
+{
+  return (mi_type & (unsigned)t) == (unsigned)t;
 }
 
 void
@@ -668,11 +1230,46 @@
   return m_dur;
 }
 
+void
+NdbEventImpl::setReport(NdbDictionary::Event::EventReport r)
+{
+  m_rep = r;
+}
+
+NdbDictionary::Event::EventReport
+NdbEventImpl::getReport() const
+{
+  return m_rep;
+}
+
 int NdbEventImpl::getNoOfEventColumns() const
 {
   return m_attrIds.size() + m_columns.size();
 }
 
+const NdbDictionary::Column *
+NdbEventImpl::getEventColumn(unsigned no) const
+{
+  if (m_columns.size())
+  {
+    if (no < m_columns.size())
+    {
+      return m_columns[no];
+    }
+  }
+  else if (m_attrIds.size())
+  {
+    if (no < m_attrIds.size())
+    {
+      NdbTableImpl* tab= m_tableImpl;
+      if (tab == 0)
+        return 0;
+      return tab->getColumn(m_attrIds[no]);
+    }
+  }
+  return 0;
+}
+
 /**
  * NdbDictionaryImpl
  */
@@ -721,12 +1318,20 @@
       delete NdbDictionary::Column::COMMIT_COUNT;
       delete NdbDictionary::Column::ROW_SIZE;
       delete NdbDictionary::Column::RANGE_NO;
+      delete NdbDictionary::Column::DISK_REF;
+      delete NdbDictionary::Column::RECORDS_IN_RANGE;
+      delete NdbDictionary::Column::ROWID;
+      delete NdbDictionary::Column::ROW_GCI;
       NdbDictionary::Column::FRAGMENT= 0;
       NdbDictionary::Column::FRAGMENT_MEMORY= 0;
       NdbDictionary::Column::ROW_COUNT= 0;
       NdbDictionary::Column::COMMIT_COUNT= 0;
       NdbDictionary::Column::ROW_SIZE= 0;
       NdbDictionary::Column::RANGE_NO= 0;
+      NdbDictionary::Column::DISK_REF= 0;
+      NdbDictionary::Column::RECORDS_IN_RANGE= 0;
+      NdbDictionary::Column::ROWID= 0;
+      NdbDictionary::Column::ROW_GCI= 0;
     }
     m_globalHash->unlock();
   } else {
@@ -734,36 +1339,125 @@
   }
 }
 
-Ndb_local_table_info *
-NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
+NdbTableImpl *
+NdbDictionaryImpl::fetchGlobalTableImplRef(const GlobalCacheInitObject &obj)
 {
+  DBUG_ENTER("fetchGlobalTableImplRef");
   NdbTableImpl *impl;
 
   m_globalHash->lock();
-  impl = m_globalHash->get(internalTableName.c_str());
+  impl = m_globalHash->get(obj.m_name.c_str());
   m_globalHash->unlock();
 
   if (impl == 0){
-    impl = m_receiver.getTable(internalTableName,
+    impl = m_receiver.getTable(obj.m_name.c_str(),
 			       m_ndb.usingFullyQualifiedNames());
+    if (impl != 0 && obj.init(*impl))
+    {
+      delete impl;
+      impl = 0;
+    }
     m_globalHash->lock();
-    m_globalHash->put(internalTableName.c_str(), impl);
+    m_globalHash->put(obj.m_name.c_str(), impl);
     m_globalHash->unlock();
-    
-    if(impl == 0){
-      return 0;
-    }
   }
 
+  DBUG_RETURN(impl);
+}
+
+void
+NdbDictionaryImpl::putTable(NdbTableImpl *impl)
+{
+  NdbTableImpl *old;
+
+  int ret = getBlobTables(*impl);
+  assert(ret == 0);
+
+  m_globalHash->lock();
+  if ((old= m_globalHash->get(impl->m_internalName.c_str())))
+  {
+    m_globalHash->alter_table_rep(old->m_internalName.c_str(),
+                                  impl->m_id,
+                                  impl->m_version,
+                                  FALSE);
+  }
+  m_globalHash->put(impl->m_internalName.c_str(), impl);
+  m_globalHash->unlock();
   Ndb_local_table_info *info=
     Ndb_local_table_info::create(impl, m_local_table_data_size);
-
-  m_localHash.put(internalTableName.c_str(), info);
+  
+  m_localHash.put(impl->m_internalName.c_str(), info);
 
   m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
   m_ndb.theLastTupleId[impl->getTableId()]  = ~0;
-  
-  return info;
+}
+
+int
+NdbDictionaryImpl::getBlobTables(NdbTableImpl &t)
+{
+  unsigned n= t.m_noOfBlobs;
+  DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
+  // optimized for blob column being the last one
+  // and not looking for more than one if not neccessary
+  for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
+    i--;
+    NdbColumnImpl & c = *t.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    n--;
+    // retrieve blob table def from DICT - by-pass cache
+    char btname[NdbBlobImpl::BlobTableNameSize];
+    NdbBlob::getBlobTableName(btname, &t, &c);
+    BaseString btname_internal = m_ndb.internalize_table_name(btname);
+    NdbTableImpl* bt =
+      m_receiver.getTable(btname_internal, m_ndb.usingFullyQualifiedNames());
+    if (bt == NULL)
+      DBUG_RETURN(-1);
+
+    // TODO check primary id/version when returned by DICT
+
+    // the blob column owns the blob table
+    assert(c.m_blobTable == NULL);
+    c.m_blobTable = bt;
+  }
+  DBUG_RETURN(0); 
+}
+
+NdbTableImpl*
+NdbDictionaryImpl::getBlobTable(const NdbTableImpl& tab, uint col_no)
+{
+  if (col_no < tab.m_columns.size()) {
+    NdbColumnImpl* col = tab.m_columns[col_no];
+    if (col != NULL) {
+      NdbTableImpl* bt = col->m_blobTable;
+      if (bt != NULL)
+        return bt;
+      else
+        m_error.code = 4273; // No blob table..
+    } else
+      m_error.code = 4249; // Invalid table..
+  } else
+    m_error.code = 4318; // Invalid attribute..
+  return NULL;
+}
+
+NdbTableImpl*
+NdbDictionaryImpl::getBlobTable(uint tab_id, uint col_no)
+{
+  DBUG_ENTER("NdbDictionaryImpl::getBlobTable");
+  DBUG_PRINT("enter", ("tab_id: %u col_no %u", tab_id, col_no));
+
+  NdbTableImpl* tab = m_receiver.getTable(tab_id,
+                                          m_ndb.usingFullyQualifiedNames());
+  if (tab == NULL)
+    DBUG_RETURN(NULL);
+  Ndb_local_table_info* info =
+    get_local_table_info(tab->m_internalName);
+  delete tab;
+  if (info == NULL)
+    DBUG_RETURN(NULL);
+  NdbTableImpl* bt = getBlobTable(*info->m_table_impl, col_no);
+  DBUG_RETURN(bt);
 }
 
 #if 0
@@ -788,17 +1482,25 @@
     m_globalHash->lock();
     if(f_dictionary_count++ == 0){
       NdbDictionary::Column::FRAGMENT= 
-	NdbColumnImpl::create_psuedo("NDB$FRAGMENT");
+	NdbColumnImpl::create_pseudo("NDB$FRAGMENT");
       NdbDictionary::Column::FRAGMENT_MEMORY= 
-	NdbColumnImpl::create_psuedo("NDB$FRAGMENT_MEMORY");
+	NdbColumnImpl::create_pseudo("NDB$FRAGMENT_MEMORY");
       NdbDictionary::Column::ROW_COUNT= 
-	NdbColumnImpl::create_psuedo("NDB$ROW_COUNT");
+	NdbColumnImpl::create_pseudo("NDB$ROW_COUNT");
       NdbDictionary::Column::COMMIT_COUNT= 
-	NdbColumnImpl::create_psuedo("NDB$COMMIT_COUNT");
+	NdbColumnImpl::create_pseudo("NDB$COMMIT_COUNT");
       NdbDictionary::Column::ROW_SIZE=
-	NdbColumnImpl::create_psuedo("NDB$ROW_SIZE");
+	NdbColumnImpl::create_pseudo("NDB$ROW_SIZE");
       NdbDictionary::Column::RANGE_NO= 
-	NdbColumnImpl::create_psuedo("NDB$RANGE_NO");
+	NdbColumnImpl::create_pseudo("NDB$RANGE_NO");
+      NdbDictionary::Column::DISK_REF= 
+	NdbColumnImpl::create_pseudo("NDB$DISK_REF");
+      NdbDictionary::Column::RECORDS_IN_RANGE= 
+	NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE");
+      NdbDictionary::Column::ROWID= 
+	NdbColumnImpl::create_pseudo("NDB$ROWID");
+      NdbDictionary::Column::ROW_GCI= 
+	NdbColumnImpl::create_pseudo("NDB$ROW_GCI");
     }
     m_globalHash->unlock();
     return true;
@@ -912,12 +1614,6 @@
   case GSN_SUB_START_REF:
     tmp->execSUB_START_REF(signal, ptr);
     break;
-  case GSN_SUB_TABLE_DATA:
-    tmp->execSUB_TABLE_DATA(signal, ptr);
-    break;
-  case GSN_SUB_GCP_COMPLETE_REP:
-    tmp->execSUB_GCP_COMPLETE_REP(signal, ptr);
-    break;
   case GSN_SUB_STOP_CONF:
     tmp->execSUB_STOP_CONF(signal, ptr);
     break;
@@ -933,6 +1629,36 @@
   case GSN_LIST_TABLES_CONF:
     tmp->execLIST_TABLES_CONF(signal, ptr);
     break;
+  case GSN_CREATE_FILEGROUP_REF:
+    tmp->execCREATE_FILEGROUP_REF(signal, ptr);
+    break;
+  case GSN_CREATE_FILEGROUP_CONF:
+    tmp->execCREATE_FILEGROUP_CONF(signal, ptr);
+    break;
+  case GSN_CREATE_FILE_REF:
+    tmp->execCREATE_FILE_REF(signal, ptr);
+    break;
+  case GSN_CREATE_FILE_CONF:
+    tmp->execCREATE_FILE_CONF(signal, ptr);
+    break;
+  case GSN_DROP_FILEGROUP_REF:
+    tmp->execDROP_FILEGROUP_REF(signal, ptr);
+    break;
+  case GSN_DROP_FILEGROUP_CONF:
+    tmp->execDROP_FILEGROUP_CONF(signal, ptr);
+    break;
+  case GSN_DROP_FILE_REF:
+    tmp->execDROP_FILE_REF(signal, ptr);
+    break;
+  case GSN_DROP_FILE_CONF:
+    tmp->execDROP_FILE_CONF(signal, ptr);
+    break;
+  case GSN_WAIT_GCP_CONF:
+    tmp->execWAIT_GCP_CONF(signal, ptr);
+    break;
+  case GSN_WAIT_GCP_REF:
+    tmp->execWAIT_GCP_REF(signal, ptr);
+    break;
   default:
     abort();
   }
@@ -954,72 +1680,56 @@
 }
 
 int
-NdbDictInterface::dictSignal(NdbApiSignal* signal, 
-			     LinearSectionPtr ptr[3],int noLSP,
-			     const int useMasterNodeId,
-			     const Uint32 RETRIES,
-			     const WaitSignalType wst,
-			     const int theWait,
-			     const int *errcodes,
-			     const int noerrcodes,
-			     const int temporaryMask)
+NdbDictInterface::dictSignal(NdbApiSignal* sig, 
+			     LinearSectionPtr ptr[3], int secs,
+			     int node_specification,
+			     WaitSignalType wst,
+			     int timeout, Uint32 RETRIES,
+			     const int *errcodes, int temporaryMask)
 {
   DBUG_ENTER("NdbDictInterface::dictSignal");
-  DBUG_PRINT("enter", ("useMasterNodeId: %d", useMasterNodeId));
+  DBUG_PRINT("enter", ("useMasterNodeId: %d", node_specification));
   for(Uint32 i = 0; i<RETRIES; i++){
-    //if (useMasterNodeId == 0)
     m_buffer.clear();
 
     // Protected area
-    m_transporter->lock_mutex();
-    Uint32 aNodeId;
-    if (useMasterNodeId) {
-      if ((m_masterNodeId == 0) ||
-	  (!m_transporter->get_node_alive(m_masterNodeId))) {
-	m_masterNodeId = m_transporter->get_an_alive_node();
-      }//if
-      aNodeId = m_masterNodeId;
-    } else {
-      aNodeId = m_transporter->get_an_alive_node();
+    /*
+      The PollGuard has an implicit call of unlock_and_signal through the
+      ~PollGuard method. This method is called implicitly by the compiler
+      in all places where the object is out of context due to a return,
+      break, continue or simply end of statement block
+    */
+    PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
+    Uint32 node;
+    switch(node_specification){
+    case 0:
+      node = (m_transporter->get_node_alive(m_masterNodeId) ? m_masterNodeId :
+	      (m_masterNodeId = m_transporter->get_an_alive_node()));
+      break;
+    case -1:
+      node = m_transporter->get_an_alive_node();
+      break;
+    default:
+      node = node_specification;
     }
-    if(aNodeId == 0){
+    DBUG_PRINT("info", ("node %d", node));
+    if(node == 0){
       m_error.code= 4009;
-      m_transporter->unlock_mutex();
       DBUG_RETURN(-1);
     }
-    {
-      int r;
-      if (ptr) {
-#ifdef EVENT_DEBUG
-	printf("Long signal %d ptr", noLSP);
-	for (int q=0;q<noLSP;q++) {
-	  printf(" sz %d", ptr[q].sz);
-	}
-	printf("\n");
-#endif
-	r = m_transporter->sendFragmentedSignal(signal, aNodeId, ptr, noLSP);
-      } else {
-#ifdef EVENT_DEBUG
-	printf("Short signal\n");
-#endif
-	r = m_transporter->sendSignal(signal, aNodeId);
-      }
-      if(r != 0){
-	m_transporter->unlock_mutex();
-	continue;
-      }
-    }
+    int res = (ptr ? 
+	       m_transporter->sendFragmentedSignal(sig, node, ptr, secs):
+	       m_transporter->sendSignal(sig, node));
+    if(res != 0){
+      DBUG_PRINT("info", ("dictSignal failed to send signal"));
+      continue;
+    }    
     
     m_error.code= 0;
-    
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = wst;
-
-    m_waiter.wait(theWait);
-    m_transporter->unlock_mutex();    
+    int ret_val= poll_guard.wait_n_unlock(timeout, node, wst);
     // End of Protected area  
     
-    if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
+    if(ret_val == 0 && m_error.code == 0){
       // Normal return
       DBUG_RETURN(0);
     }
@@ -1027,34 +1737,44 @@
     /**
      * Handle error codes
      */
-    if(m_waiter.m_state == WAIT_NODE_FAILURE)
+    if(ret_val == -2) //WAIT_NODE_FAILURE
+    {
       continue;
-
+    }
     if(m_waiter.m_state == WST_WAIT_TIMEOUT)
     {
+      DBUG_PRINT("info", ("dictSignal caught time-out"));
       m_error.code = 4008;
       DBUG_RETURN(-1);
     }
     
-    if ( (temporaryMask & m_error.code) != 0 ) {
+    if ( temporaryMask == -1)
+    {
+      const NdbError &error= getNdbError();
+      if (error.status ==  NdbError::TemporaryError)
+	continue;
+    }
+    else if ( (temporaryMask & m_error.code) != 0 ) {
       continue;
     }
-    if (errcodes) {
-      int doContinue = 0;
-      for (int j=0; j < noerrcodes; j++)
-	if(m_error.code == errcodes[j]) {
-	  doContinue = 1;
+    DBUG_PRINT("info", ("dictSignal caught error= %d", m_error.code));
+    
+    if(m_error.code && errcodes)
+    {
+      int j;
+      for(j = 0; errcodes[j] ; j++){
+	if(m_error.code == errcodes[j]){
 	  break;
 	}
-      if (doContinue)
+      }
+      if(errcodes[j]) // Accepted error code
 	continue;
     }
-
-    DBUG_RETURN(-1);
+    break;
   }
   DBUG_RETURN(-1);
 }
-#if 0
+
 /*
   Get dictionary information for a table using table id as reference
 
@@ -1065,8 +1785,8 @@
 NdbDictInterface::getTable(int tableId, bool fullyQualifiedNames)
 {
   NdbApiSignal tSignal(m_reference);
-  GetTabInfoReq* const req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
-
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+  
   req->senderRef = m_reference;
   req->senderData = 0;
   req->requestType =
@@ -1078,8 +1798,6 @@
 
   return getTable(&tSignal, 0, 0, fullyQualifiedNames);
 }
-#endif
-
 
 /*
   Get dictionary information for a table using table name as the reference
@@ -1130,22 +1848,23 @@
 			   LinearSectionPtr ptr[3],
 			   Uint32 noOfSections, bool fullyQualifiedNames)
 {
-  int errCodes[] = {GetTabInfoRef::Busy };
-
-  int r = dictSignal(signal,ptr,noOfSections,
-		     0/*do not use masternode id*/,
-		     100,
+  int errCodes[] = {GetTabInfoRef::Busy, 0 };
+  int r = dictSignal(signal, ptr, noOfSections,
+		     -1, // any node
 		     WAIT_GET_TAB_INFO_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes, 1);
-  if (r) return 0;
+		     DICT_WAITFOR_TIMEOUT, 100, errCodes);
 
+  if (r)
+    return 0;
+  
   NdbTableImpl * rt = 0;
-  m_error.code= parseTableInfo(&rt, 
-			       (Uint32*)m_buffer.get_data(), 
-			       m_buffer.length() / 4, fullyQualifiedNames);
-  if (rt != 0)
+  m_error.code = parseTableInfo(&rt, 
+				(Uint32*)m_buffer.get_data(), 
+  				m_buffer.length() / 4, 
+				fullyQualifiedNames);
+  if(rt)
     rt->buildColumnHash();
+  
   return rt;
 }
 
@@ -1177,8 +1896,9 @@
 NdbDictInterface::execGET_TABINFO_REF(NdbApiSignal * signal,
 				      LinearSectionPtr ptr[3])
 {
-  const GetTabInfoRef* ref = CAST_CONSTPTR(GetTabInfoRef, signal->getDataPtr());
-
+  const GetTabInfoRef* ref = CAST_CONSTPTR(GetTabInfoRef, 
+					   signal->getDataPtr());
+  
   m_error.code= ref->errorCode;
   m_waiter.signal(NO_WAIT);
 }
@@ -1226,6 +1946,9 @@
   { DictTabInfo::AllNodesMediumTable, NdbDictionary::Object::FragAllMedium },
   { DictTabInfo::AllNodesLargeTable,  NdbDictionary::Object::FragAllLarge },
   { DictTabInfo::SingleFragment,      NdbDictionary::Object::FragSingle },
+  { DictTabInfo::DistrKeyHash,      NdbDictionary::Object::DistrKeyHash },
+  { DictTabInfo::DistrKeyLin,      NdbDictionary::Object::DistrKeyLin },
+  { DictTabInfo::UserDefined,      NdbDictionary::Object::UserDefined },
   { -1, -1 }
 };
 
@@ -1240,6 +1963,10 @@
   { DictTabInfo::IndexTrigger,       NdbDictionary::Object::IndexTrigger },
   { DictTabInfo::SubscriptionTrigger,NdbDictionary::Object::SubscriptionTrigger },
   { DictTabInfo::ReadOnlyConstraint ,NdbDictionary::Object::ReadOnlyConstraint },
+  { DictTabInfo::Tablespace,         NdbDictionary::Object::Tablespace },
+  { DictTabInfo::LogfileGroup,       NdbDictionary::Object::LogfileGroup },
+  { DictTabInfo::Datafile,           NdbDictionary::Object::Datafile },
+  { DictTabInfo::Undofile,           NdbDictionary::Object::Undofile },
   { -1, -1 }
 };
 
@@ -1274,62 +2001,86 @@
 int
 NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
 				 const Uint32 * data, Uint32 len,
-				 bool fullyQualifiedNames)
+				 bool fullyQualifiedNames,
+                                 Uint32 version)
 {
-  DBUG_ENTER("NdbDictInterface::parseTableInfo");
-
   SimplePropertiesLinearReader it(data, len);
-  DictTabInfo::Table tableDesc; tableDesc.init();
+  DictTabInfo::Table *tableDesc;
   SimpleProperties::UnpackStatus s;
-  s = SimpleProperties::unpack(it, &tableDesc, 
+  DBUG_ENTER("NdbDictInterface::parseTableInfo");
+
+  tableDesc = (DictTabInfo::Table*)NdbMem_Allocate(sizeof(DictTabInfo::Table));
+  if (!tableDesc)
+  {
+    DBUG_RETURN(4000);
+  }
+  tableDesc->init();
+  s = SimpleProperties::unpack(it, tableDesc, 
 			       DictTabInfo::TableMapping, 
 			       DictTabInfo::TableMappingSize, 
 			       true, true);
   
   if(s != SimpleProperties::Break){
+    NdbMem_Free((void*)tableDesc);
     DBUG_RETURN(703);
   }
-  const char * internalName = tableDesc.TableName;
+  const char * internalName = tableDesc->TableName;
   const char * externalName = Ndb::externalizeTableName(internalName, fullyQualifiedNames);
 
   NdbTableImpl * impl = new NdbTableImpl();
-  impl->m_tableId = tableDesc.TableId;
-  impl->m_version = tableDesc.TableVersion;
+  impl->m_id = tableDesc->TableId;
+  impl->m_version = tableDesc->TableVersion;
   impl->m_status = NdbDictionary::Object::Retrieved;
   impl->m_internalName.assign(internalName);
+  impl->updateMysqlName();
   impl->m_externalName.assign(externalName);
 
-  impl->m_frm.assign(tableDesc.FrmData, tableDesc.FrmLen);
+  impl->m_frm.assign(tableDesc->FrmData, tableDesc->FrmLen);
+  impl->m_fd.assign(tableDesc->FragmentData, tableDesc->FragmentDataLen);
+  impl->m_range.assign(tableDesc->RangeListData, tableDesc->RangeListDataLen);
+  impl->m_fragmentCount = tableDesc->FragmentCount;
+
+  /*
+    We specifically don't get tablespace data and range/list arrays here
+    since those are known by the MySQL Server through analysing the
+    frm file.
+    Fragment Data contains the real node group mapping and the fragment
+    identities used for each fragment. At the moment we have no need for
+    this.
+    Frm file is needed for autodiscovery.
+  */
   
   impl->m_fragmentType = (NdbDictionary::Object::FragmentType)
-    getApiConstant(tableDesc.FragmentType, 
+    getApiConstant(tableDesc->FragmentType, 
 		   fragmentTypeMapping, 
 		   (Uint32)NdbDictionary::Object::FragUndefined);
   
-  impl->m_logging = tableDesc.TableLoggedFlag;
-  impl->m_kvalue = tableDesc.TableKValue;
-  impl->m_minLoadFactor = tableDesc.MinLoadFactor;
-  impl->m_maxLoadFactor = tableDesc.MaxLoadFactor;
+  Uint64 max_rows = ((Uint64)tableDesc->MaxRowsHigh) << 32;
+  max_rows += tableDesc->MaxRowsLow;
+  impl->m_max_rows = max_rows;
+  impl->m_default_no_part_flag = tableDesc->DefaultNoPartFlag;
+  impl->m_linear_flag = tableDesc->LinearHashFlag;
+  impl->m_logging = tableDesc->TableLoggedFlag;
+  impl->m_row_gci = tableDesc->RowGCIFlag;
+  impl->m_row_checksum = tableDesc->RowChecksumFlag;
+  impl->m_kvalue = tableDesc->TableKValue;
+  impl->m_minLoadFactor = tableDesc->MinLoadFactor;
+  impl->m_maxLoadFactor = tableDesc->MaxLoadFactor;
 
-  impl->m_indexType = (NdbDictionary::Index::Type)
-    getApiConstant(tableDesc.TableType,
+  impl->m_indexType = (NdbDictionary::Object::Type)
+    getApiConstant(tableDesc->TableType,
 		   indexTypeMapping,
-		   NdbDictionary::Index::Undefined);
+		   NdbDictionary::Object::TypeUndefined);
   
-  if(impl->m_indexType == NdbDictionary::Index::Undefined){
+  if(impl->m_indexType == NdbDictionary::Object::TypeUndefined){
   } else {
     const char * externalPrimary = 
-      Ndb::externalizeTableName(tableDesc.PrimaryTable, fullyQualifiedNames);
+      Ndb::externalizeTableName(tableDesc->PrimaryTable, fullyQualifiedNames);
     impl->m_primaryTable.assign(externalPrimary);
   }
   
-  Uint32 keyInfoPos = 0;
-  Uint32 keyCount = 0;
-  Uint32 blobCount = 0;
-  Uint32 distKeys = 0;
-  
   Uint32 i;
-  for(i = 0; i < tableDesc.NoOfAttributes; i++) {
+  for(i = 0; i < tableDesc->NoOfAttributes; i++) {
     DictTabInfo::Attribute attrDesc; attrDesc.init();
     s = SimpleProperties::unpack(it, 
 				 &attrDesc, 
@@ -1338,6 +2089,7 @@
 				 true, true);
     if(s != SimpleProperties::Break){
       delete impl;
+      NdbMem_Free((void*)tableDesc);
       DBUG_RETURN(703);
     }
     
@@ -1348,6 +2100,7 @@
     // check type and compute attribute size and array size
     if (! attrDesc.translateExtType()) {
       delete impl;
+      NdbMem_Free((void*)tableDesc);
       DBUG_RETURN(703);
     }
     col->m_type = (NdbDictionary::Column::Type)attrDesc.AttributeExtType;
@@ -1359,69 +2112,52 @@
     // charset is defined exactly for char types
     if (col->getCharType() != (cs_number != 0)) {
       delete impl;
+      NdbMem_Free((void*)tableDesc);
       DBUG_RETURN(703);
     }
     if (col->getCharType()) {
       col->m_cs = get_charset(cs_number, MYF(0));
       if (col->m_cs == NULL) {
         delete impl;
+        NdbMem_Free((void*)tableDesc);
         DBUG_RETURN(743);
       }
     }
     col->m_attrSize = (1 << attrDesc.AttributeSize) / 8;
     col->m_arraySize = attrDesc.AttributeArraySize;
+    col->m_arrayType = attrDesc.AttributeArrayType;
     if(attrDesc.AttributeSize == 0)
     {
       col->m_attrSize = 4;
       col->m_arraySize = (attrDesc.AttributeArraySize + 31) >> 5;
     }
+    col->m_storageType = attrDesc.AttributeStorageType;
     
     col->m_pk = attrDesc.AttributeKeyFlag;
-    col->m_distributionKey = attrDesc.AttributeDKey;
+    col->m_distributionKey = attrDesc.AttributeDKey ? 2 : 0;
     col->m_nullable = attrDesc.AttributeNullableFlag;
     col->m_autoIncrement = (attrDesc.AttributeAutoIncrement ? true : false);
     col->m_autoIncrementInitialValue = ~0;
     col->m_defaultValue.assign(attrDesc.AttributeDefaultValue);
 
-    if(attrDesc.AttributeKeyFlag){
-      col->m_keyInfoPos = keyInfoPos + 1;
-      keyInfoPos += ((col->m_attrSize * col->m_arraySize + 3) / 4);
-      keyCount++;
-      
-      if(attrDesc.AttributeDKey)
-	distKeys++;
-    } else {
-      col->m_keyInfoPos = 0;
-    }
-    if (col->getBlobType())
-      blobCount++;
-    NdbColumnImpl * null = 0;
-    impl->m_columns.fill(attrDesc.AttributeId, null);
-    if(impl->m_columns[attrDesc.AttributeId] != 0){
-      delete col;
-      delete impl;
-      DBUG_RETURN(703);
-    }
-    impl->m_columns[attrDesc.AttributeId] = col;
+    col->m_column_no = impl->m_columns.size();
+    impl->m_columns.push_back(col);
     it.next();
   }
 
-  impl->m_noOfKeys = keyCount;
-  impl->m_keyLenInWords = keyInfoPos;
-  impl->m_noOfBlobs = blobCount;
-  impl->m_noOfDistributionKeys = distKeys;
+  impl->computeAggregates();
 
-  if(tableDesc.FragmentDataLen > 0)
+  if(tableDesc->ReplicaDataLen > 0)
   {
-    Uint32 replicaCount = tableDesc.FragmentData[0];
-    Uint32 fragCount = tableDesc.FragmentData[1];
+    Uint16 replicaCount = ntohs(tableDesc->ReplicaData[0]);
+    Uint16 fragCount = ntohs(tableDesc->ReplicaData[1]);
 
     impl->m_replicaCount = replicaCount;
     impl->m_fragmentCount = fragCount;
-
-    for(i = 0; i<(fragCount*replicaCount); i++)
+    DBUG_PRINT("info", ("replicaCount=%x , fragCount=%x",replicaCount,fragCount));
+    for(i = 0; i < (Uint32) (fragCount*replicaCount); i++)
     {
-      impl->m_fragments.push_back(tableDesc.FragmentData[i+2]);
+      impl->m_fragments.push_back(ntohs(tableDesc->ReplicaData[i+2]));
     }
 
     Uint32 topBit = (1 << 31);
@@ -1433,23 +2169,26 @@
   }
   else
   {
-    impl->m_fragmentCount = tableDesc.FragmentCount;
+    impl->m_fragmentCount = tableDesc->FragmentCount;
     impl->m_replicaCount = 0;
     impl->m_hashValueMask = 0;
     impl->m_hashpointerValue = 0;
   }
 
-  if(distKeys == 0)
-  {
-    for(i = 0; i < tableDesc.NoOfAttributes; i++)
-    {
-      if(impl->m_columns[i]->getPrimaryKey())
-	impl->m_columns[i]->m_distributionKey = true;
-    }
-  }
+  impl->m_tablespace_id = tableDesc->TablespaceId;
+  impl->m_tablespace_version = tableDesc->TablespaceVersion;
   
   * ret = impl;
 
+  NdbMem_Free((void*)tableDesc);
+  if (version < MAKE_VERSION(5,1,3))
+  {
+    ;
+  } 
+  else
+  {
+    DBUG_ASSERT(impl->m_fragmentCount > 0);
+  }
   DBUG_RETURN(0);
 }
 
@@ -1459,79 +2198,105 @@
 int
 NdbDictionaryImpl::createTable(NdbTableImpl &t)
 { 
+  DBUG_ENTER("NdbDictionaryImpl::createTable");
+
+  // if the new name has not been set, use the copied name
+  if (t.m_newExternalName.empty())
+    t.m_newExternalName.assign(t.m_externalName);
+
+  // create table
   if (m_receiver.createTable(m_ndb, t) != 0)
-    return -1;
-  if (t.m_noOfBlobs == 0)
-    return 0;
-  // update table def from DICT
-  Ndb_local_table_info *info=
-    get_local_table_info(t.m_internalName,false);
-  if (info == NULL) {
-    m_error.code= 709;
-    return -1;
+    DBUG_RETURN(-1);
+  Uint32* data = (Uint32*)m_receiver.m_buffer.get_data();
+  t.m_id = data[0];
+  t.m_version = data[1];
+
+  // update table def from DICT - by-pass cache
+  NdbTableImpl* t2 =
+    m_receiver.getTable(t.m_internalName, m_ndb.usingFullyQualifiedNames());
+
+  // check if we got back same table
+  if (t2 == NULL) {
+    DBUG_PRINT("info", ("table %s dropped by another thread", 
+                        t.m_internalName.c_str()));
+    m_error.code = 283;
+    DBUG_RETURN(-1);
+  }
+  if (t.m_id != t2->m_id || t.m_version != t2->m_version) {
+    DBUG_PRINT("info", ("table %s re-created by another thread",
+                        t.m_internalName.c_str()));
+    m_error.code = 283;
+    delete t2;
+    DBUG_RETURN(-1);
+  }
+
+  // auto-increment - use "t" because initial value is not in DICT
+  {
+    bool autoIncrement = false;
+    Uint64 initialValue = 0;
+    for (Uint32 i = 0; i < t.m_columns.size(); i++) {
+      const NdbColumnImpl* c = t.m_columns[i];
+      assert(c != NULL);
+      if (c->m_autoIncrement) {
+        if (autoIncrement) {
+          m_error.code = 4335;
+          delete t2;
+          DBUG_RETURN(-1);
+        }
+        autoIncrement = true;
+        initialValue = c->m_autoIncrementInitialValue;
+      }
+    }
+    if (autoIncrement) {
+      // XXX unlikely race condition - t.m_id may no longer be same table
+      if (! m_ndb.setTupleIdInNdb(t.m_id, initialValue, false)) {
+        if (m_ndb.theError.code)
+          m_error.code = m_ndb.theError.code;
+        else
+          m_error.code = 4336;
+        delete t2;
+        DBUG_RETURN(-1);
+      }
+    }
   }
-  if (createBlobTables(*(info->m_table_impl)) != 0) {
+
+  // blob tables - use "t2" to get values set by kernel
+  if (t2->m_noOfBlobs != 0 && createBlobTables(*t2) != 0) {
     int save_code = m_error.code;
-    (void)dropTable(t);
-    m_error.code= save_code;
-    return -1;
+    (void)dropTable(*t2);
+    m_error.code = save_code;
+    delete t2;
+    DBUG_RETURN(-1);
   }
-  return 0;
+
+  // not entered in cache
+  delete t2;
+  DBUG_RETURN(0);
 }
 
 int
 NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
 {
+  DBUG_ENTER("NdbDictionaryImpl::createBlobTables");
   for (unsigned i = 0; i < t.m_columns.size(); i++) {
     NdbColumnImpl & c = *t.m_columns[i];
     if (! c.getBlobType() || c.getPartSize() == 0)
       continue;
     NdbTableImpl bt;
     NdbBlob::getBlobTable(bt, &t, &c);
-    if (createTable(bt) != 0)
-      return -1;
-    // Save BLOB table handle
-    Ndb_local_table_info *info=
-      get_local_table_info(bt.m_internalName, false);
-    if (info == 0) {
-      return -1;
-    }
-    c.m_blobTable = info->m_table_impl;
-  }
-  
-  return 0;
-}
-
-int
-NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
-{
-  unsigned n= t.m_noOfBlobs;
-  // optimized for blob column being the last one
-  // and not looking for more than one if not neccessary
-  for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
-    i--;
-    NdbColumnImpl & c = *t.m_columns[i];
-    if (! c.getBlobType() || c.getPartSize() == 0)
-      continue;
-    n--;
-    char btname[NdbBlobImpl::BlobTableNameSize];
-    NdbBlob::getBlobTableName(btname, &t, &c);
-    // Save BLOB table handle
-    NdbTableImpl * cachedBlobTable = getTable(btname);
-    if (cachedBlobTable == 0) {
-      return -1;
+    if (createTable(bt) != 0) {
+      DBUG_RETURN(-1);
     }
-    c.m_blobTable = cachedBlobTable;
   }
-  
-  return 0;
+  DBUG_RETURN(0); 
 }
 
 int 
 NdbDictInterface::createTable(Ndb & ndb,
 			      NdbTableImpl & impl)
 {
-  return createOrAlterTable(ndb, impl, false);
+  DBUG_ENTER("NdbDictInterface::createTable");
+  DBUG_RETURN(createOrAlterTable(ndb, impl, false));
 }
 
 int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)
@@ -1541,30 +2306,43 @@
 
   DBUG_ENTER("NdbDictionaryImpl::alterTable");
   Ndb_local_table_info * local = 0;
-  if((local= get_local_table_info(originalInternalName, false)) == 0)
+  if((local= get_local_table_info(originalInternalName)) == 0)
   {
     m_error.code = 709;
     DBUG_RETURN(-1);
   }
 
   // Alter the table
-  int ret = m_receiver.alterTable(m_ndb, impl);
-  if(ret == 0){
-    // Remove cached information and let it be refreshed at next access
+  int ret = alterTableGlobal(*local->m_table_impl, impl);
+  if(ret == 0)
+  {
     m_globalHash->lock();
-    local->m_table_impl->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(local->m_table_impl);
+    m_globalHash->release(local->m_table_impl, 1);
     m_globalHash->unlock();
     m_localHash.drop(originalInternalName);
   }
   DBUG_RETURN(ret);
 }
 
+int NdbDictionaryImpl::alterTableGlobal(NdbTableImpl &old_impl,
+                                        NdbTableImpl &impl)
+{
+  DBUG_ENTER("NdbDictionaryImpl::alterTableGlobal");
+  // Alter the table
+  int ret = m_receiver.alterTable(m_ndb, impl);
+  old_impl.m_status = NdbDictionary::Object::Invalid;
+  if(ret == 0){
+    DBUG_RETURN(ret);
+  }
+  ERR_RETURN(getNdbError(), ret);
+}
+
 int 
 NdbDictInterface::alterTable(Ndb & ndb,
 			      NdbTableImpl & impl)
 {
-  return createOrAlterTable(ndb, impl, true);
+  DBUG_ENTER("NdbDictInterface::alterTable");
+  DBUG_RETURN(createOrAlterTable(ndb, impl, true));
 }
 
 int 
@@ -1572,8 +2350,12 @@
 				     NdbTableImpl & impl,
 				     bool alter)
 {
-  DBUG_ENTER("NdbDictInterface::createOrAlterTable");
   unsigned i, err;
+  char *ts_names[MAX_NDB_PARTITIONS];
+  DBUG_ENTER("NdbDictInterface::createOrAlterTable");
+
+  impl.computeAggregates();
+
   if((unsigned)impl.getNoOfPrimaryKeys() > NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY){
     m_error.code= 4317;
     DBUG_RETURN(-1);
@@ -1583,73 +2365,245 @@
     m_error.code= 4318;
     DBUG_RETURN(-1);
   }
-
+  
+  // Check if any changes for alter table
+  
+  // Name change
   if (!impl.m_newExternalName.empty()) {
+    if (alter)
+    {
+      AlterTableReq::setNameFlag(impl.m_changeMask, true);
+    }
     impl.m_externalName.assign(impl.m_newExternalName);
-    AlterTableReq::setNameFlag(impl.m_changeMask, true);
+    impl.m_newExternalName.clear();
+  }
+  // Definition change (frm)
+  if (!impl.m_newFrm.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setFrmFlag(impl.m_changeMask, true);
+    }
+    impl.m_frm.assign(impl.m_newFrm.get_data(), impl.m_newFrm.length());
+    impl.m_newFrm.clear();
+  }
+  // Change FragmentData (fragment identity, state, tablespace id)
+  if (!impl.m_new_fd.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setFragDataFlag(impl.m_changeMask, true);
+    }
+    impl.m_fd.assign(impl.m_new_fd.get_data(), impl.m_new_fd.length());
+    impl.m_new_fd.clear();
+  }
+  // Change Tablespace Name Data
+  if (!impl.m_new_ts_name.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setTsNameFlag(impl.m_changeMask, true);
+    }
+    impl.m_ts_name.assign(impl.m_new_ts_name.get_data(),
+                          impl.m_new_ts_name.length());
+    impl.m_new_ts_name.clear();
+  }
+  // Change Range/List Data
+  if (!impl.m_new_range.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setRangeListFlag(impl.m_changeMask, true);
+    }
+    impl.m_range.assign(impl.m_new_range.get_data(),
+                          impl.m_new_range.length());
+    impl.m_new_range.clear();
+  }
+  // Change Tablespace Data
+  if (!impl.m_new_ts.empty())
+  {
+    if (alter)
+    {
+      AlterTableReq::setTsFlag(impl.m_changeMask, true);
+    }
+    impl.m_ts.assign(impl.m_new_ts.get_data(),
+                     impl.m_new_ts.length());
+    impl.m_new_ts.clear();
   }
 
+
+  /*
+     TODO RONM: Here I need to insert checks for fragment array and
+     range or list array
+  */
+  
   //validate();
   //aggregate();
 
   const BaseString internalName(
     ndb.internalize_table_name(impl.m_externalName.c_str()));
   impl.m_internalName.assign(internalName);
-  UtilBufferWriter w(m_buffer);
-  DictTabInfo::Table tmpTab; tmpTab.init();
-  BaseString::snprintf(tmpTab.TableName,
-	   sizeof(tmpTab.TableName),
+  impl.updateMysqlName();
+  DictTabInfo::Table *tmpTab;
+
+  tmpTab = (DictTabInfo::Table*)NdbMem_Allocate(sizeof(DictTabInfo::Table));
+  if (!tmpTab)
+  {
+    m_error.code = 4000;
+    DBUG_RETURN(-1);
+  }
+  tmpTab->init();
+  BaseString::snprintf(tmpTab->TableName,
+	   sizeof(tmpTab->TableName),
 	   internalName.c_str());
 
-  bool haveAutoIncrement = false;
-  Uint64 autoIncrementValue = 0;
   Uint32 distKeys= 0;
-  for(i = 0; i<sz; i++){
+  for(i = 0; i<sz; i++) {
     const NdbColumnImpl * col = impl.m_columns[i];
-    if(col == 0)
-      continue;
-    if (col->m_autoIncrement) {
-      if (haveAutoIncrement) {
-        m_error.code= 4335;
-        DBUG_RETURN(-1);
-      }
-      haveAutoIncrement = true;
-      autoIncrementValue = col->m_autoIncrementInitialValue;
+    if (col == NULL) {
+      m_error.code = 4272;
+      NdbMem_Free((void*)tmpTab);
+      DBUG_RETURN(-1);
     }
     if (col->m_distributionKey)
+    {
       distKeys++;
+    }
   }
+  if (distKeys == impl.m_noOfKeys)
+    distKeys= 0;
+  impl.m_noOfDistributionKeys= distKeys;
+
 
   // Check max length of frm data
   if (impl.m_frm.length() > MAX_FRM_DATA_SIZE){
     m_error.code= 1229;
+    NdbMem_Free((void*)tmpTab);
     DBUG_RETURN(-1);
   }
-  tmpTab.FrmLen = impl.m_frm.length();
-  memcpy(tmpTab.FrmData, impl.m_frm.get_data(), impl.m_frm.length());
+  /*
+    TODO RONM: This needs to change to dynamic arrays instead
+    Frm Data, FragmentData, TablespaceData, RangeListData, TsNameData
+  */
+  tmpTab->FrmLen = impl.m_frm.length();
+  memcpy(tmpTab->FrmData, impl.m_frm.get_data(), impl.m_frm.length());
+
+  tmpTab->FragmentDataLen = impl.m_fd.length();
+  memcpy(tmpTab->FragmentData, impl.m_fd.get_data(), impl.m_fd.length());
+
+  tmpTab->TablespaceDataLen = impl.m_ts.length();
+  memcpy(tmpTab->TablespaceData, impl.m_ts.get_data(), impl.m_ts.length());
+
+  tmpTab->RangeListDataLen = impl.m_range.length();
+  memcpy(tmpTab->RangeListData, impl.m_range.get_data(),
+         impl.m_range.length());
+
+  memcpy(ts_names, impl.m_ts_name.get_data(),
+         impl.m_ts_name.length());
+
+  tmpTab->FragmentCount= impl.m_fragmentCount;
+  tmpTab->TableLoggedFlag = impl.m_logging;
+  tmpTab->RowGCIFlag = impl.m_row_gci;
+  tmpTab->RowChecksumFlag = impl.m_row_checksum;
+  tmpTab->TableKValue = impl.m_kvalue;
+  tmpTab->MinLoadFactor = impl.m_minLoadFactor;
+  tmpTab->MaxLoadFactor = impl.m_maxLoadFactor;
+  tmpTab->TableType = DictTabInfo::UserTable;
+  tmpTab->PrimaryTableId = impl.m_primaryTableId;
+  tmpTab->NoOfAttributes = sz;
+  tmpTab->MaxRowsHigh = (Uint32)(impl.m_max_rows >> 32);
+  tmpTab->MaxRowsLow = (Uint32)(impl.m_max_rows & 0xFFFFFFFF);
+  tmpTab->DefaultNoPartFlag = impl.m_default_no_part_flag;
+  tmpTab->LinearHashFlag = impl.m_linear_flag;
 
-  tmpTab.TableLoggedFlag = impl.m_logging;
-  tmpTab.TableKValue = impl.m_kvalue;
-  tmpTab.MinLoadFactor = impl.m_minLoadFactor;
-  tmpTab.MaxLoadFactor = impl.m_maxLoadFactor;
-  tmpTab.TableType = DictTabInfo::UserTable;
-  tmpTab.NoOfAttributes = sz;
-  
-  tmpTab.FragmentType = getKernelConstant(impl.m_fragmentType,
-					  fragmentTypeMapping,
-					  DictTabInfo::AllNodesSmallTable);
-  tmpTab.TableVersion = rand();
+  if (impl.m_ts_name.length())
+  {
+    char **ts_name_ptr= (char**)ts_names;
+    i= 0;
+    do
+    {
+      NdbTablespaceImpl tmp;
+      if (*ts_name_ptr)
+      {
+        if(get_filegroup(tmp, NdbDictionary::Object::Tablespace, 
+                         (const char*)*ts_name_ptr) == 0)
+        {
+          tmpTab->TablespaceData[2*i] = tmp.m_id;
+          tmpTab->TablespaceData[2*i + 1] = tmp.m_version;
+        }
+        else
+        { 
+          NdbMem_Free((void*)tmpTab);
+          DBUG_RETURN(-1);
+        }
+      }
+      else
+      {
+        /*
+          No tablespace used, set tablespace id to NULL
+        */
+        tmpTab->TablespaceData[2*i] = RNIL;
+        tmpTab->TablespaceData[2*i + 1] = 0;
+      }
+      ts_name_ptr++;
+    } while (++i < tmpTab->FragmentCount);
+    tmpTab->TablespaceDataLen= 4*i;
+  }
 
+  tmpTab->FragmentType = getKernelConstant(impl.m_fragmentType,
+ 					   fragmentTypeMapping,
+					   DictTabInfo::AllNodesSmallTable);
+  tmpTab->TableVersion = rand();
+
+  const char *tablespace_name= impl.m_tablespace_name.c_str();
+loop:
+  if(impl.m_tablespace_id != ~(Uint32)0)
+  {
+    tmpTab->TablespaceId = impl.m_tablespace_id;
+    tmpTab->TablespaceVersion = impl.m_tablespace_version;
+  }
+  else if(strlen(tablespace_name))
+  {
+    NdbTablespaceImpl tmp;
+    if(get_filegroup(tmp, NdbDictionary::Object::Tablespace, 
+		     tablespace_name) == 0)
+    {
+      tmpTab->TablespaceId = tmp.m_id;
+      tmpTab->TablespaceVersion = tmp.m_version;
+    }
+    else 
+    {
+      // error set by get filegroup
+      NdbMem_Free((void*)tmpTab);
+      DBUG_RETURN(-1);
+    }
+  } 
+  else
+  {
+    for(i = 0; i<sz; i++)
+    {
+      if(impl.m_columns[i]->m_storageType == NDB_STORAGETYPE_DISK)
+      {
+	tablespace_name = "DEFAULT-TS";
+	goto loop;
+      }
+    }
+  }
+  
+  UtilBufferWriter w(m_buffer);
   SimpleProperties::UnpackStatus s;
   s = SimpleProperties::pack(w, 
-			     &tmpTab,
+			     tmpTab,
 			     DictTabInfo::TableMapping, 
 			     DictTabInfo::TableMappingSize, true);
   
   if(s != SimpleProperties::Eof){
     abort();
   }
+  NdbMem_Free((void*)tmpTab);
   
+  DBUG_PRINT("info",("impl.m_noOfDistributionKeys: %d impl.m_noOfKeys: %d distKeys: %d",
+		     impl.m_noOfDistributionKeys, impl.m_noOfKeys, distKeys));
   if (distKeys == impl.m_noOfKeys)
     distKeys= 0;
   impl.m_noOfDistributionKeys= distKeys;
@@ -1659,19 +2613,33 @@
     if(col == 0)
       continue;
     
+    DBUG_PRINT("info",("column: %s(%d) col->m_distributionKey: %d",
+		       col->m_name.c_str(), i, col->m_distributionKey));
     DictTabInfo::Attribute tmpAttr; tmpAttr.init();
     BaseString::snprintf(tmpAttr.AttributeName, sizeof(tmpAttr.AttributeName), 
 	     col->m_name.c_str());
-    tmpAttr.AttributeId = i;
+    tmpAttr.AttributeId = col->m_attrId;
     tmpAttr.AttributeKeyFlag = col->m_pk;
     tmpAttr.AttributeNullableFlag = col->m_nullable;
-    tmpAttr.AttributeDKey = distKeys ? col->m_distributionKey : 0;
+    tmpAttr.AttributeDKey = distKeys ? (bool)col->m_distributionKey : 0;
 
     tmpAttr.AttributeExtType = (Uint32)col->m_type;
     tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF);
     tmpAttr.AttributeExtScale = col->m_scale;
     tmpAttr.AttributeExtLength = col->m_length;
+    if(col->m_storageType == NDB_STORAGETYPE_DISK)
+      tmpAttr.AttributeArrayType = NDB_ARRAYTYPE_FIXED;
+    else
+      tmpAttr.AttributeArrayType = col->m_arrayType;
+
+    if(col->m_pk)
+      tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      
+    else
+      tmpAttr.AttributeStorageType = col->m_storageType;
 
+    if(col->getBlobType())
+      tmpAttr.AttributeStorageType = NDB_STORAGETYPE_MEMORY;      
+    
     // check type and compute attribute size and array size
     if (! tmpAttr.translateExtType()) {
       m_error.code= 703;
@@ -1691,8 +2659,14 @@
     }
     // distribution key not supported for Char attribute
     if (distKeys && col->m_distributionKey && col->m_cs != NULL) {
-      m_error.code= 745;
-      DBUG_RETURN(-1);
+      // we can allow this for non-var char where strxfrm does nothing
+      if (col->m_type == NdbDictionary::Column::Char &&
+          (col->m_cs->state & MY_CS_BINSORT))
+        ;
+      else {
+        m_error.code= 745;
+        DBUG_RETURN(-1);
+      }
     }
     // charset in upper half of precision
     if (col->getCharType()) {
@@ -1710,140 +2684,89 @@
     w.add(DictTabInfo::AttributeEnd, 1);
   }
 
-  NdbApiSignal tSignal(m_reference);
-  tSignal.theReceiversBlockNumber = DBDICT;
+  int ret;
   
   LinearSectionPtr ptr[1];
   ptr[0].p = (Uint32*)m_buffer.get_data();
   ptr[0].sz = m_buffer.length() / 4;
-  int ret;
-  if (alter)
-  {
-    AlterTableReq * const req = 
-      CAST_PTR(AlterTableReq, tSignal.getDataPtrSend());
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  if (alter) {
+    tSignal.theVerId_signalNumber   = GSN_ALTER_TABLE_REQ;
+    tSignal.theLength = AlterTableReq::SignalLength;
+
+    AlterTableReq * req = CAST_PTR(AlterTableReq, tSignal.getDataPtrSend());
     
     req->senderRef = m_reference;
     req->senderData = 0;
     req->changeMask = impl.m_changeMask;
-    req->tableId = impl.m_tableId;
+    req->tableId = impl.m_id;
     req->tableVersion = impl.m_version;;
-    tSignal.theVerId_signalNumber   = GSN_ALTER_TABLE_REQ;
-    tSignal.theLength = AlterTableReq::SignalLength;
-    ret= alterTable(&tSignal, ptr);
-  }
-  else
-  {
-    CreateTableReq * const req = 
-      CAST_PTR(CreateTableReq, tSignal.getDataPtrSend());
+
+    int errCodes[] = { AlterTableRef::NotMaster, AlterTableRef::Busy, 0 };
+    ret = dictSignal(&tSignal, ptr, 1,
+		     0, // master
+		     WAIT_ALTER_TAB_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
     
-    req->senderRef = m_reference;
-    req->senderData = 0;
+    if(m_error.code == AlterTableRef::InvalidTableVersion) {
+      // Clear caches and try again
+      DBUG_RETURN(INCOMPATIBLE_VERSION);
+    }
+  } else {
     tSignal.theVerId_signalNumber   = GSN_CREATE_TABLE_REQ;
     tSignal.theLength = CreateTableReq::SignalLength;
-    ret= createTable(&tSignal, ptr);
 
-    if (ret)
-      DBUG_RETURN(ret);
-
-    if (haveAutoIncrement) {
-      if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(),
-				     autoIncrementValue)) {
-	if (ndb.theError.code == 0) {
-	  m_error.code= 4336;
-	  ndb.theError = m_error;
-	} else
-	  m_error= ndb.theError;
-	ret = -1; // errorcode set in initialize_autoincrement
-      }
-    }
+    CreateTableReq * req = CAST_PTR(CreateTableReq, tSignal.getDataPtrSend());
+    req->senderRef = m_reference;
+    req->senderData = 0;
+    int errCodes[] = { CreateTableRef::Busy, CreateTableRef::NotMaster, 0 };
+    ret = dictSignal(&tSignal, ptr, 1,
+		     0, // master node
+		     WAIT_CREATE_INDX_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
   }
+  
   DBUG_RETURN(ret);
 }
 
-int
-NdbDictInterface::createTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-#if DEBUG_PRINT
-  ndbout_c("BufferLen = %d", ptr[0].sz);
-  SimplePropertiesLinearReader r(ptr[0].p, ptr[0].sz);
-  r.printAll(ndbout);
-#endif
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] = 
-     {CreateTableRef::Busy,
-      CreateTableRef::NotMaster};
-  return dictSignal(signal,ptr,1,
-		    1/*use masternode id*/,
-		    100,
-		    WAIT_CREATE_INDX_REQ,
-		    WAITFOR_RESPONSE_TIMEOUT,
-		    errCodes,noErrCodes);
-}
-
-
 void
 NdbDictInterface::execCREATE_TABLE_CONF(NdbApiSignal * signal,
 					LinearSectionPtr ptr[3])
 {
-#if 0
   const CreateTableConf* const conf=
     CAST_CONSTPTR(CreateTableConf, signal->getDataPtr());
-  Uint32 tableId= conf->tableId;
-  Uint32 tableVersion= conf->tableVersion;
-#endif
+  m_buffer.grow(4 * 2); // 2 words
+  Uint32* data = (Uint32*)m_buffer.get_data();
+  data[0] = conf->tableId;
+  data[1] = conf->tableVersion;
   m_waiter.signal(NO_WAIT);  
 }
 
 void
-NdbDictInterface::execCREATE_TABLE_REF(NdbApiSignal * signal,
+NdbDictInterface::execCREATE_TABLE_REF(NdbApiSignal * sig,
 				       LinearSectionPtr ptr[3])
 {
-  const CreateTableRef* const ref=
-    CAST_CONSTPTR(CreateTableRef, signal->getDataPtr());
+  const CreateTableRef* ref = CAST_CONSTPTR(CreateTableRef, sig->getDataPtr());
   m_error.code= ref->errorCode;
   m_masterNodeId = ref->masterNodeId;
   m_waiter.signal(NO_WAIT);  
 }
 
-int
-NdbDictInterface::alterTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-#if DEBUG_PRINT
-  ndbout_c("BufferLen = %d", ptr[0].sz);
-  SimplePropertiesLinearReader r(ptr[0].p, ptr[0].sz);
-  r.printAll(ndbout);
-#endif
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] =
-    {AlterTableRef::NotMaster,
-     AlterTableRef::Busy};
-  int r = dictSignal(signal,ptr,1,
-		     1/*use masternode id*/,
-		     100,WAIT_ALTER_TAB_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes, noErrCodes);
-  if(m_error.code == AlterTableRef::InvalidTableVersion) {
-    // Clear caches and try again
-    return INCOMPATIBLE_VERSION;
-  }
-
-  return r;
-}
-
 void
 NdbDictInterface::execALTER_TABLE_CONF(NdbApiSignal * signal,
                                        LinearSectionPtr ptr[3])
 {
-  //AlterTableConf* const conf = CAST_CONSTPTR(AlterTableConf, signal->getDataPtr());
   m_waiter.signal(NO_WAIT);
 }
 
 void
-NdbDictInterface::execALTER_TABLE_REF(NdbApiSignal * signal,
+NdbDictInterface::execALTER_TABLE_REF(NdbApiSignal * sig,
 				      LinearSectionPtr ptr[3])
 {
-  const AlterTableRef * const ref = 
-    CAST_CONSTPTR(AlterTableRef, signal->getDataPtr());
+  const AlterTableRef * ref = CAST_CONSTPTR(AlterTableRef, sig->getDataPtr());
   m_error.code= ref->errorCode;
   m_masterNodeId = ref->masterNodeId;
   m_waiter.signal(NO_WAIT);
@@ -1857,6 +2780,7 @@
 {
   DBUG_ENTER("NdbDictionaryImpl::dropTable");
   DBUG_PRINT("enter",("name: %s", name));
+  ASSERT_NOT_MYSQLD;
   NdbTableImpl * tab = getTable(name);
   if(tab == 0){
     DBUG_RETURN(-1);
@@ -1866,16 +2790,14 @@
   // we must clear the cache and try again
   if (ret == INCOMPATIBLE_VERSION) {
     const BaseString internalTableName(m_ndb.internalize_table_name(name));
-
     DBUG_PRINT("info",("INCOMPATIBLE_VERSION internal_name: %s", internalTableName.c_str()));
     m_localHash.drop(internalTableName.c_str());
     m_globalHash->lock();
-    tab->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(tab);
+    m_globalHash->release(tab, 1);
     m_globalHash->unlock();
     DBUG_RETURN(dropTable(name));
   }
-
+  
   DBUG_RETURN(ret);
 }
 
@@ -1888,13 +2810,14 @@
     return dropTable(name);
   }
 
-  if (impl.m_indexType != NdbDictionary::Index::Undefined) {
+  if (impl.m_indexType != NdbDictionary::Object::TypeUndefined)
+  {
     m_receiver.m_error.code= 1228;
     return -1;
   }
 
   List list;
-  if ((res = listIndexes(list, impl.m_tableId)) == -1){
+  if ((res = listIndexes(list, impl.m_id)) == -1){
     return -1;
   }
   for (unsigned i = 0; i < list.count; i++) {
@@ -1912,14 +2835,13 @@
   }
   
   int ret = m_receiver.dropTable(impl);  
-  if(ret == 0 || m_error.code == 709){
+  if(ret == 0 || m_error.code == 709 || m_error.code == 723){
     const char * internalTableName = impl.m_internalName.c_str();
 
     
     m_localHash.drop(internalTableName);
     m_globalHash->lock();
-    impl.m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(&impl);
+    m_globalHash->release(&impl, 1);
     m_globalHash->unlock();
 
     return 0;
@@ -1929,6 +2851,50 @@
 }
 
 int
+NdbDictionaryImpl::dropTableGlobal(NdbTableImpl & impl)
+{
+  int res;
+  const char * name = impl.getName();
+  DBUG_ENTER("NdbDictionaryImpl::dropTableGlobal");
+  DBUG_ASSERT(impl.m_status != NdbDictionary::Object::New);
+  DBUG_ASSERT(impl.m_indexType == NdbDictionary::Object::TypeUndefined);
+
+  List list;
+  if ((res = listIndexes(list, impl.m_id)) == -1){
+    ERR_RETURN(getNdbError(), -1);
+  }
+  for (unsigned i = 0; i < list.count; i++) {
+    const List::Element& element = list.elements[i];
+    NdbIndexImpl *idx= getIndexGlobal(element.name, impl);
+    if (idx == NULL)
+    {
+      ERR_RETURN(getNdbError(), -1);
+    }
+    if ((res = dropIndexGlobal(*idx)) == -1)
+    {
+      releaseIndexGlobal(*idx, 1);
+      ERR_RETURN(getNdbError(), -1);
+    }
+    releaseIndexGlobal(*idx, 1);
+  }
+  
+  if (impl.m_noOfBlobs != 0) {
+    if (dropBlobTables(impl) != 0){
+      ERR_RETURN(getNdbError(), -1);
+    }
+  }
+  
+  int ret = m_receiver.dropTable(impl);  
+  impl.m_status = NdbDictionary::Object::Invalid;
+  if(ret == 0 || m_error.code == 709 || m_error.code == 723)
+  {
+    DBUG_RETURN(0);
+  }
+  
+  ERR_RETURN(getNdbError(), ret);
+}
+
+int
 NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
 {
   DBUG_ENTER("NdbDictionaryImpl::dropBlobTables");
@@ -1936,15 +2902,21 @@
     NdbColumnImpl & c = *t.m_columns[i];
     if (! c.getBlobType() || c.getPartSize() == 0)
       continue;
-    char btname[NdbBlobImpl::BlobTableNameSize];
-    NdbBlob::getBlobTableName(btname, &t, &c);
-    if (dropTable(btname) != 0) {
-      if (m_error.code != 709){
-	DBUG_PRINT("exit",("error %u - exiting",m_error.code));
-        DBUG_RETURN(-1);
-      }
-      DBUG_PRINT("info",("error %u - continuing",m_error.code));
+    NdbTableImpl* bt = c.m_blobTable;
+    if (bt == NULL) {
+      DBUG_PRINT("info", ("col %s: blob table pointer is NULL",
+                          c.m_name.c_str()));
+      continue; // "force" mode on
+    }
+    // drop directly - by-pass cache
+    int ret = m_receiver.dropTable(*c.m_blobTable);
+    if (ret != 0) {
+      DBUG_PRINT("info", ("col %s: blob table %s: error %d",
+                 c.m_name.c_str(), bt->m_internalName.c_str(), m_error.code));
+      if (! (ret == 709 || ret == 723)) // "force" mode on
+        ERR_RETURN(getNdbError(), -1);
     }
+    // leave c.m_blobTable defined
   }
   DBUG_RETURN(0);
 }
@@ -1957,28 +2929,21 @@
   tSignal.theVerId_signalNumber   = GSN_DROP_TABLE_REQ;
   tSignal.theLength = DropTableReq::SignalLength;
   
-  DropTableReq * const req = CAST_PTR(DropTableReq, tSignal.getDataPtrSend());
+  DropTableReq * req = CAST_PTR(DropTableReq, tSignal.getDataPtrSend());
   req->senderRef = m_reference;
   req->senderData = 0;
-  req->tableId = impl.m_tableId;
+  req->tableId = impl.m_id;
   req->tableVersion = impl.m_version;
 
-  return dropTable(&tSignal, 0);
-}
-
-int
-NdbDictInterface::dropTable(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-  const int noErrCodes = 3;
-  int errCodes[noErrCodes] =
-         {DropTableRef::NoDropTableRecordAvailable,
-          DropTableRef::NotMaster,
-          DropTableRef::Busy};
-  int r = dictSignal(signal,NULL,0,
-		     1/*use masternode id*/,
-		     100,WAIT_DROP_TAB_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes, noErrCodes);
+  int errCodes[] =
+    { DropTableRef::NoDropTableRecordAvailable,
+      DropTableRef::NotMaster,
+      DropTableRef::Busy, 0 };
+  int r = dictSignal(&tSignal, 0, 0,
+		     0, // master
+		     WAIT_DROP_TAB_REQ, 
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
   if(m_error.code == DropTableRef::InvalidTableVersion) {
     // Clear caches and try again
     return INCOMPATIBLE_VERSION;
@@ -2002,7 +2967,7 @@
 				      LinearSectionPtr ptr[3])
 {
   DBUG_ENTER("NdbDictInterface::execDROP_TABLE_REF");
-  const DropTableRef* const ref = CAST_CONSTPTR(DropTableRef, signal->getDataPtr());
+  const DropTableRef* ref = CAST_CONSTPTR(DropTableRef, signal->getDataPtr());
   m_error.code= ref->errorCode;
   m_masterNodeId = ref->masterNodeId;
   m_waiter.signal(NO_WAIT);  
@@ -2015,10 +2980,10 @@
   const char * internalTableName = impl.m_internalName.c_str();
   DBUG_ENTER("NdbDictionaryImpl::invalidateObject");
   DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
+
   m_localHash.drop(internalTableName);
   m_globalHash->lock();
-  impl.m_status = NdbDictionary::Object::Invalid;
-  m_globalHash->drop(&impl);
+  m_globalHash->release(&impl, 1);
   m_globalHash->unlock();
   DBUG_RETURN(0);
 }
@@ -2027,68 +2992,29 @@
 NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
 {
   const char * internalTableName = impl.m_internalName.c_str();
+  DBUG_ENTER("NdbDictionaryImpl::removeCachedObject");
+  DBUG_PRINT("enter", ("internal_name: %s", internalTableName));
 
   m_localHash.drop(internalTableName);  
   m_globalHash->lock();
   m_globalHash->release(&impl);
   m_globalHash->unlock();
-  return 0;
-}
-
-/*****************************************************************
- * Get index info
- */
-NdbIndexImpl*
-NdbDictionaryImpl::getIndexImpl(const char * externalName,
-				const BaseString& internalName)
-{
-  Ndb_local_table_info * info = get_local_table_info(internalName,
-						     false);
-  if(info == 0){
-    m_error.code = 4243;
-    return 0;
-  }
-  NdbTableImpl * tab = info->m_table_impl;
-
-  if(tab->m_indexType == NdbDictionary::Index::Undefined){
-    // Not an index
-    m_error.code = 4243;
-    return 0;
-  }
-
-  NdbTableImpl* prim = getTable(tab->m_primaryTable.c_str());
-  if(prim == 0){
-    m_error.code = 4243;
-    return 0;
-  }
-
-  /**
-   * Create index impl
-   */
-  NdbIndexImpl* idx;
-  if(NdbDictInterface::create_index_obj_from_table(&idx, tab, prim) == 0){
-    idx->m_table = tab;
-    idx->m_externalName.assign(externalName);
-    idx->m_internalName.assign(internalName);
-    // TODO Assign idx to tab->m_index
-    // Don't do it right now since assign can't asign a table with index
-    // tab->m_index = idx;
-    return idx;
-  }
-  return 0;
+  DBUG_RETURN(0);
 }
 
 int
 NdbDictInterface::create_index_obj_from_table(NdbIndexImpl** dst,
 					      NdbTableImpl* tab,
-					      const NdbTableImpl* prim){
+					      const NdbTableImpl* prim)
+{
+  DBUG_ENTER("NdbDictInterface::create_index_obj_from_table");
   NdbIndexImpl *idx = new NdbIndexImpl();
   idx->m_version = tab->m_version;
   idx->m_status = tab->m_status;
-  idx->m_indexId = tab->m_tableId;
+  idx->m_id = tab->m_id;
   idx->m_externalName.assign(tab->getName());
   idx->m_tableName.assign(prim->m_externalName);
-  NdbDictionary::Index::Type type = idx->m_type = tab->m_indexType;
+  NdbDictionary::Object::Type type = idx->m_type = tab->m_indexType;
   idx->m_logging = tab->m_logging;
   // skip last attribute (NDB$PK or NDB$TNODE)
 
@@ -2114,7 +3040,7 @@
     idx->m_key_ids[key_id] = i;
     col->m_keyInfoPos = key_id;
 
-    if(type == NdbDictionary::Index::OrderedIndex && 
+    if(type == NdbDictionary::Object::OrderedIndex && 
        (primCol->m_distributionKey ||
 	(distKeys == 0 && primCol->getPrimaryKey())))
     {
@@ -2133,8 +3059,12 @@
       tab->m_columns[i]->m_distributionKey = 0;
   }
 
+  idx->m_table_id = prim->getObjectId();
+  idx->m_table_version = prim->getObjectVersion();
+  
   * dst = idx;
-  return 0;
+  DBUG_PRINT("exit", ("m_id: %d  m_version: %d", idx->m_id, idx->m_version));
+  DBUG_RETURN(0);
 }
 
 /*****************************************************************
@@ -2143,6 +3073,7 @@
 int
 NdbDictionaryImpl::createIndex(NdbIndexImpl &ix)
 {
+  ASSERT_NOT_MYSQLD;
   NdbTableImpl* tab = getTable(ix.getTable());
   if(tab == 0){
     m_error.code = 4249;
@@ -2152,9 +3083,15 @@
   return m_receiver.createIndex(m_ndb, ix, * tab);
 }
 
+int
+NdbDictionaryImpl::createIndex(NdbIndexImpl &ix, NdbTableImpl &tab)
+{
+  return m_receiver.createIndex(m_ndb, ix, tab);
+}
+
 int 
 NdbDictInterface::createIndex(Ndb & ndb,
-			      NdbIndexImpl & impl, 
+			      const NdbIndexImpl & impl, 
 			      const NdbTableImpl & table)
 {
   //validate();
@@ -2168,8 +3105,6 @@
   }
   const BaseString internalName(
     ndb.internalize_index_name(&table, impl.getName()));
-  impl.m_internalName.assign(internalName);
-
   w.add(DictTabInfo::TableName, internalName.c_str());
   w.add(DictTabInfo::TableLoggedFlag, impl.m_logging);
 
@@ -2194,7 +3129,7 @@
   }
   req->setIndexType((DictTabInfo::TableType) it);
   
-  req->setTableId(table.m_tableId);
+  req->setTableId(table.m_id);
   req->setOnline(true);
   AttributeList attributeList;
   attributeList.sz = impl.m_columns.size();
@@ -2205,7 +3140,7 @@
       m_error.code = 4247;
       return -1;
     }
-    // Copy column definition
+    // Copy column definition  XXX must be wrong, overwrites
     *impl.m_columns[i] = *col;
 
     // index key type check
@@ -2218,44 +3153,35 @@
       m_error.code = err;
       return -1;
     }
-    attributeList.id[i] = col->m_attrId;
+    // API uses external column number to talk to DICT
+    attributeList.id[i] = col->m_column_no;
   }
   LinearSectionPtr ptr[2];
   ptr[0].p = (Uint32*)&attributeList;
   ptr[0].sz = 1 + attributeList.sz;
   ptr[1].p = (Uint32*)m_buffer.get_data();
   ptr[1].sz = m_buffer.length() >> 2;                //BUG?
-  return createIndex(&tSignal, ptr);
-}
 
-int
-NdbDictInterface::createIndex(NdbApiSignal* signal, 
-			      LinearSectionPtr ptr[3])
-{
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] = {CreateIndxRef::Busy, CreateIndxRef::NotMaster};
-  return dictSignal(signal,ptr,2,
-		    1 /*use masternode id*/,
-		    100,
+  int errCodes[] = { CreateIndxRef::Busy, CreateIndxRef::NotMaster, 0 };
+  return dictSignal(&tSignal, ptr, 2,
+		    0, // master
 		    WAIT_CREATE_INDX_REQ,
-		    -1,
-		    errCodes,noErrCodes);
+		    DICT_WAITFOR_TIMEOUT, 100,
+		    errCodes);
 }
 
 void
 NdbDictInterface::execCREATE_INDX_CONF(NdbApiSignal * signal,
 				       LinearSectionPtr ptr[3])
 {
-  //CreateTableConf* const conf = CAST_CONSTPTR(CreateTableConf, signal->getDataPtr());
-  
   m_waiter.signal(NO_WAIT);  
 }
 
 void
-NdbDictInterface::execCREATE_INDX_REF(NdbApiSignal * signal,
+NdbDictInterface::execCREATE_INDX_REF(NdbApiSignal * sig,
 				      LinearSectionPtr ptr[3])
 {
-  const CreateIndxRef* const ref = CAST_CONSTPTR(CreateIndxRef, signal->getDataPtr());
+  const CreateIndxRef* ref = CAST_CONSTPTR(CreateIndxRef, sig->getDataPtr());
   m_error.code = ref->getErrorCode();
   if(m_error.code == ref->NotMaster)
     m_masterNodeId= ref->masterNodeId;
@@ -2269,12 +3195,13 @@
 NdbDictionaryImpl::dropIndex(const char * indexName, 
 			     const char * tableName)
 {
+  ASSERT_NOT_MYSQLD;
   NdbIndexImpl * idx = getIndex(indexName, tableName);
   if (idx == 0) {
     m_error.code = 4243;
     return -1;
   }
-  int ret = dropIndex(*idx); //, tableName);
+  int ret = dropIndex(*idx, tableName);
   // If index stored in cache is incompatible with the one in the kernel
   // we must clear the cache and try again
   if (ret == INCOMPATIBLE_VERSION) {
@@ -2286,8 +3213,7 @@
 
     m_localHash.drop(internalIndexName.c_str());
     m_globalHash->lock();
-    idx->m_table->m_status = NdbDictionary::Object::Invalid;
-    m_globalHash->drop(idx->m_table);
+    m_globalHash->release(idx->m_table, 1);
     m_globalHash->unlock();
     return dropIndex(indexName, tableName);
   }
@@ -2296,29 +3222,62 @@
 }
 
 int
-NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl)
+NdbDictionaryImpl::dropIndex(NdbIndexImpl & impl, const char * tableName)
 {
+  const char * indexName = impl.getName();
+  if (tableName || m_ndb.usingFullyQualifiedNames()) {
     NdbTableImpl * timpl = impl.m_table;
     
     if (timpl == 0) {
       m_error.code = 709;
       return -1;
     }
-    int ret = m_receiver.dropIndex(impl, *timpl);
-    if(ret == 0){
-      m_localHash.drop(timpl->m_internalName.c_str());
+
+    const BaseString internalIndexName((tableName)
+      ?
+      m_ndb.internalize_index_name(getTable(tableName), indexName)
+      :
+      m_ndb.internalize_table_name(indexName)); // Index is also a table
+
+    if(impl.m_status == NdbDictionary::Object::New){
+      return dropIndex(indexName, tableName);
+    }
+
+    int ret= dropIndexGlobal(impl);
+    if (ret == 0)
+    {
       m_globalHash->lock();
-      timpl->m_status = NdbDictionary::Object::Invalid;
-      m_globalHash->drop(timpl);
+      m_globalHash->release(impl.m_table, 1);
       m_globalHash->unlock();
+      m_localHash.drop(internalIndexName.c_str());
     }
     return ret;
+  }
+
+  m_error.code = 4243;
+  return -1;
+}
+
+int
+NdbDictionaryImpl::dropIndexGlobal(NdbIndexImpl & impl)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropIndexGlobal");
+  int ret = m_receiver.dropIndex(impl, *impl.m_table);
+  impl.m_status = NdbDictionary::Object::Invalid;
+  if(ret == 0)
+  {
+    DBUG_RETURN(0);
+  }
+  ERR_RETURN(getNdbError(), ret);
 }
 
 int
 NdbDictInterface::dropIndex(const NdbIndexImpl & impl, 
 			    const NdbTableImpl & timpl)
 {
+  DBUG_ENTER("NdbDictInterface::dropIndex");
+  DBUG_PRINT("enter", ("indexId: %d  indexVersion: %d",
+                       timpl.m_id, timpl.m_version));
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
   tSignal.theVerId_signalNumber   = GSN_DROP_INDX_REQ;
@@ -2329,28 +3288,20 @@
   req->setConnectionPtr(0);
   req->setRequestType(DropIndxReq::RT_USER);
   req->setTableId(~0);  // DICT overwrites
-  req->setIndexId(timpl.m_tableId);
+  req->setIndexId(timpl.m_id);
   req->setIndexVersion(timpl.m_version);
 
-  return dropIndex(&tSignal, 0);
-}
-
-int
-NdbDictInterface::dropIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3])
-{
-  const int noErrCodes = 2;
-  int errCodes[noErrCodes] = {DropIndxRef::Busy, DropIndxRef::NotMaster};
-  int r = dictSignal(signal,NULL,0,
-		     1/*Use masternode id*/,
-		     100,
+  int errCodes[] = { DropIndxRef::Busy, DropIndxRef::NotMaster, 0 };
+  int r = dictSignal(&tSignal, 0, 0,
+		     0, // master
 		     WAIT_DROP_INDX_REQ,
-		     WAITFOR_RESPONSE_TIMEOUT,
-		     errCodes,noErrCodes);
+		     DICT_WAITFOR_TIMEOUT, 100,
+		     errCodes);
   if(m_error.code == DropIndxRef::InvalidIndexVersion) {
     // Clear caches and try again
-    return INCOMPATIBLE_VERSION;
+    ERR_RETURN(m_error, INCOMPATIBLE_VERSION);
   }
-  return r;
+  ERR_RETURN(m_error, r);
 }
 
 void
@@ -2364,7 +3315,7 @@
 NdbDictInterface::execDROP_INDX_REF(NdbApiSignal * signal,
 				      LinearSectionPtr ptr[3])
 {
-  const DropIndxRef* const ref = CAST_CONSTPTR(DropIndxRef, signal->getDataPtr());
+  const DropIndxRef* ref = CAST_CONSTPTR(DropIndxRef, signal->getDataPtr());
   m_error.code = ref->getErrorCode();
   if(m_error.code == ref->NotMaster)
     m_masterNodeId= ref->masterNodeId;
@@ -2378,26 +3329,24 @@
 int
 NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
 {
+  DBUG_ENTER("NdbDictionaryImpl::createEvent");
   int i;
-  NdbTableImpl* tab = getTable(evnt.getTableName());
-
-  if(tab == 0){
-#ifdef EVENT_DEBUG
-    ndbout_c("NdbDictionaryImpl::createEvent: table not found: %s",
-	     evnt.getTableName());
-#endif
-    return -1;
+  NdbTableImpl* tab= evnt.m_tableImpl;
+  if (tab == 0)
+  {
+    tab= getTable(evnt.getTableName());
+    if(tab == 0){
+      DBUG_PRINT("info",("NdbDictionaryImpl::createEvent: table not found: %s",
+			 evnt.getTableName()));
+      ERR_RETURN(getNdbError(), -1);
+    }
+    evnt.setTable(tab);
   }
 
-  evnt.m_tableId = tab->m_tableId;
-  evnt.m_tableImpl = tab;
-#ifdef EVENT_DEBUG
-  ndbout_c("Event on tableId=%d", evnt.m_tableId);
-#endif
+  DBUG_PRINT("info",("Table: id: %d version: %d", tab->m_id, tab->m_version));
 
   NdbTableImpl &table = *evnt.m_tableImpl;
 
-
   int attributeList_sz = evnt.m_attrIds.size();
 
   for (i = 0; i < attributeList_sz; i++) {
@@ -2408,17 +3357,19 @@
       ndbout_c("Attr id %u in table %s not found", evnt.m_attrIds[i],
 	       evnt.getTableName());
       m_error.code= 4713;
-      return -1;
+      ERR_RETURN(getNdbError(), -1);
     }
   }
 
   evnt.m_attrIds.clear();
 
   attributeList_sz = evnt.m_columns.size();
-#ifdef EVENT_DEBUG
-  ndbout_c("creating event %s", evnt.m_externalName.c_str());
-  ndbout_c("no of columns %d", evnt.m_columns.size());
-#endif
+
+  DBUG_PRINT("info",("Event on tableId=%d, tableVersion=%d, event name %s, no of columns %d",
+		     table.m_id, table.m_version,
+		     evnt.m_name.c_str(),
+		     evnt.m_columns.size()));
+
   int pk_count = 0;
   evnt.m_attrListBitmask.clear();
 
@@ -2427,7 +3378,7 @@
       table.getColumn(evnt.m_columns[i]->m_name.c_str());
     if(col == 0){
       m_error.code= 4247;
-      return -1;
+      ERR_RETURN(getNdbError(), -1);
     }
     // Copy column definition
     *evnt.m_columns[i] = *col;
@@ -2453,7 +3404,7 @@
   for(i = 1; i<attributeList_sz; i++) {
     if (evnt.m_columns[i-1]->m_attrId == evnt.m_columns[i]->m_attrId) {
       m_error.code= 4258;
-      return -1;
+      ERR_RETURN(getNdbError(), -1);
     }
   }
   
@@ -2464,7 +3415,37 @@
 #endif
 
   // NdbDictInterface m_receiver;
-  return m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */);
+  if (m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */) != 0)
+    ERR_RETURN(getNdbError(), -1);
+
+  // Create blob events
+  if (evnt.m_mergeEvents && createBlobEvents(evnt) != 0) {
+    int save_code = m_error.code;
+    (void)dropEvent(evnt.m_name.c_str());
+    m_error.code = save_code;
+    ERR_RETURN(getNdbError(), -1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictionaryImpl::createBlobEvents(NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::createBlobEvents");
+  NdbTableImpl& t = *evnt.m_tableImpl;
+  Uint32 n = t.m_noOfBlobs;
+  Uint32 i;
+  for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+    NdbColumnImpl & c = *evnt.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    n--;
+    NdbEventImpl blob_evnt;
+    NdbBlob::getBlobEvent(blob_evnt, &evnt, &c);
+    if (createEvent(blob_evnt) != 0)
+      ERR_RETURN(getNdbError(), -1);
+  }
+  DBUG_RETURN(0);
 }
 
 int
@@ -2472,6 +3453,9 @@
 			      NdbEventImpl & evnt,
 			      int getFlag)
 {
+  DBUG_ENTER("NdbDictInterface::createEvent");
+  DBUG_PRINT("enter",("getFlag=%d",getFlag));
+
   NdbApiSignal tSignal(m_reference);
   tSignal.theReceiversBlockNumber = DBDICT;
   tSignal.theVerId_signalNumber   = GSN_CREATE_EVNT_REQ;
@@ -2489,22 +3473,31 @@
     // getting event from Dictionary
     req->setRequestType(CreateEvntReq::RT_USER_GET);
   } else {
+    DBUG_PRINT("info",("tableId: %u tableVersion: %u",
+		       evnt.m_tableImpl->m_id, 
+                       evnt.m_tableImpl->m_version));
     // creating event in Dictionary
     req->setRequestType(CreateEvntReq::RT_USER_CREATE);
-    req->setTableId(evnt.m_tableId);
+    req->setTableId(evnt.m_tableImpl->m_id);
+    req->setTableVersion(evnt.m_tableImpl->m_version);
     req->setAttrListBitmask(evnt.m_attrListBitmask);
     req->setEventType(evnt.mi_type);
+    req->clearFlags();
+    if (evnt.m_rep & NdbDictionary::Event::ER_ALL)
+      req->setReportAll();
+    if (evnt.m_rep & NdbDictionary::Event::ER_SUBSCRIBE)
+      req->setReportSubscribe();
   }
 
   UtilBufferWriter w(m_buffer);
 
-  const size_t len = strlen(evnt.m_externalName.c_str()) + 1;
+  const size_t len = strlen(evnt.m_name.c_str()) + 1;
   if(len > MAX_TAB_NAME_SIZE) {
     m_error.code= 4241;
-    return -1;
+    ERR_RETURN(getNdbError(), -1);
   }
 
-  w.add(SimpleProperties::StringValue, evnt.m_externalName.c_str());
+  w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
 
   if (getFlag == 0)
   {
@@ -2518,12 +3511,16 @@
   ptr[0].p = (Uint32*)m_buffer.get_data();
   ptr[0].sz = (m_buffer.length()+3) >> 2;
 
-  int ret = createEvent(&tSignal, ptr, 1);
+  int ret = dictSignal(&tSignal,ptr, 1,
+		       0, // master
+		       WAIT_CREATE_INDX_REQ,
+		       DICT_WAITFOR_TIMEOUT, 100,
+		       0, -1);
 
   if (ret) {
-    return ret;
+    ERR_RETURN(getNdbError(), ret);
   }
-
+  
   char *dataPtr = (char *)m_buffer.get_data();
   unsigned int lenCreateEvntConf = *((unsigned int *)dataPtr);
   dataPtr += sizeof(lenCreateEvntConf);
@@ -2532,91 +3529,74 @@
   
   //  NdbEventImpl *evntImpl = (NdbEventImpl *)evntConf->getUserData();
 
+  evnt.m_eventId = evntConf->getEventId();
+  evnt.m_eventKey = evntConf->getEventKey();
+  evnt.m_table_id = evntConf->getTableId();
+  evnt.m_table_version = evntConf->getTableVersion();
+
   if (getFlag) {
-    evnt.m_tableId         = evntConf->getTableId();
     evnt.m_attrListBitmask = evntConf->getAttrListBitmask();
     evnt.mi_type           = evntConf->getEventType();
     evnt.setTable(dataPtr);
   } else {
-    if (evnt.m_tableId         != evntConf->getTableId() ||
+    if (evnt.m_tableImpl->m_id         != evntConf->getTableId() ||
+	evnt.m_tableImpl->m_version    != evntConf->getTableVersion() ||
 	//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
 	evnt.mi_type           != evntConf->getEventType()) {
       ndbout_c("ERROR*************");
-      return 1;
+      ERR_RETURN(getNdbError(), 1);
     }
   }
 
-  evnt.m_eventId         = evntConf->getEventId();
-  evnt.m_eventKey        = evntConf->getEventKey();
-
-  return ret;
-}
-
-int
-NdbDictInterface::createEvent(NdbApiSignal* signal,
-			      LinearSectionPtr ptr[3], int noLSP)
-{
-  const int noErrCodes = 1;
-  int errCodes[noErrCodes] = {CreateEvntRef::Busy};
-  return dictSignal(signal,ptr,noLSP,
-		    1 /*use masternode id*/,
-		    100,
-		    WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
-		    -1,
-		    errCodes,noErrCodes, CreateEvntRef::Temporary);
+  DBUG_RETURN(0);
 }
 
 int
-NdbDictionaryImpl::executeSubscribeEvent(NdbEventImpl & ev)
+NdbDictionaryImpl::executeSubscribeEvent(NdbEventOperationImpl & ev_op)
 {
   // NdbDictInterface m_receiver;
-  return m_receiver.executeSubscribeEvent(m_ndb, ev);
+  return m_receiver.executeSubscribeEvent(m_ndb, ev_op);
 }
 
 int
 NdbDictInterface::executeSubscribeEvent(class Ndb & ndb,
-				 NdbEventImpl & evnt)
+					NdbEventOperationImpl & ev_op)
 {
   DBUG_ENTER("NdbDictInterface::executeSubscribeEvent");
   NdbApiSignal tSignal(m_reference);
-  //  tSignal.theReceiversBlockNumber = SUMA;
   tSignal.theReceiversBlockNumber = DBDICT;
   tSignal.theVerId_signalNumber   = GSN_SUB_START_REQ;
   tSignal.theLength = SubStartReq::SignalLength2;
   
-  SubStartReq * sumaStart = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
-
-  sumaStart->subscriptionId   = evnt.m_eventId;
-  sumaStart->subscriptionKey  = evnt.m_eventKey;
-  sumaStart->part             = SubscriptionData::TableData;
-  sumaStart->subscriberData   = evnt.m_bufferId & 0xFF;
-  sumaStart->subscriberRef    = m_reference;
+  SubStartReq * req = CAST_PTR(SubStartReq, tSignal.getDataPtrSend());
 
-  DBUG_RETURN(executeSubscribeEvent(&tSignal, NULL));
+  req->subscriptionId   = ev_op.m_eventImpl->m_eventId;
+  req->subscriptionKey  = ev_op.m_eventImpl->m_eventKey;
+  req->part             = SubscriptionData::TableData;
+  req->subscriberData   = ev_op.m_oid;
+  req->subscriberRef    = m_reference;
+
+  DBUG_PRINT("info",("GSN_SUB_START_REQ subscriptionId=%d,subscriptionKey=%d,"
+		     "subscriberData=%d",req->subscriptionId,
+		     req->subscriptionKey,req->subscriberData));
+
+  DBUG_RETURN(dictSignal(&tSignal,NULL,0,
+			 0 /*use masternode id*/,
+			 WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
+			 -1, 100,
+			 0, -1));
 }
 
 int
-NdbDictInterface::executeSubscribeEvent(NdbApiSignal* signal,
-					LinearSectionPtr ptr[3])
-{
-  return dictSignal(signal,NULL,0,
-		    1 /*use masternode id*/,
-		    100,
-		    WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
-		    -1,
-		    NULL,0);
-}
-
-int
-NdbDictionaryImpl::stopSubscribeEvent(NdbEventImpl & ev)
+NdbDictionaryImpl::stopSubscribeEvent(NdbEventOperationImpl & ev_op)
 {
   // NdbDictInterface m_receiver;
-  return m_receiver.stopSubscribeEvent(m_ndb, ev);
+  return m_receiver.stopSubscribeEvent(m_ndb, ev_op);
 }
 
 int
 NdbDictInterface::stopSubscribeEvent(class Ndb & ndb,
-				     NdbEventImpl & evnt)
+				     NdbEventOperationImpl & ev_op)
 {
   DBUG_ENTER("NdbDictInterface::stopSubscribeEvent");
 
@@ -2626,36 +3606,34 @@
   tSignal.theVerId_signalNumber   = GSN_SUB_STOP_REQ;
   tSignal.theLength = SubStopReq::SignalLength;
   
-  SubStopReq * sumaStop = CAST_PTR(SubStopReq, tSignal.getDataPtrSend());
-
-  sumaStop->subscriptionId  = evnt.m_eventId;
-  sumaStop->subscriptionKey = evnt.m_eventKey;
-  sumaStop->subscriberData  = evnt.m_bufferId & 0xFF;
-  sumaStop->part            = (Uint32) SubscriptionData::TableData;
-  sumaStop->subscriberRef   = m_reference;
-
-  DBUG_RETURN(stopSubscribeEvent(&tSignal, NULL));
-}
+  SubStopReq * req = CAST_PTR(SubStopReq, tSignal.getDataPtrSend());
 
-int
-NdbDictInterface::stopSubscribeEvent(NdbApiSignal* signal,
-				     LinearSectionPtr ptr[3])
-{
-  return dictSignal(signal,NULL,0,
-		    1 /*use masternode id*/,
-		    100,
-		    WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/,
-		    -1,
-		    NULL,0);
+  req->subscriptionId  = ev_op.m_eventImpl->m_eventId;
+  req->subscriptionKey = ev_op.m_eventImpl->m_eventKey;
+  req->subscriberData  = ev_op.m_oid;
+  req->part            = (Uint32) SubscriptionData::TableData;
+  req->subscriberRef   = m_reference;
+
+  DBUG_PRINT("info",("GSN_SUB_STOP_REQ subscriptionId=%d,subscriptionKey=%d,"
+		     "subscriberData=%d",req->subscriptionId,
+		     req->subscriptionKey,req->subscriberData));
+
+  DBUG_RETURN(dictSignal(&tSignal,NULL,0,
+			 0 /*use masternode id*/,
+			 WAIT_CREATE_INDX_REQ /*WAIT_SUB_STOP__REQ*/,
+			 -1, 100,
+			 0, -1));
 }
 
 NdbEventImpl * 
-NdbDictionaryImpl::getEvent(const char * eventName)
+NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
 {
-  NdbEventImpl *ev =  new NdbEventImpl();
+  DBUG_ENTER("NdbDictionaryImpl::getEvent");
+  DBUG_PRINT("enter",("eventName= %s", eventName));
 
+  NdbEventImpl *ev =  new NdbEventImpl();
   if (ev == NULL) {
-    return NULL;
+    DBUG_RETURN(NULL);
   }
 
   ev->setName(eventName);
@@ -2664,48 +3642,115 @@
 
   if (ret) {
     delete ev;
-    return NULL;
+    DBUG_RETURN(NULL);
   }
 
   // We only have the table name with internal name
-  ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
-  ev->m_tableImpl = getTable(ev->getTableName());
+  DBUG_PRINT("info",("table %s", ev->getTableName()));
+  if (tab == NULL)
+  {
+    tab= fetchGlobalTableImplRef(InitTable(this, ev->getTableName()));
+    if (tab == 0)
+    {
+      DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
+      delete ev;
+      DBUG_RETURN(NULL);
+    }
+    if ((tab->m_status != NdbDictionary::Object::Retrieved) ||
+        (tab->m_id != ev->m_table_id) ||
+        (table_version_major(tab->m_version) !=
+         table_version_major(ev->m_table_version)))
+    {
+      DBUG_PRINT("info", ("mismatch on verison in cache"));
+      releaseTableGlobal(*tab, 1);
+      tab= fetchGlobalTableImplRef(InitTable(this, ev->getTableName()));
+      if (tab == 0)
+      {
+        DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
+        delete ev;
+        DBUG_RETURN(NULL);
+      }
+    }
+    ev->setTable(tab);
+    releaseTableGlobal(*tab, 0);
+  }
+  else
+    ev->setTable(tab);
+  tab = 0;
 
+  ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));  
   // get the columns from the attrListBitmask
-
   NdbTableImpl &table = *ev->m_tableImpl;
   AttributeMask & mask = ev->m_attrListBitmask;
-  int attributeList_sz = mask.count();
-  int id = -1;
+  unsigned attributeList_sz = mask.count();
 
-#ifdef EVENT_DEBUG
-  ndbout_c("NdbDictionaryImpl::getEvent attributeList_sz = %d",
-	   attributeList_sz);
+  DBUG_PRINT("info",("Table: id: %d version: %d", 
+                     table.m_id, table.m_version));
+
+  if (table.m_id != ev->m_table_id ||
+      table_version_major(table.m_version) !=
+      table_version_major(ev->m_table_version))
+  {
+    m_error.code = 241;
+    delete ev;
+    DBUG_RETURN(NULL);
+  }
+#ifndef DBUG_OFF
   char buf[128] = {0};
   mask.getText(buf);
-  ndbout_c("mask = %s", buf);
+  DBUG_PRINT("info",("attributeList_sz= %d, mask= %s", 
+                     attributeList_sz, buf));
 #endif
 
-  for(int i = 0; i < attributeList_sz; i++) {
-    id++; while (!mask.get(id)) id++;
+  
+  if ( attributeList_sz > table.getNoOfColumns() )
+  {
+    m_error.code = 241;
+    DBUG_PRINT("error",("Invalid version, too many columns"));
+    delete ev;
+    DBUG_RETURN(NULL);
+  }
 
-    const NdbColumnImpl* col = table.getColumn(id);
-    if(col == 0) {
-#ifdef EVENT_DEBUG
-      ndbout_c("NdbDictionaryImpl::getEvent could not find column id %d", id);
-#endif
-      m_error.code= 4247;
+  assert( (int)attributeList_sz <= table.getNoOfColumns() );
+  for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
+    if ( id >= table.getNoOfColumns())
+    {
+      m_error.code = 241;
+      DBUG_PRINT("error",("Invalid version, column %d out of range", id));
       delete ev;
-      return NULL;
+      DBUG_RETURN(NULL);
     }
+    if (!mask.get(id))
+      continue;
+
+    const NdbColumnImpl* col = table.getColumn(id);
+    DBUG_PRINT("info",("column %d %s", id, col->getName()));
     NdbColumnImpl* new_col = new NdbColumnImpl;
     // Copy column definition
     *new_col = *col;
-
     ev->m_columns.push_back(new_col);
   }
+  DBUG_RETURN(ev);
+}
+
+// ev is main event and has been retrieved previously
+NdbEventImpl *
+NdbDictionaryImpl::getBlobEvent(const NdbEventImpl& ev, uint col_no)
+{
+  DBUG_ENTER("NdbDictionaryImpl::getBlobEvent");
+  DBUG_PRINT("enter", ("ev=%s col=%u", ev.m_name.c_str(), col_no));
+
+  NdbTableImpl* tab = ev.m_tableImpl;
+  assert(tab != NULL && col_no < tab->m_columns.size());
+  NdbColumnImpl* col = tab->m_columns[col_no];
+  assert(col != NULL && col->getBlobType() && col->getPartSize() != 0);
+  NdbTableImpl* blob_tab = col->m_blobTable;
+  assert(blob_tab != NULL);
+  char bename[MAX_TAB_NAME_SIZE];
+  NdbBlob::getBlobEventName(bename, &ev, col);
 
-  return ev;
+  NdbEventImpl* blob_ev = getEvent(bename, blob_tab);
+  DBUG_RETURN(blob_ev);
 }
 
 void
@@ -2729,7 +3774,8 @@
   Uint32 subscriptionId = createEvntConf->getEventId();
   Uint32 subscriptionKey = createEvntConf->getEventKey();
 
-  DBUG_PRINT("info",("subscriptionId=%d,subscriptionKey=%d",
+  DBUG_PRINT("info",("nodeid=%d,subscriptionId=%d,subscriptionKey=%d",
+		     refToNode(signal->theSendersBlockRef),
 		     subscriptionId,subscriptionKey));
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
@@ -2746,6 +3792,8 @@
   m_error.code= ref->getErrorCode();
   DBUG_PRINT("error",("error=%d,line=%d,node=%d",ref->getErrorCode(),
 		      ref->getErrorLine(),ref->getErrorNode()));
+  if (m_error.code == CreateEvntRef::NotMaster)
+    m_masterNodeId = ref->getMasterNode();
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }
@@ -2783,6 +3831,8 @@
 
   DBUG_PRINT("error",("subscriptionId=%d,subscriptionKey=%d,subscriberData=%d,error=%d",
 		      subscriptionId,subscriptionKey,subscriberData,m_error.code));
+  if (m_error.code == SubStopRef::NotMaster)
+    m_masterNodeId = subStopRef->m_masterNodeId;
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }
@@ -2831,77 +3881,11 @@
   const SubStartRef * const subStartRef=
     CAST_CONSTPTR(SubStartRef, signal->getDataPtr());
   m_error.code= subStartRef->errorCode;
+  if (m_error.code == SubStartRef::NotMaster)
+    m_masterNodeId = subStartRef->m_masterNodeId;
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }
-void
-NdbDictInterface::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal,
-					   LinearSectionPtr ptr[3])
-{
-  const SubGcpCompleteRep * const rep=
-    CAST_CONSTPTR(SubGcpCompleteRep, signal->getDataPtr());
-
-  const Uint32 gci            = rep->gci;
-  //  const Uint32 senderRef      = rep->senderRef;
-  const Uint32 subscriberData = rep->subscriberData;
-
-  const Uint32 bufferId = subscriberData;
-
-  const Uint32 ref = signal->theSendersBlockRef;
-
-  NdbApiSignal tSignal(m_reference);
-  SubGcpCompleteAcc * acc=
-    CAST_PTR(SubGcpCompleteAcc, tSignal.getDataPtrSend());
-
-  acc->rep = *rep;
-
-  tSignal.theReceiversBlockNumber = refToBlock(ref);
-  tSignal.theVerId_signalNumber   = GSN_SUB_GCP_COMPLETE_ACC;
-  tSignal.theLength = SubGcpCompleteAcc::SignalLength;
-
-  Uint32 aNodeId = refToNode(ref);
-
-  //  m_transporter->lock_mutex();
-  int r;
-  r = m_transporter->sendSignal(&tSignal, aNodeId);
-  //  m_transporter->unlock_mutex();
-
-  NdbGlobalEventBufferHandle::latestGCI(bufferId, gci);
-}
-
-void
-NdbDictInterface::execSUB_TABLE_DATA(NdbApiSignal * signal,
-				     LinearSectionPtr ptr[3])
-{
-#ifdef EVENT_DEBUG
-  const char * FNAME = "NdbDictInterface::execSUB_TABLE_DATA";
-#endif
-  //TODO
-  const SubTableData * const sdata = CAST_CONSTPTR(SubTableData, signal->getDataPtr());
-
-  //  const Uint32 gci            = sdata->gci;
-  //  const Uint32 operation      = sdata->operation;
-  //  const Uint32 tableId        = sdata->tableId;
-  //  const Uint32 noOfAttrs      = sdata->noOfAttributes;
-  //  const Uint32 dataLen        = sdata->dataSize;
-  const Uint32 subscriberData = sdata->subscriberData;
-  //  const Uint32 logType        = sdata->logType;
-
-  for (int i=signal->m_noOfSections;i < 3; i++) {
-    ptr[i].p = NULL;
-    ptr[i].sz = 0;
-  }
-#ifdef EVENT_DEBUG
-  ndbout_c("%s: senderData %d, gci %d, operation %d, tableId %d, noOfAttrs %d, dataLen %d",
-	   FNAME, subscriberData, gci, operation, tableId, noOfAttrs, dataLen);
-  ndbout_c("ptr[0] %u %u ptr[1] %u %u ptr[2] %u %u\n",
-	   ptr[0].p,ptr[0].sz,ptr[1].p,ptr[1].sz,ptr[2].p,ptr[2].sz);
-#endif
-  const Uint32 bufferId = subscriberData;
-
-  NdbGlobalEventBufferHandle::insertDataL(bufferId,
-					  sdata, ptr);
-}
 
 /*****************************************************************
  * Drop event
@@ -2909,13 +3893,69 @@
 int 
 NdbDictionaryImpl::dropEvent(const char * eventName)
 {
-  NdbEventImpl *ev= new NdbEventImpl();
-  ev->setName(eventName);
-  int ret= m_receiver.dropEvent(*ev);
-  delete ev;  
+  DBUG_ENTER("NdbDictionaryImpl::dropEvent");
+  DBUG_PRINT("info", ("name=%s", eventName));
 
-  //  printf("__________________RET %u\n", ret);
-  return ret;
+  NdbEventImpl *evnt = getEvent(eventName); // allocated
+  if (evnt == NULL) {
+    if (m_error.code != 723 && // no such table
+        m_error.code != 241)   // invalid table
+      DBUG_RETURN(-1);
+    DBUG_PRINT("info", ("no table err=%d, drop by name alone", m_error.code));
+    evnt = new NdbEventImpl();
+    evnt->setName(eventName);
+  }
+  int ret = dropEvent(*evnt);
+  delete evnt;  
+  DBUG_RETURN(ret);
+}
+
+int
+NdbDictionaryImpl::dropEvent(const NdbEventImpl& evnt)
+{
+  if (dropBlobEvents(evnt) != 0)
+    return -1;
+  if (m_receiver.dropEvent(evnt) != 0)
+    return -1;
+  return 0;
+}
+
+int
+NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropBlobEvents");
+  if (evnt.m_tableImpl != 0) {
+    const NdbTableImpl& t = *evnt.m_tableImpl;
+    Uint32 n = t.m_noOfBlobs;
+    Uint32 i;
+    for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+      const NdbColumnImpl& c = *evnt.m_columns[i];
+      if (! c.getBlobType() || c.getPartSize() == 0)
+        continue;
+      n--;
+      NdbEventImpl* blob_evnt = getBlobEvent(evnt, i);
+      if (blob_evnt == NULL)
+        continue;
+      (void)dropEvent(*blob_evnt);
+      delete blob_evnt;
+    }
+  } else {
+    // loop over MAX_ATTRIBUTES_IN_TABLE ...
+    Uint32 i;
+    DBUG_PRINT("info", ("missing table definition, looping over "
+                        "MAX_ATTRIBUTES_IN_TABLE(%d)",
+                        MAX_ATTRIBUTES_IN_TABLE));
+    for (i = 0; i < MAX_ATTRIBUTES_IN_TABLE; i++) {
+      char bename[MAX_TAB_NAME_SIZE];
+      // XXX should get name from NdbBlob
+      sprintf(bename, "NDB$BLOBEVENT_%s_%u", evnt.getName(), i);
+      NdbEventImpl* bevnt = new NdbEventImpl();
+      bevnt->setName(bename);
+      (void)m_receiver.dropEvent(*bevnt);
+      delete bevnt;
+    }
+  }
+  DBUG_RETURN(0);
 }
 
 int
@@ -2933,29 +3973,19 @@
 
   UtilBufferWriter w(m_buffer);
 
-  w.add(SimpleProperties::StringValue, evnt.m_externalName.c_str());
+  w.add(SimpleProperties::StringValue, evnt.m_name.c_str());
 
   LinearSectionPtr ptr[1];
   ptr[0].p = (Uint32*)m_buffer.get_data();
   ptr[0].sz = (m_buffer.length()+3) >> 2;
 
-  return dropEvent(&tSignal, ptr, 1);
+  return dictSignal(&tSignal,ptr, 1,
+		    0 /*use masternode id*/,
+		    WAIT_CREATE_INDX_REQ,
+		    -1, 100,
+		    0, -1);
 }
 
-int
-NdbDictInterface::dropEvent(NdbApiSignal* signal,
-			    LinearSectionPtr ptr[3], int noLSP)
-{
-  //TODO
-  const int noErrCodes = 1;
-  int errCodes[noErrCodes] = {DropEvntRef::Busy};
-  return dictSignal(signal,ptr,noLSP,
-		    1 /*use masternode id*/,
-		    100,
-		    WAIT_CREATE_INDX_REQ /*WAIT_CREATE_EVNT_REQ*/,
-		    -1,
-		    errCodes,noErrCodes, DropEvntRef::Temporary);
-}
 void
 NdbDictInterface::execDROP_EVNT_CONF(NdbApiSignal * signal,
 				     LinearSectionPtr ptr[3])
@@ -2976,7 +4006,8 @@
 
   DBUG_PRINT("info",("ErrorCode=%u Errorline=%u ErrorNode=%u",
 	     ref->getErrorCode(), ref->getErrorLine(), ref->getErrorNode()));
-
+  if (m_error.code == DropEvntRef::NotMaster)
+    m_masterNodeId = ref->getMasterNode();
   m_waiter.signal(NO_WAIT);
   DBUG_VOID_RETURN;
 }
@@ -3106,26 +4137,28 @@
   for (Uint32 i = 0; i < RETRIES; i++) {
     m_buffer.clear();
     // begin protected
-    m_transporter->lock_mutex();
+    /*
+      The PollGuard has an implicit call of unlock_and_signal through the
+      ~PollGuard method. This method is called implicitly by the compiler
+      in all places where the object is out of context due to a return,
+      break, continue or simply end of statement block
+    */
+    PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
     Uint16 aNodeId = m_transporter->get_an_alive_node();
     if (aNodeId == 0) {
       m_error.code= 4009;
-      m_transporter->unlock_mutex();
       return -1;
     }
     if (m_transporter->sendSignal(signal, aNodeId) != 0) {
-      m_transporter->unlock_mutex();
       continue;
     }
     m_error.code= 0;
-    m_waiter.m_node = aNodeId;
-    m_waiter.m_state = WAIT_LIST_TABLES_CONF;
-    m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
-    m_transporter->unlock_mutex();    
+    int ret_val= poll_guard.wait_n_unlock(DICT_WAITFOR_TIMEOUT,
+                                          aNodeId, WAIT_LIST_TABLES_CONF);
     // end protected
-    if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
+    if (ret_val == 0 && m_error.code == 0)
       return 0;
-    if (m_waiter.m_state == WAIT_NODE_FAILURE)
+    if (ret_val == -2) //WAIT_NODE_FAILURE
       continue;
     return -1;
   }
@@ -3145,6 +4178,794 @@
   }
 }
 
+int
+NdbDictionaryImpl::forceGCPWait()
+{
+  return m_receiver.forceGCPWait();
+}
+
+int
+NdbDictInterface::forceGCPWait()
+{
+  NdbApiSignal tSignal(m_reference);
+  WaitGCPReq* const req = CAST_PTR(WaitGCPReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType = WaitGCPReq::CompleteForceStart;
+  tSignal.theReceiversBlockNumber = DBDIH;
+  tSignal.theVerId_signalNumber = GSN_WAIT_GCP_REQ;
+  tSignal.theLength = WaitGCPReq::SignalLength;
+
+  const Uint32 RETRIES = 100;
+  for (Uint32 i = 0; i < RETRIES; i++)
+  {
+    m_transporter->lock_mutex();
+    Uint16 aNodeId = m_transporter->get_an_alive_node();
+    if (aNodeId == 0) {
+      m_error.code= 4009;
+      m_transporter->unlock_mutex();
+      return -1;
+    }
+    if (m_transporter->sendSignal(&tSignal, aNodeId) != 0) {
+      m_transporter->unlock_mutex();
+      continue;
+    }
+    m_error.code= 0;
+    m_waiter.m_node = aNodeId;
+    m_waiter.m_state = WAIT_LIST_TABLES_CONF;
+    m_waiter.wait(DICT_WAITFOR_TIMEOUT);
+    m_transporter->unlock_mutex();    
+    return 0;
+  }
+  return -1;
+}
+
+void
+NdbDictInterface::execWAIT_GCP_CONF(NdbApiSignal* signal,
+				    LinearSectionPtr ptr[3])
+{
+  const WaitGCPConf * const conf=
+    CAST_CONSTPTR(WaitGCPConf, signal->getDataPtr());
+  g_latest_trans_gci= conf->gcp;
+  m_waiter.signal(NO_WAIT);
+}
+
+void
+NdbDictInterface::execWAIT_GCP_REF(NdbApiSignal* signal,
+				    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);
+}
+
+NdbFilegroupImpl::NdbFilegroupImpl(NdbDictionary::Object::Type t)
+  : NdbDictObjectImpl(t)
+{
+  m_extent_size = 0;
+  m_undo_buffer_size = 0;
+  m_logfile_group_id = ~0;
+  m_logfile_group_version = ~0;
+}
+
+NdbTablespaceImpl::NdbTablespaceImpl() : 
+  NdbDictionary::Tablespace(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::Tablespace), m_facade(this)
+{
+}
+
+NdbTablespaceImpl::NdbTablespaceImpl(NdbDictionary::Tablespace & f) : 
+  NdbDictionary::Tablespace(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::Tablespace), m_facade(&f)
+{
+}
+
+NdbTablespaceImpl::~NdbTablespaceImpl(){
+}
+
+void
+NdbTablespaceImpl::assign(const NdbTablespaceImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_name.assign(org.m_name);
+  m_grow_spec = org.m_grow_spec;
+  m_extent_size = org.m_extent_size;
+  m_undo_free_words = org.m_undo_free_words;
+  m_logfile_group_id = org.m_logfile_group_id;
+  m_logfile_group_version = org.m_logfile_group_version;
+  m_logfile_group_name.assign(org.m_logfile_group_name);
+  m_undo_free_words = org.m_undo_free_words;
+}
+
+NdbLogfileGroupImpl::NdbLogfileGroupImpl() : 
+  NdbDictionary::LogfileGroup(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::LogfileGroup), m_facade(this)
+{
+}
+
+NdbLogfileGroupImpl::NdbLogfileGroupImpl(NdbDictionary::LogfileGroup & f) : 
+  NdbDictionary::LogfileGroup(* this), 
+  NdbFilegroupImpl(NdbDictionary::Object::LogfileGroup), m_facade(&f)
+{
+}
+
+NdbLogfileGroupImpl::~NdbLogfileGroupImpl(){
+}
+
+void
+NdbLogfileGroupImpl::assign(const NdbLogfileGroupImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_name.assign(org.m_name);
+  m_grow_spec = org.m_grow_spec;
+  m_extent_size = org.m_extent_size;
+  m_undo_free_words = org.m_undo_free_words;
+  m_logfile_group_id = org.m_logfile_group_id;
+  m_logfile_group_version = org.m_logfile_group_version;
+  m_logfile_group_name.assign(org.m_logfile_group_name);
+  m_undo_free_words = org.m_undo_free_words;
+}
+
+NdbFileImpl::NdbFileImpl(NdbDictionary::Object::Type t)
+  : NdbDictObjectImpl(t)
+{
+  m_size = 0;
+  m_free = 0;
+  m_filegroup_id = ~0;
+  m_filegroup_version = ~0;
+}
+
+NdbDatafileImpl::NdbDatafileImpl() : 
+  NdbDictionary::Datafile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Datafile), m_facade(this)
+{
+}
+
+NdbDatafileImpl::NdbDatafileImpl(NdbDictionary::Datafile & f) : 
+  NdbDictionary::Datafile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Datafile), m_facade(&f)
+{
+}
+
+NdbDatafileImpl::~NdbDatafileImpl(){
+}
+
+void
+NdbDatafileImpl::assign(const NdbDatafileImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_size = org.m_size;
+  m_free = org.m_free;
+  m_filegroup_id = org.m_filegroup_id;
+  m_filegroup_version = org.m_filegroup_version;
+  m_path.assign(org.m_path);
+  m_filegroup_name.assign(org.m_filegroup_name);
+}
+
+NdbUndofileImpl::NdbUndofileImpl() : 
+  NdbDictionary::Undofile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Undofile), m_facade(this)
+{
+}
+
+NdbUndofileImpl::NdbUndofileImpl(NdbDictionary::Undofile & f) : 
+  NdbDictionary::Undofile(* this), 
+  NdbFileImpl(NdbDictionary::Object::Undofile), m_facade(&f)
+{
+}
+
+NdbUndofileImpl::~NdbUndofileImpl(){
+}
+
+void
+NdbUndofileImpl::assign(const NdbUndofileImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_size = org.m_size;
+  m_free = org.m_free;
+  m_filegroup_id = org.m_filegroup_id;
+  m_filegroup_version = org.m_filegroup_version;
+  m_path.assign(org.m_path);
+  m_filegroup_name.assign(org.m_filegroup_name);
+}
+
+int 
+NdbDictionaryImpl::createDatafile(const NdbDatafileImpl & file, bool force){
+  DBUG_ENTER("NdbDictionaryImpl::createDatafile");
+  NdbFilegroupImpl tmp(NdbDictionary::Object::Tablespace);
+  if(file.m_filegroup_version != ~(Uint32)0){
+    tmp.m_id = file.m_filegroup_id;
+    tmp.m_version = file.m_filegroup_version;
+    DBUG_RETURN(m_receiver.create_file(file, tmp));
+  }
+  
+  
+  if(m_receiver.get_filegroup(tmp, NdbDictionary::Object::Tablespace,
+			      file.m_filegroup_name.c_str()) == 0){
+    DBUG_RETURN(m_receiver.create_file(file, tmp, force));
+  }
+  DBUG_RETURN(-1); 
+}
+
+int
+NdbDictionaryImpl::dropDatafile(const NdbDatafileImpl & file){
+  return m_receiver.drop_file(file);
+}
+
+int
+NdbDictionaryImpl::createUndofile(const NdbUndofileImpl & file, bool force){
+  DBUG_ENTER("NdbDictionaryImpl::createUndofile");
+  NdbFilegroupImpl tmp(NdbDictionary::Object::LogfileGroup);
+  if(file.m_filegroup_version != ~(Uint32)0){
+    tmp.m_id = file.m_filegroup_id;
+    tmp.m_version = file.m_filegroup_version;
+    DBUG_RETURN(m_receiver.create_file(file, tmp));
+  }
+  
+  
+  if(m_receiver.get_filegroup(tmp, NdbDictionary::Object::LogfileGroup,
+			      file.m_filegroup_name.c_str()) == 0){
+    DBUG_RETURN(m_receiver.create_file(file, tmp, force));
+  }
+  DBUG_PRINT("info", ("Failed to find filegroup"));
+  DBUG_RETURN(-1);
+}
+
+int
+NdbDictionaryImpl::dropUndofile(const NdbUndofileImpl & file){
+  return m_receiver.drop_file(file);
+}
+
+int
+NdbDictionaryImpl::createTablespace(const NdbTablespaceImpl & fg){
+  return m_receiver.create_filegroup(fg);
+}
+
+int
+NdbDictionaryImpl::dropTablespace(const NdbTablespaceImpl & fg){
+  return m_receiver.drop_filegroup(fg);
+}
+
+int
+NdbDictionaryImpl::createLogfileGroup(const NdbLogfileGroupImpl & fg){
+ return m_receiver.create_filegroup(fg);
+}
+
+int
+NdbDictionaryImpl::dropLogfileGroup(const NdbLogfileGroupImpl & fg){
+  return m_receiver.drop_filegroup(fg);
+}
+
+int
+NdbDictInterface::create_file(const NdbFileImpl & file,
+			      const NdbFilegroupImpl & group,
+			      bool overwrite){
+  DBUG_ENTER("NdbDictInterface::create_file"); 
+  UtilBufferWriter w(m_buffer);
+  DictFilegroupInfo::File f; f.init();
+  snprintf(f.FileName, sizeof(f.FileName), file.m_path.c_str());
+  f.FileType = file.m_type;
+  f.FilegroupId = group.m_id;
+  f.FilegroupVersion = group.m_version;
+  f.FileSizeHi = (file.m_size >> 32);
+  f.FileSizeLo = (file.m_size & 0xFFFFFFFF);
+  
+  SimpleProperties::UnpackStatus s;
+  s = SimpleProperties::pack(w, 
+			     &f,
+			     DictFilegroupInfo::FileMapping, 
+			     DictFilegroupInfo::FileMappingSize, true);
+  
+  if(s != SimpleProperties::Eof){
+    abort();
+  }
+  
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_CREATE_FILE_REQ;
+  tSignal.theLength = CreateFileReq::SignalLength;
+  
+  CreateFileReq* req = CAST_PTR(CreateFileReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->objType = file.m_type;
+  req->requestInfo = 0;
+  if (overwrite)
+    req->requestInfo |= CreateFileReq::ForceCreateFile;
+  
+  LinearSectionPtr ptr[3];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = m_buffer.length() / 4;
+
+  int err[] = { CreateFileRef::Busy, CreateFileRef::NotMaster, 0};
+  /*
+    Send signal without time-out since creating files can take a very long
+    time if the file is very big.
+  */
+  DBUG_RETURN(dictSignal(&tSignal, ptr, 1,
+	                 0, // master
+		         WAIT_CREATE_INDX_REQ,
+		         -1, 100,
+		         err));
+}
+
+void
+NdbDictInterface::execCREATE_FILE_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execCREATE_FILE_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const CreateFileRef* ref = 
+    CAST_CONSTPTR(CreateFileRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+int
+NdbDictInterface::drop_file(const NdbFileImpl & file){
+  DBUG_ENTER("NdbDictInterface::drop_file");
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_DROP_FILE_REQ;
+  tSignal.theLength = DropFileReq::SignalLength;
+  
+  DropFileReq* req = CAST_PTR(DropFileReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->file_id = file.m_id;
+  req->file_version = file.m_version;
+
+  int err[] = { DropFileRef::Busy, DropFileRef::NotMaster, 0};
+  DBUG_RETURN(dictSignal(&tSignal, 0, 0,
+	                 0, // master
+		         WAIT_CREATE_INDX_REQ,
+		         DICT_WAITFOR_TIMEOUT, 100,
+		         err));
+}
+
+void
+NdbDictInterface::execDROP_FILE_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execDROP_FILE_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const DropFileRef* ref = 
+    CAST_CONSTPTR(DropFileRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+int
+NdbDictInterface::create_filegroup(const NdbFilegroupImpl & group){
+  DBUG_ENTER("NdbDictInterface::create_filegroup");
+  UtilBufferWriter w(m_buffer);
+  DictFilegroupInfo::Filegroup fg; fg.init();
+  snprintf(fg.FilegroupName, sizeof(fg.FilegroupName), group.m_name.c_str());
+  switch(group.m_type){
+  case NdbDictionary::Object::Tablespace:
+  {
+    fg.FilegroupType = DictTabInfo::Tablespace;
+    //fg.TS_DataGrow = group.m_grow_spec;
+    fg.TS_ExtentSize = group.m_extent_size;
+
+    if(group.m_logfile_group_version != ~(Uint32)0)
+    {
+      fg.TS_LogfileGroupId = group.m_logfile_group_id;
+      fg.TS_LogfileGroupVersion = group.m_logfile_group_version;
+    }
+    else 
+    {
+      NdbLogfileGroupImpl tmp;
+      if(get_filegroup(tmp, NdbDictionary::Object::LogfileGroup, 
+		       group.m_logfile_group_name.c_str()) == 0)
+      {
+	fg.TS_LogfileGroupId = tmp.m_id;
+	fg.TS_LogfileGroupVersion = tmp.m_version;
+      }
+      else // error set by get filegroup
+      {
+	DBUG_RETURN(-1);
+      }
+    }
+  }
+  break;
+  case NdbDictionary::Object::LogfileGroup:
+    fg.LF_UndoBufferSize = group.m_undo_buffer_size;
+    fg.FilegroupType = DictTabInfo::LogfileGroup;
+    //fg.LF_UndoGrow = group.m_grow_spec;
+    break;
+  default:
+    abort();
+    DBUG_RETURN(-1);
+  };
+  
+  SimpleProperties::UnpackStatus s;
+  s = SimpleProperties::pack(w, 
+			     &fg,
+			     DictFilegroupInfo::Mapping, 
+			     DictFilegroupInfo::MappingSize, true);
+  
+  if(s != SimpleProperties::Eof){
+    abort();
+  }
+  
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_CREATE_FILEGROUP_REQ;
+  tSignal.theLength = CreateFilegroupReq::SignalLength;
+  
+  CreateFilegroupReq* req = 
+    CAST_PTR(CreateFilegroupReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->objType = fg.FilegroupType;
+  
+  LinearSectionPtr ptr[3];
+  ptr[0].p = (Uint32*)m_buffer.get_data();
+  ptr[0].sz = m_buffer.length() / 4;
+
+  int err[] = { CreateFilegroupRef::Busy, CreateFilegroupRef::NotMaster, 0};
+  DBUG_RETURN(dictSignal(&tSignal, ptr, 1,
+	                 0, // master
+		         WAIT_CREATE_INDX_REQ,
+		         DICT_WAITFOR_TIMEOUT, 100,
+		         err));
+}
+
+void
+NdbDictInterface::execCREATE_FILEGROUP_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execCREATE_FILEGROUP_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const CreateFilegroupRef* ref = 
+    CAST_CONSTPTR(CreateFilegroupRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+int
+NdbDictInterface::drop_filegroup(const NdbFilegroupImpl & group){
+  DBUG_ENTER("NdbDictInterface::drop_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_DROP_FILEGROUP_REQ;
+  tSignal.theLength = DropFilegroupReq::SignalLength;
+  
+  DropFilegroupReq* req = CAST_PTR(DropFilegroupReq, tSignal.getDataPtrSend());
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->filegroup_id = group.m_id;
+  req->filegroup_version = group.m_version;
+  
+  int err[] = { DropFilegroupRef::Busy, DropFilegroupRef::NotMaster, 0};
+  DBUG_RETURN(dictSignal(&tSignal, 0, 0,
+                         0, // master
+		         WAIT_CREATE_INDX_REQ,
+		         DICT_WAITFOR_TIMEOUT, 100,
+		         err));
+}
+
+void
+NdbDictInterface::execDROP_FILEGROUP_CONF(NdbApiSignal * signal,
+					    LinearSectionPtr ptr[3])
+{
+  m_waiter.signal(NO_WAIT);  
+}
+
+void
+NdbDictInterface::execDROP_FILEGROUP_REF(NdbApiSignal * signal,
+					   LinearSectionPtr ptr[3])
+{
+  const DropFilegroupRef* ref = 
+    CAST_CONSTPTR(DropFilegroupRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  m_masterNodeId = ref->masterNodeId;
+  m_waiter.signal(NO_WAIT);  
+}
+
+
+int
+NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
+				NdbDictionary::Object::Type type,
+				const char * name){
+  DBUG_ENTER("NdbDictInterface::get_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  size_t strLen = strlen(name) + 1;
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType = 
+    GetTabInfoReq::RequestByName | GetTabInfoReq::LongSignalConf;
+  req->tableNameLen = strLen;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  LinearSectionPtr ptr[1];
+  ptr[0].p  = (Uint32*)name;
+  ptr[0].sz = (strLen + 3)/4;
+  
+#ifndef IGNORE_VALGRIND_WARNINGS
+  if (strLen & 3)
+  {
+    Uint32 pad = 0;
+    m_buffer.clear();
+    m_buffer.append(name, strLen);
+    m_buffer.append(&pad, 4);
+    ptr[0].p = (Uint32*)m_buffer.get_data();
+  }
+#endif
+  
+  int r = dictSignal(&tSignal, ptr, 1,
+		     -1, // any node
+		     WAIT_GET_TAB_INFO_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100);
+  if (r)
+  {
+    dst.m_id = -1;
+    dst.m_version = ~0;
+    
+    DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFilegroupInfo(dst,
+				    (Uint32*)m_buffer.get_data(),
+				    m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed parseFilegroupInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == NdbDictionary::Object::Tablespace)
+  {
+    NdbDictionary::LogfileGroup tmp;
+    get_filegroup(NdbLogfileGroupImpl::getImpl(tmp),
+		  NdbDictionary::Object::LogfileGroup,
+		  dst.m_logfile_group_id);
+    dst.m_logfile_group_name.assign(tmp.getName());
+  }
+  
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_filegroup failed no such filegroup"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
+NdbDictInterface::parseFilegroupInfo(NdbFilegroupImpl &dst,
+				     const Uint32 * data, Uint32 len)
+  
+{
+  SimplePropertiesLinearReader it(data, len);
+
+  SimpleProperties::UnpackStatus status;
+  DictFilegroupInfo::Filegroup fg; fg.init();
+  status = SimpleProperties::unpack(it, &fg, 
+				    DictFilegroupInfo::Mapping, 
+				    DictFilegroupInfo::MappingSize, 
+				    true, true);
+  
+  if(status != SimpleProperties::Eof){
+    return CreateFilegroupRef::InvalidFormat;
+  }
+
+  dst.m_id = fg.FilegroupId;
+  dst.m_version = fg.FilegroupVersion;
+  dst.m_type = (NdbDictionary::Object::Type)fg.FilegroupType;
+  dst.m_status = NdbDictionary::Object::Retrieved;
+  
+  dst.m_name.assign(fg.FilegroupName);
+  dst.m_extent_size = fg.TS_ExtentSize;
+  dst.m_undo_buffer_size = fg.LF_UndoBufferSize;
+  dst.m_logfile_group_id = fg.TS_LogfileGroupId;
+  dst.m_logfile_group_version = fg.TS_LogfileGroupVersion;
+  dst.m_undo_free_words= ((Uint64)fg.LF_UndoFreeWordsHi << 32)
+    | (fg.LF_UndoFreeWordsLo);
+
+  return 0;
+}
+
+int
+NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
+				NdbDictionary::Object::Type type,
+				Uint32 id){
+  DBUG_ENTER("NdbDictInterface::get_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType =
+    GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+  req->tableId = id;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  int r = dictSignal(&tSignal, NULL, 1,
+		     -1, // any node
+		     WAIT_GET_TAB_INFO_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100);
+  if (r)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFilegroupInfo(dst,
+				    (Uint32*)m_buffer.get_data(),
+				    m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed parseFilegroupInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_filegroup failed no such filegroup"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
+NdbDictInterface::get_file(NdbFileImpl & dst,
+			   NdbDictionary::Object::Type type,
+			   int node,
+			   const char * name){
+  DBUG_ENTER("NdbDictInterface::get_file");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  size_t strLen = strlen(name) + 1;
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType =
+    GetTabInfoReq::RequestByName | GetTabInfoReq::LongSignalConf;
+  req->tableNameLen = strLen;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  LinearSectionPtr ptr[1];
+  ptr[0].p  = (Uint32*)name;
+  ptr[0].sz = (strLen + 3)/4;
+  
+#ifndef IGNORE_VALGRIND_WARNINGS
+  if (strLen & 3)
+  {
+    Uint32 pad = 0;
+    m_buffer.clear();
+    m_buffer.append(name, strLen);
+    m_buffer.append(&pad, 4);
+    ptr[0].p = (Uint32*)m_buffer.get_data();
+  }
+#endif
+  
+  int r = dictSignal(&tSignal, ptr, 1,
+		     node,
+		     WAIT_GET_TAB_INFO_REQ,
+		     DICT_WAITFOR_TIMEOUT, 100);
+  if (r)
+  {
+    DBUG_PRINT("info", ("get_file failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFileInfo(dst,
+			       (Uint32*)m_buffer.get_data(),
+			       m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_file failed parseFileInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == NdbDictionary::Object::Undofile)
+  {
+    NdbDictionary::LogfileGroup tmp;
+    get_filegroup(NdbLogfileGroupImpl::getImpl(tmp),
+		  NdbDictionary::Object::LogfileGroup,
+		  dst.m_filegroup_id);
+    dst.m_filegroup_name.assign(tmp.getName());
+  }
+  else if(dst.m_type == NdbDictionary::Object::Datafile)
+  {
+    NdbDictionary::Tablespace tmp;
+    get_filegroup(NdbTablespaceImpl::getImpl(tmp),
+		  NdbDictionary::Object::Tablespace,
+		  dst.m_filegroup_id);
+    dst.m_filegroup_name.assign(tmp.getName());
+    dst.m_free *= tmp.getExtentSize();
+  }
+  else
+    dst.m_filegroup_name.assign("Not Yet Implemented");
+  
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_file failed no such file"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
+NdbDictInterface::parseFileInfo(NdbFileImpl &dst,
+				const Uint32 * data, Uint32 len)
+{
+  SimplePropertiesLinearReader it(data, len);
+
+  SimpleProperties::UnpackStatus status;
+  DictFilegroupInfo::File f; f.init();
+  status = SimpleProperties::unpack(it, &f,
+				    DictFilegroupInfo::FileMapping,
+				    DictFilegroupInfo::FileMappingSize,
+				    true, true);
+
+  if(status != SimpleProperties::Eof){
+    return CreateFilegroupRef::InvalidFormat;
+  }
+
+  dst.m_type= (NdbDictionary::Object::Type)f.FileType;
+  dst.m_id= f.FileId;
+  dst.m_version = f.FileVersion;
+
+  dst.m_size= ((Uint64)f.FileSizeHi << 32) | (f.FileSizeLo);
+  dst.m_path.assign(f.FileName);
+
+  dst.m_filegroup_id= f.FilegroupId;
+  dst.m_filegroup_version= f.FilegroupVersion;
+  dst.m_free=  f.FileFreeExtents;
+  return 0;
+}
+
 template class Vector<int>;
 template class Vector<Uint16>;
 template class Vector<Uint32>;
@@ -3152,3 +4973,13 @@
 template class Vector<NdbTableImpl*>;
 template class Vector<NdbColumnImpl*>;
 
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_MEMORY = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0;
+const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0;
+const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROWID = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_GCI = 0;

--- 1.30.5.2/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-05-24 10:57:27 +02:00
+++ 1.60/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-05-24 12:56:58 +02:00
@@ -30,15 +30,27 @@
 #include "NdbWaiter.hpp"
 #include "DictCache.hpp"
 
+bool
+is_ndb_blob_table(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
+bool
+is_ndb_blob_table(const class NdbTableImpl* t);
+
+extern int ndb_dictionary_is_mysqld;
+#define ASSERT_NOT_MYSQLD assert(ndb_dictionary_is_mysqld == 0)
+
 class NdbDictObjectImpl {
 public:
+  int m_id;
   Uint32 m_version;
+  NdbDictionary::Object::Type m_type;
   NdbDictionary::Object::Status m_status;
   
   bool change();
 protected:
-  NdbDictObjectImpl() :
+  NdbDictObjectImpl(NdbDictionary::Object::Type type) :
+    m_type(type),
     m_status(NdbDictionary::Object::New) {
+    m_id = -1;
   }
 };
 
@@ -59,10 +71,15 @@
   int m_precision;
   int m_scale;
   int m_length;
+  int m_column_no;
   CHARSET_INFO * m_cs;          // not const in MySQL
   
   bool m_pk;
-  bool m_distributionKey;
+  /*
+   * Since "none" is "all" we distinguish between
+   * 1-set by us, 2-set by user
+   */
+  Uint32 m_distributionKey;
   bool m_nullable;
   bool m_autoIncrement;
   Uint64 m_autoIncrementInitialValue;
@@ -73,7 +90,13 @@
    * Internal types and sizes, and aggregates
    */
   Uint32 m_attrSize;            // element size (size when arraySize==1)
-  Uint32 m_arraySize;           // length or length+2 for Var* types
+  Uint32 m_arraySize;           // length or maxlength+1/2 for Var* types
+  Uint32 m_arrayType;           // NDB_ARRAYTYPE_FIXED or _VAR
+  Uint32 m_storageType;         // NDB_STORAGETYPE_MEMORY or _DISK
+  /*
+   * NdbTableImpl: if m_pk, 0-based index of key in m_attrId order
+   * NdbIndexImpl: m_column_no of primary table column
+   */
   Uint32 m_keyInfoPos;
   // TODO: use bits in attr desc 2
   bool getInterpretableType() const ;
@@ -90,10 +113,9 @@
   static const NdbColumnImpl & getImpl(const NdbDictionary::Column & t);
   NdbDictionary::Column * m_facade;
 
-  static NdbDictionary::Column * create_psuedo(const char *);
+  static NdbDictionary::Column * create_pseudo(const char *);
 
   // Get total length in bytes, used by NdbOperation
-  // backported from 5.1
   bool get_var_length(const void* value, Uint32& len) const;
 };
 
@@ -106,13 +128,43 @@
   void init();
   void setName(const char * name);
   const char * getName() const;
+  void setFragmentCount(Uint32 count);
+  Uint32 getFragmentCount() const;
+  void setFrm(const void* data, Uint32 len);
+  const void * getFrmData() const;
+  Uint32 getFrmLength() const;
+  void setFragmentData(const void* data, Uint32 len);
+  const void * getFragmentData() const;
+  Uint32 getFragmentDataLen() const;
+  void setTablespaceNames(const void* data, Uint32 len);
+  Uint32 getTablespaceNamesLen() const;
+  const void * getTablespaceNames() const;
+  void setTablespaceData(const void* data, Uint32 len);
+  const void * getTablespaceData() const;
+  Uint32 getTablespaceDataLen() const;
+  void setRangeListData(const void* data, Uint32 len);
+  const void * getRangeListData() const;
+  Uint32 getRangeListDataLen() const;
+
+  const char * getMysqlName() const;
+  void updateMysqlName();
 
   Uint32 m_changeMask;
-  Uint32 m_tableId;
+  Uint32 m_primaryTableId;
   BaseString m_internalName;
   BaseString m_externalName;
+  BaseString m_mysqlName;
   BaseString m_newExternalName; // Used for alter table
   UtilBuffer m_frm; 
+  UtilBuffer m_newFrm;       // Used for alter table
+  UtilBuffer m_ts_name;      //Tablespace Names
+  UtilBuffer m_new_ts_name;  //Tablespace Names
+  UtilBuffer m_ts;           //TablespaceData
+  UtilBuffer m_new_ts;       //TablespaceData
+  UtilBuffer m_fd;           //FragmentData
+  UtilBuffer m_new_fd;       //FragmentData
+  UtilBuffer m_range;        //Range Or List Array
+  UtilBuffer m_new_range;    //Range Or List Array
   NdbDictionary::Object::FragmentType m_fragmentType;
 
   /**
@@ -121,6 +173,7 @@
   Uint32 m_columnHashMask;
   Vector<Uint32> m_columnHash;
   Vector<NdbColumnImpl *> m_columns;
+  void computeAggregates();
   void buildColumnHash(); 
 
   /**
@@ -130,14 +183,18 @@
   Uint32 m_hashpointerValue;
   Vector<Uint16> m_fragments;
 
+  Uint64 m_max_rows;
+  Uint32 m_default_no_part_flag;
+  bool m_linear_flag;
   bool m_logging;
+  bool m_row_gci;
+  bool m_row_checksum;
   int m_kvalue;
   int m_minLoadFactor;
   int m_maxLoadFactor;
   Uint16 m_keyLenInWords;
   Uint16 m_fragmentCount;
 
-  NdbDictionaryImpl * m_dictionary;
   NdbIndexImpl * m_index;
   NdbColumnImpl * getColumn(unsigned attrId);
   NdbColumnImpl * getColumn(const char * name);
@@ -148,12 +205,13 @@
    * Index only stuff
    */
   BaseString m_primaryTable;
-  NdbDictionary::Index::Type m_indexType;
+  NdbDictionary::Object::Type m_indexType;
 
   /**
    * Aggregates
    */
   Uint8 m_noOfKeys;
+  // if all pk = dk then this is zero!
   Uint8 m_noOfDistributionKeys;
   Uint8 m_noOfBlobs;
   
@@ -173,6 +231,13 @@
    * Return count
    */
   Uint32 get_nodes(Uint32 hashValue, const Uint16** nodes) const ;
+
+  /**
+   * Disk stuff
+   */
+  BaseString m_tablespace_name;
+  Uint32 m_tablespace_id;
+  Uint32 m_tablespace_version;
 };
 
 class NdbIndexImpl : public NdbDictionary::Index, public NdbDictObjectImpl {
@@ -188,13 +253,13 @@
   const char * getTable() const;
   const NdbTableImpl * getIndexTable() const;
 
-  Uint32 m_indexId;
   BaseString m_internalName;
   BaseString m_externalName;
   BaseString m_tableName;
+  Uint32 m_table_id;
+  Uint32 m_table_version;
   Vector<NdbColumnImpl *> m_columns;
   Vector<int> m_key_ids;
-  NdbDictionary::Index::Type m_type;
 
   bool m_logging;
   
@@ -206,6 +271,13 @@
 };
 
 class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl {
+  friend class NdbDictInterface;
+  friend class NdbDictionaryImpl;
+  friend class NdbEventOperation;
+  friend class NdbEventOperationImpl;
+  friend class NdbEventBuffer;
+  friend class EventBufData_hash;
+  friend class NdbBlob;
 public:
   NdbEventImpl();
   NdbEventImpl(NdbDictionary::Event &);
@@ -215,13 +287,17 @@
   void setName(const char * name);
   const char * getName() const;
   void setTable(const NdbDictionary::Table& table);
+  const NdbDictionary::Table * getTable() const;
   void setTable(const char * table);
   const char * getTableName() const;
   void addTableEvent(const NdbDictionary::Event::TableEvent t);
+  bool getTableEvent(const NdbDictionary::Event::TableEvent t) const;
   void setDurability(NdbDictionary::Event::EventDurability d);
   NdbDictionary::Event::EventDurability  getDurability() const;
-  void addEventColumn(const NdbColumnImpl &c);
+  void setReport(NdbDictionary::Event::EventReport r);
+  NdbDictionary::Event::EventReport  getReport() const;
   int getNoOfEventColumns() const;
+  const NdbDictionary::Column * getEventColumn(unsigned no) const;
 
   void print() {
     ndbout_c("NdbEventImpl: id=%d, key=%d",
@@ -231,28 +307,109 @@
 
   Uint32 m_eventId;
   Uint32 m_eventKey;
-  Uint32 m_tableId;
   AttributeMask m_attrListBitmask;
-  //BaseString m_internalName;
-  BaseString m_externalName;
+  Uint32 m_table_id;
+  Uint32 m_table_version;
+  BaseString m_name;
   Uint32 mi_type;
   NdbDictionary::Event::EventDurability m_dur;
+  NdbDictionary::Event::EventReport m_rep;
+  bool m_mergeEvents;
 
-
-  NdbTableImpl *m_tableImpl;
   BaseString m_tableName;
   Vector<NdbColumnImpl *> m_columns;
   Vector<unsigned> m_attrIds;
 
-  int m_bufferId;
-
-  NdbEventOperation *eventOp;
-
   static NdbEventImpl & getImpl(NdbDictionary::Event & t);
   static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
   NdbDictionary::Event * m_facade;
+private:
+  NdbTableImpl *m_tableImpl;
+  void setTable(NdbTableImpl *tableImpl);
 };
 
+struct NdbFilegroupImpl : public NdbDictObjectImpl {
+  NdbFilegroupImpl(NdbDictionary::Object::Type t);
+
+  BaseString m_name;
+  NdbDictionary::AutoGrowSpecification m_grow_spec;
+
+  union {
+    Uint32 m_extent_size;
+    Uint32 m_undo_buffer_size;
+  };
+
+  BaseString m_logfile_group_name;
+  Uint32 m_logfile_group_id;
+  Uint32 m_logfile_group_version;
+  Uint64 m_undo_free_words;
+};
+
+class NdbTablespaceImpl : public NdbDictionary::Tablespace, 
+			  public NdbFilegroupImpl {
+public:
+  NdbTablespaceImpl();
+  NdbTablespaceImpl(NdbDictionary::Tablespace &);
+  ~NdbTablespaceImpl();
+
+  void assign(const NdbTablespaceImpl&);
+
+  static NdbTablespaceImpl & getImpl(NdbDictionary::Tablespace & t);
+  static const NdbTablespaceImpl & getImpl(const NdbDictionary::Tablespace &);
+  NdbDictionary::Tablespace * m_facade;
+};
+
+class NdbLogfileGroupImpl : public NdbDictionary::LogfileGroup, 
+			    public NdbFilegroupImpl {
+public:
+  NdbLogfileGroupImpl();
+  NdbLogfileGroupImpl(NdbDictionary::LogfileGroup &);
+  ~NdbLogfileGroupImpl();
+
+  void assign(const NdbLogfileGroupImpl&);
+
+  static NdbLogfileGroupImpl & getImpl(NdbDictionary::LogfileGroup & t);
+  static const NdbLogfileGroupImpl& getImpl(const 
+					    NdbDictionary::LogfileGroup&);
+  NdbDictionary::LogfileGroup * m_facade;
+};
+
+struct NdbFileImpl : public NdbDictObjectImpl {
+  NdbFileImpl(NdbDictionary::Object::Type t);
+
+  Uint64 m_size;
+  Uint32 m_free;
+  BaseString m_path;
+  BaseString m_filegroup_name;
+  Uint32 m_filegroup_id;
+  Uint32 m_filegroup_version;
+};
+
+class NdbDatafileImpl : public NdbDictionary::Datafile, public NdbFileImpl {
+public:
+  NdbDatafileImpl();
+  NdbDatafileImpl(NdbDictionary::Datafile &);
+  ~NdbDatafileImpl();
+
+  void assign(const NdbDatafileImpl&);
+
+  static NdbDatafileImpl & getImpl(NdbDictionary::Datafile & t);
+  static const NdbDatafileImpl & getImpl(const NdbDictionary::Datafile & t);
+  NdbDictionary::Datafile * m_facade;
+};
+
+class NdbUndofileImpl : public NdbDictionary::Undofile, public NdbFileImpl {
+public:
+  NdbUndofileImpl();
+  NdbUndofileImpl(NdbDictionary::Undofile &);
+  ~NdbUndofileImpl();
+
+  void assign(const NdbUndofileImpl&);
+
+  static NdbUndofileImpl & getImpl(NdbDictionary::Undofile & t);
+  static const NdbUndofileImpl & getImpl(const NdbDictionary::Undofile & t);
+  NdbDictionary::Undofile * m_facade;
+};
 
 class NdbDictInterface {
 public:
@@ -267,65 +424,65 @@
   bool setTransporter(class TransporterFacade * tf);
   
   // To abstract the stuff thats made in all create/drop/lists below
-  int
-  dictSignal(NdbApiSignal* signal, 
-	     LinearSectionPtr ptr[3], int noLPTR,
-	     const int useMasterNodeId,
-	     const Uint32 RETRIES,
-	     const WaitSignalType wst,
-	     const int theWait,
-	     const int *errcodes,
-	     const int noerrcodes,
-	     const int temporaryMask = 0);
+  int dictSignal(NdbApiSignal* signal, LinearSectionPtr ptr[3], int secs,
+		 int nodeId, // -1 any, 0 = master, >1 = specified
+		 WaitSignalType wst,
+		 int timeout, Uint32 RETRIES,
+		 const int *errcodes = 0, int temporaryMask = 0);
 
   int createOrAlterTable(class Ndb & ndb, NdbTableImpl &, bool alter);
 
   int createTable(class Ndb & ndb, NdbTableImpl &);
-  int createTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-
   int alterTable(class Ndb & ndb, NdbTableImpl &);
-  int alterTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-
-  int createIndex(class Ndb & ndb,
-		  NdbIndexImpl &, 
-		  const NdbTableImpl &);
-  int createIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-  
-  int createEvent(class Ndb & ndb, NdbEventImpl &, int getFlag);
-  int createEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP);
-  
   int dropTable(const NdbTableImpl &);
-  int dropTable(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
 
+  int createIndex(class Ndb & ndb, const NdbIndexImpl &, const NdbTableImpl &);
   int dropIndex(const NdbIndexImpl &, const NdbTableImpl &);
-  int dropIndex(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-
+  
+  int createEvent(class Ndb & ndb, NdbEventImpl &, int getFlag);
   int dropEvent(const NdbEventImpl &);
   int dropEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3], int noLSP);
 
-  int executeSubscribeEvent(class Ndb & ndb, NdbEventImpl &);
-  int executeSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
-  
-  int stopSubscribeEvent(class Ndb & ndb, NdbEventImpl &);
-  int stopSubscribeEvent(NdbApiSignal* signal, LinearSectionPtr ptr[3]);
+  int executeSubscribeEvent(class Ndb & ndb, NdbEventOperationImpl &);
+  int stopSubscribeEvent(class Ndb & ndb, NdbEventOperationImpl &);
   
   int listObjects(NdbDictionary::Dictionary::List& list, Uint32 requestData, bool fullyQualifiedNames);
   int listObjects(NdbApiSignal* signal);
   
-/*  NdbTableImpl * getTable(int tableId, bool fullyQualifiedNames); */
+  NdbTableImpl * getTable(int tableId, bool fullyQualifiedNames);
   NdbTableImpl * getTable(const BaseString& name, bool fullyQualifiedNames);
   NdbTableImpl * getTable(class NdbApiSignal * signal, 
 			  LinearSectionPtr ptr[3],
 			  Uint32 noOfSections, bool fullyQualifiedNames);
 
+  int forceGCPWait();
+
   static int parseTableInfo(NdbTableImpl ** dst, 
 			    const Uint32 * data, Uint32 len,
-			    bool fullyQualifiedNames);
+			    bool fullyQualifiedNames,
+                            Uint32 version= 0xFFFFFFFF);
+
+  static int parseFileInfo(NdbFileImpl &dst,
+			   const Uint32 * data, Uint32 len);
+
+  static int parseFilegroupInfo(NdbFilegroupImpl &dst,
+				const Uint32 * data, Uint32 len);
+  
+  int create_file(const NdbFileImpl &, const NdbFilegroupImpl&, bool overwrite = false);
+  int drop_file(const NdbFileImpl &);
+  int create_filegroup(const NdbFilegroupImpl &);
+  int drop_filegroup(const NdbFilegroupImpl &);
+  
+  int get_filegroup(NdbFilegroupImpl&, NdbDictionary::Object::Type, Uint32);
+  int get_filegroup(NdbFilegroupImpl&,NdbDictionary::Object::Type,const char*);
+  int get_file(NdbFileImpl&, NdbDictionary::Object::Type, int, int);
+  int get_file(NdbFileImpl&, NdbDictionary::Object::Type, int, const char *);
   
   static int create_index_obj_from_table(NdbIndexImpl ** dst, 
 					 NdbTableImpl* index_table,
 					 const NdbTableImpl* primary_table);
   
+  const NdbError &getNdbError() const;  
   NdbError & m_error;
 private:
   Uint32 m_reference;
@@ -335,6 +492,7 @@
   class TransporterFacade * m_transporter;
   
   friend class Ndb;
+  friend class NdbDictionaryImpl;
   static void execSignal(void* dictImpl, 
 			 class NdbApiSignal* signal, 
 			 struct LinearSectionPtr ptr[3]);
@@ -358,8 +516,6 @@
   void execCREATE_EVNT_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execSUB_START_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execSUB_START_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
-  void execSUB_TABLE_DATA(NdbApiSignal *, LinearSectionPtr ptr[3]);
-  void execSUB_GCP_COMPLETE_REP(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execSUB_STOP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execSUB_STOP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execDROP_EVNT_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
@@ -369,10 +525,40 @@
   void execDROP_TABLE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
   void execLIST_TABLES_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
 
+  void execCREATE_FILE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execCREATE_FILE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  
+  void execCREATE_FILEGROUP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execCREATE_FILEGROUP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+
+  void execDROP_FILE_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execDROP_FILE_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  
+  void execDROP_FILEGROUP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execDROP_FILEGROUP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  
+  void execWAIT_GCP_CONF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+  void execWAIT_GCP_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
+
   Uint32 m_fragmentId;
   UtilBuffer m_buffer;
 };
 
+class NdbDictionaryImpl;
+class GlobalCacheInitObject
+{
+public:
+  NdbDictionaryImpl *m_dict;
+  const BaseString &m_name;
+  GlobalCacheInitObject(NdbDictionaryImpl *dict,
+                        const BaseString &name) :
+    m_dict(dict),
+    m_name(name)
+  {}
+  virtual ~GlobalCacheInitObject() {}
+  virtual int init(NdbTableImpl &tab) const = 0;
+};
+
 class NdbDictionaryImpl : public NdbDictionary::Dictionary {
 public:
   NdbDictionaryImpl(Ndb &ndb);
@@ -383,8 +569,7 @@
   bool setTransporter(class TransporterFacade * tf);
 
   int createTable(NdbTableImpl &t);
-  int createBlobTables(NdbTableImpl &);
-  int addBlobTables(NdbTableImpl &);
+  int createBlobTables(NdbTableImpl& t);
   int alterTable(NdbTableImpl &t);
   int dropTable(const char * name);
   int dropTable(NdbTableImpl &);
@@ -393,32 +578,61 @@
   int removeCachedObject(NdbTableImpl &);
 
   int createIndex(NdbIndexImpl &ix);
+  int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab);
   int dropIndex(const char * indexName, 
 		const char * tableName);
-  //  int dropIndex(NdbIndexImpl &, const char * tableName);
-  int dropIndex(NdbIndexImpl &);
+  int dropIndex(NdbIndexImpl &, const char * tableName);
   NdbTableImpl * getIndexTable(NdbIndexImpl * index, 
 			       NdbTableImpl * table);
 
   int createEvent(NdbEventImpl &);
+  int createBlobEvents(NdbEventImpl &);
   int dropEvent(const char * eventName);
+  int dropEvent(const NdbEventImpl &);
+  int dropBlobEvents(const NdbEventImpl &);
 
-  int executeSubscribeEvent(NdbEventImpl &);
-  int stopSubscribeEvent(NdbEventImpl &);
+  int executeSubscribeEvent(NdbEventOperationImpl &);
+  int stopSubscribeEvent(NdbEventOperationImpl &);
+
+  int forceGCPWait();
 
   int listObjects(List& list, NdbDictionary::Object::Type type);
   int listIndexes(List& list, Uint32 indexId);
 
+  NdbTableImpl * getTableGlobal(const char * tableName);
+  NdbIndexImpl * getIndexGlobal(const char * indexName,
+                                NdbTableImpl &ndbtab);
+  int alterTableGlobal(NdbTableImpl &orig_impl, NdbTableImpl &impl);
+  int dropTableGlobal(NdbTableImpl &);
+  int dropIndexGlobal(NdbIndexImpl & impl);
+  int releaseTableGlobal(NdbTableImpl & impl, int invalidate);
+  int releaseIndexGlobal(NdbIndexImpl & impl, int invalidate);
+
   NdbTableImpl * getTable(const char * tableName, void **data= 0);
-  Ndb_local_table_info* get_local_table_info(
-    const BaseString& internalTableName, bool do_add_blob_tables);
+  NdbTableImpl * getBlobTable(const NdbTableImpl&, uint col_no);
+  NdbTableImpl * getBlobTable(uint tab_id, uint col_no);
+  void putTable(NdbTableImpl *impl);
+  int getBlobTables(NdbTableImpl &);
+  Ndb_local_table_info*
+    get_local_table_info(const BaseString& internalTableName);
   NdbIndexImpl * getIndex(const char * indexName,
 			  const char * tableName);
-  NdbIndexImpl * getIndex(const char * indexName,
-			  NdbTableImpl * table);
-  NdbEventImpl * getEvent(const char * eventName);
+  NdbIndexImpl * getIndex(const char * indexName, const NdbTableImpl& prim);
+  NdbEventImpl * getEvent(const char * eventName, NdbTableImpl* = NULL);
+  NdbEventImpl * getBlobEvent(const NdbEventImpl& ev, uint col_no);
   NdbEventImpl * getEventImpl(const char * internalName);
-  
+
+  int createDatafile(const NdbDatafileImpl &, bool force = false);
+  int dropDatafile(const NdbDatafileImpl &);
+  int createUndofile(const NdbUndofileImpl &, bool force = false);
+  int dropUndofile(const NdbUndofileImpl &);
+
+  int createTablespace(const NdbTablespaceImpl &);
+  int dropTablespace(const NdbTablespaceImpl &);
+
+  int createLogfileGroup(const NdbLogfileGroupImpl &);
+  int dropLogfileGroup(const NdbLogfileGroupImpl &);
+
   const NdbError & getNdbError() const;
   NdbError m_error;
   Uint32 m_local_table_data_size;
@@ -432,10 +646,15 @@
 
   NdbDictInterface m_receiver;
   Ndb & m_ndb;
-private:
+
+  NdbIndexImpl* getIndexImpl(const char * externalName,
+                             const BaseString& internalName,
+                             NdbTableImpl &tab,
+                             NdbTableImpl &prim);
   NdbIndexImpl * getIndexImpl(const char * name,
                               const BaseString& internalName);
-  Ndb_local_table_info * fetchGlobalTableImpl(const BaseString& internalName);
+private:
+  NdbTableImpl * fetchGlobalTableImplRef(const GlobalCacheInitObject &obj);
 };
 
 inline
@@ -501,13 +720,11 @@
 NdbColumnImpl::get_var_length(const void* value, Uint32& len) const
 {
   Uint32 max_len = m_attrSize * m_arraySize;
-  switch (m_type) {
-  case NdbDictionary::Column::Varchar:
-  case NdbDictionary::Column::Varbinary:
+  switch (m_arrayType) {
+  case NDB_ARRAYTYPE_SHORT_VAR:
     len = 1 + *((Uint8*)value);
     break;
-  case NdbDictionary::Column::Longvarchar:
-  case NdbDictionary::Column::Longvarbinary:
+  case NDB_ARRAYTYPE_MEDIUM_VAR:
     len = 2 + uint2korr((char*)value);
     break;
   default:
@@ -539,6 +756,13 @@
 }
 
 inline
+const char *
+NdbTableImpl::getMysqlName() const
+{
+  return m_mysqlName.c_str();
+}
+
+inline
 Uint32
 Hash( const char* str ){
   Uint32 h = 0;
@@ -663,81 +887,270 @@
  * Inline:d getters
  */
 
+class InitTable : public GlobalCacheInitObject
+{
+public:
+  InitTable(NdbDictionaryImpl *dict,
+            const BaseString &name) :
+    GlobalCacheInitObject(dict, name)
+  {}
+  int init(NdbTableImpl &tab) const
+  {
+    return m_dict->getBlobTables(tab);
+  }
+};
+
+inline
+NdbTableImpl *
+NdbDictionaryImpl::getTableGlobal(const char * table_name)
+{
+  const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
+  return fetchGlobalTableImplRef(InitTable(this, internal_tabname));
+}
+
 inline
 NdbTableImpl *
 NdbDictionaryImpl::getTable(const char * table_name, void **data)
 {
+  DBUG_ENTER("NdbDictionaryImpl::getTable");
+  DBUG_PRINT("enter", ("table: %s", table_name));
+
+  if (unlikely(strchr(table_name, '$') != 0)) {
+    Uint32 tab_id, col_no;
+    if (is_ndb_blob_table(table_name, &tab_id, &col_no)) {
+      NdbTableImpl* t = getBlobTable(tab_id, col_no);
+      DBUG_RETURN(t);
+    }
+  }
+
   const BaseString internal_tabname(m_ndb.internalize_table_name(table_name));
   Ndb_local_table_info *info=
-    get_local_table_info(internal_tabname, true);
+    get_local_table_info(internal_tabname);
   if (info == 0)
-    return 0;
-
+    DBUG_RETURN(0);
   if (data)
     *data= info->m_local_data;
-
-  return info->m_table_impl;
+  DBUG_RETURN(info->m_table_impl);
 }
 
 inline
 Ndb_local_table_info * 
-NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName,
-					bool do_add_blob_tables)
+NdbDictionaryImpl::get_local_table_info(const BaseString& internalTableName)
 {
+  DBUG_ENTER("NdbDictionaryImpl::get_local_table_info");
+  DBUG_PRINT("enter", ("table: %s", internalTableName.c_str()));
+
   Ndb_local_table_info *info= m_localHash.get(internalTableName.c_str());
-  if (info == 0) {
-    info= fetchGlobalTableImpl(internalTableName);
-    if (info == 0) {
-      return 0;
+  if (info == 0)
+  {
+    NdbTableImpl *tab=
+      fetchGlobalTableImplRef(InitTable(this, internalTableName));
+    if (tab)
+    {
+      info= Ndb_local_table_info::create(tab, m_local_table_data_size);
+      if (info)
+      {
+        m_localHash.put(internalTableName.c_str(), info);
+        m_ndb.theFirstTupleId[tab->getTableId()] = ~0;
+        m_ndb.theLastTupleId[tab->getTableId()]  = ~0;
+      }
     }
   }
-  if (do_add_blob_tables && info->m_table_impl->m_noOfBlobs)
-    addBlobTables(*(info->m_table_impl));
-  
-  return info; // autoincrement already initialized
+  DBUG_RETURN(info); // autoincrement already initialized
 }
 
+class InitIndex : public GlobalCacheInitObject
+{
+public:
+  const char *m_index_name;
+  const NdbTableImpl &m_prim;
+
+  InitIndex(const BaseString &internal_indexname,
+	    const char *index_name,
+	    const NdbTableImpl &prim) :
+    GlobalCacheInitObject(0, internal_indexname),
+    m_index_name(index_name),
+    m_prim(prim)
+    {}
+  
+  int init(NdbTableImpl &tab) const {
+    DBUG_ENTER("InitIndex::init");
+    DBUG_ASSERT(tab.m_indexType != NdbDictionary::Object::TypeUndefined);
+    /**
+     * Create index impl
+     */
+    NdbIndexImpl* idx;
+    if(NdbDictInterface::create_index_obj_from_table(&idx, &tab, &m_prim) == 0)
+    {
+      idx->m_table = &tab;
+      idx->m_externalName.assign(m_index_name);
+      idx->m_internalName.assign(m_name);
+      tab.m_index = idx;
+      DBUG_RETURN(0);
+    }
+    DBUG_RETURN(1);
+  }
+};
 
 inline
 NdbIndexImpl * 
-NdbDictionaryImpl::getIndex(const char * index_name,
-			    const char * table_name)
+NdbDictionaryImpl::getIndexGlobal(const char * index_name,
+                                  NdbTableImpl &ndbtab)
+{
+  DBUG_ENTER("NdbDictionaryImpl::getIndexGlobal");
+  const BaseString
+    internal_indexname(m_ndb.internalize_index_name(&ndbtab, index_name));
+  int retry= 2;
+
+  while (retry)
+  {
+    NdbTableImpl *tab=
+      fetchGlobalTableImplRef(InitIndex(internal_indexname,
+					index_name, ndbtab));
+    if (tab)
+    {
+      // tab->m_index sould be set. otherwise tab == 0
+      NdbIndexImpl *idx= tab->m_index;
+      if (idx->m_table_id != (unsigned)ndbtab.getObjectId() ||
+          idx->m_table_version != (unsigned)ndbtab.getObjectVersion())
+      {
+        releaseIndexGlobal(*idx, 1);
+        retry--;
+        continue;
+      }
+      DBUG_RETURN(idx);
+    }
+    break;
+  }
+  m_error.code= 4243;
+  DBUG_RETURN(0);
+}
+
+inline int
+NdbDictionaryImpl::releaseTableGlobal(NdbTableImpl & impl, int invalidate)
+{
+  DBUG_ENTER("NdbDictionaryImpl::releaseTableGlobal");
+  DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
+  m_globalHash->lock();
+  m_globalHash->release(&impl, invalidate);
+  m_globalHash->unlock();
+  DBUG_RETURN(0);
+}
+
+inline int
+NdbDictionaryImpl::releaseIndexGlobal(NdbIndexImpl & impl, int invalidate)
 {
-  return getIndex(index_name, (table_name) ? getTable(table_name) : NULL);
+  DBUG_ENTER("NdbDictionaryImpl::releaseIndexGlobal");
+  DBUG_PRINT("enter", ("internal_name: %s", impl.m_internalName.c_str()));
+  m_globalHash->lock();
+  m_globalHash->release(impl.m_table, invalidate);
+  m_globalHash->unlock();
+  DBUG_RETURN(0);
 }
 
 inline
 NdbIndexImpl * 
 NdbDictionaryImpl::getIndex(const char * index_name,
-			    NdbTableImpl * table)
+			    const char * table_name)
 {
-  if (table || m_ndb.usingFullyQualifiedNames())
+  if (table_name == 0)
   {
-    const BaseString internal_indexname(
-      (table)
-      ?
-      m_ndb.internalize_index_name(table, index_name)
-      :
-      m_ndb.internalize_table_name(index_name)); // Index is also a table
-
-    if (internal_indexname.length())
-    {
-      Ndb_local_table_info * info=
-        get_local_table_info(internal_indexname, false);
-      if (info)
-      {
-	NdbTableImpl * tab= info->m_table_impl;
-        if (tab->m_index == 0)
-          tab->m_index= getIndexImpl(index_name, internal_indexname);
-        if (tab->m_index != 0)
-          tab->m_index->m_table= tab;
-        return tab->m_index;
-      }
-    }
+    assert(0);
+    m_error.code= 4243;
+    return 0;
+  }
+  
+  
+  NdbTableImpl* prim = getTable(table_name);
+  if (prim == 0)
+  {
+    m_error.code= 4243;
+    return 0;
   }
 
+  return getIndex(index_name, *prim);
+}
+
+inline
+NdbIndexImpl * 
+NdbDictionaryImpl::getIndex(const char* index_name,
+			    const NdbTableImpl& prim)
+{
+
+  const BaseString
+    internal_indexname(m_ndb.internalize_index_name(&prim, index_name));
+
+  Ndb_local_table_info *info= m_localHash.get(internal_indexname.c_str());
+  NdbTableImpl *tab;
+  if (info == 0)
+  {
+    tab= fetchGlobalTableImplRef(InitIndex(internal_indexname,
+					   index_name,
+					   prim));
+    if (!tab)
+      goto err;
+
+    info= Ndb_local_table_info::create(tab, 0);
+    if (!info)
+      goto err;
+    m_localHash.put(internal_indexname.c_str(), info);
+  }
+  else
+    tab= info->m_table_impl;
+  
+  return tab->m_index;
+  
+err:
   m_error.code= 4243;
   return 0;
+}
+
+inline
+NdbTablespaceImpl &
+NdbTablespaceImpl::getImpl(NdbDictionary::Tablespace & t){
+  return t.m_impl;
+}
+
+inline
+const NdbTablespaceImpl &
+NdbTablespaceImpl::getImpl(const NdbDictionary::Tablespace & t){
+  return t.m_impl;
+}
+
+inline
+NdbLogfileGroupImpl &
+NdbLogfileGroupImpl::getImpl(NdbDictionary::LogfileGroup & t){
+  return t.m_impl;
+}
+
+inline
+const NdbLogfileGroupImpl &
+NdbLogfileGroupImpl::getImpl(const NdbDictionary::LogfileGroup & t){
+  return t.m_impl;
+}
+
+inline
+NdbDatafileImpl &
+NdbDatafileImpl::getImpl(NdbDictionary::Datafile & t){
+  return t.m_impl;
+}
+
+inline
+const NdbDatafileImpl &
+NdbDatafileImpl::getImpl(const NdbDictionary::Datafile & t){
+  return t.m_impl;
+}
+
+inline
+NdbUndofileImpl &
+NdbUndofileImpl::getImpl(NdbDictionary::Undofile & t){
+  return t.m_impl;
+}
+
+inline
+const NdbUndofileImpl &
+NdbUndofileImpl::getImpl(const NdbDictionary::Undofile & t){
+  return t.m_impl;
 }
 
 #endif
Thread
bk commit into 5.1 tree (mskold:1.2181)Martin Skold24 May