List:Commits« Previous MessageNext Message »
From:tomas Date:March 8 2007 7:13pm
Subject:bk commit into 5.1 tree (tomas:1.2473)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2473 07/03/09 01:13:06 tomas@stripped +6 -0
  Merge poseidon.mysql.com:/home/tomas/mysql-5.0-ndb
  into  poseidon.mysql.com:/home/tomas/mysql-5.1-new-ndb

  storage/ndb/tools/restore/restore_main.cpp
    1.54 07/03/09 01:12:48 tomas@stripped +0 -2
    manual merge

  storage/ndb/tools/restore/Restore.hpp
    1.30 07/03/09 01:10:16 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/Ndb.cpp
    1.90 07/03/09 01:10:16 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
    1.139 07/03/09 01:10:16 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
    1.52 07/03/09 01:10:14 tomas@stripped +0 -0
    Auto merged

  storage/ndb/include/util/OutputStream.hpp
    1.15 07/03/09 01:10:14 tomas@stripped +0 -0
    Auto merged

  storage/ndb/tools/restore/restore_main.cpp
    1.29.16.2 07/03/09 01:10:13 tomas@stripped +0 -0
    Merge rename: ndb/tools/restore/restore_main.cpp ->
storage/ndb/tools/restore/restore_main.cpp

  storage/ndb/tools/restore/Restore.hpp
    1.18.8.2 07/03/09 01:10:13 tomas@stripped +0 -0
    Merge rename: ndb/tools/restore/Restore.hpp ->
storage/ndb/tools/restore/Restore.hpp

  storage/ndb/src/ndbapi/Ndb.cpp
    1.49.24.2 07/03/09 01:10:13 tomas@stripped +0 -0
    Merge rename: ndb/src/ndbapi/Ndb.cpp -> storage/ndb/src/ndbapi/Ndb.cpp

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
    1.73.40.2 07/03/09 01:10:12 tomas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtc/DbtcMain.cpp ->
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp

  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
    1.26.20.2 07/03/09 01:10:12 tomas@stripped +0 -0
    Merge rename: ndb/src/kernel/blocks/dbtc/Dbtc.hpp ->
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp

  storage/ndb/include/util/OutputStream.hpp
    1.3.9.2 07/03/09 01:10:12 tomas@stripped +0 -0
    Merge rename: ndb/include/util/OutputStream.hpp ->
storage/ndb/include/util/OutputStream.hpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.mysql.com
# Root:	/home/tomas/mysql-5.1-new-ndb/RESYNC

--- 1.3.9.1/ndb/include/util/OutputStream.hpp	2007-03-09 00:52:13 +07:00
+++ 1.15/storage/ndb/include/util/OutputStream.hpp	2007-03-09 01:10:14 +07:00
@@ -35,6 +35,7 @@ class FileOutputStream : public OutputSt
   FILE * f;
 public:
   FileOutputStream(FILE * file = stdout);
+  virtual ~FileOutputStream() {}
   FILE *getFile() { return f; }
 
   int print(const char * fmt, ...);
@@ -47,22 +48,16 @@ class SocketOutputStream : public Output
   unsigned m_timeout_ms;
 public:
   SocketOutputStream(NDB_SOCKET_TYPE socket, unsigned write_timeout_ms = 1000);
+  virtual ~SocketOutputStream() {}
 
   int print(const char * fmt, ...);
   int println(const char * fmt, ...);
 };
 
-class SoftOseOutputStream : public OutputStream {
-public:
-  SoftOseOutputStream();
-  
-  int print(const char * fmt, ...);
-  int println(const char * fmt, ...);
-};
-
 class NullOutputStream : public OutputStream {
 public:
   NullOutputStream() {}
+  virtual ~NullOutputStream() {}
   int print(const char * /* unused */, ...) { return 1;}
   int println(const char * /* unused */, ...) { return 1;}
 };

--- 1.18.8.1/ndb/tools/restore/Restore.hpp	2007-03-09 00:55:40 +07:00
+++ 1.30/storage/ndb/tools/restore/Restore.hpp	2007-03-09 01:10:16 +07:00
@@ -139,8 +139,6 @@ class TableS {
   Uint32 m_auto_val_id;
   Uint64 m_max_auto_val;
 
-  int pos;
-
   bool isSysTable;
   TableS *m_main_table;
   Uint32 m_local_id;
@@ -251,6 +249,8 @@ public:
   TableS& operator=(TableS& org) ; 
 }; // TableS;
 
+class RestoreLogIterator;
+
 class BackupFile {
 protected:
   FILE * m_file;
@@ -293,6 +293,11 @@ public:
   bool Twiddle(const AttributeDesc *  attr_desc, AttributeData * attr_data, Uint32 arraySize = 0);
 };
 
+struct DictObject {
+  Uint32 m_objType;
+  void * m_objPtr;
+};
+
 class RestoreMetaData : public BackupFile {
 
   Vector<TableS *> allTables;
@@ -309,6 +314,8 @@ class RestoreMetaData : public BackupFil
   
   bool parseTableDescriptor(const Uint32 * data, Uint32 len);
 
+  Vector<DictObject> m_objects;
+  
 public:
   RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo);
   virtual ~RestoreMetaData();
@@ -320,6 +327,10 @@ public:
   const TableS * operator[](int i) const { return allTables[i];}
   TableS * getTable(Uint32 tableId) const;
 
+  Uint32 getNoOfObjects() const { return m_objects.size();}
+  Uint32 getObjType(Uint32 i) const { return m_objects[i].m_objType; }
+  void* getObjPtr(Uint32 i) const { return m_objects[i].m_objPtr; }
+  
   Uint32 getStopGCP() const;
 }; // RestoreMetaData
 
@@ -337,7 +348,7 @@ public:
   ~RestoreDataIterator() {};
   
   // Read data file fragment header
-  bool readFragmentHeader(int & res);
+  bool readFragmentHeader(int & res, Uint32 *fragmentId);
   bool validateFragmentFooter();
 
   const TupleS *getNextTuple(int & res);
@@ -350,8 +361,9 @@ public:
     LE_DELETE,
     LE_UPDATE
   };
+  Uint32 m_frag_id;
   EntryType m_type;
-  TableS * m_table;  
+  TableS * m_table;
   Vector<AttributeS*> m_values;
   Vector<AttributeS*> m_values_e;
   AttributeS *add_attr() {

--- 1.29.16.1/ndb/tools/restore/restore_main.cpp	2007-03-09 00:55:40 +07:00
+++ 1.54/storage/ndb/tools/restore/restore_main.cpp	2007-03-09 01:12:48 +07:00
@@ -40,6 +40,11 @@ static BackupPrinter* g_printer = NULL;
 static const char* default_backupPath = "." DIR_SEPARATOR;
 static const char* ga_backupPath = default_backupPath;
 
+static const char *opt_nodegroup_map_str= 0;
+static unsigned opt_nodegroup_map_len= 0;
+static NODE_GROUP_MAP opt_nodegroup_map[MAX_NODE_GROUP_MAPS];
+#define OPT_NDB_NODEGROUP_MAP 'z'
+
 const char *opt_ndb_database= NULL;
 const char *opt_ndb_table= NULL;
 unsigned int opt_verbose;
@@ -53,6 +58,7 @@ NDB_STD_OPTS_VARS;
 /**
  * print and restore flags
  */
+static bool ga_restore_epoch = false;
 static bool ga_restore = false;
 static bool ga_print = false;
 static int _print = 0;
@@ -61,6 +67,7 @@ static int _print_data = 0;
 static int _print_log = 0;
 static int _restore_data = 0;
 static int _restore_meta = 0;
+static int _no_restore_disk = 0;
 BaseString g_options("ndb_restore");
 
 const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 };
