List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:January 13 2012 3:49pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3701 to 3704)
View as plain text  
 3704 Mikael Ronstrom	2012-01-13
      Improve use of more resources when executing flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3703 Mikael Ronstrom	2012-01-13
      Many changes around local handling in flexAsynch.cpp

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3702 Mikael Ronstrom	2012-01-12
      Remove adaptive lock

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3701 Mikael Ronstrom	2012-01-11
      Fixed ndbd for new method

    modified:
      storage/ndb/src/kernel/vm/dummy_nonmt.cpp
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
@@ -1127,7 +1127,7 @@ struct thr_data
    * with this part.
    */
   char unused_protection1[NDB_CL];
-  struct thr_adaptive_lock<64> m_jba_write_lock;
+  struct thr_spin_lock<64> m_jba_write_lock;
   struct thr_job_queue m_jba;
 
   struct thr_job_queue_head m_jba_head;
@@ -1263,7 +1263,7 @@ struct thr_repository
     /**
      * lock
      */
-    struct thr_adaptive_lock<8> m_send_lock;
+    struct thr_spin_lock<8> m_send_lock;
 
     /**
      * pending data

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	revid:mikael.ronstrom@stripped
@@ -39,6 +39,7 @@
 #define MAXATTR 64
 #define MAXTABLES 1
 #define NDB_MAXTHREADS 128
+#define NDB_MAX_NODES 48
 /*
   NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
   #define from <sys/thread.h> on AIX (IBM compiler).  We explicitly
@@ -93,8 +94,8 @@ static bool executeTransLoop(ThreadNdb* 
 static void executeCallback(int result, NdbConnection* NdbObject,
                             void* aObject);
 static bool error_handler(const NdbError & err);
-static Uint32 getKey(Uint32, Uint32) ;
 static void input_error();
+static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
 
 
 static int                              retry_opt = 3 ;
@@ -108,7 +109,10 @@ static int                              
 static longlong                 ThreadExecutions[NDB_MAXTHREADS];
 static StartType                ThreadStart[NDB_MAXTHREADS];
 static char                             tableName[MAXTABLES][MAXSTRLEN+1];
+static const NdbDictionary::Table *     tables[MAXTABLES];
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
+static bool                             nodeTableArray[MAXTABLES][NDB_MAX_NODES + 1];
+static Uint32                           numberNodeTable[MAXTABLES];
 static RunType                          tRunType = RunAll;
 static int                              tStdTableNum = 0;
 static int                              tWarmupTime = 10; //Seconds
@@ -119,8 +123,7 @@ static int                              
 static NdbRecord * g_record[MAXTABLES];
 static bool tNdbRecord = false;
 
-static bool                             tLocal = false;
-static int                              tLocalPart = 0;
+static int                              tLocal = 0;
 static int                              tSendForce = 0;
 static int                              tNoOfLoops = 1;
 static int                              tAttributeSize = 1;
@@ -606,19 +609,18 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       tRunType == RunUpdate ||
       tRunType == RunDelete)
   {
-    longlong total_executions = 0;
-    longlong total_transactions;
+    longlong total_transactions = 0;
     longlong exec_time;
 
     if (tRunType == RunInsert || tRunType == RunDelete) {
-      total_executions = (longlong)tNoOfTransactions;
-      total_executions *= (longlong)tNoOfThreads;
+      total_transactions = (longlong)tNoOfTransactions;
+      total_transactions *= (longlong)tNoOfThreads;
+      total_transactions *= (longlong)tNoOfParallelTrans;
     } else {
       for (Uint32 i = 0; i < tNoOfThreads; i++){
-        total_executions += ThreadExecutions[i];
+        total_transactions += ThreadExecutions[i];
       }
     }
-    total_transactions = total_executions * tNoOfParallelTrans;
     if (tRunType == RunInsert || tRunType == RunDelete) {
       exec_time = (longlong)timer.elapsedTime();
     } else {
@@ -654,9 +656,10 @@ threadLoop(void* ThreadData)
   ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
   int threadNo = tabThread->ThreadNo;
   localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
-  localNdb->init(1024);
+  localNdb->init(MAXPAR);
   localNdb->waitUntilReady(10000);
-  unsigned int threadBase = (threadNo << 16);
+  unsigned int threadBase = threadNo;
+
   
   for (;;){
     while (ThreadStart[threadNo] == stIdle) {
@@ -671,11 +674,18 @@ threadLoop(void* ThreadData)
     tType = ThreadStart[threadNo];
     ThreadStart[threadNo] = stIdle;
     if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
-      if(!executeThread(tabThread, tType, localNdb, threadBase)){
+      if(!executeThread(tabThread,
+                        tType,
+                        localNdb,
+                        threadBase)){
         break;
       }
     } else {
-      if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+      if(!executeTransLoop(tabThread,
+                           tType,
+                           localNdb,
+                           threadBase,
+                           threadNo)){
         break;
       }
     }
@@ -691,81 +701,142 @@ threadLoop(void* ThreadData)
 static int error_count = 0;
 
 static bool
+update_num_ops(Uint32 *num_ops, NdbConnection **tConArray)
+{
+  /*
+    Move num_ops forward to next unused position, can be old
+    transactions still outstanding
+  */
+  for ( ; *num_ops < tNoOfParallelTrans; (*num_ops)++)
+  {
+    if (tConArray[*num_ops])
+      continue;
+    else
+      break;
+  }
+  if (*num_ops == tNoOfParallelTrans)
+    return true;
+  return false;
+}
+
+static int
 executeTrans(ThreadNdb* pThread,
              StartType aType,
              Ndb* aNdbObject,
              unsigned int threadBase,
-             unsigned int i)
+             unsigned int record,
+             Uint32 nodeId,
+             NdbConnection **tConArray,
+             bool execute_all)
 {
-  NdbConnection* tConArray[1024];
   unsigned int tBase;
   unsigned int tBase2;
+  Uint32 threadBaseLoc, threadBaseLoc2;
+  Uint32 num_ops = 0;
+  Uint32 i, loops;
 
-  if (tLocal == false) {
-    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
-  } else {
-    tBase = i * tNoOfParallelTrans * MAX_SEEK;
-  }//if
   START_REAL_TIME;
-  for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
-    if (tLocal == false) {
+  for (i = record, loops = 0;
+       i < tNoOfTransactions &&
+       loops < 16 &&
+       num_ops < tNoOfParallelTrans;
+       i++, loops++)
+  {
+    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+    threadBaseLoc = (threadBase * tNoOfTransactions * tNoOfParallelTrans) +
+                    (i * tNoOfParallelTrans);
+    for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+      if (update_num_ops(&num_ops, tConArray))
+        break;
+      threadBaseLoc2 = threadBaseLoc + j;
       tBase2 = tBase + (j * tNoOfOpsPerTrans);
-    } else {
-      tBase2 = tBase + (j * MAX_SEEK);
-      tBase2 = getKey(threadBase, tBase2);
-    }//if
-    if (startTransGuess == true) {
-      union {
-        Uint64 Tkey64;
-        Uint32 Tkey32[2];
-      };
-      Tkey32[0] = threadBase;
-      Tkey32[1] = tBase2;
-      tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
-                                                  (const char*)&Tkey64, //Main PKey
-                                                  (Uint32)4);           //Key Length
-    } else {
-      tConArray[j] = aNdbObject->startTransaction();
-    }//if
-    if (tConArray[j] == NULL){
-      error_handler(aNdbObject->getNdbError());
-      ndbout << endl << "Unable to recover! Quiting now" << endl ;
-      return false;
-    }//if
-    
-    for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
-      //-------------------------------------------------------
-      // Define the operation, but do not execute it yet.
-      //-------------------------------------------------------
-      if (tNdbRecord)
-        defineNdbRecordOperation(pThread, 
-                                 tConArray[j], aType, threadBase,(tBase2+k));
+      if (startTransGuess == true) {
+        union {
+          Uint64 _align;
+          Uint32 Tkey32[2];
+        };
+        (void)_align;
+
+        Tkey32[0] = threadBaseLoc2;
+        Tkey32[1] = tBase2;
+        Ndb::Key_part_ptr hint[2];
+        hint[0].ptr = Tkey32+0;
+        hint[0].len = 4;
+        hint[1].ptr = 0;
+        hint[1].len = 0;
+
+        tConArray[num_ops] = aNdbObject->startTransaction(tables[0], hint);
+      }
       else
-        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
-    }//for
+      {
+        tConArray[num_ops] = aNdbObject->startTransaction();
+      }
+
+      if (tConArray[num_ops] == NULL){
+        error_handler(aNdbObject->getNdbError());
+        ndbout << endl << "Unable to recover! Quitting now" << endl ;
+        return -1;
+      }//if
+   
+      if (nodeId != 0 &&
+          tConArray[num_ops]->getConnectedNodeId() != nodeId)
+      {
+        /*
+          We're running only local operations, this won't be local,
+          ignore this record
+        */
+        aNdbObject->closeTransaction(tConArray[num_ops]);
+        continue;
+      }
+      for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
+        //-------------------------------------------------------
+        // Define the operation, but do not execute it yet.
+        //-------------------------------------------------------
+        if (tNdbRecord)
+          defineNdbRecordOperation(pThread, 
+                                   tConArray[num_ops],
+                                   aType,
+                                   threadBaseLoc2,
+                                   (tBase2+k));
+        else
+          defineOperation(tConArray[num_ops],
+                          aType,
+                          threadBaseLoc2,
+                          (tBase2 + k));
+      }//for
     
