From: Date: October 3 2008 12:09am Subject: bzr push into mysql-5.1 branch (jonas:2875) List-Archive: http://lists.mysql.com/commits/55151 Message-Id: <20081002220906.2F16E91950A@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 2875 Jonas Oreland 2008-10-02 ndbmtd - fix ndb_add_partition, 1) fix thread unsafty in dbtupbuffer 2) fix various minor tc/dih bugs 3) fix correct hashmap/fragments for unique index 4) detach LQHFRAGREQ from CREATE_TABLE in proxy modified: storage/ndb/include/kernel/signaldata/DiGetNodes.hpp storage/ndb/include/kernel/signaldata/DictTabInfo.hpp storage/ndb/src/kernel/blocks/backup/Backup.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp storage/ndb/src/kernel/blocks/suma/Suma.cpp === modified file 'storage/ndb/include/kernel/signaldata/DiGetNodes.hpp' --- a/storage/ndb/include/kernel/signaldata/DiGetNodes.hpp 2008-08-11 11:44:42 +0000 +++ b/storage/ndb/include/kernel/signaldata/DiGetNodes.hpp 2008-10-02 21:58:44 +0000 @@ -32,13 +32,12 @@ struct DiGetNodesConf { */ friend class Dbdih; - STATIC_CONST( SignalLength = 4 + MAX_REPLICAS ); + STATIC_CONST( SignalLength = 3 + MAX_REPLICAS ); STATIC_CONST( REORG_MOVING = 0x80000000); Uint32 zero; Uint32 fragId; Uint32 reqinfo; - Uint32 instanceKey; Uint32 nodes[MAX_REPLICAS]; //+1 }; /** === modified file 'storage/ndb/include/kernel/signaldata/DictTabInfo.hpp' --- a/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp 2008-06-05 20:25:12 +0000 +++ b/storage/ndb/include/kernel/signaldata/DictTabInfo.hpp 2008-10-02 21:58:44 +0000 @@ -183,8 +183,7 @@ public: DistrKeyHash = 4, DistrKeyLin = 5, UserDefined = 6, - DistrKeyUniqueHashIndex = 7, - DistrKeyOrderedIndex = 8, + DistrKeyOrderedIndex = 8, // alias HashMapPartition = 9 }; === modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp' --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-10-02 17:51:17 +0000 +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-10-02 21:58:44 +0000 @@ -3917,7 +3917,7 @@ Backup::execTRANSID_AI(Signal* signal) const Uint32 filePtrI = signal->theData[0]; //const Uint32 transId1 = signal->theData[1]; //const Uint32 transId2 = signal->theData[2]; - const Uint32 dataLen = signal->length() - 3; + Uint32 dataLen = signal->length() - 3; BackupFilePtr filePtr LINT_SET_PTR; c_backupFilePool.getPtr(filePtr, filePtrI); @@ -3927,14 +3927,30 @@ Backup::execTRANSID_AI(Signal* signal) /** * Unpack data */ + Uint32 * dst = op.dst; + if (signal->getNoOfSections() == 0) + { + jam(); + const Uint32 * src = &signal->theData[3]; + * dst = htonl(dataLen); + memcpy(dst + 1, src, 4*dataLen); + } + else + { + jam(); + SectionHandle handle(this, signal); + SegmentedSectionPtr dataPtr; + handle.getSection(dataPtr, 0); + dataLen = dataPtr.sz; + + * dst = htonl(dataLen); + copy(dst + 1, dataPtr); + releaseSections(handle); + } + op.attrSzTotal += dataLen; ndbrequire(dataLen < op.maxRecordSize); - const Uint32 * src = &signal->theData[3]; - Uint32 * dst = op.dst; - - * dst = htonl(dataLen); - memcpy(dst + 1, src, 4*dataLen); op.finished(dataLen); op.newRecord(dst + dataLen + 1); === modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp' --- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-10-02 17:51:17 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2008-10-02 21:58:44 +0000 @@ -4277,8 +4277,9 @@ void Dbdict::handleTabInfoInit(SimplePro if (fragments == 0) { jam(); - fragments = get_default_fragments(); + tablePtr.p->fragmentCount = fragments = get_default_fragments(); } + char buf[MAX_TAB_NAME_SIZE+1]; BaseString::snprintf(buf, sizeof(buf), "DEFAULT-HASHMAP-%u-%u", NDB_DEFAULT_HASHMAP_BUCKTETS, @@ -4858,8 +4859,7 @@ Dbdict::create_fragmentation(Signal* sig * and distributed in the same manner but has always a normal hash * fragmentation. */ - frag_req->primaryTableId = tabPtr.p->primaryTableId; - frag_req->fragmentationType = DictTabInfo::DistrKeyUniqueHashIndex; + frag_req->primaryTableId = RNIL; } else { === modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-10-01 06:47:23 +0000 +++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-10-02 21:58:44 +0000 @@ -1222,14 +1222,12 @@ void Dbdih::execREAD_CONFIG_REQ(Signal* ndbrequireErr(!ndb_mgm_get_int_parameter(p, CFG_DIH_TABLE, &ctabFileSize), NDBD_EXIT_INVALID_CONFIG); - Uint32 workers = 1; if (isNdbMtLqh()) { jam(); - ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &workers); - c_fragments_per_node = workers; + c_fragments_per_node = getLqhWorkers(); } - ndbout_c("Using %u fragments per node", workers); + ndbout_c("Using %u fragments per node", c_fragments_per_node); cfileFileSize = (2 * ctabFileSize) + 2; initRecords(); @@ -6916,7 +6914,8 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ const Uint32 flags = req->requestInfo; Uint32 err = 0; - const Uint32 defaultFragments = cnoOfNodeGroups * cnoReplicas; + const Uint32 defaultFragments = + c_fragments_per_node * cnoOfNodeGroups * cnoReplicas; do { NodeGroupRecordPtr NGPtr; @@ -7273,7 +7272,6 @@ void Dbdih::execDIADDTABREQ(Signal* sign jam(); tabPtr.p->method = TabRecord::NORMAL_HASH; break; - case DictTabInfo::DistrKeyUniqueHashIndex: case DictTabInfo::DistrKeyOrderedIndex: { TabRecordPtr primTabPtr; @@ -7340,6 +7338,9 @@ void Dbdih::execDIADDTABREQ(Signal* sign jam(); tabPtr.p->m_map_ptr_i = req->hashMapPtrI; tabPtr.p->m_new_map_ptr_i = RNIL; + Ptr mapPtr; + g_hash_map.getPtr(mapPtr, tabPtr.p->m_map_ptr_i); + ndbrequire(tabPtr.p->totalfragments >= mapPtr.p->m_fragments); } Uint32 index = 2; @@ -8256,11 +8257,11 @@ void Dbdih::execDIGETNODESREQ(Signal* si getFragstore(tabPtr.p, fragId, fragPtr); Uint32 nodeCount = extractNodeInfo(fragPtr.p, conf->nodes); Uint32 sig2 = (nodeCount - 1) + - (fragPtr.p->distributionKey << 16); + (fragPtr.p->distributionKey << 16) + + (dihGetInstanceKey(fragPtr) << 24); conf->zero = 0; conf->reqinfo = sig2; conf->fragId = fragId; - conf->instanceKey = dihGetInstanceKey(fragPtr); if (unlikely(newFragId != RNIL)) { @@ -8270,7 +8271,8 @@ void Dbdih::execDIGETNODESREQ(Signal* si nodeCount = extractNodeInfo(fragPtr.p, conf->nodes + 2 + MAX_REPLICAS); conf->nodes[MAX_REPLICAS] = newFragId; conf->nodes[MAX_REPLICAS + 1] = (nodeCount - 1) + - (fragPtr.p->distributionKey << 16); + (fragPtr.p->distributionKey << 16) + + (dihGetInstanceKey(fragPtr) << 24); } }//Dbdih::execDIGETNODESREQ() === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-10-01 06:53:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-10-02 21:58:44 +0000 @@ -10358,7 +10358,7 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign */ const BlockReference ref = scanP->scanApiBlockref; const Uint32 scanOp = scanP->m_curr_batch_size_rows; - const Uint32 nodeId = refToNode(ref); + Uint32 nodeId = refToNode(ref); const bool connectedToNode = getNodeInfo(nodeId).m_connected; #ifdef NOT_USED const Uint32 type = getNodeInfo(nodeId).m_type; @@ -10370,7 +10370,23 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign Uint32 * dst = keyInfo->keyData; dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength; - Uint32 keyLen = readPrimaryKeys(scanP, tcConP, dst); + /** + * This is ugly :-( + * currently only SUMA receives KEYINFO20 inside kernel.. + * and it's not really interested in the actual keyinfo, + * only the scanInfo_Node...so send only that and avoid + * messing with if's below... + */ + Uint32 keyLen ; + if (refToMain(ref) == SUMA && nodeId == getOwnNodeId()) + { + keyLen = 0; + } + else + { + keyLen = readPrimaryKeys(scanP, tcConP, dst); + } + Uint32 fragId = tcConP->fragmentid; keyInfo->clientOpPtr = scanP->scanApiOpPtr; keyInfo->keyLen = keyLen; @@ -10384,6 +10400,11 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign { jam(); + if (isNdbMtLqh() && instance() != refToInstance(ref)) + { + jam(); + nodeId = 0; // prevent execute direct + } if (nodeId == getOwnNodeId()) { EXECUTE_DIRECT(refToBlock(ref), GSN_KEYINFO20, signal, === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-09-16 18:33:52 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-10-02 21:58:44 +0000 @@ -32,8 +32,6 @@ DblqhProxy::DblqhProxy(Block_context& ct // GSN_LQHFRAGREQ addRecSignal(GSN_LQHFRAGREQ, &DblqhProxy::execLQHFRAGREQ); - addRecSignal(GSN_LQHFRAGCONF, &DblqhProxy::execLQHFRAGCONF); - addRecSignal(GSN_LQHFRAGREF, &DblqhProxy::execLQHFRAGREF); // GSN_TAB_COMMITREQ addRecSignal(GSN_TAB_COMMITREQ, &DblqhProxy::execTAB_COMMITREQ); @@ -273,92 +271,17 @@ DblqhProxy::sendLQHADDATTCONF(Signal* si ssRelease(ssId); } -// GSN_LQHFRAGREQ [ sub-op ] +// GSN_LQHFRAGREQ [ pass-through ] void DblqhProxy::execLQHFRAGREQ(Signal* signal) { - Ss_LQHFRAGREQ& ss = ssSeize(1); // lost connection - - const LqhFragReq* req = (const LqhFragReq*)signal->getDataPtr(); - ss.m_req = *req; - sendREQ(signal, ss); -} - -void -DblqhProxy::sendLQHFRAGREQ(Signal* signal, Uint32 ssId) -{ - Ss_LQHFRAGREQ& ss = ssFind(ssId); - LqhFragReq* req = (LqhFragReq*)signal->getDataPtrSend(); - *req = ss.m_req; - - NdbLogPartInfo lpinfo(workerInstance(ss.m_worker)); - Uint32 logPartNo = lpinfo.partNoFromId(req->logPartId); - if (!lpinfo.partNoOwner(logPartNo)) { - jam(); - skipReq(ss); - return; - } - - req->senderRef = reference(); - req->senderData = ssId; - sendSignal(workerRef(ss.m_worker), GSN_LQHFRAGREQ, - signal, LqhFragReq::SignalLength, JBB); -} - -void -DblqhProxy::execLQHFRAGCONF(Signal* signal) -{ - const LqhFragConf* conf = (const LqhFragConf*)signal->getDataPtr(); - Uint32 ssId = conf->senderData; - Ss_LQHFRAGREQ& ss = ssFind(ssId); - recvCONF(signal, ss); -} - -void -DblqhProxy::execLQHFRAGREF(Signal* signal) -{ - const LqhFragRef* ref = (const LqhFragRef*)signal->getDataPtr(); - Uint32 ssId = ref->senderData; - Ss_LQHFRAGREQ& ss = ssFind(ssId); - recvREF(signal, ss, ref->errorCode); -} - -void -DblqhProxy::sendLQHFRAGCONF(Signal* signal, Uint32 ssId) -{ - Ss_LQHFRAGREQ& ss = ssFind(ssId); - Ss_CREATE_TAB_REQ& ss_main = ssFind(ssId); - BlockReference dictRef = ss_main.m_req.senderRef; - - if (!lastReply(ss)) - return; - - if (ss.m_error == 0) { - LqhFragConf* conf = (LqhFragConf*)signal->getDataPtrSend(); - conf->senderData = ss.m_req.senderData; - conf->lqhFragPtr = RNIL; //wl4391_todo - conf->tableId = ss.m_req.tableId; - conf->fragId = ss.m_req.fragId; - conf->changeMask = 0; - sendSignal(dictRef, GSN_LQHFRAGCONF, - signal, LqhFragConf::SignalLength, JBB); - } else { - jam(); - LqhFragRef* ref = (LqhFragRef*)signal->getDataPtrSend(); - ref->senderData = ss.m_req.senderData; - ref->errorCode = ss.m_error; - ref->tableId = ss.m_req.tableId; - ref->fragId = ss.m_req.fragId; - ref->requestInfo = 0; - ref->changeMask = 0; - sendSignal(dictRef, GSN_LQHFRAGREF, - signal, LqhFragRef::SignalLength, JBB); - ssRelease(ssId); - } + Uint32 instance = getInstanceKey(req->tableId, req->fragId); - ssRelease(ssId); + // wl4391_todo impl. method that fakes senders block-ref + sendSignal(numberToRef(DBLQH, instance, getOwnNodeId()), + GSN_LQHFRAGREQ, signal, LqhFragReq::SignalLength, JBB); } // GSN_TAB_COMMITREQ === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-09-16 18:33:52 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-10-02 21:58:44 +0000 @@ -82,24 +82,8 @@ protected: void execLQHADDATTREF(Signal*); void sendLQHADDATTCONF(Signal*, Uint32 ssId); - // GSN_LQHFRAGREQ [ sub-op ] - struct Ss_LQHFRAGREQ : SsParallel { - LqhFragReq m_req; - Ss_LQHFRAGREQ() { - m_sendREQ = (SsFUNC)&DblqhProxy::sendLQHFRAGREQ; - m_sendCONF = (SsFUNC)&DblqhProxy::sendLQHFRAGCONF; - } - enum { poolSize = 1 }; - static SsPool& pool(LocalProxy* proxy) { - return ((DblqhProxy*)proxy)->c_ss_LQHFRAGREQ; - } - }; - SsPool c_ss_LQHFRAGREQ; + // GSN_LQHFRAGREQ [ pass-through ] void execLQHFRAGREQ(Signal*); - void sendLQHFRAGREQ(Signal*, Uint32 ssId); - void execLQHFRAGCONF(Signal*); - void execLQHFRAGREF(Signal*); - void sendLQHFRAGCONF(Signal*, Uint32 ssId); // GSN_TAB_COMMITREQ [ sub-op ] struct Ss_TAB_COMMITREQ : SsParallel { === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-10-01 06:36:52 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-10-02 21:58:44 +0000 @@ -3083,8 +3083,7 @@ void Dbtc::tckeyreq050Lab(Signal* signal regTcPtr->tcNodedata[2] = Tdata5; regTcPtr->tcNodedata[3] = Tdata6; - UintR Tdata7 = conf->instanceKey; - regTcPtr->lqhInstanceKey = Tdata7; + regTcPtr->lqhInstanceKey = (Tdata2 >> 24) & 127;// 1 bit used for reorg moving Uint8 Toperation = regTcPtr->operation; Uint8 Tdirty = regTcPtr->dirtyOp; === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-09-16 18:29:47 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-10-02 21:58:44 +0000 @@ -250,22 +250,20 @@ void Dbtup::sendReadAttrinfo(Signal* sig * in LCP case since user-backup uses single worker. */ BlockNumber blockMain = blockToMain(block); + const bool sameInstance = blockToInstance(block) == instance(); if (blockMain == DBLQH) { EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex); jamEntry(); } - else if (blockMain == SUMA) + else if (blockMain == SUMA && sameInstance) { - // wl4391_todo not MT safe - EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, 0); + EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex); jamEntry(); } - else if (blockMain == BACKUP) + else if (blockMain == BACKUP && sameInstance) { - Uint32 iNo = blockToInstance(block); - // wl4391_todo maybe not MT safe in non-LCP case - EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, iNo); + EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex); jamEntry(); } else === modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp' --- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-10-02 17:51:17 +0000 +++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2008-10-02 21:58:44 +0000 @@ -3415,7 +3415,7 @@ Suma::execTRANSID_AI(Signal* signal) CRASH_INSERTION(13015); TransIdAI * const data = (TransIdAI*)signal->getDataPtr(); const Uint32 opPtrI = data->connectPtr; - const Uint32 length = signal->length() - 3; + Uint32 length = signal->length() - 3; if(f_bufferLock == 0){ f_bufferLock = opPtrI; @@ -3423,6 +3423,16 @@ Suma::execTRANSID_AI(Signal* signal) ndbrequire(f_bufferLock == opPtrI); } + if (signal->getNoOfSections()) + { + SectionHandle handle(this, signal); + SegmentedSectionPtr dataPtr; + handle.getSection(dataPtr, 0); + length = dataPtr.sz; + copy(data->attrData, dataPtr); + releaseSections(handle); + } + Ptr syncPtr; c_syncPool.getPtr(syncPtr, (opPtrI >> 16));