From: Mikael Ronstrom Date: March 5 2012 2:10pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3792 to 3801) List-Archive: http://lists.mysql.com/commits/143091 Message-Id: <201203051411.q25EB3wJ031560@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3801 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3800 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3799 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3798 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3797 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3796 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3795 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3794 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3793 Mikael Ronstrom 2012-03-05 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp 3792 Mikael Ronstrom 2012-03-02 Fixes for flexAsynch modified: storage/ndb/test/ndbapi/flexAsynch.cpp === modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp' --- a/storage/ndb/test/ndbapi/flexAsynch.cpp revid:mikael.ronstrom@stripped +++ b/storage/ndb/test/ndbapi/flexAsynch.cpp revid:mikael.ronstrom@stripped @@ -148,6 +148,7 @@ static bool tempTabl static bool startTransGuess = true; static int tExtraReadLoop = 0; static bool tNew = false; +static bool tImmediate = false; //Program Flags static int theTestFlag = 0; @@ -720,24 +721,24 @@ executeCallback(int result, NdbConnectio NdbConnection **array_ref = (NdbConnection**)aObject; assert(NdbObject == *array_ref); *array_ref = NULL; - if (result == -1) { + if (result == -1 && failed < 100) { - // Add complete error handling here + // Add complete error handling here - int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); - if (retCode == 1) { - if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){ - ndbout_c("execute: %s", NdbObject->getNdbError().message); - ndbout_c("Error code = %d", NdbObject->getNdbError().code);} - } else if (retCode == 2) { - ndbout << "4115 should not happen in flexAsynch" << endl; - } else if (retCode == 3) { - /* What can we do here? */ - ndbout_c("execute: %s", NdbObject->getNdbError().message); - }//if(retCode == 3) - - // ndbout << "Error occured in poll:" << endl; - // ndbout << NdbObject->getNdbError() << endl; + int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); + if (retCode == 1) { + if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){ + ndbout_c("execute: %s", NdbObject->getNdbError().message); + ndbout_c("Error code = %d", NdbObject->getNdbError().code); + } + } else if (retCode == 2) { + ndbout << "4115 should not happen in flexAsynch" << endl; + } else if (retCode == 3) { + /* What can we do here? */ + ndbout_c("execute: %s", NdbObject->getNdbError().message); + }//if(retCode == 3) + // ndbout << "Error occured in poll:" << endl; + // ndbout << NdbObject->getNdbError() << endl; failed++ ; }//if NdbObject->close(); /* Close transaction */ @@ -1146,8 +1147,8 @@ setAggregateRun(void) static void* definer_thread(void *data); static void* executor_thread(void *data); -static Uint32 tNoOfExecutorThreads; -static Uint32 tNoOfDefinerThreads; +static Uint32 tNoOfExecutorThreads = 0; +static Uint32 tNoOfDefinerThreads = 0; enum RunState { @@ -1182,15 +1183,17 @@ typedef struct thread_data_struct THREAD struct thread_data_struct { KEY_LIST_HEADER list_header; - char *record; Uint32 thread_id; bool ready; bool stop; bool start; + + char *record; NdbMutex *transport_mutex; struct NdbCondition *transport_cond; struct NdbCondition *main_cond; struct NdbCondition *start_cond; + char not_used[52]; }; THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS]; @@ -1350,7 +1353,6 @@ static void start_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id) { init_thread_data(my_thread_data, thread_id); - DEB_PRINT(("Now starting definer thread, id = %u", thread_id)); threadLife[thread_id] = NdbThread_Create(definer_thread, (void**)my_thread_data, 1024 * 1024, @@ -1372,7 +1374,6 @@ static void start_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id) { init_thread_data(my_thread_data, thread_id); - DEB_PRINT(("Now starting executor thread, id = %u", thread_id)); threadLife[thread_id] = NdbThread_Create(executor_thread, (void**)my_thread_data, 1024 * 1024, @@ -1395,15 +1396,13 @@ main_thread(RunType start_type, NdbTimer { bool insert_delete; - DEB_PRINT(("Executing type = %u in new manner", (uint)start_type)); tNoOfExecutorThreads = tNoOfThreads; - tNoOfDefinerThreads = (tNoOfThreads + 3)/4; + if (tNoOfDefinerThreads == 0) + { + tNoOfDefinerThreads = (tNoOfThreads + 3)/4; + } tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads; - DEB_PRINT(( - "num threads = %u, num definer threads = %u, num executor threads = %u", - tNoOfThreads, tNoOfDefinerThreads, tNoOfExecutorThreads)); - if (start_type == RunInsert || start_type == RunDelete) insert_delete = true; @@ -1451,7 +1450,6 @@ main_thread(RunType start_type, NdbTimer NdbThread_WaitFor(threadLife[i], &tmp); NdbThread_Destroy(&threadLife[i]); } - DEB_PRINT(("main: Definer and Executor threads are gone")); } static NdbConnection* @@ -1515,36 +1513,6 @@ get_first_free(KEY_LIST_HEADER *list_hea return key_op; } -/** - * Retrieve a linked list of prepared operations. If no operations - * prepared we wait on a condition until operations are defined for - * us to execute. - */ -static void -receive_operations(THREAD_DATA *my_thread_data, - KEY_LIST_HEADER *list_header) -{ - KEY_OPERATION *first_in_list = NULL; - KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header; - - NdbMutex_Lock(my_thread_data->transport_mutex); - while (!first_in_list && !my_thread_data->stop) - { - first_in_list = thread_list_header->first_in_list; - list_header->first_in_list = thread_list_header->first_in_list; - list_header->last_in_list = thread_list_header->last_in_list; - list_header->num_in_list = thread_list_header->num_in_list; - thread_list_header->first_in_list = NULL; - thread_list_header->last_in_list = NULL; - thread_list_header->num_in_list = 0; - if (first_in_list) - break; - NdbCondition_Wait(my_thread_data->transport_cond, - my_thread_data->transport_mutex); - } - NdbMutex_Unlock(my_thread_data->transport_mutex); -} - static void move_list(KEY_LIST_HEADER *src_list_header, KEY_LIST_HEADER *dst_list_header) @@ -1564,6 +1532,53 @@ move_list(KEY_LIST_HEADER *src_list_head dst_list_header->last_in_list = src_list_header->last_in_list; dst_list_header->num_in_list += src_list_header->num_in_list; src_list_header->num_in_list = 0; + src_list_header->first_in_list = NULL; + src_list_header->last_in_list = NULL; +} + +/** + * Retrieve a linked list of prepared operations. If no operations + * prepared we wait on a condition until operations are defined for + * us to execute. + */ +static void +receive_operations(THREAD_DATA *my_thread_data, + KEY_LIST_HEADER *list_header, + bool wait) +{ + bool first = true; + KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header; + list_header->first_in_list = NULL; + list_header->last_in_list = NULL; + list_header->num_in_list = 0; + +recheck: + NdbMutex_Lock(my_thread_data->transport_mutex); + while (!my_thread_data->stop && + (first || thread_list_header->first_in_list)) + { + move_list(thread_list_header, list_header); + if (list_header->first_in_list) + break; + NdbCondition_Wait(my_thread_data->transport_cond, + my_thread_data->transport_mutex); + first = false; + } + NdbMutex_Unlock(my_thread_data->transport_mutex); + if (first && wait && + thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2)) + { + /** + * We will wait for at least 20 microseconds if we haven't yet received + * at least half of the number of records we desire to execute. + */ + struct timespec timer; + timer.tv_sec = 0; + timer.tv_nsec = 20000; + first = false; + nanosleep(&timer, NULL); + goto recheck; + } } static void @@ -1572,20 +1587,14 @@ send_operations(Uint32 thread_id, { THREAD_DATA *recv_thread = &thread_data_array[thread_id]; - DEB_PRINT(("Sending %u operations to thread id %u", - list_header->num_in_list, thread_id)); NdbMutex_Lock(recv_thread->transport_mutex); - if (!recv_thread->list_header.first_in_list && - list_header->first_in_list) - { - /** - * We are moving operations into an empty list, thus we need - * to wake any threads waiting for operations to execute. - */ - NdbCondition_Signal(recv_thread->transport_cond); - } + /** + * We are moving operations into the list, thus we need + * to wake any threads waiting for operations to execute. + */ move_list(list_header, &recv_thread->list_header); + NdbCondition_Signal(recv_thread->transport_cond); NdbMutex_Unlock(recv_thread->transport_mutex); } @@ -1702,9 +1711,7 @@ wait_until_all_completed(THREAD_DATA *my bool outstanding = true; while (outstanding && !my_thread_data->stop) { - receive_operations(my_thread_data, &list_header); - DEB_PRINT(("received %u operations in thread id = %u", - list_header.num_in_list, my_thread_data->thread_id)); + receive_operations(my_thread_data, &list_header, false); update_thread_state(&list_header, thread_state); move_list(&list_header, free_list_header); outstanding = check_for_outstanding(thread_state); @@ -1763,8 +1770,6 @@ definer_thread(void *data) { THREAD_DATA *my_thread_data = (THREAD_DATA*)data; Uint32 my_thread_id = get_thread_id_safe(my_thread_data); - DEB_PRINT(("definer thread: my_thread_data = 0x%llx, my_thread_id = %u", - (Uint64)my_thread_data, my_thread_id)); RunType run_type = tRunType; Uint32 thread_state[MAX_EXECUTOR_THREADS]; Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) / @@ -1791,9 +1796,7 @@ definer_thread(void *data) my_ndb); delete my_ndb; ThreadExecutions[my_thread_id] = 0; - DEB_PRINT(("definer thread %u have completed startup", my_thread_id)); signal_thread_ready_wait_for_start(my_thread_data); - DEB_PRINT(("definer thread %u starts executing", my_thread_id)); while (!my_thread_data->stop) { @@ -1827,7 +1830,6 @@ definer_thread(void *data) current_record = first_record; } } - DEB_PRINT(("definer thread %u is done", my_thread_id)); signal_thread_ready_wait_for_start(my_thread_data); free(key_op_mem); free(thread_id_mem); @@ -1935,28 +1937,21 @@ executor_thread(void *data) Uint32 my_thread_id = get_thread_id_safe(my_thread_data); Uint64 exec_count = 0; Uint32 error_count = 0; + Uint32 executions = 0; Uint32 error_flag = false; int ret_code; KEY_LIST_HEADER list_header; - DEB_PRINT(("executor thread: my_thread_data = 0x%llx, thread_id = %u", - (Uint64)my_thread_data, my_thread_id)); Ndb *my_ndb = get_ndb_object(my_thread_id); ThreadExecutions[my_thread_id] = 0; - DEB_PRINT(("executor thread id = %u have completed startup", - my_thread_id)); signal_thread_ready_wait_for_start(my_thread_data); - DEB_PRINT(("executor thread id = %u are starting to execute", - my_thread_id)); - while (!my_thread_data->stop) { - receive_operations(my_thread_data, &list_header); + receive_operations(my_thread_data, &list_header, !tImmediate); if (list_header.num_in_list == 0) { - DEB_PRINT(("Stop executor thread has been requested by wakeup")); break; } ret_code = 0; @@ -1978,11 +1973,12 @@ executor_thread(void *data) tRunType == RunDelete || tRunState == EXECUTING)) { + executions++; exec_count += (Uint64)ret_code; } } - DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows", - my_thread_id, exec_count)); + DEB_PRINT(("executor thread id = %u are done executing, executed %llu rows in %u executes", + my_thread_id, exec_count, executions)); ThreadExecutions[my_thread_id] = exec_count; if (error_count) { @@ -2009,6 +2005,12 @@ readArguments(int argc, const char** arg ndbout_c("Invalid no of threads"); return -1; } + } else if (strcmp(argv[i], "-d") == 0){ + tNoOfDefinerThreads = atoi(argv[i+1]); + if (tNoOfDefinerThreads > NDB_MAXTHREADS){ + ndbout_c("Invalid no of definer threads"); + return -1; + } } else if (strcmp(argv[i], "-p") == 0){ tNoOfParallelTrans = atoi(argv[i+1]); if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){ @@ -2138,6 +2140,10 @@ readArguments(int argc, const char** arg tNdbRecord = true; argc++; i--; + } else if (strcmp(argv[i], "-immediate") == 0){ + tImmediate = true; + argc++; + i--; } else if (strcmp(argv[i], "-drop_table") == 0){ tRunType = RunDropTable; argc++; No bundle (reason: useless for push emails).