From: Jonas Oreland Date: September 23 2011 9:17am Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4546 to 4550) List-Archive: http://lists.mysql.com/commits/141106 Message-Id: <20110923091723.B0512714673@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4550 Jonas Oreland 2011-09-23 ndb - add config part (still only explicit, using ThreadConfig) of MT-TC - remove ThreadConfig-printout from ndbd (single threaded) modified: storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/ndbd.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/mt_thr_config.cpp storage/ndb/src/kernel/vm/mt_thr_config.hpp 4549 Jonas Oreland 2011-09-23 ndb - fix mttc bug in DIH. Read variables *before* retry-loop (found while testing...really cool) modified: storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 4548 Jonas Oreland 2011-09-23 ndb - fix so that LQH doesn't execute direct into mt-tc modified: storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 4547 Jonas Oreland 2011-09-23 ndb - fix incorrect code in spj, blocking multi instance spj config modified: storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 4546 jonas oreland 2011-09-22 [merge] ndb - merge 63 to 70 === modified file 'storage/ndb/src/kernel/SimBlockList.cpp' --- a/storage/ndb/src/kernel/SimBlockList.cpp 2011-08-27 06:06:02 +0000 +++ b/storage/ndb/src/kernel/SimBlockList.cpp 2011-09-23 09:13:22 +0000 @@ -49,6 +49,8 @@ #include #include #include +#include +#include #include #ifndef VM_TRACE @@ -128,7 +130,10 @@ SimBlockList::load(EmulatorData& data){ theList[8] = NEW_BLOCK(Dblqh)(ctx); else theList[8] = NEW_BLOCK(DblqhProxy)(ctx); - theList[9] = NEW_BLOCK(Dbtc)(ctx); + if (globalData.ndbMtTcThreads == 0) + theList[9] = NEW_BLOCK(Dbtc)(ctx); + else + theList[9] = NEW_BLOCK(DbtcProxy)(ctx); if (!mtLqh) theList[10] = NEW_BLOCK(Dbtup)(ctx); else @@ -151,18 +156,12 @@ SimBlockList::load(EmulatorData& data){ else theList[18] = NEW_BLOCK(RestoreProxy)(ctx); theList[19] = NEW_BLOCK(Dbinfo)(ctx); - theList[20] = NEW_BLOCK(Dbspj)(ctx); + if (globalData.ndbMtTcThreads == 0) + theList[20] = NEW_BLOCK(Dbspj)(ctx); + else + theList[20] = NEW_BLOCK(DbspjProxy)(ctx); assert(NO_OF_BLOCKS == 21); - if (globalData.isNdbMt) { - add_main_thr_map(); - if (globalData.isNdbMtLqh) { - for (int i = 0; i < noOfBlocks; i++) - theList[i]->loadWorkers(); - } - finalize_thr_map(); - } - // Check that all blocks could be created for (int i = 0; i < noOfBlocks; i++) { @@ -172,6 +171,14 @@ SimBlockList::load(EmulatorData& data){ "Failed to create block", ""); } } + + if (globalData.isNdbMt) + { + add_main_thr_map(); + for (int i = 0; i < noOfBlocks; i++) + theList[i]->loadWorkers(); + finalize_thr_map(); + } } void === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-09-15 20:21:59 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-09-23 08:52:14 +0000 @@ -9018,6 +9018,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si TabRecordPtr tabPtr; tabPtr.i = req->tableId; Uint32 hashValue = req->hashValue; + Uint32 distr_key_indicator = req->distr_key_indicator; Uint32 ttabFileSize = ctabFileSize; Uint32 fragId, newFragId = RNIL; DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0]; @@ -9042,7 +9043,7 @@ loop: * of distribution algorithm in use, hashValue * IS fragment id. */ - if (req->distr_key_indicator) + if (distr_key_indicator) { fragId = hashValue; if (unlikely(fragId >= tabPtr.p->totalfragments)) === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-07-04 13:40:57 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-09-23 07:47:41 +0000 @@ -3865,7 +3865,8 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig { lqhKeyConf->connectPtr = tcConnectptr.i; if (instance() == refToInstance(atcBlockref) && - (Thostptr.i == 0 || Thostptr.i == getOwnNodeId())) + (Thostptr.i == 0 || Thostptr.i == getOwnNodeId()) && + globalData.ndbMtTcThreads == 0) { /** * This EXECUTE_DIRECT is multi-thread safe, as we only get here === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-09-16 12:34:46 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-09-23 07:43:25 +0000 @@ -280,16 +280,15 @@ Dbspj::execAPI_FAILREQ(Signal* signal) { jamEntry(); Uint32 failedApiNode = signal->theData[0]; - ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR + Uint32 ref = signal->theData[1]; /** * We only need to care about lookups * as SCAN's are aborted by DBTC */ - signal->theData[0] = failedApiNode; signal->theData[1] = reference(); - sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB); + sendSignal(ref, GSN_API_FAILCONF, signal, 2, JBB); } void === modified file 'storage/ndb/src/kernel/ndbd.cpp' --- a/storage/ndb/src/kernel/ndbd.cpp 2011-09-08 11:49:24 +0000 +++ b/storage/ndb/src/kernel/ndbd.cpp 2011-09-23 09:13:22 +0000 @@ -310,7 +310,9 @@ get_multithreaded_config(EmulatorData& e return 0; ndbout << "NDBMT: workers=" << globalData.ndbMtLqhWorkers - << " threads=" << globalData.ndbMtLqhThreads << endl; + << " threads=" << globalData.ndbMtLqhThreads + << " tc=" << globalData.ndbMtTcThreads + << endl; return 0; } === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-13 09:10:52 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-23 09:13:22 +0000 @@ -436,18 +436,21 @@ Configuration::setupConfiguration(){ m_thr_config.getErrorMessage()); } } - if (thrconfigstring) + if (NdbIsMultiThreaded()) { - ndbout_c("ThreadConfig: input: %s LockExecuteThreadToCPU: %s => parsed: %s", - thrconfigstring, - lockmask ? lockmask : "", - m_thr_config.getConfigString()); - } - else - { - ndbout_c("ThreadConfig (old ndb_mgmd) LockExecuteThreadToCPU: %s => parsed: %s", - lockmask ? lockmask : "", - m_thr_config.getConfigString()); + if (thrconfigstring) + { + ndbout_c("ThreadConfig: input: %s LockExecuteThreadToCPU: %s => parsed: %s", + thrconfigstring, + lockmask ? lockmask : "", + m_thr_config.getConfigString()); + } + else + { + ndbout_c("ThreadConfig (old ndb_mgmd) LockExecuteThreadToCPU: %s => parsed: %s", + lockmask ? lockmask : "", + m_thr_config.getConfigString()); + } } ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config); @@ -466,6 +469,7 @@ Configuration::setupConfiguration(){ if (!globalData.isNdbMt) break; + globalData.ndbMtTcThreads = m_thr_config.getThreadCount(THRConfig::T_TC); globalData.isNdbMtLqh = true; { if (m_thr_config.getMtClassic()) === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-09-04 08:52:42 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-09-23 09:13:22 +0000 @@ -32,7 +32,8 @@ static const struct THRConfig::Entries m { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS }, { "recv", THRConfig::T_RECV, 1, 1 }, { "rep", THRConfig::T_REP, 1, 1 }, - { "io", THRConfig::T_IO, 1, 1 } + { "io", THRConfig::T_IO, 1, 1 }, + { "tc", THRConfig::T_TC, 0, MAX_NDBMT_TC_THREADS } }; static const struct THRConfig::Param m_params[] = @@ -140,6 +141,7 @@ THRConfig::do_parse(unsigned MaxNoOfExec return do_bindings(); } + Uint32 tcthreads = 0; Uint32 lqhthreads = 0; switch(MaxNoOfExecutionThreads){ case 0: @@ -171,6 +173,11 @@ THRConfig::do_parse(unsigned MaxNoOfExec add(T_LDM); } + for(Uint32 i = 0; i < tcthreads; i++) + { + add(T_TC); + } + return do_bindings() || do_validate(); } @@ -283,6 +290,13 @@ THRConfig::do_bindings() "LockExecuteThreadToCPU. Only %u specified " " but %u was needed, this may cause contention.\n", cnt, num_threads); + + if (count_unbound(m_threads[T_TC])) + { + m_err_msg.assfmt("Too CPU specifed with LockExecuteThreadToCPU. " + "This is not supported when using multiple TC threads"); + return -1; + } } if (cnt >= num_threads) @@ -867,7 +881,13 @@ THRConfig::do_parse(const char * ThreadC add((T_Type)i); } - return do_bindings() || do_validate(); + int res = do_bindings(); + if (res != 0) + { + return res; + } + + return do_validate(); } unsigned @@ -916,6 +936,10 @@ THRConfigApplier::find_thread(const unsi { return &m_threads[T_MAIN][instanceNo]; } + else if ((instanceNo = findBlock(DBTC, instancelist, cnt)) >= 0) + { + return &m_threads[T_TC][instanceNo - 1]; // remove proxy + } else if ((instanceNo = findBlock(DBLQH, instancelist, cnt)) >= 0) { return &m_threads[T_LDM][instanceNo - 1]; // remove proxy... @@ -1014,6 +1038,8 @@ TAPTEST(mt_thr_config) "ldm={count=3,cpubind=1-2,5 }, ldm", "ldm={cpuset=1-3,count=3 },ldm", "main,ldm={},ldm", + "main,ldm={},ldm,tc", + "main,ldm={},ldm,tc,tc", 0 }; @@ -1026,6 +1052,7 @@ TAPTEST(mt_thr_config) "main={ keso=88, count=23},ldm,ldm", "main={ cpuset=1-3 }, ldm={cpuset=3-4}", "main={ cpuset=1-3 }, ldm={cpubind=2}", + "tc,tc,tc", 0 }; @@ -1065,45 +1092,71 @@ TAPTEST(mt_thr_config) /** threads, LockExecuteThreadToCPU, answer */ "1-8", "ldm={count=4}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}", "1-5", "ldm={count=4}", + "OK", "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}", "1-3", "ldm={count=4}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}", "1-4", "ldm={count=4}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}", "1-8", "ldm={count=4},io={cpubind=8}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},io={cpubind=8}", "1-8", "ldm={count=4,cpubind=1,4,5,6}", + "OK", "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}", + "1-9", + "ldm={count=4,cpubind=1,4,5,6},tc,tc", + "OK", + "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7},tc={cpubind=8},tc={cpubind=9}", + + "1-8", + "ldm={count=4,cpubind=1,4,5,6},tc", + "OK", + "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7},tc={cpubind=8}", + + "1-8", + "ldm={count=4,cpubind=1,4,5,6},tc,tc", + "FAIL", + "Too CPU specifed with LockExecuteThreadToCPU. This is not supported when using multiple TC threads", + // END 0 }; - for (unsigned i = 0; t[i]; i+= 3) + for (unsigned i = 0; t[i]; i+= 4) { THRConfig tmp; tmp.setLockExecuteThreadToCPU(t[i+0]); - int res = tmp.do_parse(t[i+1]); - int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0; + const int _res = tmp.do_parse(t[i+1]); + const int expect_res = strcmp(t[i+2], "OK") == 0 ? 0 : -1; + const int res = _res == expect_res ? 0 : -1; + int ok = expect_res == 0 ? + strcmp(tmp.getConfigString(), t[i+3]) == 0: + strcmp(tmp.getErrorMessage(), t[i+3]) == 0; printf("mask: %s conf: %s => %s(%s) - %s - %s\n", t[i+0], t[i+1], - res == 0 ? "OK" : "FAIL", - res == 0 ? "" : tmp.getErrorMessage(), + _res == 0 ? "OK" : "FAIL", + _res == 0 ? "" : tmp.getErrorMessage(), tmp.getConfigString(), ok == 1 ? "CORRECT" : "INCORRECT"); + OK(res == 0); OK(ok == 1); } === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-09-02 17:24:52 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-09-23 09:13:22 +0000 @@ -43,8 +43,9 @@ public: T_RECV = 2, /* CMVMI */ T_REP = 3, /* SUMA */ T_IO = 4, /* FS, SocketServer etc */ + T_TC = 5, /* TC+SPJ */ - T_END = 5 + T_END = 6 }; THRConfig(); No bundle (reason: useless for push emails).