List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:March 5 2012 2:10pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3792 to 3801)
View as plain text  
 3801 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3800 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3799 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3798 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3797 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3796 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3795 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3794 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3793 Mikael Ronstrom	2012-03-05
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3792 Mikael Ronstrom	2012-03-02
      Fixes for flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	revid:mikael.ronstrom@stripped
@@ -148,6 +148,7 @@ static bool                     tempTabl
 static bool                     startTransGuess = true;
 static int                      tExtraReadLoop = 0;
 static bool                     tNew = false;
+static bool                     tImmediate = false;
 
 //Program Flags
 static int                              theTestFlag = 0;
@@ -720,24 +721,24 @@ executeCallback(int result, NdbConnectio
   NdbConnection **array_ref = (NdbConnection**)aObject;
   assert(NdbObject == *array_ref);
   *array_ref = NULL;
-  if (result == -1) {
+  if (result == -1 && failed < 100) {
 
-              // Add complete error handling here
+    // Add complete error handling here
 
-              int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
-              if (retCode == 1) {
-		if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
-                ndbout_c("execute: %s", NdbObject->getNdbError().message);
-                ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
-              } else if (retCode == 2) {
-                ndbout << "4115 should not happen in flexAsynch" << endl;
-              } else if (retCode == 3) {
-		/* What can we do here? */
-                ndbout_c("execute: %s", NdbObject->getNdbError().message);
-                 }//if(retCode == 3)
-
-	      //    ndbout << "Error occured in poll:" << endl;
-	      //    ndbout << NdbObject->getNdbError() << endl;
+    int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
+    if (retCode == 1) {
+      if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
+        ndbout_c("execute: %s", NdbObject->getNdbError().message);
+        ndbout_c("Error code = %d", NdbObject->getNdbError().code);
+      }
+    } else if (retCode == 2) {
+      ndbout << "4115 should not happen in flexAsynch" << endl;
+    } else if (retCode == 3) {
+      /* What can we do here? */
+      ndbout_c("execute: %s", NdbObject->getNdbError().message);
+    }//if(retCode == 3)
+    //    ndbout << "Error occured in poll:" << endl;
+    //    ndbout << NdbObject->getNdbError() << endl;
     failed++ ;
   }//if
   NdbObject->close(); /* Close transaction */
@@ -1146,8 +1147,8 @@ setAggregateRun(void)
 static void* definer_thread(void *data);
 static void* executor_thread(void *data);
 
-static Uint32 tNoOfExecutorThreads;
-static Uint32 tNoOfDefinerThreads;
+static Uint32 tNoOfExecutorThreads = 0;
+static Uint32 tNoOfDefinerThreads = 0;
 
 enum RunState
 {
@@ -1182,15 +1183,17 @@ typedef struct thread_data_struct THREAD
 struct thread_data_struct
 {
   KEY_LIST_HEADER list_header;
-  char *record;
   Uint32 thread_id;
   bool ready;
   bool stop;
   bool start;
+
+  char *record;
   NdbMutex *transport_mutex;
   struct NdbCondition *transport_cond;
   struct NdbCondition *main_cond;
   struct NdbCondition *start_cond;
+  char not_used[52];
 };
 THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
 
@@ -1350,7 +1353,6 @@ static void
 start_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
 {
   init_thread_data(my_thread_data, thread_id);
-  DEB_PRINT(("Now starting definer thread, id = %u", thread_id));
   threadLife[thread_id] = NdbThread_Create(definer_thread,
                                            (void**)my_thread_data,
                                            1024 * 1024,
@@ -1372,7 +1374,6 @@ static void
 start_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
 {
   init_thread_data(my_thread_data, thread_id);
-  DEB_PRINT(("Now starting executor thread, id = %u", thread_id));
   threadLife[thread_id] = NdbThread_Create(executor_thread,
                                            (void**)my_thread_data,
                                            1024 * 1024,
@@ -1395,15 +1396,13 @@ main_thread(RunType start_type, NdbTimer
 {
   bool insert_delete;
 
-  DEB_PRINT(("Executing type = %u in new manner", (uint)start_type));
   tNoOfExecutorThreads = tNoOfThreads;
-  tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+  if (tNoOfDefinerThreads == 0)
+  {
+    tNoOfDefinerThreads = (tNoOfThreads + 3)/4;
+  }
   tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
 
-  DEB_PRINT((
-    "num threads = %u, num definer threads = %u, num executor threads = %u",
-      tNoOfThreads, tNoOfDefinerThreads, tNoOfExecutorThreads));
-
   if (start_type == RunInsert ||
       start_type == RunDelete)
     insert_delete = true;
@@ -1451,7 +1450,6 @@ main_thread(RunType start_type, NdbTimer
     NdbThread_WaitFor(threadLife[i], &tmp);
     NdbThread_Destroy(&threadLife[i]);
   }
-  DEB_PRINT(("main: Definer and Executor threads are gone"));
 }
 
 static NdbConnection*
@@ -1515,36 +1513,6 @@ get_first_free(KEY_LIST_HEADER *list_hea
   return key_op;
 }
 
-/**
- * Retrieve a linked list of prepared operations. If no operations
- * prepared we wait on a condition until operations are defined for
- * us to execute.
- */
-static void
-receive_operations(THREAD_DATA *my_thread_data,
-                   KEY_LIST_HEADER *list_header)
-{
-  KEY_OPERATION *first_in_list = NULL;
-  KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
-
-  NdbMutex_Lock(my_thread_data->transport_mutex);
-  while (!first_in_list && !my_thread_data->stop)
-  {
-    first_in_list = thread_list_header->first_in_list;
-    list_header->first_in_list = thread_list_header->first_in_list;
-    list_header->last_in_list = thread_list_header->last_in_list;
-    list_header->num_in_list = thread_list_header->num_in_list;
-    thread_list_header->first_in_list = NULL;
-    thread_list_header->last_in_list = NULL;
-    thread_list_header->num_in_list = 0;
-    if (first_in_list)
-      break;
-    NdbCondition_Wait(my_thread_data->transport_cond,
-                      my_thread_data->transport_mutex);
-  }
-  NdbMutex_Unlock(my_thread_data->transport_mutex);
-}
-
 static void
 move_list(KEY_LIST_HEADER *src_list_header,
           KEY_LIST_HEADER *dst_list_header)
@@ -1564,6 +1532,53 @@ move_list(KEY_LIST_HEADER *src_list_head
   dst_list_header->last_in_list = src_list_header->last_in_list;
   dst_list_header->num_in_list += src_list_header->num_in_list;
   src_list_header->num_in_list = 0;
+  src_list_header->first_in_list = NULL;
+  src_list_header->last_in_list = NULL;
+}
+
+/**
+ * Retrieve a linked list of prepared operations. If no operations
+ * prepared we wait on a condition until operations are defined for
+ * us to execute.
+ */
+static void
+receive_operations(THREAD_DATA *my_thread_data,
+                   KEY_LIST_HEADER *list_header,
+                   bool wait)
+{
+  bool first = true;
+  KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
+  list_header->first_in_list = NULL;
+  list_header->last_in_list = NULL;
+  list_header->num_in_list = 0;
+
+recheck:
+  NdbMutex_Lock(my_thread_data->transport_mutex);
+  while (!my_thread_data->stop &&
+         (first || thread_list_header->first_in_list))
+  {
+    move_list(thread_list_header, list_header);
+    if (list_header->first_in_list)
+      break;
+    NdbCondition_Wait(my_thread_data->transport_cond,
+                      my_thread_data->transport_mutex);
+    first = false;
+  }
+  NdbMutex_Unlock(my_thread_data->transport_mutex);
+  if (first && wait &&
+      thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2))
+  {
+    /**
+     * We will wait for at least 20 microseconds if we haven't yet received
+     * at least half of the number of records we desire to execute.
+     */
+    struct timespec timer;
+    timer.tv_sec = 0;
+    timer.tv_nsec = 20000;
+    first = false;
+    nanosleep(&timer, NULL);
+    goto recheck;
+  }
 }
 
 static void
@@ -1572,20 +1587,14 @@ send_operations(Uint32 thread_id,
 {
   THREAD_DATA *recv_thread = &thread_data_array[thread_id];
 
-  DEB_PRINT(("Sending %u operations to thread id %u",
-            list_header->num_in_list, thread_id));
   NdbMutex_Lock(recv_thread->transport_mutex);
-  if (!recv_thread->list_header.first_in_list &&
-      list_header->first_in_list)
-  {
-    /**
-     * We are moving operations into an empty list, thus we need
-     * to wake any threads waiting for operations to execute.
-     */
-    NdbCondition_Signal(recv_thread->transport_cond);
-  }
+  /**
+   * We are moving operations into the list, thus we need
+   * to wake any threads waiting for operations to execute.
+   */
   move_list(list_header,
             &recv_thread->list_header);
+  NdbCondition_Signal(recv_thread->transport_cond);
   NdbMutex_Unlock(recv_thread->transport_mutex);
 }
 
@@ -1702,9 +1711,7 @@ wait_until_all_completed(THREAD_DATA *my
   bool outstanding = true;
   while (outstanding && !my_thread_data->stop)
   {
-    receive_operations(my_thread_data, &list_header);
-    DEB_PRINT(("received %u operations in thread id = %u",
-              list_header.num_in_list, my_thread_data->thread_id));
+    receive_operations(my_thread_data, &list_header, false);
     update_thread_state(&list_header, thread_state);
     move_list(&list_header, free_list_header);
     outstanding = check_for_outstanding(thread_state);
@@ -1763,8 +1770,6 @@ definer_thread(void *data)
 {
   THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
   Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
-  DEB_PRINT(("definer thread: my_thread_data = 0x%llx, my_thread_id = %u",
-            (Uint64)my_thread_data, my_thread_id));
   RunType  run_type = tRunType;
   Uint32 thread_state[MAX_EXECUTOR_THREADS];
   Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
@@ -1791,9 +1796,7 @@ definer_thread(void *data)
                      my_ndb);
   delete my_ndb;
   ThreadExecutions[my_thread_id] = 0;
-  DEB_PRINT(("definer thread %u have completed startup", my_thread_id));
   signal_thread_ready_wait_for_start(my_thread_data);
-  DEB_PRINT(("definer thread %u starts executing", my_thread_id));
 
   while (!my_thread_data->stop)
   {
@@ -1827,7 +1830,6 @@ definer_thread(void *data)
       current_record = first_record;
     }
   }
-  DEB_PRINT(("definer thread %u is done", my_thread_id));
   signal_thread_ready_wait_for_start(my_thread_data);
   free(key_op_mem);
   free(thread_id_mem);
@@ -1935,28 +1937,21 @@ executor_thread(void *data)
   Uint32 my_thread_id = get_thread_id_safe(my_thread_data);
   Uint64 exec_count = 0;
   Uint32 error_count = 0;
+  Uint32 executions = 0;
   Uint32 error_flag = false;
   int ret_code;
   KEY_LIST_HEADER list_header;
 
-  DEB_PRINT(("executor thread: my_thread_data = 0x%llx, thread_id = %u",
-             (Uint64)my_thread_data, my_thread_id));
   Ndb *my_ndb = get_ndb_object(my_thread_id);
   ThreadExecutions[my_thread_id] = 0;
-  DEB_PRINT(("executor thread id = %u have completed startup",
-            my_thread_id));
 
   signal_thread_ready_wait_for_start(my_thread_data);
 
-  DEB_PRINT(("executor thread id = %u are starting to execute",
-            my_thread_id));
-
   while (!my_thread_data->stop)
   {
-    receive_operations(my_thread_data, &list_header);
+    receive_operations(my_thread_data, &list_header, !tImmediate);
     if (list_header.num_in_list == 0)
     {
-      DEB_PRINT(("Stop executor thread has been requested by wakeup"));
       break;
     }
     ret_code = 0;
@@ -1978,11 +1973,12 @@ executor_thread(void *data)
               tRunType == RunDelete ||
               tRunState == EXECUTING))
     {
+      executions++;
       exec_count += (Uint64)ret_code;
     }
   }
-  DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows",
-            my_thread_id, exec_count));
+  DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows in %u executes",
+            my_thread_id, exec_count, executions));
   ThreadExecutions[my_thread_id] = exec_count;
   if (error_count)
   {
@@ -2009,6 +2005,12 @@ readArguments(int argc, const char** arg
 	ndbout_c("Invalid no of threads");
         return -1;
       }
+    } else if (strcmp(argv[i], "-d") == 0){
+      tNoOfDefinerThreads = atoi(argv[i+1]);
+      if (tNoOfDefinerThreads > NDB_MAXTHREADS){
+	ndbout_c("Invalid no of definer threads");
+        return -1;
+      }
     } else if (strcmp(argv[i], "-p") == 0){
       tNoOfParallelTrans = atoi(argv[i+1]);
       if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
@@ -2138,6 +2140,10 @@ readArguments(int argc, const char** arg
       tNdbRecord = true;
       argc++;
       i--;
+    } else if (strcmp(argv[i], "-immediate") == 0){
+      tImmediate = true;
+      argc++;
+      i--;
     } else if (strcmp(argv[i], "-drop_table") == 0){
       tRunType = RunDropTable;
       argc++;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3792 to 3801) Mikael Ronstrom5 Mar