List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:November 9 2011 2:20pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4648 to 4649)
View as plain text  
 4649 Jonas Oreland	2011-11-09
      ndb - include mikaels extensions for flexAsynch...

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 4648 Ole John Aske	2011-11-09
      Fix for bug#13355055: CLUSTER INTERNALS FAILS TO TERMINATE BATCH AT MAX 'BATCHBYTESIZE'
      
        We have observed SCANREQs with a surprisingly small 'BatchSize' argument as part
      of debugging and tuning SPJ. Where we expected 'BatchSize=64' (Default) we
      have observed values around ~10. This directly translated into suboptimal performance.
      
      When debugging this, we found the root cause in NdbRecord::calculate_batch_size(), which
      returns the batchsize (#rows) and  arguments for the SCANREQ signal.
      It contained the following questionable logic:
      
       1) Calculate the worst case record length based on that *all columns* are selected
          from a table, and all varchar() columns being filled to their *max limit*.
      
       2) If that record length is such that 'batchsize * recLength' > ,
          reduce batchsize such that batchbytesize would never be exceeded.
      
      This effectively put ::calculate_batch_size() in control of the batchbytesize
      logic. The negative impact if that logic was that 'batchsize' could be severely
      restricted in cases where we could have delivered a lot more rows in that batch.
      
      However, there are logic in LQH+TUP which are intended to keep the delivered batches
      withing the batchsize limits. This is a much better place to control this as
      LQH & TUP knows the exact size of the TRANSID_AI payload being delivered, taking
      actual varchar length and only the selected columns into acount.
      
      Debugging that logic, it turned out that it contained bugs in how the produced
      batchsize was counted: Actually a mixup between whether the 'length' was in
      specified in number of bytes or Uint32. - So the above questionable
      ::calculate_batch_size() logic seems to have been invented only to
      circumvent this bug......
      
      Fixing that bug allowed us to now leave the entire batch control to
      the LQH block.
      
      - ::calculate_batch_size could then be significantly simplified.
      - The specified BatchSize & BatchByteSize arguments could be used as
        specified directly as args in SCANREQ signals.
      - Will likely give better performance (larger effective batches) when
        scanning a table with 'max record length > BatchByteSize / BatchSize'
        (~500 bytes with default config)
      
      
      Fix number of bytes/Uint32 mixup in how m_curr_batch_size_bytes is counted
      ******
      Fix number of bytes/Uint32 mixup in how the SPJ adaptive parallelism count m_totalBytes
      ******
      Simplify ::calculate_batch_size() as LQH now correctly will stay within the specified batch_size rows/bytes limits
      ******
      Remove NdbRecord::m_max_transid_ai_bytes which is now obsolete
      ******
      Remove unused args from NdbRecord::calculate_batch_size()
      ******
      Fix SPJs adaptive paralellism logic to also handle batchsize termination due to BatchByteSize being exhausted

    modified:
      storage/ndb/include/kernel/signaldata/ScanFrag.hpp
      storage/ndb/include/kernel/signaldata/TupKey.hpp
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
      storage/ndb/src/ndbapi/NdbRecord.hpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-11-09 14:19:23 +0000
@@ -37,7 +37,7 @@
 #define MAX_SEEK 16 
 #define MAXSTRLEN 16 
 #define MAXATTR 64