-    tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+      tConArray[num_ops]->executeAsynchPrepare(Commit,
+                                               &executeCallback,
+                                               (void*)&tConArray[num_ops]);
+      num_ops++;
+    }//for
   }//for
   STOP_REAL_TIME;
+  if (num_ops == 0)
+    return 0;
   //-------------------------------------------------------
   // Now we have defined a set of operations, it is now time
-  // to execute all of them.
+  // to execute all of them. If execute_all isn't set, we
+  // only execute at least half of them. In this manner we
+  // can cater for different execution speeds in different
+  // parts of the system.
   //-------------------------------------------------------
-  int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
-  while (unsigned(Tcomp) < tNoOfParallelTrans) {
-    int TlocalComp = aNdbObject->pollNdb(3000, 0);
+  int min_execs = execute_all ? (int)num_ops :
+                     (num_ops > 1 ? (int)(num_ops / 2) : 1);
+  int Tcomp = aNdbObject->sendPollNdb(3000,
+                                      min_execs,
+                                      tSendForce);
+  while (Tcomp < min_execs) {
+    int TlocalComp = aNdbObject->pollNdb(3000, min_execs - Tcomp);
     Tcomp += TlocalComp;
   }//while
-  for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
-    if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
-      error_count++;
-      ndbout << "i = " << i << ", j = " << j << ", error = ";
-      ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
-      ndbout << hex << threadBase << endl;
-    }
-    aNdbObject->closeTransaction(tConArray[j]);
-  }//for
-  return true;
+  if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+    error_count++;
+    ndbout << "i = " << i << ", error = ";
+    ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+    ndbout << hex << threadBase << endl;
+  }
+  return Tcomp;
 }
 
 static 
