From: Mauritz Sundell Date: June 11 2012 2:48pm Subject: bzr push into mysql-5.1-telco-7.1 branch (mauritz.sundell:4564 to 4565) List-Archive: http://lists.mysql.com/commits/144195 Message-Id: <201206111448.q5BEmEoq005974@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4565 Mauritz Sundell 2012-06-11 [merge] Merge 7.0->7.1 modified: storage/ndb/include/ndb_config.h.in storage/ndb/include/ndbapi/Ndb.hpp storage/ndb/include/portlib/NdbSleep.h storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/ndb_configure.cmake storage/ndb/src/common/transporter/TCP_Transporter.cpp storage/ndb/src/common/transporter/Transporter.cpp storage/ndb/src/common/transporter/Transporter.hpp storage/ndb/src/common/util/HashMap2.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/ndbapi/Ndb.cpp storage/ndb/src/ndbapi/Ndbinit.cpp storage/ndb/src/ndbapi/Ndblist.cpp storage/ndb/test/ndbapi/flexAsynch.cpp 4564 Craig L Russell 2012-06-10 Add ScanFlag.SF_OrderBy to descending order scans modified: storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java === modified file 'storage/ndb/include/ndb_config.h.in' --- a/storage/ndb/include/ndb_config.h.in 2012-02-07 15:41:33 +0000 +++ b/storage/ndb/include/ndb_config.h.in 2012-06-11 13:20:48 +0000 @@ -17,6 +17,7 @@ #cmakedefine HAVE_POSIX_MEMALIGN 1 #cmakedefine HAVE_CLOCK_GETTIME 1 +#cmakedefine HAVE_NANOSLEEP 1 #cmakedefine HAVE_PTHREAD_CONDATTR_SETCLOCK 1 #cmakedefine HAVE_PTHREAD_SELF 1 #cmakedefine HAVE_SCHED_GET_PRIORITY_MIN 1 === modified file 'storage/ndb/include/ndbapi/Ndb.hpp' --- a/storage/ndb/include/ndbapi/Ndb.hpp 2012-01-19 18:13:07 +0000 +++ b/storage/ndb/include/ndbapi/Ndb.hpp 2012-06-11 14:47:18 +0000 @@ -1935,6 +1935,12 @@ private: * Returns NULL if none found */ NdbTransaction* getConnectedNdbTransaction(Uint32 nodeId, Uint32 instance); + /** + * Handle Connection Array lists + */ + void appendConnectionArray(NdbTransaction *aCon, Uint32 nodeId); + void prependConnectionArray(NdbTransaction *aCon, Uint32 nodeId); + void removeConnectionArray(NdbTransaction *first, Uint32 nodeId); // Release and disconnect from DBTC a connection // and seize it to theConIdleList @@ -2020,6 +2026,7 @@ private: NdbTransaction* theTransactionList; NdbTransaction** theConnectionArray; + NdbTransaction** theConnectionArrayLast; Uint32 theMyRef; // My block reference Uint32 theNode; // The node number of our node === modified file 'storage/ndb/include/portlib/NdbSleep.h' --- a/storage/ndb/include/portlib/NdbSleep.h 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/portlib/NdbSleep.h 2012-06-11 13:20:48 +0000 @@ -20,6 +20,54 @@ #include +static inline void NdbSleep_MilliSleep(int milliseconds); + +static inline +void NdbSleep_MicroSleep(int microseconds) +{ + assert(0 < microseconds); +#ifdef _WIN32 + // Waitable timer use 100ns time unit, negative for relative time periods + LARGE_INTEGER liDueTime; + liDueTime.QuadPart = -10LL * microseconds; + + HANDLE hTimer = CreateWaitableTimer(NULL, TRUE, NULL); + if (NULL == hTimer || + !SetWaitableTimer(hTimer, &liDueTime, 0, NULL, NULL, 0) || + WaitForSingleObject(hTimer, INFINITE) != WAIT_OBJECT_0) + { +#ifndef NDEBUG + // Error code for crash analysis + DWORD winerr = GetLastError(); +#endif + assert(false); + // Fallback to millisleep in release + NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000); + } + if (NULL != hTimer) + { + CloseHandle(hTimer); + } +#elif defined(HAVE_NANOSLEEP) + struct timespec t; + t.tv_sec = microseconds / 1000000; + t.tv_nsec = 1000 * (microseconds % 1000000); + while (nanosleep(&t, &t) == -1) + { + if (errno != EINTR) + { + assert(false); + // Fallback to millisleep in release + NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000); + return ; + } + } +#else + // Fallback to millisleep + NdbSleep_MilliSleep(1 + (microseconds - 1) / 1000); +#endif +} + static inline void NdbSleep_MilliSleep(int milliseconds) { === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-30 14:28:55 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-06-07 14:46:55 +0000 @@ -294,6 +294,13 @@ public: const NodeBitmask& get_status_overloaded() const; /** + * Set or clear slowdown bit. + * Query if any slowdown bit is set. + */ + void set_status_slowdown(Uint32 nodeId, bool val); + const NodeBitmask& get_status_slowdown() const; + + /** * prepareSend * * When IOState is HaltOutput or HaltIO do not send or insert any @@ -433,8 +440,10 @@ private: /** * Overloaded bits, for fast check. + * Similarly slowdown bits for fast check. */ NodeBitmask m_status_overloaded; + NodeBitmask m_status_slowdown; /** * Unpack signal data. @@ -593,6 +602,8 @@ TransporterRegistry::set_status_overload assert(nodeId < MAX_NODES); if (val != m_status_overloaded.get(nodeId)) m_status_overloaded.set(nodeId, val); + if (val) + set_status_slowdown(nodeId, val); } inline const NodeBitmask& @@ -601,4 +612,18 @@ TransporterRegistry::get_status_overload return m_status_overloaded; } +inline void +TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val) +{ + assert(nodeId < MAX_NODES); + if (val != m_status_slowdown.get(nodeId)) + m_status_slowdown.set(nodeId, val); +} + +inline const NodeBitmask& +TransporterRegistry::get_status_slowdown() const +{ + return m_status_slowdown; +} + #endif // Define of TransporterRegistry_H === modified file 'storage/ndb/ndb_configure.cmake' --- a/storage/ndb/ndb_configure.cmake 2012-02-07 19:09:05 +0000 +++ b/storage/ndb/ndb_configure.cmake 2012-06-11 14:47:18 +0000 @@ -47,6 +47,7 @@ INCLUDE(ndb_require_variable) CHECK_FUNCTION_EXISTS(posix_memalign HAVE_POSIX_MEMALIGN) CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) +CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP) CHECK_FUNCTION_EXISTS(pthread_condattr_setclock HAVE_PTHREAD_CONDATTR_SETCLOCK) CHECK_FUNCTION_EXISTS(pthread_self HAVE_PTHREAD_SELF) CHECK_FUNCTION_EXISTS(sched_get_priority_min HAVE_SCHED_GET_PRIORITY_MIN) === modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp' --- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2012-01-16 08:42:18 +0000 +++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2012-06-07 14:46:55 +0000 @@ -112,6 +112,10 @@ TCP_Transporter::TCP_Transporter(Transpo setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0); m_overload_limit = overload_limit(conf); + /** + * Always set slowdown limit to 60% of overload limit + */ + m_slowdown_limit = m_overload_limit * 6 / 10; } === modified file 'storage/ndb/src/common/transporter/Transporter.cpp' --- a/storage/ndb/src/common/transporter/Transporter.cpp 2012-04-24 08:33:27 +0000 +++ b/storage/ndb/src/common/transporter/Transporter.cpp 2012-06-07 14:46:55 +0000 @@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId), isServer(lNodeId==serverNodeId), m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer), - m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection), + m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF), + isMgmConnection(_isMgmConnection), m_connected(false), m_type(_type), m_transporter_registry(t_reg) === modified file 'storage/ndb/src/common/transporter/Transporter.hpp' --- a/storage/ndb/src/common/transporter/Transporter.hpp 2012-04-24 08:33:27 +0000 +++ b/storage/ndb/src/common/transporter/Transporter.hpp 2012-06-07 14:46:55 +0000 @@ -90,6 +90,8 @@ public: { m_transporter_registry.set_status_overloaded(remoteNodeId, used >= m_overload_limit); + m_transporter_registry.set_status_slowdown(remoteNodeId, + used >= m_slowdown_limit); } virtual int doSend() = 0; @@ -155,6 +157,7 @@ protected: Uint32 m_max_send_buffer; /* Overload limit, as configured with the OverloadLimit config parameter. */ Uint32 m_overload_limit; + Uint32 m_slowdown_limit; private: === modified file 'storage/ndb/src/common/util/HashMap2.cpp' --- a/storage/ndb/src/common/util/HashMap2.cpp 2011-09-07 22:50:01 +0000 +++ b/storage/ndb/src/common/util/HashMap2.cpp 2012-06-07 14:06:59 +0000 @@ -157,14 +157,14 @@ TAPTEST(HashMap2) /* Test iterator Api */ HashMap2::Iterator it(hash1); - IntIntKVPod* j; + IntIntKVPod* k; for (int i=0; i < 2; i++) { int count = 0; - while((j = it.next())) + while((k = it.next())) { - OK( j->b == ((j->a * 3) - i) ); - j->b--; + OK( k->b == ((k->a * 3) - i) ); + k->b--; count++; } OK( count == 100 ); === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-05-31 14:56:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-06-11 14:47:18 +0000 @@ -11353,13 +11353,13 @@ void Dblqh::scanTupkeyConfLab(Signal* si scanptr.p->m_curr_batch_size_rows = rows + 1; scanptr.p->m_last_row = tdata5; - const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded(); + const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown(); if (unlikely(!all.isclear())) { if (all.get(refToNode(scanptr.p->scanApiBlockref))) { /** - * End scan batch if transporter-buffer are overloaded + * End scan batch if transporter-buffer are in slowdown state * * TODO: We should have counters for this... */ === modified file 'storage/ndb/src/ndbapi/Ndb.cpp' --- a/storage/ndb/src/ndbapi/Ndb.cpp 2011-10-17 12:43:31 +0000 +++ b/storage/ndb/src/ndbapi/Ndb.cpp 2012-06-07 14:49:35 +0000 @@ -162,6 +162,8 @@ Ndb::NDB_connect(Uint32 tNode, Uint32 in if (prev != 0) { prev->theNext = curr->theNext; + if (!curr->theNext) + theConnectionArrayLast[tNode] = prev; curr->theNext = tConArray; theConnectionArray[tNode] = curr; } @@ -210,12 +212,10 @@ Ndb::NDB_connect(Uint32 tNode, Uint32 in //************************************************ // Send and receive was successful //************************************************ - NdbTransaction* tPrevFirst = theConnectionArray[tNode]; tNdbCon->setConnectedNodeId(tNode, nodeSequence); tNdbCon->setMyBlockReference(theMyRef); - theConnectionArray[tNode] = tNdbCon; - tNdbCon->theNext = tPrevFirst; + prependConnectionArray(tNdbCon, tNode); DBUG_RETURN(1); } else { //**************************************************************************** @@ -261,6 +261,8 @@ Ndb::getConnectedNdbTransaction(Uint32 n { assert(false); // Should have been moved in NDB_connect prev->theNext = next->theNext; + if (!next->theNext) + theConnectionArrayLast[nodeId] = prev; goto found_middle; } else @@ -276,7 +278,7 @@ Ndb::getConnectedNdbTransaction(Uint32 n return 0; } found_first: - theConnectionArray[nodeId] = next->theNext; + removeConnectionArray(next, nodeId); found_middle: next->theNext = NULL; @@ -944,6 +946,48 @@ Ndb::startTransactionLocal(Uint32 aPrior DBUG_RETURN(tConnection); }//Ndb::startTransactionLocal() +void +Ndb::appendConnectionArray(NdbTransaction *aCon, Uint32 nodeId) +{ + NdbTransaction *last = theConnectionArrayLast[nodeId]; + if (last) + { + last->theNext = aCon; + } + else + { + theConnectionArray[nodeId] = aCon; + } + aCon->theNext = NULL; + theConnectionArrayLast[nodeId] = aCon; +} + +void +Ndb::prependConnectionArray(NdbTransaction *aCon, Uint32 nodeId) +{ + NdbTransaction *first = theConnectionArray[nodeId]; + aCon->theNext = first; + if (!first) + { + theConnectionArrayLast[nodeId] = aCon; + } + theConnectionArray[nodeId] = aCon; +} + +void +Ndb::removeConnectionArray(NdbTransaction *first, Uint32 nodeId) +{ + NdbTransaction *next = first->theNext; + if (!next) + { + theConnectionArray[nodeId] = theConnectionArrayLast[nodeId] = NULL; + } + else + { + theConnectionArray[nodeId] = next; + } +} + /***************************************************************************** void closeTransaction(NdbTransaction* aConnection); @@ -1044,8 +1088,8 @@ Ndb::closeTransaction(NdbTransaction* aC /** * Put it back in idle list for that node */ - aConnection->theNext = theConnectionArray[nodeId]; - theConnectionArray[nodeId] = aConnection; + appendConnectionArray(aConnection, nodeId); + DBUG_VOID_RETURN; } else { aConnection->theReleaseOnClose = false; === modified file 'storage/ndb/src/ndbapi/Ndbinit.cpp' --- a/storage/ndb/src/ndbapi/Ndbinit.cpp 2011-10-17 12:43:31 +0000 +++ b/storage/ndb/src/ndbapi/Ndbinit.cpp 2012-06-07 14:49:35 +0000 @@ -61,6 +61,7 @@ void Ndb::setup(Ndb_cluster_connection * theMinNoOfEventsToWakeUp= 0; theTransactionList= NULL; theConnectionArray= NULL; + theConnectionArrayLast= NULL; the_last_check_time= 0; theFirstTransId= 0; theRestartGCI= 0; @@ -83,13 +84,15 @@ void Ndb::setup(Ndb_cluster_connection * theError.code = 0; theConnectionArray = new NdbConnection * [MAX_NDB_NODES]; + theConnectionArrayLast = new NdbConnection * [MAX_NDB_NODES]; theCommitAckSignal = NULL; theCachedMinDbNodeVersion = 0; int i; for (i = 0; i < MAX_NDB_NODES ; i++) { theConnectionArray[i] = NULL; - }//forg + theConnectionArrayLast[i] = NULL; + }//for m_sys_tab_0 = NULL; theImpl->m_dbname.assign(aDataBase); @@ -158,6 +161,7 @@ Ndb::~Ndb() releaseTransactionArrays(); delete []theConnectionArray; + delete []theConnectionArrayLast; if(theCommitAckSignal != NULL){ delete theCommitAckSignal; theCommitAckSignal = NULL; === modified file 'storage/ndb/src/ndbapi/Ndblist.cpp' --- a/storage/ndb/src/ndbapi/Ndblist.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/ndbapi/Ndblist.cpp 2012-06-07 14:49:35 +0000 @@ -44,6 +44,7 @@ Ndb::checkFailedNode() */ NdbTransaction * tNdbCon = theConnectionArray[node_id]; theConnectionArray[node_id] = NULL; + theConnectionArrayLast[node_id] = NULL; while (tNdbCon != NULL) { NdbTransaction* tempNdbCon = tNdbCon; tNdbCon = tNdbCon->next(); === modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp' --- a/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-02-14 08:16:48 +0000 +++ b/storage/ndb/test/ndbapi/flexAsynch.cpp 2012-06-11 13:30:32 +0000 @@ -1,5 +1,5 @@ /* - Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2003, 2011, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,6 +24,8 @@ #include #include +#include +#include #include #include #include @@ -39,6 +41,9 @@ #define MAXATTR 511 #define MAXTABLES 1 #define NDB_MAXTHREADS 128 +#define MAX_EXECUTOR_THREADS 128 +#define MAX_DEFINER_THREADS 32 +#define MAX_REAL_THREADS 160 #define NDB_MAX_NODES 48 /* NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a @@ -52,22 +57,22 @@ #define PKSIZE 2 enum StartType { - stIdle, - stInsert, - stRead, - stUpdate, - stDelete, - stStop + stIdle = 0, + stInsert = 1, + stRead = 2, + stUpdate = 3, + stDelete = 4, + stStop = 5 } ; enum RunType { - RunInsert, - RunRead, - RunUpdate, - RunDelete, - RunCreateTable, - RunDropTable, - RunAll + RunInsert = 1, + RunRead = 2, + RunUpdate = 3, + RunDelete = 4, + RunCreateTable = 5, + RunDropTable = 6, + RunAll = 7 }; struct ThreadNdb @@ -85,7 +90,7 @@ static void dropTables(Ndb* pMyNdb); static int createTables(Ndb*); static void defineOperation(NdbConnection* aTransObject, StartType aType, Uint32 base, Uint32 aIndex); -static void defineNdbRecordOperation(ThreadNdb*, NdbConnection* aTransObject, StartType aType, +static void defineNdbRecordOperation(char*, NdbConnection* aTransObject, StartType aType, Uint32 base, Uint32 aIndex); static void execute(StartType aType); static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int); @@ -97,17 +102,20 @@ static bool error_handler(const NdbError static void input_error(); static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo); +static void main_thread(RunType run_type, NdbTimer & timer); +static Uint64 get_total_transactions(); +static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer); static int retry_opt = 3 ; static int failed = 0 ; ErrorData * flexAsynchErrorData; -static NdbThread* threadLife[NDB_MAXTHREADS]; +static NdbThread* threadLife[MAX_REAL_THREADS]; static int tNodeId; -static int ThreadReady[NDB_MAXTHREADS]; -static longlong ThreadExecutions[NDB_MAXTHREADS]; -static StartType ThreadStart[NDB_MAXTHREADS]; +static int ThreadReady[MAX_REAL_THREADS]; +static longlong ThreadExecutions[MAX_REAL_THREADS]; +static StartType ThreadStart[NDB_MAXTHREADS]; static char tableName[MAXTABLES][MAXSTRLEN+1]; static const NdbDictionary::Table * tables[MAXTABLES]; static char attrName[MAXATTR][MAXSTRLEN+1]; @@ -136,6 +144,8 @@ static unsigned int tLoadFac static bool tempTable = false; static bool startTransGuess = true; static int tExtraReadLoop = 0; +static bool tNew = false; +static bool tImmediate = false; //Program Flags static int theTestFlag = 0; @@ -177,13 +187,13 @@ resetThreads(){ } static void -waitForThreads(void) +waitForThreads(Uint32 num_threads_to_wait_for) { int cont = 0; do { cont = 0; NdbSleep_MilliSleep(20); - for (unsigned i = 0; i < tNoOfThreads ; i++) { + for (unsigned i = 0; i < num_threads_to_wait_for ; i++) { if (ThreadReady[i] == 0) { cont = 1; }//if @@ -204,9 +214,8 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f { ndb_init(); ThreadNdb* pThreadData; - int tLoops=0; - int returnValue = NDBT_OK; DEFINE_TIMER; + int returnValue = NDBT_OK; flexAsynchErrorData = new ErrorData; flexAsynchErrorData->resetErrorCounters(); @@ -256,7 +265,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f ndbout << endl; - NdbThread_SetConcurrencyLevel(2 + tNoOfThreads); + NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4)); /* print Setting */ flexAsynchErrorData->printSettings(ndbout); @@ -311,7 +320,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f } } - if (tNdbRecord) + if (tNdbRecord && !tNew) { Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]); sz += 3; @@ -325,260 +334,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f if(returnValue == NDBT_OK && tRunType != RunCreateTable && tRunType != RunDropTable){ - /**************************************************************** - * Create NDB objects. * - ****************************************************************/ - resetThreads(); - for (Uint32 i = 0; i < tNoOfThreads ; i++) { - pThreadData[i].ThreadNo = i; - threadLife[i] = NdbThread_Create(threadLoop, - (void**)&pThreadData[i], - 32768, - "flexAsynchThread", - NDB_THREAD_PRIO_LOW); - }//for - ndbout << endl << "All NDB objects and table created" << endl << endl; - int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads; - /**************************************************************** - * Execute program. * - ****************************************************************/ - - for(;;) { - - int loopCount = tLoops + 1 ; - ndbout << endl << "Loop # " << loopCount << endl << endl ; - - /**************************************************************** - * Perform inserts. * - ****************************************************************/ - - failed = 0 ; - if (tRunType == RunAll || tRunType == RunInsert){ - ndbout << "Executing inserts" << endl; - START_TIMER; - execute(stInsert); - STOP_TIMER; - } - if (tRunType == RunAll){ - a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); - PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); - - if (0 < failed) { - int i = retry_opt ; - int ci = 1 ; - while (0 < failed && 0 < i){ - ndbout << failed << " of the transactions returned errors!" - << endl << endl; - ndbout << "Attempting to redo the failed transactions now..." - << endl ; - ndbout << "Redo attempt " << ci <<" out of " << retry_opt - << endl << endl; - failed = 0 ; - START_TIMER; - execute(stInsert); - STOP_TIMER; - PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); - i-- ; - ci++; - } - if(0 == failed ){ - ndbout << endl <<"Redo attempt succeeded" << endl << endl; - }else{ - ndbout << endl <<"Redo attempt failed, moving on now..." << endl - << endl; - }//if - }//if - }//if - /**************************************************************** - * Perform read. * - ****************************************************************/ - - failed = 0 ; - - if (tRunType == RunAll || tRunType == RunRead){ - for (int ll = 0; ll < 1 + tExtraReadLoop; ll++) - { - ndbout << "Executing reads" << endl; - START_TIMER; - execute(stRead); - STOP_TIMER; - if (tRunType == RunAll){ - a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); - PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); - }//if - }//for - }//if - - if (tRunType == RunAll){ - if (0 < failed) { - int i = retry_opt ; - int cr = 1; - while (0 < failed && 0 < i){ - ndbout << failed << " of the transactions returned errors!"<record, tConArray[num_ops], aType, threadBaseLoc2, @@ -944,24 +722,31 @@ executeCallback(int result, NdbConnectio NdbConnection **array_ref = (NdbConnection**)aObject; assert(NdbObject == *array_ref); *array_ref = NULL; - if (result == -1) { - - // Add complete error handling here + if (result == -1 && failed < 100) + { - int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); - if (retCode == 1) { - if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){ - ndbout_c("execute: %s", NdbObject->getNdbError().message); - ndbout_c("Error code = %d", NdbObject->getNdbError().code);} - } else if (retCode == 2) { - ndbout << "4115 should not happen in flexAsynch" << endl; - } else if (retCode == 3) { - /* What can we do here? */ - ndbout_c("execute: %s", NdbObject->getNdbError().message); - }//if(retCode == 3) + // Add complete error handling here - // ndbout << "Error occured in poll:" << endl; - // ndbout << NdbObject->getNdbError() << endl; + int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); + if (retCode == 1) + { + if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630) + { + ndbout_c("execute: %s", NdbObject->getNdbError().message); + ndbout_c("Error code = %d", NdbObject->getNdbError().code); + } + } + else if (retCode == 2) + { + ndbout << "4115 should not happen in flexAsynch" << endl; + } + else if (retCode == 3) + { + /* What can we do here? */ + ndbout_c("execute: %s", NdbObject->getNdbError().message); + }//if(retCode == 3) + // ndbout << "Error occured in poll:" << endl; + // ndbout << NdbObject->getNdbError() << endl; failed++ ; }//if NdbObject->close(); /* Close transaction */ @@ -1067,11 +852,12 @@ defineOperation(NdbConnection* localNdbC static void -defineNdbRecordOperation(ThreadNdb* pThread, - NdbConnection* pTrans, StartType aType, - Uint32 threadBase, Uint32 aIndex) +defineNdbRecordOperation(char *record, + NdbConnection* pTrans, + StartType aType, + Uint32 threadBase, + Uint32 aIndex) { - char * record = pThread->record; Uint32 offset; NdbDictionary::getOffset(g_record[0], 0, offset); * (Uint32*)(record + offset) = threadBase; @@ -1102,15 +888,24 @@ defineNdbRecordOperation(ThreadNdb* pThr break; }//case case stRead: { // Read Case - op = pTrans->readTuple(g_record[0],record,g_record[0],record, NdbOperation::LM_CommittedRead); + op = pTrans->readTuple(g_record[0], + record, + g_record[0], + record, + NdbOperation::LM_CommittedRead); break; }//case case stUpdate:{ // Update Case - op = pTrans->updateTuple(g_record[0],record,g_record[0],record); + op = pTrans->updateTuple(g_record[0], + record, + g_record[0], + record); break; }//case case stDelete: { // Delete Case - op = pTrans->deleteTuple(g_record[0],record, g_record[0]); + op = pTrans->deleteTuple(g_record[0], + record, + g_record[0]); break; }//case default: { @@ -1193,6 +988,25 @@ setUpNodeTableArray(Uint32 tableNo, cons } static Uint32 +get_node_relative_id(Uint32 tableNo, Uint32 node_id) +{ + Uint32 rel_id = 0; + + for (Uint32 i = 1; i < node_id; i++) + { + if (nodeTableArray[tableNo][i]) + rel_id++; + } + return rel_id; +} + +static Uint32 +get_node_count(Uint32 tableNo) +{ + return get_node_relative_id(tableNo, NDB_MAX_NODES + 1); +} + +static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo) { Uint32 count = 0; @@ -1327,6 +1141,859 @@ setAggregateRun(void) theTableCreateFlag = 1; } +/* Start NEW Module */ + +/** + * This part contains the code used for the case --local 4 which is using + * the design pattern that could be used for asynchronous applications of + * the NDB API. + * + * This variant will always use transaction hints, it will always the + * NDB Record format in the NDB API. + */ + +static void* definer_thread(void *data); +static void* executor_thread(void *data); + +static Uint32 tNoOfExecutorThreads = 0; +static Uint32 tNoOfDefinerThreads = 0; + +enum RunState +{ + WARMUP = 0, + EXECUTING = 1, + COOLDOWN = 2 +}; + +RunState tRunState = WARMUP; + +typedef struct KeyOperation KEY_OPERATION; +struct KeyOperation +{ + Uint32 first_key; + Uint32 second_key; + Uint32 definer_thread_id; + Uint32 executor_thread_id; + RunType operation_type; + KEY_OPERATION *next_key_op; +}; + +typedef struct key_list_header KEY_LIST_HEADER; +struct key_list_header +{ + KEY_OPERATION *first_in_list; + KEY_OPERATION *last_in_list; + Uint32 num_in_list; +}; + + +typedef struct thread_data_struct THREAD_DATA; +struct thread_data_struct +{ + KEY_LIST_HEADER list_header; + Uint32 thread_id; + bool ready; + bool stop; + bool start; + + char *record; + NdbMutex *transport_mutex; + struct NdbCondition *transport_cond; + struct NdbCondition *main_cond; + struct NdbCondition *start_cond; + char not_used[52]; +}; +THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS]; + +static Uint64 +get_total_transactions() +{ + Uint64 total_transactions = 0; + + for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++) + { + total_transactions += ThreadExecutions[i]; + } + return total_transactions; +} + +static void +init_list_headers(KEY_LIST_HEADER *list_header, + Uint32 num_list_headers) +{ + Uint32 i; + KEY_LIST_HEADER *list_header_ref; + char *list_header_ptr = (char*)list_header; + for (i = 0; + i < num_list_headers; + i++, list_header_ptr += sizeof(KEY_LIST_HEADER)) + { + list_header_ref = (KEY_LIST_HEADER*)list_header_ptr; + list_header_ref->first_in_list = NULL; + list_header_ref->last_in_list = NULL; + list_header_ref->num_in_list = 0; + } +} + +static void +wait_thread_ready(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + while (1) + { + if (my_thread_data->ready) + break; + NdbCondition_Wait(my_thread_data->main_cond, + my_thread_data->transport_mutex); + } + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +wait_for_threads_ready(Uint32 num_threads) +{ + for (Uint32 i = 0; i < num_threads; i++) + wait_thread_ready(&thread_data_array[i]); +} + +static void +signal_thread_to_start(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + my_thread_data->start = true; + my_thread_data->ready = false; + NdbCondition_Signal(my_thread_data->start_cond); + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +signal_definer_threads_to_start() +{ + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + signal_thread_to_start(&thread_data_array[i]); +} + +static void +signal_executor_threads_to_start() +{ + for (Uint32 i = 0; i < tNoOfExecutorThreads; i++) + signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]); +} + +static void +signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + my_thread_data->ready = true; + NdbCondition_Signal(my_thread_data->main_cond); + while (1) + { + if (my_thread_data->start) + break; + NdbCondition_Wait(my_thread_data->start_cond, + my_thread_data->transport_mutex); + } + my_thread_data->start = false; + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +signal_thread_to_stop(THREAD_DATA *my_thread_data) +{ + NdbMutex_Lock(my_thread_data->transport_mutex); + my_thread_data->stop = true; + NdbCondition_Signal(my_thread_data->transport_cond); + NdbMutex_Unlock(my_thread_data->transport_mutex); +} + +static void +signal_definer_threads_to_stop() +{ + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + signal_thread_to_stop(&thread_data_array[i]); +} + +static void +signal_executor_threads_to_stop() +{ + for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++) + signal_thread_to_stop(&thread_data_array[i]); +} + +static void +destroy_thread_data(THREAD_DATA *my_thread_data) +{ + free(my_thread_data->record); + NdbMutex_Destroy(my_thread_data->transport_mutex); + NdbCondition_Destroy(my_thread_data->transport_cond); + NdbCondition_Destroy(my_thread_data->start_cond); + NdbCondition_Destroy(my_thread_data->main_cond); +} + +static void +init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id) +{ + Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]); + my_thread_data->record = (char*)malloc(sz); + memset(my_thread_data->record, 0, sz); + init_list_headers(&my_thread_data->list_header, 1); + my_thread_data->stop = false; + my_thread_data->ready = false; + my_thread_data->start = false; + my_thread_data->transport_mutex = NdbMutex_Create(); + my_thread_data->transport_cond = NdbCondition_Create(); + my_thread_data->main_cond = NdbCondition_Create(); + my_thread_data->start_cond = NdbCondition_Create(); + my_thread_data->thread_id = thread_id; +} + +static void +create_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id) +{ + init_thread_data(my_thread_data, thread_id); + threadLife[thread_id] = NdbThread_Create(definer_thread, + (void**)my_thread_data, + 1024 * 1024, + "flexAsynchThread", + NDB_THREAD_PRIO_LOW); +} + +static void +create_definer_threads() +{ + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + { + Uint32 thread_id = i; + create_definer_thread(&thread_data_array[thread_id], thread_id); + } +} + +static void +create_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id) +{ + init_thread_data(my_thread_data, thread_id); + threadLife[thread_id] = NdbThread_Create(executor_thread, + (void**)my_thread_data, + 1024 * 1024, + "flexAsynchThread", + NDB_THREAD_PRIO_LOW); +} + +static void +create_executor_threads() +{ + for (Uint32 i = 0; i < tNoOfExecutorThreads; i++) + { + Uint32 thread_id = tNoOfDefinerThreads + i; + create_executor_thread(&thread_data_array[thread_id], thread_id); + } +} + +static void +main_thread(RunType start_type, NdbTimer & timer) +{ + bool insert_delete; + + tNoOfExecutorThreads = tNoOfThreads; + if (tNoOfDefinerThreads == 0) + { + tNoOfDefinerThreads = (tNoOfThreads + 3)/4; + } + tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads; + + if (start_type == RunInsert || + start_type == RunDelete) + insert_delete = true; + else + insert_delete = false; + + create_definer_threads(); + create_executor_threads(); + + wait_for_threads_ready(tNoOfThreads); + + /** + * Start threads, start with execution threads to ensure they are + * up and running before definer threads starts sending data to + * them + */ + START_TIMER; + signal_definer_threads_to_start(); + signal_executor_threads_to_start(); + + if (!insert_delete) + { + sleep(tWarmupTime); + tRunState = EXECUTING; + sleep(tExecutionTime); + tRunState = COOLDOWN; + sleep(tCooldownTime); + signal_definer_threads_to_stop(); + } + wait_for_threads_ready(tNoOfDefinerThreads); + STOP_TIMER; + + signal_executor_threads_to_stop(); + wait_for_threads_ready(tNoOfThreads); + + /** + * Now all threads are stopped and prepared to be destroyed, + * now start them just to destroy themselves + */ + signal_definer_threads_to_start(); + signal_executor_threads_to_start(); + + void * tmp; + for (Uint32 i = 0; i < tNoOfThreads; i++) + { + NdbThread_WaitFor(threadLife[i], &tmp); + NdbThread_Destroy(&threadLife[i]); + } +} + +static NdbConnection* +get_trans_object(Uint32 first_key, + Uint32 second_key, + Ndb *my_ndb) +{ + union { + Uint64 _align; + Uint32 Tkey32[2]; + }; + (void)_align; + + Tkey32[0] = first_key; + Tkey32[1] = second_key; + Ndb::Key_part_ptr hint[2]; + hint[0].ptr = Tkey32+0; + hint[0].len = 4; + hint[1].ptr = 0; + hint[1].len = 0; + + return my_ndb->startTransaction(tables[0], hint); +} + +static Ndb* +get_ndb_object(Uint32 my_thread_id) +{ + Ndb *my_ndb = new Ndb(g_cluster_connection+(my_thread_id % tConnections), + "TEST_DB"); + my_ndb->init(MAXPAR); + my_ndb->waitUntilReady(10000); + return my_ndb; +} + +static void +insert_list(KEY_LIST_HEADER *list_header, + KEY_OPERATION *insert_op) +{ + KEY_OPERATION *current_last = list_header->last_in_list; + insert_op->next_key_op = NULL; + list_header->last_in_list = insert_op; + if (current_last) + current_last->next_key_op = insert_op; + else + list_header->first_in_list = insert_op; + list_header->num_in_list++; +} + +static KEY_OPERATION* +get_first_free(KEY_LIST_HEADER *list_header) +{ + assert(list_header->first_in_list); + KEY_OPERATION *key_op = list_header->first_in_list; + list_header->first_in_list = key_op->next_key_op; + list_header->num_in_list--; + if (!list_header->first_in_list) + { + list_header->last_in_list = NULL; + } + key_op->next_key_op = NULL; + return key_op; +} + +static void +move_list(KEY_LIST_HEADER *src_list_header, + KEY_LIST_HEADER *dst_list_header) +{ + KEY_OPERATION *last_completed_op = dst_list_header->last_in_list; + KEY_OPERATION *first_in_list = src_list_header->first_in_list; + if (!first_in_list) + return; + if (last_completed_op) + { + last_completed_op->next_key_op = first_in_list; + } + else + { + dst_list_header->first_in_list = first_in_list; + } + dst_list_header->last_in_list = src_list_header->last_in_list; + dst_list_header->num_in_list += src_list_header->num_in_list; + src_list_header->num_in_list = 0; + src_list_header->first_in_list = NULL; + src_list_header->last_in_list = NULL; +} + +/** + * Retrieve a linked list of prepared operations. If no operations + * prepared we wait on a condition until operations are defined for + * us to execute. + */ +static void +receive_operations(THREAD_DATA *my_thread_data, + KEY_LIST_HEADER *list_header, + bool wait) +{ + bool first = true; + KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header; + list_header->first_in_list = NULL; + list_header->last_in_list = NULL; + list_header->num_in_list = 0; + +recheck: + NdbMutex_Lock(my_thread_data->transport_mutex); + while (!my_thread_data->stop && + (first || thread_list_header->first_in_list)) + { + move_list(thread_list_header, list_header); + if (list_header->first_in_list) + break; + NdbCondition_Wait(my_thread_data->transport_cond, + my_thread_data->transport_mutex); + first = false; + } + NdbMutex_Unlock(my_thread_data->transport_mutex); + if (first && wait && + thread_list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2)) + { + /** + * We will wait for at least 200 microseconds if we haven't yet received + * at least half of the number of records we desire to execute. + */ + NdbSleep_MicroSleep(200); + first = false; + goto recheck; + } +} + +static void +send_operations(Uint32 thread_id, + KEY_LIST_HEADER *list_header) +{ + THREAD_DATA *recv_thread = &thread_data_array[thread_id]; + + NdbMutex_Lock(recv_thread->transport_mutex); + /** + * We are moving operations into the list, thus we need + * to wake any threads waiting for operations to execute. + */ + move_list(list_header, + &recv_thread->list_header); + NdbCondition_Signal(recv_thread->transport_cond); + NdbMutex_Unlock(recv_thread->transport_mutex); +} + +static void +init_key_op_list(char *key_op_ptr, + KEY_LIST_HEADER *list_header, + Uint32 max_outstanding, + Uint32 my_thread_id, + RunType my_run_type) +{ + KEY_OPERATION *key_op; + + list_header->first_in_list = (KEY_OPERATION*)key_op_ptr; + for (Uint32 i = 0; + i < max_outstanding; + i++, key_op_ptr += sizeof(KEY_OPERATION)) + { + key_op = (KEY_OPERATION*)key_op_ptr; + key_op->next_key_op = (KEY_OPERATION*)(key_op_ptr + sizeof(KEY_OPERATION)); + key_op->definer_thread_id = my_thread_id; + key_op->executor_thread_id = MAX_EXECUTOR_THREADS; + key_op->operation_type = my_run_type; + } + key_op->next_key_op = NULL; /* Last key operation */ + list_header->last_in_list = key_op; + list_header->num_in_list = max_outstanding; +} + +static Uint32 +get_thread_id_for_record(Uint32 record_id, + Uint32 node_count, + Uint32 thread_count, + Uint32 thread_group, + Uint32 num_thread_groups, + Ndb *my_ndb) +{ + Uint32 thread_id; + NdbConnection *trans = get_trans_object(record_id, record_id, my_ndb); + Uint32 node_id = trans->getConnectedNodeId(); + trans->close(); + Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id); + if (node_count >= thread_count) + { + thread_id = node_rel_id % thread_count; + return thread_id; + } + +recalculate: + thread_id = thread_group * node_count + node_rel_id; + if (thread_id >= thread_count) + { + /** + * Only the last thread group may have less than + * node_count threads, so choosing a random group + * except the last will always give a valid + * thread_id. + */ + thread_group = rand() % (num_thread_groups - 1); + goto recalculate; + } + return thread_id; +} + +void init_thread_id_mem(char *thread_id_mem, + Uint32 first_record, + Uint32 total_records, + Ndb *my_ndb) +{ + Uint32 node_count = get_node_count((Uint32)0); + Uint32 thread_count = tNoOfExecutorThreads; + Uint32 num_thread_groups = (thread_count + node_count - 1) / node_count; + Uint32 thread_group = 0; + for (Uint32 record_id = first_record, i = 0; + i < total_records; + i++, record_id++) + { + thread_id_mem[i] = (char)get_thread_id_for_record(record_id, + node_count, + thread_count, + thread_group, + num_thread_groups, + my_ndb); + thread_group++; + if (thread_group == num_thread_groups) + thread_group = 0; + } +} + +static bool +check_for_outstanding(Uint32 *thread_state) +{ + for (Uint32 i = 0; i < tNoOfExecutorThreads; i++) + { + if (thread_state[i]) + return true; + } + return false; +} + +static void +update_thread_state(KEY_LIST_HEADER *list_header, + Uint32 *thread_state) +{ + KEY_OPERATION *key_op = list_header->first_in_list; + + while (key_op) + { + thread_state[key_op->executor_thread_id]--; + key_op->executor_thread_id = MAX_EXECUTOR_THREADS; + key_op = key_op->next_key_op; + } +} + +static void +wait_until_all_completed(THREAD_DATA *my_thread_data, + Uint32 *thread_state, + KEY_LIST_HEADER *free_list_header) +{ + KEY_LIST_HEADER list_header; + bool outstanding = true; + while (outstanding && !my_thread_data->stop) + { + receive_operations(my_thread_data, &list_header, false); + update_thread_state(&list_header, thread_state); + move_list(&list_header, free_list_header); + outstanding = check_for_outstanding(thread_state); + } +} + +static Uint32 +prepare_operations(char *thread_id_mem, + KEY_LIST_HEADER *free_list_header, + Uint32 *thread_state, + Uint32 first_record_to_define, + Uint32 num_records_to_define, + Uint32 first_record, + Uint32 last_record, + Uint32 max_per_thread) +{ + KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS]; + Uint32 record_id, i, num_records; + + init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads); + for (record_id = first_record_to_define, i = 0; + record_id <= last_record && i < num_records_to_define; + record_id++, i++) + { + KEY_OPERATION *define_op = get_first_free(free_list_header); + Uint32 thread_id = (Uint32)thread_id_mem[record_id - first_record]; + define_op->first_key = record_id; + define_op->second_key = record_id; + define_op->executor_thread_id = thread_id; + thread_state[thread_id]++; + KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id]; + insert_list(thread_list_header, define_op); + if (thread_list_header->num_in_list >= max_per_thread) + { + /** + * One thread has max number of records, we won't define any + * more to keep the code simple. + */ + break; + } + } + num_records = i; + for (i = 0; i < tNoOfExecutorThreads; i++) + { + KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i]; + if (thread_list_header->num_in_list) + { + send_operations(tNoOfDefinerThreads + i, thread_list_header); + } + } + return num_records; +} + +static void* +definer_thread(void *data) +{ + THREAD_DATA *my_thread_data = (THREAD_DATA*)data; + Uint32 my_thread_id = my_thread_data->thread_id; + RunType run_type = tRunType; + Uint32 thread_state[MAX_EXECUTOR_THREADS]; + Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) / + tNoOfDefinerThreads; + Uint32 max_per_thread = 1000 / tNoOfDefinerThreads; + Uint32 total_records = max_outstanding * tNoOfTransactions; + Uint32 first_record = total_records * my_thread_id; + Uint32 my_last_record = first_record + total_records - 1; + Uint32 current_record = first_record; + KEY_LIST_HEADER free_list_header; + void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding); + char *thread_id_mem = (char*)malloc(total_records); + memset((char*)&thread_state[0], 0, sizeof(thread_state)); + + init_key_op_list((char*)key_op_mem, + &free_list_header, + max_outstanding, + my_thread_id, + run_type); + Ndb *my_ndb = get_ndb_object(my_thread_id); + init_thread_id_mem(thread_id_mem, + first_record, + total_records, + my_ndb); + delete my_ndb; + ThreadExecutions[my_thread_id] = 0; + signal_thread_ready_wait_for_start(my_thread_data); + + while (!my_thread_data->stop) + { + Uint32 defined_ops = prepare_operations(thread_id_mem, + &free_list_header, + &thread_state[0], + current_record, + max_outstanding, + first_record, + my_last_record, + max_per_thread); + current_record += defined_ops; + if (defined_ops) + { + wait_until_all_completed(my_thread_data, + &thread_state[0], + &free_list_header); + } + if (current_record > my_last_record) + { + if (run_type != RunRead && + run_type != RunUpdate) + { + /** + * Inserts and deletes are done when first round is + * completed. Reads and updates proceed until time is + * completed. + */ + break; + } + current_record = first_record; + } + } + signal_thread_ready_wait_for_start(my_thread_data); + free(key_op_mem); + free(thread_id_mem); + destroy_thread_data(my_thread_data); + return NULL; +} + +/** + * This method receives a linked list of key operations and executes + * all of them. + * + * Return Value: >= 0 means successful completion of this many operations + * -1 Failure, stop test + */ +static int +execute_operations(char *record, + Ndb* my_ndb, + KEY_OPERATION *key_op) +{ + NdbConnection* ndb_conn_array[MAXPAR]; + Uint32 num_ops = 0; + + while (key_op) + { + ndb_conn_array[num_ops] = get_trans_object(key_op->first_key, + key_op->second_key, + my_ndb); + if (ndb_conn_array[num_ops] == NULL){ + error_handler(my_ndb->getNdbError()); + ndbout << endl << "Unable to recover! Quitting now" << endl ; + return -1; + } + //------------------------------------------------------- + // Define the operation, but do not execute it yet. + //------------------------------------------------------- + defineNdbRecordOperation(record, + ndb_conn_array[num_ops], + (StartType)key_op->operation_type, + key_op->first_key, + key_op->second_key); + + ndb_conn_array[num_ops]->executeAsynchPrepare(Commit, + &executeCallback, + (void*)&ndb_conn_array[num_ops]); + num_ops++; + key_op = key_op->next_key_op; + } + if (num_ops == 0) + return 0; + + /** + * Now execute each defined operation and wait for all of them to + * complete. + */ + int Tcomp = my_ndb->sendPollNdb(3000, + num_ops, + tSendForce); + if (Tcomp != (int)num_ops && + my_ndb->getNdbError().code != 0) + { + /* Error handling */ + if (error_count > 100) + return -1; + + error_count++; + ndbout << "error = " << my_ndb->getNdbError().code << endl; + } + return Tcomp; +} + +static void +report_back_operations(KEY_OPERATION *first_defined_op) +{ + KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS]; + KEY_OPERATION *next_op, *executed_op; + + init_list_headers(&thread_list_header[0], tNoOfDefinerThreads); + executed_op = first_defined_op; + while (executed_op) + { + next_op = executed_op->next_key_op; + insert_list(&thread_list_header[executed_op->definer_thread_id], + executed_op); + executed_op = next_op; + } + for (Uint32 i = 0; i < tNoOfDefinerThreads; i++) + { + if (thread_list_header[i].first_in_list) + { + send_operations(i, &thread_list_header[i]); + } + } +} + +/** + * This is the main function of the executor threads, these threads + * receive linked lists of operations to execute from the definer + * threads. The definer threads stops these threads by simply + * sending a stop operation. + */ +static void* +executor_thread(void *data) +{ + THREAD_DATA *my_thread_data = (THREAD_DATA*)data; + Uint32 my_thread_id = my_thread_data->thread_id; + Uint64 exec_count = 0; + Uint32 error_count = 0; + Uint32 executions = 0; + Uint32 error_flag = false; + int ret_code; + KEY_LIST_HEADER list_header; + + Ndb *my_ndb = get_ndb_object(my_thread_id); + ThreadExecutions[my_thread_id] = 0; + + signal_thread_ready_wait_for_start(my_thread_data); + + while (!my_thread_data->stop) + { + receive_operations(my_thread_data, &list_header, !tImmediate); + if (list_header.num_in_list == 0) + { + break; + } + ret_code = 0; + if (!error_flag) + { + /* Ignore to execute after errors to simplify error handling */ + ret_code = execute_operations(my_thread_data->record, + my_ndb, + list_header.first_in_list); + } + report_back_operations(list_header.first_in_list); + if (ret_code < 0) + { + ndbout_c("executor thread id = %u received error", my_thread_id); + error_flag = true; + } + else if (!error_flag && + (tRunType == RunInsert || + tRunType == RunDelete || + tRunState == EXECUTING)) + { + executions++; + exec_count += (Uint64)ret_code; + } + } + + ThreadExecutions[my_thread_id] = exec_count; + if (error_count) + { + ndbout_c("Received %u errors in executor thread, id = %u", + error_count, my_thread_id); + } + signal_thread_ready_wait_for_start(my_thread_data); + delete my_ndb; + destroy_thread_data(my_thread_data); + return NULL; +} + +/* End NEW Module */ + static int readArguments(int argc, const char** argv){ @@ -1339,6 +2006,15 @@ readArguments(int argc, const char** arg ndbout_c("Invalid no of threads"); return -1; } + } + else if (strcmp(argv[i], "-d") == 0) + { + tNoOfDefinerThreads = atoi(argv[i+1]); + if (tNoOfDefinerThreads > NDB_MAXTHREADS) + { + ndbout_c("Invalid no of definer threads"); + return -1; + } } else if (strcmp(argv[i], "-p") == 0){ tNoOfParallelTrans = atoi(argv[i+1]); if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){ @@ -1463,6 +2139,18 @@ readArguments(int argc, const char** arg } else if (strcmp(argv[i], "-create_table") == 0){ tRunType = RunCreateTable; argc++; + } + else if (strcmp(argv[i], "-new") == 0) + { + tNew = true; + tNdbRecord = true; + argc++; + i--; + } + else if (strcmp(argv[i], "-immediate") == 0) + { + tImmediate = true; + argc++; i--; } else if (strcmp(argv[i], "-drop_table") == 0){ tRunType = RunDropTable; @@ -1536,4 +2224,303 @@ input_error(){ ndbout_c(" -table Number of standard table, default 0"); } +static void +run_old_flexAsynch(ThreadNdb *pThreadData, + NdbTimer & timer) +{ + int tLoops=0; + /**************************************************************** + * Create NDB objects. * + ****************************************************************/ + resetThreads(); + for (Uint32 i = 0; i < tNoOfThreads ; i++) + { + pThreadData[i].ThreadNo = i; + threadLife[i] = NdbThread_Create(threadLoop, + (void**)&pThreadData[i], + 32768, + "flexAsynchThread", + NDB_THREAD_PRIO_LOW); + }//for + ndbout << endl << "All NDB objects and table created" << endl << endl; + int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads; + /**************************************************************** + * Execute program. * + ****************************************************************/ + + for (;;) + { + + int loopCount = tLoops + 1 ; + ndbout << endl << "Loop # " << loopCount << endl << endl ; + + /**************************************************************** + * Perform inserts. * + ****************************************************************/ + + failed = 0 ; + if (tRunType == RunAll || tRunType == RunInsert) + { + ndbout << "Executing inserts" << endl; + START_TIMER; + execute(stInsert); + STOP_TIMER; + } + if (tRunType == RunAll) + { + a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); + PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); + + if (0 < failed) + { + int i = retry_opt ; + int ci = 1 ; + while (0 < failed && 0 < i) + { + ndbout << failed << " of the transactions returned errors!" + << endl << endl; + ndbout << "Attempting to redo the failed transactions now..." + << endl ; + ndbout << "Redo attempt " << ci <<" out of " << retry_opt + << endl << endl; + failed = 0 ; + START_TIMER; + execute(stInsert); + STOP_TIMER; + PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans); + i-- ; + ci++; + } + if (0 == failed ) + { + ndbout << endl <<"Redo attempt succeeded" << endl << endl; + } + else + { + ndbout << endl <<"Redo attempt failed, moving on now..." << endl + << endl; + }//if + }//if + }//if + /**************************************************************** + * Perform read. * + ****************************************************************/ + + failed = 0 ; + + if (tRunType == RunAll || tRunType == RunRead) + { + for (int ll = 0; ll < 1 + tExtraReadLoop; ll++) + { + ndbout << "Executing reads" << endl; + START_TIMER; + execute(stRead); + STOP_TIMER; + if (tRunType == RunAll) + { + a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime()); + PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); + }//if + }//for + }//if + + if (tRunType == RunAll) + { + if (0 < failed) + { + int i = retry_opt ; + int cr = 1; + while (0 < failed && 0 < i) + { + ndbout << failed << " of the transactions returned errors!"<; No bundle (reason: useless for push emails).