-#define MAXTABLES 64
+#define MAXTABLES 1
 #define NDB_MAXTHREADS 128
 /*
   NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -59,6 +59,16 @@ enum StartType {
   stStop 
 } ;
 
+enum RunType {
+  RunInsert,
+  RunRead,
+  RunUpdate,
+  RunDelete,
+  RunCreateTable,
+  RunDropTable,
+  RunAll
+};
+
 struct ThreadNdb
 {
   int NoOfOps;
@@ -70,6 +80,7 @@ extern "C" { static void* threadLoop(voi
 static void setAttrNames(void);
 static void setTableNames(void);
 static int readArguments(int argc, const char** argv);
+static void dropTables(Ndb* pMyNdb);
 static int createTables(Ndb*);
 static void defineOperation(NdbConnection* aTransObject, StartType aType, 
                             Uint32 base, Uint32 aIndex);
@@ -77,6 +88,8 @@ static void defineNdbRecordOperation(Thr
                             Uint32 base, Uint32 aIndex);
 static void execute(StartType aType);
 static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeTransLoop(ThreadNdb* pThread, StartType aType, Ndb* aNdbObject,
+                             unsigned int threadBase, int threadNo);
 static void executeCallback(int result, NdbConnection* NdbObject,
                             void* aObject);
 static bool error_handler(const NdbError & err);
@@ -92,9 +105,15 @@ ErrorData * flexAsynchErrorData;
 static NdbThread*               threadLife[NDB_MAXTHREADS];
 static int                              tNodeId;
 static int                              ThreadReady[NDB_MAXTHREADS];
+static longlong                 ThreadExecutions[NDB_MAXTHREADS];
 static StartType                ThreadStart[NDB_MAXTHREADS];
 static char                             tableName[MAXTABLES][MAXSTRLEN+1];
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
+static RunType                          tRunType = RunAll;
+static int                              tStdTableNum = 0;
+static int                              tWarmupTime = 10; //Seconds
+static int                              tExecutionTime = 30; //Seconds
+static int                              tCooldownTime = 10; //Seconds
 
 // Program Parameters
 static NdbRecord * g_record[MAXTABLES];
@@ -126,9 +145,10 @@ static int
 
 #define START_REAL_TIME
 #define STOP_REAL_TIME
-#define START_TIMER { NdbTimer timer; timer.doStart();
+#define DEFINE_TIMER NdbTimer timer
+#define START_TIMER timer.doStart();
 #define STOP_TIMER timer.doStop();
-#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; 
+#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans)
 
 NDBT_Stats a_i, a_u, a_d, a_r;
 
@@ -183,6 +203,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   ThreadNdb*            pThreadData;
   int                   tLoops=0;
   int                   returnValue = NDBT_OK;
+  DEFINE_TIMER;
 
   flexAsynchErrorData = new ErrorData;
   flexAsynchErrorData->resetErrorCounters();
@@ -201,7 +222,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   ndbout << "  " << tNoOfParallelTrans;
   ndbout << " number of parallel operation per thread " << endl;
   ndbout << "  " << tNoOfTransactions << " transaction(s) per round " << endl;
-  ndbout << "  " << tNoOfLoops << " iterations " << endl;
+  if (tRunType == RunAll){
+    ndbout << "  " << tNoOfLoops << " iterations " << endl;
+  } else if (tRunType == RunRead || tRunType == RunUpdate){
+    ndbout << "  Warmup time is " << tWarmupTime << endl;
+    ndbout << "  Execution time is " << tExecutionTime << endl;
+    ndbout << "  Cooldown time is " << tCooldownTime << endl;
+  }
   ndbout << "  " << "Load Factor is " << tLoadFactor << "%" << endl;
   ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
   ndbout << "  " << tAttributeSize;
@@ -262,10 +289,20 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   if (pNdb->waitUntilReady(10000) != 0){
     ndbout << "NDB is not ready" << endl;
     ndbout << "Benchmark failed!" << endl;
-    returnValue = NDBT_FAILED;
+    return NDBT_ProgramExit(NDBT_FAILED);
   }
 
-  if(returnValue == NDBT_OK){
+  if (tRunType == RunCreateTable)
+  {
+    if (createTables(pNdb) != 0){
+      returnValue = NDBT_FAILED;
+    }
+  }
+  else if (tRunType == RunDropTable)
+  {
+    dropTables(pNdb);
+  }
+  else if(returnValue == NDBT_OK){
     if (createTables(pNdb) != 0){
       returnValue = NDBT_FAILED;
     }
@@ -282,14 +319,15 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
     }
   }
 
-  if(returnValue == NDBT_OK){
+  if(returnValue == NDBT_OK &&
+     tRunType != RunCreateTable &&
+     tRunType != RunDropTable){
     /****************************************************************
      *  Create NDB objects.                                   *
      ****************************************************************/
     resetThreads();
     for (Uint32 i = 0; i < tNoOfThreads ; i++) {
-      pThreadData[i].ThreadNo = i
-;
+      pThreadData[i].ThreadNo = i;
       threadLife[i] = NdbThread_Create(threadLoop,
                                        (void**)&pThreadData[i],
                                        32768,
@@ -312,76 +350,86 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
           
       failed = 0 ;
-
-      START_TIMER;
-      execute(stInsert);
-      STOP_TIMER;
-      a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int ci = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!" 
-                 << endl << endl;
-          ndbout << "Attempting to redo the failed transactions now..." 
-                 << endl ;
-          ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
-                 << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stInsert);
-          STOP_TIMER;
-          PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          ci++;
-        }
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        }else{
-          ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
-                 << endl;
+      if (tRunType == RunAll || tRunType == RunInsert){
+        ndbout << "Executing inserts" << endl;
+        START_TIMER;
+        execute(stInsert);
+        STOP_TIMER;
+      }
+      if (tRunType == RunAll){
+        a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int ci = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!" 
+                   << endl << endl;
+            ndbout << "Attempting to redo the failed transactions now..." 
+                   << endl ;
+            ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
+                   << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stInsert);
+            STOP_TIMER;
+            PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            ci++;
+          }
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          }else{
+            ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
+                   << endl;
+          }//if
         }//if