@@ -779,28 +850,64 @@ executeTransLoop(ThreadNdb* pThread, 
   int time_expired;
   longlong executions = 0;
   unsigned int i = 0;
+  Uint32 nodeId;
+  int ops = 0;
+  int record;
+  Uint32 local_count = 0;
+  bool execute_all = false;
   DEFINE_TIMER;
+  NdbConnection* tConArray[MAXPAR];
 
+  for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
+  if (tLocal > 0)
+  {
+    nodeId = get_my_node_id((Uint32)0, threadBase);
+  }
+  else
+    nodeId = 0;
   ThreadExecutions[threadNo] = 0;
   START_TIMER;
   do
   {
-    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+    if (tLocal == 2)
+    {
+      /* Select node on round robin basis */
+      local_count++;
+      nodeId = get_my_node_id((Uint32)0, local_count);
+    }
+    else if (tLocal == 3)
+    {
+      /* Select node on random basis */
+      local_count = (Uint32)(rand() % numberNodeTable[0]);
+      nodeId = get_my_node_id((Uint32)0, local_count);
+    }
+    record = rand() % tNoOfTransactions;
+    if ((ops = executeTrans(pThread,
+                            aType,
+                            aNdbObject,
+                            threadBase,
+                            (Uint32)record,
+                            nodeId,
+                            tConArray,
+                            execute_all)) < 0)
       return false;
     STOP_TIMER;
     time_expired = (int)(timer.elapsedTime() / 1000);
     if (time_expired < tWarmupTime)
       ; //Do nothing
     else if (time_expired < (tWarmupTime + tExecutionTime)){
-      executions++; //Count measurement
+      executions += ops; //Count measurement
     }
     else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
       ; //Do nothing
     else
+    {
+      execute_all = true;
       continue_flag = false; //Time expired
+    }
     if (i == tNoOfTransactions) /* Make sure the record exists */
       i = 0;
-  } while (continue_flag);
+  } while (1);
   ThreadExecutions[threadNo] = executions;
   return true;
 }//executeTransLoop()
