From: Jonas Oreland Date: September 26 2011 8:05am Subject: bzr push into mysql-5.5-cluster branch (jonas.oreland:3523 to 3525) List-Archive: http://lists.mysql.com/commits/141136 Message-Id: <20110926080504.D4E417CCA67@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3525 Jonas Oreland 2011-09-26 [merge] ndb - merge 7.1 to cluster-5.5 modified: storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp storage/ndb/src/kernel/ndbd.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/mt_thr_config.cpp storage/ndb/src/kernel/vm/mt_thr_config.hpp 3524 Jonas Oreland 2011-09-26 [merge] ndb - merge 7.2.1 into cluster-5.5 modified: storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/mgmsrv/MgmtSrvr.cpp 3523 jonas oreland 2011-09-22 [merge] ndb - merge 71 to 72 === modified file 'storage/ndb/src/kernel/SimBlockList.cpp' --- a/storage/ndb/src/kernel/SimBlockList.cpp 2011-09-02 09:16:56 +0000 +++ b/storage/ndb/src/kernel/SimBlockList.cpp 2011-09-26 08:03:20 +0000 @@ -49,6 +49,8 @@ #include #include #include +#include +#include #include #ifndef VM_TRACE @@ -128,7 +130,10 @@ SimBlockList::load(EmulatorData& data){ theList[8] = NEW_BLOCK(Dblqh)(ctx); else theList[8] = NEW_BLOCK(DblqhProxy)(ctx); - theList[9] = NEW_BLOCK(Dbtc)(ctx); + if (globalData.ndbMtTcThreads == 0) + theList[9] = NEW_BLOCK(Dbtc)(ctx); + else + theList[9] = NEW_BLOCK(DbtcProxy)(ctx); if (!mtLqh) theList[10] = NEW_BLOCK(Dbtup)(ctx); else @@ -151,18 +156,12 @@ SimBlockList::load(EmulatorData& data){ else theList[18] = NEW_BLOCK(RestoreProxy)(ctx); theList[19] = NEW_BLOCK(Dbinfo)(ctx); - theList[20] = NEW_BLOCK(Dbspj)(ctx); + if (globalData.ndbMtTcThreads == 0) + theList[20] = NEW_BLOCK(Dbspj)(ctx); + else + theList[20] = NEW_BLOCK(DbspjProxy)(ctx); assert(NO_OF_BLOCKS == 21); - if (globalData.isNdbMt) { - add_main_thr_map(); - if (globalData.isNdbMtLqh) { - for (int i = 0; i < noOfBlocks; i++) - theList[i]->loadWorkers(); - } - finalize_thr_map(); - } - // Check that all blocks could be created for (int i = 0; i < noOfBlocks; i++) { @@ -172,6 +171,14 @@ SimBlockList::load(EmulatorData& data){ "Failed to create block", ""); } } + + if (globalData.isNdbMt) + { + add_main_thr_map(); + for (int i = 0; i < noOfBlocks; i++) + theList[i]->loadWorkers(); + finalize_thr_map(); + } } void === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-09-16 14:44:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-09-26 08:03:20 +0000 @@ -9018,6 +9018,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si TabRecordPtr tabPtr; tabPtr.i = req->tableId; Uint32 hashValue = req->hashValue; + Uint32 distr_key_indicator = req->distr_key_indicator; Uint32 ttabFileSize = ctabFileSize; Uint32 fragId, newFragId = RNIL; DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0]; @@ -9042,7 +9043,7 @@ loop: * of distribution algorithm in use, hashValue * IS fragment id. */ - if (req->distr_key_indicator) + if (distr_key_indicator) { fragId = hashValue; if (unlikely(fragId >= tabPtr.p->totalfragments)) === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-09-02 09:16:56 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-09-26 08:03:20 +0000 @@ -3865,7 +3865,8 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig { lqhKeyConf->connectPtr = tcConnectptr.i; if (instance() == refToInstance(atcBlockref) && - (Thostptr.i == 0 || Thostptr.i == getOwnNodeId())) + (Thostptr.i == 0 || Thostptr.i == getOwnNodeId()) && + globalData.ndbMtTcThreads == 0) { /** * This EXECUTE_DIRECT is multi-thread safe, as we only get here @@ -11347,8 +11348,8 @@ void Dblqh::scanTupkeyRefLab(Signal* sig scanReleaseLocksLab(signal); return; }//if - Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount; - if (rows) + Uint32 time_passed= cLqhTimeOutCount - tcConnectptr.p->tcTimer; + if (rows) { if (time_passed > 1) { === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-09-16 14:44:00 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2011-09-26 08:03:20 +0000 @@ -280,16 +280,15 @@ Dbspj::execAPI_FAILREQ(Signal* signal) { jamEntry(); Uint32 failedApiNode = signal->theData[0]; - ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR + Uint32 ref = signal->theData[1]; /** * We only need to care about lookups * as SCAN's are aborted by DBTC */ - signal->theData[0] = failedApiNode; signal->theData[1] = reference(); - sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB); + sendSignal(ref, GSN_API_FAILCONF, signal, 2, JBB); } void === modified file 'storage/ndb/src/kernel/ndbd.cpp' --- a/storage/ndb/src/kernel/ndbd.cpp 2011-09-08 11:49:24 +0000 +++ b/storage/ndb/src/kernel/ndbd.cpp 2011-09-23 09:13:22 +0000 @@ -310,7 +310,9 @@ get_multithreaded_config(EmulatorData& e return 0; ndbout << "NDBMT: workers=" << globalData.ndbMtLqhWorkers - << " threads=" << globalData.ndbMtLqhThreads << endl; + << " threads=" << globalData.ndbMtLqhThreads + << " tc=" << globalData.ndbMtTcThreads + << endl; return 0; } === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-14 13:49:19 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-26 08:03:20 +0000 @@ -436,18 +436,21 @@ Configuration::setupConfiguration(){ m_thr_config.getErrorMessage()); } } - if (thrconfigstring) + if (NdbIsMultiThreaded()) { - ndbout_c("ThreadConfig: input: %s LockExecuteThreadToCPU: %s => parsed: %s", - thrconfigstring, - lockmask ? lockmask : "", - m_thr_config.getConfigString()); - } - else - { - ndbout_c("ThreadConfig (old ndb_mgmd) LockExecuteThreadToCPU: %s => parsed: %s", - lockmask ? lockmask : "", - m_thr_config.getConfigString()); + if (thrconfigstring) + { + ndbout_c("ThreadConfig: input: %s LockExecuteThreadToCPU: %s => parsed: %s", + thrconfigstring, + lockmask ? lockmask : "", + m_thr_config.getConfigString()); + } + else + { + ndbout_c("ThreadConfig (old ndb_mgmd) LockExecuteThreadToCPU: %s => parsed: %s", + lockmask ? lockmask : "", + m_thr_config.getConfigString()); + } } ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config); @@ -466,6 +469,7 @@ Configuration::setupConfiguration(){ if (!globalData.isNdbMt) break; + globalData.ndbMtTcThreads = m_thr_config.getThreadCount(THRConfig::T_TC); globalData.isNdbMtLqh = true; { if (m_thr_config.getMtClassic()) === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-09-04 08:52:42 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-09-23 09:13:22 +0000 @@ -32,7 +32,8 @@ static const struct THRConfig::Entries m { "ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS }, { "recv", THRConfig::T_RECV, 1, 1 }, { "rep", THRConfig::T_REP, 1, 1 }, - { "io", THRConfig::T_IO, 1, 1 } + { "io", THRConfig::T_IO, 1, 1 }, + { "tc", THRConfig::T_TC, 0, MAX_NDBMT_TC_THREADS } }; static const struct THRConfig::Param m_params[] = @@ -140,6 +141,7 @@ THRConfig::do_parse(unsigned MaxNoOfExec return do_bindings(); } + Uint32 tcthreads = 0; Uint32 lqhthreads = 0; switch(MaxNoOfExecutionThreads){ case 0: @@ -171,6 +173,11 @@ THRConfig::do_parse(unsigned MaxNoOfExec add(T_LDM); } + for(Uint32 i = 0; i < tcthreads; i++) + { + add(T_TC); + } + return do_bindings() || do_validate(); } @@ -283,6 +290,13 @@ THRConfig::do_bindings() "LockExecuteThreadToCPU. Only %u specified " " but %u was needed, this may cause contention.\n", cnt, num_threads); + + if (count_unbound(m_threads[T_TC])) + { + m_err_msg.assfmt("Too CPU specifed with LockExecuteThreadToCPU. " + "This is not supported when using multiple TC threads"); + return -1; + } } if (cnt >= num_threads) @@ -867,7 +881,13 @@ THRConfig::do_parse(const char * ThreadC add((T_Type)i); } - return do_bindings() || do_validate(); + int res = do_bindings(); + if (res != 0) + { + return res; + } + + return do_validate(); } unsigned @@ -916,6 +936,10 @@ THRConfigApplier::find_thread(const unsi { return &m_threads[T_MAIN][instanceNo]; } + else if ((instanceNo = findBlock(DBTC, instancelist, cnt)) >= 0) + { + return &m_threads[T_TC][instanceNo - 1]; // remove proxy + } else if ((instanceNo = findBlock(DBLQH, instancelist, cnt)) >= 0) { return &m_threads[T_LDM][instanceNo - 1]; // remove proxy... @@ -1014,6 +1038,8 @@ TAPTEST(mt_thr_config) "ldm={count=3,cpubind=1-2,5 }, ldm", "ldm={cpuset=1-3,count=3 },ldm", "main,ldm={},ldm", + "main,ldm={},ldm,tc", + "main,ldm={},ldm,tc,tc", 0 }; @@ -1026,6 +1052,7 @@ TAPTEST(mt_thr_config) "main={ keso=88, count=23},ldm,ldm", "main={ cpuset=1-3 }, ldm={cpuset=3-4}", "main={ cpuset=1-3 }, ldm={cpubind=2}", + "tc,tc,tc", 0 }; @@ -1065,45 +1092,71 @@ TAPTEST(mt_thr_config) /** threads, LockExecuteThreadToCPU, answer */ "1-8", "ldm={count=4}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}", "1-5", "ldm={count=4}", + "OK", "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}", "1-3", "ldm={count=4}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}", "1-4", "ldm={count=4}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}", "1-8", "ldm={count=4},io={cpubind=8}", + "OK", "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},io={cpubind=8}", "1-8", "ldm={count=4,cpubind=1,4,5,6}", + "OK", "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}", + "1-9", + "ldm={count=4,cpubind=1,4,5,6},tc,tc", + "OK", + "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7},tc={cpubind=8},tc={cpubind=9}", + + "1-8", + "ldm={count=4,cpubind=1,4,5,6},tc", + "OK", + "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7},tc={cpubind=8}", + + "1-8", + "ldm={count=4,cpubind=1,4,5,6},tc,tc", + "FAIL", + "Too CPU specifed with LockExecuteThreadToCPU. This is not supported when using multiple TC threads", + // END 0 }; - for (unsigned i = 0; t[i]; i+= 3) + for (unsigned i = 0; t[i]; i+= 4) { THRConfig tmp; tmp.setLockExecuteThreadToCPU(t[i+0]); - int res = tmp.do_parse(t[i+1]); - int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0; + const int _res = tmp.do_parse(t[i+1]); + const int expect_res = strcmp(t[i+2], "OK") == 0 ? 0 : -1; + const int res = _res == expect_res ? 0 : -1; + int ok = expect_res == 0 ? + strcmp(tmp.getConfigString(), t[i+3]) == 0: + strcmp(tmp.getErrorMessage(), t[i+3]) == 0; printf("mask: %s conf: %s => %s(%s) - %s - %s\n", t[i+0], t[i+1], - res == 0 ? "OK" : "FAIL", - res == 0 ? "" : tmp.getErrorMessage(), + _res == 0 ? "OK" : "FAIL", + _res == 0 ? "" : tmp.getErrorMessage(), tmp.getConfigString(), ok == 1 ? "CORRECT" : "INCORRECT"); + OK(res == 0); OK(ok == 1); } === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.hpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-09-02 17:24:52 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.hpp 2011-09-23 09:13:22 +0000 @@ -43,8 +43,9 @@ public: T_RECV = 2, /* CMVMI */ T_REP = 3, /* SUMA */ T_IO = 4, /* FS, SocketServer etc */ + T_TC = 5, /* TC+SPJ */ - T_END = 5 + T_END = 6 }; THRConfig(); === modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp' --- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2011-09-19 19:55:58 +0000 +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2011-09-26 07:43:46 +0000 @@ -3451,15 +3451,17 @@ MgmtSrvr::try_alloc_from_list(NodeId& no for (unsigned i = 0; i < nodes.size(); i++) { const unsigned id= nodes[i].id; - if (m_reserved_nodes.get(id)) + if (theFacade->ext_isConnected(id)) { - // Node is already reserved(locally in this node) + // Node is already reserved(connected via transporter) continue; } - if (theFacade->ext_isConnected(id)) + NdbMutex_Lock(m_reserved_nodes_mutex); + if (m_reserved_nodes.get(id)) { - // Node is already reserved(connected via transporter) + // Node is already reserved(locally in this node) + NdbMutex_Unlock(m_reserved_nodes_mutex); continue; } @@ -3483,16 +3485,14 @@ MgmtSrvr::try_alloc_from_list(NodeId& no more than one thread asked for same nodeid) since it's now reserved in data node */ - m_reserved_nodes.clear(id); + release_local_nodeid_reservation(id); } - NdbMutex_Lock(m_reserved_nodes_mutex); return true; } /* Release the local reservation */ - m_reserved_nodes.clear(id); - NdbMutex_Lock(m_reserved_nodes_mutex); + release_local_nodeid_reservation(id); if (res < 0) { @@ -3601,8 +3601,6 @@ MgmtSrvr::alloc_node_id_impl(NodeId& nod return false; } - Guard g(m_reserved_nodes_mutex); - /* Check timeout of nodeid reservations for NDB */ if (type == NDB_MGM_NODE_TYPE_NDB) { @@ -3610,8 +3608,11 @@ MgmtSrvr::alloc_node_id_impl(NodeId& nod for (unsigned i = 0; i < nodes.size(); i++) { const NodeId ndb_nodeid = nodes[i].id; - if (!m_reserved_nodes.has_timedout(ndb_nodeid, now)) - continue; + { + Guard g(m_reserved_nodes_mutex); + if (!m_reserved_nodes.has_timedout(ndb_nodeid, now)) + continue; + } // Found a timedout reservation if (theFacade->ext_isConnected(ndb_nodeid)) @@ -3621,7 +3622,7 @@ MgmtSrvr::alloc_node_id_impl(NodeId& nod "releasing it", ndb_nodeid); // Clear the reservation - m_reserved_nodes.clear(ndb_nodeid); + release_local_nodeid_reservation(ndb_nodeid); } } No bundle (reason: useless for push emails).