-      }//if
-          
+      }//if  
       /****************************************************************
        * Perform read.                                                *
        ****************************************************************/
       
       failed = 0 ;
 
-      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-      {
-        START_TIMER;
-        execute(stRead);
-        STOP_TIMER;
-        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-      }
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cr = 1;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl;
-          ndbout <<"Redo attempt " << cr <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
+      if (tRunType == RunAll || tRunType == RunRead){
+        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+        {
+          ndbout << "Executing reads" << endl;
           START_TIMER;
           execute(stRead);
           STOP_TIMER;
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cr++ ;
-        }//while
-        if(0 == failed ) {
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-        }else{
-          ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+          if (tRunType == RunAll){
+            a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+          }//if
+        }//for
+      }//if
+
+      if (tRunType == RunAll){
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cr = 1;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl;
+            ndbout <<"Redo attempt " << cr <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stRead);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cr++ ;
+          }//while
+          if(0 == failed ) {
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+          }else{
+            ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+          }//if
         }//if
       }//if
           
@@ -391,35 +439,40 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
       
       failed = 0 ;
-          
-      START_TIMER;
-      execute(stUpdate);
-      STOP_TIMER;
-      a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cu = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl <<"Redo attempt " << cu <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stUpdate);
-          STOP_TIMER;
-          PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cu++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        } else {
-          ndbout << endl;
-          ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+
+      if (tRunType == RunAll || tRunType == RunUpdate){
+        ndbout << "Executing updates" << endl;
+        START_TIMER;
+        execute(stUpdate);
+        STOP_TIMER;
+      }//if
+      if (tRunType == RunAll){
+        a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cu = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl <<"Redo attempt " << cu <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stUpdate);
+            STOP_TIMER;
+            PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cu++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          } else {
+            ndbout << endl;
+            ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -428,38 +481,41 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
       
       failed = 0 ;
-          
-      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-      {
-        START_TIMER;
-        execute(stRead);
-        STOP_TIMER;
-        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-      }        
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cr2 = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
+
+      if (tRunType == RunAll){
+        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+        {
+          ndbout << "Executing reads" << endl;
           START_TIMER;
           execute(stRead);
           STOP_TIMER;
+          a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
           PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cr2++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        }else{
-          ndbout << endl;
-          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+        }        
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cr2 = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stRead);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cr2++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          }else{
+            ndbout << endl;
+            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -470,34 +526,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       
       failed = 0 ;
           
-      START_TIMER;
-      execute(stDelete);
-      STOP_TIMER;
-      a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cd = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<< endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now:" << endl ;
-          ndbout << endl <<"Redo attempt " << cd <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stDelete);
-          STOP_TIMER;
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cd++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-        }else{
-          ndbout << endl;
-          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+      if (tRunType == RunAll || tRunType == RunDelete){
+        ndbout << "Executing deletes" << endl;
+        START_TIMER;
+        execute(stDelete);
+        STOP_TIMER;
+      }//if
+      if (tRunType == RunAll){
+        a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cd = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<< endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+            ndbout << endl <<"Redo attempt " << cd <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stDelete);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cd++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+          }else{
+            ndbout << endl;
+            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -516,17 +577,61 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       NdbThread_WaitFor(threadLife[i], &tmp);
       NdbThread_Destroy(&threadLife[i]);
     }
-  } 
+  }
+
+  if (tRunType == RunAll)
+  {
+    dropTables(pNdb);
+  }
   delete [] pThreadData;
   delete pNdb;
 