@@ -808,39 +915,32 @@ executeTransLoop(ThreadNdb* pThread, 
 static 
 bool
 executeThread(ThreadNdb* pThread, 
-	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+	      StartType aType,
+              Ndb* aNdbObject,
+              unsigned int threadBase) {
+  NdbConnection* tConArray[MAXPAR];
+
+  for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
   for (unsigned int i = 0; i < tNoOfTransactions; i++) {
-    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+    if ((executeTrans(pThread,
+                      aType,
+                      aNdbObject,
+                      threadBase,
+                      i,
+                      (Uint32)0,
+                      tConArray,
+                      true)) < 0)
       return false;
   }//for
   return true;
 }//executeThread()
 
-static 
-Uint32
-getKey(Uint32 aBase, Uint32 anIndex) {
-  Uint32 Tfound = anIndex;
-  union {
-    Uint64 Tkey64;
-    Uint32 Tkey32[2];
-  };
-  Tkey32[0] = aBase;
-  Uint32 hash;
-  for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
-    Tkey32[1] = (Uint32)i;
-    hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
-    hash = (hash >> 6) & (MAX_PARTS - 1);
-    if (hash == unsigned(tLocalPart)) {
-      Tfound = i;
-      break;
-    }//if
-  }//for
-  return Tfound;
-}//getKey()
-
 static void
 executeCallback(int result, NdbConnection* NdbObject, void* aObject)
 {
+  NdbConnection **array_ref = (NdbConnection**)aObject;
+  assert(NdbObject == *array_ref);
+  *array_ref = NULL;
   if (result == -1) {
 
               // Add complete error handling here
@@ -860,8 +960,8 @@ executeCallback(int result, NdbConnectio
 	      //    ndbout << "Error occured in poll:" << endl;
 	      //    ndbout << NdbObject->getNdbError() << endl;
     failed++ ;
-    return;
   }//if
+  NdbObject->close(); /* Close transaction */
   return;
 }//executeCallback()
 
@@ -929,14 +1029,16 @@ defineOperation(NdbConnection* localNdbC
     error_handler(localNdbOperation->getNdbError()); 
   }//default
   }//switch
