From: Date: December 1 2008 7:52pm Subject: bzr push into mysql-5.1 branch (pekka:3139 to 3146) WL#4391 List-Archive: http://lists.mysql.com/commits/60320 Message-Id: <20081201185223.1DCDB2408F@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7BIT 3146 Pekka Nousiainen 2008-12-01 wl#4391 00_fix.diff fix - compile error on no-debug modified: storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 3145 Pekka Nousiainen 2008-12-01 wl#4391 33e_mixed.diff mixed - SR modified: storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 3144 Pekka Nousiainen 2008-12-01 wl#4391 33d_mixed.diff mixed - DBLQH: send unpacked if remote is mt-lqh modified: storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 3143 Pekka Nousiainen 2008-12-01 wl#4391 33c_mixed.diff mixed - DBTC: send unpacked if remote is mt-lqh modified: storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 3142 Pekka Nousiainen 2008-12-01 wl#4391 33b_mixed.diff mixed - use instance key, not own instance modified: storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/vm/SimulatedBlock.hpp 3141 Pekka Nousiainen 2008-12-01 wl#4391 32a_mixed.diff mixed - store number of lqh workers in NodeInfo modified: storage/ndb/include/kernel/NodeInfo.hpp storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp storage/ndb/include/ndb_version.h.in storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/kernel/main.cpp 3140 Pekka Nousiainen 2008-12-01 wl#4391 32b_mutex.diff recursive mutex - implement own modified: storage/ndb/src/kernel/blocks/lgman.cpp storage/ndb/src/kernel/blocks/tsman.cpp storage/ndb/src/kernel/vm/SafeMutex.cpp storage/ndb/src/kernel/vm/SafeMutex.hpp 3139 Jonas Oreland 2008-11-28 ndb - fix testDict -n FailAddPartition - fix testprg and bugs hidden by bug in testprg modified: storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/test/ndbapi/testDict.cpp === modified file 'storage/ndb/include/kernel/NodeInfo.hpp' --- a/storage/ndb/include/kernel/NodeInfo.hpp 2008-11-04 08:43:06 +0000 +++ b/storage/ndb/include/kernel/NodeInfo.hpp 2008-12-01 18:04:19 +0000 @@ -37,6 +37,7 @@ public: Uint32 m_version; ///< Ndb version Uint32 m_mysql_version; ///< MySQL version + Uint32 m_lqh_workers; ///< LQH workers Uint32 m_type; ///< Node type Uint32 m_connectCount; ///< No of times connected bool m_connected; ///< Node is connected @@ -50,6 +51,7 @@ inline NodeInfo::NodeInfo(){ m_version = 0; m_mysql_version = 0; + m_lqh_workers = 0; m_type = INVALID; m_connectCount = 0; m_heartbeat_cnt= 0; === modified file 'storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp' --- a/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp 2007-01-06 00:21:39 +0000 +++ b/storage/ndb/include/kernel/signaldata/CmRegSignalData.hpp 2008-12-01 18:04:19 +0000 @@ -161,7 +161,7 @@ class CmNodeInfoReq { friend class Qmgr; public: - STATIC_CONST( SignalLength = 4 ); + STATIC_CONST( SignalLength = 5 ); private: /** @@ -171,6 +171,7 @@ private: Uint32 dynamicId; Uint32 version; Uint32 mysql_version; + Uint32 lqh_workers; // added in telco-6.4 }; class CmNodeInfoRef { @@ -198,20 +199,14 @@ class CmNodeInfoConf { friend class Qmgr; public: - STATIC_CONST( SignalLength = 4 ); + STATIC_CONST( SignalLength = 5 ); private: Uint32 nodeId; Uint32 dynamicId; Uint32 version; Uint32 mysql_version; + Uint32 lqh_workers; // added in telco-6.4 }; #endif - - - - - - - === modified file 'storage/ndb/include/ndb_version.h.in' --- a/storage/ndb/include/ndb_version.h.in 2008-11-14 09:12:01 +0000 +++ b/storage/ndb/include/ndb_version.h.in 2008-12-01 18:04:19 +0000 @@ -102,6 +102,7 @@ Uint32 ndbGetOwnVersion(); #define NDBD_LONG_LQHKEYREQ MAKE_VERSION(6,4,0) #define NDBD_MAX_RECVBYTESIZE_32K MAKE_VERSION(6,3,18) #define NDBD_LONG_SCANFRAGREQ MAKE_VERSION(6,4,0) +#define NDBD_MT_LQH_VERSION MAKE_VERSION(6,4,0) static === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-11-19 11:01:17 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2008-12-01 18:05:11 +0000 @@ -859,6 +859,11 @@ public: * Log part */ Uint32 m_log_part_ptr_i; + + /** + * Instance key for fast access. + */ + Uint16 lqhInstanceKey; }; typedef Ptr FragrecordPtr; === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp 2008-11-16 12:59:12 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp 2008-12-01 18:05:11 +0000 @@ -72,3 +72,10 @@ NdbLogPartInfo::partNoIndex(Uint32 lpno) assert(partNo[i] == lpno); return i; } + +Uint32 +NdbLogPartInfo::instanceKey(Uint32 lpno) const +{ + assert(lpno < LogParts); + return 1 + lpno; +} === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp 2008-11-16 12:59:12 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp 2008-12-01 18:05:11 +0000 @@ -47,6 +47,7 @@ struct NdbLogPartInfo { bool partNoOwner(Uint32 lpno) const; bool partNoOwner(Uint32 tabId, Uint32 fragId); Uint32 partNoIndex(Uint32 lpno) const; + Uint32 instanceKey(Uint32 lpno) const; }; #endif === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-11-19 11:01:17 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-12-01 18:50:58 +0000 @@ -720,7 +720,10 @@ void Dblqh::startphase1Lab(Signal* signa for (Ti = 0; Ti < chostFileSize; Ti++) { ThostPtr.i = Ti; ptrCheckGuard(ThostPtr, chostFileSize, hostRecord); - // wl4391_todo using own instance() does not work with mixed versions + /* + * Valid only if receiver has same number of LQH workers. + * In general full instance key of fragment must be used. + */ ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i); ThostPtr.p->hostTcBlockRef = calcTcBlockRef(ThostPtr.i); ThostPtr.p->inPackedList = false; @@ -1594,6 +1597,7 @@ void Dblqh::execLQHFRAGREQ(Signal* signa ndbrequire(ptr.p->logPartNo == logPartNo); fragptr.p->m_log_part_ptr_i = ptr.i; + fragptr.p->lqhInstanceKey = lpinfo.instanceKey(logPartNo); } if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) { @@ -3012,99 +3016,163 @@ void Dblqh::sendCommitLqh(Signal* signal HostRecordPtr Thostptr; Thostptr.i = refToNode(alqhBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + + Uint32 Tdata[5]; + Tdata[0] = tcConnectptr.p->clientConnectrec; + Tdata[1] = tcConnectptr.p->gci_hi; + Tdata[2] = tcConnectptr.p->transid[0]; + Tdata[3] = tcConnectptr.p->transid[1]; + Tdata[4] = tcConnectptr.p->gci_lo; + Uint32 len = 5; + + if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version))) + { + jam(); + ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false); + len = 4; + } + + // currently packed signal cannot address specific instance + const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0; + if (send_unpacked) { + jam(); + FragrecordPtr Tfragptr; + Tfragptr.i = tcConnectptr.p->fragmentptr; + c_fragment_pool.getPtr(Tfragptr); + memcpy(&signal->theData[0], &Tdata[0], len << 2); + Uint32 Tnode = Thostptr.i; + Uint32 instanceKey = Tfragptr.p->lqhInstanceKey; + BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); + sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB); + return; + } + if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) { jam(); sendPackedSignalLqh(signal, Thostptr.p); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if - Uint32 pos = Thostptr.p->noOfPackedWordsLqh; - Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMIT << 28); - Uint32 gci_hi = tcConnectptr.p->gci_hi; - Uint32 gci_lo = tcConnectptr.p->gci_lo; - Uint32 transid1 = tcConnectptr.p->transid[0]; - Uint32 transid2 = tcConnectptr.p->transid[1]; - Thostptr.p->packedWordsLqh[pos] = ptrAndType; - Thostptr.p->packedWordsLqh[pos + 1] = gci_hi; - Thostptr.p->packedWordsLqh[pos + 2] = transid1; - Thostptr.p->packedWordsLqh[pos + 3] = transid2; - Thostptr.p->packedWordsLqh[pos + 4] = gci_lo; - Thostptr.p->noOfPackedWordsLqh = pos + 5; - - if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version))) - { - jam(); - ndbassert(gci_lo == 0 || getNodeInfo(Thostptr.i).m_connected == false); - Thostptr.p->noOfPackedWordsLqh = pos + 4; } -}//Dblqh::sendCommitLqh() + + Tdata[0] |= (ZCOMMIT << 28); + Uint32 pos = Thostptr.p->noOfPackedWordsLqh; + memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2); + Thostptr.p->noOfPackedWordsLqh = pos + len; +} void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref) { HostRecordPtr Thostptr; Thostptr.i = refToNode(alqhBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + + Uint32 Tdata[3]; + Tdata[0] = tcConnectptr.p->clientConnectrec; + Tdata[1] = tcConnectptr.p->transid[0]; + Tdata[2] = tcConnectptr.p->transid[1]; + Uint32 len = 3; + + // currently packed signal cannot address specific instance + const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0; + if (send_unpacked) { + jam(); + FragrecordPtr Tfragptr; + Tfragptr.i = tcConnectptr.p->fragmentptr; + c_fragment_pool.getPtr(Tfragptr); + memcpy(&signal->theData[0], &Tdata[0], len << 2); + Uint32 Tnode = Thostptr.i; + Uint32 instanceKey = Tfragptr.p->lqhInstanceKey; + BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); + sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB); + return; + } + if (Thostptr.p->noOfPackedWordsLqh > 22) { jam(); sendPackedSignalLqh(signal, Thostptr.p); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if + } + + Tdata[0] |= (ZCOMPLETE << 28); Uint32 pos = Thostptr.p->noOfPackedWordsLqh; - Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETE << 28); - Uint32 transid1 = tcConnectptr.p->transid[0]; - Uint32 transid2 = tcConnectptr.p->transid[1]; - Thostptr.p->packedWordsLqh[pos] = ptrAndType; - Thostptr.p->packedWordsLqh[pos + 1] = transid1; - Thostptr.p->packedWordsLqh[pos + 2] = transid2; - Thostptr.p->noOfPackedWordsLqh = pos + 3; -}//Dblqh::sendCompleteLqh() + memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2); + Thostptr.p->noOfPackedWordsLqh = pos + len; +} void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref) { HostRecordPtr Thostptr; Thostptr.i = refToNode(atcBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + + Uint32 Tdata[3]; + Tdata[0] = tcConnectptr.p->clientConnectrec; + Tdata[1] = tcConnectptr.p->transid[0]; + Tdata[2] = tcConnectptr.p->transid[1]; + Uint32 len = 3; + + // currently TC is single-threaded + const bool send_unpacked = false; + if (send_unpacked) { + jam(); + memcpy(&signal->theData[0], &Tdata[0], len << 2); + BlockReference tcRef = Thostptr.p->hostTcBlockRef; + sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB); + return; + } + if (Thostptr.p->noOfPackedWordsTc > 22) { jam(); sendPackedSignalTc(signal, Thostptr.p); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if + } + + Tdata[0] |= (ZCOMMITTED << 28); Uint32 pos = Thostptr.p->noOfPackedWordsTc; - Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMMITTED << 28); - Uint32 transid1 = tcConnectptr.p->transid[0]; - Uint32 transid2 = tcConnectptr.p->transid[1]; - Thostptr.p->packedWordsTc[pos] = ptrAndType; - Thostptr.p->packedWordsTc[pos + 1] = transid1; - Thostptr.p->packedWordsTc[pos + 2] = transid2; - Thostptr.p->noOfPackedWordsTc = pos + 3; -}//Dblqh::sendCommittedTc() + memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2); + Thostptr.p->noOfPackedWordsTc = pos + len; +} void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref) { HostRecordPtr Thostptr; Thostptr.i = refToNode(atcBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + + Uint32 Tdata[3]; + Tdata[0] = tcConnectptr.p->clientConnectrec; + Tdata[1] = tcConnectptr.p->transid[0]; + Tdata[2] = tcConnectptr.p->transid[1]; + Uint32 len = 3; + + // currently TC is single-threaded + const bool send_unpacked = false; + if (send_unpacked) { + jam(); + memcpy(&signal->theData[0], &Tdata[0], len << 2); + BlockReference tcRef = Thostptr.p->hostTcBlockRef; + sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB); + return; + } + if (Thostptr.p->noOfPackedWordsTc > 22) { jam(); sendPackedSignalTc(signal, Thostptr.p); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if + } + + Tdata[0] |= (ZCOMPLETED << 28); Uint32 pos = Thostptr.p->noOfPackedWordsTc; - Uint32 ptrAndType = tcConnectptr.p->clientConnectrec | (ZCOMPLETED << 28); - Uint32 transid1 = tcConnectptr.p->transid[0]; - Uint32 transid2 = tcConnectptr.p->transid[1]; - Thostptr.p->packedWordsTc[pos] = ptrAndType; - Thostptr.p->packedWordsTc[pos + 1] = transid1; - Thostptr.p->packedWordsTc[pos + 2] = transid2; - Thostptr.p->noOfPackedWordsTc = pos + 3; -}//Dblqh::sendCompletedTc() + memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2); + Thostptr.p->noOfPackedWordsTc = pos + len; +} void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref) { @@ -3130,7 +3198,9 @@ void Dblqh::sendLqhkeyconfTc(Signal* sig lqhKeyConf = (LqhKeyConf *) &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc]; Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength; - } else if(refToMain(atcBlockref) == DBLQH){ + } else if(refToMain(atcBlockref) == DBLQH && + refToInstance(atcBlockref) == instance()) { + //wl4391_todo check jam(); /******************************************************************* // This signal was intended for DBLQH as part of log execution or @@ -5781,8 +5851,10 @@ void Dblqh::packLqhkeyreqLab(Signal* sig lqhKeyReq->variableData[nextPos + 0] = sig0; nextPos += LqhKeyReq::getGCIFlag(Treqinfo); - // wl4391_todo for mixed versions must recompute full instance key here - BlockReference lqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica); + // pass full instance key for remote to map to real instance + BlockReference lqhRef = numberToRef(DBLQH, + fragptr.p->lqhInstanceKey, + regTcPtr->nextReplica); if (likely(sendLongReq)) { @@ -6112,7 +6184,17 @@ void Dblqh::writeAttrinfoLab(Signal* sig /* ------------------------------------------------------------------------- */ void Dblqh::sendTupkey(Signal* signal) { - BlockReference lqhRef = calcInstanceBlockRef(DBLQH, tcConnectptr.p->nextReplica); + BlockReference lqhRef = 0; + { + // wl4391_todo fragptr + FragrecordPtr Tfragptr; + Tfragptr.i = tcConnectptr.p->fragmentptr; + c_fragment_pool.getPtr(Tfragptr); + Uint32 Tnode = tcConnectptr.p->nextReplica; + Uint32 instanceKey = Tfragptr.p->lqhInstanceKey; + lqhRef = numberToRef(DBLQH, instanceKey, Tnode); + } + signal->theData[0] = tcConnectptr.p->tcOprec; signal->theData[1] = tcConnectptr.p->transid[0]; signal->theData[2] = tcConnectptr.p->transid[1]; @@ -7286,7 +7368,12 @@ void Dblqh::execABORT(Signal* signal) /* ------------------------------------------------------------------------- */ // We will immediately send the ABORT message also to the next LQH node in line. /* ------------------------------------------------------------------------- */ - BlockReference TLqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica); + FragrecordPtr Tfragptr; + Tfragptr.i = regTcPtr->fragmentptr; + c_fragment_pool.getPtr(Tfragptr); + Uint32 Tnode = regTcPtr->nextReplica; + Uint32 instanceKey = Tfragptr.p->lqhInstanceKey; + BlockReference TLqhRef = numberToRef(DBLQH, instanceKey, Tnode); signal->theData[0] = regTcPtr->tcOprec; signal->theData[1] = regTcPtr->tcBlockref; signal->theData[2] = regTcPtr->transid[0]; @@ -15215,10 +15302,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si * WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED * ANY FRAGMENTS PARTICIPATE IN THIS PHASE. * --------------------------------------------------------------------- */ - BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance()); - NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes); signal->theData[0] = cownNodeid; - sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB); + if (!isNdbMtLqh()) { + NodeReceiverGroup rg(DBLQH, m_sr_nodes); + sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB); + } else { + const Uint32 sz = NdbNodeBitmask::Size; + m_sr_nodes.copyto(sz, &signal->theData[1]); + sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB); + } return; } else { jam(); @@ -15232,7 +15324,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si Uint32 index = csrPhasesCompleted; arrGuard(index, MAX_LOG_EXEC); - BlockReference ref = calcInstanceBlockRef(DBLQH, fragptr.p->srLqhLognode[index]); + Uint32 Tnode = fragptr.p->srLqhLognode[index]; + Uint32 instanceKey = fragptr.p->lqhInstanceKey; + BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode); fragptr.p->srStatus = Fragrecord::SS_STARTED; /* -------------------------------------------------------------------- @@ -15249,7 +15343,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si execFragReq->lastGci = fragptr.p->srLastGci[index]; sendSignal(ref, GSN_EXEC_FRAGREQ, signal, ExecFragReq::SignalLength, JBB); - } signal->theData[0] = next; sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB); @@ -15339,6 +15432,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig /* *************** */ void Dblqh::execEXEC_SRCONF(Signal* signal) { + // wl4391_todo workaround until timing fixed + if (cnoOutstandingExecFragReq != 0) { + ndbout << "delay: reqs=" << cnoOutstandingExecFragReq << endl; + sendSignalWithDelay(reference(), GSN_EXEC_SRCONF, + signal, 10, signal->getLength()); + return; + } jamEntry(); Uint32 nodeId = signal->theData[0]; arrGuard(nodeId, MAX_NDB_NODES); @@ -16704,9 +16804,14 @@ void Dblqh::srPhase3Comp(Signal* signal) jamEntry(); signal->theData[0] = cownNodeid; - BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance()); - NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes); - sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB); + if (!isNdbMtLqh()) { + NodeReceiverGroup rg(DBLQH, m_sr_nodes); + sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB); + } else { + const Uint32 sz = NdbNodeBitmask::Size; + m_sr_nodes.copyto(sz, &signal->theData[1]); + sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB); + } return; }//Dblqh::srPhase3Comp() @@ -19543,7 +19648,7 @@ Dblqh::validate_filter(Signal* signal) default: infoEvent("Invalid filter op: 0x%x pos: %ld", * start, - start - (signal->theData + 1)); + (long int)(start - (signal->theData + 1))); return false; } } === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-11-16 15:28:22 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-12-01 18:06:58 +0000 @@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct // GSN_SUB_GCP_COMPLETE_REP addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP); + + // GSN_EXEC_SRREQ + addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ); + addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF); } DblqhProxy::~DblqhProxy() @@ -1329,4 +1333,112 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s ssRelease(ssId); } +// GSN_EXEC_SR_1 [fictional gsn ] + +void +DblqhProxy::execEXEC_SRREQ(Signal* signal) +{ + const BlockReference senderRef = signal->getSendersBlockRef(); + + if (refToInstance(senderRef) != 0) { + jam(); + execEXEC_SR_2(signal, GSN_EXEC_SRREQ); + return; + } + + execEXEC_SR_1(signal, GSN_EXEC_SRREQ); +} + +void +DblqhProxy::execEXEC_SRCONF(Signal* signal) +{ + const BlockReference senderRef = signal->getSendersBlockRef(); + + if (refToInstance(senderRef) != 0) { + jam(); + execEXEC_SR_2(signal, GSN_EXEC_SRCONF); + return; + } + + execEXEC_SR_1(signal, GSN_EXEC_SRCONF); +} + +void +DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn) +{ + ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength); + + const Ss_EXEC_SR_1::Sig* sig = + (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr(); + Uint32 ssId = getSsId(sig); + Ss_EXEC_SR_1& ss = ssSeize(ssId); + ss.m_gsn = gsn; + ss.m_sig = *sig; + + sendREQ(signal, ss); + ssRelease(ss); +} + +void +DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId) +{ + Ss_EXEC_SR_1& ss = ssFind(ssId); + signal->theData[0] = ss.m_sig.nodeId; + sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB); +} + +// GSN_EXEC_SRREQ_2 [ fictional gsn ] + +void +DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn) +{ + ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength); + + const Ss_EXEC_SR_2::Sig* sig = + (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr(); + Uint32 ssId = getSsId(sig); + + bool found = false; + Ss_EXEC_SR_2& ss = ssFindSeize(ssId, &found); + if (!found) { + jam(); + setMask(ss); + } + + ndbrequire(sig->nodeId == getOwnNodeId()); + if (ss.m_sigcount == 0) { + jam(); + ss.m_gsn = gsn; + ss.m_sig = *sig; + } else { + jam(); + ndbrequire(ss.m_gsn == gsn); + ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0); + } + ss.m_sigcount++; + + // reversed roles + recvCONF(signal, ss); +} + +void +DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId) +{ + Ss_EXEC_SR_2& ss = ssFind(ssId); + + if (!lastReply(ss)) { + jam(); + return; + } + + NodeBitmask nodes; + nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes); + NodeReceiverGroup rg(DBLQH, nodes); + + signal->theData[0] = ss.m_sig.nodeId; + sendSignal(rg, ss.m_gsn, signal, 1, JBB); + + ssRelease(ssId); +} + BLOCK_FUNCTIONS(DblqhProxy) === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-11-16 15:28:22 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-12-01 18:06:58 +0000 @@ -380,9 +380,7 @@ protected: // GSN_START_RECREQ_2 [ sub-op, fictional gsn ] struct Ss_START_RECREQ_2 : SsParallel { -#ifdef VM_TRACE static const char* name() { return "START_RECREQ_2"; } -#endif struct Req { enum { SignalLength = 2 }; Uint32 lcpId; @@ -458,6 +456,71 @@ protected: void execEMPTY_LCP_CONF(Signal*); void execEMPTY_LCP_REF(Signal*); void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId); + + // GSN_EXEC_SR_1 [ fictional gsn ] + struct Ss_EXEC_SR_1 : SsParallel { + /* + * Handle EXEC_SRREQ and EXEC_SRCONF. These are broadcast + * signals (not REQ/CONF). EXEC_SR_1 receives one signal and + * sends it to its workers. EXEC_SR_2 waits for signal from + * all workers and broadcasts it to all nodes. These are + * required to handle mixed versions (non-mt, mt-lqh-1,2,4). + */ + static const char* name() { return "EXEC_SR_1"; } + struct Sig { + enum { SignalLength = 1 }; + Uint32 nodeId; + }; + GlobalSignalNumber m_gsn; + Sig m_sig; + Ss_EXEC_SR_1() { + m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1; + m_sendCONF = (SsFUNC)0; + m_gsn = 0; + }; + enum { poolSize = 1 }; + static SsPool& pool(LocalProxy* proxy) { + return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1; + } + }; + SsPool c_ss_EXEC_SR_1; + Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) { + return SsIdBase | refToNode(sig->nodeId); + }; + void execEXEC_SRREQ(Signal*); + void execEXEC_SRCONF(Signal*); + void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn); + void sendEXEC_SR_1(Signal*, Uint32 ssId); + + // GSN_EXEC_SR_2 [ fictional gsn ] + struct Ss_EXEC_SR_2 : SsParallel { + static const char* name() { return "EXEC_SR_2"; } + struct Sig { + enum { SignalLength = 1 + NdbNodeBitmask::Size }; + Uint32 nodeId; + Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add + }; + GlobalSignalNumber m_gsn; + Uint32 m_sigcount; + Sig m_sig; // all signals must be identical + Ss_EXEC_SR_2() { + // reversed roles + m_sendREQ = (SsFUNC)0; + m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2; + m_gsn = 0; + m_sigcount = 0; + }; + enum { poolSize = 1 }; + static SsPool& pool(LocalProxy* proxy) { + return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2; + } + }; + SsPool c_ss_EXEC_SR_2; + Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) { + return SsIdBase | refToNode(sig->nodeId); + }; + void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn); + void sendEXEC_SR_2(Signal*, Uint32 ssId); }; #endif === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-11-13 15:05:07 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-12-01 18:05:48 +0000 @@ -4829,25 +4829,30 @@ void Dbtc::sendCommitLqh(Signal* signal, Thostptr.i = regTcPtr->lastLqhNodeId; ptrCheckGuard(Thostptr, ThostFilesize, hostRecord); - UintR Tdata1 = regTcPtr->lastLqhCon; - UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32); - UintR Tdata3 = regApiPtr->transid[0]; - UintR Tdata4 = regApiPtr->transid[1]; - UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid); + Uint32 Tdata[5]; + Tdata[0] = regTcPtr->lastLqhCon; + Tdata[1] = Uint32(regApiPtr->globalcheckpointid >> 32); + Tdata[2] = regApiPtr->transid[0]; + Tdata[3] = regApiPtr->transid[1]; + Tdata[4] = Uint32(regApiPtr->globalcheckpointid); + Uint32 len = 5; - // wl4391_todo testing own config is wrong for mixed versions - bool send_unpacked = isNdbMtLqh(); + if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version))) + { + jam(); + ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_connected == false); + len = 4; + } + + // currently packed signal cannot address specific instance + const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0; if (send_unpacked) { Uint32* data = signal->getDataPtrSend(); - data[0] = Tdata1; - data[1] = Tdata2; - data[2] = Tdata3; - data[3] = Tdata4; - data[4] = Tdata5; + memcpy(&signal->theData[0], &Tdata[0], len << 2); Uint32 Tnode = Thostptr.i; Uint32 instanceKey = regTcPtr->lqhInstanceKey; BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); - sendSignal(lqhRef, GSN_COMMIT, signal, 5, JBB); + sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB); return; } @@ -4857,23 +4862,14 @@ void Dbtc::sendCommitLqh(Signal* signal, } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if + } + + Tdata[0] |= (ZCOMMIT << 28); UintR Tindex = Thostptr.p->noOfPackedWordsLqh; UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex]; - TDataPtr[0] = Tdata1 | (ZCOMMIT << 28); - TDataPtr[1] = Tdata2; - TDataPtr[2] = Tdata3; - TDataPtr[3] = Tdata4; - TDataPtr[4] = Tdata5; - Thostptr.p->noOfPackedWordsLqh = Tindex + 5; - - if (unlikely(!ndb_check_micro_gcp(getNodeInfo(Thostptr.i).m_version))) - { - jam(); - ndbassert(Tdata5 == 0 || getNodeInfo(Thostptr.i).m_connected == false); - Thostptr.p->noOfPackedWordsLqh = Tindex + 4; // no gci_lo - } -}//Dbtc::sendCommitLqh() + memcpy(TDataPtr, &Tdata[0], len << 2); + Thostptr.p->noOfPackedWordsLqh = Tindex + len; +} void Dbtc::DIVER_node_fail_handling(Signal* signal, Uint64 Tgci) @@ -5222,16 +5218,16 @@ void Dbtc::sendCompleteLqh(Signal* signa Thostptr.i = regTcPtr->lastLqhNodeId; //last??? ptrCheckGuard(Thostptr, ThostFilesize, hostRecord); - UintR Tdata1 = regTcPtr->lastLqhCon; - UintR Tdata2 = regApiPtr->transid[0]; - UintR Tdata3 = regApiPtr->transid[1]; + Uint32 Tdata[3]; + Tdata[0] = regTcPtr->lastLqhCon; + Tdata[1] = regApiPtr->transid[0]; + Tdata[2] = regApiPtr->transid[1]; + Uint32 len = 3; - bool send_unpacked = isNdbMtLqh(); + // currently packed signal cannot address specific instance + const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0; if (send_unpacked) { - Uint32* data = signal->getDataPtrSend(); - data[0] = Tdata1; - data[1] = Tdata2; - data[2] = Tdata3; + memcpy(&signal->theData[0], &Tdata[0], len << 2); Uint32 Tnode = Thostptr.i; Uint32 instanceKey = regTcPtr->lqhInstanceKey; BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); @@ -5245,14 +5241,14 @@ void Dbtc::sendCompleteLqh(Signal* signa } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if + } + + Tdata[0] |= (ZCOMPLETE << 28); UintR Tindex = Thostptr.p->noOfPackedWordsLqh; UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex]; - TDataPtr[0] = Tdata1 | (ZCOMPLETE << 28); - TDataPtr[1] = Tdata2; - TDataPtr[2] = Tdata3; - Thostptr.p->noOfPackedWordsLqh = Tindex + 3; -}//Dbtc::sendCompleteLqh() + memcpy(TDataPtr, &Tdata[0], len << 2); + Thostptr.p->noOfPackedWordsLqh = Tindex + len; +} void Dbtc::execTC_COMMIT_ACK(Signal* signal){ @@ -5301,22 +5297,25 @@ Dbtc::sendRemoveMarker(Signal* signal, hostPtr.i = nodeId; ptrCheckGuard(hostPtr, ThostFilesize, hostRecord); - UintR Tdata1 = 0; - UintR Tdata2 = transid1; - UintR Tdata3 = transid2; + Uint32 Tdata[3]; + Tdata[0] = 0; + Tdata[1] = transid1; + Tdata[2] = transid2; + Uint32 len = 3; - bool send_unpacked = isNdbMtLqh(); + // currently packed signals can not address specific instance + bool send_unpacked = getNodeInfo(hostPtr.i).m_lqh_workers != 0; if (send_unpacked) { - Uint32* data = signal->getDataPtrSend(); - data[0] = Tdata2; - data[1] = Tdata3; + jam(); + // first word omitted + memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2); Uint32 Tnode = hostPtr.i; Uint32 i; for (i = 0; i < MAX_NDBMT_LQH_WORKERS; i++) { // wl4391_todo skip workers not part of tx Uint32 instanceKey = 1 + i; BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode); - sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, 2, JBB); + sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB); } return; } @@ -5327,14 +5326,13 @@ Dbtc::sendRemoveMarker(Signal* signal, } else { jam(); updatePackedList(signal, hostPtr.p, hostPtr.i); - }//if + } UintR numWord = hostPtr.p->noOfPackedWordsLqh; UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord]; - dataPtr[0] = Tdata1 | (ZREMOVE_MARKER << 28); - dataPtr[1] = Tdata2; - dataPtr[2] = Tdata3; + Tdata[0] |= (ZREMOVE_MARKER << 28); + memcpy(dataPtr, &Tdata[0], len << 2); hostPtr.p->noOfPackedWordsLqh = numWord + 3; } @@ -12274,7 +12272,7 @@ Dbtc::validate_filter(Signal* signal) default: infoEvent("Invalid filter op: 0x%x pos: %ld", * start, - start - (signal->theData + 1)); + (long int)(start - (signal->theData + 1))); return false; } } === modified file 'storage/ndb/src/kernel/blocks/lgman.cpp' --- a/storage/ndb/src/kernel/blocks/lgman.cpp 2008-11-20 13:32:13 +0000 +++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2008-12-01 18:03:48 +0000 @@ -56,11 +56,7 @@ Lgman::Lgman(Block_context & ctx) : m_tup(0), m_logfile_group_list(m_logfile_group_pool), m_logfile_group_hash(m_logfile_group_pool), -#ifdef __sun // temp - m_client_mutex(1, false) -#else - m_client_mutex(2, true) -#endif + m_client_mutex("lgman-client", 2, true) { BLOCK_CONSTRUCTOR(Lgman); === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp' --- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-11-13 13:36:29 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2008-12-01 18:04:19 +0000 @@ -991,6 +991,9 @@ void Qmgr::execCM_REGCONF(Signal* signal c_clusterNodes.assign(NdbNodeBitmask::Size, cmRegConf->allNdbNodes); myNodePtr.p->ndynamicId = TdynamicId; + + // set own MT config here or in REF, and others in CM_NODEINFOREQ/CONF + setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers; /*--------------------------------------------------------------*/ // Send this as an EVENT REPORT to inform about hearing about @@ -1109,6 +1112,7 @@ Qmgr::sendCmNodeInfoReq(Signal* signal, req->dynamicId = self->ndynamicId; req->version = getNodeInfo(getOwnNodeId()).m_version; req->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version; + req->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers; const Uint32 ref = calcQmgrBlockRef(nodeId); sendSignal(ref,GSN_CM_NODEINFOREQ, signal, CmNodeInfoReq::SignalLength, JBB); DEBUG_START(GSN_CM_NODEINFOREQ, nodeId, ""); @@ -1214,6 +1218,9 @@ void Qmgr::execCM_REGREF(Signal* signal) skip_nodes.bitAND(c_definedNodes); c_start.m_skip_nodes.bitOR(skip_nodes); + + // set own MT config here or in CONF, and others in CM_NODEINFOREQ/CONF + setNodeInfo(getOwnNodeId()).m_lqh_workers = globalData.ndbMtLqhWorkers; char buf[100]; switch (TrefuseReason) { @@ -1661,11 +1668,17 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s const Uint32 dynamicId = conf->dynamicId; const Uint32 version = conf->version; Uint32 mysql_version = conf->mysql_version; + Uint32 lqh_workers = conf->lqh_workers; if (version < NDBD_SPLIT_VERSION) { jam(); mysql_version = 0; } + if (version < NDBD_MT_LQH_VERSION) + { + jam(); + lqh_workers = 0; + } NodeRecPtr nodePtr; nodePtr.i = getOwnNodeId(); @@ -1684,6 +1697,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* s replyNodePtr.p->blockRef = signal->getSendersBlockRef(); setNodeInfo(replyNodePtr.i).m_version = version; setNodeInfo(replyNodePtr.i).m_mysql_version = mysql_version; + setNodeInfo(replyNodePtr.i).m_lqh_workers = lqh_workers; recompute_version_info(NodeInfo::DB, version); @@ -1741,8 +1755,13 @@ void Qmgr::execCM_NODEINFOREQ(Signal* si Uint32 mysql_version = req->mysql_version; if (req->version < NDBD_SPLIT_VERSION) mysql_version = 0; - setNodeInfo(addNodePtr.i).m_mysql_version = mysql_version; + + Uint32 lqh_workers = req->lqh_workers; + if (req->version < NDBD_MT_LQH_VERSION) + lqh_workers = 0; + setNodeInfo(addNodePtr.i).m_lqh_workers = lqh_workers; + c_maxDynamicId = req->dynamicId; cmAddPrepare(signal, addNodePtr, nodePtr.p); @@ -1799,6 +1818,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR conf->dynamicId = self->ndynamicId; conf->version = getNodeInfo(getOwnNodeId()).m_version; conf->mysql_version = getNodeInfo(getOwnNodeId()).m_mysql_version; + conf->lqh_workers = getNodeInfo(getOwnNodeId()).m_lqh_workers; sendSignal(nodePtr.p->blockRef, GSN_CM_NODEINFOCONF, signal, CmNodeInfoConf::SignalLength, JBB); DEBUG_START(GSN_CM_NODEINFOCONF, refToNode(nodePtr.p->blockRef), ""); === modified file 'storage/ndb/src/kernel/blocks/tsman.cpp' --- a/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-20 13:32:13 +0000 +++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2008-12-01 18:03:48 +0000 @@ -45,11 +45,7 @@ Tsman::Tsman(Block_context& ctx) : m_pgman(0), m_lgman(0), m_tup(0), -#ifdef __sun // temp - m_client_mutex(1, false) -#else - m_client_mutex(2, true) -#endif + m_client_mutex("tsman-client", 2, true) { BLOCK_CONSTRUCTOR(Tsman); === modified file 'storage/ndb/src/kernel/main.cpp' --- a/storage/ndb/src/kernel/main.cpp 2008-11-09 18:37:29 +0000 +++ b/storage/ndb/src/kernel/main.cpp 2008-12-01 18:04:19 +0000 @@ -293,8 +293,10 @@ get_multithreaded_config(EmulatorData& e { // multithreaded is compiled in ndbd/ndbmtd for now globalData.isNdbMt = SimulatedBlock::isMultiThreaded(); - if (!globalData.isNdbMt) + if (!globalData.isNdbMt) { + ndbout << "NDBMT: non-mt" << endl; return 0; + } const ndb_mgm_configuration_iterator * p = ed.theConfiguration->getOwnConfigIterator(); === modified file 'storage/ndb/src/kernel/vm/SafeMutex.cpp' --- a/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-11-20 13:32:13 +0000 +++ b/storage/ndb/src/kernel/vm/SafeMutex.cpp 2008-12-01 18:03:48 +0000 @@ -15,94 +15,151 @@ #include "SafeMutex.hpp" -NdbOut& -operator<<(NdbOut& out, const SafeMutex& dm) -{ - out << "level=" << dm.m_level << "," << "usage=" << dm.m_usage; - return out; -} - int SafeMutex::create() { - if (m_init) - return ErrState; - int ret = -1; -#ifdef HAVE_PTHREAD_MUTEX_RECURSIVE -#ifndef __WIN__ - if (m_limit > 1 || m_debug) { - pthread_mutexattr_t attr; - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - ret = pthread_mutex_init(&m_mutex, &attr); - } else { - // error-check mutex does not work right on my linux, skip it - ret = pthread_mutex_init(&m_mutex, 0); - } -#else - ret = pthread_mutex_init(&m_mutex, 0); -#endif -#else - if (m_limit > 1 || m_debug) - return ErrUnsupp; + int ret; + if (m_initdone) + return err(ErrState, __LINE__); ret = pthread_mutex_init(&m_mutex, 0); -#endif if (ret != 0) - return ret; - m_init = true; + return err(ret, __LINE__); + ret = pthread_cond_init(&m_cond, 0); + if (ret != 0) + return err(ret, __LINE__); + m_initdone = true; return 0; } int SafeMutex::destroy() { - if (!m_init) - return ErrState; - int ret = pthread_mutex_destroy(&m_mutex); + int ret; + if (!m_initdone) + return err(ErrState, __LINE__); + ret = pthread_cond_destroy(&m_cond); if (ret != 0) - return ret; - m_init = false; + return err(ret, __LINE__); + ret = pthread_mutex_destroy(&m_mutex); + if (ret != 0) + return err(ret, __LINE__); + m_initdone = false; return 0; } int SafeMutex::lock() { - pthread_t self = pthread_self(); - int ret = pthread_mutex_lock(&m_mutex); - /* have mutex */ + int ret; + if (m_simple) { + ret = pthread_mutex_lock(&m_mutex); + if (ret != 0) + return err(ret, __LINE__); + return 0; + } + ret = pthread_mutex_lock(&m_mutex); if (ret != 0) - return ret; - if (!(m_level < m_limit)) - return ErrLevel; - m_level++; - if (m_level > m_usage) - m_usage = m_level; - if (m_level == 1 && m_owner != 0) - return ErrOwner1; - if (m_level >= 2 && m_owner != self) - return ErrOwner2; - m_owner = self; + return err(ret, __LINE__); + return lock_impl(); +} + +int +SafeMutex::lock_impl() +{ + int ret; + pthread_t self = pthread_self(); + assert(self != 0); + while (1) { + if (m_level == 0) { + assert(m_owner == 0); + m_owner = self; + } else if (m_owner != self) { + ret = pthread_cond_wait(&m_cond, &m_mutex); + if (ret != 0) + return err(ret, __LINE__); + continue; + } + if (!(m_level < m_limit)) + return err(ErrLevel, __LINE__); + m_level++; + if (m_usage < m_level) + m_usage = m_level; + ret = pthread_cond_signal(&m_cond); + if (ret != 0) + return err(ret, __LINE__); + ret = pthread_mutex_unlock(&m_mutex); + if (ret != 0) + return err(ret, __LINE__); + break; + } return 0; } int SafeMutex::unlock() { + int ret; + if (m_simple) { + ret = pthread_mutex_unlock(&m_mutex); + if (ret != 0) + return err(ret, __LINE__); + return 0; + } + ret = pthread_mutex_lock(&m_mutex); + if (ret != 0) + return err(ret, __LINE__); + return unlock_impl(); +} + +int +SafeMutex::unlock_impl() +{ + int ret; pthread_t self = pthread_self(); - if (!(m_level > 0)) - return ErrState; + assert(self != 0); if (m_owner != self) - return ErrOwner3; - if (m_level == 1) - m_owner = 0; + return err(ErrOwner, __LINE__); + if (m_level == 0) + return err(ErrNolock, __LINE__); m_level--; - int ret = pthread_mutex_unlock(&m_mutex); - /* lose mutex */ + if (m_level == 0) { + m_owner = 0; + ret = pthread_cond_signal(&m_cond); + if (ret != 0) + return err(ret, __LINE__); + } + ret = pthread_mutex_unlock(&m_mutex); if (ret != 0) - return ret; + return err(ret, __LINE__); return 0; } +int +SafeMutex::err(int errcode, int errline) +{ + assert(errcode != 0); + m_errcode = errcode; + m_errline = errline; + ndbout << *this << endl; +#ifdef UNIT_TEST + abort(); +#endif + return errcode; +} + +NdbOut& +operator<<(NdbOut& out, const SafeMutex& sm) +{ + out << sm.m_name << ":"; + out << " level=" << sm.m_level; + out << " usage=" << sm.m_usage; + if (sm.m_errcode != 0) { + out << " errcode=" << sm.m_errcode; + out << " errline=" << sm.m_errline; + } + return out; +} + #ifdef UNIT_TEST struct sm_thr { @@ -144,10 +201,12 @@ sm_run(void* arg) } if (op == +1) { assert(level < thr.limit); + //ndbout << thr.index << ": lock" << endl; int ret = sm.lock(); assert(ret == 0); level++; } else if (op == -1) { + //ndbout << thr.index << ": unlock" << endl; int ret = sm.unlock(); assert(ret == 0); assert(level != 0); @@ -161,6 +220,7 @@ sm_run(void* arg) assert(ret == 0); level--; } + return 0; } int @@ -169,18 +229,19 @@ main(int argc, char** argv) const uint max_thr = 128; struct sm_thr thr[max_thr]; - // threads - loops - max level + // threads - loops - max level - debug uint num_thr = argc > 1 ? atoi(argv[1]) : 4; assert(num_thr != 0 && num_thr <= max_thr); uint loops = argc > 2 ? atoi(argv[2]) : 1000000; uint limit = argc > 3 ? atoi(argv[3]) : 10; assert(limit != 0); + bool debug = argc > 4 ? atoi(argv[4]) : true; ndbout << "threads=" << num_thr; ndbout << " loops=" << loops; ndbout << " max level=" << limit << endl; - SafeMutex sm(limit, true); + SafeMutex sm("test-mutex", limit, debug); int ret; ret = sm.create(); assert(ret == 0); === modified file 'storage/ndb/src/kernel/vm/SafeMutex.hpp' --- a/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-11-20 13:32:13 +0000 +++ b/storage/ndb/src/kernel/vm/SafeMutex.hpp 2008-12-01 18:03:48 +0000 @@ -26,61 +26,70 @@ #include #include -#undef HAVE_PTHREAD_MUTEX_RECURSIVE -#ifdef __linux -#define HAVE_PTHREAD_MUTEX_RECURSIVE -#endif - /* - * Recursive mutex with a recursion limit >= 1. Can be useful for - * debugging. If a recursive mutex is not wanted, one must rewrite - * caller code until limit 1 works. + * Recursive mutex with recursion limit >= 1. Intended for debugging. + * One should rewrite caller code until limit 1 works. * - * Implementation for limit > 1 uses a real OS recursive mutex. Should - * work on linux and solaris 10. There is a unit test testSafeMutex. + * The implementation uses a default mutex. If limit is > 1 or debug + * is specified then a recursive mutex is simulated. Operating system + * recursive mutex (if any) is not used. The simulation is several + * times slower. There is a unit test testSafeMutex. * * The caller currently is multi-threaded disk data. Here it is easy * to verify that the mutex is released within a time-slice. */ class SafeMutex { + const char* const m_name; + const Uint32 m_limit; // error if usage exceeds this + const bool m_debug; // use recursive implementation even for limit 1 + const bool m_simple; pthread_mutex_t m_mutex; + pthread_cond_t m_cond; pthread_t m_owner; - bool m_init; + bool m_initdone; Uint32 m_level; Uint32 m_usage; // max level used so far - const Uint32 m_limit; // error if usage exceeds this - const bool m_debug; // use recursive mutex even for limit 1 + int m_errcode; + int m_errline; + int err(int errcode, int errline); friend class NdbOut& operator<<(NdbOut&, const SafeMutex&); public: - SafeMutex(Uint32 limit, bool debug) : + SafeMutex(const char* name, Uint32 limit, bool debug) : + m_name(name), m_limit(limit), - m_debug(debug) + m_debug(debug), + m_simple(!(limit > 1 || debug)) { assert(m_limit >= 1), m_owner = 0; // wl4391_todo assuming numeric non-zero - m_init = false; + m_initdone = false; m_level = 0; m_usage = 0; + m_errcode = 0; + m_errline = 0; }; ~SafeMutex() { - (void)destroy(); + if (m_initdone) + (void)destroy(); } enum { - // caller must crash on any error - ErrUnsupp = -101, // limit > 1 or debug, and not supported by OS - ErrState = -102, // user error - ErrLevel = -103, // level exceeded limit - ErrOwner1 = -104, // owner not 0 at first lock (OS error) - ErrOwner2 = -105, // owner not self at recursive lock (OS error) - ErrOwner3 = -106 // owner not self at unlock (OS error) + // caller must crash on any error - recovery is not possible + ErrState = -101, // user error + ErrLevel = -102, // level exceeded limit + ErrOwner = -103, // unlock when not owner + ErrNolock = -104 // unlock when no lock }; int create(); int destroy(); int lock(); int unlock(); + +private: + int lock_impl(); + int unlock_impl(); }; #endif === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-11-19 11:01:17 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-12-01 18:05:11 +0000 @@ -544,6 +544,7 @@ protected: BlockReference calcInstanceBlockRef(BlockNumber aBlock); // matching instance on another node e.g. LQH-LQH + // valid only if receiver has same number of workers BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode); /**