-  //printing errorCounters
-  flexAsynchErrorData->printErrorCounters(ndbout);
-
-  print("insert", a_i);
-  print("update", a_u);
-  print("delete", a_d);
-  print("read  ", a_r);
+  if (tRunType == RunAll ||
+      tRunType == RunInsert ||
+      tRunType == RunDelete ||
+      tRunType == RunUpdate ||
+      tRunType == RunRead)
+  {
+    //printing errorCounters
+    flexAsynchErrorData->printErrorCounters(ndbout);
+    if (tRunType == RunAll) {
+      print("insert", a_i);
+      print("update", a_u);
+      print("delete", a_d);
+      print("read  ", a_r);
+    }
+  }
+  if (tRunType == RunInsert ||
+      tRunType == RunRead ||
+      tRunType == RunUpdate ||
+      tRunType == RunDelete)
+  {
+    longlong total_executions = 0;
+    longlong total_transactions;
+    longlong exec_time;
+
+    if (tRunType == RunInsert || tRunType == RunDelete) {
+      total_executions = (longlong)tNoOfTransactions;
+      total_executions *= (longlong)tNoOfThreads;
+    } else {
+      for (Uint32 i = 0; i < tNoOfThreads; i++){
+        total_executions += ThreadExecutions[i];
+      }
+    }
+    total_transactions = total_executions * tNoOfParallelTrans;
+    if (tRunType == RunInsert || tRunType == RunDelete) {
+      exec_time = (longlong)timer.elapsedTime();
+    } else {
+      exec_time = (longlong)tExecutionTime * 1000;
+    }
+    ndbout << "Total number of transactions is " << total_transactions;
+    ndbout << endl;
+    ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+
+    total_transactions = (total_transactions * 1000) / exec_time;
+    int trans_per_sec = (int)total_transactions;
+    ndbout << "Total transactions per second " << trans_per_sec << endl;
+  }
 
   delete [] g_cluster_connection;
 
@@ -551,7 +656,7 @@ threadLoop(void* ThreadData)
   localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
   localNdb->init(1024);
   localNdb->waitUntilReady(10000);
-  unsigned int threadBase = (threadNo << 16) + tNodeId ;
+  unsigned int threadBase = (threadNo << 16);
   
   for (;;){
     while (ThreadStart[threadNo] == stIdle) {
@@ -565,8 +670,14 @@ threadLoop(void* ThreadData)
 
     tType = ThreadStart[threadNo];
     ThreadStart[threadNo] = stIdle;
-    if(!executeThread(tabThread, tType, localNdb, threadBase)){
-      break;
+    if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
+      if(!executeThread(tabThread, tType, localNdb, threadBase)){
+        break;
+      }
+    } else {
+      if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+        break;
+      }
     }
     ThreadReady[threadNo] = 1;
   }//for
@@ -577,80 +688,131 @@ threadLoop(void* ThreadData)
   return NULL;
 }//threadLoop()
 
-static 
-bool
-executeThread(ThreadNdb* pThread, 
-	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+static int error_count = 0;
 
+static bool
+executeTrans(ThreadNdb* pThread,
+             StartType aType,
+             Ndb* aNdbObject,
+             unsigned int threadBase,
+             unsigned int i)
+{
   NdbConnection* tConArray[1024];
   unsigned int tBase;
   unsigned int tBase2;
 
-  unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
-
-  for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
-  {
-    for (unsigned int i = 0; i < tNoOfTransactions; i++) {
-      if (tLocal == false) {
-        tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
-      } else {
-        tBase = i * tNoOfParallelTrans * MAX_SEEK;
-      }//if
-      START_REAL_TIME;
-      for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
-        if (tLocal == false) {
-          tBase2 = tBase + (j * tNoOfOpsPerTrans);
-        } else {
-          tBase2 = tBase + (j * MAX_SEEK);
-          tBase2 = getKey(threadBase, tBase2);
-        }//if
-        if (startTransGuess == true) {
-	  union {
-            Uint64 Tkey64;
-            Uint32 Tkey32[2];
-	  };
-          Tkey32[0] = threadBase;
-          Tkey32[1] = tBase2;
-          tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
-                                                      (const char*)&Tkey64, //Main PKey
-                                                      (Uint32)4);           //Key Length
-        } else {
-          tConArray[j] = aNdbObject->startTransaction();
-        }//if
-        if (tConArray[j] == NULL && 
-            !error_handler(aNdbObject->getNdbError()) ){
-          ndbout << endl << "Unable to recover! Quiting now" << endl ;
-          return false;
-        }//if
-        
-        for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
-          //-------------------------------------------------------
-          // Define the operation, but do not execute it yet.
-          //-------------------------------------------------------
-          if (tNdbRecord)
-            defineNdbRecordOperation(pThread, 
-                                     tConArray[j], aType, threadBase,(tBase2+k));
-          else
-            defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
-        }//for
-        
-        tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
-      }//for
-      STOP_REAL_TIME;
+  if (tLocal == false) {
+    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+  } else {
+    tBase = i * tNoOfParallelTrans * MAX_SEEK;
+  }//if
+  START_REAL_TIME;
+  for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+    if (tLocal == false) {
+      tBase2 = tBase + (j * tNoOfOpsPerTrans);
+    } else {
+      tBase2 = tBase + (j * MAX_SEEK);
+      tBase2 = getKey(threadBase, tBase2);
+    }//if
+    if (startTransGuess == true) {
+      union {
+        Uint64 Tkey64;
+        Uint32 Tkey32[2];
+      };
+      Tkey32[0] = threadBase;
+      Tkey32[1] = tBase2;
+      tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+                                                  (const char*)&Tkey64, //Main PKey
+                                                  (Uint32)4);           //Key Length
+    } else {
+      tConArray[j] = aNdbObject->startTransaction();
+    }//if
+    if (tConArray[j] == NULL){
+      error_handler(aNdbObject->getNdbError());
+      ndbout << endl << "Unable to recover! Quiting now" << endl ;
+      return false;
+    }//if
+    
+    for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
       //-------------------------------------------------------
