List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:March 2 2012 7:03pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3758 to 3792)
View as plain text  
 3792 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3791 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3790 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3789 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3788 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3787 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3786 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3785 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3784 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3783 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3782 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3781 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3780 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3779 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3778 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3777 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3776 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3775 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3774 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3773 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3772 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3771 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3770 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3769 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3768 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3767 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3766 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3765 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3764 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3763 Mikael Ronstrom	2012-03-01
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3762 Mikael Ronstrom	2012-03-01
      Fixes for debug printouts

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3761 Mikael Ronstrom	2012-03-01
      Fixes for debug printouts

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3760 Mikael Ronstrom	2012-03-01
      Make source distributions compilable with debug

    modified:
      cmake/maintainer.cmake
 3759 Mikael Ronstrom	2012-03-01
      More debug printouts for new flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3758 Mikael Ronström	2012-02-29
      Signal to wake waiting executor threads

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
=== modified file 'cmake/maintainer.cmake'
--- a/cmake/maintainer.cmake	revid:mikael@dator9-20120229143837-wp3s7fcdo5bqnykz
+++ b/cmake/maintainer.cmake	revid:mikael.ronstrom@stripped20120302185524-mfpfeas69q69kqyy
@@ -18,7 +18,7 @@ INCLUDE(CheckCCompilerFlag)
 # Setup GCC (GNU C compiler) warning options.
 MACRO(SET_MYSQL_MAINTAINER_GNU_C_OPTIONS)
   SET(MY_MAINTAINER_WARNINGS
-      "-Wall -Wextra -Wunused -Wwrite-strings -Wno-strict-aliasing -Werror")
+      "-Wall -Wextra -Wunused -Wwrite-strings -Wno-strict-aliasing")
   CHECK_C_COMPILER_FLAG("-Wdeclaration-after-statement"
                         HAVE_DECLARATION_AFTER_STATEMENT)
   IF(HAVE_DECLARATION_AFTER_STATEMENT)

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	revid:mikael@dator9-20120229143837-wp3s7fcdo5bqnykz
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	revid:mikael.ronstrom@stripped0302185524-mfpfeas69q69kqyy
@@ -56,6 +56,9 @@
 #define MAXATTRSIZE 1000
 #define PKSIZE 2
 
