List: Commits
From: tomas
Date: January 11 2006 11:51am
Subject: bk commit into 5.1 tree (tomas:1.2033)
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push, these changes will be
propagated to the main repository and, within 24 hours after the push,
to the public repository.
For information on how to access the public repository, see
http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2033 06/01/11 12:51:36 tomas@stripped +28 -0
  Merge tulin@stripped:/home/bk/mysql-5.1-new
  into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-wl2325-v6

  storage/ndb/tools/restore/restore_main.cpp
    1.37 06/01/11 12:51:28 tomas@stripped +0 -1
    manual merge

  storage/ndb/tools/restore/consumer_restore.hpp
    1.12 06/01/11 12:51:28 tomas@stripped +0 -0
    manual merge

  storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
    1.14 06/01/11 12:51:28 tomas@stripped +0 -3
    manual merge

  sql/ha_ndbcluster.cc
    1.238 06/01/11 12:51:28 tomas@stripped +1 -4
    manual merge

  libmysqld/Makefile.am
    1.78 06/01/11 12:51:28 tomas@stripped +1 -2
    manual merge

  storage/ndb/tools/restore/consumer_restore.cpp
    1.23 06/01/11 12:45:13 tomas@stripped +0 -0
    Auto merged

  storage/ndb/tools/restore/consumer.hpp
    1.9 06/01/11 12:45:13 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/ndberror.c
    1.47 06/01/11 12:45:13 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
    1.6 06/01/11 12:45:13 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
    1.37 06/01/11 12:45:13 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
    1.39 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
    1.96 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/NdbDictionary.cpp
    1.43 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/suma/Suma.cpp
    1.31 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
    1.18 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
    1.101 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
    1.47 06/01/11 12:45:12 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
    1.13 06/01/11 12:45:11 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
    1.24 06/01/11 12:45:11 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
    1.65 06/01/11 12:45:11 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/backup/Backup.cpp
    1.32 06/01/11 12:45:11 tomas@stripped +0 -0
    Auto merged

  storage/ndb/include/ndbapi/NdbDictionary.hpp
    1.59 06/01/11 12:45:11 tomas@stripped +0 -0
    Auto merged

  sql/sql_parse.cc
    1.507 06/01/11 12:45:11 tomas@stripped +0 -0
    Auto merged

  sql/mysql_priv.h
    1.366 06/01/11 12:45:10 tomas@stripped +0 -0
    Auto merged

  sql/log.cc
    1.184 06/01/11 12:45:10 tomas@stripped +0 -0
    Auto merged

  sql/handler.h
    1.180 06/01/11 12:45:10 tomas@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster.h
    1.106 06/01/11 12:45:10 tomas@stripped +0 -0
    Auto merged

  sql/Makefile.am
    1.132 06/01/11 12:45:10 tomas@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-wl2325-v6/RESYNC

--- 1.131/sql/Makefile.am	2006-01-10 15:04:27 +01:00
+++ 1.132/sql/Makefile.am	2006-01-11 12:45:10 +01:00
@@ -100,7 +100,7 @@
 			sp_head.cc sp_pcontext.cc  sp_rcontext.cc sp.cc \
 			sp_cache.cc parse_file.cc sql_trigger.cc \
 			sql_plugin.cc sql_binlog.cc \
-			handlerton.cc
+			handlerton.cc sql_tablespace.cc
 EXTRA_mysqld_SOURCES =	ha_innodb.cc ha_berkeley.cc ha_archive.cc \
 			ha_innodb.h  ha_berkeley.h  ha_archive.h \
 			ha_blackhole.cc ha_federated.cc ha_ndbcluster.cc \

--- 1.179/sql/handler.h	2006-01-11 08:39:29 +01:00
+++ 1.180/sql/handler.h	2006-01-11 12:45:10 +01:00
@@ -325,6 +325,82 @@
 #define MAX_XID_LIST_SIZE  (1024*128)
 #endif
 
+/*
+  These structures are used to pass information from a set of SQL commands
+  on add/drop/change tablespace definitions to the proper hton.
+*/
+#define UNDEF_NODEGROUP 65535
+enum ts_command_type
+{
+  TS_CMD_NOT_DEFINED = -1,
+  CREATE_TABLESPACE = 0,
+  ALTER_TABLESPACE = 1,
+  CREATE_LOGFILE_GROUP = 2,
+  ALTER_LOGFILE_GROUP = 3,
+  DROP_TABLESPACE = 4,
+  DROP_LOGFILE_GROUP = 5,
+  CHANGE_FILE_TABLESPACE = 6,
+  ALTER_ACCESS_MODE_TABLESPACE = 7
+};
+
+enum ts_alter_tablespace_type
+{
+  TS_ALTER_TABLESPACE_TYPE_NOT_DEFINED = -1,
+  ALTER_TABLESPACE_ADD_FILE = 1,
+  ALTER_TABLESPACE_DROP_FILE = 2
+};
+
+enum tablespace_access_mode
+{
+  TS_NOT_DEFINED= -1,
+  TS_READ_ONLY = 0,
+  TS_READ_WRITE = 1,
+  TS_NOT_ACCESSIBLE = 2
+};
+
+class st_alter_tablespace : public Sql_alloc
+{
+  public:
+  const char *tablespace_name;
+  const char *logfile_group_name;
+  enum ts_command_type ts_cmd_type;
+  enum ts_alter_tablespace_type ts_alter_tablespace_type;
+  const char *data_file_name;
+  const char *undo_file_name;
+  const char *redo_file_name;
+  ulonglong extent_size;
+  ulonglong undo_buffer_size;
+  ulonglong redo_buffer_size;
+  ulonglong initial_size;
+  ulonglong autoextend_size;
+  ulonglong max_size;
+  uint nodegroup_id;
+  enum legacy_db_type storage_engine;
+  bool wait_until_completed;
+  const char *ts_comment;
+  enum tablespace_access_mode ts_access_mode;
+  st_alter_tablespace()
+  {
+    tablespace_name= NULL;
+    logfile_group_name= "DEFAULT_LG"; //Default log file group
+    ts_cmd_type= TS_CMD_NOT_DEFINED;
+    data_file_name= NULL;
+    undo_file_name= NULL;
+    redo_file_name= NULL;
+    extent_size= 1024*1024;        //Default 1 MByte
+    undo_buffer_size= 8*1024*1024; //Default 8 MByte
+    redo_buffer_size= 8*1024*1024; //Default 8 MByte
+    initial_size= 128*1024*1024;   //Default 128 MByte
+    autoextend_size= 0;            //No autoextension as default
+    max_size= 0;                   //Max size == initial size => no extension
+    storage_engine= DB_TYPE_UNKNOWN;
+    nodegroup_id= UNDEF_NODEGROUP;
+    wait_until_completed= TRUE;
+    ts_comment= NULL;
+    ts_access_mode= TS_NOT_DEFINED;
+  }
+};
+
 /* The handler for a table type.  Will be included in the TABLE structure */
 
 struct st_table;
@@ -444,6 +520,7 @@
    int (*start_consistent_snapshot)(THD *thd);
    bool (*flush_logs)();
    bool (*show_status)(THD *thd, stat_print_fn *print, enum ha_stat_type stat);
+   int (*alter_tablespace)(THD *thd, st_alter_tablespace *ts_info);
    uint32 flags;                                /* global handler flags */
    /* 
       Handlerton functions are not set in the different storage
@@ -755,7 +832,7 @@
 {
   CHARSET_INFO *table_charset, *default_table_charset;
   LEX_STRING connect_string;
-  const char *comment,*password;
+  const char *comment,*password, *tablespace;
   const char *data_file_name, *index_file_name;
   const char *alias;
   ulonglong max_rows,min_rows;
@@ -775,6 +852,7 @@
   bool table_existed;			/* 1 in create if table existed */
   bool frm_only;                        /* 1 if no ha_create_table() */
   bool varchar;                         /* 1 if table has a VARCHAR */
+  bool store_on_disk;                   /* 1 if table stored on disk */
 } HA_CREATE_INFO;
 
 
@@ -852,7 +930,6 @@
   const byte *buffer_end;     /* End of buffer */
   byte *end_of_used_area;     /* End of area that was used by handler */
 } HANDLER_BUFFER;
-
 
 class handler :public Sql_alloc
 {
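
The st_alter_tablespace structure added above is what the parser hands to a
storage engine through the new alter_tablespace handlerton hook (the
+alter_tablespace member in the second hunk); on the SQL side it is reached
via mysql_alter_tablespace from the SQLCOM_ALTER_TABLESPACE dispatch shown in
the sql_parse.cc hunk below. A minimal sketch of how an engine-side hook
might dispatch on it, assuming only the declarations from this handler.h hunk
(example_alter_tablespace and the helper names are hypothetical, not the NDB
implementation):

  /* hypothetical engine-side helpers, not part of this patch */
  static int example_create_ts(st_alter_tablespace *ts_info);
  static int example_drop_ts(st_alter_tablespace *ts_info);

  static int example_alter_tablespace(THD *thd, st_alter_tablespace *ts_info)
  {
    switch (ts_info->ts_cmd_type) {
    case CREATE_TABLESPACE:
      /* tablespace_name, data_file_name, extent_size etc. carry the parsed
         SQL clauses; unset fields keep the constructor defaults shown above */
      return example_create_ts(ts_info);
    case DROP_TABLESPACE:
      return example_drop_ts(ts_info);
    default:
      return 1; /* command not supported by this engine (convention assumed) */
    }
  }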

--- 1.183/sql/log.cc	2006-01-09 21:33:28 +01:00
+++ 1.184/sql/log.cc	2006-01-11 12:45:10 +01:00
@@ -88,6 +88,7 @@
   NULL,                         /* Start Consistent Snapshot */
   NULL,                         /* Flush logs */
   NULL,                         /* Show status */
+  NULL,                         /* Alter Tablespace */
   HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
 };
 

--- 1.365/sql/mysql_priv.h	2006-01-10 23:33:07 +01:00
+++ 1.366/sql/mysql_priv.h	2006-01-11 12:45:10 +01:00
@@ -693,6 +693,7 @@
 bool mysql_xa_recover(THD *thd);
 
 bool check_simple_select();
+int mysql_alter_tablespace(THD* thd, st_alter_tablespace *ts_info);
 
 SORT_FIELD * make_unireg_sortorder(ORDER *order, uint *length);
 int setup_order(THD *thd, Item **ref_pointer_array, TABLE_LIST *tables,

--- 1.506/sql/sql_parse.cc	2006-01-09 15:05:24 +01:00
+++ 1.507/sql/sql_parse.cc	2006-01-11 12:45:11 +01:00
@@ -4818,6 +4818,12 @@
   case SQLCOM_XA_RECOVER:
     res= mysql_xa_recover(thd);
     break;
+  case SQLCOM_ALTER_TABLESPACE:
+    if (check_access(thd, ALTER_ACL, thd->db, 0, 1, 0, thd->db ? is_schema_db(thd->db) : 0))
+      break;
+    if (!(res= mysql_alter_tablespace(thd, lex->alter_tablespace_info)))
+      send_ok(thd);
+    break;
   case SQLCOM_INSTALL_PLUGIN:
     if (! (res= mysql_install_plugin(thd, &thd->lex->comment,
                                      &thd->lex->ident)))

--- 1.58/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-05 12:01:58 +01:00
+++ 1.59/storage/ndb/include/ndbapi/NdbDictionary.hpp	2006-01-11 12:45:11 +01:00
@@ -93,7 +93,9 @@
      * Get version of object
      */
     virtual int getObjectVersion() const = 0;
-
+    
+    virtual int getObjectId() const = 0;
+    
     /**
      * Object type
      */
@@ -501,6 +503,8 @@
     static const Column * RANGE_NO;
     static const Column * DISK_REF;
     static const Column * RECORDS_IN_RANGE;
+    static const Column * ROWID;
+    static const Column * ROW_GCI;
     
     int getSizeInBytes() const;
 #endif
@@ -751,6 +755,7 @@
     void setTablespace(const char * name);
     void setTablespace(const class Tablespace &);
     const char * getTablespace() const;
+    Uint32 getTablespaceId() const;
 
     /**
      * Get table object type
@@ -768,6 +773,11 @@
     virtual int getObjectVersion() const;
 
     /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
+    /**
      * Set frm file to store with this table
      */ 
     void setFrm(const void* data, Uint32 len);
@@ -784,6 +794,15 @@
 
     /** @} *******************************************************************/
 
+    /**
+     * 
+     */
+    void setRowGCIIndicator(bool value);
+    bool getRowGCIIndicator() const;
+
+    void setRowChecksumIndicator(bool value);
+    bool getRowChecksumIndicator() const;
+ 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     const char *getMysqlName() const;
 
@@ -887,6 +906,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
     /** @} *******************************************************************/
 
     /** 
@@ -1180,6 +1204,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     void print();
 #endif
@@ -1206,6 +1235,7 @@
   class LogfileGroup : public Object {
   public:
     LogfileGroup();
+    LogfileGroup(const LogfileGroup&);
     virtual ~LogfileGroup();
 
     void setName(const char * name);
@@ -1217,6 +1247,8 @@
     void setAutoGrowSpecification(const AutoGrowSpecification&);
     const AutoGrowSpecification& getAutoGrowSpecification() const;
 
+    Uint64 getUndoFreeWords() const;
+
     /**
      * Get object status
      */
@@ -1227,6 +1259,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
   private:
     friend class NdbDictionaryImpl;
     friend class NdbLogfileGroupImpl;
@@ -1240,6 +1277,7 @@
   class Tablespace : public Object {
   public:
     Tablespace();
+    Tablespace(const Tablespace&);
     virtual ~Tablespace();
 
     void setName(const char * name);
@@ -1253,7 +1291,9 @@
 
     void setDefaultLogfileGroup(const char * name);
     void setDefaultLogfileGroup(const class LogfileGroup&);
+
     const char * getDefaultLogfileGroup() const;
+    Uint32 getDefaultLogfileGroupId() const;
     
     /**
      * Get object status
@@ -1265,6 +1305,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
   private:
     friend class NdbTablespaceImpl;
     class NdbTablespaceImpl & m_impl;
@@ -1274,6 +1319,7 @@
   class Datafile : public Object {
   public:
     Datafile();
+    Datafile(const Datafile&);
     virtual ~Datafile();
 
     void setPath(const char * name);
@@ -1286,6 +1332,7 @@
     void setTablespace(const char * name);
     void setTablespace(const class Tablespace &);
     const char * getTablespace() const;
+    Uint32 getTablespaceId() const;
 
     void setNode(Uint32 nodeId);
     Uint32 getNode() const;
@@ -1302,6 +1349,11 @@
      */
     virtual int getObjectVersion() const;
 
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
+
   private:
     friend class NdbDatafileImpl;
     class NdbDatafileImpl & m_impl;
@@ -1311,6 +1363,7 @@
   class Undofile : public Object {
   public:
     Undofile();
+    Undofile(const Undofile&);
     virtual ~Undofile();
 
     void setPath(const char * path);
@@ -1318,11 +1371,11 @@
   
     void setSize(Uint64);
     Uint64 getSize() const;
-    Uint64 getFree() const;
 
     void setLogfileGroup(const char * name);
     void setLogfileGroup(const class LogfileGroup &);
     const char * getLogfileGroup() const;
+    Uint32 getLogfileGroupId() const;
 
     void setNode(Uint32 nodeId);
     Uint32 getNode() const;
@@ -1338,6 +1391,11 @@
      * Get object version
      */
     virtual int getObjectVersion() const;
+
+    /**
+     * Get object id
+     */
+    virtual int getObjectId() const;
 
   private:
     friend class NdbUndofileImpl;
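
Table, LogfileGroup, Tablespace, Datafile, Undofile and the other Object
subclasses touched in this hunk gain a getObjectId() accessor alongside the
existing getObjectStatus()/getObjectVersion(). A hedged usage sketch against
the public NDB API, assuming an already connected Ndb object and an existing
table "t1":

  #include <NdbApi.hpp>
  #include <cstdio>

  void print_object_id(Ndb *ndb)
  {
    NdbDictionary::Dictionary *dict = ndb->getDictionary();
    const NdbDictionary::Table *tab = dict->getTable("t1");
    if (tab != NULL)
      printf("t1: object id %d, version %d\n",
             tab->getObjectId(),        /* added in this changeset */
             tab->getObjectVersion());  /* pre-existing API */
  }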

--- 1.31/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2006-01-10 12:37:10 +01:00
+++ 1.32/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2006-01-11 12:45:11 +01:00
@@ -14,6 +14,7 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
+#include <my_config.h>
 #include "Backup.hpp"
 
 #include <ndb_version.h>
@@ -2516,15 +2517,22 @@
     Uint32 tableId = ListTablesConf::getTableId(conf->tableData[i]);
     Uint32 tableType = ListTablesConf::getTableType(conf->tableData[i]);
     Uint32 state= ListTablesConf::getTableState(conf->tableData[i]);
-    if (!DictTabInfo::isTable(tableType) && !DictTabInfo::isIndex(tableType)){
+
+    if (! (DictTabInfo::isTable(tableType) ||
+	   DictTabInfo::isIndex(tableType) ||
+	   DictTabInfo::isFilegroup(tableType) ||
+	   DictTabInfo::isFile(tableType)))
+    {
       jam();
       continue;
-    }//if
+    }
+    
     if (state != DictTabInfo::StateOnline)
     {
       jam();
       continue;
-    }//if
+    }
+    
     TablePtr tabPtr;
     ptr.p->tables.seize(tabPtr);
     if(tabPtr.i == RNIL) {
@@ -2834,6 +2842,8 @@
   //const Uint32 senderRef = info->senderRef;
   const Uint32 len = conf->totalLen;
   const Uint32 senderData = conf->senderData;
+  const Uint32 tableType = conf->tableType;
+  const Uint32 tableId = conf->tableId;
 
   BackupRecordPtr ptr;
   c_backupPool.getPtr(ptr, senderData);
@@ -2842,6 +2852,9 @@
   signal->getSection(dictTabInfoPtr, GetTabInfoConf::DICT_TAB_INFO);
   ndbrequire(dictTabInfoPtr.sz == len);
 
+  TablePtr tabPtr ;
+  ndbrequire(findTable(ptr, tabPtr, tableId));
+
   /**
    * No of pages needed
    */
@@ -2862,7 +2875,7 @@
   ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
   FsBuffer & buf = filePtr.p->operation.dataBuffer;
   { // Write into ctl file
-    Uint32* dst, dstLen = len + 2;
+    Uint32* dst, dstLen = len + 3;
     if(!buf.getWritePtr(&dst, dstLen)) {
       jam();
       ndbrequire(false);
@@ -2877,49 +2890,62 @@
       BackupFormat::CtlFile::TableDescription * desc = 
         (BackupFormat::CtlFile::TableDescription*)dst;
       desc->SectionType = htonl(BackupFormat::TABLE_DESCRIPTION);
-      desc->SectionLength = htonl(len + 2);
-      dst += 2;
-
+      desc->SectionLength = htonl(len + 3);
+      desc->TableType = htonl(tableType);
+      dst += 3;
+      
       copy(dst, dictTabInfoPtr);
       buf.updateWritePtr(dstLen);
     }//if
   }
-  
-  ndbrequire(ptr.p->pages.getSize() >= noPages);
-  Page32Ptr pagePtr;
-  ptr.p->pages.getPtr(pagePtr, 0);
-  copy(&pagePtr.p->data[0], dictTabInfoPtr);
-  releaseSections(signal);
-  
+
   if(ptr.p->checkError()) {
     jam();
+    releaseSections(signal);
     defineBackupRef(signal, ptr);
     return;
   }//if
 
-  TablePtr tabPtr = parseTableDescription(signal, ptr, len);
-  if(tabPtr.i == RNIL) {
+  if (!DictTabInfo::isTable(tabPtr.p->tableType))
+  {
     jam();
-    defineBackupRef(signal, ptr);
-    return;
-  }//if
+    releaseSections(signal);
 
-  TablePtr tmp = tabPtr;
-  ptr.p->tables.next(tabPtr);
-  if(DictTabInfo::isIndex(tmp.p->tableType))
+    TablePtr tmp = tabPtr;
+    ptr.p->tables.next(tabPtr);
+    ptr.p->tables.release(tmp);
+    goto next;
+  }
+  
+  ndbrequire(ptr.p->pages.getSize() >= noPages);
+  Page32Ptr pagePtr;
+  ptr.p->pages.getPtr(pagePtr, 0);
+  copy(&pagePtr.p->data[0], dictTabInfoPtr);
+  releaseSections(signal);
+  
+  if (!parseTableDescription(signal, ptr, tabPtr, len))
   {
     jam();
-    ptr.p->tables.release(tmp);
+    defineBackupRef(signal, ptr);
+    return;
   }
-  else if(!ptr.p->is_lcp())
+  
+  if(!ptr.p->is_lcp())
   {
     jam();
-    signal->theData[0] = tmp.p->tableId;
+    signal->theData[0] = tabPtr.p->tableId;
     signal->theData[1] = 1; // lock
     EXECUTE_DIRECT(DBDICT, GSN_BACKUP_FRAGMENT_REQ, signal, 2);
   }
   
-  if(tabPtr.i == RNIL) {
+  ptr.p->tables.next(tabPtr);
+  
+next:
+  if(tabPtr.i == RNIL) 
+  {
+    /**
+     * Done with all tables...
+     */
     jam();
     
     ptr.p->pages.release();
@@ -2939,6 +2965,9 @@
     return;
   }//if
 
+  /**
+   * Fetch next table...
+   */
   signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
   signal->theData[1] = ptr.i;
   signal->theData[2] = tabPtr.i;
@@ -2946,8 +2975,11 @@
   return;
 }
 
-Backup::TablePtr
-Backup::parseTableDescription(Signal* signal, BackupRecordPtr ptr, Uint32 len)
+bool
+Backup::parseTableDescription(Signal* signal, 
+			      BackupRecordPtr ptr, 
+			      TablePtr tabPtr, 
+			      Uint32 len)
 {
 
   Page32Ptr pagePtr;
@@ -2964,18 +2996,15 @@
 				  DictTabInfo::TableMappingSize, 
 				  true, true);
   ndbrequire(stat == SimpleProperties::Break);
-  
-  TablePtr tabPtr;
-  ndbrequire(findTable(ptr, tabPtr, tmpTab.TableId));
-  if(DictTabInfo::isIndex(tabPtr.p->tableType)){
-    jam();
-    return tabPtr;
-  }
 
+  bool lcp = ptr.p->is_lcp();
+
+  ndbrequire(tabPtr.p->tableId == tmpTab.TableId);
+  ndbrequire(lcp || (tabPtr.p->tableType == tmpTab.TableType));
+  
   /**
    * LCP should not save disk attributes but only mem attributes
    */
-  bool lcp = ptr.p->is_lcp();
   
   /**
    * Initialize table object
@@ -3020,8 +3049,7 @@
     {
       jam();
       ptr.p->setErrorCode(DefineBackupRef::FailedToAllocateAttributeRecord);
-      tabPtr.i = RNIL;
-      return tabPtr;
+      return false;
     }
     
     attrPtr.p->data.m_flags = 0;
@@ -3048,26 +3076,58 @@
     }
   }//for
 
-  if(lcp && disk)
+
+  if(lcp)
   {
-    /**
-     * Remove all disk attributes, but add DISK_REF (8 bytes)
-     */
-    tabPtr.p->noOfAttributes -= (disk - 1);
-    
-    AttributePtr attrPtr;
-    ndbrequire(tabPtr.p->attributes.seize(attrPtr));
-    
-    Uint32 sz32 = 2;
-    attrPtr.p->data.m_flags = 0;
-    attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
-    attrPtr.p->data.m_flags = Attribute::COL_FIXED;
-    attrPtr.p->data.sz32 = sz32;
-    
-    attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
-    tabPtr.p->sz_FixedAttributes += sz32;
+    if (disk)
+    {
+      /**
+       * Remove all disk attributes, but add DISK_REF (8 bytes)
+       */
+      tabPtr.p->noOfAttributes -= (disk - 1);
+      
+      AttributePtr attrPtr;
+      ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+      
+      Uint32 sz32 = 2;
+      attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
+      attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+      attrPtr.p->data.sz32 = 2;
+      
+      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+      tabPtr.p->sz_FixedAttributes += sz32;
+    }
+   
+    {
+      AttributePtr attrPtr;
+      ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+      
+      Uint32 sz32 = 2;
+      attrPtr.p->data.attrId = AttributeHeader::ROWID;
+      attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+      attrPtr.p->data.sz32 = 2;
+      
+      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+      tabPtr.p->sz_FixedAttributes += sz32;
+      tabPtr.p->noOfAttributes ++;
+    }
+
+    if (tmpTab.RowGCIFlag)
+    {
+      AttributePtr attrPtr;
+      ndbrequire(tabPtr.p->attributes.seize(attrPtr));
+      
+      Uint32 sz32 = 2;
+      attrPtr.p->data.attrId = AttributeHeader::ROW_GCI;
+      attrPtr.p->data.m_flags = Attribute::COL_FIXED;
+      attrPtr.p->data.sz32 = 2;
+      
+      attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
+      tabPtr.p->sz_FixedAttributes += sz32;
+      tabPtr.p->noOfAttributes ++;
+    }
   }
-  return tabPtr;
+  return true;
 }
 
 void

--- 1.36/storage/ndb/tools/restore/restore_main.cpp	2006-01-05 12:01:59 +01:00
+++ 1.37/storage/ndb/tools/restore/restore_main.cpp	2006-01-11 12:51:28 +01:00
@@ -51,6 +51,7 @@
 static int _print_log = 0;
 static int _restore_data = 0;
 static int _restore_meta = 0;
+static int _no_restore_disk = 0;
   
 static struct my_option my_long_options[] =
 {
@@ -72,6 +73,9 @@
     "Restore meta data into NDB Cluster using NDBAPI",
     (gptr*) &_restore_meta, (gptr*) &_restore_meta,  0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "no-restore-disk-objects", 'd',
+    "Dont restore disk objects (tablespace/logfilegroups etc)",
+    (gptr*) &_no_restore_disk, (gptr*) &_no_restore_disk,  0,
   { "restore_epoch", 'e', 
     "Restore epoch info into the status table. Convenient on a MySQL Cluster "
     "replication slave, for starting replication. The row in "
@@ -197,6 +201,11 @@
     restore->m_restore_meta = true;
   }
 
+  if (_no_restore_disk)
+  {
+    restore->m_no_restore_disk = true;
+  }
+  
   if (ga_restore_epoch)
   {
     restore->m_restore_epoch = true;
@@ -320,6 +329,19 @@
 
   }
 
+  for(i = 0; i<metaData.getNoOfObjects(); i++)
+  {
+    for(Uint32 j= 0; j < g_consumers.size(); j++)
+      if (!g_consumers[j]->object(metaData.getObjType(i),
+				  metaData.getObjPtr(i)))
+      {
+	ndbout_c("Restore: Failed to restore table: %s. "
+		 "Exiting...", 
+		 metaData[i]->getTableName());
+	exitHandler(NDBT_FAILED);
+      } 
+  }
+  
   for(i = 0; i<metaData.getNoOfTables(); i++)
   {
     if (checkSysTable(metaData[i]->getTableName()))

--- 1.64/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2006-01-11 11:31:02 +01:00
+++ 1.65/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2006-01-11 12:45:11 +01:00
@@ -353,7 +353,9 @@
   case DictTabInfo::LogfileGroup:{
     FilegroupPtr fg_ptr;
     ndbrequire(c_filegroup_hash.find(fg_ptr, tableId));
-    packFilegroupIntoPages(w, fg_ptr);
+    const Uint32 free_hi= signal->theData[4];
+    const Uint32 free_lo= signal->theData[5];
+    packFilegroupIntoPages(w, fg_ptr, free_hi, free_lo);
     break;
   }
   case DictTabInfo::Datafile:{
@@ -420,7 +422,13 @@
   w.add(DictTabInfo::NoOfVariable, (Uint32)0);
   w.add(DictTabInfo::KeyLength, tablePtr.p->tupKeyLength);
   
-  w.add(DictTabInfo::TableLoggedFlag, tablePtr.p->storedTable);
+  w.add(DictTabInfo::TableLoggedFlag, 
+	!!(tablePtr.p->m_bits & TableRecord::TR_Logged));
+  w.add(DictTabInfo::RowGCIFlag, 
+	!!(tablePtr.p->m_bits & TableRecord::TR_RowGCI));
+  w.add(DictTabInfo::RowChecksumFlag, 
+	!!(tablePtr.p->m_bits & TableRecord::TR_RowChecksum));
+  
   w.add(DictTabInfo::MinLoadFactor, tablePtr.p->minLoadFactor);
   w.add(DictTabInfo::MaxLoadFactor, tablePtr.p->maxLoadFactor);
   w.add(DictTabInfo::TableKValue, tablePtr.p->kValue);
@@ -537,7 +545,9 @@
 
 void
 Dbdict::packFilegroupIntoPages(SimpleProperties::Writer & w,
-			       FilegroupPtr fg_ptr){
+			       FilegroupPtr fg_ptr,
+			       const Uint32 undo_free_hi,
+			       const Uint32 undo_free_lo){
   
   DictFilegroupInfo::Filegroup fg; fg.init();
   ConstRope r(c_rope_pool, fg_ptr.p->m_name);
@@ -558,6 +568,8 @@
     break;
   case DictTabInfo::LogfileGroup:
     fg.LF_UndoBufferSize = fg_ptr.p->m_logfilegroup.m_undo_buffer_size;
+    fg.LF_UndoFreeWordsHi= undo_free_hi;
+    fg.LF_UndoFreeWordsLo= undo_free_lo;
     //fg.LF_UndoGrow = ;
     break;
   default:
@@ -1799,7 +1811,7 @@
   tablePtr.p->minLoadFactor = 70;
   tablePtr.p->noOfPrimkey = 1;
   tablePtr.p->tupKeyLength = 1;
-  tablePtr.p->storedTable = true;
+  tablePtr.p->m_bits = 0;
   tablePtr.p->tableType = DictTabInfo::UserTable;
   tablePtr.p->primaryTableId = RNIL;
   // volatile elements
@@ -2314,7 +2326,7 @@
     req->setParallelism(16);
 
     // from file index state is not defined currently
-    if (indexPtr.p->storedTable) {
+    if (indexPtr.p->m_bits & TableRecord::TR_Logged) {
       // rebuild not needed
       req->addRequestFlag((Uint32)RequestFlag::RF_NOBUILD);
     }
@@ -2984,6 +2996,26 @@
       signal->theData[4]= free_extents;
       sendSignal(reference(), GSN_CONTINUEB, signal, 5, JBB);
     }
+    else if(refToBlock(conf->senderRef) == LGMAN
+	    && (refToNode(conf->senderRef) == 0
+		|| refToNode(conf->senderRef) == getOwnNodeId()))
+    {
+      jam();
+      FilegroupPtr fg_ptr;
+      ndbrequire(c_filegroup_hash.find(fg_ptr, conf->tableId));
+      const Uint32 free_hi= conf->freeWordsHi;
+      const Uint32 free_lo= conf->freeWordsLo;
+      const Uint32 id= conf->tableId;
+      const Uint32 type= conf->tableType;
+      const Uint32 data= conf->senderData;
+      signal->theData[0]= ZPACK_TABLE_INTO_PAGES;
+      signal->theData[1]= id;
+      signal->theData[2]= type;
+      signal->theData[3]= data;
+      signal->theData[4]= free_hi;
+      signal->theData[5]= free_lo;
+      sendSignal(reference(), GSN_CONTINUEB, signal, 6, JBB);
+    }
     else
     {
       jam();
@@ -5072,7 +5104,7 @@
   req->fragType = tabPtr.p->fragmentType;
   req->kValue = tabPtr.p->kValue;
   req->noOfReplicas = 0;
-  req->storedTable = tabPtr.p->storedTable;
+  req->storedTable = !!(tabPtr.p->m_bits & TableRecord::TR_Logged);
   req->tableType = tabPtr.p->tableType;
   req->schemaVersion = tabPtr.p->tableVersion;
   req->primaryTableId = tabPtr.p->primaryTableId;
@@ -5171,6 +5203,7 @@
   Uint32 fragCount = req->totalFragments;
   Uint32 requestInfo = req->requestInfo;
   Uint32 startGci = req->startGci;
+  Uint32 logPart = req->logPartId;
 
   ndbrequire(node == getOwnNodeId());
 
@@ -5220,11 +5253,12 @@
     // noOfCharsets passed to TUP in upper half
     req->noOfNewAttr |= (tabPtr.p->noOfCharsets << 16);
     req->checksumIndicator = 1;
-    req->GCPIndicator = 0;
+    req->GCPIndicator = 1;
     req->startGci = startGci;
     req->tableType = tabPtr.p->tableType;
     req->primaryTableId = tabPtr.p->primaryTableId;
     req->tablespace_id= tabPtr.p->m_tablespace_id;
+    req->logPartId = logPart;
     sendSignal(DBLQH_REF, GSN_LQHFRAGREQ, signal, 
 	       LqhFragReq::SignalLength, JBB);
   }
@@ -5417,7 +5451,7 @@
     
     signal->theData[0] = tabPtr.i;
     signal->theData[1] = tabPtr.p->tableVersion;
-    signal->theData[2] = (Uint32)tabPtr.p->storedTable;     
+    signal->theData[2] = (Uint32)!!(tabPtr.p->m_bits & TableRecord::TR_Logged);
     signal->theData[3] = reference();
     signal->theData[4] = (Uint32)tabPtr.p->tableType;
     signal->theData[5] = createTabPtr.p->key;
@@ -5821,7 +5855,12 @@
   }
   
   tablePtr.p->noOfAttributes = tableDesc.NoOfAttributes;
-  tablePtr.p->storedTable = tableDesc.TableLoggedFlag;
+  tablePtr.p->m_bits |= 
+    (tableDesc.TableLoggedFlag ? TableRecord::TR_Logged : 0);
+  tablePtr.p->m_bits |= 
+    (tableDesc.RowChecksumFlag ? TableRecord::TR_RowChecksum : 0);
+  tablePtr.p->m_bits |= 
+    (tableDesc.RowGCIFlag ? TableRecord::TR_RowGCI : 0);
   tablePtr.p->minLoadFactor = tableDesc.MinLoadFactor;
   tablePtr.p->maxLoadFactor = tableDesc.MaxLoadFactor;
   tablePtr.p->fragmentType = (DictTabInfo::FragmentType)tableDesc.FragmentType;
@@ -5858,7 +5897,7 @@
   tablePtr.p->buildTriggerId = RNIL;
   tablePtr.p->indexLocal = 0;
   
-  handleTabInfo(it, parseP, tableDesc.TablespaceVersion);
+  handleTabInfo(it, parseP, tableDesc);
   
   if(parseP->errorCode != 0)
   {
@@ -5871,7 +5910,7 @@
 
 void Dbdict::handleTabInfo(SimpleProperties::Reader & it,
 			   ParseDictTabInfoRecord * parseP,
-			   Uint32 tablespaceVersion)
+			   DictTabInfo::Table &tableDesc)
 {
   TableRecordPtr tablePtr = parseP->tablePtr;
   
@@ -6110,7 +6149,7 @@
       tabRequire(false, CreateTableRef::NotATablespace);
     }
     
-    if(tablespacePtr.p->m_version != tablespaceVersion)
+    if(tablespacePtr.p->m_version != tableDesc.TablespaceVersion)
     {
       tabRequire(false, CreateTableRef::InvalidTablespaceVersion);
     }
@@ -7080,6 +7119,18 @@
     sendSignal(TSMAN_REF, GSN_GET_TABINFOREQ, signal,
 	       GetTabInfoReq::SignalLength, JBB);
   }
+  else if(objEntry->m_tableType==DictTabInfo::LogfileGroup)
+  {
+    jam();
+    GetTabInfoReq *req= (GetTabInfoReq*)signal->theData;
+    req->senderData= c_retrieveRecord.retrievePage;
+    req->senderRef= reference();
+    req->requestType= GetTabInfoReq::RequestById;
+    req->tableId= obj_id;
+
+    sendSignal(LGMAN_REF, GSN_GET_TABINFOREQ, signal,
+	       GetTabInfoReq::SignalLength, JBB);
+  }
   else
   {
     jam();
@@ -7228,7 +7279,7 @@
 	}
       }
       // store
-      if (! tablePtr.p->storedTable) {
+      if (! (tablePtr.p->m_bits & TableRecord::TR_Logged)) {
 	conf->setTableStore(pos, DictTabInfo::StoreTemporary);
       } else {
 	conf->setTableStore(pos, DictTabInfo::StorePermanent);
@@ -7261,6 +7312,7 @@
       conf->tableData[pos] = 0;
       conf->setTableId(pos, iter.curr.p->m_id);
       conf->setTableType(pos, type); // type
+      conf->setTableState(pos, DictTabInfo::StateOnline);  // XXX todo
       pos++;
     }
     if (DictTabInfo::isFile(type)){
@@ -7268,6 +7320,7 @@
       conf->tableData[pos] = 0;
       conf->setTableId(pos, iter.curr.p->m_id);
       conf->setTableType(pos, type); // type
+      conf->setTableState(pos, DictTabInfo::StateOnline); // XXX todo
       pos++;
     }
     
@@ -7589,8 +7642,9 @@
   indexPtr.i = RNIL;            // invalid
   indexPtr.p = &indexRec;
   initialiseTableRecord(indexPtr);
+  indexPtr.p->m_bits = TableRecord::TR_RowChecksum;
   if (req->getIndexType() == DictTabInfo::UniqueHashIndex) {
-    indexPtr.p->storedTable = opPtr.p->m_storedIndex;
+    indexPtr.p->m_bits |= (opPtr.p->m_storedIndex ? TableRecord::TR_Logged:0);
     indexPtr.p->fragmentType = DictTabInfo::DistrKeyUniqueHashIndex;
   } else if (req->getIndexType() == DictTabInfo::OrderedIndex) {
     // first version will not supported logging
@@ -7600,7 +7654,6 @@
       opPtr.p->m_errorLine = __LINE__;
       return;
     }
-    indexPtr.p->storedTable = false;
     indexPtr.p->fragmentType = DictTabInfo::DistrKeyOrderedIndex;
   } else {
     jam();
@@ -7684,7 +7737,7 @@
   indexPtr.p->noOfNullAttr = 0;
   // write index table
   w.add(DictTabInfo::TableName, opPtr.p->m_indexName);
-  w.add(DictTabInfo::TableLoggedFlag, indexPtr.p->storedTable);
+  w.add(DictTabInfo::TableLoggedFlag, !!(indexPtr.p->m_bits & TableRecord::TR_Logged));
   w.add(DictTabInfo::FragmentTypeVal, indexPtr.p->fragmentType);
   w.add(DictTabInfo::TableTypeVal, indexPtr.p->tableType);
   Rope name(c_rope_pool, tablePtr.p->tableName);
@@ -13913,6 +13966,7 @@
   createObjPtr.p->m_obj_type = objType;
   createObjPtr.p->m_obj_version = objVersion;
   createObjPtr.p->m_obj_info_ptr_i = objInfoPtr.i;
+  createObjPtr.p->m_obj_ptr_i = RNIL;
 
   createObjPtr.p->m_callback.m_callbackData = key;
   createObjPtr.p->m_callback.m_callbackFunction= 
@@ -14609,6 +14663,9 @@
   SegmentedSectionPtr objInfoPtr;
   getSection(objInfoPtr, ((OpCreateObj*)op)->m_obj_info_ptr_i);
   SimplePropertiesSectionReader it(objInfoPtr, getSectionSegmentPool());
+
+  Ptr<DictObject> obj_ptr; obj_ptr.setNull();
+  FilegroupPtr fg_ptr; fg_ptr.setNull();
   
   SimpleProperties::UnpackStatus status;
   DictFilegroupInfo::Filegroup fg; fg.init();
@@ -14648,15 +14705,12 @@
       break;
     }
     
-    Ptr<DictObject> obj_ptr;
     if(!c_obj_pool.seize(obj_ptr)){
       op->m_errorCode = CreateTableRef::NoMoreTableRecords;
       break;
     }
     
-    FilegroupPtr fg_ptr;
     if(!c_filegroup_pool.seize(fg_ptr)){
-      c_obj_pool.release(obj_ptr);
       op->m_errorCode = CreateTableRef::NoMoreTableRecords;
       break;
     }
@@ -14665,8 +14719,6 @@
       Rope name(c_rope_pool, obj_ptr.p->m_name);
       if(!name.assign(fg.FilegroupName, len, hash)){
 	op->m_errorCode = CreateTableRef::TableNameTooLong;
-	c_obj_pool.release(obj_ptr);
-	c_filegroup_pool.release(fg_ptr);
 	break;
       }
     }
@@ -14714,8 +14766,24 @@
     
     op->m_obj_ptr_i = fg_ptr.i;
   } while(0);
-  
+
 error:
+  if (op->m_errorCode)
+  {
+    jam();
+    if (!fg_ptr.isNull())
+    {
+      jam();
+      c_filegroup_pool.release(fg_ptr);
+    }
+
+    if (!obj_ptr.isNull())
+    {
+      jam();
+      c_obj_pool.release(obj_ptr);
+    }
+  }
+  
   execute(signal, op->m_callback, 0);
 }
 
@@ -14786,14 +14854,33 @@
 
 void
 Dbdict::create_fg_abort_start(Signal* signal, SchemaOp* op){
-  execute(signal, op->m_callback, 0);
-  abort();
+  CreateFilegroupImplReq* req = 
+    (CreateFilegroupImplReq*)signal->getDataPtrSend();
+
+  if (op->m_obj_ptr_i != RNIL)
+  {
+    jam();
+    send_drop_fg(signal, op, DropFilegroupImplReq::Commit);
+    return;
+  }
+  
+  execute(signal, op->m_callback, 0);  
 }
 
 void
 Dbdict::create_fg_abort_complete(Signal* signal, SchemaOp* op){
+  
+  if (op->m_obj_ptr_i != RNIL)
+  {
+    jam();
+    FilegroupPtr fg_ptr;
+    c_filegroup_pool.getPtr(fg_ptr, op->m_obj_ptr_i);
+    
+    release_object(fg_ptr.p->m_obj_ptr_i);
+    c_filegroup_hash.release(fg_ptr);
+  }
+  
   execute(signal, op->m_callback, 0);
-  abort();
 }
 
 void 
@@ -14805,6 +14892,9 @@
   getSection(objInfoPtr, ((OpCreateObj*)op)->m_obj_info_ptr_i);
   SimplePropertiesSectionReader it(objInfoPtr, getSectionSegmentPool());
   
+  Ptr<DictObject> obj_ptr; obj_ptr.setNull();
+  FilePtr filePtr; filePtr.setNull();
+
   DictFilegroupInfo::File f; f.init();
   SimpleProperties::UnpackStatus status;
   status = SimpleProperties::unpack(it, &f, 
@@ -14854,16 +14944,13 @@
     }
     
     // Loop through all filenames...
-    Ptr<DictObject> obj_ptr;
     if(!c_obj_pool.seize(obj_ptr)){
       op->m_errorCode = CreateTableRef::NoMoreTableRecords;
       break;
     }
     
-    FilePtr filePtr;
     if (! c_file_pool.seize(filePtr)){
       op->m_errorCode = CreateFileRef::OutOfFileRecords;
-      c_obj_pool.release(obj_ptr);
       break;
     }
 
@@ -14871,8 +14958,6 @@
       Rope name(c_rope_pool, obj_ptr.p->m_name);
       if(!name.assign(f.FileName, len, hash)){
 	op->m_errorCode = CreateTableRef::TableNameTooLong;
-	c_obj_pool.release(obj_ptr);
-	c_file_pool.release(filePtr);
 	break;
       }
     }
@@ -14909,6 +14994,22 @@
 
     op->m_obj_ptr_i = filePtr.i;
   } while(0);
+
+  if (op->m_errorCode)
+  {
+    jam();
+    if (!filePtr.isNull())
+    {
+      jam();
+      c_file_pool.release(filePtr);
+    }
+
+    if (!obj_ptr.isNull())
+    {
+      jam();
+      c_obj_pool.release(obj_ptr);
+    }
+  }
   
   execute(signal, op->m_callback, 0);
 }
@@ -14935,8 +15036,6 @@
     break;
   case 1:
     req->requestInfo = CreateFileImplReq::Open;
-    if(getNodeState().getNodeRestartInProgress())
-      req->requestInfo = CreateFileImplReq::CreateForce;      
     break;
   case 2:
     req->requestInfo = CreateFileImplReq::CreateForce;
@@ -15042,61 +15141,71 @@
 {
   CreateFileImplReq* req = (CreateFileImplReq*)signal->getDataPtrSend();
 
-  FilePtr f_ptr;
-  c_file_pool.getPtr(f_ptr, op->m_obj_ptr_i);
-
-  FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
-
-  req->senderData = op->key;
-  req->senderRef = reference();
-  req->requestInfo = CreateFileImplReq::Abort;
-  
-  req->file_id = f_ptr.p->key;
-  req->filegroup_id = f_ptr.p->m_filegroup_id;
-  req->filegroup_version = fg_ptr.p->m_version;
-
-  Uint32 ref= 0;
-  switch(op->m_obj_type){
-  case DictTabInfo::Datafile:
-    ref = TSMAN_REF;
-    break;
-  case DictTabInfo::Undofile:
-    ref = LGMAN_REF;
-    break;
-  default:
-    ndbrequire(false);
+  if (op->m_obj_ptr_i != RNIL)
+  {
+    FilePtr f_ptr;
+    c_file_pool.getPtr(f_ptr, op->m_obj_ptr_i);
+    
+    FilegroupPtr fg_ptr;
+    ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+    
+    req->senderData = op->key;
+    req->senderRef = reference();
+    req->requestInfo = CreateFileImplReq::Abort;
+    
+    req->file_id = f_ptr.p->key;
+    req->filegroup_id = f_ptr.p->m_filegroup_id;
+    req->filegroup_version = fg_ptr.p->m_version;
+    
+    Uint32 ref= 0;
+    switch(op->m_obj_type){
+    case DictTabInfo::Datafile:
+      ref = TSMAN_REF;
+      break;
+    case DictTabInfo::Undofile:
+      ref = LGMAN_REF;
+      break;
+    default:
+      ndbrequire(false);
+    }
+    
+    sendSignal(ref, GSN_CREATE_FILE_REQ, signal, 
+	       CreateFileImplReq::AbortLength, JBB);
+    return;
   }
-  
-  sendSignal(ref, GSN_CREATE_FILE_REQ, signal, 
-	     CreateFileImplReq::AbortLength, JBB);
+
+  execute(signal, op->m_callback, 0);
 }
 
 void
 Dbdict::create_file_abort_complete(Signal* signal, SchemaOp* op)
 {
-  FilePtr f_ptr;
-  c_file_pool.getPtr(f_ptr, op->m_obj_ptr_i);
-
-  FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
-  
-  switch(fg_ptr.p->m_type){
-  case DictTabInfo::Tablespace:
-    decrease_ref_count(fg_ptr.p->m_obj_ptr_i);
-    break;
-  case DictTabInfo::LogfileGroup:
+  if (op->m_obj_ptr_i != RNIL)
   {
-    LocalDLList<File> list(c_file_pool, fg_ptr.p->m_logfilegroup.m_files);
-    list.remove(f_ptr);
-    break;
-  }
-  default:
-    ndbrequire(false);
+    FilePtr f_ptr;
+    c_file_pool.getPtr(f_ptr, op->m_obj_ptr_i);
+    
+    FilegroupPtr fg_ptr;
+    ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+    
+    switch(fg_ptr.p->m_type){
+    case DictTabInfo::Tablespace:
+      decrease_ref_count(fg_ptr.p->m_obj_ptr_i);
+      break;
+    case DictTabInfo::LogfileGroup:
+    {
+      LocalDLList<File> list(c_file_pool, fg_ptr.p->m_logfilegroup.m_files);
+      list.remove(f_ptr);
+      break;
+    }
+    default:
+      ndbrequire(false);
+    }
+    
+    release_object(f_ptr.p->m_obj_ptr_i);
+    c_file_pool.release(f_ptr);
   }
   
-  release_object(f_ptr.p->m_obj_ptr_i);
-  
   execute(signal, op->m_callback, 0);
 }
 
@@ -15132,7 +15241,8 @@
 
   decrease_ref_count(fg_ptr.p->m_obj_ptr_i);
   release_object(f_ptr.p->m_obj_ptr_i);
-  
+  c_file_pool.release(f_ptr);
+
   execute(signal, op->m_callback, 0);
 }
 
@@ -15319,7 +15429,8 @@
   c_filegroup_pool.getPtr(fg_ptr, op->m_obj_ptr_i);
   
   release_object(fg_ptr.p->m_obj_ptr_i);
-  
+  c_filegroup_hash.release(fg_ptr);
+
   execute(signal, op->m_callback, 0);
 }
 

--- 1.23/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2006-01-05 12:01:59 +01:00
+++ 1.24/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2006-01-11 12:45:11 +01:00
@@ -228,6 +228,15 @@
     /* Global checkpoint identity when table created */
     Uint32 gciTableCreated;
 
+    /* Is the table logged (i.e. data survives system restart) */
+    enum Bits
+    {
+      TR_Logged = 0x1,
+      TR_RowGCI = 0x2,
+      TR_RowChecksum = 0x4
+    };
+    Uint16 m_bits;
+
     /* Number of attibutes in table */
     Uint16 noOfAttributes;
 
@@ -266,9 +275,6 @@
      */
     Uint8 minLoadFactor;
 
-    /* Is the table logged (i.e. data survives system restart) */
-    bool storedTable;
-
     /* Convenience routines */
     bool isTable() const;
     bool isIndex() const;
@@ -511,6 +517,7 @@
     Uint32 m_filegroup_id;
     Uint32 m_type;
     Uint64 m_file_size;
+    Uint64 m_file_free;
     RopeHandle m_path;
     
     Uint32 nextList;
@@ -2002,7 +2009,10 @@
 			     AttributeRecordPtr & attrPtr);
   void packTableIntoPages(Signal* signal);
   void packTableIntoPages(SimpleProperties::Writer &, TableRecordPtr, Signal* =0);
-  void packFilegroupIntoPages(SimpleProperties::Writer &, FilegroupPtr);
+  void packFilegroupIntoPages(SimpleProperties::Writer &,
+			      FilegroupPtr,
+			      const Uint32 undo_free_hi,
+			      const Uint32 undo_free_lo);
   void packFileIntoPages(SimpleProperties::Writer &, FilePtr, const Uint32);
   
   void sendGET_TABINFOREQ(Signal* signal,
@@ -2027,7 +2037,7 @@
 			 ParseDictTabInfoRecord *,
 			 bool checkExist = true);
   void handleTabInfo(SimpleProperties::Reader & it, ParseDictTabInfoRecord *,
-		     Uint32 tablespaceVersion);
+		     DictTabInfo::Table & tableDesc);
   
   void handleAddTableFailure(Signal* signal,
                              Uint32 failureLine,
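
The old storedTable bool is folded into the new m_bits bitmask together with
the row-GCI and row-checksum flags, which is what the repeated
"!!(tablePtr.p->m_bits & TableRecord::TR_Logged)" tests in the Dbdict.cpp
hunks above replace. A short sketch of the equivalent accesses, assuming the
TableRecord declarations from this hunk:

  Uint16 bits = 0;
  bits |= TableRecord::TR_Logged;      /* data survives a system restart */
  bits |= TableRecord::TR_RowGCI;      /* a per-row GCI word is stored   */
  /* the old "if (storedTable)" test becomes: */
  bool logged = (bits & TableRecord::TR_Logged) != 0;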

--- 1.12/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2006-01-10 12:56:33 +01:00
+++ 1.13/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2006-01-11 12:45:11 +01:00
@@ -232,6 +232,8 @@
     Uint32 storedReplicas;       /* "ALIVE" STORED REPLICAS */
     Uint32 nextFragmentChunk;
     
+    Uint32 m_log_part_id;
+    
     Uint8 distributionKey;
     Uint8 fragReplicas;
     Uint8 noOldStoredReplicas;  /* NUMBER OF "DEAD" STORED REPLICAS */
@@ -539,7 +541,9 @@
       TO_END_COPY = 19,
       TO_END_COPY_ONGOING = 20,
       TO_WAIT_ENDING = 21,
-      ENDING = 22
+      ENDING = 22,
+      
+      STARTING_LOCAL_FRAGMENTS = 24
     };
     enum ToSlaveStatus {
       TO_SLAVE_IDLE = 0,
@@ -968,7 +972,9 @@
   void initialiseRecordsLab(Signal *, Uint32 stepNo, Uint32, Uint32);
 
   void findReplica(ReplicaRecordPtr& regReplicaPtr,
-                   Fragmentstore* fragPtrP, Uint32 nodeId);
+                   Fragmentstore* fragPtrP, 
+		   Uint32 nodeId,
+		   bool oldStoredReplicas = false);
 //------------------------------------
 // Node failure handling methods
 //------------------------------------
@@ -1126,6 +1132,10 @@
   void setNodeCopyCompleted(Uint32 nodeId, bool newState);
   bool checkNodeAlive(Uint32 nodeId);
 
+  void nr_start_fragments(Signal*, TakeOverRecordPtr);
+  void nr_start_fragment(Signal*, TakeOverRecordPtr, ReplicaRecordPtr);
+  void nr_run_redo(Signal*, TakeOverRecordPtr);
+  
   // Initialisation
   void initData();
   void initRecords();
@@ -1152,7 +1162,8 @@
 
   Uint32 c_nextNodeGroup;
   NodeGroupRecord *nodeGroupRecord;
-
+  Uint32 c_nextLogPart;
+  
   NodeRecord *nodeRecord;
 
   PageRecord *pageRecord;

--- 1.46/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2006-01-10 12:56:33 +01:00
+++ 1.47/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2006-01-11 12:45:12 +01:00
@@ -610,6 +610,14 @@
     checkWaitDropTabFailedLqh(signal, nodeId, tableId);
     return;
   }
+  case DihContinueB::ZTO_START_FRAGMENTS:
+  {
+    TakeOverRecordPtr takeOverPtr;
+    takeOverPtr.i = signal->theData[1];
+    ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
   }//switch
   
   ndbrequire(false);
@@ -1772,11 +1780,6 @@
   ndbrequire(c_nodeStartMaster.startNode == Tnodeid);
   ndbrequire(getNodeStatus(Tnodeid) == NodeRecord::STARTING);
   
-  sendSTART_RECREQ(signal, Tnodeid);
-}//Dbdih::execSTART_MEREQ()
-
-void Dbdih::nodeRestartStartRecConfLab(Signal* signal) 
-{
   c_nodeStartMaster.blockLcp = true;
   if ((c_lcpState.lcpStatus != LCP_STATUS_IDLE) &&
       (c_lcpState.lcpStatus != LCP_TCGET)) {
@@ -2587,13 +2590,14 @@
     return;
   }//if
   c_startToLock = takeOverPtrI;
+
+  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   StartToReq * const req = (StartToReq *)&signal->theData[0];
   req->userPtr = takeOverPtr.i;
   req->userRef = reference();
   req->startingNodeId = takeOverPtr.p->toStartingNode;
   req->nodeTakenOver = takeOverPtr.p->toFailedNode;
   req->nodeRestart = takeOverPtr.p->toNodeRestart;
-  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   sendLoopMacro(START_TOREQ, sendSTART_TOREQ);
 }//Dbdih::sendStartTo()
 
@@ -2637,9 +2641,153 @@
   CRASH_INSERTION(7134);
   c_startToLock = RNIL;
 
+  if (takeOverPtr.p->toNodeRestart)
+  {
+    jam();
+    takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING_LOCAL_FRAGMENTS;
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
+
   startNextCopyFragment(signal, takeOverPtr.i);
 }//Dbdih::execSTART_TOCONF()
 
+void
+Dbdih::nr_start_fragments(Signal* signal, 
+			  TakeOverRecordPtr takeOverPtr)
+{
+  Uint32 loopCount = 0 ;
+  TabRecordPtr tabPtr;
+  while (loopCount++ < 100) {
+    tabPtr.i = takeOverPtr.p->toCurrentTabref;
+    if (tabPtr.i >= ctabFileSize) {
+      jam();
+      nr_run_redo(signal, takeOverPtr);
+      return;
+    }//if
+    ptrAss(tabPtr, tabRecord);
+    if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE){
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    Uint32 fragId = takeOverPtr.p->toCurrentFragid;
+    if (fragId >= tabPtr.p->totalfragments) {
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    FragmentstorePtr fragPtr;
+    getFragstore(tabPtr.p, fragId, fragPtr);
+    ReplicaRecordPtr loopReplicaPtr;
+    loopReplicaPtr.i = fragPtr.p->oldStoredReplicas;
+    while (loopReplicaPtr.i != RNIL) {
+      ptrCheckGuard(loopReplicaPtr, creplicaFileSize, replicaRecord);
+      if (loopReplicaPtr.p->procNode == takeOverPtr.p->toStartingNode) {
+        jam();
+	nr_start_fragment(signal, takeOverPtr, loopReplicaPtr);
+	break;
+      } else {
+        jam();
+        loopReplicaPtr.i = loopReplicaPtr.p->nextReplica;
+      }//if
+    }//while
+    takeOverPtr.p->toCurrentFragid++;
+  }//while
+  signal->theData[0] = DihContinueB::ZTO_START_FRAGMENTS;
+  signal->theData[1] = takeOverPtr.i;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+}
+
+void
+Dbdih::nr_start_fragment(Signal* signal, 
+			 TakeOverRecordPtr takeOverPtr,
+			 ReplicaRecordPtr replicaPtr)
+{
+  Uint32 i, j = 0;
+  Uint32 maxLcpId = 0;
+  Uint32 maxLcpIndex = ~0;
+  
+  Uint32 restorableGCI = 0;
+  
+  ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
+	   takeOverPtr.p->toCurrentTabref,
+	   takeOverPtr.p->toCurrentFragid,
+	   replicaPtr.p->nextLcp);
+  
+  Uint32 idx = replicaPtr.p->nextLcp;
+  for(i = 0; i<MAX_LCP_STORED; i++, idx = nextLcpNo(idx))
+  {
+    ndbout_c("scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+    if (replicaPtr.p->lcpStatus[idx] == ZVALID) 
+    {
+      ndbrequire(replicaPtr.p->lcpId[idx] > maxLcpId);
+      Uint32 startGci = replicaPtr.p->maxGciCompleted[idx];
+      Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+      for (;j < replicaPtr.p->noCrashedReplicas; j++)
+      {
+	ndbout_c("crashed replica: %d(%d) replicaLastGci: %d",
+		 j, 
+		 replicaPtr.p->noCrashedReplicas,
+		 replicaPtr.p->replicaLastGci[j]);
+	if (replicaPtr.p->replicaLastGci[j] > stopGci)
+	{
+	  maxLcpId = replicaPtr.p->lcpId[idx];
+	  maxLcpIndex = idx;
+	  restorableGCI = replicaPtr.p->replicaLastGci[j];
+	  break;
+	}
+      }
+    }
+  }
+  
+  if (maxLcpIndex == ~0)
+  {
+    ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
+	     takeOverPtr.p->toStartingNode,
+	     takeOverPtr.p->toCurrentTabref,
+	     takeOverPtr.p->toCurrentFragid);
+    replicaPtr.p->lcpIdStarted = 0;
+  }
+  else
+  {
+    ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d) newestRestorableGCI: %d",
+	     maxLcpId,
+	     maxLcpIndex,
+	     replicaPtr.p->maxGciStarted[maxLcpIndex],
+	     replicaPtr.p->maxGciCompleted[maxLcpIndex],	     
+	     restorableGCI,
+	     SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
+	     SYSFILE->newestRestorableGCI);
+
+    replicaPtr.p->lcpIdStarted = restorableGCI;
+    BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
+    StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+    req->userPtr = 0;
+    req->userRef = reference();
+    req->lcpNo = maxLcpIndex;
+    req->lcpId = maxLcpId;
+    req->tableId = takeOverPtr.p->toCurrentTabref;
+    req->fragId = takeOverPtr.p->toCurrentFragid;
+    req->noOfLogNodes = 1;
+    req->lqhLogNode[0] = takeOverPtr.p->toStartingNode;
+    req->startGci[0] = replicaPtr.p->maxGciCompleted[maxLcpIndex];
+    req->lastGci[0] = restorableGCI;
+    sendSignal(ref, GSN_START_FRAGREQ, signal, 
+	       StartFragReq::SignalLength, JBB);
+  }
+}
+
+void
+Dbdih::nr_run_redo(Signal* signal, TakeOverRecordPtr takeOverPtr)
+{
+  takeOverPtr.p->toCurrentTabref = 0;
+  takeOverPtr.p->toCurrentFragid = 0;
+  sendSTART_RECREQ(signal, takeOverPtr.p->toStartingNode);
+}
+
 void Dbdih::initStartTakeOver(const StartToReq * req, 
 			      TakeOverRecordPtr takeOverPtr)
 {
@@ -2972,6 +3120,14 @@
     /*---------------------------------------------------------------------- */
     FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);
+    Uint32 gci = 0;
+    if (takeOverPtr.p->toNodeRestart)
+    {
+      ReplicaRecordPtr replicaPtr;
+      findReplica(replicaPtr, fragPtr.p, takeOverPtr.p->toStartingNode, true);
+      gci = replicaPtr.p->lcpIdStarted;
+      replicaPtr.p->lcpIdStarted = 0;
+    }
     takeOverPtr.p->toMasterStatus = TakeOverRecord::COPY_FRAG;
     BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toCopyNode);
     CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
@@ -2982,6 +3138,7 @@
     copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
     copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
     copyFragReq->distributionKey = fragPtr.p->distributionKey;
+    copyFragReq->gci = gci;
     sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
   } else {
     ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
@@ -4034,6 +4191,8 @@
 						  Uint32 takeOverPtrI)
 {
   jam();
+  ndbout_c("checkTakeOverInMasterStartNodeFailure %x",
+	   takeOverPtrI);
   if (takeOverPtrI == RNIL) {
     jam();
     return;
@@ -4047,6 +4206,9 @@
   takeOverPtr.i = takeOverPtrI;
   ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
 
+  ndbout_c("takeOverPtr.p->toMasterStatus: %x", 
+	   takeOverPtr.p->toMasterStatus);
+  
   bool ok = false;
   switch (takeOverPtr.p->toMasterStatus) {
   case TakeOverRecord::IDLE:
@@ -4155,6 +4317,13 @@
     //-----------------------------------------------------------------------
     endTakeOver(takeOverPtr.i);
     break;
+
+  case TakeOverRecord::STARTING_LOCAL_FRAGMENTS:
+    ok = true;
+    jam();
+    endTakeOver(takeOverPtr.i);
+    break;
+    
     /**
      * The following are states that it should not be possible to "be" in
      */
@@ -6586,6 +6755,8 @@
     Uint32 activeIndex = 0;
     getFragstore(tabPtr.p, fragId, fragPtr);
     fragPtr.p->preferredPrimary = fragments[index];
+    fragPtr.p->m_log_part_id = c_nextLogPart++;
+    
     for (Uint32 i = 0; i<noReplicas; i++) {
       const Uint32 nodeId = fragments[index++];
       ReplicaRecordPtr replicaPtr;
@@ -6630,9 +6801,9 @@
   jam();
   const Uint32 fragCount = tabPtr.p->totalfragments;
   ReplicaRecordPtr replicaPtr; replicaPtr.i = RNIL;
+  FragmentstorePtr fragPtr;
   for(; fragId<fragCount; fragId++){
     jam();
-    FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);    
     
     replicaPtr.i = fragPtr.p->storedReplicas;
@@ -6690,6 +6861,7 @@
     req->nodeId = getOwnNodeId();
     req->totalFragments = fragCount;
     req->startGci = SYSFILE->newestRestorableGCI;
+    req->logPartId = fragPtr.p->m_log_part_id;
     sendSignal(DBDICT_REF, GSN_ADD_FRAGREQ, signal, 
 	       AddFragReq::SignalLength, JBB);
     return;
@@ -8903,8 +9075,8 @@
     // otherwise we have a problem.
     /* --------------------------------------------------------------------- */
     jam();
-    ndbrequire(senderNodeId == c_nodeStartMaster.startNode);
-    nodeRestartStartRecConfLab(signal);
+    ndbout_c("startNextCopyFragment");
+    startNextCopyFragment(signal, findTakeOver(senderNodeId));
     return;
   } else {
     /* --------------------------------------------------------------------- */
@@ -9923,9 +10095,11 @@
 }
 
 void Dbdih::findReplica(ReplicaRecordPtr& replicaPtr, 
-			Fragmentstore* fragPtrP, Uint32 nodeId)
+			Fragmentstore* fragPtrP, 
+			Uint32 nodeId,
+			bool old)
 {
-  replicaPtr.i = fragPtrP->storedReplicas;
+  replicaPtr.i = old ? fragPtrP->oldStoredReplicas : fragPtrP->storedReplicas;
   while(replicaPtr.i != RNIL){
     ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
     if (replicaPtr.p->procNode == nodeId) {
@@ -11196,6 +11370,7 @@
   currentgcp = 0;
   cverifyQueueCounter = 0;
   cwaitLcpSr = false;
+  c_nextLogPart = 0;
 
   nodeResetStart();
   c_nodeStartMaster.wait = ZFALSE;
@@ -12000,6 +12175,8 @@
     jam();
     fragPtr.p->distributionKey = TdistKey;
   }//if
+
+  fragPtr.p->m_log_part_id = readPageWord(rf);
 }//Dbdih::readFragment()
 
 Uint32 Dbdih::readPageWord(RWFragment* rf) 
@@ -13090,6 +13267,7 @@
   writePageWord(wf, fragPtr.p->noStoredReplicas);
   writePageWord(wf, fragPtr.p->noOldStoredReplicas);
   writePageWord(wf, fragPtr.p->distributionKey);
+  writePageWord(wf, fragPtr.p->m_log_part_id);
 }//Dbdih::writeFragment()
 
 void Dbdih::writePageWord(RWFragment* wf, Uint32 dataWord)

--- 1.100/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2006-01-10 12:37:10 +01:00
+++ 1.101/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2006-01-11 12:45:12 +01:00
@@ -2808,17 +2808,9 @@
   regCachePtr->attrinfo15[2] = Tdata4;
   regCachePtr->attrinfo15[3] = Tdata5;
 
-  if (TOperationType == ZREAD) {
+  if (TOperationType == ZREAD || TOperationType == ZREAD_EX) {
     Uint32 TreadCount = c_counters.creadCount;
     jam();
-    regCachePtr->opLock = 0;
-    c_counters.creadCount = TreadCount + 1;
-  } else if(TOperationType == ZREAD_EX){
-    Uint32 TreadCount = c_counters.creadCount;
-    jam();
-    TOperationType = ZREAD;
-    regTcPtr->operation = ZREAD;
-    regCachePtr->opLock = ZUPDATE;
     c_counters.creadCount = TreadCount + 1;
   } else {
     if(regApiPtr->commitAckMarker == RNIL){
@@ -2852,24 +2844,10 @@
     c_counters.cwriteCount = TwriteCount + 1;
     switch (TOperationType) {
     case ZUPDATE:
-      jam();
-      if (TattrLen == 0) {
-        //TCKEY_abort(signal, 5);
-        //return;
-      }//if
-      /*---------------------------------------------------------------------*/
-      // The missing break is intentional since we also want to set the opLock 
-      // variable also for updates
-      /*---------------------------------------------------------------------*/
     case ZINSERT:
     case ZDELETE:
-      jam();      
-      regCachePtr->opLock = TOperationType;
-      break;
     case ZWRITE:
       jam();
-      // A write operation is originally an insert operation.
-      regCachePtr->opLock = ZINSERT;  
       break;
     default:
       TCKEY_abort(signal, 9);
@@ -3040,7 +3018,7 @@
   tnoOfStandby = (tnodeinfo >> 8) & 3;
  
   regCachePtr->fragmentDistributionKey = (tnodeinfo >> 16) & 255;
-  if (Toperation == ZREAD) {
+  if (Toperation == ZREAD || Toperation == ZREAD_EX) {
     if (Tdirty == 1) {
       jam();
       /*-------------------------------------------------------------*/
@@ -3169,6 +3147,7 @@
   TcConnectRecord * const regTcPtr = tcConnectptr.p;
   ApiConnectRecord * const regApiPtr = apiConnectptr.p;
   CacheRecord * const regCachePtr = cachePtr.p;
+  Uint32 version = getNodeInfo(refToNode(TBRef)).m_version;
 #ifdef ERROR_INSERT
   if (ERROR_INSERTED(8002)) {
     systemErrorLab(signal, __LINE__);
@@ -3208,7 +3187,12 @@
   Tdata10 = 0;
   LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
   LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
-  LqhKeyReq::setLockType(Tdata10, regCachePtr->opLock);
+  if (unlikely(version < NDBD_ROWID_VERSION))
+  {
+    Uint32 op = regTcPtr->operation;
+    Uint32 lock = op == ZREAD_EX ? ZUPDATE : op == ZWRITE ? ZINSERT : op;
+    LqhKeyReq::setLockType(Tdata10, lock);
+  }
   /* ---------------------------------------------------------------------- */
   // Indicate Application Reference is present in bit 15
   /* ---------------------------------------------------------------------- */

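The DbtcMain.cpp hunks above stop rewriting ZREAD_EX and ZWRITE into a
stored opLock field when the TCKEYREQ arrives; the lock type is instead
derived on the fly in the last hunk, and only when the receiving node
predates NDBD_ROWID_VERSION. A standalone sketch of that mapping (the enum
values are stand-ins; the real constants live in the NDB signal-data
headers):

  #include <cassert>

  enum Op { ZREAD, ZUPDATE, ZINSERT, ZDELETE, ZWRITE, ZREAD_EX };

  // Same expression as in the last hunk above: downgrade the newer
  // operation codes to the lock types that pre-rowid nodes expect.
  inline Op compatLockType(Op op) {
    return op == ZREAD_EX ? ZUPDATE : op == ZWRITE ? ZINSERT : op;
  }

  int main() {
    assert(compatLockType(ZREAD_EX) == ZUPDATE); // exclusive read locks as update
    assert(compatLockType(ZWRITE)   == ZINSERT); // a write starts as an insert
    assert(compatLockType(ZUPDATE)  == ZUPDATE); // other operations pass through
    return 0;
  }
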
--- 1.17/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2006-01-05 12:55:44 +01:00
+++ 1.18/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2006-01-11 12:45:12 +01:00
@@ -582,7 +582,12 @@
   {
     regOperPtr->op_struct.op_type = ZUPDATE;
   }
-
+  
+  /**
+   * Set disk page
+   */
+  req_struct->m_disk_page_ptr.i = m_pgman.m_ptr.i;
+  
   ndbrequire(regOperPtr->is_first_operation());
   triggerList.first(trigPtr);
   while (trigPtr.i != RNIL) {
@@ -819,8 +824,8 @@
 //--------------------------------------------------------------------
 // Read Primary Key Values
 //--------------------------------------------------------------------
-  if (regTabPtr->need_expand(false)) // no disk 
-    prepare_read(req_struct, regTabPtr, false); // setup varsize
+  if (regTabPtr->need_expand()) 
+    prepare_read(req_struct, regTabPtr, true);
   
   int ret = readAttributes(req_struct,
 			   &tableDescriptor[regTabPtr->readKeyArray].tabDescr,
@@ -904,8 +909,8 @@
       req_struct->m_tuple_ptr= (Tuple_header*)ptr;
     }
 
-    if (regTabPtr->need_expand(false)) // no disk 
-      prepare_read(req_struct, regTabPtr, false); // setup varsize
+    if (regTabPtr->need_expand())

+      prepare_read(req_struct, regTabPtr, true);
     
     int ret = readAttributes(req_struct,
 			     &readBuffer[0],
@@ -1170,7 +1175,7 @@
   req->pageIndex = regOperPtr->m_tuple_location.m_page_idx;
   req->tupVersion = tupVersion;
   req->opInfo = TuxMaintReq::OpRemove;
-  removeTuxEntries(signal, regOperPtr, regTabPtr);
+  removeTuxEntries(signal, regTabPtr);
 }
 
 void
@@ -1202,12 +1207,11 @@
   req->pageIndex = regOperPtr->m_tuple_location.m_page_idx;
   req->tupVersion = tupVersion;
   req->opInfo = TuxMaintReq::OpRemove;
-  removeTuxEntries(signal, regOperPtr, regTabPtr);
+  removeTuxEntries(signal, regTabPtr);
 }
 
 void
 Dbtup::removeTuxEntries(Signal* signal,
-                        Operationrec* regOperPtr,
                         Tablerec* regTabPtr)
 {
   TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();

--- 1.30/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2006-01-10 12:37:10 +01:00
+++ 1.31/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2006-01-11 12:45:12 +01:00
@@ -14,6 +14,7 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
+#include <my_config.h>
 #include "Suma.hpp"
 
 #include <ndb_version.h>
@@ -3249,7 +3250,8 @@
 	  Page_pos pos= bucket->m_buffer_head;
 	  ndbrequire(pos.m_max_gci < gci);
 
-	  Buffer_page* page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+	  Buffer_page* page= (Buffer_page*)
+	    m_tup->c_page_pool.getPtr(pos.m_page_id);
 	  ndbout_c("takeover %d", pos.m_page_id);
 	  page->m_max_gci = pos.m_max_gci;
 	  page->m_words_used = pos.m_page_pos;
@@ -4191,7 +4193,7 @@
   Bucket* bucket= c_buckets+buck;
   Page_pos pos= bucket->m_buffer_head;
 
-  Buffer_page* page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+  Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
   Uint32* ptr= page->m_data + pos.m_page_pos;
 
   const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
@@ -4250,7 +4252,7 @@
     pos.m_page_pos = sz;
     pos.m_last_gci = gci;
     
-    page= (Buffer_page*)(m_tup->cpage+pos.m_page_id);
+    page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
     page->m_next_page= RNIL;
     ptr= page->m_data;
     goto loop; //
@@ -4281,7 +4283,7 @@
   
   if(tail != RNIL)
   {
-    Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+    Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
     bucket->m_buffer_tail = page->m_next_page;
     free_page(tail, page);
     signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
@@ -4325,8 +4327,8 @@
   Uint32 ref= m_first_free_page;
   if(likely(ref != RNIL))
   {
-    m_first_free_page = ((Buffer_page*)m_tup->cpage+ref)->m_next_page;
-    Uint32 chunk = ((Buffer_page*)m_tup->cpage+ref)->m_page_chunk_ptr_i;
+    m_first_free_page = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_next_page;
+    Uint32 chunk = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
     c_page_chunk_pool.getPtr(ptr, chunk);
     ndbassert(ptr.p->m_free);
     ptr.p->m_free--;
@@ -4349,7 +4351,7 @@
   Buffer_page* page;
   for(Uint32 i = 0; i<count; i++)
   {
-    page = (Buffer_page*)(m_tup->cpage+ref);
+    page = (Buffer_page*)m_tup->c_page_pool.getPtr(ref);
     page->m_page_state= SUMA_SEQUENCE;
     page->m_page_chunk_ptr_i = ptr.i;
     page->m_next_page = ++ref;
@@ -4413,7 +4415,7 @@
   else
   {
     jam();
-    Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+    Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
     Uint32 max_gci = page->m_max_gci;
     Uint32 next_page = page->m_next_page;
 
@@ -4506,7 +4508,7 @@
   Bucket* bucket= c_buckets+buck;
   Uint32 tail= bucket->m_buffer_tail;
 
-  Buffer_page* page= (Buffer_page*)(m_tup->cpage+tail);
+  Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
   Uint32 max_gci = page->m_max_gci;
   Uint32 next_page = page->m_next_page;
   Uint32 *ptr = page->m_data + pos;

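Every raw m_tup->cpage + page_id computation in the Suma.cpp hunks is
replaced by m_tup->c_page_pool.getPtr(page_id), so buffer pages are now
fetched through DBTUP's page pool accessor rather than by indexing the
page array directly. A toy illustration of the difference (the stand-in
pool below only shows the bounds check that raw pointer arithmetic lacks;
the real container is DBTUP's internal page pool):

  #include <cassert>
  #include <cstddef>

  struct Page { unsigned data[32]; };

  class PagePool {                // toy stand-in for Dbtup::c_page_pool
    Page*  m_base;
    size_t m_count;
  public:
    PagePool(Page* b, size_t n) : m_base(b), m_count(n) {}
    Page* getPtr(size_t i) { assert(i < m_count); return m_base + i; }
  };

  int main() {
    static Page pages[4];
    PagePool pool(pages, 4);
    Page* p = pool.getPtr(2);     // checked; was: pages + 2
    (void)p;
    return 0;
  }
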
--- 1.42/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-05 12:01:59 +01:00
+++ 1.43/storage/ndb/src/ndbapi/NdbDictionary.cpp	2006-01-11 12:45:12 +01:00
@@ -466,6 +466,11 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Table::getObjectId() const {
+  return m_impl.m_id;
+}
+
 bool
 NdbDictionary::Table::equal(const NdbDictionary::Table & col) const {
   return m_impl.equal(col.m_impl);
@@ -497,6 +502,12 @@
   return pNdb->getDictionary()->createTable(* this);
 }
 
+Uint32
+NdbDictionary::Table::getTablespaceId() const 
+{
+  return m_impl.m_tablespace_id;
+}
+
 void 
 NdbDictionary::Table::setTablespace(const char * name){
   m_impl.m_tablespace_id = ~0;
@@ -511,6 +522,27 @@
   m_impl.m_tablespace_name.assign(ts.getName());
 }
 
+void
+NdbDictionary::Table::setRowChecksumIndicator(bool val){
+  m_impl.m_row_checksum = val;
+}
+
+bool 
+NdbDictionary::Table::getRowChecksumIndicator() const {
+  return m_impl.m_row_checksum;
+}
+
+void
+NdbDictionary::Table::setRowGCIIndicator(bool val){
+  m_impl.m_row_gci = val;
+}
+
+bool 
+NdbDictionary::Table::getRowGCIIndicator() const {
+  return m_impl.m_row_gci;
+}
+
+
 /*****************************************************************
  * Index facade
  */
@@ -644,6 +676,12 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Index::getObjectId() const {
+  return m_impl.m_id;
+}
+
+
 /*****************************************************************
  * Event facade
  */
@@ -777,6 +815,11 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Event::getObjectId() const {
+  return m_impl.m_id;
+}
+
 void NdbDictionary::Event::print()
 {
   m_impl.print();
@@ -795,6 +838,12 @@
 {
 }
 
+NdbDictionary::Tablespace::Tablespace(const NdbDictionary::Tablespace & org)
+  : Object(org), m_impl(* new NdbTablespaceImpl(* this))
+{
+  m_impl.assign(org.m_impl);
+}
+
 NdbDictionary::Tablespace::~Tablespace(){
   NdbTablespaceImpl * tmp = &m_impl;  
   if(this != tmp){
@@ -852,6 +901,11 @@
   return m_impl.m_logfile_group_name.c_str();
 }
 
+Uint32
+NdbDictionary::Tablespace::getDefaultLogfileGroupId() const {
+  return m_impl.m_logfile_group_id;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::Tablespace::getObjectStatus() const {
   return m_impl.m_status;
@@ -862,6 +916,11 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Tablespace::getObjectId() const {
+  return m_impl.m_id;
+}
+
 /*****************************************************************
  * LogfileGroup facade
  */
@@ -875,6 +934,12 @@
 {
 }
 
+NdbDictionary::LogfileGroup::LogfileGroup(const NdbDictionary::LogfileGroup & org)
+  : Object(org), m_impl(* new NdbLogfileGroupImpl(* this)) 
+{
+  m_impl.assign(org.m_impl);
+}
+
 NdbDictionary::LogfileGroup::~LogfileGroup(){
   NdbLogfileGroupImpl * tmp = &m_impl;  
   if(this != tmp){
@@ -912,6 +977,10 @@
   return m_impl.m_grow_spec;
 }
 
+Uint64 NdbDictionary::LogfileGroup::getUndoFreeWords() const {
+  return m_impl.m_undo_free_words;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::LogfileGroup::getObjectStatus() const {
   return m_impl.m_status;
@@ -922,6 +991,11 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::LogfileGroup::getObjectId() const {
+  return m_impl.m_id;
+}
+
 /*****************************************************************
  * Datafile facade
  */
@@ -935,6 +1009,12 @@
 {
 }
 
+NdbDictionary::Datafile::Datafile(const NdbDictionary::Datafile & org)
+  : Object(org), m_impl(* new NdbDatafileImpl(* this)) 
+{
+  m_impl.assign(org.m_impl);
+}
+
 NdbDictionary::Datafile::~Datafile(){
   NdbDatafileImpl * tmp = &m_impl;  
   if(this != tmp){
@@ -986,6 +1066,11 @@
   return m_impl.m_filegroup_name.c_str();
 }
 
+Uint32
+NdbDictionary::Datafile::getTablespaceId() const {
+  return m_impl.m_filegroup_id;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::Datafile::getObjectStatus() const {
   return m_impl.m_status;
@@ -996,6 +1081,11 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Datafile::getObjectId() const {
+  return m_impl.m_id;
+}
+
 /*****************************************************************
  * Undofile facade
  */
@@ -1009,6 +1099,12 @@
 {
 }
 
+NdbDictionary::Undofile::Undofile(const NdbDictionary::Undofile & org)
+  : Object(org), m_impl(* new NdbUndofileImpl(* this))
+{
+  m_impl.assign(org.m_impl);
+}
+
 NdbDictionary::Undofile::~Undofile(){
   NdbUndofileImpl * tmp = &m_impl;  
   if(this != tmp){
@@ -1036,11 +1132,6 @@
   return m_impl.m_size;
 }
 
-Uint64
-NdbDictionary::Undofile::getFree() const {
-  return m_impl.m_free;
-}
-
 void 
 NdbDictionary::Undofile::setLogfileGroup(const char * logfileGroup){
   m_impl.m_filegroup_id = ~0;
@@ -1061,6 +1152,11 @@
   return m_impl.m_filegroup_name.c_str();
 }
 
+Uint32
+NdbDictionary::Undofile::getLogfileGroupId() const {
+  return m_impl.m_filegroup_id;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::Undofile::getObjectStatus() const {
   return m_impl.m_status;
@@ -1071,6 +1167,11 @@
   return m_impl.m_version;
 }
 
+int 
+NdbDictionary::Undofile::getObjectId() const {
+  return m_impl.m_id;
+}
+
 /*****************************************************************
  * Dictionary facade
  */
@@ -1515,11 +1616,3 @@
   return tmp;
 }
 
-const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
-const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_MEMORY = 0;
-const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
-const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
-const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0;
-const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0;
-const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0;
-const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0;

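A hedged usage sketch for the read-side accessors added above
(getObjectId, getTablespaceId and the row checksum/GCI indicators). It
assumes an initialized Ndb handle; the table name "t1" is a placeholder:

  #include <NdbApi.hpp>
  #include <cstdio>

  void show_table_info(Ndb* ndb) {
    NdbDictionary::Dictionary* dict = ndb->getDictionary();
    const NdbDictionary::Table* tab = dict->getTable("t1");
    if (tab == 0)
      return;
    printf("id=%d tablespace=%u checksum=%d gci=%d\n",
           tab->getObjectId(),                  // new in this changeset
           tab->getTablespaceId(),              // new in this changeset
           (int)tab->getRowChecksumIndicator(), // new in this changeset
           (int)tab->getRowGCIIndicator());     // new in this changeset
  }
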
--- 1.95/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-11 11:31:02 +01:00
+++ 1.96/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2006-01-11 12:45:12 +01:00
@@ -320,9 +320,21 @@
     col->m_impl.m_attrId = AttributeHeader::RECORDS_IN_RANGE;
     col->m_impl.m_attrSize = 4;
     col->m_impl.m_arraySize = 4;
+  } else if(!strcmp(name, "NDB$ROWID")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::ROWID;
+    col->m_impl.m_attrSize = 4;
+    col->m_impl.m_arraySize = 2;
+  } else if(!strcmp(name, "NDB$ROW_GCI")){
+    col->setType(NdbDictionary::Column::Bigunsigned);
+    col->m_impl.m_attrId = AttributeHeader::ROW_GCI;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+    col->m_impl.m_nullable = true;
   } else {
     abort();
   }
+  col->m_impl.m_storageType = NDB_STORAGETYPE_MEMORY;
   return col;
 }
 
@@ -378,6 +390,8 @@
   m_noOfBlobs= 0;
   m_replicaCount= 0;
   m_tablespace_id = ~0;
+  m_row_gci = true;
+  m_row_checksum = true;
 }
 
 bool
@@ -907,6 +921,8 @@
       delete NdbDictionary::Column::RANGE_NO;
       delete NdbDictionary::Column::DISK_REF;
       delete NdbDictionary::Column::RECORDS_IN_RANGE;
+      delete NdbDictionary::Column::ROWID;
+      delete NdbDictionary::Column::ROW_GCI;
       NdbDictionary::Column::FRAGMENT= 0;
       NdbDictionary::Column::FRAGMENT_MEMORY= 0;
       NdbDictionary::Column::ROW_COUNT= 0;
@@ -915,6 +931,8 @@
       NdbDictionary::Column::RANGE_NO= 0;
       NdbDictionary::Column::DISK_REF= 0;
       NdbDictionary::Column::RECORDS_IN_RANGE= 0;
+      NdbDictionary::Column::ROWID= 0;
+      NdbDictionary::Column::ROW_GCI= 0;
     }
     m_globalHash->unlock();
   } else {
@@ -991,6 +1009,10 @@
 	NdbColumnImpl::create_pseudo("NDB$DISK_REF");
       NdbDictionary::Column::RECORDS_IN_RANGE= 
 	NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE");
+      NdbDictionary::Column::ROWID= 
+	NdbColumnImpl::create_pseudo("NDB$ROWID");
+      NdbDictionary::Column::ROW_GCI= 
+	NdbColumnImpl::create_pseudo("NDB$ROW_GCI");
     }
     m_globalHash->unlock();
     return true;
@@ -1523,6 +1545,8 @@
 		   (Uint32)NdbDictionary::Object::FragUndefined);
   
   impl->m_logging = tableDesc.TableLoggedFlag;
+  impl->m_row_gci = tableDesc.RowGCIFlag;
+  impl->m_row_checksum = tableDesc.RowChecksumFlag;
   impl->m_kvalue = tableDesc.TableKValue;
   impl->m_minLoadFactor = tableDesc.MinLoadFactor;
   impl->m_maxLoadFactor = tableDesc.MaxLoadFactor;
@@ -1861,6 +1885,9 @@
   memcpy(tmpTab.FragmentData, impl.m_ng.get_data(), impl.m_ng.length());
 
   tmpTab.TableLoggedFlag = impl.m_logging;
+  tmpTab.RowGCIFlag = impl.m_row_gci;
+  tmpTab.RowChecksumFlag = impl.m_row_checksum;
   tmpTab.TableKValue = impl.m_kvalue;
   tmpTab.MinLoadFactor = impl.m_minLoadFactor;
   tmpTab.MaxLoadFactor = impl.m_maxLoadFactor;
@@ -3432,6 +3459,24 @@
 NdbTablespaceImpl::~NdbTablespaceImpl(){
 }
 
+void
+NdbTablespaceImpl::assign(const NdbTablespaceImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_name.assign(org.m_name);
+  m_grow_spec = org.m_grow_spec;
+  m_extent_size = org.m_extent_size;
+  m_undo_free_words = org.m_undo_free_words;
+  m_logfile_group_id = org.m_logfile_group_id;
+  m_logfile_group_version = org.m_logfile_group_version;
+  m_logfile_group_name.assign(org.m_logfile_group_name);
+}
+
 NdbLogfileGroupImpl::NdbLogfileGroupImpl() : 
   NdbDictionary::LogfileGroup(* this), 
   NdbFilegroupImpl(NdbDictionary::Object::LogfileGroup), m_facade(this)
@@ -3447,6 +3492,24 @@
 NdbLogfileGroupImpl::~NdbLogfileGroupImpl(){
 }
 
+void
+NdbLogfileGroupImpl::assign(const NdbLogfileGroupImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_name.assign(org.m_name);
+  m_grow_spec = org.m_grow_spec;
+  m_extent_size = org.m_extent_size;
+  m_undo_free_words = org.m_undo_free_words;
+  m_logfile_group_id = org.m_logfile_group_id;
+  m_logfile_group_version = org.m_logfile_group_version;
+  m_logfile_group_name.assign(org.m_logfile_group_name);
+}
+
 NdbFileImpl::NdbFileImpl(NdbDictionary::Object::Type t)
   : NdbDictObjectImpl(t)
 {
@@ -3471,6 +3534,22 @@
 NdbDatafileImpl::~NdbDatafileImpl(){
 }
 
+void
+NdbDatafileImpl::assign(const NdbDatafileImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_size = org.m_size;
+  m_free = org.m_free;
+  m_filegroup_id = org.m_filegroup_id;
+  m_filegroup_version = org.m_filegroup_version;
+  m_path.assign(org.m_path);
+  m_filegroup_name.assign(org.m_filegroup_name);
+}
+
 NdbUndofileImpl::NdbUndofileImpl() : 
   NdbDictionary::Undofile(* this), 
   NdbFileImpl(NdbDictionary::Object::Undofile), m_facade(this)
@@ -3486,6 +3565,22 @@
 NdbUndofileImpl::~NdbUndofileImpl(){
 }
 
+void
+NdbUndofileImpl::assign(const NdbUndofileImpl& org)
+{
+  m_id = org.m_id;
+  m_version = org.m_version;
+  m_status = org.m_status;
+  m_type = org.m_type;
+
+  m_size = org.m_size;
+  m_free = org.m_free;
+  m_filegroup_id = org.m_filegroup_id;
+  m_filegroup_version = org.m_filegroup_version;
+  m_path.assign(org.m_path);
+  m_filegroup_name.assign(org.m_filegroup_name);
+}
+
 int 
 NdbDictionaryImpl::createDatafile(const NdbDatafileImpl & file, bool force){
   DBUG_ENTER("NdbDictionaryImpl::createDatafile");
@@ -3804,10 +3899,10 @@
 NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
 				NdbDictionary::Object::Type type,
 				const char * name){
-  DBUG_ENTER("NdbDictInterface::get_filegroup"); 
+  DBUG_ENTER("NdbDictInterface::get_filegroup");
   NdbApiSignal tSignal(m_reference);
   GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
-  
+
   size_t strLen = strlen(name) + 1;
 
   req->senderRef = m_reference;
@@ -3822,7 +3917,7 @@
   LinearSectionPtr ptr[1];
   ptr[0].p  = (Uint32*)name;
   ptr[0].sz = (strLen + 3)/4;
-  
+
   int r = dictSignal(&tSignal, ptr, 1,
 		     -1, // any node
 		     WAIT_GET_TAB_INFO_REQ,
@@ -3832,11 +3927,11 @@
     DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
     DBUG_RETURN(-1);
   }
-  
-  m_error.code = parseFilegroupInfo(dst, 
-				    (Uint32*)m_buffer.get_data(), 
+
+  m_error.code = parseFilegroupInfo(dst,
+				    (Uint32*)m_buffer.get_data(),
 				    m_buffer.length() / 4);
-  
+
   if(m_error.code)
   {
     DBUG_PRINT("info", ("get_filegroup failed parseFilegroupInfo %d",
@@ -3844,6 +3939,15 @@
     DBUG_RETURN(m_error.code);
   }
 
+  if(dst.m_type == NdbDictionary::Object::Tablespace)
+  {
+    NdbDictionary::LogfileGroup tmp;
+    get_filegroup(NdbLogfileGroupImpl::getImpl(tmp),
+		  NdbDictionary::Object::LogfileGroup,
+		  dst.m_logfile_group_id);
+    dst.m_logfile_group_name.assign(tmp.getName());
+  }
+  
   if(dst.m_type == type)
   {
     DBUG_RETURN(0);
@@ -3880,10 +3984,59 @@
   dst.m_undo_buffer_size = fg.LF_UndoBufferSize;
   dst.m_logfile_group_id = fg.TS_LogfileGroupId;
   dst.m_logfile_group_version = fg.TS_LogfileGroupVersion;
+  dst.m_undo_free_words= ((Uint64)fg.LF_UndoFreeWordsHi << 32)
+    | (fg.LF_UndoFreeWordsLo);
+
   return 0;
 }
 
 int
+NdbDictInterface::get_filegroup(NdbFilegroupImpl & dst,
+				NdbDictionary::Object::Type type,
+				Uint32 id){
+  DBUG_ENTER("NdbDictInterface::get_filegroup");
+  NdbApiSignal tSignal(m_reference);
+  GetTabInfoReq * req = CAST_PTR(GetTabInfoReq, tSignal.getDataPtrSend());
+
+  req->senderRef = m_reference;
+  req->senderData = 0;
+  req->requestType =
+    GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
+  req->tableId = id;
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber   = GSN_GET_TABINFOREQ;
+  tSignal.theLength = GetTabInfoReq::SignalLength;
+
+  int r = dictSignal(&tSignal, NULL, 1,
+		     -1, // any node
+		     WAIT_GET_TAB_INFO_REQ,
+		     WAITFOR_RESPONSE_TIMEOUT, 100);
+  if (r)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed dictSignal"));
+    DBUG_RETURN(-1);
+  }
+
+  m_error.code = parseFilegroupInfo(dst,
+				    (Uint32*)m_buffer.get_data(),
+				    m_buffer.length() / 4);
+
+  if(m_error.code)
+  {
+    DBUG_PRINT("info", ("get_filegroup failed parseFilegroupInfo %d",
+                         m_error.code));
+    DBUG_RETURN(m_error.code);
+  }
+
+  if(dst.m_type == type)
+  {
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("info", ("get_filegroup failed no such filegroup"));
+  DBUG_RETURN(m_error.code = GetTabInfoRef::TableNotDefined);
+}
+
+int
 NdbDictInterface::get_file(NdbFileImpl & dst,
 			   NdbDictionary::Object::Type type,
 			   int node,
@@ -3928,6 +4081,26 @@
     DBUG_RETURN(m_error.code);
   }
 
+  if(dst.m_type == NdbDictionary::Object::Undofile)
+  {
+    NdbDictionary::LogfileGroup tmp;
+    get_filegroup(NdbLogfileGroupImpl::getImpl(tmp),
+		  NdbDictionary::Object::LogfileGroup,
+		  dst.m_filegroup_id);
+    dst.m_filegroup_name.assign(tmp.getName());
+  }
+  else if(dst.m_type == NdbDictionary::Object::Datafile)
+  {
+    NdbDictionary::Tablespace tmp;
+    get_filegroup(NdbTablespaceImpl::getImpl(tmp),
+		  NdbDictionary::Object::Tablespace,
+		  dst.m_filegroup_id);
+    dst.m_filegroup_name.assign(tmp.getName());
+    dst.m_free *= tmp.getExtentSize();
+  }
+  else
+    dst.m_filegroup_name.assign("Not Yet Implemented");
+  
   if(dst.m_type == type)
   {
     DBUG_RETURN(0);
@@ -3957,12 +4130,11 @@
   dst.m_id= f.FileNo;
 
   dst.m_size= ((Uint64)f.FileSizeHi << 32) | (f.FileSizeLo);
-  dst.m_free= f.FileFreeExtents;
   dst.m_path.assign(f.FileName);
-  //dst.m_filegroup_name
+
   dst.m_filegroup_id= f.FilegroupId;
   dst.m_filegroup_version= f.FilegroupVersion;
-
+  dst.m_free=  f.FileFreeExtents;
   return 0;
 }
 
@@ -3973,3 +4145,13 @@
 template class Vector<NdbTableImpl*>;
 template class Vector<NdbColumnImpl*>;
 
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::FRAGMENT_MEMORY = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_COUNT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::COMMIT_COUNT = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_SIZE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::RANGE_NO = 0;
+const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0;
+const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROWID = 0;
+const NdbDictionary::Column * NdbDictionary::Column::ROW_GCI = 0;

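The copy constructors added in NdbDictionary.cpp route through the
Impl::assign() helpers above, so a dictionary facade object can be deep
copied and kept after the dictionary call that produced it, which is what
ndb_restore relies on further down. A hedged sketch (checking
getNdbError().code is an assumption; the restore code below tests
errobj.classification instead):

  #include <NdbApi.hpp>

  NdbDictionary::Tablespace*
  snapshot_tablespace(NdbDictionary::Dictionary* dict, const char* name) {
    NdbDictionary::Tablespace curr = dict->getTablespace(name);
    if (dict->getNdbError().code != 0)
      return 0;
    // deep copy via the new copy constructor / NdbTablespaceImpl::assign()
    return new NdbDictionary::Tablespace(curr);
  }
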
--- 1.38/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-05 12:01:59 +01:00
+++ 1.39/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2006-01-11 12:45:12 +01:00
@@ -154,6 +154,8 @@
   Vector<Uint16> m_fragments;
 
   bool m_logging;
+  bool m_row_gci;
+  bool m_row_checksum;
   int m_kvalue;
   int m_minLoadFactor;
   int m_maxLoadFactor;
@@ -294,6 +296,7 @@
   BaseString m_logfile_group_name;
   Uint32 m_logfile_group_id;
   Uint32 m_logfile_group_version;
+  Uint64 m_undo_free_words;
 };
 
 class NdbTablespaceImpl : public NdbDictionary::Tablespace, 
@@ -303,6 +306,8 @@
   NdbTablespaceImpl(NdbDictionary::Tablespace &);
   ~NdbTablespaceImpl();
 
+  void assign(const NdbTablespaceImpl&);
+
   static NdbTablespaceImpl & getImpl(NdbDictionary::Tablespace & t);
   static const NdbTablespaceImpl & getImpl(const NdbDictionary::Tablespace &);
   NdbDictionary::Tablespace * m_facade;
@@ -315,6 +320,8 @@
   NdbLogfileGroupImpl(NdbDictionary::LogfileGroup &);
   ~NdbLogfileGroupImpl();
 
+  void assign(const NdbLogfileGroupImpl&);
+
   static NdbLogfileGroupImpl & getImpl(NdbDictionary::LogfileGroup & t);
   static const NdbLogfileGroupImpl& getImpl(const 
 					    NdbDictionary::LogfileGroup&);
@@ -338,6 +345,8 @@
   NdbDatafileImpl(NdbDictionary::Datafile &);
   ~NdbDatafileImpl();
 
+  void assign(const NdbDatafileImpl&);
+
   static NdbDatafileImpl & getImpl(NdbDictionary::Datafile & t);
   static const NdbDatafileImpl & getImpl(const NdbDictionary::Datafile & t);
   NdbDictionary::Datafile * m_facade;
@@ -349,6 +358,8 @@
   NdbUndofileImpl(NdbDictionary::Undofile &);
   ~NdbUndofileImpl();
 
+  void assign(const NdbUndofileImpl&);
+
   static NdbUndofileImpl & getImpl(NdbDictionary::Undofile & t);
   static const NdbUndofileImpl & getImpl(const NdbDictionary::Undofile & t);
   NdbDictionary::Undofile * m_facade;
@@ -404,10 +415,10 @@
 			    const Uint32 * data, Uint32 len,
 			    bool fullyQualifiedNames);
 
-  static int parseFileInfo(NdbFileImpl &dst, 
+  static int parseFileInfo(NdbFileImpl &dst,
 			   const Uint32 * data, Uint32 len);
-  
-  static int parseFilegroupInfo(NdbFilegroupImpl &dst, 
+
+  static int parseFilegroupInfo(NdbFilegroupImpl &dst,
 				const Uint32 * data, Uint32 len);
   
   int create_file(const NdbFileImpl &, const NdbFilegroupImpl&, bool overwrite = false);
@@ -415,7 +426,7 @@
   int create_filegroup(const NdbFilegroupImpl &);
   int drop_filegroup(const NdbFilegroupImpl &);
   
-  int get_filegroup(NdbFilegroupImpl&, NdbDictionary::Object::Type, int);
+  int get_filegroup(NdbFilegroupImpl&, NdbDictionary::Object::Type, Uint32);
   int get_filegroup(NdbFilegroupImpl&,NdbDictionary::Object::Type,const char*);
   int get_file(NdbFileImpl&, NdbDictionary::Object::Type, int, int);
   int get_file(NdbFileImpl&, NdbDictionary::Object::Type, int, const char *);

--- 1.237/sql/ha_ndbcluster.cc	2006-01-11 12:43:16 +01:00
+++ 1.238/sql/ha_ndbcluster.cc	2006-01-11 12:51:28 +01:00
@@ -52,9 +52,7 @@
 // createable against NDB from this handler
 static const int max_transactions= 3; // should really be 2 but one transaction too many is allocated when lock table is used
 
-static bool ndbcluster_init(void);
-static int ndbcluster_end(ha_panic_function flag);
-static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
+static int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info);
 
 handlerton ndbcluster_hton = {
   MYSQL_HANDLERTON_INTERFACE_VERSION,
@@ -3966,13 +3964,8 @@
                         field->pack_length()));
     if ((my_errno= create_ndb_column(col, field, info)))
       DBUG_RETURN(my_errno);
-
-    if (
-#ifdef NDB_DISKDATA
-	info->store_on_disk ||
-#else
-	getenv("NDB_DEFAULT_DISK"))
-#endif
+ 
+    if (info->store_on_disk || getenv("NDB_DEFAULT_DISK"))
       col.setStorageType(NdbDictionary::Column::StorageTypeDisk);
     else
       col.setStorageType(NdbDictionary::Column::StorageTypeMemory);
@@ -3992,14 +3985,11 @@
                              NdbDictionary::Column::StorageTypeMemory);
   }
 
-#ifdef NDB_DISKDATA
   if (info->store_on_disk)
     if (info->tablespace)
       tab.setTablespace(info->tablespace);
     else
       tab.setTablespace("DEFAULT-TS");
-#endif
-
   // No primary key, create shadow key as 64 bit, auto increment  
   if (form->s->primary_key == MAX_KEY) 
   {
@@ -5334,6 +5324,7 @@
     h.drop_database=    ndbcluster_drop_database;  /* Drop a database */
     h.panic=            ndbcluster_end;            /* Panic call */
     h.show_status=      ndbcluster_show_status;    /* Show status */
+    h.alter_tablespace= ndbcluster_alter_tablespace;    /* Alter tablespace */
 #ifdef HAVE_NDB_BINLOG
     ndbcluster_binlog_init_handlerton();
 #endif
@@ -8790,7 +8781,6 @@
   return COMPATIBLE_DATA_YES;
 }
 
-#ifdef NDB_DISKDATA
 bool set_up_tablespace(st_alter_tablespace *info,
                        NdbDictionary::Tablespace *ndb_ts)
 {
@@ -8831,21 +8821,25 @@
   return false;
 }
 
-int ha_ndbcluster::alter_tablespace(st_alter_tablespace *info)
+int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info)
 {
-  Ndb *ndb;
-  NDBDICT *dict;
-  int error;
   DBUG_ENTER("ha_ndbcluster::alter_tablespace");
-  if (check_ndb_connection())
+
+  Ndb *ndb= check_ndb_in_thd(thd);
+  if (ndb == NULL)
   {
-    DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
-  ndb= get_ndb();
-  dict= ndb->getDictionary();
+  
+  NDBDICT *dict = ndb->getDictionary();
+  int error;
+  const char * errmsg;
+
   switch (info->ts_cmd_type){
   case (CREATE_TABLESPACE):
   {
+    error= ER_CREATE_TABLESPACE_FAILED;
+    
     NdbDictionary::Tablespace ndb_ts;
     NdbDictionary::Datafile ndb_df;
     if (set_up_tablespace(info, &ndb_ts))
@@ -8856,23 +8850,24 @@
     {
       DBUG_RETURN(1);
     }
-    if (error= dict->createTablespace(ndb_ts))
+    errmsg= "TABLESPACE";
+    if (dict->createTablespace(ndb_ts))
     {
       DBUG_PRINT("error", ("createTablespace returned %d", error));
-      my_error(ER_CREATE_TABLESPACE_FAILED, MYF(0), "TABLESPACE");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     DBUG_PRINT("info", ("Successfully created Tablespace"));
-    if (error= dict->createDatafile(ndb_df))
+    errmsg= "DATAFILE";
+    if (dict->createDatafile(ndb_df))
     {
       DBUG_PRINT("error", ("createDatafile returned %d", error));
-      my_error(ER_CREATE_TABLESPACE_FAILED, MYF(0), "DATAFILE");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     break;
   }
   case (ALTER_TABLESPACE):
   {
+    error= ER_ALTER_TABLESPACE_FAILED;
     if (info->ts_alter_tablespace_type == ALTER_TABLESPACE_ADD_FILE)
     {
       NdbDictionary::Datafile ndb_df;
@@ -8880,11 +8875,10 @@
       {
 	DBUG_RETURN(1);
       }
-      if (error= dict->createDatafile(ndb_df))
+      errmsg= "CREATE DATAFILE";
+      if (dict->createDatafile(ndb_df))
       {
-	DBUG_PRINT("error", ("createDatafile returned %d", error));
-	my_error(ER_ALTER_TABLESPACE_FAILED, MYF(0), "CREATE DATAFILE");
-	DBUG_RETURN(1);
+	goto ndberror;
       }
     }
     else if(info->ts_alter_tablespace_type == ALTER_TABLESPACE_DROP_FILE)
@@ -8893,11 +8887,10 @@
 						     info->data_file_name);
       if (strcmp(df.getPath(), info->data_file_name) == 0)
       {
-	if (error= dict->dropDatafile(df))
+	errmsg= "DROP DATAFILE";
+	if (dict->dropDatafile(df))
 	{
-	  DBUG_PRINT("error", ("createDatafile returned %d", error));
-	  my_error(ER_ALTER_TABLESPACE_FAILED, MYF(0), " DROP DATAFILE");
-	  DBUG_RETURN(1);
+	  goto ndberror;
 	}
       }
       else
@@ -8917,6 +8910,7 @@
   }
   case (CREATE_LOGFILE_GROUP):
   {
+    error= ER_CREATE_TABLESPACE_FAILED;
     NdbDictionary::LogfileGroup ndb_lg;
     NdbDictionary::Undofile ndb_uf;
     if (info->undo_file_name == NULL)
@@ -8930,27 +8924,26 @@
     {
       DBUG_RETURN(1);
     }
-    if (error= dict->createLogfileGroup(ndb_lg))
+    errmsg= "LOGFILE GROUP";
+    if (dict->createLogfileGroup(ndb_lg))
     {
-      DBUG_PRINT("error", ("createLogfileGroup returned %d", error));
-      my_error(ER_CREATE_TABLESPACE_FAILED, MYF(0), "LOGFILE GROUP");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     DBUG_PRINT("info", ("Successfully created Logfile Group"));
     if (set_up_undofile(info, &ndb_uf))
     {
       DBUG_RETURN(1);
     }
-    if (error= dict->createUndofile(ndb_uf))
+    errmsg= "UNDOFILE";
+    if (dict->createUndofile(ndb_uf))
     {
-      DBUG_PRINT("error", ("createUndofile returned %d", error));
-      my_error(ER_CREATE_TABLESPACE_FAILED, MYF(0), "UNDOFILE");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     break;
   }
   case (ALTER_LOGFILE_GROUP):
   {
+    error= ER_ALTER_TABLESPACE_FAILED;
     if (info->undo_file_name == NULL)
     {
       /*
@@ -8963,32 +8956,30 @@
     {
       DBUG_RETURN(1);
     }
-    if (error= dict->createUndofile(ndb_uf))
+    errmsg= "CREATE UNDOFILE";
+    if (dict->createUndofile(ndb_uf))
     {
-      DBUG_PRINT("error", ("createUndofile returned %d", error));
-      my_error(ER_ALTER_TABLESPACE_FAILED, MYF(0), "CREATE UNDOFILE");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     break;
   }
   case (DROP_TABLESPACE):
   {
-    if (error= dict->dropTablespace(
-				    dict->getTablespace(info->tablespace_name)))
+    error= ER_DROP_TABLESPACE_FAILED;
+    errmsg= "TABLESPACE";
+    if (dict->dropTablespace(dict->getTablespace(info->tablespace_name)))
     {
-      DBUG_PRINT("error", ("dropTablespace returned %d", error));
-      my_error(ER_DROP_TABLESPACE_FAILED, MYF(0), "TABLESPACE");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     break;
   }
   case (DROP_LOGFILE_GROUP):
   {
-    if (error= dict->dropLogfileGroup(dict->getLogfileGroup(info->logfile_group_name)))
+    error= ER_DROP_TABLESPACE_FAILED;
+    errmsg= "LOGFILE GROUP";
+    if (dict->dropLogfileGroup(dict->getLogfileGroup(info->logfile_group_name)))
     {
-      DBUG_PRINT("error", ("dropLogfileGroup returned %d", error));
-      my_error(ER_DROP_TABLESPACE_FAILED, MYF(0), "LOGFILE GROUP");
-      DBUG_RETURN(1);
+      goto ndberror;
     }
     break;
   }
@@ -9006,6 +8997,13 @@
   }
   }
   DBUG_RETURN(FALSE);
+
+ndberror:
+  const NdbError err= dict->getNdbError();
+  ERR_PRINT(err);
+  ndb_to_mysql_error(&err);
+  
+  my_error(error, MYF(0), errmsg);
+  DBUG_RETURN(1);
 }
 
-#endif /* NDB_DISKDATA */

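The ha_ndbcluster.cc rewrite above funnels every dictionary failure
through the single ndberror: label, with `error` holding the MySQL error
code and `errmsg` naming the object kind, both set just before each
fallible call. A toy sketch of the pattern, with stub steps and
hypothetical error codes:

  #include <cstdio>

  static int step_one() { return 0; }
  static int step_two() { return 1; }  // pretend this one fails

  int do_steps() {
    int error;
    const char* errmsg;

    error = 101; errmsg = "TABLESPACE";
    if (step_one()) goto fail;

    error = 102; errmsg = "DATAFILE";
    if (step_two()) goto fail;

    return 0;
  fail:
    fprintf(stderr, "failed (%d): %s\n", error, errmsg);
    return 1;
  }
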
--- 1.105/sql/ha_ndbcluster.h	2006-01-10 23:33:06 +01:00
+++ 1.106/sql/ha_ndbcluster.h	2006-01-11 12:45:10 +01:00
@@ -537,6 +537,7 @@
                               bool eq_range, bool sorted,
                               byte* buf);
   int read_range_next();
+  int alter_tablespace(st_alter_tablespace *info);
 
   /**
    * Multi range stuff

--- 1.77/libmysqld/Makefile.am	2006-01-05 12:58:34 +01:00
+++ 1.78/libmysqld/Makefile.am	2006-01-11 12:51:28 +01:00
@@ -66,6 +66,7 @@
 	parse_file.cc sql_view.cc sql_trigger.cc my_decimal.cc \
 	item_xmlfunc.cc \
         rpl_filter.cc sql_partition.cc handlerton.cc sql_plugin.cc \
+        sql_tablespace.cc \
         rpl_injector.cc
 
 libmysqld_int_a_SOURCES= $(libmysqld_sources) $(libmysqlsources) $(sqlsources)

--- 1.46/storage/ndb/src/ndbapi/ndberror.c	2006-01-05 12:01:59 +01:00
+++ 1.47/storage/ndb/src/ndbapi/ndberror.c	2006-01-11 12:45:13 +01:00
@@ -174,6 +174,7 @@
   { 805,  DMEC, TR, "Out of attrinfo records in tuple manager" },
   { 830,  DMEC, TR, "Out of add fragment operation records" },
   { 873,  DMEC, TR, "Out of attrinfo records for scan in tuple manager" },
+  { 899,  DMEC, TR, "Rowid already allocated" },
   { 1217, DMEC, TR, "Out of operation records in local data manager (increase MaxNoOfLocalOperations)" },
   { 1220, DMEC, TR, "REDO log files overloaded, consult online manual (decrease TimeBetweenLocalCheckpoints, and|or increase NoOfFragmentLogFiles)" },
   { 1222, DMEC, TR, "Out of transaction markers in LQH" },
@@ -396,6 +397,20 @@
   { 1229, DMEC, SE, "Too long frm data supplied" },
   { 1231, DMEC, SE, "Invalid table or index to scan" },
   { 1232, DMEC, SE, "Invalid table or index to scan" },
+
+  { 1502, DMEC, IE, "Filegroup already exists" },
+  { 1503, DMEC, SE, "Out of filegroup records" },
+  { 1504, DMEC, SE, "Out of logbuffer memory" },
+  { 1505, DMEC, IE, "Invalid filegroup" },
+  { 1506, DMEC, IE, "Invalid filegroup version" },
+  { 1507, DMEC, IE, "File no already in use" },
+  { 1508, DMEC, SE, "Out of file records" },
+  { 1509, DMEC, SE, "File system error, check path, permissions etc." },
+  { 1510, DMEC, IE, "File meta data error" },
+  { 1511, DMEC, IE, "Out of memory" },
+  { 1512, DMEC, SE, "File read error" },
+  { 1513, DMEC, IE, "Filegroup not online" },
+  
 
   /**
    * FunctionNotImplemented

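The new 1502-1513 codes surface through the regular NdbError path, so
callers need no special handling. A hedged sketch of reporting them after
a failed dictionary call:

  #include <NdbApi.hpp>
  #include <cstdio>

  void report_filegroup_error(NdbDictionary::Dictionary* dict) {
    const NdbError& e = dict->getNdbError();
    if (e.code >= 1502 && e.code <= 1513)  // new filegroup error range
      printf("filegroup error %d: %s\n", e.code, e.message);
  }
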
--- 1.13/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp	2006-01-05 12:01:58 +01:00
+++ 1.14/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp	2006-01-11 12:51:28 +01:00
@@ -19,6 +19,19 @@
 #define CLUSTER_CONNECTION_HPP
 #include <ndb_types.h>
 
+class Ndb_cluster_connection_node_iter
+{
+  friend class Ndb_cluster_connection_impl;
+public:
+  Ndb_cluster_connection_node_iter() : scan_state(~0),
+				       init_pos(0),
+				       cur_pos(0) {};
+private:
+  unsigned char scan_state;
+  unsigned char init_pos;
+  unsigned char cur_pos;
+};
+
 /**
  * @class Ndb_cluster_connection
  * @brief Represents a connection to a cluster of storage nodes.
@@ -103,7 +116,7 @@
   unsigned node_id();
 
   void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
-  Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
+  unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter);
 #endif
 
 private:

--- 1.36/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2006-01-05 12:01:59 +01:00
+++ 1.37/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2006-01-11 12:45:13 +01:00
@@ -556,5 +556,17 @@
   DBUG_VOID_RETURN;
 }
 
+void
+Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter)
+{
+  m_impl.init_get_next_node(iter);
+}
+
+Uint32
+Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
+{
+  return m_impl.get_next_node(iter);
+}
+
 template class Vector<Ndb_cluster_connection_impl::Node>;
 

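A hedged usage sketch for the node iterator exposed above. The value
get_next_node() returns once the iteration is exhausted is not visible in
this diff, so treating 0 as the end marker is an assumption:

  #include <ndb_cluster_connection.hpp>
  #include <cstdio>

  void list_data_nodes(Ndb_cluster_connection& con) {
    Ndb_cluster_connection_node_iter iter;
    con.init_get_next_node(iter);
    unsigned int node;
    while ((node = con.get_next_node(iter)) != 0) // 0 assumed to mean "done"
      printf("data node %u\n", node);
  }
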
--- 1.8/storage/ndb/tools/restore/consumer.hpp	2006-01-10 15:04:27 +01:00
+++ 1.9/storage/ndb/tools/restore/consumer.hpp	2006-01-11 12:45:13 +01:00
@@ -26,6 +26,7 @@
 public:
   virtual ~BackupConsumer() { }
   virtual bool init() { return true;}
+  virtual bool object(Uint32 tableType, const void*) { return true;}
   virtual bool table(const TableS &){return true;}
   virtual bool endOfTables() { return true; }
   virtual void tuple(const TupleS &){}

--- 1.22/storage/ndb/tools/restore/consumer_restore.cpp	2006-01-05 12:01:59 +01:00
+++ 1.23/storage/ndb/tools/restore/consumer_restore.cpp	2006-01-11 12:45:13 +01:00
@@ -152,6 +152,141 @@
   return ret;
 }
 
+#include <signaldata/DictTabInfo.hpp>
+
+bool
+BackupRestore::object(Uint32 type, const void * ptr)
+{
+  if (!m_restore_meta)
+    return true;
+  
+  NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
+  switch(type){
+  case DictTabInfo::Tablespace:
+  {
+    NdbDictionary::Tablespace old(*(NdbDictionary::Tablespace*)ptr);
+
+    Uint32 id = old.getObjectId();
+
+    if (!m_no_restore_disk)
+    {
+      NdbDictionary::LogfileGroup * lg = m_logfilegroups[old.getDefaultLogfileGroupId()];
+      old.setDefaultLogfileGroup(* lg);
+      int ret = dict->createTablespace(old);
+      if (ret)
+      {
+	NdbError errobj= dict->getNdbError();
+	err << "Failed to create tablespace \"" << old.getName() << "\": "
+	    << errobj << endl;
+	return false;
+      }
+      debug << "Created tablespace: " << old.getName() << endl;
+    }
+    
+    NdbDictionary::Tablespace curr = dict->getTablespace(old.getName());
+    NdbError errobj = dict->getNdbError();
+    if(errobj.classification == ndberror_cl_none)
+    {
+      NdbDictionary::Tablespace* currptr = new NdbDictionary::Tablespace(curr);
+      NdbDictionary::Tablespace * null = 0;
+      m_tablespaces.set(currptr, id, null);
+      debug << "Retrieved tablespace: " << currptr->getName() 
+	    << " oldid: " << id << " newid: " << currptr->getObjectId() 
+	    << " " << (void*)currptr << endl;
+      return true;
+    }
+    
+    err << "Failed to retrieve tablespace \"" << old.getName() << "\": "
+	<< errobj << endl;
+    
+    return false;
+    break;
+  }
+  case DictTabInfo::LogfileGroup:
+  {
+    NdbDictionary::LogfileGroup old(*(NdbDictionary::LogfileGroup*)ptr);
+    
+    Uint32 id = old.getObjectId();
+    
+    if (!m_no_restore_disk)
+    {
+      int ret = dict->createLogfileGroup(old);
+      if (ret)
+      {
+	NdbError errobj= dict->getNdbError();
+	err << "Failed to create logfile group \"" << old.getName() << "\": "
+	    << errobj << endl;
+	return false;
+      }
+      debug << "Created logfile group: " << old.getName() << endl;
+    }
+    
+    NdbDictionary::LogfileGroup curr = dict->getLogfileGroup(old.getName());
+    NdbError errobj = dict->getNdbError();
+    if(errobj.classification == ndberror_cl_none)
+    {
+      NdbDictionary::LogfileGroup* currptr = 
+	new NdbDictionary::LogfileGroup(curr);
+      NdbDictionary::LogfileGroup * null = 0;
+      m_logfilegroups.set(currptr, id, null);
+      debug << "Retrieved logfile group: " << currptr->getName() 
+	    << " oldid: " << id << " newid: " << currptr->getObjectId() 
+	    << " " << (void*)currptr << endl;
+      return true;
+    }
+    
+    err << "Failed to retrieve logfile group \"" << old.getName() << "\": "
+	<< errobj << endl;
+    
+    return false;
+    break;
+  }
+  case DictTabInfo::Datafile:
+  {
+    if (!m_no_restore_disk)
+    {
+      NdbDictionary::Datafile old(*(NdbDictionary::Datafile*)ptr);
+      NdbDictionary::Tablespace * ts = m_tablespaces[old.getTablespaceId()];
+      debug << "Connecting datafile " << old.getPath() 
+	    << " to tablespace: oldid: " << old.getTablespaceId() 
+	    << " newid: " << ts->getObjectId() << endl;
+      old.setTablespace(* ts);
+      if (dict->createDatafile(old))
+      {
+	err << "Failed to create datafile \"" << old.getPath() << "\": "
+	    << dict->getNdbError() << endl;
+	return false;
+      }
+    }
+    return true;
+    break;
+  }
+  case DictTabInfo::Undofile:
+  {
+    if (!m_no_restore_disk)
+    {
+      NdbDictionary::Undofile old(*(NdbDictionary::Undofile*)ptr);
+      NdbDictionary::LogfileGroup * lg = 
+	m_logfilegroups[old.getLogfileGroupId()];
+      debug << "Connecting undofile " << old.getPath() 
+	    << " to logfile group: oldid: " << old.getLogfileGroupId() 
+	    << " newid: " << lg->getObjectId() 
+	    << " " << (void*)lg << endl;
+      old.setLogfileGroup(* lg);
+      if (dict->createUndofile(old))
+      {
+	err << "Failed to create undofile \"" << old.getPath() << "\": "
+	    << dict->getNdbError() << endl;
+	return false;
+      }
+    }
+    return true;
+    break;
+  }
+  }
+  return true;
+}
+
 bool
 BackupRestore::update_apply_status(const RestoreMetaData &metaData)
 {
@@ -241,7 +376,15 @@
     NdbDictionary::Table copy(*table.m_dictTable);
 
     copy.setName(split[2].c_str());
-
+    if (copy.getTablespaceId() != RNIL)
+    {
+      Uint32 id = copy.getTablespaceId();
+      debug << "Connecting " << name << " to tablespace oldid: " << id << flush;
+      NdbDictionary::Tablespace* ts = m_tablespaces[copy.getTablespaceId()];
+      debug << " newid: " << ts->getObjectId() << endl;
+      copy.setTablespace(* ts);
+    }
+    
     if (dict->createTable(copy) == -1) 
     {
       err << "Create table " << table.getTableName() << " failed: "
@@ -804,3 +947,5 @@
 
 template class Vector<NdbDictionary::Table*>;
 template class Vector<const NdbDictionary::Table*>;
+template class Vector<NdbDictionary::Tablespace*>;
+template class Vector<NdbDictionary::LogfileGroup*>;

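The object() hook added to BackupConsumer (consumer.hpp above) lets a
consumer see each non-table dictionary object while the backup metadata is
read; BackupRestore uses it to recreate filegroups and to map old object
ids to the newly created objects. A minimal alternative consumer, as a
sketch (that returning false aborts the restore follows the pattern of the
other callbacks, but is an assumption):

  #include <NdbOut.hpp>
  #include "consumer.hpp"

  class LoggingConsumer : public BackupConsumer {
  public:
    virtual bool object(Uint32 type, const void*) {
      ndbout_c("dictionary object in backup, DictTabInfo type %u", type);
      return true;  // false would presumably abort the restore
    }
  };
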
--- 1.11/storage/ndb/tools/restore/consumer_restore.hpp	2006-01-05 12:01:59 +01:00
+++ 1.12/storage/ndb/tools/restore/consumer_restore.hpp	2006-01-11 12:51:28 +01:00
@@ -39,6 +39,7 @@
     m_logCount = m_dataCount = 0;
     m_restore = false;
     m_restore_meta = false;
+    m_no_restore_disk = false;
     m_restore_epoch = false;
     m_parallelism = parallelism;
     m_callback = 0;
@@ -50,6 +51,7 @@
   virtual ~BackupRestore();
   virtual bool init();
   virtual void release();
+  virtual bool object(Uint32 type, const void* ptr);
   virtual bool table(const TableS &);
   virtual bool endOfTables();
   virtual void tuple(const TupleS &);
@@ -68,6 +70,7 @@
   Ndb_cluster_connection * m_cluster_connection;
   bool m_restore;
   bool m_restore_meta;
+  bool m_no_restore_disk;
   bool m_restore_epoch;
   Uint32 m_logCount;
   Uint32 m_dataCount;
@@ -91,6 +94,8 @@
   const NdbDictionary::Table* get_table(const NdbDictionary::Table* );
 
   Vector<const NdbDictionary::Table*> m_indexes;
+  Vector<NdbDictionary::Tablespace*> m_tablespaces;    // Index by id
+  Vector<NdbDictionary::LogfileGroup*> m_logfilegroups;// Index by id
 };
 
 #endif