@@ -107,6 +114,16 @@ static struct my_option my_long_options[
     "Restore meta data into NDB Cluster using NDBAPI",
     (gptr*) &_restore_meta, (gptr*) &_restore_meta,  0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "no-restore-disk-objects", 'd',
+    "Dont restore disk objects (tablespace/logfilegroups etc)",
+    (gptr*) &_no_restore_disk, (gptr*) &_no_restore_disk,  0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "restore_epoch", 'e', 
+    "Restore epoch info into the status table. Convenient on a MySQL Cluster "
+    "replication slave, for starting replication. The row in "
+    NDB_REP_DB "." NDB_APPLY_TABLE " with id 0 will be updated/inserted.", 
+    (gptr*) &ga_restore_epoch, (gptr*) &ga_restore_epoch,  0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "parallelism", 'p',
     "No of parallel transactions during restore of data."
     "(parallelism can be 1 to 1024)", 
@@ -131,6 +148,12 @@ static struct my_option my_long_options[
     "Experimental. Do not ignore system table during restore.", 
     (gptr*) &ga_dont_ignore_systab_0, (gptr*) &ga_dont_ignore_systab_0, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "ndb-nodegroup-map", OPT_NDB_NODEGROUP_MAP,
+    "Nodegroup map for ndbcluster. Syntax: list of (source_ng, dest_ng)",
+    (gptr*) &opt_nodegroup_map_str,
+    (gptr*) &opt_nodegroup_map_str,
+    0,
+    GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
   { "fields-enclosed-by", OPT_FIELDS_ENCLOSED_BY,
     "Fields are enclosed by ...",
     (gptr*) &opt_fields_enclosed_by, (gptr*) &opt_fields_enclosed_by, 0,
@@ -165,6 +188,115 @@ static struct my_option my_long_options[
   { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
 };
 
+
+static char* analyse_one_map(char *map_str, uint16 *source, uint16 *dest)
+{
+  char *end_ptr;
+  int number;
+  DBUG_ENTER("analyse_one_map");
+  /*
+    Search for pattern ( source_ng , dest_ng )
+  */
+
+  while (isspace(*map_str)) map_str++;
+
+  if (*map_str != '(')
+  {
+    DBUG_RETURN(NULL);
+  }
+  map_str++;
+
+  while (isspace(*map_str)) map_str++;
+
+  number= strtol(map_str, &end_ptr, 10);
+  if (!end_ptr || number < 0 || number >= MAX_NODE_GROUP_MAPS)
+  {
+    DBUG_RETURN(NULL);
+  }
+  *source= (uint16)number;
+  map_str= end_ptr;
+
+  while (isspace(*map_str)) map_str++;
+
+  if (*map_str != ',')
+  {
+    DBUG_RETURN(NULL);
+  }
+  map_str++;
+
+  number= strtol(map_str, &end_ptr, 10);
+  if (!end_ptr || number < 0 || number >= UNDEF_NODEGROUP)
+  {
+    DBUG_RETURN(NULL);
+  }
+  *dest= (uint16)number;
+  map_str= end_ptr;
+
+  if (*map_str != ')')
+  {
+    DBUG_RETURN(NULL);
+  }
+  map_str++;
+
+  while (isspace(*map_str)) map_str++;
+  DBUG_RETURN(map_str);
+}
+
+static bool insert_ng_map(NODE_GROUP_MAP *ng_map,
+                          uint16 source_ng, uint16 dest_ng)
+{
+  uint index= source_ng;
+  uint ng_index= ng_map[index].no_maps;
+
+  opt_nodegroup_map_len++;
+  if (ng_index >= MAX_MAPS_PER_NODE_GROUP)
+    return true;
+  ng_map[index].no_maps++;
+  ng_map[index].map_array[ng_index]= dest_ng;
+  return false;
+}
+
+static void init_nodegroup_map()
+{
+  uint i,j;
+  NODE_GROUP_MAP *ng_map = &opt_nodegroup_map[0];
+
+  for (i = 0; i < MAX_NODE_GROUP_MAPS; i++)
+  {
+    ng_map[i].no_maps= 0;
+    for (j= 0; j < MAX_MAPS_PER_NODE_GROUP; j++)
+      ng_map[i].map_array[j]= UNDEF_NODEGROUP;
+  }
+}
+
+static bool analyse_nodegroup_map(const char *ng_map_str,
+                                  NODE_GROUP_MAP *ng_map)
+{
+  uint16 source_ng, dest_ng;
+  char *local_str= (char*)ng_map_str;
+  DBUG_ENTER("analyse_nodegroup_map");
+
+  do
+  {
+    if (!local_str)
+    {
+      DBUG_RETURN(TRUE);
+    }
+    local_str= analyse_one_map(local_str, &source_ng, &dest_ng);
+    if (!local_str)
+    {
+      DBUG_RETURN(TRUE);
+    }
+    if (insert_ng_map(ng_map, source_ng, dest_ng))
+    {
+      DBUG_RETURN(TRUE);
+    }
+    if (!(*local_str))
+      break;
+  } while (TRUE);
+  DBUG_RETURN(FALSE);
+}
+
 static void short_usage_sub(void)
 {
   printf("Usage: %s [OPTIONS] [<path to backup files>]\n", my_progname);
@@ -208,6 +340,21 @@ get_one_option(int optid, const struct m
     info.setLevel(254);
     info << "Backup Id = " << ga_backupId << endl;
     break;
+  case OPT_NDB_NODEGROUP_MAP:
+    /*
+      This option is used to set a map from nodegroup in original cluster
+      to nodegroup in new cluster.
+    */
+    opt_nodegroup_map_len= 0;
+
+    info.setLevel(254);
+    info << "Analyse node group map" << endl;
+    if (analyse_nodegroup_map(opt_nodegroup_map_str,
+                              &opt_nodegroup_map[0]))
+    {
+      exit(NDBT_ProgramExit(NDBT_WRONGARGS));
+    }
+    break;
   }
   return 0;
 }
@@ -218,18 +365,51 @@ readArguments(int *pargc, char*** pargv)
   debug << "Load defaults" << endl;
   const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 };
 
+  init_nodegroup_map();
   load_defaults("my",load_default_groups,pargc,pargv);
   debug << "handle_options" << endl;
   if (handle_options(pargc, pargv, my_long_options, get_one_option))
   {
     exit(NDBT_ProgramExit(NDBT_WRONGARGS));
   }
+  for (i = 0; i < MAX_NODE_GROUP_MAPS; i++)
+    opt_nodegroup_map[i].curr_index = 0;
 
-  g_printer = new BackupPrinter();
+#if 0
+  /*
+    Test code written to verify nodegroup mapping
+  */
+  printf("Handled options successfully\n");
+  Uint16 map_ng[16];
+  Uint32 j;
+  for (j = 0; j < 4; j++)
+  {
+  for (i = 0; i < 4 ; i++)
+    map_ng[i] = i;
+  map_nodegroups(&map_ng[0], (Uint32)4);
+  for (i = 0; i < 4 ; i++)
+    printf("NG %u mapped to %u \n", i, map_ng[i]);
+  }
+  for (j = 0; j < 4; j++)
+  {
+  for (i = 0; i < 8 ; i++)
+    map_ng[i] = i >> 1;
+  map_nodegroups(&map_ng[0], (Uint32)8);
+  for (i = 0; i < 8 ; i++)
+    printf("NG %u mapped to %u \n", i >> 1, map_ng[i]);
+  }
+  exit(NDBT_ProgramExit(NDBT_WRONGARGS));
+#endif
+
+  g_printer = new BackupPrinter(opt_nodegroup_map,
+                                opt_nodegroup_map_len);
   if (g_printer == NULL)
     return false;
 
-  BackupRestore* restore = new BackupRestore(ga_nParallelism);
+  BackupRestore* restore = new BackupRestore(opt_nodegroup_map,
+                                             opt_nodegroup_map_len,
+                                             ga_nParallelism);
   if (restore == NULL) 
   {
     delete g_printer;
@@ -271,6 +451,16 @@ readArguments(int *pargc, char*** pargv)
     restore->m_restore_meta = true;
   }
 
+  if (_no_restore_disk)
+  {
+    restore->m_no_restore_disk = true;
+  }
+  
+  if (ga_restore_epoch)
+  {
+    restore->m_restore_epoch = true;
+  }
+
   {
     BackupConsumer * c = g_printer;
     g_consumers.push_back(c);
@@ -452,6 +642,10 @@ main(int argc, char** argv)
     g_options.appfmt(" -m");
   if (_restore_data)
     g_options.appfmt(" -r");
+  if (ga_restore_epoch)
+    g_options.appfmt(" -e");
+  if (_no_restore_disk)
+    g_options.appfmt(" -d");
   g_options.appfmt(" -p %d", ga_nParallelism);
 
   g_connect_string = opt_connect_str;
@@ -478,6 +672,18 @@ main(int argc, char** argv)
   /**
    * check wheater we can restore the backup (right version).
    */
+  // in these versions there was an error in how replica info was
+  // stored on disk
+  if (version >= MAKE_VERSION(5,1,3) && version <= MAKE_VERSION(5,1,9))
+  {
+    err << "Restore program incompatible with backup versions between "
+        << getVersionString(MAKE_VERSION(5,1,3), 0, buf, sizeof(buf))
+        << " and "
+        << getVersionString(MAKE_VERSION(5,1,9), 0, buf, sizeof(buf))
+        << endl;
+    exitHandler(NDBT_FAILED);
+  }
+
   if (version > NDB_VERSION)
   {
     err << "Restore program older than backup version. Not supported. "
@@ -518,6 +724,18 @@ main(int argc, char** argv)
     }
 
   }
+  debug << "Restore objects (tablespaces, ..)" << endl;
+  for(i = 0; i<metaData.getNoOfObjects(); i++)
+  {
+    for(Uint32 j= 0; j < g_consumers.size(); j++)
+      if (!g_consumers[j]->object(metaData.getObjType(i),
+				  metaData.getObjPtr(i)))
+      {
+	err << "Restore: Failed to restore table: ";
+        err << metaData[i]->getTableName() << " ... Exiting " << endl;
+	exitHandler(NDBT_FAILED);
+      } 
+  }
 
   Vector<OutputStream *> table_output(metaData.getNoOfTables());
   debug << "Restoring tables" << endl;
@@ -562,6 +780,14 @@ main(int argc, char** argv)
 	  err << "Restore: Failed to restore table: ";
           err << table->getTableName() << " ... Exiting " << endl;
 	  exitHandler(NDBT_FAILED);
+	} 
+    } else {
+      for(Uint32 j= 0; j < g_consumers.size(); j++)
+        if (!g_consumers[j]->createSystable(* table))
+        {
+          err << "Restore: Failed to restore system table: ";
+          err << table->getTableName() << " ... Exiting " << endl;
+          exitHandler(NDBT_FAILED);
         }
     }
   }
@@ -586,8 +812,8 @@ main(int argc, char** argv)
 	exitHandler(NDBT_FAILED);
       }
       
-      
-      while (dataIter.readFragmentHeader(res= 0))
+      Uint32 fragmentId; 
+      while (dataIter.readFragmentHeader(res= 0, &fragmentId))
       {
 	const TupleS* tuple;
 	while ((tuple = dataIter.getNextTuple(res= 1)) != 0)
@@ -599,7 +825,7 @@ main(int argc, char** argv)
           OutputStream *tmp = ndbout.m_out;
           ndbout.m_out = output;
           for(Uint32 j= 0; j < g_consumers.size(); j++) 
-            g_consumers[j]->tuple(* tuple);
+            g_consumers[j]->tuple(* tuple, fragmentId);
           ndbout.m_out =  tmp;
 	} // while (tuple != NULL);
 	
@@ -678,6 +904,16 @@ main(int argc, char** argv)
       }
     }
   }