-      // Now we have defined a set of operations, it is now time
-      // to execute all of them.
+      // Define the operation, but do not execute it yet.
       //-------------------------------------------------------
-      int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
-      while (unsigned(Tcomp) < tNoOfParallelTrans) {
-        int TlocalComp = aNdbObject->pollNdb(3000, 0);
-        Tcomp += TlocalComp;
-      }//while
-      for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
-        aNdbObject->closeTransaction(tConArray[j]);
-      }//for
+      if (tNdbRecord)
+        defineNdbRecordOperation(pThread, 
+                                 tConArray[j], aType, threadBase,(tBase2+k));
+      else
+        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
     }//for
-  } // for
+    
+    tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+  }//for
+  STOP_REAL_TIME;
+  //-------------------------------------------------------
+  // Now we have defined a set of operations, it is now time
+  // to execute all of them.
+  //-------------------------------------------------------
+  int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+  while (unsigned(Tcomp) < tNoOfParallelTrans) {
+    int TlocalComp = aNdbObject->pollNdb(3000, 0);
+    Tcomp += TlocalComp;
+  }//while
+  for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
+    if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+      error_count++;
+      ndbout << "i = " << i << ", j = " << j << ", error = ";
+      ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+      ndbout << hex << threadBase << endl;
+    }
+    aNdbObject->closeTransaction(tConArray[j]);
+  }//for
+  return true;
+}
+
+static 
+bool
+executeTransLoop(ThreadNdb* pThread, 
+                 StartType aType,
+                 Ndb* aNdbObject,
+                 unsigned int threadBase,
+                 int threadNo) {
+  bool continue_flag = true;
+  int time_expired;
+  longlong executions = 0;
+  unsigned int i = 0;
+  DEFINE_TIMER;
+
+  ThreadExecutions[threadNo] = 0;
+  START_TIMER;
+  do
+  {
+    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+      return false;
+    STOP_TIMER;
+    time_expired = timer.elapsedTime() / 1000;
+    if (time_expired < tWarmupTime)
+      ; //Do nothing
+    else if (time_expired < (tWarmupTime + tExecutionTime)){
+      executions++; //Count measurement
+    }
+    else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
+      ; //Do nothing
+    else
+      continue_flag = false; //Time expired
+    if (i == tNoOfTransactions) /* Make sure the record exists */
+      i = 0;
+  } while (continue_flag);
+  ThreadExecutions[threadNo] = executions;
+  return true;
+}//executeTransLoop()
+
+static 
+bool
+executeThread(ThreadNdb* pThread, 
+	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+  for (unsigned int i = 0; i < tNoOfTransactions; i++) {
+    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+      return false;
+  }//for
   return true;
 }//executeThread()
 
@@ -880,8 +1042,20 @@ static void setTableNames()
       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", i, 
                (unsigned)(NdbTick_CurrentMillisecond()+rand()));
     } else {
-      BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
+      BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", tStdTableNum);
     }
+    ndbout << "Using table name " << tableName[0] << endl;
+  }
+}
+
+static void
+dropTables(Ndb* pMyNdb)
+{
+  int i;
+  for (i = 0; i < MAXTABLES; i++)
+  {
+    ndbout << "Dropping table " << tableName[i] << "..." << endl;
+    pMyNdb->getDictionary()->dropTable(tableName[i]);
   }
 }
 