-  localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
+
+  localNdbOperation->equal((Uint32)0, (char*)(attrValue + 0));
+  localNdbOperation->equal((Uint32)1, (char*)(attrValue + 1));
   switch (aType) {
   case stInsert:      // Insert case
   case stUpdate:      // Update Case
     {
       for (countAttributes = 1;
            countAttributes < loopCountAttributes; countAttributes++) {
-        localNdbOperation->setValue(countAttributes, 
+        localNdbOperation->setValue(countAttributes + 1,
                                     (char*)&attrValue[0]);
       }//for
       break;
@@ -944,7 +1046,7 @@ defineOperation(NdbConnection* localNdbC
   case stRead: {      // Read Case
     for (countAttributes = 1;
          countAttributes < loopCountAttributes; countAttributes++) {
-      localNdbOperation->getValue(countAttributes, 
+      localNdbOperation->getValue(countAttributes + 1,
                                   (char*)&attrValue[0]);
     }//for
     break;
@@ -1059,86 +1161,126 @@ dropTables(Ndb* pMyNdb)
   }
 }
 
+/*
+  Set up nodeTableArray with a boolean true for all nodes that
+  contains the table.
+*/
+static int
+setUpNodeTableArray(Uint32 tableNo, const NdbDictionary::Table *pTab)
+{
+  Uint32 numFragments = pTab->getFragmentCount();
+  Uint32 nodeId;
+  for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+    nodeTableArray[tableNo][i] = false;
+  for (Uint32 i = 0; i < numFragments; i++)
+  {
+    if ((pTab->getFragmentNodes(i, &nodeId, (Uint32)1)) == 0)
+    {
+      return 1;
+    }
+    nodeTableArray[tableNo][nodeId] = true;
+  }
+  numberNodeTable[tableNo] = 0;
+  for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+  {
+    if (nodeTableArray[tableNo][i])
+      numberNodeTable[tableNo]++;
+  }
+  return 0;
+}
+
+static Uint32
+get_my_node_id(Uint32 tableNo, Uint32 threadNo)
+{
+  Uint32 count = 0;
+  Uint32 n = threadNo % numberNodeTable[tableNo];
+  for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
+  {
+    if (nodeTableArray[tableNo][i])
+    {
+      if (count == n)
+        return i;
+      count++;
+    }
+  }
+  return 0;
+}
+
 static
 int 
 createTables(Ndb* pMyNdb){
 
-  NdbSchemaCon          *MySchemaTransaction;
-  NdbSchemaOp           *MySchemaOp;
-  int                   check;
-
-  if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
-    for(int i=0; i < MAXTABLES ;i++) {
+  NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+  if (theTableCreateFlag == 0 || tRunType == RunCreateTable)
+  {
+    for(int i=0; i < MAXTABLES ;i++)
+    {
       ndbout << "Creating " << tableName[i] << "..." << endl;
-      MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
-      
-      if(MySchemaTransaction == NULL && 
-         (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
-      
-      MySchemaOp = MySchemaTransaction->getNdbSchemaOp();       
-      if(MySchemaOp == NULL &&
-         (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
 
+      NdbDictionary::Table tab;
+      tab.setName(tableName[i]);
+      if (tempTable)
+      {
+        tab.setLogging(false);
+      }
 
-      check = MySchemaOp->createTable( tableName[i]
-                                       ,8                       // Table Size
-                                       ,TupleKey                // Key Type
-                                       ,40                      // Nr of Pages
-                                       ,All
-                                       ,6
-                                       ,(tLoadFactor - 5)
-                                       ,(tLoadFactor)
-                                       ,1
-                                       ,!tempTable
-                                       );
-      
-      if (check == -1 &&
-          (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
-      
-      check = MySchemaOp->createAttribute( (char*)attrName[0],
-                                           TupleKey,
-                                           32,
-                                           PKSIZE,
-                                           UnSigned,
-                                           MMBased,
-                                           NotNullAttribute );
-      
-      if (check == -1 &&
-          (!error_handler(MySchemaTransaction->getNdbError())))
-        return -1;
-      for (unsigned j = 1; j < tNoOfAttributes ; j++){
-        check = MySchemaOp->createAttribute( (char*)attrName[j],
-                                             NoKey,
-                                             32,
-                                             tAttributeSize,
-                                             UnSigned,
-                                             MMBased,
-                                             NotNullAttribute );
-        if (check == -1 &&
-            (!error_handler(MySchemaTransaction->getNdbError())))
-          return -1;
+      {
+        NdbDictionary::Column distkey;
+        distkey.setName("DISTKEY");
+        distkey.setType(NdbDictionary::Column::Unsigned);
+        distkey.setPrimaryKey(true);
+        distkey.setDistributionKey(true);
+        tab.addColumn(distkey);
       }
-      
-      if (MySchemaTransaction->execute() == -1 &&
-          (!error_handler(MySchemaTransaction->getNdbError())))
+
+      {
+        NdbDictionary::Column pk;
+        pk.setName(attrName[0]);
+        pk.setType(NdbDictionary::Column::Unsigned);
+        pk.setPrimaryKey(true);
+        tab.addColumn(pk);
+      }
+
+      for (unsigned j = 1; j < tNoOfAttributes ; j++)
+      {
+        NdbDictionary::Column col;
+        col.setName(attrName[j]);
+        col.setType(NdbDictionary::Column::Unsigned);
+        col.setLength(tAttributeSize);
+        tab.addColumn(col);
+      }
+
+      int res = pDict->createTable(tab);
+      if (res != 0)
+      {
+        ndbout << pDict->getNdbError() << endl;
         return -1;
-      
-      NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+      }
+    }
+  }
+
+  for(int i=0; i < MAXTABLES ;i++)
+  {
+    const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+    if (pTab == NULL)
+    {
+      error_handler(pDict->getNdbError());
+      return -1;
+    }
+    tables[i] = pTab;
+    if (setUpNodeTableArray(i, pTab))
+    {
+      error_handler(pDict->getNdbError());
+      return -1;
     }
   }
+
   if (tNdbRecord)
   {
-    for(int i=0; i < MAXTABLES ;i++) {
-      NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
-      const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
+    for(int i=0; i < MAXTABLES ;i++)
+    {
+      const NdbDictionary::Table * pTab = tables[i];
 
-      if (pTab == NULL){
-        error_handler(pDict->getNdbError());
-        return -1;
-      }
       int off = 0;
       Vector<NdbDictionary::RecordSpecification> spec;
       for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
@@ -1241,13 +1383,12 @@ readArguments(int argc, const char** arg
         return -1;
       }
     } else if (strcmp(argv[i], "-local") == 0){
-      tLocalPart = atoi(argv[i+1]);
-      tLocal = true;
-      startTransGuess = true;
-      if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
-	ndbout_c("Invalid local part");
+      tLocal = atoi(argv[i+1]);
+      if (tLocal < 1 || (tLocal > 3)){
+        ndbout_c("Invalid local value, only 1,2 or 3 allowed");
         return -1;
       }
+      startTransGuess = true;
     } else if (strcmp(argv[i], "-simple") == 0){
       theSimpleFlag = 1;
       argc++;
@@ -1340,7 +1481,7 @@ readArguments(int argc, const char** arg
     argc -= 2;
     i = i + 2;
   }//while
-  if (tLocal == true) {
+  if (tLocal > 0) {
     if (tNoOfOpsPerTrans != 1) {
       ndbout_c("Not valid to have more than one op per trans with local");
     }//if
@@ -1377,7 +1518,7 @@ input_error(){
   ndbout_c("   -adaptive Use adaptive send algorithm (default)");
   ndbout_c("   -force Force send when communicating");
   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
-  ndbout_c("   -local Number of part, only use keys in one part out of 16");
+  ndbout_c("   -local 1 = each thread its own node, 2 = round robin on node per parallel trans 3 = random node per parallel trans");
   ndbout_c("   -ndbrecord Use NDB Record");
   ndbout_c("   -r Number of extra loops");
   ndbout_c("   -insert Only run inserts on standard table");

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3701 to 3704) Mikael Ronstrom16 Jan