+  if (ga_restore_epoch)
+  {
+    for (i= 0; i < g_consumers.size(); i++)
+      if (!g_consumers[i]->update_apply_status(metaData))
+      {
+        err << "Restore: Failed to restore epoch" << endl;
+        return -1;
+      }
+  }
+
   for(Uint32 j= 0; j < g_consumers.size(); j++) 
   {
     if (g_consumers[j]->has_temp_error())

--- 1.26.20.1/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2007-03-09 00:46:32 +07:00
+++ 1.52/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2007-03-09 01:10:14 +07:00
@@ -388,6 +388,13 @@ public:
     Uint32 fireingOperation;
 
     /**
+     * The fragment id of the firing operation. This will be appended
+     * to the Primary Key such that the record can be found even in the
+     * case of user defined partitioning.
+     */
+    Uint32 fragId;
+
+    /**
      * Used for scrapping in case of node failure
      */
     Uint32 nodeId;
@@ -525,7 +532,7 @@ public:
   /**
    * The list of defined indexes
    */  
-  ArrayList<TcIndexData> c_theIndexes;
+  DLList<TcIndexData> c_theIndexes;
   UintR c_maxNumberOfIndexes;
 
   struct TcIndexOperation {
@@ -731,7 +738,7 @@ public:
     UintR accumulatingIndexOp;
     UintR executingIndexOp;
     UintR tcIndxSendArray[6];
-    ArrayList<TcIndexOperation> theSeizedIndexOperations;
+    DLList<TcIndexOperation> theSeizedIndexOperations;
   };
   
   typedef Ptr<ApiConnectRecord> ApiConnectRecordPtr;
@@ -866,7 +873,7 @@ public:
     
     Uint8  distributionKeyIndicator;
     Uint8  m_special_hash; // collation or distribution key
-    Uint8  unused2;
+    Uint8  m_no_disk_flag;
     Uint8  lenAiInTckeyreq;  /* LENGTH OF ATTRIBUTE INFORMATION IN TCKEYREQ */
 
     Uint8  fragmentDistributionKey;  /* DIH generation no */
@@ -877,11 +884,7 @@ public:
      */
     Uint8  opExec;     
 
-    /** 
-     * LOCK TYPE OF OPERATION IF READ OPERATION
-     * 0 = READ LOCK, 1 = WRITE LOCK                    
-     */
-    Uint8  opLock;     
+    Uint8  unused;
 
     /** 
      * IS THE OPERATION A SIMPLE TRANSACTION            
@@ -941,8 +944,7 @@ public:
       NF_CHECK_SCAN        = 0x2,
       NF_CHECK_TRANSACTION = 0x4,
       NF_CHECK_DROP_TAB    = 0x8,
-      NF_NODE_FAIL_BITS    = 0xF, // All bits...
-      NF_STARTED           = 0x10
+      NF_NODE_FAIL_BITS    = 0xF // All bits...
     };
     Uint32 m_nf_bits;
     NdbNodeBitmask m_lqh_trans_conf;
@@ -967,7 +969,8 @@ public:
     Uint8 noOfKeyAttr;
     Uint8 hasCharAttr;
     Uint8 noOfDistrKeys;
-    
+    Uint8 hasVarKeys;
+
     bool checkTable(Uint32 schemaVersion) const {
       return enabled && !dropping && 
 	(table_version_major(schemaVersion) == table_version_major(currentSchemaVersion));
@@ -1262,7 +1265,7 @@ public:
   typedef Ptr<TcFailRecord> TcFailRecordPtr;
 
 public:
-  Dbtc(const class Configuration &);
+  Dbtc(Block_context&);
   virtual ~Dbtc();
 
 private:
@@ -1282,7 +1285,7 @@ private:
   void execLQHKEYREF(Signal* signal);
   void execTRANSID_AI_R(Signal* signal);
   void execKEYINFO20_R(Signal* signal);
-
+  void execROUTE_ORD(Signal* signal);
   // Received signals
   void execDUMP_STATE_ORD(Signal* signal);
   void execSEND_PACKED(Signal* signal);
@@ -1320,7 +1323,6 @@ private:
   void execCOMMITCONF(Signal* signal);
   void execABORTCONF(Signal* signal);
   void execNODE_FAILREP(Signal* signal);
-  void execNODE_START_REP(Signal* signal);
   void execINCL_NODEREQ(Signal* signal);
   void execTIME_SIGNAL(Signal* signal);
   void execAPI_FAILREQ(Signal* signal);

--- 1.73.40.1/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-03-09 00:46:32 +07:00
+++ 1.139/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2007-03-09 01:10:16 +07:00
@@ -37,6 +37,7 @@
 #include <signaldata/TcContinueB.hpp>
 #include <signaldata/TcKeyFailConf.hpp>
 #include <signaldata/AbortAll.hpp>
+#include <signaldata/DihFragCount.hpp>
 #include <signaldata/ScanFrag.hpp>
 #include <signaldata/ScanTab.hpp>
 #include <signaldata/PrepDropTab.hpp>
@@ -70,6 +71,8 @@
 #include <NdbOut.hpp>
 #include <DebuggerNames.hpp>
 
+#include <signaldata/RouteOrd.hpp>
+
 // Use DEBUG to print messages that should be
 // seen only when we debug the product
 #ifdef VM_TRACE
@@ -343,7 +346,7 @@ void Dbtc::execTC_SCHVERREQ(Signal* sign
   tabptr.p->noOfKeyAttr = desc->noOfKeyAttr;
   tabptr.p->hasCharAttr = desc->hasCharAttr;
   tabptr.p->noOfDistrKeys = desc->noOfDistrKeys;
-  
+  tabptr.p->hasVarKeys = desc->noOfVarKeys > 0;
   signal->theData[0] = tabptr.i;
   signal->theData[1] = retPtr;
   sendSignal(retRef, GSN_TC_SCHVERCONF, signal, 2, JBB);
@@ -609,7 +612,7 @@ void Dbtc::execREAD_CONFIG_REQ(Signal* s
   jamEntry();
   
   const ndb_mgm_configuration_iterator * p = 
-    theConfiguration.getOwnConfigIterator();
+    m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
   
   initData();
@@ -1299,6 +1302,7 @@ void Dbtc::execTCRELEASEREQ(Signal* sign
 	   apiConnectptr.p->firstTcConnect == RNIL))
       {
         jam();                                   /* JUST REPLY OK */
+	apiConnectptr.p->m_transaction_nodes.clear();
         releaseApiCon(signal, apiConnectptr.i);
         signal->theData[0] = tuserpointer;
         sendSignal(tapiBlockref,
@@ -2320,14 +2324,15 @@ Dbtc::handle_special_hash(Uint32 dstHash
 {
   Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY];
   const TableRecord* tabPtrP = &tableRecord[tabPtrI];
+  const bool hasVarKeys = tabPtrP->hasVarKeys;
   const bool hasCharAttr = tabPtrP->hasCharAttr;
-  const bool hasDistKeys = tabPtrP->noOfDistrKeys > 0;
+  const bool compute_distkey = distr && (tabPtrP->noOfDistrKeys > 0);
   
   Uint32 *dst = (Uint32*)Tmp;
   Uint32 dstPos = 0;
   Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
   Uint32 * keyPartLenPtr;
-  if(hasCharAttr)
+  if(hasCharAttr || (compute_distkey && hasVarKeys))
   {
     keyPartLenPtr = keyPartLen;
     dstPos = xfrm_key(tabPtrI, src, dst, sizeof(Tmp) >> 2, keyPartLenPtr);
@@ -2345,7 +2350,7 @@ Dbtc::handle_special_hash(Uint32 dstHash
   
   md5_hash(dstHash, (Uint64*)dst, dstPos);
   
-  if(distr && hasDistKeys)
+  if(compute_distkey)
   {
     jam();
     
@@ -2543,7 +2548,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   ApiConnectRecord * const regApiPtr = &localApiConnectRecord[TapiIndex];
   apiConnectptr.p = regApiPtr;
 
-  Uint32 TstartFlag = tcKeyReq->getStartFlag(Treqinfo);
+  Uint32 TstartFlag = TcKeyReq::getStartFlag(Treqinfo);
   Uint32 TexecFlag = TcKeyReq::getExecuteFlag(Treqinfo);
 
   Uint8 isIndexOp = regApiPtr->isIndexOp;
@@ -2713,14 +2718,14 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   /*                                                                        */
   /* ---------------------------------------------------------------------- */
 
-  UintR TapiVersionNo = tcKeyReq->getAPIVersion(tcKeyReq->attrLen);
+  UintR TapiVersionNo = TcKeyReq::getAPIVersion(tcKeyReq->attrLen);
   UintR Tlqhkeyreqrec = regApiPtr->lqhkeyreqrec;
   regApiPtr->lqhkeyreqrec = Tlqhkeyreqrec + 1;
   regCachePtr->apiVersionNo = TapiVersionNo;
 
   UintR TapiConnectptrIndex = apiConnectptr.i;
   UintR TsenderData = tcKeyReq->senderData;
-  UintR TattrLen = tcKeyReq->getAttrinfoLen(tcKeyReq->attrLen);
+  UintR TattrLen = TcKeyReq::getAttrinfoLen(tcKeyReq->attrLen);
   UintR TattrinfoCount = c_counters.cattrinfoCount;
 
   regTcPtr->apiConnect = TapiConnectptrIndex;
@@ -2746,21 +2751,23 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
 
   UintR TtabptrIndex = localTabptr.i;
   UintR TtableSchemaVersion = tcKeyReq->tableSchemaVersion;
-  Uint8 TOperationType = tcKeyReq->getOperationType(Treqinfo);
+  Uint8 TOperationType = TcKeyReq::getOperationType(Treqinfo);
   regCachePtr->tableref = TtabptrIndex;
   regCachePtr->schemaVersion = TtableSchemaVersion;
   regTcPtr->operation = TOperationType;
 
-  Uint8 TSimpleFlag         = tcKeyReq->getSimpleFlag(Treqinfo);
-  Uint8 TDirtyFlag          = tcKeyReq->getDirtyFlag(Treqinfo);
-  Uint8 TInterpretedFlag    = tcKeyReq->getInterpretedFlag(Treqinfo);
-  Uint8 TDistrKeyFlag       = tcKeyReq->getDistributionKeyFlag(Treqinfo);
+  Uint8 TSimpleFlag         = TcKeyReq::getSimpleFlag(Treqinfo);
+  Uint8 TDirtyFlag          = TcKeyReq::getDirtyFlag(Treqinfo);
+  Uint8 TInterpretedFlag    = TcKeyReq::getInterpretedFlag(Treqinfo);
+  Uint8 TDistrKeyFlag       = TcKeyReq::getDistributionKeyFlag(Treqinfo);
+  Uint8 TNoDiskFlag         = TcKeyReq::getNoDiskFlag(Treqinfo);
   Uint8 TexecuteFlag        = TexecFlag;
   
   regCachePtr->opSimple = TSimpleFlag;
   regCachePtr->opExec   = TInterpretedFlag;
   regTcPtr->dirtyOp  = TDirtyFlag;
   regCachePtr->distributionKeyIndicator = TDistrKeyFlag;
+  regCachePtr->m_no_disk_flag = TNoDiskFlag;
 
   //-------------------------------------------------------------
   // The next step is to read the upto three conditional words.
@@ -2768,10 +2775,10 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   Uint32 TkeyIndex;
   Uint32* TOptionalDataPtr = (Uint32*)&tcKeyReq->scanInfo;
   {
-    Uint32  TDistrGHIndex    = tcKeyReq->getScanIndFlag(Treqinfo);
+    Uint32  TDistrGHIndex    = TcKeyReq::getScanIndFlag(Treqinfo);
     Uint32  TDistrKeyIndex   = TDistrGHIndex;
 
-    Uint32 TscanInfo = tcKeyReq->getTakeOverScanInfo(TOptionalDataPtr[0]);
+    Uint32 TscanInfo = TcKeyReq::getTakeOverScanInfo(TOptionalDataPtr[0]);
 
     regCachePtr->scanTakeOverInd = TDistrGHIndex;
     regCachePtr->scanInfo = TscanInfo;
@@ -2793,7 +2800,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   regCachePtr->keydata[2] = Tdata3;
   regCachePtr->keydata[3] = Tdata4;
 
-  TkeyLength = tcKeyReq->getKeyLength(Treqinfo);
+  TkeyLength = TcKeyReq::getKeyLength(Treqinfo);
   Uint32 TAIDataIndex;
   if (TkeyLength > 8) {
     TAIDataIndex = TkeyIndex + 8;
@@ -2806,7 +2813,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   }//if
   Uint32* TAIDataPtr = &TOptionalDataPtr[TAIDataIndex];
 
-  titcLenAiInTckeyreq = tcKeyReq->getAIInTcKeyReq(Treqinfo);
+  titcLenAiInTckeyreq = TcKeyReq::getAIInTcKeyReq(Treqinfo);
   regCachePtr->keylen = TkeyLength;
   regCachePtr->lenAiInTckeyreq = titcLenAiInTckeyreq;
   regCachePtr->currReclenAi = titcLenAiInTckeyreq;
@@ -2824,17 +2831,9 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
   regCachePtr->attrinfo15[2] = Tdata4;
   regCachePtr->attrinfo15[3] = Tdata5;
 
-  if (TOperationType == ZREAD) {
-    Uint32 TreadCount = c_counters.creadCount;
-    jam();
-    regCachePtr->opLock = 0;
-    c_counters.creadCount = TreadCount + 1;
-  } else if(TOperationType == ZREAD_EX){
+  if (TOperationType == ZREAD || TOperationType == ZREAD_EX) {
     Uint32 TreadCount = c_counters.creadCount;
     jam();
-    TOperationType = ZREAD;
-    regTcPtr->operation = ZREAD;
-    regCachePtr->opLock = ZUPDATE;
     c_counters.creadCount = TreadCount + 1;
   } else {
     if(regApiPtr->commitAckMarker == RNIL){
@@ -2851,6 +2850,12 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
         tmp.p->apiNodeId     = refToNode(regApiPtr->ndbapiBlockref);
         tmp.p->apiConnectPtr = TapiIndex;
         tmp.p->noOfLqhs      = 0;
+#if defined VM_TRACE || defined ERROR_INSERT
+	{
+	  CommitAckMarkerPtr check;
+	  ndbrequire(!m_commitAckMarkerHash.find(check, *tmp.p));
+	}
+#endif
         m_commitAckMarkerHash.add(tmp);
       }
     }
@@ -2868,24 +2873,10 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
     c_counters.cwriteCount = TwriteCount + 1;
     switch (TOperationType) {
     case ZUPDATE:
-      jam();
-      if (TattrLen == 0) {
-        //TCKEY_abort(signal, 5);
-        //return;
-      }//if
-      /*---------------------------------------------------------------------*/
-      // The missing break is intentional since we also want to set the opLock 
-      // variable also for updates
-      /*---------------------------------------------------------------------*/
     case ZINSERT:
     case ZDELETE:
-      jam();      
-      regCachePtr->opLock = TOperationType;
-      break;
     case ZWRITE:
       jam();
-      // A write operation is originally an insert operation.
-      regCachePtr->opLock = ZINSERT;  
       break;
     default:
       TCKEY_abort(signal, 9);
@@ -2893,14 +2884,14 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
     }//switch
   }//if
   
-  Uint32 TabortOption = tcKeyReq->getAbortOption(Treqinfo);
+  Uint32 TabortOption = TcKeyReq::getAbortOption(Treqinfo);
   regTcPtr->m_execAbortOption = TabortOption;
   
   /*-------------------------------------------------------------------------
    * Check error handling per operation
    * If CommitFlag is set state accordingly and check for early abort
    *------------------------------------------------------------------------*/
-  if (tcKeyReq->getCommitFlag(Treqinfo) == 1) {
+  if (TcKeyReq::getCommitFlag(Treqinfo) == 1) {
     ndbrequire(TexecuteFlag);
     regApiPtr->apiConnectstate = CS_REC_COMMITTING;
   } else {
@@ -3060,7 +3051,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
   tnoOfStandby = (tnodeinfo >> 8) & 3;
  
   regCachePtr->fragmentDistributionKey = (tnodeinfo >> 16) & 255;
-  if (Toperation == ZREAD) {
+  if (Toperation == ZREAD || Toperation == ZREAD_EX) {
     if (Tdirty == 1) {
       jam();
       /*-------------------------------------------------------------*/
@@ -3093,28 +3084,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal
 	  }//if
 	}//for
       }
-
-      if (regTcPtr->tcNodedata[0] != getOwnNodeId())
-      {
-	jam();
-	for (Uint32 i = 0; i < tnoOfBackup + 1; i++)
-	{
-	  HostRecordPtr hostPtr;
-	  hostPtr.i = regTcPtr->tcNodedata[i];
-	  ptrCheckGuard(hostPtr, chostFilesize, hostRecord);
-	  if (hostPtr.p->m_nf_bits & HostRecord::NF_STARTED)
-	  {
-	    jam();
-	    if (i != 0)
-	    {
-	      jam();
-	      regTcPtr->tcNodedata[0] = hostPtr.i;
-	    }
-	    break;
-	  }
-	}
-      }//if
-    }
+    }//if
     jam();
     regTcPtr->lastReplicaNo = 0;
     regTcPtr->noOfNodes = 1;
@@ -3210,6 +3180,7 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
   TcConnectRecord * const regTcPtr = tcConnectptr.p;
   ApiConnectRecord * const regApiPtr = apiConnectptr.p;
   CacheRecord * const regCachePtr = cachePtr.p;
+  Uint32 version = getNodeInfo(refToNode(TBRef)).m_version;
   UintR sig0, sig1, sig2, sig3, sig4, sig5, sig6;
 #ifdef ERROR_INSERT
   if (ERROR_INSERTED(8002)) {
@@ -3253,7 +3224,12 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
   bool simpleRead = (sig1 == ZREAD && sig0 == ZTRUE);
   LqhKeyReq::setKeyLen(Tdata10, regCachePtr->keylen);
   LqhKeyReq::setLastReplicaNo(Tdata10, regTcPtr->lastReplicaNo);
-  LqhKeyReq::setLockType(Tdata10, regCachePtr->opLock);
+  if (unlikely(version < NDBD_ROWID_VERSION))
+  {
+    Uint32 op = regTcPtr->operation;
+    Uint32 lock = (Operation_t) op == ZREAD_EX ? ZUPDATE : (Operation_t) op == ZWRITE ? ZINSERT : (Operation_t) op;
+    LqhKeyReq::setLockType(Tdata10, lock);
+  }
   /* ---------------------------------------------------------------------- */
   // Indicate Application Reference is present in bit 15
   /* ---------------------------------------------------------------------- */
@@ -3262,6 +3238,8 @@ void Dbtc::sendlqhkeyreq(Signal* signal,
   LqhKeyReq::setInterpretedFlag(Tdata10, regCachePtr->opExec);
   LqhKeyReq::setSimpleFlag(Tdata10, sig0);
   LqhKeyReq::setOperation(Tdata10, sig1);
+  LqhKeyReq::setNoDiskFlag(Tdata10, regCachePtr->m_no_disk_flag);
+
   /* ----------------------------------------------------------------------- 
    * Sequential Number of first LQH = 0, bit 22-23                           
    * IF ATTRIBUTE INFORMATION IS SENT IN TCKEYREQ,
@@ -3979,7 +3957,7 @@ void Dbtc::sendtckeyconf(Signal* signal,
   const UintR TopWords = (UintR)regApiPtr->tckeyrec;
   localHostptr.i = refToNode(regApiPtr->ndbapiBlockref);
   const Uint32 type = getNodeInfo(localHostptr.i).m_type;
-  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
+  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::MGM);
   const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
   const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL) ? 0 : 1;
   ptrAss(localHostptr, hostRecord);
@@ -4661,7 +4639,8 @@ void Dbtc::sendApiCommit(Signal* signal)
     commitConf->transId1 = regApiPtr->transid[0];
     commitConf->transId2 = regApiPtr->transid[1];
     commitConf->gci = regApiPtr->globalcheckpointid;
-    sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal, 
+
+    sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
 	       TcCommitConf::SignalLength, JBB);
   } else if (regApiPtr->returnsignal == RS_NO_RETURN) {
     jam();
@@ -4851,13 +4830,14 @@ Dbtc::execTC_COMMIT_ACK(Signal* signal){
   key.transid2 = signal->theData[1];
 
   CommitAckMarkerPtr removedMarker;
-  m_commitAckMarkerHash.release(removedMarker, key);
+  m_commitAckMarkerHash.remove(removedMarker, key);
   if (removedMarker.i == RNIL) {
     jam();
     warningHandlerLab(signal, __LINE__);
     return;
   }//if
   sendRemoveMarkers(signal, removedMarker.p);
+  m_commitAckMarkerPool.release(removedMarker);
 }
 
 void
@@ -5202,6 +5182,19 @@ void Dbtc::execLQHKEYREF(Signal* signal)
 	return;
       }
       
+      /* Only ref in certain situations */
+      {
+	const Uint32 opType = regTcPtr->operation;
+	if (   (opType == ZDELETE && errCode != ZNOT_FOUND)
+	    || (opType == ZINSERT && errCode != ZALREADYEXIST)
+	    || (opType == ZUPDATE && errCode != ZNOT_FOUND)
+	    || (opType == ZWRITE  && errCode != 839 && errCode != 840))
+	{
+	  TCKEY_abort(signal, 49);
+	  return;
+	}
+      }
+
       /* *************** */
       /*    TCKEYREF   < */
       /* *************** */
@@ -7108,19 +7101,6 @@ void Dbtc::execNODE_FAILREP(Signal* sign
 }//Dbtc::execNODE_FAILREP()
 
 void
-Dbtc::execNODE_START_REP(Signal* signal)
-{
-  Uint32 nodeId = signal->theData[0];
-  hostptr.i = nodeId;
-  ptrCheckGuard(hostptr, chostFilesize, hostRecord);
-  if (hostptr.p->m_nf_bits == 0)
-  {
-    jam();
-    hostptr.p->m_nf_bits |= HostRecord::NF_STARTED;
-  }
-}
-
-void
 Dbtc::checkNodeFailComplete(Signal* signal, 
 			    Uint32 failedNodeId,
 			    Uint32 bit)
@@ -7208,15 +7188,20 @@ Dbtc::nodeFailCheckTransactions(Signal* 
   for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++)
   {
     ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord); 
+    Uint32 state = transPtr.p->apiConnectstate;
     if (transPtr.p->m_transaction_nodes.get(failedNodeId))
     {
       jam();
-
-      // Force timeout regardless of state      
-      c_appl_timeout_value = 1;
-      setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
-      timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
-      c_appl_timeout_value = TapplTimeout;
+      
+      // avoid assertion in timeoutfoundlab
+      if (state != CS_PREPARE_TO_COMMIT)
+      {
+	// Force timeout regardless of state      
+	c_appl_timeout_value = 1;
+	setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
+	timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
+	c_appl_timeout_value = TapplTimeout;
+      }
     }
     
     // Send CONTINUEB to continue later
@@ -8218,6 +8203,13 @@ void Dbtc::initApiConnectFail(Signal* si
     tmp.p->noOfLqhs      = 1;
     tmp.p->lqhNodeId[0]  = tnodeid;
     tmp.p->apiConnectPtr = apiConnectptr.i;
+
+#if defined VM_TRACE || defined ERROR_INSERT
+    {
+      CommitAckMarkerPtr check;
+      ndbrequire(!m_commitAckMarkerHash.find(check, *tmp.p));
+    }
+#endif
     m_commitAckMarkerHash.add(tmp);
   } 
 }//Dbtc::initApiConnectFail()
@@ -8374,6 +8366,12 @@ void Dbtc::updateApiStateFail(Signal* si
       tmp.p->noOfLqhs      = 1;
       tmp.p->lqhNodeId[0]  = tnodeid;
       tmp.p->apiConnectPtr = apiConnectptr.i;
+#if defined VM_TRACE || defined ERROR_INSERT
+      {
+	CommitAckMarkerPtr check;
+	ndbrequire(!m_commitAckMarkerHash.find(check, *tmp.p));
+      }
+#endif
       m_commitAckMarkerHash.add(tmp);
     } else {
       jam();
@@ -8918,6 +8916,7 @@ void Dbtc::initScanrec(ScanRecordPtr sca
   ScanFragReq::setDescendingFlag(tmp, ScanTabReq::getDescendingFlag(ri));
   ScanFragReq::setTupScanFlag(tmp, ScanTabReq::getTupScanFlag(ri));
   ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
+  ScanFragReq::setNoDiskFlag(tmp, ScanTabReq::getNoDiskFlag(ri));
   
   scanptr.p->scanRequestInfo = tmp;
   scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
@@ -9039,9 +9038,11 @@ void Dbtc::diFcountReqLab(Signal* signal
      * THE FIRST STEP TO RECEIVE IS SUCCESSFULLY COMPLETED. 
      * WE MUST FIRST GET THE NUMBER OF  FRAGMENTS IN THE TABLE.
      ***************************************************/
-    signal->theData[0] = tcConnectptr.p->dihConnectptr;
-    signal->theData[1] = scanptr.p->scanTableref;
-    sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 2, JBB);
+    DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
+    req->m_connectionData = tcConnectptr.p->dihConnectptr;
+    req->m_tableRef = scanptr.p->scanTableref;
+    sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 
+               DihFragCountReq::SignalLength, JBB);
   }
   else 
   {
@@ -9052,17 +9053,18 @@ void Dbtc::diFcountReqLab(Signal* signal
     UintR TerrorIndicator = signal->theData[0];
     jamEntry();
     if (TerrorIndicator != 0) {
-      signal->theData[0] = tcConnectptr.i;
-      //signal->theData[1] Contains error
+      DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+      ref->m_connectionData = tcConnectptr.i;
+      ref->m_error = signal->theData[1];
       execDI_FCOUNTREF(signal);
       return;
     }
     
     UintR Tdata1 = signal->theData[1];
     scanptr.p->scanNextFragId = Tdata1;
-
-    signal->theData[0] = tcConnectptr.i;
-    signal->theData[1] = 1; // Frag count
+    DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+    conf->m_connectionData = tcConnectptr.i;
+    conf->m_fragmentCount = 1; // Frag count
     execDI_FCOUNTCONF(signal);
   }
   return;
@@ -9080,8 +9082,9 @@ void Dbtc::diFcountReqLab(Signal* signal
 void Dbtc::execDI_FCOUNTCONF(Signal* signal) 
 {
   jamEntry();
-  tcConnectptr.i = signal->theData[0];
-  Uint32 tfragCount = signal->theData[1];
+  DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
+  tcConnectptr.i = conf->m_connectionData;
+  Uint32 tfragCount = conf->m_fragmentCount;
   ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
   apiConnectptr.i = tcConnectptr.p->apiConnect;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
@@ -9165,9 +9168,10 @@ void Dbtc::execDI_FCOUNTCONF(Signal* sig
 void Dbtc::execDI_FCOUNTREF(Signal* signal) 
 {
   jamEntry();
-  tcConnectptr.i = signal->theData[0];
+  DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
+  tcConnectptr.i = ref->m_connectionData;
   ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
-  const Uint32 errCode = signal->theData[1];
+  const Uint32 errCode = ref->m_error;
   apiConnectptr.i = tcConnectptr.p->apiConnect;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
   ScanRecordPtr scanptr;
@@ -10322,6 +10326,7 @@ void Dbtc::initTable(Signal* signal) 
     tabptr.p->noOfKeyAttr = 0;
     tabptr.p->hasCharAttr = 0;
     tabptr.p->noOfDistrKeys = 0;
+    tabptr.p->hasVarKeys = 0;
   }//for
 }//Dbtc::initTable()
 
@@ -11356,7 +11361,6 @@ void Dbtc::execFIRE_TRIG_ORD(Signal* sig
   ApiConnectRecordPtr transPtr;
   TcConnectRecord *localTcConnectRecord = tcConnectRecord;
   TcConnectRecordPtr opPtr;
-
   /**
    * TODO
    * Check transid,
@@ -11370,6 +11374,7 @@ void Dbtc::execFIRE_TRIG_ORD(Signal* sig
     
     c_firedTriggerHash.remove(trigPtr);
 
+    trigPtr.p->fragId= fireOrd->fragId;
     bool ok = trigPtr.p->keyValues.getSize() == fireOrd->m_noPrimKeyWords;
     ok &= trigPtr.p->afterValues.getSize() == fireOrd->m_noAfterValueWords;
     ok &= trigPtr.p->beforeValues.getSize() == fireOrd->m_noBeforeValueWords;
@@ -11558,7 +11563,7 @@ void Dbtc::execTCINDXREQ(Signal* signal)
   // If operation is readTupleExclusive or updateTuple then read index 
   // table with exclusive lock
   Uint32 indexLength = TcKeyReq::getKeyLength(tcIndxRequestInfo);
-  Uint32 attrLength = tcIndxReq->attrLen;
+  Uint32 attrLength = TcKeyReq::getAttrinfoLen(tcIndxReq->attrLen);
   indexOp->expectedKeyInfo = indexLength;
   Uint32 includedIndexLength = MIN(indexLength, indexBufSize);
   indexOp->expectedAttrInfo = attrLength;
@@ -11592,7 +11597,7 @@ void Dbtc::sendTcIndxConf(Signal* signal
   const UintR TopWords = (UintR)regApiPtr->tcindxrec;
   localHostptr.i = refToNode(regApiPtr->ndbapiBlockref);
   const Uint32 type = getNodeInfo(localHostptr.i).m_type;
-  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
+  const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::MGM);
   const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref);
   const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL ? 0 : 1);
   ptrAss(localHostptr, hostRecord);
@@ -12289,7 +12294,11 @@ void Dbtc::executeIndexOperation(Signal*
   Uint32 dataPos = 0;
   TcKeyReq * const tcIndxReq = &indexOp->tcIndxReq;
   TcKeyReq * const tcKeyReq = (TcKeyReq *)signal->getDataPtrSend();
-  Uint32 * dataPtr = &tcKeyReq->scanInfo;
+  /*
+    Data points to distrGroupHashValue since scanInfo is used to send
+    fragment id of receiving fragment
+  */
+  Uint32 * dataPtr = &tcKeyReq->distrGroupHashValue;
   Uint32 tcKeyLength = TcKeyReq::StaticLength;
   Uint32 tcKeyRequestInfo = tcIndxReq->requestInfo;
   TcIndexData* indexData;
@@ -12328,11 +12337,16 @@ void Dbtc::executeIndexOperation(Signal*
   regApiPtr->executingIndexOp = indexOp->indexOpId;;
   regApiPtr->noIndexOp++; // Increase count
 
-  // Filter out AttributeHeader:s since this should not be in key
+  /*
+    Filter out AttributeHeader:s since this should not be in key.
+    Also filter out fragment id from primary key and handle that
+    separately by setting it as Distribution Key and set indicator.
+  */
+
   AttributeHeader* attrHeader = (AttributeHeader *) aiIter.data;
     
   Uint32 headerSize = attrHeader->getHeaderSize();
-  Uint32 keySize = attrHeader->getDataSize();
+  Uint32 keySize = attrHeader->getDataSize() - 1;
   TcKeyReq::setKeyLength(tcKeyRequestInfo, keySize);
   // Skip header
   if (headerSize == 1) {
@@ -12342,6 +12356,9 @@ void Dbtc::executeIndexOperation(Signal*
     jam();
     moreKeyData = indexOp->transIdAI.next(aiIter, headerSize - 1);
   }//if
+  tcKeyReq->scanInfo = *aiIter.data; //Fragment Id
+  moreKeyData = indexOp->transIdAI.next(aiIter);
+  TcKeyReq::setDistributionKeyFlag(tcKeyRequestInfo, 1U);
   while(// If we have not read complete key
 	(keySize != 0) &&
 	(dataPos < keyBufSize)) {
@@ -12607,7 +12624,7 @@ void Dbtc::executeTriggers(Signal* signa
 	  tmp2.release();
 	  LocalDataBuffer<11> tmp3(pool, trigPtr.p->afterValues);
 	  tmp3.release();
-          regApiPtr->theFiredTriggers.release(trigPtr.i);
+          regApiPtr->theFiredTriggers.release(trigPtr);
         }
 	trigPtr = nextTrigPtr;
       }
@@ -12724,7 +12741,7 @@ void Dbtc::insertIntoIndexTable(Signal* 
   AttributeBuffer::DataBufferIterator iter;
   Uint32 attrId = 0;
   Uint32 keyLength = 0;
-  Uint32 totalPrimaryKeyLength = 0;
+  Uint32 totalPrimaryKeyLength = 1; // fragment length
   Uint32 hops;
 
   indexTabPtr.i = indexData->indexId;
@@ -12777,11 +12794,12 @@ void Dbtc::insertIntoIndexTable(Signal* 
     hops = attrHeader->getHeaderSize() + attrHeader->getDataSize();
     moreAttrData = keyValues.next(iter, hops);
   }
-  AttributeHeader pkAttrHeader(attrId, totalPrimaryKeyLength);
+  AttributeHeader pkAttrHeader(attrId, totalPrimaryKeyLength << 2);
+  Uint32 attributesLength = afterValues.getSize() + 
+    pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
   
   TcKeyReq::setKeyLength(tcKeyRequestInfo, keyLength);
-  tcKeyReq->attrLen = afterValues.getSize() + 
-    pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
+  tcKeyReq->attrLen = attributesLength;
   tcKeyReq->tableId = indexData->indexId;
   TcKeyReq::setOperationType(tcKeyRequestInfo, ZINSERT);
   TcKeyReq::setExecutingTrigger(tcKeyRequestInfo, true);
@@ -12831,8 +12849,11 @@ void Dbtc::insertIntoIndexTable(Signal* 
   }
 
   tcKeyLength += dataPos;
-  Uint32 attributesLength = afterValues.getSize() + 
-    pkAttrHeader.getHeaderSize() + pkAttrHeader.getDataSize();
+  /*
+    Size of attrinfo is unique index attributes one by one, header for each
+    of them (all contained in the afterValues data structure), plus a header,
+    the primary key (compacted) and the fragment id before the primary key
+  */
   if (attributesLength <= attrBufSize) {
     jam();
     // ATTRINFO fits in TCKEYREQ
@@ -12849,6 +12870,10 @@ void Dbtc::insertIntoIndexTable(Signal* 
     // as one attribute
     pkAttrHeader.insertHeader(dataPtr);
     dataPtr += pkAttrHeader.getHeaderSize();
+    /*
+      Insert fragment id before primary key as part of reference to tuple
+    */
+    *dataPtr++ = firedTriggerData->fragId;
     moreAttrData = keyValues.first(iter);
     while(moreAttrData) {
       jam();
@@ -13044,6 +13069,29 @@ void Dbtc::insertIntoIndexTable(Signal* 
     pkAttrHeader.insertHeader(dataPtr);
     dataPtr += pkAttrHeader.getHeaderSize();
     attrInfoPos += pkAttrHeader.getHeaderSize();
+    /*
+      Add fragment id before primary key
+      TODO: This code really needs to be made into a long signal
+      to remove this messy code.
+    */
+    if (attrInfoPos == AttrInfo::DataLength)
+    {
+      jam();
+      // Flush ATTRINFO
+#if INTERNAL_TRIGGER_TCKEYREQ_JBA
+      sendSignal(reference(), GSN_ATTRINFO, signal, 
+                 AttrInfo::HeaderLength + AttrInfo::DataLength, JBA);
+#else
+      EXECUTE_DIRECT(DBTC, GSN_ATTRINFO, signal,
+                     AttrInfo::HeaderLength + AttrInfo::DataLength);
+      jamEntry();
+#endif
+      dataPtr = (Uint32 *) &attrInfo->attrData;	  
+      attrInfoPos = 0;
+    }
+    attrInfoPos++;
+    *dataPtr++ = firedTriggerData->fragId;
+
     moreAttrData = keyValues.first(iter);
     while(moreAttrData) {
       jam();
@@ -13344,3 +13392,55 @@ Dbtc::TableRecord::getErrorCode(Uint32 s
   return 0;
 }
 
+void
+Dbtc::execROUTE_ORD(Signal* signal)
+{
+  jamEntry();
+  if(!assembleFragments(signal)){
+    jam();
+    return;
+  }
+
+  RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+  Uint32 dstRef = ord->dstRef;
+  Uint32 srcRef = ord->srcRef;
+  Uint32 gsn = ord->gsn;
+
+  if (likely(getNodeInfo(refToNode(dstRef)).m_connected))
+  {
+    jam();
+    Uint32 secCount = signal->getNoOfSections();
+    SegmentedSectionPtr ptr[3];
+    ndbrequire(secCount >= 1 && secCount <= 3);
+
+    jamLine(secCount);
+    for (Uint32 i = 0; i<secCount; i++)
+      signal->getSection(ptr[i], i);
+
+    /**
+     * Put section 0 in signal->theData
+     */
+    ndbrequire(ptr[0].sz <= 25);
+    copy(signal->theData, ptr[0]);
+
+    signal->header.m_noOfSections = 0;
+    
+    /**
+     * Shift rest of sections
+     */
+    for(Uint32 i = 1; i<secCount; i++)
+    {
+      signal->setSection(ptr[i], i - 1);
+    }
+
+    sendSignal(dstRef, gsn, signal, ptr[0].sz, JBB);
+
+    signal->header.m_noOfSections = 0;
+    signal->setSection(ptr[0], 0);
+    releaseSections(signal);
+    return ;
+  }
+
+  warningEvent("Unable to route GSN: %d from %x to %x",
+	       gsn, srcRef, dstRef);
+}

--- 1.49.24.1/ndb/src/ndbapi/Ndb.cpp	2007-03-09 00:46:32 +07:00
+++ 1.90/storage/ndb/src/ndbapi/Ndb.cpp	2007-03-09 01:10:16 +07:00
@@ -27,6 +27,8 @@ Name:          Ndb.cpp
 #include "NdbImpl.hpp"
 #include <NdbOperation.hpp>
 #include <NdbTransaction.hpp>
+#include <NdbEventOperation.hpp>
+#include <NdbEventOperationImpl.hpp>
 #include <NdbRecAttr.hpp>
 #include <md5_hash.hpp>
 #include <NdbSleep.h>
@@ -142,7 +144,7 @@ Ndb::NDB_connect(Uint32 tNode) 
 //***************************************************************************
   
   int	         tReturnCode;
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theImpl->m_transporter_facade;
 
   DBUG_ENTER("Ndb::NDB_connect");
 
@@ -177,23 +179,9 @@ Ndb::NDB_connect(Uint32 tNode) 
   tSignal->setData(theMyRef, 2);	// Set my block reference
   tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
   Uint32 nodeSequence;
-  { // send and receive signal
-    Guard guard(tp->theMutexPtr);
-    nodeSequence = tp->getNodeSequence(tNode);
-    bool node_is_alive = tp->get_node_alive(tNode);
-    if (node_is_alive) { 
-      tReturnCode = tp->sendSignal(tSignal, tNode);  
-      releaseSignal(tSignal); 
-      if (tReturnCode != -1) {
-        theImpl->theWaiter.m_node = tNode;  
-        theImpl->theWaiter.m_state = WAIT_TC_SEIZE;  
-        tReturnCode = receiveResponse(); 
-      }//if
-    } else {
-      releaseSignal(tSignal);
-      tReturnCode = -1;
-    }//if
-  }
+  tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
+                             0, &nodeSequence);
+  releaseSignal(tSignal); 
   if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
     //************************************************
     // Send and receive was successful
@@ -239,10 +227,9 @@ Remark:        Disconnect all connection
 void 
 Ndb::doDisconnect()
 {
+  DBUG_ENTER("Ndb::doDisconnect");
   NdbTransaction* tNdbCon;
   CHECK_STATUS_MACRO_VOID;
-  /* DBUG_ENTER must be after CHECK_STATUS_MACRO_VOID because of 'return' */
-  DBUG_ENTER("Ndb::doDisconnect");
 
   Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
   Uint8 *theDBnodes= theImpl->theDBnodes;
@@ -598,6 +585,7 @@ Ndb::NdbTamper(TamperType aAction, int a
 #ifdef CUSTOMER_RELEASE
   return -1;
 #else
+  DBUG_ENTER("Ndb::NdbTamper");
   CHECK_STATUS_MACRO;
   checkFailedNode();
 
@@ -619,20 +607,20 @@ Ndb::NdbTamper(TamperType aAction, int a
 	break;
      default:
         theError.code = 4102;
-        return -1;
+        DBUG_RETURN(-1);
   }
 
   tNdbConn = getNdbCon();	// Get free connection object
   if (tNdbConn == NULL) {
     theError.code = 4000;
-    return -1;
+    DBUG_RETURN(-1);
   }
   tSignal.setSignal(GSN_DIHNDBTAMPER);
   tSignal.setData (tAction, 1);
   tSignal.setData(tNdbConn->ptr2int(),2);
   tSignal.setData(theMyRef,3);		// Set return block reference
   tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
-  TransporterFacade *tp = TransporterFacade::instance();
+  TransporterFacade *tp = theImpl->m_transporter_facade;
   if (tAction == 3) {
     tp->lock_mutex();
     tp->sendSignal(&tSignal, aNode);
@@ -644,12 +632,12 @@ Ndb::NdbTamper(TamperType aAction, int a
     if (tNode == 0) {
       theError.code = 4002;
       releaseNdbCon(tNdbConn);
-      return -1;
+      DBUG_RETURN(-1);
     }//if
     ret_code = tp->sendSignal(&tSignal,aNode);
     tp->unlock_mutex();
     releaseNdbCon(tNdbConn);
-    return ret_code;
+    DBUG_RETURN(ret_code);
   } else {
     do {
       tp->lock_mutex();
@@ -660,7 +648,7 @@ Ndb::NdbTamper(TamperType aAction, int a
       if (tNode == 0) {
         theError.code = 4009;
         releaseNdbCon(tNdbConn);
-        return -1;
+        DBUG_RETURN(-1);
       }//if
       ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
       if (ret_code == 0) {  
@@ -668,15 +656,15 @@ Ndb::NdbTamper(TamperType aAction, int a
           theRestartGCI = 0;
         }//if
         releaseNdbCon(tNdbConn);
-        return theRestartGCI;
+        DBUG_RETURN(theRestartGCI);
       } else if ((ret_code == -5) || (ret_code == -2)) {
         TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
       } else {
-        return -1;
+        DBUG_RETURN(-1);
       }//if
     } while (1);
   }
-  return 0;
+  DBUG_RETURN(0);
 #endif
 }
 #if 0
@@ -775,15 +763,18 @@ Ndb::getAutoIncrementValue(const char* a
                            Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong) tupleId));
   DBUG_RETURN(0);
@@ -794,31 +785,48 @@ Ndb::getAutoIncrementValue(const NdbDict
                            Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (getTupleIdFromNdb(info, tupleId, cacheSize) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+    DBUG_RETURN(-1);
+  DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
+                           TupleIdRange & range, Uint64 & tupleId,
+                           Uint32 cacheSize)
+{
+  DBUG_ENTER("Ndb::getAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
 }
 
 int
-Ndb::getTupleIdFromNdb(Ndb_local_table_info* info,
-                       Uint64 & tupleId, Uint32 cacheSize)
+Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
+                       TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
 {
   DBUG_ENTER("Ndb::getTupleIdFromNdb");
-  if (info->m_first_tuple_id != info->m_last_tuple_id)
+  if (range.m_first_tuple_id != range.m_last_tuple_id)
   {
-    assert(info->m_first_tuple_id < info->m_last_tuple_id);
-    tupleId = ++info->m_first_tuple_id;
+    assert(range.m_first_tuple_id < range.m_last_tuple_id);
+    tupleId = ++range.m_first_tuple_id;
     DBUG_PRINT("info", ("next cached value %lu", (ulong)tupleId));
   }
   else
@@ -831,7 +839,7 @@ Ndb::getTupleIdFromNdb(Ndb_local_table_i
      * and returns first tupleId in the new range.
      */
     Uint64 opValue = cacheSize;
-    if (opTupleIdOnNdb(info, opValue, 0) == -1)
+    if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
       DBUG_RETURN(-1);
     tupleId = opValue;
   }
@@ -843,15 +851,18 @@ Ndb::readAutoIncrementValue(const char* 
                             Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (readTupleIdFromNdb(info, tupleId) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
@@ -862,31 +873,47 @@ Ndb::readAutoIncrementValue(const NdbDic
                             Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (readTupleIdFromNdb(info, tupleId) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
     DBUG_RETURN(-1);
   DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
   DBUG_RETURN(0);
 }
 
 int
-Ndb::readTupleIdFromNdb(Ndb_local_table_info* info,
-                        Uint64 & tupleId)
+Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
+                            TupleIdRange & range, Uint64 & tupleId)
+{
+  DBUG_ENTER("Ndb::readAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (readTupleIdFromNdb(table, range, tupleId) == -1)
+    DBUG_RETURN(-1);
+  DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
+                        TupleIdRange & range, Uint64 & tupleId)
 {
   DBUG_ENTER("Ndb::readTupleIdFromNdb");
-  if (info->m_first_tuple_id != info->m_last_tuple_id)
+  if (range.m_first_tuple_id != range.m_last_tuple_id)
   {
-    assert(info->m_first_tuple_id < info->m_last_tuple_id);
-    tupleId = info->m_first_tuple_id + 1;
+    assert(range.m_first_tuple_id < range.m_last_tuple_id);
+    tupleId = range.m_first_tuple_id + 1;
   }
   else
   {
@@ -895,7 +922,7 @@ Ndb::readTupleIdFromNdb(Ndb_local_table_
      * only if no other transactions are allowed.
      */
     Uint64 opValue = 0;
-    if (opTupleIdOnNdb(info, opValue, 3) == -1)
+    if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
       DBUG_RETURN(-1);
     tupleId = opValue;
   }
@@ -907,15 +934,18 @@ Ndb::setAutoIncrementValue(const char* a
                            Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   BaseString internal_tabname(internalize_table_name(aTableName));
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (setTupleIdInNdb(info, tupleId, increase) == -1)
+  const NdbTableImpl* table = info->m_table_impl;
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
@@ -925,36 +955,52 @@ Ndb::setAutoIncrementValue(const NdbDict
                            Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setAutoIncrementValue");
+  ASSERT_NOT_MYSQLD;
   assert(aTable != 0);
   const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
   const BaseString& internal_tabname = table->m_internalName;
 
   Ndb_local_table_info *info=
-    theDictionary->get_local_table_info(internal_tabname, false);
+    theDictionary->get_local_table_info(internal_tabname);
   if (info == 0) {
     theError.code = theDictionary->getNdbError().code;
     DBUG_RETURN(-1);
   }
-  if (setTupleIdInNdb(info, tupleId, increase) == -1)
+  TupleIdRange & range = info->m_tuple_id_range;
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
+                           TupleIdRange & range, Uint64 tupleId,
+                           bool increase)
+{
+  DBUG_ENTER("Ndb::setAutoIncrementValue");
+  assert(aTable != 0);
+  const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+
+  if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
 
 int
-Ndb::setTupleIdInNdb(Ndb_local_table_info* info,
-                     Uint64 tupleId, bool increase)
+Ndb::setTupleIdInNdb(const NdbTableImpl* table,
+                     TupleIdRange & range, Uint64 tupleId, bool increase)
 {
   DBUG_ENTER("Ndb::setTupleIdInNdb");
   if (increase)
   {
-    if (info->m_first_tuple_id != info->m_last_tuple_id)
+    if (range.m_first_tuple_id != range.m_last_tuple_id)
     {
-      assert(info->m_first_tuple_id < info->m_last_tuple_id);
-      if (tupleId <= info->m_first_tuple_id + 1)
+      assert(range.m_first_tuple_id < range.m_last_tuple_id);
+      if (tupleId <= range.m_first_tuple_id + 1)
 	DBUG_RETURN(0);
-      if (tupleId <= info->m_last_tuple_id)
+      if (tupleId <= range.m_last_tuple_id)
       {
-	info->m_first_tuple_id = tupleId - 1;
+	range.m_first_tuple_id = tupleId - 1;
         DBUG_PRINT("info", 
                    ("Setting next auto increment cached value to %lu",
                     (ulong)tupleId));  
@@ -965,7 +1011,7 @@ Ndb::setTupleIdInNdb(Ndb_local_table_inf
      * if tupleId <= NEXTID, do nothing.  otherwise update NEXTID to
      * tupleId and set cached range to first = last = tupleId - 1.
      */
-    if (opTupleIdOnNdb(info, tupleId, 2) == -1)
+    if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
       DBUG_RETURN(-1);
   }
   else
@@ -973,42 +1019,62 @@ Ndb::setTupleIdInNdb(Ndb_local_table_inf
     /*
      * update NEXTID to given value.  reset cached range.
      */
-    if (opTupleIdOnNdb(info, tupleId, 1) == -1)
+    if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
       DBUG_RETURN(-1);
   }
   DBUG_RETURN(0);
 }
 
+int Ndb::initAutoIncrement()
+{
+  if (m_sys_tab_0)
+    return 0;
+
+  BaseString currentDb(getDatabaseName());
+  BaseString currentSchema(getDatabaseSchemaName());
+
+  setDatabaseName("sys");
+  setDatabaseSchemaName("def");
+
+  m_sys_tab_0 = theDictionary->getTableGlobal("SYSTAB_0");
+
+  // Restore current name space
+  setDatabaseName(currentDb.c_str());
+  setDatabaseSchemaName(currentSchema.c_str());
+
+  if (m_sys_tab_0 == NULL) {
+    assert(theDictionary->m_error.code != 0);
+    theError.code = theDictionary->m_error.code;
+    return -1;
+  }
+
+  return 0;
+}
+
 int
-Ndb::opTupleIdOnNdb(Ndb_local_table_info* info, Uint64 & opValue, Uint32 op)
+Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
+                    TupleIdRange & range, Uint64 & opValue, Uint32 op)
 {
   DBUG_ENTER("Ndb::opTupleIdOnNdb");
-  Uint32 aTableId = info->m_table_impl->m_tableId;
+  Uint32 aTableId = table->m_id;
   DBUG_PRINT("enter", ("table: %u  value: %lu  op: %u",
                        aTableId, (ulong) opValue, op));
 
-  NdbTransaction*     tConnection;
-  NdbOperation*      tOperation= 0; // Compiler warning if not initialized
+  NdbTransaction*    tConnection = NULL;
+  NdbOperation*      tOperation = NULL;
   Uint64             tValue;
   NdbRecAttr*        tRecAttrResult;
 
-  NdbError savedError;
-
-  CHECK_STATUS_MACRO_ZERO;
+  CHECK_STATUS_MACRO;
 
-  BaseString currentDb(getDatabaseName());
-  BaseString currentSchema(getDatabaseSchemaName());
+  if (initAutoIncrement() == -1)
+    goto error_handler;
 
-  setDatabaseName("sys");
-  setDatabaseSchemaName("def");
   tConnection = this->startTransaction();
   if (tConnection == NULL)
-    goto error_return;
+    goto error_handler;
 
-  if (usingFullyQualifiedNames())
-    tOperation = tConnection->getNdbOperation("SYSTAB_0");
-  else
-    tOperation = tConnection->getNdbOperation("sys/def/SYSTAB_0");
+  tOperation = tConnection->getNdbOperation(m_sys_tab_0);
   if (tOperation == NULL)
     goto error_handler;
 
@@ -1016,18 +1082,18 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
     {
     case 0:
       tOperation->interpretedUpdateTuple();
-      tOperation->equal("SYSKEY_0", aTableId );
+      tOperation->equal("SYSKEY_0", aTableId);
       tOperation->incValue("NEXTID", opValue);
       tRecAttrResult = tOperation->getValue("NEXTID");
 
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
 
       tValue = tRecAttrResult->u_64_value();
 
-      info->m_first_tuple_id = tValue - opValue;
-      info->m_last_tuple_id  = tValue - 1;
-      opValue = info->m_first_tuple_id; // out
+      range.m_first_tuple_id = tValue - opValue;
+      range.m_last_tuple_id  = tValue - 1;
+      opValue = range.m_first_tuple_id; // out
       break;
     case 1:
       // create on first use
@@ -1035,11 +1101,10 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
       tOperation->equal("SYSKEY_0", aTableId );
       tOperation->setValue("NEXTID", opValue);
 
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
 
-      info->m_first_tuple_id = ~(Uint64)0;
-      info->m_last_tuple_id  = ~(Uint64)0;
+      range.reset();
       break;
     case 2:
       tOperation->interpretedUpdateTuple();
@@ -1053,7 +1118,7 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
       tOperation->def_label(0);
       tOperation->interpret_exit_nok(9999);
       
-      if (tConnection->execute( Commit ) == -1)
+      if (tConnection->execute( NdbTransaction::Commit ) == -1)
       {
         if (tConnection->theError.code != 9999)
           goto error_handler;
@@ -1062,15 +1127,15 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
       {
         DBUG_PRINT("info", 
                    ("Setting next auto increment value (db) to %lu",
-                    (ulong)opValue));  
-        info->m_first_tuple_id = info->m_last_tuple_id = opValue - 1;
+                    (ulong) opValue));  
+        range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
       }
       break;
     case 3:
       tOperation->readTuple();
       tOperation->equal("SYSKEY_0", aTableId );
       tRecAttrResult = tOperation->getValue("NEXTID");
-      if (tConnection->execute( Commit ) == -1 )
+      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
         goto error_handler;
       opValue = tRecAttrResult->u_64_value(); // out
       break;
@@ -1080,29 +1145,28 @@ Ndb::opTupleIdOnNdb(Ndb_local_table_info
 
   this->closeTransaction(tConnection);
 
-  // Restore current name space
-  setDatabaseName(currentDb.c_str());
-  setDatabaseSchemaName(currentSchema.c_str());
-
   DBUG_RETURN(0);
 
-  error_handler:
+error_handler:
+  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
+             theError.code,
+             tConnection != NULL ? tConnection->theError.code : -1,
+             tOperation != NULL ? tOperation->theError.code : -1));
+
+  if (theError.code == 0 && tConnection != NULL)
     theError.code = tConnection->theError.code;
+  if (theError.code == 0 && tOperation != NULL)
+    theError.code = tOperation->theError.code;
+  DBUG_ASSERT(theError.code != 0);
 
-    savedError = theError;
+  NdbError savedError;
+  savedError = theError;
 
+  if (tConnection != NULL)
     this->closeTransaction(tConnection);
-    theError = savedError;
 
-  error_return:
-    // Restore current name space
-    setDatabaseName(currentDb.c_str());
-    setDatabaseSchemaName(currentSchema.c_str());
+  theError = savedError;
 
-  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
-             theError.code,
-             tConnection ? tConnection->theError.code : -1,
-             tOperation ? tOperation->theError.code : -1));
   DBUG_RETURN(-1);
 }
 
@@ -1123,39 +1187,43 @@ convertEndian(Uint32 Data)
   return Data;
 #endif
 }
+
+// <internal>
+Ndb_cluster_connection &
+Ndb::get_ndb_cluster_connection()
+{
+  return theImpl->m_ndb_cluster_connection;
+}
+
 const char * Ndb::getCatalogName() const
 {
   return theImpl->m_dbname.c_str();
 }
 
-
 void Ndb::setCatalogName(const char * a_catalog_name)
 {
-  if (a_catalog_name)
-  {
+  // TODO can table_name_separator be escaped?
+  if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
     theImpl->m_dbname.assign(a_catalog_name);
     theImpl->update_prefix();
   }
 }
 
-
 const char * Ndb::getSchemaName() const
 {
   return theImpl->m_schemaname.c_str();
 }
 
-
 void Ndb::setSchemaName(const char * a_schema_name)
 {
-  if (a_schema_name) {
+  // TODO can table_name_separator be escaped?
+  if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
     theImpl->m_schemaname.assign(a_schema_name);
     theImpl->update_prefix();
   }
 }
+// </internal>
  
-/*
-Deprecated functions
-*/
 const char * Ndb::getDatabaseName() const
 {
   return getCatalogName();
@@ -1175,6 +1243,26 @@ void Ndb::setDatabaseSchemaName(const ch
 {
   setSchemaName(a_schema_name);
 }
+
+int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
+{
+  const char* s0 = t->m_impl.m_internalName.c_str();
+  const char* s1 = strchr(s0, table_name_separator);
+  if (s1 && s1 != s0) {
+    const char* s2 = strchr(s1 + 1, table_name_separator);
+    if (s2 && s2 != s1 + 1) {
+      char buf[NAME_LEN + 1];
+      if (s1 - s0 <= NAME_LEN && s2 - (s1 + 1) <= NAME_LEN) {
+        sprintf(buf, "%.*s", (int) (s1 - s0), s0);
+        setDatabaseName(buf);
+        sprintf(buf, "%.*s", (int) (s2 - (s1 + 1)), s1 + 1);
+        setDatabaseSchemaName(buf);
+        return 0;
+      }
+    }
+  }
+  return -1;
+}
  
 bool Ndb::usingFullyQualifiedNames()
 {
@@ -1237,9 +1325,16 @@ Ndb::internalize_table_name(const char *
   if (fullyQualifiedNames)
   {
     /* Internal table name format <db>/<schema>/<table>
-       <db>/<schema> is already available in m_prefix
+       <db>/<schema>/ is already available in m_prefix
        so just concat the two strings
      */
+#ifdef VM_TRACE
+    // verify that m_prefix looks like abc/def/
+    const char* s0 = theImpl->m_prefix.c_str();
+    const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
+    const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
+    assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 + 1) == 0);
+#endif
     ret.assfmt("%s%s",
                theImpl->m_prefix.c_str(),
                external_name);
@@ -1251,6 +1346,35 @@ Ndb::internalize_table_name(const char *
   DBUG_RETURN(ret);
 }
 
+const BaseString
+Ndb::old_internalize_index_name(const NdbTableImpl * table,
+				const char * external_name) const
+{
+  BaseString ret;
+  DBUG_ENTER("old_internalize_index_name");
+  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
+                       external_name, table ? table->m_id : ~0));
+  if (!table)
+  {
+    DBUG_PRINT("error", ("!table"));
+    DBUG_RETURN(ret);
+  }
+
+  if (fullyQualifiedNames)
+  {
+    /* Internal index name format <db>/<schema>/<tabid>/<table> */
+    ret.assfmt("%s%d%c%s",
+               theImpl->m_prefix.c_str(),
+               table->m_id,
+               table_name_separator,
+               external_name);
+  }
+  else
+    ret.assign(external_name);
+
+  DBUG_PRINT("exit", ("internal_name: %s", ret.c_str()));
+  DBUG_RETURN(ret);
+}
 
 const BaseString
 Ndb::internalize_index_name(const NdbTableImpl * table,
@@ -1259,7 +1383,7 @@ Ndb::internalize_index_name(const NdbTab
   BaseString ret;
   DBUG_ENTER("internalize_index_name");
   DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
-                       external_name, table ? table->m_tableId : ~0));
+                       external_name, table ? table->m_id : ~0));
   if (!table)
   {
     DBUG_PRINT("error", ("!table"));
@@ -1268,10 +1392,10 @@ Ndb::internalize_index_name(const NdbTab
 
   if (fullyQualifiedNames)
   {
-    /* Internal index name format <db>/<schema>/<tabid>/<table> */
+    /* Internal index name format sys/def/<tabid>/<table> */
     ret.assfmt("%s%d%c%s",
-               theImpl->m_prefix.c_str(),
-               table->m_tableId,
+               theImpl->m_systemPrefix.c_str(),
+               table->m_id,
                table_name_separator,
                external_name);
   }
@@ -1316,6 +1440,113 @@ Ndb::getSchemaFromInternalName(const cha
   BaseString ret = BaseString(schemaName);
   delete [] schemaName;
   return ret;
+}
+
+// ToDo set event buffer size
+NdbEventOperation* Ndb::createEventOperation(const char* eventName)
+{
+  DBUG_ENTER("Ndb::createEventOperation");
+  NdbEventOperation* tOp= theEventBuffer->createEventOperation(eventName,
+							       theError);
+  if (tOp)
+  {
+    // keep track of all event operations
+    NdbEventOperationImpl *op=
+      NdbEventBuffer::getEventOperationImpl(tOp);
+    op->m_next= theImpl->m_ev_op;
+    op->m_prev= 0;
+    theImpl->m_ev_op= op;
+    if (op->m_next)
+      op->m_next->m_prev= op;
+  }
+
+  DBUG_RETURN(tOp);
+}
+
+int Ndb::dropEventOperation(NdbEventOperation* tOp)
+{
+  DBUG_ENTER("Ndb::dropEventOperation");
+  DBUG_PRINT("info", ("name: %s", tOp->getEvent()->getTable()->getName()));
+  // remove it from list
+  NdbEventOperationImpl *op=
+    NdbEventBuffer::getEventOperationImpl(tOp);
+  if (op->m_next)
+    op->m_next->m_prev= op->m_prev;
+  if (op->m_prev)
+    op->m_prev->m_next= op->m_next;
+  else
+    theImpl->m_ev_op= op->m_next;
+
+  DBUG_PRINT("info", ("first: %s",
+                      theImpl->m_ev_op ? theImpl->m_ev_op->getEvent()->getTable()->getName() : "<empty>"));
+  assert(theImpl->m_ev_op == 0 || theImpl->m_ev_op->m_prev == 0);
+
+  theEventBuffer->dropEventOperation(tOp);
+  DBUG_RETURN(0);
+}
+
+NdbEventOperation *Ndb::getEventOperation(NdbEventOperation* tOp)
+{
+  NdbEventOperationImpl *op;
+  if (tOp)
+    op= NdbEventBuffer::getEventOperationImpl(tOp)->m_next;
+  else
+    op= theImpl->m_ev_op;
+  if (op)
+    return op->m_facade;
+  return 0;
+}
+
+int
+Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
+{
+  return theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
+}
+
+int
+Ndb::flushIncompleteEvents(Uint64 gci)
+{
+  return theEventBuffer->flushIncompleteEvents(gci);
+}
+
+NdbEventOperation *Ndb::nextEvent()
+{
+  return theEventBuffer->nextEvent();
+}
+
+const NdbEventOperation*
+Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
+{
+  NdbEventOperationImpl* op =
+    theEventBuffer->getGCIEventOperations(iter, event_types);
+  if (op != NULL)
+    return op->m_facade;
+  return NULL;
+}
+
+Uint64 Ndb::getLatestGCI()
+{
+  return theEventBuffer->getLatestGCI();
+}
+
+void Ndb::setReportThreshEventGCISlip(unsigned thresh)
+{
+ if (theEventBuffer->m_free_thresh != thresh)
+ {
+   theEventBuffer->m_free_thresh= thresh;
+   theEventBuffer->m_min_free_thresh= thresh;
+   theEventBuffer->m_max_free_thresh= 100;
+ }
+}
+
+void Ndb::setReportThreshEventFreeMem(unsigned thresh)
+{
+  if (theEventBuffer->m_free_thresh != thresh)
+  {
+    theEventBuffer->m_free_thresh= thresh;
+    theEventBuffer->m_min_free_thresh= thresh;
+    theEventBuffer->m_max_free_thresh= 100;
+  }
 }
 
 #ifdef VM_TRACE
Thread
bk commit into 5.1 tree (tomas:1.2473)tomas8 Mar