From: Pekka Nousiainen Date: October 17 2012 2:46pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (pekka.nousiainen:4037 to 4038) List-Archive: http://lists.mysql.com/commits/145067 Message-Id: <20121017144631.9153.19594.4038@cuda> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4038 Pekka Nousiainen 2012-10-17 [merge] merge to 7.2 modified: storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp storage/ndb/src/kernel/vm/SimulatedBlock.hpp 4037 Pekka Nousiainen 2012-10-17 [merge] merge to 7.2 modified: sql/ha_ndbcluster_binlog.cc === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-08-13 14:03:42 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2012-10-17 14:43:50 +0000 @@ -950,16 +950,11 @@ public: typedef Ptr GcpRecordPtr; struct HostRecord { + struct PackedWordsContainer lqh_pack[MAX_NDBMT_LQH_THREADS+1]; + struct PackedWordsContainer tc_pack[MAX_NDBMT_TC_THREADS+1]; Uint8 inPackedList; Uint8 nodestatus; - Uint8 _unused[2]; - UintR noOfPackedWordsLqh; - UintR packedWordsLqh[30]; - UintR noOfPackedWordsTc; - UintR packedWordsTc[29]; - BlockReference hostLqhBlockRef; - BlockReference hostTcBlockRef; - };// Size 128 bytes + }; typedef Ptr HostRecordPtr; /* $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ */ @@ -2433,8 +2428,8 @@ private: void sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref); void sendCommitLqh(Signal* signal, BlockReference alqhBlockref); void sendCompleteLqh(Signal* signal, BlockReference alqhBlockref); - void sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr); - void sendPackedSignalTc(Signal* signal, HostRecord * ahostptr); + void sendPackedSignal(Signal* signal, + struct PackedWordsContainer * container); void cleanUp(Signal* signal); void sendAttrinfoLoop(Signal* signal); void sendAttrinfoSignal(Signal* signal); === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-09-28 23:55:14 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-10-17 14:43:50 +0000 @@ -844,7 +844,7 @@ void Dblqh::execNDB_STTOR(Signal* signal /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ void Dblqh::startphase1Lab(Signal* signal, Uint32 _dummy, Uint32 ownNodeId) { - UintR Ti; + UintR Ti, Tj; HostRecordPtr ThostPtr; /* ------- INITIATE ALL RECORDS ------- */ @@ -861,11 +861,19 @@ void Dblqh::startphase1Lab(Signal* signa * Valid only if receiver has same number of LQH workers. * In general full instance key of fragment must be used. */ - ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i); - ThostPtr.p->hostTcBlockRef = calcTcBlockRef(ThostPtr.i); ThostPtr.p->inPackedList = false; - ThostPtr.p->noOfPackedWordsLqh = 0; - ThostPtr.p->noOfPackedWordsTc = 0; + for (Tj = 0; Tj < NDB_ARRAY_SIZE(ThostPtr.p->lqh_pack); Tj++) + { + ThostPtr.p->lqh_pack[Tj].noOfPackedWords = 0; + ThostPtr.p->lqh_pack[Tj].hostBlockRef = + numberToRef(DBLQH, Tj, ThostPtr.i); + } + for (Tj = 0; Tj < NDB_ARRAY_SIZE(ThostPtr.p->tc_pack); Tj++) + { + ThostPtr.p->tc_pack[Tj].noOfPackedWords = 0; + ThostPtr.p->tc_pack[Tj].hostBlockRef = + numberToRef(DBTC, Tj, ThostPtr.i); + } ThostPtr.p->nodestatus = ZNODE_DOWN; }//for cpackedListIndex = 0; @@ -3452,6 +3460,7 @@ void Dblqh::execSEND_PACKED(Signal* sign { HostRecordPtr Thostptr; UintR i; + UintR j; UintR TpackedListIndex = cpackedListIndex; jamEntry(); for (i = 0; i < TpackedListIndex; i++) { @@ -3459,14 +3468,22 @@ void Dblqh::execSEND_PACKED(Signal* sign ptrAss(Thostptr, hostRecord); jam(); ndbrequire(Thostptr.i - 1 < MAX_NDB_NODES - 1); - if (Thostptr.p->noOfPackedWordsLqh > 0) { - jam(); - sendPackedSignalLqh(signal, Thostptr.p); - }//if - if (Thostptr.p->noOfPackedWordsTc > 0) { - jam(); - sendPackedSignalTc(signal, Thostptr.p); - }//if + for (j = 0; j < NDB_ARRAY_SIZE(Thostptr.p->lqh_pack); j++) + { + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[j]; + if (container->noOfPackedWords > 0) { + jam(); + sendPackedSignal(signal, container); + } + } + for (j = 0; j < NDB_ARRAY_SIZE(Thostptr.p->tc_pack); j++) + { + struct PackedWordsContainer * container = &Thostptr.p->tc_pack[j]; + if (container->noOfPackedWords > 0) { + jam(); + sendPackedSignal(signal, container); + } + } Thostptr.p->inPackedList = false; }//for cpackedListIndex = 0; @@ -3708,33 +3725,39 @@ void Dblqh::execTUPKEYREF(Signal* signal }//switch }//Dblqh::execTUPKEYREF() -void Dblqh::sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr) +void Dblqh::sendPackedSignal(Signal* signal, + struct PackedWordsContainer * container) { - Uint32 noOfWords = ahostptr->noOfPackedWordsLqh; - BlockReference hostRef = ahostptr->hostLqhBlockRef; + Uint32 noOfWords = container->noOfPackedWords; + BlockReference hostRef = container->hostBlockRef; + container->noOfPackedWords = 0; MEMCOPY_NO_WORDS(&signal->theData[0], - &ahostptr->packedWordsLqh[0], + &container->packedWords[0], noOfWords); sendSignal(hostRef, GSN_PACKED_SIGNAL, signal, noOfWords, JBB); - ahostptr->noOfPackedWordsLqh = 0; -}//Dblqh::sendPackedSignalLqh() - -void Dblqh::sendPackedSignalTc(Signal* signal, HostRecord * ahostptr) -{ - Uint32 noOfWords = ahostptr->noOfPackedWordsTc; - BlockReference hostRef = ahostptr->hostTcBlockRef; - MEMCOPY_NO_WORDS(&signal->theData[0], - &ahostptr->packedWordsTc[0], - noOfWords); - sendSignal(hostRef, GSN_PACKED_SIGNAL, signal, noOfWords, JBB); - ahostptr->noOfPackedWordsTc = 0; -}//Dblqh::sendPackedSignalTc() +} void Dblqh::sendCommitLqh(Signal* signal, BlockReference alqhBlockref) { + Uint32 instanceKey = refToInstance(alqhBlockref); + ndbassert(refToMain(alqhBlockref) == DBLQH); + + if (instanceKey > MAX_NDBMT_LQH_THREADS) + { + /* No send packed support in these cases */ + jam(); + signal->theData[0] = tcConnectptr.p->clientConnectrec; + signal->theData[1] = tcConnectptr.p->transid[0]; + signal->theData[2] = tcConnectptr.p->transid[1]; + sendSignal(alqhBlockref, GSN_COMMIT, signal, 3, JBB); + return; + } + HostRecordPtr Thostptr; + Thostptr.i = refToNode(alqhBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey]; Uint32 Tdata[5]; Tdata[0] = tcConnectptr.p->clientConnectrec; @@ -3751,40 +3774,41 @@ void Dblqh::sendCommitLqh(Signal* signal len = 4; } - // currently packed signal cannot address specific instance - const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0; - if (send_unpacked) { + if (container->noOfPackedWords > 25 - len) { jam(); - FragrecordPtr Tfragptr; - Tfragptr.i = tcConnectptr.p->fragmentptr; - c_fragment_pool.getPtr(Tfragptr); - memcpy(&signal->theData[0], &Tdata[0], len << 2); - Uint32 Tnode = Thostptr.i; - Uint32 instanceKey = Tfragptr.p->lqhInstanceKey; - BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); - sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB); - return; - } - - if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) { - jam(); - sendPackedSignalLqh(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); } Tdata[0] |= (ZCOMMIT << 28); - Uint32 pos = Thostptr.p->noOfPackedWordsLqh; - memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2); - Thostptr.p->noOfPackedWordsLqh = pos + len; + Uint32 pos = container->noOfPackedWords; + container->noOfPackedWords = pos + len; + memcpy(&container->packedWords[pos], &Tdata[0], len << 2); } void Dblqh::sendCompleteLqh(Signal* signal, BlockReference alqhBlockref) { + Uint32 instanceKey = refToInstance(alqhBlockref); + ndbassert(refToMain(alqhBlockref) == DBLQH); + + if (instanceKey > MAX_NDBMT_LQH_THREADS) + { + /* No send packed support in these cases */ + jam(); + signal->theData[0] = tcConnectptr.p->clientConnectrec; + signal->theData[1] = tcConnectptr.p->transid[0]; + signal->theData[2] = tcConnectptr.p->transid[1]; + sendSignal(alqhBlockref, GSN_COMPLETE, signal, 3, JBB); + return; + } + HostRecordPtr Thostptr; + Thostptr.i = refToNode(alqhBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey]; Uint32 Tdata[3]; Tdata[0] = tcConnectptr.p->clientConnectrec; @@ -3792,39 +3816,28 @@ void Dblqh::sendCompleteLqh(Signal* sign Tdata[2] = tcConnectptr.p->transid[1]; Uint32 len = 3; - // currently packed signal cannot address specific instance - const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers != 0; - if (send_unpacked) { + if (container->noOfPackedWords > 22) { jam(); - FragrecordPtr Tfragptr; - Tfragptr.i = tcConnectptr.p->fragmentptr; - c_fragment_pool.getPtr(Tfragptr); - memcpy(&signal->theData[0], &Tdata[0], len << 2); - Uint32 Tnode = Thostptr.i; - Uint32 instanceKey = Tfragptr.p->lqhInstanceKey; - BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); - sendSignal(lqhRef, GSN_COMPLETE, signal, len, JBB); - return; - } - - if (Thostptr.p->noOfPackedWordsLqh > 22) { - jam(); - sendPackedSignalLqh(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); } Tdata[0] |= (ZCOMPLETE << 28); - Uint32 pos = Thostptr.p->noOfPackedWordsLqh; - memcpy(&Thostptr.p->packedWordsLqh[pos], &Tdata[0], len << 2); - Thostptr.p->noOfPackedWordsLqh = pos + len; + Uint32 pos = container->noOfPackedWords; + container->noOfPackedWords = pos + len; + memcpy(&container->packedWords[pos], &Tdata[0], len << 2); } void Dblqh::sendCommittedTc(Signal* signal, BlockReference atcBlockref) { - if (refToInstance(atcBlockref)) + Uint32 instanceKey = refToInstance(atcBlockref); + + ndbassert(refToMain(atcBlockref) == DBTC); + if (instanceKey > MAX_NDBMT_TC_THREADS) { + /* No send packed support in these cases */ jam(); signal->theData[0] = tcConnectptr.p->clientConnectrec; signal->theData[1] = tcConnectptr.p->transid[0]; @@ -3836,6 +3849,7 @@ void Dblqh::sendCommittedTc(Signal* sign HostRecordPtr Thostptr; Thostptr.i = refToNode(atcBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + struct PackedWordsContainer * container = &Thostptr.p->tc_pack[instanceKey]; Uint32 Tdata[3]; Tdata[0] = tcConnectptr.p->clientConnectrec; @@ -3843,34 +3857,28 @@ void Dblqh::sendCommittedTc(Signal* sign Tdata[2] = tcConnectptr.p->transid[1]; Uint32 len = 3; - // currently TC is single-threaded - const bool send_unpacked = false; - if (send_unpacked) { - jam(); - memcpy(&signal->theData[0], &Tdata[0], len << 2); - BlockReference tcRef = Thostptr.p->hostTcBlockRef; - sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB); - return; - } - - if (Thostptr.p->noOfPackedWordsTc > 22) { + if (container->noOfPackedWords > 22) { jam(); - sendPackedSignalTc(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); } Tdata[0] |= (ZCOMMITTED << 28); - Uint32 pos = Thostptr.p->noOfPackedWordsTc; - memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2); - Thostptr.p->noOfPackedWordsTc = pos + len; + Uint32 pos = container->noOfPackedWords; + container->noOfPackedWords = pos + len; + memcpy(&container->packedWords[pos], &Tdata[0], len << 2); } void Dblqh::sendCompletedTc(Signal* signal, BlockReference atcBlockref) { - if (refToInstance(atcBlockref)) + Uint32 instanceKey = refToInstance(atcBlockref); + + ndbassert(refToMain(atcBlockref) == DBTC); + if (instanceKey > MAX_NDBMT_TC_THREADS) { + /* No handling of send packed in those cases */ jam(); signal->theData[0] = tcConnectptr.p->clientConnectrec; signal->theData[1] = tcConnectptr.p->transid[0]; @@ -3882,6 +3890,7 @@ void Dblqh::sendCompletedTc(Signal* sign HostRecordPtr Thostptr; Thostptr.i = refToNode(atcBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + struct PackedWordsContainer * container = &Thostptr.p->tc_pack[instanceKey]; Uint32 Tdata[3]; Tdata[0] = tcConnectptr.p->clientConnectrec; @@ -3889,110 +3898,107 @@ void Dblqh::sendCompletedTc(Signal* sign Tdata[2] = tcConnectptr.p->transid[1]; Uint32 len = 3; - // currently TC is single-threaded - const bool send_unpacked = false; - if (send_unpacked) { - jam(); - memcpy(&signal->theData[0], &Tdata[0], len << 2); - BlockReference tcRef = Thostptr.p->hostTcBlockRef; - sendSignal(tcRef, GSN_COMMITTED, signal, len, JBB); - return; - } - - if (Thostptr.p->noOfPackedWordsTc > 22) { + if (container->noOfPackedWords > 22) { jam(); - sendPackedSignalTc(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); } Tdata[0] |= (ZCOMPLETED << 28); - Uint32 pos = Thostptr.p->noOfPackedWordsTc; - memcpy(&Thostptr.p->packedWordsTc[pos], &Tdata[0], len << 2); - Thostptr.p->noOfPackedWordsTc = pos + len; + Uint32 pos = container->noOfPackedWords; + container->noOfPackedWords = pos + len; + memcpy(&container->packedWords[pos], &Tdata[0], len << 2); } void Dblqh::sendLqhkeyconfTc(Signal* signal, BlockReference atcBlockref) { LqhKeyConf* lqhKeyConf; + struct PackedWordsContainer * container; + bool send_packed = true; HostRecordPtr Thostptr; - - bool packed= true; Thostptr.i = refToNode(atcBlockref); + Uint32 instanceKey = refToInstance(atcBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); - if (refToBlock(atcBlockref) == DBTC) { - jam(); + Uint32 block = refToMain(atcBlockref); + + if (block == DBLQH) + { + if (instanceKey <= MAX_NDBMT_LQH_THREADS) + { + container = &Thostptr.p->lqh_pack[instanceKey]; + } + else + { + send_packed = false; + } + } + else if (block == DBTC) + { + if (instanceKey <= MAX_NDBMT_TC_THREADS) + { + container = &Thostptr.p->tc_pack[instanceKey]; + } + else + { + send_packed = false; + } + } + else + { + send_packed = false; + } + /******************************************************************* +// Normal path // This signal was intended for DBTC as part of the normal transaction // execution. -********************************************************************/ - if (Thostptr.p->noOfPackedWordsTc > (25 - LqhKeyConf::SignalLength)) { - jam(); - sendPackedSignalTc(signal, Thostptr.p); - } else { - jam(); - updatePackedList(signal, Thostptr.p, Thostptr.i); - }//if - lqhKeyConf = (LqhKeyConf *) - &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc]; - Thostptr.p->noOfPackedWordsTc += LqhKeyConf::SignalLength; - } else if(refToMain(atcBlockref) == DBLQH && - refToInstance(atcBlockref) == instance()) { - //wl4391_todo check - jam(); -/******************************************************************* +// More unusual path // This signal was intended for DBLQH as part of log execution or // node recovery. +// Yet another path +// Intended for DBSPJ as part of join processing ********************************************************************/ - if (Thostptr.p->noOfPackedWordsLqh > (25 - LqhKeyConf::SignalLength)) { + if (send_packed) + { + if (container->noOfPackedWords > (25 - LqhKeyConf::SignalLength)) { jam(); - sendPackedSignalLqh(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); updatePackedList(signal, Thostptr.p, Thostptr.i); }//if lqhKeyConf = (LqhKeyConf *) - &Thostptr.p->packedWordsLqh[Thostptr.p->noOfPackedWordsLqh]; - Thostptr.p->noOfPackedWordsLqh += LqhKeyConf::SignalLength; - } else { - packed= false; - lqhKeyConf = (LqhKeyConf *)signal->getDataPtrSend(); + &container->packedWords[container->noOfPackedWords]; + container->noOfPackedWords += LqhKeyConf::SignalLength; } + else + { + lqhKeyConf = (LqhKeyConf *)&signal->theData[0]; + } + Uint32 ptrAndType = tcConnectptr.i | (ZLQHKEYCONF << 28); Uint32 tcOprec = tcConnectptr.p->tcOprec; Uint32 ownRef = cownref; + lqhKeyConf->connectPtr = ptrAndType; + lqhKeyConf->opPtr = tcOprec; + lqhKeyConf->userRef = ownRef; + Uint32 readlenAi = tcConnectptr.p->readlenAi; Uint32 transid1 = tcConnectptr.p->transid[0]; Uint32 transid2 = tcConnectptr.p->transid[1]; Uint32 noFiredTriggers = tcConnectptr.p->noFiredTriggers; - lqhKeyConf->connectPtr = ptrAndType; - lqhKeyConf->opPtr = tcOprec; - lqhKeyConf->userRef = ownRef; lqhKeyConf->readLen = readlenAi; lqhKeyConf->transId1 = transid1; lqhKeyConf->transId2 = transid2; lqhKeyConf->noFiredTriggers = noFiredTriggers; - if(!packed) + if (!send_packed) { lqhKeyConf->connectPtr = tcConnectptr.i; - if (instance() == refToInstance(atcBlockref) && - (Thostptr.i == 0 || Thostptr.i == getOwnNodeId()) && - globalData.ndbMtTcThreads == 0) - { - /** - * This EXECUTE_DIRECT is multi-thread safe, as we only get here - * for RESTORE block. - */ - EXECUTE_DIRECT(refToMain(atcBlockref), GSN_LQHKEYCONF, - signal, LqhKeyConf::SignalLength); - } - else - { - sendSignal(atcBlockref, GSN_LQHKEYCONF, - signal, LqhKeyConf::SignalLength, JBB); - } + sendSignal(atcBlockref, GSN_LQHKEYCONF, + signal, LqhKeyConf::SignalLength, JBB); } }//Dblqh::sendLqhkeyconfTc() @@ -7595,7 +7601,10 @@ Dblqh::sendFireTrigConfTc(Signal* signal BlockReference atcBlockref, Uint32 Tdata[]) { - if (refToInstance(atcBlockref) != 0) + Uint32 instanceKey = refToInstance(atcBlockref); + + ndbassert(refToMain(atcBlockref) == DBTC); + if (instanceKey > MAX_NDBMT_TC_THREADS) { jam(); memcpy(signal->theData, Tdata, 4 * FireTrigConf::SignalLength); @@ -7605,15 +7614,15 @@ Dblqh::sendFireTrigConfTc(Signal* signal } HostRecordPtr Thostptr; - Uint32 len = FireTrigConf::SignalLength; - Thostptr.i = refToNode(atcBlockref); ptrCheckGuard(Thostptr, chostFileSize, hostRecord); + Uint32 len = FireTrigConf::SignalLength; + struct PackedWordsContainer * container = &Thostptr.p->tc_pack[instanceKey]; - if (Thostptr.p->noOfPackedWordsTc > (25 - len)) + if (container->noOfPackedWords > (25 - len)) { jam(); - sendPackedSignalTc(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { @@ -7622,8 +7631,8 @@ Dblqh::sendFireTrigConfTc(Signal* signal } ndbassert(FireTrigConf::SignalLength == 4); - Uint32 * dst = &Thostptr.p->packedWordsTc[Thostptr.p->noOfPackedWordsTc]; - Thostptr.p->noOfPackedWordsTc += len; + Uint32 * dst = &container->packedWords[container->noOfPackedWords]; + container->noOfPackedWords += len; dst[0] = Tdata[0] | (ZFIRE_TRIG_CONF << 28); dst[1] = Tdata[1]; dst[2] = Tdata[2]; === modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp' --- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2012-09-12 15:11:40 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2012-10-17 14:43:50 +0000 @@ -398,6 +398,10 @@ public: AttributeBuffer::DataBufferPool c_theAttributeBufferPool; + typedef DataBuffer<5> CommitAckMarkerBuffer; + + CommitAckMarkerBuffer::DataBufferPool c_theCommitAckMarkerBufferPool; + UintR c_transactionBufferSpace; @@ -998,14 +1002,11 @@ public: /* THIS RECORD IS ALIGNED TO BE 128 BYTES. */ /********************************************************/ struct HostRecord { + struct PackedWordsContainer lqh_pack[MAX_NDBMT_LQH_THREADS+1]; + struct PackedWordsContainer packTCKEYCONF; HostState hostStatus; LqhTransState lqhTransStatus; bool inPackedList; - UintR noOfPackedWordsLqh; - UintR packedWordsLqh[26]; - UintR noOfWordsTCKEYCONF; - UintR packedWordsTCKEYCONF[30]; - BlockReference hostLqhBlockRef; enum NodeFailBits { @@ -1017,7 +1018,7 @@ public: }; Uint32 m_nf_bits; NdbNodeBitmask m_lqh_trans_conf; - }; /* p2c: size = 128 bytes */ + }; typedef Ptr HostRecordPtr; @@ -1454,7 +1455,8 @@ private: void sendPackedTCKEYCONF(Signal* signal, HostRecord * ahostptr, UintR hostId); - void sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr); + void sendPackedSignal(Signal* signal, + struct PackedWordsContainer * container); Uint32 sendCommitLqh(Signal* signal, TcConnectRecord * const regTcPtr); Uint32 sendCompleteLqh(Signal* signal, @@ -1473,14 +1475,14 @@ private: void timeOutFoundFragLab(Signal* signal, Uint32 TscanConPtr); void timeOutLoopStartFragLab(Signal* signal, Uint32 TscanConPtr); int releaseAndAbort(Signal* signal); - void findApiConnectFail(Signal* signal); + void findApiConnectFail(Signal* signal, Uint32 instanceKey); void findTcConnectFail(Signal* signal, Uint32 instanceKey); - void initApiConnectFail(Signal* signal); + void initApiConnectFail(Signal* signal, Uint32 instanceKey); void initTcConnectFail(Signal* signal, Uint32 instanceKey); void initTcFail(Signal* signal); void releaseTakeOver(Signal* signal); void setupFailData(Signal* signal); - void updateApiStateFail(Signal* signal); + void updateApiStateFail(Signal* signal, Uint32 instanceKey); void updateTcStateFail(Signal* signal, Uint32 instanceKey); void handleApiFailState(Signal* signal, UintR anApiConnectptr); void handleFailedApiNode(Signal* signal, @@ -1975,7 +1977,7 @@ public: Uint32 prevHash; Uint32 apiConnectPtr; Uint16 apiNodeId; - NdbNodeBitmask m_commit_ack_marker_nodes; + CommitAckMarkerBuffer::Head theDataBuffer; inline bool equal(const CommitAckMarker & p) const { return ((p.transid1 == transid1) && (p.transid2 == transid2)); @@ -1984,6 +1986,9 @@ public: inline Uint32 hashValue() const { return transid1; } + bool insert_in_commit_ack_marker(Dbtc *tc, + Uint32 instanceKey, + NodeId nodeId); }; private: @@ -1995,9 +2000,10 @@ private: RSS_AP_SNAPSHOT(m_commitAckMarkerPool); void execTC_COMMIT_ACK(Signal* signal); - void sendRemoveMarkers(Signal*, const CommitAckMarker *); + void sendRemoveMarkers(Signal*, CommitAckMarker *); void sendRemoveMarker(Signal* signal, NodeId nodeId, + Uint32 instanceKey, Uint32 transid1, Uint32 transid2); void removeMarkerForFailedAPI(Signal* signal, Uint32 nodeId, Uint32 bucket); === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2011-07-05 12:46:07 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp 2012-10-17 14:43:50 +0000 @@ -116,6 +116,7 @@ void Dbtc::initRecords() m_commitAckMarkerPool.setSize(2 * capiConnectFilesize); m_commitAckMarkerHash.setSize(1024); + c_theCommitAckMarkerBufferPool.setSize(4 * capiConnectFilesize); hostRecord = (HostRecord*)allocRecord("HostRecord", sizeof(HostRecord), === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-09-25 11:33:57 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2012-10-17 14:43:50 +0000 @@ -398,17 +398,6 @@ void Dbtc::execINCL_NODEREQ(Signal* sign } Uint32 Tnode = hostptr.i; - Uint32 lqhWorkers = getNodeInfo(Tnode).m_lqh_workers; - if (lqhWorkers == 1) - { - jam(); - hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, 1, Tnode); - } - else - { - jam(); - hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, Tnode); - } sendSignal(tblockref, GSN_INCL_NODECONF, signal, 2, JBB); @@ -894,16 +883,6 @@ void Dbtc::execREAD_NODESCONF(Signal* si con_lineNodes++; hostptr.p->hostStatus = HS_ALIVE; c_alive_nodes.set(i); - if (getNodeInfo(i).m_lqh_workers == 1) - { - jam(); - hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, 1, i); - } - else - { - jam(); - hostptr.p->hostLqhBlockRef = numberToRef(DBLQH, i); - } if (!ndbd_deferred_unique_constraints(getNodeInfo(i).m_version)) { jam(); @@ -3066,11 +3045,14 @@ void Dbtc::execTCKEYREQ(Signal* signal) { regTcPtr->commitAckMarker = tmp.i; regApiPtr->commitAckMarker = tmp.i; + new (tmp.p) CommitAckMarker(); tmp.p->transid1 = tcKeyReq->transId1; tmp.p->transid2 = tcKeyReq->transId2; tmp.p->apiNodeId = refToNode(regApiPtr->ndbapiBlockref); tmp.p->apiConnectPtr = TapiIndex; - tmp.p->m_commit_ack_marker_nodes.clear(); + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + LocalDataBuffer<5> head(pool, tmp.p->theDataBuffer); #if defined VM_TRACE || defined ERROR_INSERT { CommitAckMarkerPtr check; @@ -3822,7 +3804,6 @@ void Dbtc::sendlqhkeyreq(Signal* signal, handle.m_ptr[ LqhKeyReq::AttrInfoSectionNum ]= attrInfoSection; handle.m_cnt= 2; } - sendSignal(TBRef, GSN_LQHKEYREQ, signal, nextPos + LqhKeyReq::FixedSignalLength, JBB, &handle); @@ -4262,6 +4243,17 @@ void Dbtc::execSIGNAL_DROPPED_REP(Signal return; } +bool +Dbtc::CommitAckMarker::insert_in_commit_ack_marker(Dbtc *tc, + Uint32 instanceKey, + NodeId node_id) +{ + Uint32 item = instanceKey + (node_id << 16); + CommitAckMarkerBuffer::DataBufferPool & pool = + tc->c_theCommitAckMarkerBufferPool; + LocalDataBuffer<5> tmp(pool, this->theDataBuffer); + return tmp.append(&item, (Uint32)1); +} void Dbtc::execLQHKEYCONF(Signal* signal) { @@ -4408,7 +4400,6 @@ void Dbtc::execLQHKEYCONF(Signal* signal Uint32 commitAckMarker = regTcPtr->commitAckMarker; regTcPtr->commitAckMarker = RNIL; setApiConTimer(apiConnectptr.i, TtcTimer, __LINE__); - if (commitAckMarker != RNIL) { const Uint32 noOfLqhs = regTcPtr->noOfNodes; @@ -4419,7 +4410,16 @@ void Dbtc::execLQHKEYCONF(Signal* signal * Populate LQH array */ for(Uint32 i = 0; i < noOfLqhs; i++) - tmp->m_commit_ack_marker_nodes.set(regTcPtr->tcNodedata[i]); + { + jam(); + if (!tmp->insert_in_commit_ack_marker(this, + regTcPtr->lqhInstanceKey, + regTcPtr->tcNodedata[i])) + { + ndbout_c("Failed insert_in_commit_ack_marker"); + ; //RONM TODO error handling + } + } } if (regTcPtr->isIndexOp(regTcPtr->m_special_op_flags)) { jam(); @@ -4743,7 +4743,8 @@ void Dbtc::sendtckeyconf(Signal* signal, const BlockNumber TblockNum = refToBlock(regApiPtr->ndbapiBlockref); const Uint32 Tmarker = (regApiPtr->commitAckMarker == RNIL) ? 0 : 1; ptrAss(localHostptr, hostRecord); - UintR TcurrLen = localHostptr.p->noOfWordsTCKEYCONF; + struct PackedWordsContainer * container = &localHostptr.p->packTCKEYCONF; + UintR TcurrLen = container->noOfPackedWords; UintR confInfo = 0; TcKeyConf::setCommitFlag(confInfo, TcommitFlag == 1); TcKeyConf::setMarkerFlag(confInfo, Tmarker); @@ -4779,7 +4780,7 @@ void Dbtc::sendtckeyconf(Signal* signal, tc_clearbit(regApiPtr->m_flags, ApiConnectRecord::TF_EXEC_FLAG); } TcKeyConf::setNoOfOperations(confInfo, (TopWords >> 1)); - if ((TpacketLen + 1 /** gci_lo */ > 25) || !is_api){ + if ((TpacketLen + 1 /** gci_lo */ > 25) ||!is_api){ TcKeyConf * const tcKeyConf = (TcKeyConf *)signal->getDataPtrSend(); jam(); @@ -4810,6 +4811,8 @@ void Dbtc::sendtckeyconf(Signal* signal, // length - 3, since we have the real signal length plus one additional word // for the header we have to do - 4. // ------------------------------------------------------------------------- + container->noOfPackedWords = TcurrLen + TpacketLen + 1 /** gci_lo */; + UintR Tpack0 = (TblockNum << 16) + (TpacketLen - 4 + 1 /** gci_lo */); UintR Tpack1 = regApiPtr->ndbapiConnect; UintR Tpack2 = Uint32(regApiPtr->globalcheckpointid >> 32); @@ -4818,21 +4821,18 @@ void Dbtc::sendtckeyconf(Signal* signal, UintR Tpack5 = regApiPtr->transid[1]; UintR Tpack6 = Uint32(regApiPtr->globalcheckpointid); - localHostptr.p->noOfWordsTCKEYCONF = TcurrLen + TpacketLen + 1 /** gci_lo */; - - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 0] = Tpack0; - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 1] = Tpack1; - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 2] = Tpack2; - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 3] = Tpack3; - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 4] = Tpack4; - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + 5] = Tpack5; + container->packedWords[TcurrLen + 0] = Tpack0; + container->packedWords[TcurrLen + 1] = Tpack1; + container->packedWords[TcurrLen + 2] = Tpack2; + container->packedWords[TcurrLen + 3] = Tpack3; + container->packedWords[TcurrLen + 4] = Tpack4; + container->packedWords[TcurrLen + 5] = Tpack5; + + container->packedWords[TcurrLen + TpacketLen] = Tpack6; - UintR Ti; - for (Ti = 6; Ti < TpacketLen; Ti++) { - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + Ti] = - regApiPtr->tcSendArray[Ti - 6]; + for (Uint32 Ti = 6; Ti < TpacketLen; Ti++) { + container->packedWords[TcurrLen + Ti] = regApiPtr->tcSendArray[Ti - 6]; }//for - localHostptr.p->packedWordsTCKEYCONF[TcurrLen + TpacketLen] = Tpack6; if (unlikely(!ndb_check_micro_gcp(getNodeInfo(localHostptr.i).m_version))) { @@ -4871,17 +4871,21 @@ void Dbtc::execSEND_PACKED(Signal* signa UintR TpackedListIndex = cpackedListIndex; jamEntry(); for (i = 0; i < TpackedListIndex; i++) { + jam(); Thostptr.i = cpackedList[i]; ptrAss(Thostptr, localHostRecord); arrGuard(Thostptr.i - 1, MAX_NODES - 1); - UintR TnoOfPackedWordsLqh = Thostptr.p->noOfPackedWordsLqh; - UintR TnoOfWordsTCKEYCONF = Thostptr.p->noOfWordsTCKEYCONF; - jam(); - if (TnoOfPackedWordsLqh > 0) { + for (Uint32 j = 0; j < NDB_ARRAY_SIZE(Thostptr.p->lqh_pack); j++) + { + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[j]; jam(); - sendPackedSignalLqh(signal, Thostptr.p); - }//if - if (TnoOfWordsTCKEYCONF > 0) { + if (container->noOfPackedWords > 0) { + jam(); + sendPackedSignal(signal, container); + } + } + struct PackedWordsContainer * container = &Thostptr.p->packTCKEYCONF; + if (container->noOfPackedWords > 0) { jam(); sendPackedTCKEYCONF(signal, Thostptr.p, (Uint32)Thostptr.i); }//if @@ -4903,46 +4907,30 @@ Dbtc::updatePackedList(Signal* signal, H }//if }//Dbtc::updatePackedList() -void Dbtc::sendPackedSignalLqh(Signal* signal, HostRecord * ahostptr) +void Dbtc::sendPackedSignal(Signal* signal, + struct PackedWordsContainer * container) { - UintR Tj; - UintR TnoOfWords = ahostptr->noOfPackedWordsLqh; - for (Tj = 0; Tj < TnoOfWords; Tj += 4) { - UintR sig0 = ahostptr->packedWordsLqh[Tj + 0]; - UintR sig1 = ahostptr->packedWordsLqh[Tj + 1]; - UintR sig2 = ahostptr->packedWordsLqh[Tj + 2]; - UintR sig3 = ahostptr->packedWordsLqh[Tj + 3]; - signal->theData[Tj + 0] = sig0; - signal->theData[Tj + 1] = sig1; - signal->theData[Tj + 2] = sig2; - signal->theData[Tj + 3] = sig3; - }//for - ahostptr->noOfPackedWordsLqh = 0; - sendSignal(ahostptr->hostLqhBlockRef, + UintR TnoOfWords = container->noOfPackedWords; + ndbassert(TnoOfWords <= 25); + container->noOfPackedWords = 0; + memcpy(&signal->theData[0], &container->packedWords[0], 4 * TnoOfWords); + sendSignal(container->hostBlockRef, GSN_PACKED_SIGNAL, signal, TnoOfWords, JBB); -}//Dbtc::sendPackedSignalLqh() +}//Dbtc::sendPackedSignal() void Dbtc::sendPackedTCKEYCONF(Signal* signal, HostRecord * ahostptr, UintR hostId) { - UintR Tj; - UintR TnoOfWords = ahostptr->noOfWordsTCKEYCONF; + struct PackedWordsContainer * container = &ahostptr->packTCKEYCONF; + UintR TnoOfWords = container->noOfPackedWords; + ndbassert(TnoOfWords <= 25); + container->noOfPackedWords = 0; BlockReference TBref = numberToRef(API_PACKED, hostId); - for (Tj = 0; Tj < ahostptr->noOfWordsTCKEYCONF; Tj += 4) { - UintR sig0 = ahostptr->packedWordsTCKEYCONF[Tj + 0]; - UintR sig1 = ahostptr->packedWordsTCKEYCONF[Tj + 1]; - UintR sig2 = ahostptr->packedWordsTCKEYCONF[Tj + 2]; - UintR sig3 = ahostptr->packedWordsTCKEYCONF[Tj + 3]; - signal->theData[Tj + 0] = sig0; - signal->theData[Tj + 1] = sig1; - signal->theData[Tj + 2] = sig2; - signal->theData[Tj + 3] = sig3; - }//for - ahostptr->noOfWordsTCKEYCONF = 0; + memcpy(&signal->theData[0], &container->packedWords[0], 4 * TnoOfWords); sendSignal(TBref, GSN_TCKEYCONF, signal, TnoOfWords, JBB); }//Dbtc::sendPackedTCKEYCONF() @@ -5330,6 +5318,7 @@ Dbtc::sendCommitLqh(Signal* signal, { HostRecordPtr Thostptr; UintR ThostFilesize = chostFilesize; + Uint32 instanceKey = regTcPtr->lqhInstanceKey; ApiConnectRecord * const regApiPtr = apiConnectptr.p; Thostptr.i = regTcPtr->lastLqhNodeId; ptrCheckGuard(Thostptr, ThostFilesize, hostRecord); @@ -5352,20 +5341,18 @@ Dbtc::sendCommitLqh(Signal* signal, ndbassert(Tdata[4] == 0 || getNodeInfo(Thostptr.i).m_version == 0); len = 4; } - - // currently packed signal cannot address specific instance - const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers > 1; - if (send_unpacked) { + if (instanceKey > MAX_NDBMT_LQH_THREADS) { memcpy(&signal->theData[0], &Tdata[0], len << 2); - Uint32 instanceKey = regTcPtr->lqhInstanceKey; BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); sendSignal(lqhRef, GSN_COMMIT, signal, len, JBB); return ret; } - if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) { + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey]; + + if (container->noOfPackedWords > 25 - len) { jam(); - sendPackedSignalLqh(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); ret = 1; @@ -5373,10 +5360,10 @@ Dbtc::sendCommitLqh(Signal* signal, } Tdata[0] |= (ZCOMMIT << 28); - UintR Tindex = Thostptr.p->noOfPackedWordsLqh; - UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex]; + UintR Tindex = container->noOfPackedWords; + container->noOfPackedWords = Tindex + len; + UintR* TDataPtr = &container->packedWords[Tindex]; memcpy(TDataPtr, &Tdata[0], len << 2); - Thostptr.p->noOfPackedWordsLqh = Tindex + len; return ret; } @@ -5726,8 +5713,9 @@ Dbtc::sendCompleteLqh(Signal* signal, { HostRecordPtr Thostptr; UintR ThostFilesize = chostFilesize; + Uint32 instanceKey = regTcPtr->lqhInstanceKey; ApiConnectRecord * const regApiPtr = apiConnectptr.p; - Thostptr.i = regTcPtr->lastLqhNodeId; //last??? + Thostptr.i = regTcPtr->lastLqhNodeId; ptrCheckGuard(Thostptr, ThostFilesize, hostRecord); Uint32 Tnode = Thostptr.i; @@ -5740,19 +5728,18 @@ Dbtc::sendCompleteLqh(Signal* signal, Tdata[2] = regApiPtr->transid[1]; Uint32 len = 3; - // currently packed signal cannot address specific instance - const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers > 1; - if (send_unpacked) { + if (instanceKey > MAX_NDBMT_LQH_THREADS) { memcpy(&signal->theData[0], &Tdata[0], len << 2); - Uint32 instanceKey = regTcPtr->lqhInstanceKey; BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); sendSignal(lqhRef, GSN_COMPLETE, signal, 3, JBB); return ret; } - - if (Thostptr.p->noOfPackedWordsLqh > 22) { + + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey]; + + if (container->noOfPackedWords > 22) { jam(); - sendPackedSignalLqh(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); ret = 1; @@ -5760,11 +5747,10 @@ Dbtc::sendCompleteLqh(Signal* signal, } Tdata[0] |= (ZCOMPLETE << 28); - UintR Tindex = Thostptr.p->noOfPackedWordsLqh; - UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex]; + UintR Tindex = container->noOfPackedWords; + container->noOfPackedWords = Tindex + len; + UintR* TDataPtr = &container->packedWords[Tindex]; memcpy(TDataPtr, &Tdata[0], len << 2); - Thostptr.p->noOfPackedWordsLqh = Tindex + len; - return ret; } @@ -5852,6 +5838,7 @@ Dbtc::sendFireTrigReqLqh(Signal* signal, { HostRecordPtr Thostptr; UintR ThostFilesize = chostFilesize; + Uint32 instanceKey = regTcPtr.p->lqhInstanceKey; ApiConnectRecord * const regApiPtr = apiConnectptr.p; Thostptr.i = regTcPtr.p->tcNodedata[0]; ptrCheckGuard(Thostptr, ThostFilesize, hostRecord); @@ -5868,19 +5855,18 @@ Dbtc::sendFireTrigReqLqh(Signal* signal, req->pass = pass; Uint32 len = FireTrigReq::SignalLength; - // currently packed signal cannot address specific instance - const bool send_unpacked = getNodeInfo(Thostptr.i).m_lqh_workers > 1; - if (send_unpacked) { + if (instanceKey > MAX_NDBMT_LQH_THREADS) { memcpy(signal->theData, Tdata, len << 2); - Uint32 instanceKey = regTcPtr.p->lqhInstanceKey; BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode); sendSignal(lqhRef, GSN_FIRE_TRIG_REQ, signal, len, JBB); return ret; } - if (Thostptr.p->noOfPackedWordsLqh > 25 - len) { + struct PackedWordsContainer * container = &Thostptr.p->lqh_pack[instanceKey]; + + if (container->noOfPackedWords > 25 - len) { jam(); - sendPackedSignalLqh(signal, Thostptr.p); + sendPackedSignal(signal, container); } else { jam(); ret = 1; @@ -5888,10 +5874,10 @@ Dbtc::sendFireTrigReqLqh(Signal* signal, } Tdata[0] |= (ZFIRE_TRIG_REQ << 28); - UintR Tindex = Thostptr.p->noOfPackedWordsLqh; - UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex]; + UintR Tindex = container->noOfPackedWords; + container->noOfPackedWords = Tindex + len; + UintR* TDataPtr = &container->packedWords[Tindex]; memcpy(TDataPtr, Tdata, len << 2); - Thostptr.p->noOfPackedWordsLqh = Tindex + len; return ret; } @@ -6029,23 +6015,33 @@ Dbtc::execTC_COMMIT_ACK(Signal* signal){ } void -Dbtc::sendRemoveMarkers(Signal* signal, const CommitAckMarker * marker) +Dbtc::sendRemoveMarkers(Signal* signal, CommitAckMarker * marker) { jam(); const Uint32 transId1 = marker->transid1; const Uint32 transId2 = marker->transid2; - - for(Uint32 node_id = 1; node_id < MAX_NDB_NODES; node_id++) + + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + LocalDataBuffer<5> commitAckMarkers(pool, marker->theDataBuffer); + CommitAckMarkerBuffer::DataBufferIterator iter; + bool next_flag = commitAckMarkers.first(iter); + while (next_flag) { jam(); - if (marker->m_commit_ack_marker_nodes.get(node_id)) - sendRemoveMarker(signal, node_id, transId1, transId2); + Uint32 dataWord = *iter.data; + Uint32 nodeId = dataWord >> 16; + Uint32 instanceKey = dataWord & 0xFFFF; + sendRemoveMarker(signal, nodeId, instanceKey, transId1, transId2); + next_flag = commitAckMarkers.next(iter, 1); } + commitAckMarkers.release(); } void Dbtc::sendRemoveMarker(Signal* signal, NodeId nodeId, + Uint32 instanceKey, Uint32 transid1, Uint32 transid2){ /** @@ -6062,38 +6058,32 @@ Dbtc::sendRemoveMarker(Signal* signal, Tdata[2] = transid2; Uint32 len = 3; - // currently packed signals can not address specific instance - Uint32 cnt_workers = getNodeInfo(hostPtr.i).m_lqh_workers; - bool send_unpacked = cnt_workers > 1; - if (send_unpacked) { + if (instanceKey > MAX_NDBMT_LQH_THREADS) { jam(); // first word omitted memcpy(&signal->theData[0], &Tdata[1], (len - 1) << 2); Uint32 Tnode = hostPtr.i; - Uint32 i; - for (i = 0; i < cnt_workers; i++) { - // wl4391_todo skip workers not part of tx - Uint32 instanceKey = 1 + i; - BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode); - sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB); - } + BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode); + sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, len - 1, JBB); return; } - if (hostPtr.p->noOfPackedWordsLqh > (25 - 3)){ + struct PackedWordsContainer * container = &hostPtr.p->lqh_pack[instanceKey]; + + if (container->noOfPackedWords > (25 - 3)){ jam(); - sendPackedSignalLqh(signal, hostPtr.p); + sendPackedSignal(signal, container); } else { jam(); updatePackedList(signal, hostPtr.p, hostPtr.i); } - UintR numWord = hostPtr.p->noOfPackedWordsLqh; - UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord]; + UintR numWord = container->noOfPackedWords; + UintR* dataPtr = &container->packedWords[numWord]; + container->noOfPackedWords = numWord + len; Tdata[0] |= (ZREMOVE_MARKER << 28); memcpy(dataPtr, &Tdata[0], len << 2); - hostPtr.p->noOfPackedWordsLqh = numWord + 3; } void Dbtc::execCOMPLETED(Signal* signal) @@ -6510,6 +6500,11 @@ void Dbtc::clearCommitAckMarker(ApiConne regApiPtr->commitAckMarker = RNIL; tc_clearbit(regApiPtr->m_flags, ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED); + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(commitAckMarker); + LocalDataBuffer<5> commitAckMarkers(pool, tmp->theDataBuffer); + commitAckMarkers.release(); m_commitAckMarkerHash.release(commitAckMarker); } } @@ -8919,7 +8914,19 @@ void Dbtc::execLQH_TRANSCONF(Signal* sig } } - findApiConnectFail(signal); + Uint32 instanceKey; + + if (unlikely(signal->getLength() < LqhTransConf::SignalLength_FRAG_ID)) + { + jam(); + instanceKey = 0; + } + else + { + jam(); + instanceKey = getInstanceKey(tableId, fragId); + } + findApiConnectFail(signal, instanceKey); if(apiConnectptr.p->ndbapiBlockref == 0 && tapplRef != 0){ apiConnectptr.p->ndbapiBlockref = ref; @@ -8929,19 +8936,6 @@ void Dbtc::execLQH_TRANSCONF(Signal* sig if (ttransStatus != LqhTransConf::Marker) { jam(); - - Uint32 instanceKey; - - if (unlikely(signal->getLength() < LqhTransConf::SignalLength_FRAG_ID)) - { - jam(); - instanceKey = 0; - } - else - { - jam(); - instanceKey = getInstanceKey(tableId, fragId); - } findTcConnectFail(signal, instanceKey); } }//Dbtc::execLQH_TRANSCONF() @@ -9217,6 +9211,11 @@ Dbtc::sendTCKEY_FAILREF(Signal* signal, if(marker != RNIL) { jam(); + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(marker); + LocalDataBuffer<5> commitAckMarkers(pool, tmp->theDataBuffer); + commitAckMarkers.release(); m_commitAckMarkerHash.release(marker); regApiPtr->commitAckMarker = RNIL; } @@ -9767,7 +9766,7 @@ void Dbtc::toCompleteHandlingLab(Signal* /* YET THEN SEIZE A NEW API CONNECT RECORD AND LINK IT */ /* INTO THE HASH TABLE. */ /*------------------------------------------------------------*/ -void Dbtc::findApiConnectFail(Signal* signal) +void Dbtc::findApiConnectFail(Signal* signal, Uint32 instanceKey) { ApiConnectRecordPtr fafPrevApiConnectptr; ApiConnectRecordPtr fafNextApiConnectptr; @@ -9798,7 +9797,7 @@ FAF_LOOP: fafPrevApiConnectptr.p->nextApiConnect = apiConnectptr.i; }//if apiConnectptr.p->nextApiConnect = RNIL; - initApiConnectFail(signal); + initApiConnectFail(signal, instanceKey); } else { jam(); fafPrevApiConnectptr.i = fafNextApiConnectptr.i; @@ -9811,7 +9810,7 @@ FAF_LOOP: (apiConnectptr.p->transid[0] != ttransid1)) { goto FAF_LOOP; }//if - updateApiStateFail(signal); + updateApiStateFail(signal, instanceKey); }//if }//Dbtc::findApiConnectFail() @@ -9854,7 +9853,7 @@ void Dbtc::findTcConnectFail(Signal* sig /*----------------------------------------------------------*/ /* INITIALISE AN API CONNECT FAIL RECORD */ /*----------------------------------------------------------*/ -void Dbtc::initApiConnectFail(Signal* signal) +void Dbtc::initApiConnectFail(Signal* signal, Uint32 instanceKey) { apiConnectptr.p->transid[0] = ttransid1; apiConnectptr.p->transid[1] = ttransid2; @@ -9899,14 +9898,17 @@ void Dbtc::initApiConnectFail(Signal* si m_commitAckMarkerHash.seize(tmp); ndbrequire(tmp.i != RNIL); - apiConnectptr.p->commitAckMarker = tmp.i; + + new (tmp.p) CommitAckMarker(); tmp.p->transid1 = ttransid1; tmp.p->transid2 = ttransid2; tmp.p->apiNodeId = refToNode(tapplRef); - tmp.p->m_commit_ack_marker_nodes.clear(); - tmp.p->m_commit_ack_marker_nodes.set(tnodeid); tmp.p->apiConnectPtr = apiConnectptr.i; + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + LocalDataBuffer<5> head(pool, tmp.p->theDataBuffer); + ndbrequire(tmp.p->insert_in_commit_ack_marker(this, instanceKey, tnodeid)); #if defined VM_TRACE || defined ERROR_INSERT { @@ -10052,7 +10054,7 @@ void Dbtc::setupFailData(Signal* signal) /*----------------------------------------------------------*/ /* UPDATE THE STATE OF THE API CONNECT FOR THIS PART. */ /*----------------------------------------------------------*/ -void Dbtc::updateApiStateFail(Signal* signal) +void Dbtc::updateApiStateFail(Signal* signal, Uint32 instanceKey) { if(LqhTransConf::getMarkerFlag(treqinfo)) { @@ -10065,12 +10067,15 @@ void Dbtc::updateApiStateFail(Signal* si m_commitAckMarkerHash.seize(tmp); ndbrequire(tmp.i != RNIL); + new (tmp.p) CommitAckMarker(); apiConnectptr.p->commitAckMarker = tmp.i; tmp.p->transid1 = ttransid1; tmp.p->transid2 = ttransid2; tmp.p->apiNodeId = refToNode(tapplRef); tmp.p->apiConnectPtr = apiConnectptr.i; - tmp.p->m_commit_ack_marker_nodes.clear(); + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + LocalDataBuffer<5> head(pool, tmp.p->theDataBuffer); #if defined VM_TRACE || defined ERROR_INSERT { CommitAckMarkerPtr check; @@ -10086,7 +10091,7 @@ void Dbtc::updateApiStateFail(Signal* si ndbassert(tmp.p->transid1 == ttransid1); ndbassert(tmp.p->transid2 == ttransid2); } - tmp.p->m_commit_ack_marker_nodes.set(tnodeid); + ndbrequire(tmp.p->insert_in_commit_ack_marker(this, instanceKey, tnodeid)); } switch (ttransStatus) { @@ -12614,9 +12619,15 @@ void Dbtc::inithost(Signal* signal) hostptr.p->hostStatus = HS_DEAD; hostptr.p->inPackedList = false; hostptr.p->lqhTransStatus = LTS_IDLE; - hostptr.p->noOfWordsTCKEYCONF = 0; - hostptr.p->noOfPackedWordsLqh = 0; - hostptr.p->hostLqhBlockRef = calcLqhBlockRef(hostptr.i); + struct PackedWordsContainer * containerTCKEYCONF = + &hostptr.p->packTCKEYCONF; + containerTCKEYCONF->noOfPackedWords = 0; + for (Uint32 i = 0; i < NDB_ARRAY_SIZE(hostptr.p->lqh_pack); i++) + { + struct PackedWordsContainer * container = &hostptr.p->lqh_pack[i]; + container->noOfPackedWords = 0; + container->hostBlockRef = numberToRef(DBLQH, i, hostptr.i); + } hostptr.p->m_nf_bits = 0; }//for c_alive_nodes.clear(); @@ -12875,6 +12886,11 @@ void Dbtc::releaseAbortResources(Signal* if (marker != RNIL) { jam(); + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(marker); + LocalDataBuffer<5> commitAckMarkers(pool, tmp->theDataBuffer); + commitAckMarkers.release(); m_commitAckMarkerHash.release(marker); apiConnectptr.p->commitAckMarker = RNIL; } @@ -13131,16 +13147,27 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) CommitAckMarkerIterator iter; for(m_commitAckMarkerHash.first(iter); iter.curr.i != RNIL; m_commitAckMarkerHash.next(iter)){ + Uint32 data[4]; + data[0] = data[1] = data[2] = data[3] = 0; + CommitAckMarkerBuffer::DataBufferPool & pool = + c_theCommitAckMarkerBufferPool; + LocalDataBuffer<5> commitAckMarkers(pool, iter.curr.p->theDataBuffer); + CommitAckMarkerBuffer::DataBufferIterator data_buf_iter; + bool next_flag = commitAckMarkers.first(data_buf_iter); + for (Uint32 i = 0; i < 4; i++) + { + if (!next_flag) + break; + data[i] = *data_buf_iter.data; + next_flag = commitAckMarkers.next(data_buf_iter); + } infoEvent("CommitAckMarker: i = %d (0x%x, 0x%x)" " Api: %d %x %x %x %x bucket = %d", iter.curr.i, iter.curr.p->transid1, iter.curr.p->transid2, iter.curr.p->apiNodeId, - iter.curr.p->m_commit_ack_marker_nodes.getWord(0), - iter.curr.p->m_commit_ack_marker_nodes.getWord(1), - iter.curr.p->m_commit_ack_marker_nodes.getWord(2), - iter.curr.p->m_commit_ack_marker_nodes.getWord(3), + data[0], data[1], data[2], data[3], iter.bucket); } return; === modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp' --- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2012-06-21 15:24:52 +0000 +++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2012-10-17 14:43:50 +0000 @@ -103,6 +103,12 @@ struct Block_context class Ndbd_mem_manager& m_mm; }; +struct PackedWordsContainer +{ + BlockReference hostBlockRef; + Uint32 noOfPackedWords; + Uint32 packedWords[30]; +}; // 128 bytes class SimulatedBlock { friend class TraceLCP; friend class SafeCounter; No bundle (reason: useless for push emails).