From: Mauritz Sundell Date: June 11 2012 1:31pm Subject: bzr push into mysql-5.1-telco-7.0 branch (mauritz.sundell:4937 to 4939) List-Archive: http://lists.mysql.com/commits/144194 Message-Id: <201206111332.q5BDW499020727@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4939 Mauritz Sundell 2012-06-11 ndb - flexAsynch The revised flexAsynch program used in Mikael Ronstroms benchmarks, spring 2012. Part of Mikael Ronstroms "Patches used in benchmark tree with Intel" modified: storage/ndb/test/ndbapi/flexAsynch.cpp 4938 Mauritz Sundell 2012-06-11 ndb - NdbSleep_MicroSleep Added sleep function with microseconds resolution. Uses waitable timer in windows and nanosleep in posix, as fallback it rounds up to milliseconds and uses NdbSleep_MilliSleep modified: storage/ndb/include/ndb_config.h.in storage/ndb/include/portlib/NdbSleep.h storage/ndb/ndb_configure.cmake 4937 Mauritz Sundell 2012-06-07 ndb - fifo idle connection list This ensures that the list of connections to the TC block is kept in a FIFO queue rather than a LIFO queue. Using a LIFO queue gave very unbalanced accesses to the TC blocks, there could be as much as a 10x difference between the different TC blocks in a node. Part of Mikael Ronstroms "Patches used in benchmark tree with Intel" modified: storage/ndb/include/ndbapi/Ndb.hpp storage/ndb/src/ndbapi/Ndb.cpp storage/ndb/src/ndbapi/Ndbinit.cpp storage/ndb/src/ndbapi/Ndblist.cpp === modified file 'storage/ndb/include/ndb_config.h.in' --- a/storage/ndb/include/ndb_config.h.in 2012-02-07 15:41:33 +0000 +++ b/storage/ndb/include/ndb_config.h.in 2012-06-11 13:20:48 +0000 @@ -17,6 +17,7 @@ #cmakedefine HAVE_POSIX_MEMALIGN 1 #cmakedefine HAVE_CLOCK_GETTIME 1 +#cmakedefine HAVE_NANOSLEEP 1 #cmakedefine HAVE_PTHREAD_CONDATTR_SETCLOCK 1 #cmakedefine HAVE_PTHREAD_SELF 1 #cmakedefine HAVE_SCHED_GET_PRIORITY_MIN 1 === modified file 'storage/ndb/include/portlib/NdbSleep.h' --- a/storage/ndb/include/portlib/NdbSleep.h 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/portlib/NdbSleep.h 2012-06-11 13:20:48 +0000 @@ -20,6 +20,54 @@ #include +static inline void NdbSleep_MilliSleep(int milliseconds); + +static inline +void NdbSleep_MicroSleep(int microseconds) +{ + assert(0 < microseconds); +#ifdef _WIN32 + // Waitable timer use 100ns time unit, negative for relative time periods + LARGE_INTEGER liDueTime; + liDueTime.QuadPart = -10LL * microseconds; + + HANDLE hTimer = CreateWaitableTimer(NULL, TRUE, NULL); + if (NULL == hTimer || + !SetWaitableTimer(hTimer, &liDueTime, 0, NULL, NULL, 0) || + WaitForSingleObject(hTimer, INFINITE) != WAIT_OBJECT_0) + { +#ifndef NDEBUG + // Error code for crash analysis + DWORD winerr = GetLastError(); +#endif + assert(false); + // Fallback to millisleep in release + NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000); + } + if (NULL != hTimer) + { + CloseHandle(hTimer); + } +#elif defined(HAVE_NANOSLEEP) + struct timespec t; + t.tv_sec = microseconds / 1000000; + t.tv_nsec = 1000 * (microseconds % 1000000); + while (nanosleep(&t, &t) == -1) + { + if (errno != EINTR) + { + assert(false); + // Fallback to millisleep in release + NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000); + return ; + } + } +#else + // Fallback to millisleep + NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000); +#endif +} + static inline void NdbSleep_MilliSleep(int milliseconds) { === modified file 'storage/ndb/ndb_configure.cmake' --- a/storage/ndb/ndb_configure.cmake 2012-02-07 15:41:33 +0000 +++ b/storage/ndb/ndb_configure.cmake 2012-06-11 13:20:48 +0000 @@ -47,6 +47,7 @@ INCLUDE(ndb_require_variable) CHECK_FUNCTION_EXISTS(posix_memalign HAVE_POSIX_MEMALIGN) CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) +CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP) CHECK_FUNCTION_EXISTS(pthread_condattr_setclock HAVE_PTHREAD_CONDATTR_SETCLOCK) CHECK_FUNCTION_EXISTS(pthread_self HAVE_PTHREAD_SELF) CHECK_FUNCTION_EXISTS(sched_get_priority_min HAVE_SCHED_GET_PRIORITY_MIN) === modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp' --- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-02-14 08:16:48 +0000 +++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-06-11 13:30:32 +0000 @@ -1,5 +1,5 @@ /* - Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2003, 2011, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,6 +24,8 @@ #include #include +#include +#include #include #include #include @@ -39,6 +41,9 @@ #define MAXATTR 511 #define MAXTABLES 1 #define NDB_MAXTHREADS 128 +#define MAX_EXECUTOR_THREADS 128 +#define MAX_DEFINER_THREADS 32 +#define MAX_REAL_THREADS 160 #define NDB_MAX_NODES 48 /* NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a @@ -52,22 +57,22 @@ #define PKSIZE 2 enum StartType { - stIdle, - stInsert, - stRead, - stUpdate, - stDelete, - stStop + stIdle = 0, + stInsert = 1, + stRead = 2, + stUpdate = 3, + stDelete = 4, + stStop = 5 } ; enum RunType { - RunInsert, - RunRead, - RunUpdate, - RunDelete, - RunCreateTable, - RunDropTable, - RunAll + RunInsert = 1, + RunRead = 2, + RunUpdate = 3, + RunDelete = 4, + RunCreateTable = 5, + RunDropTable = 6, + RunAll = 7 }; struct ThreadNdb @@ -85,7 +90,7 @@ static void dropTables(Ndb* pMyNdb); static int createTables(Ndb*); static void defineOperation(NdbConnection* aTransObject, StartType aType, Uint32 base, Uint32 aIndex); -static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType, +static void defineNdbRecordOperation(char*, NdbConnection* aTransObject, StartType aType, Uint32 base, Uint32 aIndex); static void execute(StartType aType); static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int); @@ -97,17 +102,20 @@ static bool error_handler(const NdbError static void input_error(); static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo); +static void main_thread(RunType run_type, NdbTimer & timer); +static Uint64 get_total_transactions(); +static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer); static int retry_opt = 3 ; static int failed = 0 ; ErrorData * flexAsynchErrorData; -static NdbThread* threadLife[NDB_MAXTHREADS]; +static NdbThread* threadLife[MAX_REAL_THREADS]; static int tNodeId; -static int ThreadReady[NDB_MAXTHREADS]; -static longlong ThreadExecutions[NDB_MAXTHREADS]; -static StartType ThreadStart[NDB_MAXTHREADS]; +static int ThreadReady[MAX_REAL_THREADS]; +static longlong ThreadExecutions[MAX_REAL_THREADS]; +static StartType ThreadStart[NDB_MAXTHREADS]; static char tableName[MAXTABLES][MAXSTRLEN+1]; static const NdbDictionary::Table * tables[MAXTABLES]; static char attrName[MAXATTR][MAXSTRLEN+1]; @@ -136,6 +144,8 @@ static unsigned int tLoadFac static bool tempTable = false; static bool startTransGuess = true; static int tExtraReadLoop = 0; +static bool tNew = false; +static bool tImmediate = false; //Program Flags static int theTestFlag = 0; @@ -177,13 +187,13 @@ resetThreads(){ } static void -waitForThreads(void) +waitForThreads(Uint32 num_threads_to_wait_for) { int cont = 0; do { cont = 0; NdbSleep_MilliSleep(20); - for (unsigned i = 0; i < tNoOfThreads ; i++) { + for (unsigned i = 0; i < num_threads_to_wait_for ; i++) { if (ThreadReady[i] == 0) { cont = 1; }//if @@ -204,9 +214,8 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f { ndb_init(); ThreadNdb* pThreadData; - int tLoops=0; - int returnValue = NDBT_OK; DEFINE_TIMER; + int returnValue = NDBT_OK; flexAsynchErrorData = new ErrorData; flexAsynchErrorData->resetErrorCounters(); @@ -256,7 +265,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f ndbout << endl; - NdbThread_SetConcurrencyLevel(2 + tNoOfThreads); + NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4)); /* print Setting */ flexAsynchErrorData->printSettings(ndbout); @@ -311,7 +320,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f } } - if (tNdbRecord) + if (tNdbRecord && !tNew) { Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]); sz += 3; @@ -325,260 +334,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f if(returnValue == NDBT_OK && tRunType != RunCreateTable && tRunType != RunDropTable){ - /**************************************************************** - * Create NDB objects. * - ****************************************************************/ - resetThreads(); - for (Uint32 i = 0; i < tNoOfThreads ; i++) { - pThreadData[i].ThreadNo = i; - threadLife[i] = NdbThread_Create(threadLoop, - (void**)&pThreadData[i], - 32768, - "flexAsynchThread", - NDB_THREAD_PRIO_LOW); - }//for - ndbout << endl << "All NDB objects and table created" << endl << endl; - int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads; - /**************************************************************** - * Execute program. * - ****************************************************************/ - - for(;;) { - - int loopCount = tLoops + 1 ; - ndbout << endl << "Loop # " << loopCount << endl << endl ; - - /**************************************************************** - * Perform inserts. * - ****************************************************************/ - - failed = 0 ; - if (tRunType == RunAll || tRunType == RunInsert){ - ndbout << "Executing inserts" << endl; - START_TIMER; - execute(stInsert); - STOP_TIMER; - } - if (tRunType == RunAll){ - a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); - PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); - - if (0 < failed) { - int i = retry_opt ; - int ci = 1 ; - while (0 < failed && 0 < i){ - ndbout << failed << " of the transactions returned errors!" - << endl << endl; - ndbout << "Attempting to redo the failed transactions now..." - << endl ; - ndbout << "Redo attempt " << ci <<" out of " << retry_opt - << endl << endl; - failed = 0 ; - START_TIMER; - execute(stInsert); - STOP_TIMER; - PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); - i-- ; - ci++; - } - if(0 == failed ){ - ndbout << endl <<"Redo attempt succeeded" << endl << endl; - }else{ - ndbout << endl <<"Redo attempt failed, moving on now..." << endl - << endl; - }//if - }//if - }//if - /**************************************************************** - * Perform read. * - ****************************************************************/ - - failed = 0 ; - - if (tRunType == RunAll || tRunType == RunRead){ - for (int ll = 0; ll < 1 + tExtraReadLoop; ll++) - { - ndbout << "Executing reads" << endl; - START_TIMER; - execute(stRead); - STOP_TIMER; - if (tRunType == RunAll){ - a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); - PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); - }//if - }//for - }//if - - if (tRunType == RunAll){ - if (0 < failed) { - int i = retry_opt ; - int cr = 1; - while (0 < failed && 0 < i){ - ndbout << failed << " of the transactions returned errors!"<record, tConArray[num_ops], aType, threadBaseLoc2, @@ -944,24 +722,31 @@ executeCallback(int result, NdbConnectio NdbConnection **array_ref = (NdbConnection**)aObject; assert(NdbObject == *array_ref); *array_ref = NULL; - if (result == -1) { - - // Add complete error handling here + if (result == -1 && failed < 100) + { - int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); - if (retCode == 1) { - if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){ - ndbout_c("execute: %s", NdbObject->getNdbError().message); - ndbout_c("Error code = %d", NdbObject->getNdbError().code);} - } else if (retCode == 2) { - ndbout << "4115 should not happen in flexAsynch" << endl; - } else if (retCode == 3) { - /* What can we do here? */ - ndbout_c("execute: %s", NdbObject->getNdbError().message); - }//if(retCode == 3) + // Add complete error handling here - // ndbout << "Error occured in poll:" << endl; - // ndbout << NdbObject->getNdbError() << endl; + int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); + if (retCode == 1) + { + if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630) + { + ndbout_c("execute: %s", NdbObject->getNdbError().message); + ndbout_c("Error code = %d", NdbObject->getNdbError().code); + } + } + else if (retCode == 2) + { + ndbout << "4115 should not happen in flexAsynch" << endl; + } + else if (retCode == 3) + { + /* What can we do here? */ + ndbout_c("execute: %s", NdbObject->getNdbError().message); + }//if(retCode == 3) + // ndbout << "Error occured in poll:" << endl; + // ndbout << NdbObject->getNdbError() << endl; failed++ ; }//if NdbObject->close(); /* Close transaction */ @@ -1067,11 +852,12 @@ defineOperation(NdbConnection* localNdbC static void -defineNdbRecordOperation(ThreadNdb* pThread, - NdbConnection* pTrans, StartType aType, - Uint32 threadBase, Uint32 aIndex) +defineNdbRecordOperation(char *record, + NdbConnection* pTrans, + StartType aType, + Uint32 threadBase, + Uint32 aIndex) { - char * record = pThread->record; Uint32 offset; NdbDictionary::getOffset(g_record[0], 0, offset); * (Uint32*)(record + offset) = threadBase; @@ -1102,15 +888,24 @@ defineNdbRecordOperation(ThreadNdb* pThr break; }//case case stRead: { // Read Case - op = pTrans->readTuple(g_record[0],record,g_record[0],record, NdbOperation::LM_CommittedRead); + op = pTrans->readTuple(g_record[0], + record, + g_record[0], + record, + NdbOperation::LM_CommittedRead); break; }//case case stUpdate:{ // Update Case - op = pTrans->updateTuple(g_record[0],record,g_record[0],record); + op = pTrans->updateTuple(g_record[0], + record, + g_record[0], + record); break; }//case case stDelete: { // Delete Case - op = pTrans->deleteTuple(g_record[0],record, g_record[0]); + op = pTrans->deleteTuple(g_record[0], + record, + g_record[0]); break; }//case default: { @@ -1193,6 +988,25 @@ setUpNodeTableArray(Uint32 tableNo, cons } static Uint32 +get_node_relative_id(Uint32 tableNo, Uint32 node_id) +{ + Uint32 rel_id = 0; + + for (Uint32 i = 1; i < node_id; i++) + { + if (nodeTableArray[tableNo][i]) + rel_id++; + } + return rel_id; +} + +static Uint32 +get_node_count(Uint32 tableNo) +{ + return get_node_relative_id(tableNo, NDB_MAX_NODES + 1); +} + +static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo) { Uint32 count = 0; @@ -1327,6 +1141,859 @@ setAggregateRun(void) theTableCreateFlag = 1; } +/* Start NEW Module */ + +/** + * This part contains the code used for the case --local 4 which is using + * the design pattern that could be used for asynchronous applications of + * the NDB API. + * + * This variant will always use transaction hints, it will always the + * NDB Record format in the NDB API. + */ + +static void* definer_thread(void *data); +static void* executor_thread(void *data); + +static Uint32 tNoOfExecutorThreads = 0; +static Uint32 tNoOfDefinerThreads = 0; + +enum RunState +{ + WARMUP = 0, + EXECUTING = 1, + COOLDOWN = 2 +}; + +RunState tRunState = WARMUP; + +typedef struct KeyOperation KEY_OPERATION; +struct KeyOperation +{ + Uint32 first_key; + Uint32 second_key; + Uint32 definer_thread_id; + Uint32 executor_thread_id; + RunType operation_type; + KEY_OPERATION *next_key_op; +}; + +typedef struct key_list_header KEY_LIST_HEADER; +struct key_list_header +{ + KEY_OPERATION *first_in_list; + KEY_OPERATION *last_in_list; + Uint32 num_in_list; +}; + + +typedef struct thread_data_struct THREAD_DATA; +struct thread_data_struct +{ + KEY_LIST_HEADER list_header; + Uint32 thread_id; + bool ready; + bool stop; + bool start; + + char *record; + NdbMutex *transport_mutex; + struct NdbCondition *transport_cond; + struct NdbCondition *main_cond; + struct NdbCondition *start_cond; + char not_used[52]; +}; +THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS]; + +static Uint64 +get_total_transactions() +{ + Uint64 total_transactions = 0; + + for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++) + { + total_transactions += ThreadExecutions[i]; + } + return total_transactions; +} + +static void +init_list_headers(KEY_LIST_HEADER *list_header, + Uint32 num_list_headers) +{ + Uint32 i; + KEY_LIST_HEADER *list_header_ref; + char *list_header_ptr = (char*)list_header; + for (i = 0; + i < num_list_headers; + i++, list_header_ptr += sizeof(KEY_LIST_HEADER)) + { + list_header_ref = (KEY_LIST_HEADER*)list_header_ptr; + list_header_ref->first_in_list = NULL; + list_header_ref->last_in_list = NULL; + list_header_ref->num_in_list = 0; + } +} + +static void +wait_thread_ready(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + while (1) + { + if (my_thread_data->ready) + break; + NdbCondition_Wait(my_thread_data->main_cond, + my_thread_data->transport_mutex); + } + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +wait_for_threads_ready(Uint32 num_threads) +{ + for (Uint32 i = 0; i < num_threads; i++) + wait_thread_ready(&thread_data_array[i]); +} + +static void +signal_thread_to_start(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + my_thread_data->start = true; + my_thread_data->ready = false; + NdbCondition_Signal(my_thread_data->start_cond); + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +signal_definer_threads_to_start() +{ + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + signal_thread_to_start(&thread_data_array[i]); +} + +static void +signal_executor_threads_to_start() +{ + for (Uint32 i = 0; i < tNoOfExecutorThreads; i++) + signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]); +} + +static void +signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + my_thread_data->ready = true; + NdbCondition_Signal(my_thread_data->main_cond); + while (1) + { + if (my_thread_data->start) + break; + NdbCondition_Wait(my_thread_data->start_cond, + my_thread_data->transport_mutex); + } + my_thread_data->start = false; + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +signal_thread_to_stop(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + my_thread_data->stop = true; + NdbCondition_Signal(my_thread_data->transport_cond); + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +signal_definer_threads_to_stop() +{ + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + signal_thread_to_stop(&thread_data_array[i]); +} + +static void +signal_executor_threads_to_stop() +{ + for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++) + signal_thread_to_stop(&thread_data_array[i]); +} + +static void +destroy_thread_data(THREAD_DATA *my_thread_data) +{ + free(my_thread_data->record); + NdbMutex_Destroy(my_thread_data->transport_mutex); + NdbCondition_Destroy(my_thread_data->transport_cond); + NdbCondition_Destroy(my_thread_data->start_cond); + NdbCondition_Destroy(my_thread_data->main_cond); +} + +static void +init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id) +{ + Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]); + my_thread_data->record = (char*)malloc(sz); + memset(my_thread_data->record, 0, sz); + init_list_headers(&my_thread_data->list_header, 1); + my_thread_data->stop = false; + my_thread_data->ready = false; + my_thread_data->start = false; + my_thread_data->transport_mutex = NdbMutex_Create(); + my_thread_data->transport_cond = NdbCondition_Create(); + my_thread_data->main_cond = NdbCondition_Create(); + my_thread_data->start_cond = NdbCondition_Create(); + my_thread_data->thread_id = thread_id; +} + +static void +create_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id) +{ + init_thread_data(my_thread_data, thread_id); + threadLife[thread_id] = NdbThread_Create(definer_thread, + (void**)my_thread_data, + 1024 * 1024, + "flexAsynchThread", + NDB_THREAD_PRIO_LOW); +} + +static void +create_definer_threads() +{ + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + { + Uint32 thread_id = i; + create_definer_thread(&thread_data_array[thread_id], thread_id); + } +} + +static void +create_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id) +{ + init_thread_data(my_thread_data, thread_id); + threadLife[thread_id] = NdbThread_Create(executor_thread, + (void**)my_thread_data, + 1024 * 1024, + "flexAsynchThread", + NDB_THREAD_PRIO_LOW); +} + +static void +create_executor_threads() +{ + for (Uint32 i = 0; i < tNoOfExecutorThreads; i++) + { + Uint32 thread_id = tNoOfDefinerThreads + i; + create_executor_thread(&thread_data_array[thread_id], thread_id); + } +} + +static void +main_thread(RunType start_type, NdbTimer & timer) +{ + bool insert_delete; + + tNoOfExecutorThreads = tNoOfThreads; + if (tNoOfDefinerThreads == 0) + { + tNoOfDefinerThreads = (tNoOfThreads + 3)/4; + } + tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads; + + if (start_type == RunInsert || + start_type == RunDelete) + insert_delete = true; + else + insert_delete = false; + + create_definer_threads(); + create_executor_threads(); + + wait_for_threads_ready(tNoOfThreads); + + /** + * Start threads, start with execution threads to ensure they are + * up and running before definer threads starts sending data to + * them + */ + START_TIMER; + signal_definer_threads_to_start(); + signal_executor_threads_to_start(); + + if (!insert_delete) + { + sleep(tWarmupTime); + tRunState = EXECUTING; + sleep(tExecutionTime); + tRunState = COOLDOWN; + sleep(tCooldownTime); + signal_definer_threads_to_stop(); + } + wait_for_threads_ready(tNoOfDefinerThreads); + STOP_TIMER; + + signal_executor_threads_to_stop(); + wait_for_threads_ready(tNoOfThreads); + + /** + * Now all threads are stopped and prepared to be destroyed, + * now start them just to destroy themselves + */ + signal_definer_threads_to_start(); + signal_executor_threads_to_start(); + + void * tmp; + for (Uint32 i = 0; i < tNoOfThreads; i++) + { + NdbThread_WaitFor(threadLife[i], &tmp); + NdbThread_Destroy(&threadLife[i]); + } +} + +static NdbConnection* +get_trans_object(Uint32 first_key, + Uint32 second_key, + Ndb *my_ndb) +{ + union { + Uint64 _align; + Uint32 Tkey32[2]; + }; + (void)_align; + + Tkey32[0] = first_key; + Tkey32[1] = second_key; + Ndb::Key_part_ptr hint[2]; + hint[0].ptr = Tkey32+0; + hint[0].len = 4; + hint[1].ptr = 0; + hint[1].len = 0; + + return my_ndb->startTransaction(tables[0], hint); +} + +static Ndb* +get_ndb_object(Uint32 my_thread_id) +{ + Ndb *my_ndb = new Ndb(g_cluster_connection+(my_thread_id % tConnections), + "TEST_DB"); + my_ndb->init(MAXPAR); + my_ndb->waitUntilReady(10000); + return my_ndb; +} + +static void +insert_list(KEY_LIST_HEADER *list_header, + KEY_OPERATION *insert_op) +{ + KEY_OPERATION *current_last = list_header->last_in_list; + insert_op->next_key_op = NULL; + list_header->last_in_list = insert_op; + if (current_last) + current_last->next_key_op = insert_op; + else + list_header->first_in_list = insert_op; + list_header->num_in_list++; +} + +static KEY_OPERATION* +get_first_free(KEY_LIST_HEADER *list_header) +{ + assert(list_header->first_in_list); + KEY_OPERATION *key_op = list_header->first_in_list; + list_header->first_in_list = key_op->next_key_op; + list_header->num_in_list--; + if (!list_header->first_in_list) + { + list_header->last_in_list = NULL; + } + key_op->next_key_op = NULL; + return key_op; +} + +static void +move_list(KEY_LIST_HEADER *src_list_header, + KEY_LIST_HEADER *dst_list_header) +{ + KEY_OPERATION *last_completed_op = dst_list_header->last_in_list; + KEY_OPERATION *first_in_list = src_list_header->first_in_list; + if (!first_in_list) + return; + if (last_completed_op) + { + last_completed_op->next_key_op = first_in_list; + } + else + { + dst_list_header->first_in_list = first_in_list; + } + dst_list_header->last_in_list = src_list_header->last_in_list; + dst_list_header->num_in_list += src_list_header->num_in_list; + src_list_header->num_in_list = 0; + src_list_header->first_in_list = NULL; + src_list_header->last_in_list = NULL; +} + +/** + * Retrieve a linked list of prepared operations. If no operations + * prepared we wait on a condition until operations are defined for + * us to execute. + */ +static void +receive_operations(THREAD_DATA *my_thread_data, + KEY_LIST_HEADER *list_header, + bool wait) +{ + bool first = true; + KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header; + list_header->first_in_list = NULL; + list_header->last_in_list = NULL; + list_header->num_in_list = 0; + +recheck: + NdbMutex_Lock(my_thread_data->transport_mutex); + while (!my_thread_data->stop && + (first || thread_list_header->first_in_list)) + { + move_list(thread_list_header, list_header); + if (list_header->first_in_list) + break; + NdbCondition_Wait(my_thread_data->transport_cond, + my_thread_data->transport_mutex); + first = false; + } + NdbMutex_Unlock(my_thread_data->transport_mutex); + if (first && wait && + thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2)) + { + /** + * We will wait for at least 200 microseconds if we haven't yet received + * at least half of the number of records we desire to execute. + */ + NdbSleep_MicroSleep(200); + first = false; + goto recheck; + } +} + +static void +send_operations(Uint32 thread_id, + KEY_LIST_HEADER *list_header) +{ + THREAD_DATA *recv_thread = &thread_data_array[thread_id]; + + NdbMutex_Lock(recv_thread->transport_mutex); + /** + * We are moving operations into the list, thus we need + * to wake any threads waiting for operations to execute. + */ + move_list(list_header, + &recv_thread->list_header); + NdbCondition_Signal(recv_thread->transport_cond); + NdbMutex_Unlock(recv_thread->transport_mutex); +} + +static void +init_key_op_list(char *key_op_ptr, + KEY_LIST_HEADER *list_header, + Uint32 max_outstanding, + Uint32 my_thread_id, + RunType my_run_type) +{ + KEY_OPERATION *key_op; + + list_header->first_in_list = (KEY_OPERATION*)key_op_ptr; + for (Uint32 i = 0; + i < max_outstanding; + i++, key_op_ptr += sizeof(KEY_OPERATION)) + { + key_op = (KEY_OPERATION*)key_op_ptr; + key_op->next_key_op = (KEY_OPERATION*)(key_op_ptr + sizeof(KEY_OPERATION)); + key_op->definer_thread_id = my_thread_id; + key_op->executor_thread_id = MAX_EXECUTOR_THREADS; + key_op->operation_type = my_run_type; + } + key_op->next_key_op = NULL; /* Last key operation */ + list_header->last_in_list = key_op; + list_header->num_in_list = max_outstanding; +} + +static Uint32 +get_thread_id_for_record(Uint32 record_id, + Uint32 node_count, + Uint32 thread_count, + Uint32 thread_group, + Uint32 num_thread_groups, + Ndb *my_ndb) +{ + Uint32 thread_id; + NdbConnection *trans = get_trans_object(record_id, record_id, my_ndb); + Uint32 node_id = trans->getConnectedNodeId(); + trans->close(); + Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id); + if (node_count >= thread_count) + { + thread_id = node_rel_id % thread_count; + return thread_id; + } + +recalculate: + thread_id = thread_group * node_count + node_rel_id; + if (thread_id >= thread_count) + { + /** + * Only the last thread group may have less than + * node_count threads, so choosing a random group + * except the last will always give a valid + * thread_id. + */ + thread_group = rand() % (num_thread_groups - 1); + goto recalculate; + } + return thread_id; +} + +void init_thread_id_mem(char *thread_id_mem, + Uint32 first_record, + Uint32 total_records, + Ndb *my_ndb) +{ + Uint32 node_count = get_node_count((Uint32)0); + Uint32 thread_count = tNoOfExecutorThreads; + Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count; + Uint32 thread_group = 0; + for (Uint32 record_id = first_record, i = 0; + i < total_records; + i++, record_id++) + { + thread_id_mem[i] = (char)get_thread_id_for_record(record_id, + node_count, + thread_count, + thread_group, + num_thread_groups, + my_ndb); + thread_group++; + if (thread_group == num_thread_groups) + thread_group = 0; + } +} + +static bool +check_for_outstanding(Uint32 *thread_state) +{ + for (Uint32 i = 0; i < tNoOfExecutorThreads; i++) + { + if (thread_state[i]) + return true; + } + return false; +} + +static void +update_thread_state(KEY_LIST_HEADER *list_header, + Uint32 *thread_state) +{ + KEY_OPERATION *key_op = list_header->first_in_list; + + while (key_op) + { + thread_state[key_op->executor_thread_id]--; + key_op->executor_thread_id = MAX_EXECUTOR_THREADS; + key_op = key_op->next_key_op; + } +} + +static void +wait_until_all_completed(THREAD_DATA *my_thread_data, + Uint32 *thread_state, + KEY_LIST_HEADER *free_list_header) +{ + KEY_LIST_HEADER list_header; + bool outstanding = true; + while (outstanding && !my_thread_data->stop) + { + receive_operations(my_thread_data, &list_header, false); + update_thread_state(&list_header, thread_state); + move_list(&list_header, free_list_header); + outstanding = check_for_outstanding(thread_state); + } +} + +static Uint32 +prepare_operations(char *thread_id_mem, + KEY_LIST_HEADER *free_list_header, + Uint32 *thread_state, + Uint32 first_record_to_define, + Uint32 num_records_to_define, + Uint32 first_record, + Uint32 last_record, + Uint32 max_per_thread) +{ + KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS]; + Uint32 record_id, i, num_records; + + init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads); + for (record_id = first_record_to_define, i = 0; + record_id <= last_record && i < num_records_to_define; + record_id++, i++) + { + KEY_OPERATION *define_op = get_first_free(free_list_header); + Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record]; + define_op->first_key = record_id; + define_op->second_key = record_id; + define_op->executor_thread_id = thread_id; + thread_state[thread_id]++; + KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id]; + insert_list(thread_list_header, define_op); + if (thread_list_header->num_in_list >= max_per_thread) + { + /** + * One thread has max number of records, we won't define any + * more to keep the code simple. + */ + break; + } + } + num_records = i; + for (i = 0; i < tNoOfExecutorThreads; i++) + { + KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i]; + if (thread_list_header->num_in_list) + { + send_operations(tNoOfDefinerThreads + i, thread_list_header); + } + } + return num_records; +} + +static void* +definer_thread(void *data) +{ + THREAD_DATA *my_thread_data = (THREAD_DATA*)data; + Uint32 my_thread_id = my_thread_data->thread_id; + RunType run_type = tRunType; + Uint32 thread_state[MAX_EXECUTOR_THREADS]; + Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) / + tNoOfDefinerThreads; + Uint32 max_per_thread = 1000 / tNoOfDefinerThreads; + Uint32 total_records = max_outstanding * tNoOfTransactions; + Uint32 first_record = total_records * my_thread_id; + Uint32 my_last_record = first_record + total_records - 1; + Uint32 current_record = first_record; + KEY_LIST_HEADER free_list_header; + void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding); + char *thread_id_mem = (char*)malloc(total_records); + memset((char*)&thread_state[0], 0, sizeof(thread_state)); + + init_key_op_list((char*)key_op_mem, + &free_list_header, + max_outstanding, + my_thread_id, + run_type); + Ndb *my_ndb = get_ndb_object(my_thread_id); + init_thread_id_mem(thread_id_mem, + first_record, + total_records, + my_ndb); + delete my_ndb; + ThreadExecutions[my_thread_id] = 0; + signal_thread_ready_wait_for_start(my_thread_data); + + while (!my_thread_data->stop) + { + Uint32 defined_ops = prepare_operations(thread_id_mem, + &free_list_header, + &thread_state[0], + current_record, + max_outstanding, + first_record, + my_last_record, + max_per_thread); + current_record += defined_ops; + if (defined_ops) + { + wait_until_all_completed(my_thread_data, + &thread_state[0], + &free_list_header); + } + if (current_record > my_last_record) + { + if (run_type != RunRead && + run_type != RunUpdate) + { + /** + * Inserts and deletes are done when first round is + * completed. Reads and updates proceed until time is + * completed. + */ + break; + } + current_record = first_record; + } + } + signal_thread_ready_wait_for_start(my_thread_data); + free(key_op_mem); + free(thread_id_mem); + destroy_thread_data(my_thread_data); + return NULL; +} + +/** + * This method receives a linked list of key operations and executes + * all of them. + * + * Return Value: >= 0 means successful completion of this many operations + * -1 Failure, stop test + */ +static int +execute_operations(char *record, + Ndb* my_ndb, + KEY_OPERATION *key_op) +{ + NdbConnection* ndb_conn_array[MAXPAR]; + Uint32 num_ops = 0; + + while (key_op) + { + ndb_conn_array[num_ops] = get_trans_object(key_op->first_key, + key_op->second_key, + my_ndb); + if (ndb_conn_array[num_ops] == NULL){ + error_handler(my_ndb->getNdbError()); + ndbout << endl << "Unable to recover! Quitting now" << endl ; + return -1; + } + //------------------------------------------------------- + // Define the operation, but do not execute it yet. + //------------------------------------------------------- + defineNdbRecordOperation(record, + ndb_conn_array[num_ops], + (StartType)key_op->operation_type, + key_op->first_key, + key_op->second_key); + + ndb_conn_array[num_ops]->executeAsynchPrepare(Commit, + &executeCallback, + (void*)&ndb_conn_array[num_ops]); + num_ops++; + key_op = key_op->next_key_op; + } + if (num_ops == 0) + return 0; + + /** + * Now execute each defined operation and wait for all of them to + * complete. + */ + int Tcomp = my_ndb->sendPollNdb(3000, + num_ops, + tSendForce); + if (Tcomp != (int)num_ops && + my_ndb->getNdbError().code != 0) + { + /* Error handling */ + if (error_count > 100) + return -1; + + error_count++; + ndbout << "error = " << my_ndb->getNdbError().code << endl; + } + return Tcomp; +} + +static void +report_back_operations(KEY_OPERATION *first_defined_op) +{ + KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS]; + KEY_OPERATION *next_op, *executed_op; + + init_list_headers(&thread_list_header[0], tNoOfDefinerThreads); + executed_op = first_defined_op; + while (executed_op) + { + next_op = executed_op->next_key_op; + insert_list(&thread_list_header[executed_op->definer_thread_id], + executed_op); + executed_op = next_op; + } + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + { + if (thread_list_header[i].first_in_list) + { + send_operations(i, &thread_list_header[i]); + } + } +} + +/** + * This is the main function of the executor threads, these threads + * receive linked lists of operations to execute from the definer + * threads. The definer threads stops these threads by simply + * sending a stop operation. + */ +static void* +executor_thread(void *data) +{ + THREAD_DATA *my_thread_data = (THREAD_DATA*)data; + Uint32 my_thread_id = my_thread_data->thread_id; + Uint64 exec_count = 0; + Uint32 error_count = 0; + Uint32 executions = 0; + Uint32 error_flag = false; + int ret_code; + KEY_LIST_HEADER list_header; + + Ndb *my_ndb = get_ndb_object(my_thread_id); + ThreadExecutions[my_thread_id] = 0; + + signal_thread_ready_wait_for_start(my_thread_data); + + while (!my_thread_data->stop) + { + receive_operations(my_thread_data, &list_header, !tImmediate); + if (list_header.num_in_list == 0) + { + break; + } + ret_code = 0; + if (!error_flag) + { + /* Ignore to execute after errors to simplify error handling */ + ret_code = execute_operations(my_thread_data->record, + my_ndb, + list_header.first_in_list); + } + report_back_operations(list_header.first_in_list); + if (ret_code < 0) + { + ndbout_c("executor thread id = %u received error", my_thread_id); + error_flag = true; + } + else if (!error_flag && + (tRunType == RunInsert || + tRunType == RunDelete || + tRunState == EXECUTING)) + { + executions++; + exec_count += (Uint64)ret_code; + } + } + + ThreadExecutions[my_thread_id] = exec_count; + if (error_count) + { + ndbout_c("Received %u errors in executor thread, id = %u", + error_count, my_thread_id); + } + signal_thread_ready_wait_for_start(my_thread_data); + delete my_ndb; + destroy_thread_data(my_thread_data); + return NULL; +} + +/* End NEW Module */ + static int readArguments(int argc, const char** argv){ @@ -1339,6 +2006,15 @@ readArguments(int argc, const char** arg ndbout_c("Invalid no of threads"); return -1; } + } + else if (strcmp(argv[i], "-d") == 0) + { + tNoOfDefinerThreads = atoi(argv[i+1]); + if (tNoOfDefinerThreads > NDB_MAXTHREADS) + { + ndbout_c("Invalid no of definer threads"); + return -1; + } } else if (strcmp(argv[i], "-p") == 0){ tNoOfParallelTrans = atoi(argv[i+1]); if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){ @@ -1463,6 +2139,18 @@ readArguments(int argc, const char** arg } else if (strcmp(argv[i], "-create_table") == 0){ tRunType = RunCreateTable; argc++; + } + else if (strcmp(argv[i], "-new") == 0) + { + tNew = true; + tNdbRecord = true; + argc++; + i--; + } + else if (strcmp(argv[i], "-immediate") == 0) + { + tImmediate = true; + argc++; i--; } else if (strcmp(argv[i], "-drop_table") == 0){ tRunType = RunDropTable; @@ -1536,4 +2224,303 @@ input_error(){ ndbout_c(" -table Number of standard table, default 0"); } +static void +run_old_flexAsynch(ThreadNdb *pThreadData, + NdbTimer & timer) +{ + int tLoops=0; + /**************************************************************** + * Create NDB objects. * + ****************************************************************/ + resetThreads(); + for (Uint32 i = 0; i < tNoOfThreads ; i++) + { + pThreadData[i].ThreadNo = i; + threadLife[i] = NdbThread_Create(threadLoop, + (void**)&pThreadData[i], + 32768, + "flexAsynchThread", + NDB_THREAD_PRIO_LOW); + }//for + ndbout << endl << "All NDB objects and table created" << endl << endl; + int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads; + /**************************************************************** + * Execute program. * + ****************************************************************/ + + for (;;) + { + + int loopCount = tLoops + 1 ; + ndbout << endl << "Loop # " << loopCount << endl << endl ; + + /**************************************************************** + * Perform inserts. * + ****************************************************************/ + + failed = 0 ; + if (tRunType == RunAll || tRunType == RunInsert) + { + ndbout << "Executing inserts" << endl; + START_TIMER; + execute(stInsert); + STOP_TIMER; + } + if (tRunType == RunAll) + { + a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); + PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); + + if (0 < failed) + { + int i = retry_opt ; + int ci = 1 ; + while (0 < failed && 0 < i) + { + ndbout << failed << " of the transactions returned errors!" + << endl << endl; + ndbout << "Attempting to redo the failed transactions now..." + << endl ; + ndbout << "Redo attempt " << ci <<" out of " << retry_opt + << endl << endl; + failed = 0 ; + START_TIMER; + execute(stInsert); + STOP_TIMER; + PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); + i-- ; + ci++; + } + if (0 == failed ) + { + ndbout << endl <<"Redo attempt succeeded" << endl << endl; + } + else + { + ndbout << endl <<"Redo attempt failed, moving on now..." << endl + << endl; + }//if + }//if + }//if + /**************************************************************** + * Perform read. * + ****************************************************************/ + + failed = 0 ; + + if (tRunType == RunAll || tRunType == RunRead) + { + for (int ll = 0; ll < 1 + tExtraReadLoop; ll++) + { + ndbout << "Executing reads" << endl; + START_TIMER; + execute(stRead); + STOP_TIMER; + if (tRunType == RunAll) + { + a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); + PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); + }//if + }//for + }//if + + if (tRunType == RunAll) + { + if (0 < failed) + { + int i = retry_opt ; + int cr = 1; + while (0 < failed && 0 < i) + { + ndbout << failed << " of the transactions returned errors!"<; No bundle (reason: useless for push emails).