@@ -893,8 +1067,8 @@ createTables(Ndb* pMyNdb){
   NdbSchemaOp           *MySchemaOp;
   int                   check;
 
-  if (theTableCreateFlag == 0) {
-    for(int i=0; i < 1 ;i++) {
+  if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
+    for(int i=0; i < MAXTABLES ;i++) {
       ndbout << "Creating " << tableName[i] << "..." << endl;
       MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
       
@@ -953,31 +1127,35 @@ createTables(Ndb* pMyNdb){
         return -1;
       
       NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+    }
+  }
+  if (tNdbRecord)
+  {
+    for(int i=0; i < MAXTABLES ;i++) {
+      NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+      const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
 
-      if (tNdbRecord)
+      if (pTab == NULL){
+        error_handler(pDict->getNdbError());
+        return -1;
+      }
+      int off = 0;
+      Vector<NdbDictionary::RecordSpecification> spec;
+      for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
       {
-	NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
-	const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
-	
-	int off = 0;
-	Vector<NdbDictionary::RecordSpecification> spec;
-	for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
-	{
-	  NdbDictionary::RecordSpecification r0;
-	  r0.column = pTab->getColumn(j);
-	  r0.offset = off;
-	  off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
-	  spec.push_back(r0);
-	}
-	g_record[i] = 
+        NdbDictionary::RecordSpecification r0;
+        r0.column = pTab->getColumn(j);
+        r0.offset = off;
+        off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+        spec.push_back(r0);
+      }
+      g_record[i] = 
 	  pDict->createRecord(pTab, spec.getBase(), 
 			      spec.size(),
 			      sizeof(NdbDictionary::RecordSpecification));
-	assert(g_record[i]);
-      }
+      assert(g_record[i]);
     }
   }
-  
   return 0;
 }
 
@@ -996,6 +1174,14 @@ bool error_handler(const NdbError & err)
   return false ; // return false to abort
 }
 
+static void
+setAggregateRun(void)
+{
+  tNoOfLoops = 1;
+  tExtraReadLoop = 0;
+  theTableCreateFlag = 1;
+}
+
 static
 int 
 readArguments(int argc, const char** argv){
@@ -1110,6 +1296,43 @@ readArguments(int argc, const char** arg
       tExtraReadLoop = atoi(argv[i+1]);
     } else if (strcmp(argv[i], "-con") == 0){
       tConnections = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-insert") == 0){
+      setAggregateRun();
+      tRunType = RunInsert;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-read") == 0){
+      setAggregateRun();
+      tRunType = RunRead;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-update") == 0){
+      setAggregateRun();
+      tRunType = RunUpdate;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-delete") == 0){
+      setAggregateRun();
+      tRunType = RunDelete;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-create_table") == 0){
+      tRunType = RunCreateTable;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-drop_table") == 0){
+      tRunType = RunDropTable;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-warmup_time") == 0){
+      tWarmupTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-execution_time") == 0){
+      tExecutionTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-cooldown_time") == 0){
+      tCooldownTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-table") == 0){
+      tStdTableNum = atoi(argv[i+1]);
+      theStdTableNameFlag = 1;
     } else {
       return -1;
     }
@@ -1131,7 +1354,6 @@ readArguments(int argc, const char** arg
 static
 void
 input_error(){
-  
   ndbout_c("FLEXASYNCH");
   ndbout_c("   Perform benchmark of insert, update and delete transactions");
   ndbout_c(" ");
@@ -1156,7 +1378,18 @@ input_error(){
   ndbout_c("   -force Force send when communicating");
   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
   ndbout_c("   -local Number of part, only use keys in one part out of 16");
-  ndbout_c("   -ndbrecord");
+  ndbout_c("   -ndbrecord Use NDB Record");
+  ndbout_c("   -r Number of extra loops");
+  ndbout_c("   -insert Only run inserts on standard table");
+  ndbout_c("   -read Only run reads on standard table");
+  ndbout_c("   -update Only run updates on standard table");
+  ndbout_c("   -delete Only run deletes on standard table");
+  ndbout_c("   -create_table Only run Create Table of standard table");
+  ndbout_c("   -drop_table Only run Drop Table on standard table");
+  ndbout_c("   -warmup_time Warmup Time before measurement starts");
+  ndbout_c("   -execution_time Execution Time where measurement is done");
+  ndbout_c("   -cooldown_time Cooldown time after measurement completed");
+  ndbout_c("   -table Number of standard table, default 0");
 }
   
 template class Vector<NdbDictionary::RecordSpecification>;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4648 to 4649) Jonas Oreland11 Nov