+#define DEB_PRINT(a) ndbout_c a
+//#define DEB_PRINT(a)
+
 enum StartType { 
   stIdle = 0, 
   stInsert = 1,
@@ -101,9 +104,10 @@ static void executeCallback(int result, 
 static bool error_handler(const NdbError & err);
 static void input_error();
 static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
-static void main_thread(RunType run_type);
-static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
 
+static void main_thread(RunType run_type, NdbTimer & timer);
+static Uint64 get_total_transactions();
+static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
 
 static int                              retry_opt = 3 ;
 static int                              failed = 0 ;
@@ -153,7 +157,6 @@ static int                              
 static int                              theStdTableNameFlag = 0;
 static int                              theTableCreateFlag = 0;
 static int                              tConnections = 1;
-static int                              tStop = 0;
 
 #define START_REAL_TIME
 #define STOP_REAL_TIME
@@ -186,13 +189,13 @@ resetThreads(){
 }
 
 static void 
-waitForThreads(void)
+waitForThreads(Uint32 num_threads_to_wait_for)
 {
   int cont = 0;
   do {
     cont = 0;
     NdbSleep_MilliSleep(20);
-    for (unsigned i = 0; i < tNoOfThreads ; i++) {
+    for (unsigned i = 0; i < num_threads_to_wait_for ; i++) {
       if (ThreadReady[i] == 0) {
         cont = 1;
       }//if
@@ -264,7 +267,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
 
   ndbout << endl;
 
-  NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
+  NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4));
 
   /* print Setting */
   flexAsynchErrorData->printSettings(ndbout);
@@ -319,7 +322,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
     }
   }
 
-  if (tNdbRecord)
+  if (tNdbRecord && !tNew)
   {
     Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
     sz += 3;
@@ -335,7 +338,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
      tRunType != RunDropTable){
     if (tNew)
     {
-      main_thread(tRunType);
+      main_thread(tRunType, timer);
     }
     else
     {
@@ -370,27 +373,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       tRunType == RunUpdate ||
       tRunType == RunDelete)
   {
-    longlong total_transactions = 0;
-    longlong exec_time;
+    Uint64 total_transactions = 0;
+    Uint64 exec_time;
 
-    if (tRunType == RunInsert || tRunType == RunDelete) {
-      total_transactions = (longlong)tNoOfTransactions;
-      total_transactions *= (longlong)tNoOfThreads;
-      total_transactions *= (longlong)tNoOfParallelTrans;
-    } else {
-      for (Uint32 i = 0; i < tNoOfThreads; i++){
-        total_transactions += ThreadExecutions[i];
+    if (tNew)
+    {
+      total_transactions = get_total_transactions();
+    }
+    else
+    {
+      if (tRunType == RunInsert || tRunType == RunDelete) {
+        total_transactions = (longlong)tNoOfTransactions;
+        total_transactions *= (longlong)tNoOfThreads;
+        total_transactions *= (longlong)tNoOfParallelTrans;
+      } else {
+        for (Uint32 i = 0; i < tNoOfThreads; i++){
+          total_transactions += ThreadExecutions[i];
+        }
       }
     }
     if (tRunType == RunInsert || tRunType == RunDelete) {
-      exec_time = (longlong)timer.elapsedTime();
+      exec_time = (Uint64)timer.elapsedTime();
     } else {
-      exec_time = (longlong)tExecutionTime * 1000;
+      exec_time = (Uint64)tExecutionTime * 1000;
     }
     ndbout << "Total number of transactions is " << total_transactions;
     ndbout << endl;
     ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
 
+    if (!exec_time)
+    {
+      exec_time = 1; /* Avoid floating point exception */
+      ndbout_c("Zero execution time!!!");
+    }
     total_transactions = (total_transactions * 1000) / exec_time;
     int trans_per_sec = (int)total_transactions;
     ndbout << "Total transactions per second " << trans_per_sec << endl;
@@ -406,7 +421,7 @@ static void execute(StartType aType)
 {
   resetThreads();
   tellThreads(aType);
-  waitForThreads();
+  waitForThreads(tNoOfThreads);
 }//execute()
 
 static void*
@@ -1128,6 +1143,9 @@ setAggregateRun(void)
  * NDB Record format in the NDB API.
  */
 
+static void* definer_thread(void *data);
+static void* executor_thread(void *data);
+
 static Uint32 tNoOfExecutorThreads;
 static Uint32 tNoOfDefinerThreads;
 
@@ -1159,70 +1177,232 @@ struct key_list_header
   Uint32 num_in_list;
 };
 
-typedef struct thread_data THREAD_DATA;
-struct thread_data
+
+typedef struct thread_data_struct THREAD_DATA;
+struct thread_data_struct
 {
   KEY_LIST_HEADER list_header;
   char *record;
   Uint32 thread_id;
+  bool ready;
+  bool stop;
+  bool start;
   NdbMutex *transport_mutex;
   struct NdbCondition *transport_cond;
+  struct NdbCondition *main_cond;
+  struct NdbCondition *start_cond;
 };
+THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
 
-static void* definer_thread(void *data);
-static void* executor_thread(void *data);
+static Uint64
+get_total_transactions()
+{
+  Uint64 total_transactions = 0;
+
+  for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++){
+    total_transactions += ThreadExecutions[i];
+  }
+  return total_transactions;
+}
 
 static void
-wait_for_start_signal(THREAD_DATA *my_thread_data)
+init_list_headers(KEY_LIST_HEADER *list_header,
+                  Uint32 num_list_headers)
+{
+  Uint32 i;
+  KEY_LIST_HEADER *list_header_ref;
+  char *list_header_ptr = (char*)list_header;
+  for (i = 0;
+       i < num_list_headers;
+       i++, list_header_ptr += sizeof(KEY_LIST_HEADER))
+  {
+    list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
+    list_header_ref->first_in_list = NULL;
+    list_header_ref->last_in_list = NULL;
+    list_header_ref->num_in_list = 0;
+  }
+}
+
+static Uint32
+get_thread_id_safe(THREAD_DATA *my_thread_data)
 {
   NdbMutex_Lock(my_thread_data->transport_mutex);
-  NdbCondition_Wait(my_thread_data->transport_cond,
-                    my_thread_data->transport_mutex);
+  Uint32 thread_id = my_thread_data->thread_id;
   NdbMutex_Unlock(my_thread_data->transport_mutex);
+  return thread_id;
 }
 
 static void
-signal_thread_to_start(THREAD_DATA *thread_data)
+wait_thread_ready(THREAD_DATA *my_thread_data)
 {
-  NdbMutex_Lock(thread_data->transport_mutex);
-  NdbCondition_Signal(thread_data->transport_cond);
-  NdbMutex_Unlock(thread_data->transport_mutex);
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  while (1)
+  {
+    if (my_thread_data->ready)
+      break;
+    NdbCondition_Wait(my_thread_data->main_cond,
+                      my_thread_data->transport_mutex);
+  }
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
 }
 
 static void
-start_definer_thread(THREAD_DATA *thread_data, Uint32 thread_id)
+wait_for_threads_ready(Uint32 num_threads)
 {
-  thread_data->thread_id = thread_id;
+  for (Uint32 i = 0; i < num_threads; i++)
+    wait_thread_ready(&thread_data_array[i]);
+}
+
+static void
+signal_thread_to_start(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->start = true;
+  my_thread_data->ready = false;
+  NdbCondition_Signal(my_thread_data->start_cond);
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_start()
+{
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+    signal_thread_to_start(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_start()
+{
+  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+    signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]);
+}
+
+static void
+signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->ready = true;
+  NdbCondition_Signal(my_thread_data->main_cond);
+  while (1)
+  {
+    if (my_thread_data->start)
+      break;
+    NdbCondition_Wait(my_thread_data->start_cond,
+                      my_thread_data->transport_mutex);
+  }
+  my_thread_data->start = false;
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_thread_to_stop(THREAD_DATA *my_thread_data)
+{
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->stop = true;
+  NdbCondition_Signal(my_thread_data->transport_cond);
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+signal_definer_threads_to_stop()
+{
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+    signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+signal_executor_threads_to_stop()
+{
+  for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
+    signal_thread_to_stop(&thread_data_array[i]);
+}
+
+static void
+destroy_thread_data(THREAD_DATA *my_thread_data)
+{
+  free(my_thread_data->record);
+  NdbMutex_Destroy(my_thread_data->transport_mutex);
+  NdbCondition_Destroy(my_thread_data->transport_cond);
+  NdbCondition_Destroy(my_thread_data->start_cond);
+  NdbCondition_Destroy(my_thread_data->main_cond);
+}
+
+static void
+init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+  Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
+  my_thread_data->record = (char*)malloc(sz);
+  memset(my_thread_data->record, 0, sz);
+  init_list_headers(&my_thread_data->list_header, 1);
+  my_thread_data->stop = false;
+  my_thread_data->ready = false;
+  my_thread_data->start = false;
+  my_thread_data->transport_mutex = NdbMutex_Create();
+  my_thread_data->transport_cond = NdbCondition_Create();
+  my_thread_data->main_cond = NdbCondition_Create();
+  my_thread_data->start_cond = NdbCondition_Create();
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  my_thread_data->thread_id = thread_id;
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+}
+
+static void
+start_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
+{
+  init_thread_data(my_thread_data, thread_id);
+  DEB_PRINT(("Now starting definer thread, id = %u", thread_id));
   threadLife[thread_id] = NdbThread_Create(definer_thread,
-                                           (void**)thread_data,
+                                           (void**)my_thread_data,
                                            1024 * 1024,
                                            "flexAsynchThread",
                                            NDB_THREAD_PRIO_LOW);
 }
 
 static void
-start_executor_thread(THREAD_DATA *thread_data, Uint32 thread_id)
+start_definer_threads()
+{
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
+  {
+    Uint32 thread_id = i;
+    start_definer_thread(&thread_data_array[thread_id], thread_id);
+  }
+}
+
+static void
+start_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
 {
-  thread_data->thread_id = thread_id;
+  init_thread_data(my_thread_data, thread_id);
+  DEB_PRINT(("Now starting executor thread, id = %u", thread_id));
   threadLife[thread_id] = NdbThread_Create(executor_thread,
-                                           (void**)thread_data,
+                                           (void**)my_thread_data,
                                            1024 * 1024,
                                            "flexAsynchThread",
                                            NDB_THREAD_PRIO_LOW);
 }
 
-THREAD_DATA thread_data[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
+static void
+start_executor_threads()
+{
+  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
+  {
+    Uint32 thread_id = tNoOfDefinerThreads + i;
+    start_executor_thread(&thread_data_array[thread_id], thread_id);
+  }
+}
 
 static void
-main_thread(RunType start_type)
+main_thread(RunType start_type, NdbTimer & timer)
 {
   bool insert_delete;
 
+  DEB_PRINT(("Executing type = %u in new manner", (uint)start_type));
   tNoOfExecutorThreads = tNoOfThreads;
   tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
   tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
 
-  resetThreads();
+  DEB_PRINT((
+    "num threads = %u, num definer threads = %u, num executor threads = %u",
+      tNoOfThreads, tNoOfDefinerThreads, tNoOfExecutorThreads));
 
   if (start_type == RunInsert ||
       start_type == RunDelete)
@@ -1230,49 +1410,48 @@ main_thread(RunType start_type)
   else
     insert_delete = false;
 
-  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
-  {
-    Uint32 thread_id = i;
-    start_definer_thread(&thread_data[thread_id], thread_id);
-  }
-  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
-  {
-    Uint32 thread_id = tNoOfDefinerThreads + i;
-    start_executor_thread(&thread_data[thread_id], thread_id);
-  }
-  waitForThreads(); /* Wait for threads to complete startup */
+  start_definer_threads();
+  start_executor_threads();
+
+  wait_for_threads_ready(tNoOfThreads);
+
+  DEB_PRINT(("main: Definer and execution threads have completed startup"));
 
   /**
    * Start threads, start with execution threads to ensure they are
    * up and running before definer threads starts sending data to
    * them
    */
-  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
-    signal_thread_to_start(&thread_data[tNoOfDefinerThreads + i]);
-  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
-    signal_thread_to_start(&thread_data[i]);
+  START_TIMER;
+  signal_definer_threads_to_start();
+  signal_executor_threads_to_start();
 
-  if (insert_delete)
-  {
-    waitForThreads();
-  }
-  else
+  if (!insert_delete)
   {
     sleep(tWarmupTime);
     tRunState = EXECUTING;
     sleep(tExecutionTime);
     tRunState = COOLDOWN;
     sleep(tCooldownTime);
-    tStop = true;
-    waitForThreads();
+    signal_definer_threads_to_stop();
   }
+  wait_for_threads_ready(tNoOfDefinerThreads);
+  STOP_TIMER;
+  DEB_PRINT(("main: Definer threads are done"));
+  signal_executor_threads_to_stop();
+  wait_for_threads_ready(tNoOfThreads);
+  DEB_PRINT(("main: Definer and Executor threads are done"));
+
+  signal_definer_threads_to_start();
+  signal_executor_threads_to_start();
 
   void * tmp;
-  for(Uint32 i = 0; i < (tNoOfDefinerThreads + tNoOfExecutorThreads); i++)
+  for(Uint32 i = 0; i < tNoOfThreads; i++)
   {
     NdbThread_WaitFor(threadLife[i], &tmp);
     NdbThread_Destroy(&threadLife[i]);
   }
+  DEB_PRINT(("main: Definer and Executor threads are gone"));
 }
 
 static NdbConnection*
@@ -1336,43 +1515,6 @@ get_first_free(KEY_LIST_HEADER *list_hea
   return key_op;
 }
 
-static void
-init_list_headers(KEY_LIST_HEADER *list_header,
-                  Uint32 num_list_headers)
-{
-  Uint32 i;
-  KEY_LIST_HEADER *list_header_ref;
-  char *list_header_ptr = (char*)list_header;
-  for (i = 0;
-       i < num_list_headers;
-       list_header_ptr += sizeof(KEY_LIST_HEADER))
-  {
-    list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
-    list_header_ref->first_in_list = NULL;
-    list_header_ref->last_in_list = NULL;
-    list_header_ref->num_in_list = 0;
-  }
-}
-
-static void
-init_thread_data(THREAD_DATA *thread_data)
-{
-  Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
-  thread_data->record = (char*)malloc(sz);
-  memset(&thread_data->record, 0, sz);
-  init_list_headers(&thread_data->list_header, 1);
-  thread_data->transport_mutex = NdbMutex_Create();
-  thread_data->transport_cond = NdbCondition_Create();
-}
-
-static void
-destroy_thread_data(THREAD_DATA *thread_data)
-{
-  free(thread_data->record);
-  NdbMutex_Destroy(thread_data->transport_mutex);
-  NdbCondition_Destroy(thread_data->transport_cond);
-}
-
 /**
  * Retrieve a linked list of prepared operations. If no operations
  * prepared we wait on a condition until operations are defined for
@@ -1386,7 +1528,7 @@ receive_operations(THREAD_DATA *my_threa
   KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
 
   NdbMutex_Lock(my_thread_data->transport_mutex);
-  while (!first_in_list)
+  while (!first_in_list && !my_thread_data->stop)
   {
     first_in_list = thread_list_header->first_in_list;
     list_header->first_in_list = thread_list_header->first_in_list;
@@ -1428,8 +1570,10 @@ static void
 send_operations(Uint32 thread_id,
                 KEY_LIST_HEADER *list_header)
 {
-  THREAD_DATA *recv_thread = &thread_data[thread_id];
+  THREAD_DATA *recv_thread = &thread_data_array[thread_id];
 
+  DEB_PRINT(("Sending %u operations to thread id %u",
+            list_header->num_in_list, thread_id));
   NdbMutex_Lock(recv_thread->transport_mutex);
   if (!recv_thread->list_header.first_in_list &&
       list_header->first_in_list)
@@ -1475,6 +1619,7 @@ get_thread_id_for_record(Uint32 record_i
                          Uint32 node_count,
                          Uint32 thread_count,
                          Uint32 thread_group,
+                         Uint32 num_thread_groups,
                          Ndb *my_ndb)
 {
   Uint32 thread_id;
@@ -1482,11 +1627,17 @@ get_thread_id_for_record(Uint32 record_i
   Uint32 node_id = trans->getConnectedNodeId();
   trans->close();
   Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id);
+  if (node_count >= thread_count)
+  {
+    thread_id = node_rel_id % thread_count;
+    return thread_id;
+  }
+  
 recalculate:
   thread_id = thread_group * node_count + node_rel_id;
-  if (thread_id > thread_count)
+  if (thread_id >= thread_count)
   {
-    thread_group = rand() % thread_group;
+    thread_group = rand() % (num_thread_groups - 1);
     goto recalculate;
   }
   return thread_id;
@@ -1499,7 +1650,7 @@ void init_thread_id_mem(char *thread_id_
 {
   Uint32 node_count = get_node_count((Uint32)0);
   Uint32 thread_count = tNoOfExecutorThreads;
-  Uint32 thread_groups = (thread_count + node_count - 1) / node_count;
+  Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count;
   Uint32 thread_group = 0;
   for (Uint32 record_id = first_record, i = 0;
        i < total_records;
@@ -1509,9 +1660,10 @@ void init_thread_id_mem(char *thread_id_
                                                       node_count,
                                                       thread_count,
                                                       thread_group,
+                                                      num_thread_groups,
                                                       my_ndb);
     thread_group++;
-    if (thread_group == thread_groups)
+    if (thread_group == num_thread_groups)
       thread_group = 0;
   }
 }
@@ -1519,7 +1671,7 @@ void init_thread_id_mem(char *thread_id_
 static bool
 check_for_outstanding(Uint32 *thread_state)
 {
-  for (Uint32 i = 0; i < MAX_EXECUTOR_THREADS; i++)
+  for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
   {
     if (thread_state[i])
       return true;
@@ -1548,9 +1700,11 @@ wait_until_all_completed(THREAD_DATA *my
 {
   KEY_LIST_HEADER list_header;
   bool outstanding = true;
-  while (outstanding)
+  while (outstanding && !my_thread_data->stop)
   {
     receive_operations(my_thread_data, &list_header);
+    DEB_PRINT(("received %u operations in thread id = %u",
+              list_header.num_in_list, my_thread_data->thread_id));
     update_thread_state(&list_header, thread_state);
     move_list(&list_header, free_list_header);
     outstanding = check_for_outstanding(thread_state);
@@ -1559,31 +1713,31 @@ wait_until_all_completed(THREAD_DATA *my
 
 static Uint32
 prepare_operations(char *thread_id_mem,
-                   KEY_LIST_HEADER *list_header,
+                   KEY_LIST_HEADER *free_list_header,
                    Uint32 *thread_state,
                    Uint32 first_record_to_define,
                    Uint32 num_records_to_define,
+                   Uint32 first_record,
                    Uint32 last_record,
                    Uint32 max_per_thread)
 {
   KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS];
   Uint32 record_id, i, num_records;
 
-  init_list_headers(&thread_list_headers[0],
-                    MAX_EXECUTOR_THREADS);
+  init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads);
   for (record_id = first_record_to_define, i = 0;
-       record_id < last_record && i < num_records_to_define;
+       record_id <= last_record && i < num_records_to_define;
        record_id++, i++)
   {
-    KEY_OPERATION *define_op = get_first_free(list_header);
-    Uint32 thread_id = (Uint32)thread_id_mem[record_id];
+    KEY_OPERATION *define_op = get_first_free(free_list_header);
+    Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record];
     define_op->first_key = record_id;
     define_op->second_key = record_id;
     define_op->executor_thread_id = thread_id;
     thread_state[thread_id]++;
-    KEY_LIST_HEADER *list_header = &thread_list_headers[thread_id];
-    insert_list(list_header, define_op);
-    if (list_header->num_in_list >= max_per_thread)
+    KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id];
+    insert_list(thread_list_header, define_op);
+    if (thread_list_header->num_in_list >= max_per_thread)
     {
       /**
        * One thread has max number of records, we won't define any
@@ -1593,12 +1747,12 @@ prepare_operations(char *thread_id_mem,
     }
   }
   num_records = i;
-  for (i = 0; i < MAX_EXECUTOR_THREADS; i++)
+  for (i = 0; i < tNoOfExecutorThreads; i++)
   {
-    KEY_LIST_HEADER *list_header = &thread_list_headers[i];
-    if (list_header->num_in_list)
+    KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i];
+    if (thread_list_header->num_in_list)
     {
-      send_operations(tNoOfDefinerThreads + i, list_header);
+      send_operations(tNoOfDefinerThreads + i, thread_list_header);
     }
   }
   return num_records;
@@ -1608,12 +1762,14 @@ static void*
 definer_thread(void *data)
 {
   THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
-  Uint32 my_thread_id = my_thread_data->thread_id;
+  Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
+  DEB_PRINT(("definer thread: my_thread_data = 0x%llx, my_thread_id = %u",
+            (Uint64)my_thread_data, my_thread_id));
   RunType  run_type = tRunType;
   Uint32 thread_state[MAX_EXECUTOR_THREADS];
-  Uint32 max_outstanding = (tNoOfThreads * tNoOfParallelTrans) /
+  Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
                             tNoOfDefinerThreads;
-  Uint32 max_per_thread = 1024 / tNoOfDefinerThreads;
+  Uint32 max_per_thread = 1000 / tNoOfDefinerThreads;
   Uint32 total_records = max_outstanding * tNoOfTransactions;
   Uint32 first_record = total_records * my_thread_id;
   Uint32 my_last_record = first_record + total_records - 1;
@@ -1621,34 +1777,41 @@ definer_thread(void *data)
   KEY_LIST_HEADER free_list_header;
   void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding);
   char *thread_id_mem = (char*)malloc(total_records);
-  Ndb *my_ndb = get_ndb_object(my_thread_id);
+  memset((char*)&thread_state[0], 0, sizeof(thread_state));
 
-  init_list_headers(&my_thread_data->list_header, 1);
   init_key_op_list((char*)key_op_mem,
                    &free_list_header,
                    max_outstanding,
                    my_thread_id,
                    run_type);
+  Ndb *my_ndb = get_ndb_object(my_thread_id);
   init_thread_id_mem(thread_id_mem,
                      first_record,
                      total_records,
                      my_ndb);
   delete my_ndb;
-  init_thread_data(my_thread_data);
   ThreadExecutions[my_thread_id] = 0;
-  ThreadReady[my_thread_id] = 1; /* Flag we've completed startup */
-  wait_for_start_signal(my_thread_data);
+  DEB_PRINT(("definer thread %u have completed startup", my_thread_id));
+  signal_thread_ready_wait_for_start(my_thread_data);
+  DEB_PRINT(("definer thread %u starts executing", my_thread_id));
 
-  while (!tStop)
+  while (!my_thread_data->stop)
   {
     Uint32 defined_ops = prepare_operations(thread_id_mem,
                                             &free_list_header,
                                             &thread_state[0],
                                             current_record,
                                             max_outstanding,
+                                            first_record,
                                             my_last_record,
                                             max_per_thread);
     current_record += defined_ops;
+    if (defined_ops)
+    {
+      wait_until_all_completed(my_thread_data,
+                               &thread_state[0],
+                               &free_list_header);
+    }
     if (current_record > my_last_record)
     {
       if (run_type != RunRead &&
@@ -1663,14 +1826,12 @@ definer_thread(void *data)
       }
       current_record = first_record;
     }
-    wait_until_all_completed(my_thread_data,
-                             &thread_state[0],
-                             &free_list_header);
   }
+  DEB_PRINT(("definer thread %u is done", my_thread_id));
+  signal_thread_ready_wait_for_start(my_thread_data);
   free(key_op_mem);
   free(thread_id_mem);
   destroy_thread_data(my_thread_data);
-  ThreadReady[my_thread_id] = 1;
   return NULL;
 }
 
@@ -1743,7 +1904,7 @@ report_back_operations(KEY_OPERATION *fi
   KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS];
   KEY_OPERATION *next_op, *executed_op;
 
-  init_list_headers(&thread_list_header[0], MAX_DEFINER_THREADS);
+  init_list_headers(&thread_list_header[0], tNoOfDefinerThreads);
   executed_op = first_defined_op;
   while (executed_op)
   {
@@ -1752,7 +1913,7 @@ report_back_operations(KEY_OPERATION *fi
                 executed_op);
     executed_op = next_op;
   }
-  for (Uint32 i = 0; i < MAX_DEFINER_THREADS; i++)
+  for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
   {
     if (thread_list_header[i].first_in_list)
     {
@@ -1771,34 +1932,66 @@ static void*
 executor_thread(void *data)
 {
   THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
-  Uint32 my_thread_id = my_thread_data->thread_id;
+  Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
   Uint64 exec_count = 0;
+  Uint32 error_count = 0;
+  Uint32 error_flag = false;
   int ret_code;
   KEY_LIST_HEADER list_header;
-  Ndb *my_ndb = get_ndb_object(my_thread_id);
 
-  init_thread_data(my_thread_data);
+  DEB_PRINT(("executor thread: my_thread_data = 0x%llx, thread_id = %u",
+             (Uint64)my_thread_data, my_thread_id));
+  Ndb *my_ndb = get_ndb_object(my_thread_id);
   ThreadExecutions[my_thread_id] = 0;
-  ThreadReady[my_thread_id] = 1; /* Flag we've completed startup */
-  wait_for_start_signal(my_thread_data);
+  DEB_PRINT(("executor thread id = %u have completed startup",
+            my_thread_id));
+
+  signal_thread_ready_wait_for_start(my_thread_data);
 
-  while (!tStop)
+  DEB_PRINT(("executor thread id = %u are starting to execute",
+            my_thread_id));
+
+  while (!my_thread_data->stop)
   {
     receive_operations(my_thread_data, &list_header);
-    ret_code = execute_operations(my_thread_data->record,
-                                  my_ndb,
-                                  list_header.first_in_list);
-    if (ret_code < 0)
+    if (list_header.num_in_list == 0)
+    {
+      DEB_PRINT(("Stop executor thread has been requested by wakeup"));
       break;
+    }
+    ret_code = 0;
+    if (!error_flag)
+    {
+      /* Ignore to execute after errors to simplify error handling */
+      ret_code = execute_operations(my_thread_data->record,
+                                    my_ndb,
+                                    list_header.first_in_list);
+    }
     report_back_operations(list_header.first_in_list);
-    if (tRunState == EXECUTING)
+    if (ret_code < 0)
+    {
+      ndbout_c("executor thread id = %u received error", my_thread_id);
+      error_flag = true;
+    }
+    else if (!error_flag &&
+             (tRunType == RunInsert ||
+              tRunType == RunDelete ||
+              tRunState == EXECUTING))
+    {
       exec_count += (Uint64)ret_code;
+    }
   }
+  DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows",
+            my_thread_id, exec_count));
   ThreadExecutions[my_thread_id] = exec_count;
-  free(my_thread_data->record);
+  if (error_count)
+  {
+    ndbout_c("Received %u errors in executor thread, id = %u",
+             error_count, my_thread_id);
+  }
+  signal_thread_ready_wait_for_start(my_thread_data);
   delete my_ndb;
   destroy_thread_data(my_thread_data);
-  ThreadReady[my_thread_id] = 1;
   return NULL;
 }
 
@@ -1942,6 +2135,7 @@ readArguments(int argc, const char** arg
       argc++;
     } else if (strcmp(argv[i], "-new") == 0){
       tNew = true;
+      tNdbRecord = true;
       argc++;
       i--;
     } else if (strcmp(argv[i], "-drop_table") == 0){

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3758 to 3792) Mikael Ronstrom5 Mar