From: Mauritz Sundell Date: September 28 2012 2:20pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mauritz.sundell:4013 to 4014) List-Archive: http://lists.mysql.com/commits/144907 Message-Id: <201209281420.q8SEKTec012067@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4014 Mauritz Sundell 2012-09-28 [merge] merge 7.1 -> 7.2 added: mysql-test/suite/ndb_big/bug14000373.cnf mysql-test/suite/ndb_big/bug14000373.result mysql-test/suite/ndb_big/bug14000373.test storage/ndb/src/kernel/vm/LHLevel.cpp storage/ndb/src/kernel/vm/LHLevel.hpp modified: storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/vm/CMakeLists.txt storage/ndb/src/ndbjtie/jtie/jtie_tconv_array_impl.hpp 4013 bernd.ocklin@stripped 2012-09-27 fix clusterj install version number to be equal to jar version number modified: storage/ndb/clusterj/CMakeLists.txt storage/ndb/clusterj/clusterj-api/CMakeLists.txt === added file 'mysql-test/suite/ndb_big/bug14000373.cnf' --- a/mysql-test/suite/ndb_big/bug14000373.cnf 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb_big/bug14000373.cnf 2012-09-28 12:58:59 +0000 @@ -0,0 +1,5 @@ +!include suite/ndb/my.cnf + +[cluster_config.1] +DataMemory=300M +IndexMemory=700M === added file 'mysql-test/suite/ndb_big/bug14000373.result' --- a/mysql-test/suite/ndb_big/bug14000373.result 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb_big/bug14000373.result 2012-09-28 12:58:59 +0000 @@ -0,0 +1,21 @@ +set max_heap_table_size = 286720000; +create table t1 (a int key) engine=memory; +load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 columns terminated by ' ' (a, @col2); +insert into t1 select a + 10000 from t1;; +insert into t1 select a + 10000 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 * 2 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 * 2 from t1;; +insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 from t1;; +select count(*) from t1; +count(*) +5120000 +alter table t1 engine=ndbcluster comment='NDB_TABLE=NOLOGGING' partition by key() partitions 1; +alter table t1 engine=memory; +select count(*) from t1; +count(*) +5120000 +drop table t1; === added file 'mysql-test/suite/ndb_big/bug14000373.test' --- a/mysql-test/suite/ndb_big/bug14000373.test 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/ndb_big/bug14000373.test 2012-09-28 12:58:59 +0000 @@ -0,0 +1,25 @@ +-- source include/have_ndb.inc + +# Test is using error insert, check that binaries support it +-- source suite/ndb/t/have_ndb_error_insert.inc + +# Use small LoadFactors to force sparse hash table +--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all error 3003" >> $NDB_TOOLS_OUTPUT + +set max_heap_table_size = 286720000; +create table t1 (a int key) engine=memory; +load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 columns terminated by ' ' (a, @col2); +let $i = 9; +let $b = 10000; +while ($i) +{ +--eval insert into t1 select a + $b from t1; + let $b = $b * 2; + dec $i; +} +select count(*) from t1; +alter table t1 engine=ndbcluster comment='NDB_TABLE=NOLOGGING' partition by key() partitions 1; +--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all report memory" >> $NDB_TOOLS_OUTPUT +alter table t1 engine=memory; +select count(*) from t1; +drop table t1; === modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp' --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2012-03-13 15:18:24 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2012-09-28 14:18:32 +0000 @@ -27,6 +27,7 @@ #include #include #include +#include #ifdef DBACC_C // Debug Macros @@ -200,7 +201,7 @@ class ElementHeader { * * l = Locked -- If true contains operation else scan bits + hash value * s = Scan bits - * h = Hash value + * h = Reduced hash value. The lower bits used for address is shifted away * o = Operation ptr I * * 1111111111222222222233 @@ -209,17 +210,16 @@ class ElementHeader { * ooooooooooooooooooooooooooooooo */ public: - STATIC_CONST( HASH_VALUE_PART_MASK = 0xFFFF ); - static bool getLocked(Uint32 data); static bool getUnlocked(Uint32 data); static Uint32 getScanBits(Uint32 data); - static Uint32 getHashValuePart(Uint32 data); static Uint32 getOpPtrI(Uint32 data); + static LHBits16 getReducedHashValue(Uint32 data); static Uint32 setLocked(Uint32 opPtrI); - static Uint32 setUnlocked(Uint32 hashValuePart, Uint32 scanBits); + static Uint32 setUnlocked(Uint32 scanBits, LHBits16 const& reducedHashValue); static Uint32 setScanBit(Uint32 header, Uint32 scanBit); + static Uint32 setReducedHashValue(Uint32 header, LHBits16 const& reducedHashValue); static Uint32 clearScanBit(Uint32 header, Uint32 scanBit); }; @@ -242,11 +242,11 @@ ElementHeader::getScanBits(Uint32 data){ return (data >> 1) & ((1 << MAX_PARALLEL_SCANS_PER_FRAG) - 1); } -inline -Uint32 -ElementHeader::getHashValuePart(Uint32 data){ +inline +LHBits16 +ElementHeader::getReducedHashValue(Uint32 data){ assert(getUnlocked(data)); - return data >> 16; + return LHBits16::unpack(data >> 16); } inline @@ -259,12 +259,15 @@ ElementHeader::getOpPtrI(Uint32 data){ inline Uint32 ElementHeader::setLocked(Uint32 opPtrI){ + assert(opPtrI < 0x8000000); return (opPtrI << 1) + 0; } inline Uint32 -ElementHeader::setUnlocked(Uint32 hashValue, Uint32 scanBits){ - return (hashValue << 16) + (scanBits << 1) + 1; +ElementHeader::setUnlocked(Uint32 scanBits, LHBits16 const& reducedHashValue) +{ + assert(scanBits < (1 << MAX_PARALLEL_SCANS_PER_FRAG)); + return (Uint32(reducedHashValue.pack()) << 16) | (scanBits << 1) | 1; } inline @@ -281,6 +284,13 @@ ElementHeader::clearScanBit(Uint32 heade return header & (~(scanBit << 1)); } +inline +Uint32 +ElementHeader::setReducedHashValue(Uint32 header, LHBits16 const& reducedHashValue) +{ + assert(getUnlocked(header)); + return (Uint32(reducedHashValue.pack()) << 16) | (header & 0xffff); +} class Dbacc: public SimulatedBlock { friend class DbaccProxy; @@ -402,14 +412,15 @@ struct Fragmentrec { // slackCheck When slack goes over this value it is time to expand. // slackCheck = (maxp + p + 1)*(maxloadfactor - minloadfactor) or // bucketSize * hysteresis +// Since at most RNIL 8KiB-pages can be used for a fragment, the extreme values +// for slack will be within -2^43 and +2^43 words. //----------------------------------------------------------------------------- + LHLevelRH level; Uint32 localkeylen; - Uint32 maxp; Uint32 maxloadfactor; Uint32 minloadfactor; - Uint32 p; - Uint32 slack; - Uint32 slackCheck; + Int64 slack; + Int64 slackCheck; //----------------------------------------------------------------------------- // nextfreefrag is the next free fragment if linked into a free list @@ -442,19 +453,17 @@ struct Fragmentrec { Uint16 keyLength; //----------------------------------------------------------------------------- -// This flag is used to avoid sending a big number of expand or shrink signals -// when simultaneously committing many inserts or deletes. +// Only allow one expand or shrink signal in queue at the time. //----------------------------------------------------------------------------- - Uint8 expandFlag; + bool expandOrShrinkQueued; //----------------------------------------------------------------------------- // hashcheckbit is the bit to check whether to send element to split bucket or not // k (== 6) is the number of buckets per page -// lhfragbits is the number of bits used to calculate the fragment id //----------------------------------------------------------------------------- - Uint8 hashcheckbit; - Uint8 k; - Uint8 lhfragbits; + STATIC_CONST( k = 6 ); + STATIC_CONST( MIN_HASH_COMPARE_BITS = 7 ); + STATIC_CONST( MAX_HASH_VALUE_BITS = 31 ); //----------------------------------------------------------------------------- // nodetype can only be STORED in this release. Is currently only set, never read @@ -470,6 +479,11 @@ struct Fragmentrec { // flag to mark that execEXPANDCHECK2 has failed due to DirRange full //----------------------------------------------------------------------------- Uint8 dirRangeFull; + +public: + Uint32 getPageNumber(Uint32 bucket_number) const; + Uint32 getPageIndex(Uint32 bucket_number) const; + bool enough_valid_bits(LHBits16 const& reduced_hash_value) const; }; typedef Ptr FragmentrecPtr; @@ -485,8 +499,7 @@ struct Operationrec { Uint32 elementPointer; Uint32 fid; Uint32 fragptr; - Uint32 hashvaluePart; - Uint32 hashValue; + LHBits32 hashValue; Uint32 nextLockOwnerOp; Uint32 nextOp; Uint32 nextParallelQue; @@ -512,7 +525,8 @@ struct Operationrec { Uint16 tupkeylen; Uint32 xfrmtupkeylen; Uint32 userblockref; - Uint32 scanBits; + Uint16 scanBits; + LHBits16 reducedHashValue; enum OpBits { OP_MASK = 0x0000F // 4 bits for operation type @@ -693,8 +707,8 @@ private: void releaseDirIndexResources(Signal* signal, FragmentrecPtr regFragPtr); void releaseFragRecord(Signal* signal, FragmentrecPtr regFragPtr); void initScanFragmentPart(Signal* signal); - Uint32 checkScanExpand(Signal* signal); - Uint32 checkScanShrink(Signal* signal); + Uint32 checkScanExpand(Signal* signal, Uint32 splitBucket); + Uint32 checkScanShrink(Signal* signal, Uint32 sourceBucket, Uint32 destBucket); void initialiseFragRec(Signal* signal); void initialiseFsConnectionRec(Signal* signal); void initialiseFsOpRec(Signal* signal); @@ -762,6 +776,10 @@ private: void seizeRightlist(Signal* signal); Uint32 readTablePk(Uint32 lkey1, Uint32 lkey2, Uint32 eh, OperationrecPtr); Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner); + LHBits32 getElementHash(OperationrecPtr& oprec); + LHBits32 getElementHash(Uint32 const* element, Int32 forward); + LHBits32 getElementHash(Uint32 const* element, Int32 forward, OperationrecPtr& oprec); + void shrink_adjust_reduced_hash_value(Uint32 bucket_number); Uint32 getPagePtr(DynArr256::Head&, Uint32); bool setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri); Uint32 unsetPagePtr(DynArr256::Head& directory, Uint32 index); @@ -838,8 +856,6 @@ private: void zpagesize_error(const char* where); - void reenable_expand_after_redo_log_exection_complete(Signal*); - // charsets void xfrmKeyData(Signal* signal); @@ -1001,7 +1017,6 @@ private: Uint32 tgeContainerptr; Uint32 tgeElementptr; Uint32 tgeForward; - Uint32 texpReceivedBucket; Uint32 texpDirInd; Uint32 texpDirRangeIndex; Uint32 texpDirPageIndex; @@ -1035,7 +1050,6 @@ private: Uint32 tmp; Uint32 tmpP; Uint32 tmpP2; - Uint32 tmp1; Uint32 tmp2; Uint32 tgflPageindex; Uint32 tmpindex; @@ -1095,4 +1109,23 @@ private: Uint32 c_memusage_report_frequency; }; +inline Uint32 Dbacc::Fragmentrec::getPageNumber(Uint32 bucket_number) const +{ + assert(bucket_number < RNIL); + return bucket_number >> k; +} + +inline Uint32 Dbacc::Fragmentrec::getPageIndex(Uint32 bucket_number) const +{ + assert(bucket_number < RNIL); + return bucket_number & ((1 << k) - 1); +} + +inline bool Dbacc::Fragmentrec::enough_valid_bits(LHBits16 const& reduced_hash_value) const +{ + // Forte C 5.0 needs use of intermediate constant + int const bits = MIN_HASH_COMPARE_BITS; + return level.getNeededValidBits(bits) <= reduced_hash_value.valid_bits(); +} + #endif === modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2012-04-26 11:52:08 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2012-09-28 14:18:32 +0000 @@ -34,6 +34,7 @@ #include #include #include +#include #ifdef VM_TRACE #define DEBUG(x) ndbout << "DBACC: "<< x << endl; @@ -902,7 +903,7 @@ void Dbacc::initOpRec(Signal* signal) Treqinfo = signal->theData[2]; - operationRecPtr.p->hashValue = signal->theData[3]; + operationRecPtr.p->hashValue = LHBits32(signal->theData[3]); operationRecPtr.p->tupkeylen = signal->theData[4]; operationRecPtr.p->xfrmtupkeylen = signal->theData[4]; operationRecPtr.p->transId1 = signal->theData[5]; @@ -1071,7 +1072,7 @@ void Dbacc::execACCKEYREQ(Signal* signal /*---------------------------------------------------------------*/ Uint32 eh = gePageptr.p->word32[tgeElementptr]; operationRecPtr.p->scanBits = ElementHeader::getScanBits(eh); - operationRecPtr.p->hashvaluePart = ElementHeader::getHashValuePart(eh); + operationRecPtr.p->reducedHashValue = ElementHeader::getReducedHashValue(eh); operationRecPtr.p->elementPage = gePageptr.i; operationRecPtr.p->elementContainer = tgeContainerptr; operationRecPtr.p->elementPointer = tgeElementptr; @@ -1533,10 +1534,8 @@ void Dbacc::insertelementLab(Signal* sig insertLockOwnersList(signal, operationRecPtr); - const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; - operationRecPtr.p->hashvaluePart = - (operationRecPtr.p->hashValue >> tmp) & 0xFFFF; operationRecPtr.p->scanBits = 0; /* NOT ANY ACTIVE SCAN */ + operationRecPtr.p->reducedHashValue = fragrecptr.p->level.reduce(operationRecPtr.p->hashValue); tidrElemhead = ElementHeader::setLocked(operationRecPtr.i); idrPageptr = gdiPageptr; tidrPageindex = tgdiPageindex; @@ -2343,14 +2342,12 @@ void Dbacc::execACC_COMMITREQ(Signal* si if (fragrecptr.p->slack > fragrecptr.p->slackCheck) { /* TIME FOR JOIN BUCKETS PROCESS */ if (fragrecptr.p->expandCounter > 0) { - if (fragrecptr.p->expandFlag < 2) { + if (!fragrecptr.p->expandOrShrinkQueued) + { jam(); signal->theData[0] = fragrecptr.i; - signal->theData[1] = fragrecptr.p->p; - signal->theData[2] = fragrecptr.p->maxp; - signal->theData[3] = fragrecptr.p->expandFlag; - fragrecptr.p->expandFlag = 2; - sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB); + fragrecptr.p->expandOrShrinkQueued = true; + sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 1, JBB); }//if }//if }//if @@ -2359,15 +2356,15 @@ void Dbacc::execACC_COMMITREQ(Signal* si jam(); /* EXPAND PROCESS HANDLING */ fragrecptr.p->noOfElements++; fragrecptr.p->slack -= fragrecptr.p->elementLength; - if (fragrecptr.p->slack >= (1u << 31)) { + if (fragrecptr.p->slack < 0 && !fragrecptr.p->level.isFull()) + { /* IT MEANS THAT IF SLACK < ZERO */ - if (fragrecptr.p->expandFlag == 0) { + if (!fragrecptr.p->expandOrShrinkQueued) + { jam(); - fragrecptr.p->expandFlag = 2; signal->theData[0] = fragrecptr.i; - signal->theData[1] = fragrecptr.p->p; - signal->theData[2] = fragrecptr.p->maxp; - sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB); + fragrecptr.p->expandOrShrinkQueued = true; + sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 1, JBB); }//if }//if } @@ -3181,21 +3178,11 @@ Uint32 Dbacc::unsetPagePtr(DynArr256::He void Dbacc::getdirindex(Signal* signal) { - Uint32 tgdiTmp; Uint32 tgdiAddress; - tgdiTmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; /* OBS K = 6 */ - tgdiPageindex = operationRecPtr.p->hashValue & ((1 << fragrecptr.p->k) - 1); - tgdiTmp = operationRecPtr.p->hashValue >> tgdiTmp; - tgdiTmp = (tgdiTmp << fragrecptr.p->k) | tgdiPageindex; - tgdiAddress = tgdiTmp & fragrecptr.p->maxp; - if (tgdiAddress < fragrecptr.p->p) { - jam(); - tgdiAddress = tgdiTmp & ((fragrecptr.p->maxp << 1) | 1); - }//if - tgdiTmp = tgdiAddress >> fragrecptr.p->k; - ndbassert(tgdiTmp <= ElementHeader::HASH_VALUE_PART_MASK); - gdiPageptr.i = getPagePtr(fragrecptr.p->directory, tgdiTmp); + tgdiAddress = fragrecptr.p->level.getBucketNumber(operationRecPtr.p->hashValue); + tgdiPageindex = fragrecptr.p->getPageIndex(tgdiAddress); + gdiPageptr.i = getPagePtr(fragrecptr.p->directory, fragrecptr.p->getPageNumber(tgdiAddress)); ptrCheckGuard(gdiPageptr, cpagesize, page8); }//Dbacc::getdirindex() @@ -3278,6 +3265,7 @@ Dbacc::getElement(Signal* signal, Operat register Uint32 TelemLen = fragrecptr.p->elementLength; register Uint32* Tkeydata = (Uint32*)&signal->theData[7]; const Uint32 localkeylen = fragrecptr.p->localkeylen; + Uint32 bucket_number = fragrecptr.p->level.getBucketNumber(operationRecPtr.p->hashValue); getdirindex(signal); tgePageindex = tgdiPageindex; @@ -3292,8 +3280,6 @@ Dbacc::getElement(Signal* signal, Operat ndbrequire(TelemLen == ZELEM_HEAD_SIZE + localkeylen); tgeNextptrtype = ZLEFT; - const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; - const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF; do { tgeContainerptr = mul_ZBUF_SIZE(tgePageindex); if (tgeNextptrtype == ZLEFT) { @@ -3345,23 +3331,25 @@ Dbacc::getElement(Signal* signal, Operat // Check if it is the element searched for. /* ------------------------------------------------------------------- */ do { + bool possible_match; tgeElementHeader = gePageptr.p->word32[tgeElementptr]; tgeRemLen = tgeRemLen - TelemLen; - Uint32 hashValuePart; Uint32 localkey1, localkey2; lockOwnerPtr.i = RNIL; lockOwnerPtr.p = NULL; + LHBits16 reducedHashValue; if (ElementHeader::getLocked(tgeElementHeader)) { jam(); lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader); ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec); - hashValuePart = lockOwnerPtr.p->hashvaluePart; + possible_match = lockOwnerPtr.p->hashValue.match(operationRecPtr.p->hashValue); + reducedHashValue = lockOwnerPtr.p->reducedHashValue; localkey1 = lockOwnerPtr.p->localdata[0]; localkey2 = lockOwnerPtr.p->localdata[1]; } else { jam(); + reducedHashValue = ElementHeader::getReducedHashValue(tgeElementHeader); Uint32 pos = tgeElementptr + tgeForward; - hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader); localkey1 = gePageptr.p->word32[pos]; if (likely(localkeylen == 1)) { @@ -3372,8 +3360,11 @@ Dbacc::getElement(Signal* signal, Operat { localkey2 = gePageptr.p->word32[pos + tgeForward]; } + possible_match = true; } - if (hashValuePart == opHashValuePart) { + if (possible_match && + operationRecPtr.p->hashValue.match(fragrecptr.p->level.enlarge(reducedHashValue, bucket_number))) + { jam(); bool found; if (! searchLocalKey) @@ -3514,8 +3505,7 @@ void Dbacc::commitdelete(Signal* signal) // We thus update the element header to ensure we log an unlocked element. We do not // need to restore it later since it is deleted immediately anyway. /* --------------------------------------------------------------------------------- */ - const Uint32 hv = operationRecPtr.p->hashvaluePart; - const Uint32 eh = ElementHeader::setUnlocked(hv, 0); + const Uint32 eh = ElementHeader::setUnlocked(0, LHBits16()); delPageptr.p->word32[tdelElementptr] = eh; if (operationRecPtr.p->elementPage == lastPageptr.i) { if (operationRecPtr.p->elementPointer == tlastElementptr) { @@ -3588,8 +3578,7 @@ void Dbacc::deleteElement(Signal* signal // An undo of the delete will reinstall the moved record. We have to ensure that the // lock is removed to ensure that no such thing happen. /* --------------------------------------------------------------------------------- */ - Uint32 eh = ElementHeader::setUnlocked(deOperationRecPtr.p->hashvaluePart, - 0); + Uint32 eh = ElementHeader::setUnlocked(0, LHBits16()); lastPageptr.p->word32[tlastElementptr] = eh; }//if return; @@ -4303,8 +4292,7 @@ void Dbacc::abortOperation(Signal* signa taboElementptr = operationRecPtr.p->elementPointer; aboPageidptr.i = operationRecPtr.p->elementPage; - tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, - operationRecPtr.p->scanBits); + tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->scanBits, operationRecPtr.p->reducedHashValue); ptrCheckGuard(aboPageidptr, cpagesize, page8); dbgWord32(aboPageidptr, taboElementptr, tmp2Olq); arrGuard(taboElementptr, 2048); @@ -4336,7 +4324,7 @@ Dbacc::commitDeleteCheck() OperationrecPtr deleteOpPtr; Uint32 elementDeleted = 0; bool deleteCheckOngoing = true; - Uint32 hashValue = 0; + LHBits32 hashValue; lastOpPtr = operationRecPtr; opPtr.i = operationRecPtr.p->nextParallelQue; while (opPtr.i != RNIL) { @@ -4463,8 +4451,7 @@ void Dbacc::commitOperation(Signal* sign coPageidptr.i = operationRecPtr.p->elementPage; tcoElementptr = operationRecPtr.p->elementPointer; - tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart, - operationRecPtr.p->scanBits); + tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->scanBits, operationRecPtr.p->reducedHashValue); ptrCheckGuard(coPageidptr, cpagesize, page8); dbgWord32(coPageidptr, tcoElementptr, tmp2Olq); arrGuard(tcoElementptr, 2048); @@ -4785,7 +4772,7 @@ Dbacc::release_lockowner(Signal* signal, newOwner.p->elementPointer = opPtr.p->elementPointer; newOwner.p->elementContainer = opPtr.p->elementContainer; newOwner.p->scanBits = opPtr.p->scanBits; - newOwner.p->hashvaluePart = opPtr.p->hashvaluePart; + newOwner.p->reducedHashValue = opPtr.p->reducedHashValue; newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED); if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) { @@ -5095,7 +5082,7 @@ void Dbacc::allocOverflowPage(Signal* si /* BE EXPANDED ACORDING TO LH3, */ /* AND COMMIT TRANSACTION PROCESS */ /* WILL BE CONTINUED */ -Uint32 Dbacc::checkScanExpand(Signal* signal) +Uint32 Dbacc::checkScanExpand(Signal* signal, Uint32 splitBucket) { Uint32 Ti; Uint32 TreturnCode = 0; @@ -5108,7 +5095,7 @@ Uint32 Dbacc::checkScanExpand(Signal* si Page8Ptr TPageptr; ScanRecPtr TscanPtr; - TSplit = fragrecptr.p->p; + TSplit = splitBucket; for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) { TreleaseScanIndicator[Ti] = 0; if (fragrecptr.p->scan[Ti] != RNIL) { @@ -5162,8 +5149,8 @@ Uint32 Dbacc::checkScanExpand(Signal* si }//for if (TreleaseInd == 1) { TreleaseScanBucket = TSplit; - TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */ - TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */ + TPageIndex = fragrecptr.p->getPageIndex(TreleaseScanBucket); + TDirInd = fragrecptr.p->getPageNumber(TreleaseScanBucket); TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd); ptrCheckGuard(TPageptr, cpagesize, page8); for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) { @@ -5192,11 +5179,9 @@ void Dbacc::execEXPANDCHECK2(Signal* sig fragrecptr.i = signal->theData[0]; tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */ - Uint32 tmp = 1; - tmp = tmp << 31; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - fragrecptr.p->expandFlag = 0; - if (fragrecptr.p->slack < tmp) { + fragrecptr.p->expandOrShrinkQueued = false; + if (fragrecptr.p->slack > 0) { jam(); /* IT MEANS THAT IF SLACK > ZERO */ /*--------------------------------------------------------------*/ @@ -5232,7 +5217,24 @@ void Dbacc::execEXPANDCHECK2(Signal* sig /*--------------------------------------------------------------*/ return; }//if - if (checkScanExpand(signal) == 1) { + + if (fragrecptr.p->level.isFull()) + { + jam(); + /* + * The level structure does not allow more buckets. + * Do not expand. + */ + return; + } + + Uint32 splitBucket; + Uint32 receiveBucket; + + bool doSplit = fragrecptr.p->level.getSplitBucket(splitBucket, receiveBucket); + + // Check that splitted bucket is not currently scanned + if (doSplit && checkScanExpand(signal, splitBucket) == 1) { jam(); /*--------------------------------------------------------------*/ // A scan state was inconsistent with performing an expand @@ -5247,9 +5249,9 @@ void Dbacc::execEXPANDCHECK2(Signal* sig /* THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO */ /* DECIDE WHICH ELEMENT GOES WHERE. */ /*--------------------------------------------------------------------------*/ - texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1; /* RECEIVED BUCKET */ - texpDirInd = texpReceivedBucket >> fragrecptr.p->k; - if ((texpReceivedBucket & ((1 << fragrecptr.p->k) - 1)) == 0) + + texpDirInd = fragrecptr.p->getPageNumber(receiveBucket); + if (fragrecptr.p->getPageIndex(receiveBucket) == 0) { // Need new bucket expPageptr.i = RNIL; } @@ -5262,15 +5264,6 @@ void Dbacc::execEXPANDCHECK2(Signal* sig } if (expPageptr.i == RNIL) { jam(); - // We cannot expand if the new page index cannot be - // represented in the stored hash bits. - if (texpDirInd > ElementHeader::HASH_VALUE_PART_MASK) - { - jam(); - fragrecptr.p->dirRangeFull = ZTRUE; - tresult = ZDIR_RANGE_FULL_ERROR; - return; - } seizePage(signal); if (tresult > ZLIMIT_OF_ERROR) { jam(); @@ -5279,6 +5272,7 @@ void Dbacc::execEXPANDCHECK2(Signal* sig if (!setPagePtr(fragrecptr.p->directory, texpDirInd, spPageptr.i)) { jam(); + // TODO: should release seized page tresult = ZDIR_RANGE_FULL_ERROR; return; } @@ -5291,13 +5285,13 @@ void Dbacc::execEXPANDCHECK2(Signal* sig }//if fragrecptr.p->expReceivePageptr = expPageptr.i; - fragrecptr.p->expReceiveIndex = texpReceivedBucket & ((1 << fragrecptr.p->k) - 1); + fragrecptr.p->expReceiveIndex = fragrecptr.p->getPageIndex(receiveBucket); /*--------------------------------------------------------------------------*/ /* THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE */ /* DIRECTORY OF THE BUCKET TO BE SPLIT. */ /*--------------------------------------------------------------------------*/ - cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */ - texpDirInd = fragrecptr.p->p >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */ + cexcPageindex = fragrecptr.p->getPageIndex(splitBucket); + texpDirInd = fragrecptr.p->getPageNumber(splitBucket); excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd); #ifdef VM_TRACE require(excPageptr.i != RNIL); @@ -5318,71 +5312,27 @@ void Dbacc::execEXPANDCHECK2(Signal* sig void Dbacc::endofexpLab(Signal* signal) { - fragrecptr.p->p++; fragrecptr.p->slack += fragrecptr.p->maxloadfactor; fragrecptr.p->expandCounter++; - if (fragrecptr.p->p > fragrecptr.p->maxp) { - jam(); - fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1; - fragrecptr.p->hashcheckbit++; - fragrecptr.p->p = 0; - }//if - Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p; + fragrecptr.p->level.expand(); + Uint32 noOfBuckets = fragrecptr.p->level.getSize(); Uint32 Thysteres = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor; - fragrecptr.p->slackCheck = noOfBuckets * Thysteres; - if (fragrecptr.p->slack > (1u << 31)) { + fragrecptr.p->slackCheck = Int64(noOfBuckets) * Thysteres; + if (fragrecptr.p->slack < 0 && !fragrecptr.p->level.isFull()) + { jam(); /* IT MEANS THAT IF SLACK < ZERO */ /* --------------------------------------------------------------------------------- */ /* IT IS STILL NECESSARY TO EXPAND THE FRAGMENT EVEN MORE. START IT FROM HERE */ /* WITHOUT WAITING FOR NEXT COMMIT ON THE FRAGMENT. */ /* --------------------------------------------------------------------------------- */ - fragrecptr.p->expandFlag = 2; signal->theData[0] = fragrecptr.i; - signal->theData[1] = fragrecptr.p->p; - signal->theData[2] = fragrecptr.p->maxp; - sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB); + fragrecptr.p->expandOrShrinkQueued = true; + sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 1, JBB); }//if return; }//Dbacc::endofexpLab() -void Dbacc::reenable_expand_after_redo_log_exection_complete(Signal* signal){ - - tabptr.i = signal->theData[0]; - Uint32 fragId = signal->theData[1]; - - ptrCheckGuard(tabptr, ctablesize, tabrec); - ndbrequire(getfragmentrec(signal, fragrecptr, fragId)); -#if 0 - ndbout_c("reenable expand check for table %d fragment: %d", - tabptr.i, fragId); -#endif - - switch(fragrecptr.p->expandFlag){ - case 0: - /** - * Hmm... this means that it's alreay has been reenabled... - */ - fragrecptr.p->expandFlag = 1; - break; - case 1: - /** - * Nothing is going on start expand check - */ - case 2: - /** - * A shrink is running, do expand check anyway - * (to reset expandFlag) - */ - fragrecptr.p->expandFlag = 2; - signal->theData[0] = fragrecptr.i; - signal->theData[1] = fragrecptr.p->p; - signal->theData[2] = fragrecptr.p->maxp; - sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB); - break; - } -} - void Dbacc::execDEBUG_SIG(Signal* signal) { jamEntry(); @@ -5392,6 +5342,86 @@ void Dbacc::execDEBUG_SIG(Signal* signal return; }//Dbacc::execDEBUG_SIG() +LHBits32 Dbacc::getElementHash(OperationrecPtr& oprec) +{ + jam(); + ndbassert(!oprec.isNull()); + + // Only calculate hash value if operation does not already have a complete hash value + if (oprec.p->hashValue.valid_bits() < fragrecptr.p->MAX_HASH_VALUE_BITS) + { + jam(); + Uint32 localkey[2]; + localkey[0] = oprec.p->localdata[0]; + localkey[1] = oprec.p->localdata[1]; + Uint32 len = readTablePk(localkey[0], localkey[1], ElementHeader::setLocked(oprec.i), oprec); + if (len > 0) + oprec.p->hashValue = LHBits32(md5_hash((Uint64*)ckeys, len)); + } + return oprec.p->hashValue; +} + +LHBits32 Dbacc::getElementHash(Uint32 const* elemptr, Int32 forward) +{ + jam(); + assert(ElementHeader::getUnlocked(*elemptr)); + + Uint32 elemhead = *elemptr; + Uint32 localkey[2]; + elemptr += forward; + localkey[0] = *elemptr; + if (likely(fragrecptr.p->localkeylen == 1)) + { + jam(); + localkey[1] = Local_key::ref2page_idx(localkey[0]); + localkey[0] = Local_key::ref2page_id(localkey[0]); + } + else + { + jam(); + elemptr += forward; + localkey[1] = *elemptr; + } + OperationrecPtr oprec; + oprec.i = RNIL; + Uint32 len = readTablePk(localkey[0], localkey[1], elemhead, oprec); + if (len > 0) + { + jam(); + return LHBits32(md5_hash((Uint64*)ckeys, len)); + } + else + { // Return an invalid hash value if no data + jam(); + return LHBits32(); + } +} + +LHBits32 Dbacc::getElementHash(Uint32 const* elemptr, Int32 forward, OperationrecPtr& oprec) +{ + jam(); + + if (!oprec.isNull()) + { + jam(); + return getElementHash(oprec); + } + + Uint32 elemhead = *elemptr; + if (ElementHeader::getUnlocked(elemhead)) + { + jam(); + return getElementHash(elemptr, forward); + } + else + { + jam(); + oprec.i = ElementHeader::getOpPtrI(elemhead); + ptrCheckGuard(oprec, coprecsize, operationrec); + return getElementHash(oprec); + } +} + /* --------------------------------------------------------------------------------- */ /* EXPANDCONTAINER */ /* INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD) */ @@ -5402,7 +5432,7 @@ void Dbacc::execDEBUG_SIG(Signal* signal /* --------------------------------------------------------------------------------- */ void Dbacc::expandcontainer(Signal* signal) { - Uint32 texcHashvalue; + LHBits32 texcHashvalue; Uint32 texcTmp; Uint32 texcIndex; Uint32 guard20; @@ -5444,17 +5474,42 @@ void Dbacc::expandcontainer(Signal* sign /* --------------------------------------------------------------------------------- */ arrGuard(cexcElementptr, 2048); tidrElemhead = excPageptr.p->word32[cexcElementptr]; - if (ElementHeader::getUnlocked(tidrElemhead)){ - jam(); - texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead); - } else { + bool move; + if (ElementHeader::getLocked(tidrElemhead)) + { jam(); idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead); ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec); - texcHashvalue = idrOperationRecPtr.p->hashvaluePart; - }//if - if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) { + ndbassert(idrOperationRecPtr.p->reducedHashValue.valid_bits() >= 1); + move = idrOperationRecPtr.p->reducedHashValue.get_bit(1); + idrOperationRecPtr.p->reducedHashValue.shift_out(); + if (!fragrecptr.p->enough_valid_bits(idrOperationRecPtr.p->reducedHashValue)) + { + jam(); + idrOperationRecPtr.p->reducedHashValue = + fragrecptr.p->level.reduce(getElementHash(idrOperationRecPtr)); + } + } + else + { + jam(); + LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead); + ndbassert(reducedHashValue.valid_bits() >= 1); + move = reducedHashValue.get_bit(1); + reducedHashValue.shift_out(); + if (!fragrecptr.p->enough_valid_bits(reducedHashValue)) + { + jam(); + reducedHashValue = + fragrecptr.p->level.reduce(getElementHash(&excPageptr.p->word32[cexcElementptr], cexcForward)); + } + tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue); + } + if (!move) + { jam(); + if (ElementHeader::getUnlocked(tidrElemhead)) + excPageptr.p->word32[cexcElementptr] = tidrElemhead; /* --------------------------------------------------------------------------------- */ /* THIS ELEMENT IS NOT TO BE MOVED. WE CALCULATE THE WHEREABOUTS OF THE NEXT */ /* ELEMENT AND PROCEED WITH THAT OR END THE SEARCH IF THERE ARE NO MORE */ @@ -5519,17 +5574,41 @@ void Dbacc::expandcontainer(Signal* sign ptrNull(idrOperationRecPtr); arrGuard(tlastElementptr, 2048); tidrElemhead = lastPageptr.p->word32[tlastElementptr]; - if (ElementHeader::getUnlocked(tidrElemhead)) { - jam(); - texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead); - } else { + if (ElementHeader::getLocked(tidrElemhead)) + { jam(); idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead); ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec); - texcHashvalue = idrOperationRecPtr.p->hashvaluePart; - }//if - if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) { + ndbassert(idrOperationRecPtr.p->reducedHashValue.valid_bits() >= 1); + move = idrOperationRecPtr.p->reducedHashValue.get_bit(1); + idrOperationRecPtr.p->reducedHashValue.shift_out(); + if (!fragrecptr.p->enough_valid_bits(idrOperationRecPtr.p->reducedHashValue)) + { + jam(); + idrOperationRecPtr.p->reducedHashValue = + fragrecptr.p->level.reduce(getElementHash(idrOperationRecPtr)); + } + } + else + { jam(); + LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead); + ndbassert(reducedHashValue.valid_bits() > 0); + move = reducedHashValue.get_bit(1); + reducedHashValue.shift_out(); + if (!fragrecptr.p->enough_valid_bits(reducedHashValue)) + { + jam(); + reducedHashValue = + fragrecptr.p->level.reduce(getElementHash(&lastPageptr.p->word32[tlastElementptr], tlastForward)); + } + tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue); + } + if (!move) + { + jam(); + if (ElementHeader::getUnlocked(tidrElemhead)) + lastPageptr.p->word32[tlastElementptr] = tidrElemhead; /* --------------------------------------------------------------------------------- */ /* THE LAST ELEMENT IS NOT TO BE MOVED. WE COPY IT TO THE CURRENT ELEMENT. */ /* --------------------------------------------------------------------------------- */ @@ -5602,7 +5681,7 @@ void Dbacc::expandcontainer(Signal* sign /* WILL BE JOINED ACORDING TO LH3 */ /* AND COMMIT TRANSACTION PROCESS */ /* WILL BE CONTINUED */ -Uint32 Dbacc::checkScanShrink(Signal* signal) +Uint32 Dbacc::checkScanShrink(Signal* signal, Uint32 sourceBucket, Uint32 destBucket) { Uint32 Ti; Uint32 TreturnCode = 0; @@ -5616,14 +5695,8 @@ Uint32 Dbacc::checkScanShrink(Signal* si Page8Ptr TPageptr; ScanRecPtr TscanPtr; - if (fragrecptr.p->p == 0) { - jam(); - TmergeDest = fragrecptr.p->maxp >> 1; - } else { - jam(); - TmergeDest = fragrecptr.p->p - 1; - }//if - TmergeSource = fragrecptr.p->maxp + fragrecptr.p->p; + TmergeDest = destBucket; + TmergeSource = sourceBucket; for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) { TreleaseScanIndicator[Ti] = 0; if (fragrecptr.p->scan[Ti] != RNIL) { @@ -5677,8 +5750,8 @@ Uint32 Dbacc::checkScanShrink(Signal* si if (TreleaseInd == 1) { jam(); TreleaseScanBucket = TmergeSource; - TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */ - TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */ + TPageIndex = fragrecptr.p->getPageIndex(TreleaseScanBucket); + TDirInd = fragrecptr.p->getPageNumber(TreleaseScanBucket); TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd); ptrCheckGuard(TPageptr, cpagesize, page8); for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) { @@ -5718,9 +5791,8 @@ void Dbacc::execSHRINKCHECK2(Signal* sig jamEntry(); fragrecptr.i = signal->theData[0]; - Uint32 oldFlag = signal->theData[3]; ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); - fragrecptr.p->expandFlag = oldFlag; + fragrecptr.p->expandOrShrinkQueued = false; tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */ if (fragrecptr.p->slack <= fragrecptr.p->slackCheck) { jam(); @@ -5730,7 +5802,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig /*--------------------------------------------------------------*/ return; }//if - if (fragrecptr.p->slack > (1u << 31)) { + if (fragrecptr.p->slack < 0) { jam(); /*--------------------------------------------------------------*/ /* THE SLACK IS NEGATIVE, IN THIS CASE WE WILL NOT NEED ANY */ @@ -5738,7 +5810,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig /*--------------------------------------------------------------*/ return; }//if - texpDirInd = (fragrecptr.p->maxp + fragrecptr.p->p) >> fragrecptr.p->k; if (fragrecptr.p->firstOverflowRec == RNIL) { jam(); allocOverflowPage(signal); @@ -5757,7 +5828,26 @@ void Dbacc::execSHRINKCHECK2(Signal* sig /*--------------------------------------------------------------*/ return; }//if - if (checkScanShrink(signal) == 1) { + + if (fragrecptr.p->level.isEmpty()) + { + jam(); + /* no need to shrink empty hash table */ + return; + } + + // Since expandCounter guards more shrinks than expands and + // all fragments starts with a full page of buckets + ndbassert(fragrecptr.p->getPageNumber(fragrecptr.p->level.getTop()) > 0); + + Uint32 mergeSourceBucket; + Uint32 mergeDestBucket; + bool doMerge = fragrecptr.p->level.getMergeBuckets(mergeSourceBucket, mergeDestBucket); + + ndbassert(doMerge); // Merge always needed since we never shrink below one page of buckets + + /* check that neither of source or destination bucket are currently scanned */ + if (doMerge && checkScanShrink(signal, mergeSourceBucket, mergeDestBucket) == 1) { jam(); /*--------------------------------------------------------------*/ // A scan state was inconsistent with performing a shrink @@ -5765,15 +5855,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig /*--------------------------------------------------------------*/ return; }//if - if (fragrecptr.p->p == 0) { - jam(); - fragrecptr.p->maxp = fragrecptr.p->maxp >> 1; - fragrecptr.p->p = fragrecptr.p->maxp; - fragrecptr.p->hashcheckbit--; - } else { - jam(); - fragrecptr.p->p--; - }//if if (ERROR_INSERTED(3002)) debug_lh_vars("SHR"); @@ -5782,12 +5863,14 @@ void Dbacc::execSHRINKCHECK2(Signal* sig fragrecptr.p->dirRangeFull = ZFALSE; } + shrink_adjust_reduced_hash_value(mergeDestBucket); + /*--------------------------------------------------------------------------*/ /* WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE */ /* REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET. */ /*--------------------------------------------------------------------------*/ - cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1); - texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k; + cexcPageindex = fragrecptr.p->getPageIndex(mergeSourceBucket); + texpDirInd = fragrecptr.p->getPageNumber(mergeSourceBucket); excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd); fragrecptr.p->expSenderIndex = cexcPageindex; fragrecptr.p->expSenderPageptr = excPageptr.i; @@ -5796,9 +5879,9 @@ void Dbacc::execSHRINKCHECK2(Signal* sig /* WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE */ /* RECEIVING BUCKET. */ /*--------------------------------------------------------------------------*/ - texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k; - fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpReceivedBucket); - fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1); + texpDirInd = fragrecptr.p->getPageNumber(mergeDestBucket); + fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpDirInd); + fragrecptr.p->expReceiveIndex = fragrecptr.p->getPageIndex(mergeDestBucket); fragrecptr.p->expReceiveForward = ZTRUE; if (excPageptr.i == RNIL) { jam(); @@ -5904,6 +5987,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig void Dbacc::endofshrinkbucketLab(Signal* signal) { + fragrecptr.p->level.shrink(); fragrecptr.p->expandCounter--; fragrecptr.p->slack -= fragrecptr.p->maxloadfactor; if (fragrecptr.p->expSenderIndex == 0) { @@ -5915,7 +5999,7 @@ void Dbacc::endofshrinkbucketLab(Signal* releasePage(signal); unsetPagePtr(fragrecptr.p->directory, fragrecptr.p->expSenderDirIndex); }//if - if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) { + if ((fragrecptr.p->getPageNumber(fragrecptr.p->level.getSize()) & 0xff) == 0) { jam(); DynArr256 dir(directoryPool, fragrecptr.p->directory); DynArr256::ReleaseIterator iter; @@ -5933,15 +6017,15 @@ void Dbacc::endofshrinkbucketLab(Signal* } }//if }//if - if (fragrecptr.p->slack < (1u << 31)) { + if (fragrecptr.p->slack > 0) { jam(); /*--------------------------------------------------------------*/ /* THE SLACK IS POSITIVE, IN THIS CASE WE WILL CHECK WHETHER */ /* WE WILL CONTINUE PERFORM ANOTHER SHRINK. */ /*--------------------------------------------------------------*/ - Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p; + Uint32 noOfBuckets = fragrecptr.p->level.getSize(); Uint32 Thysteresis = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor; - fragrecptr.p->slackCheck = noOfBuckets * Thysteresis; + fragrecptr.p->slackCheck = Int64(noOfBuckets) * Thysteresis; if (fragrecptr.p->slack > Thysteresis) { /*--------------------------------------------------------------*/ /* IT IS STILL NECESSARY TO SHRINK THE FRAGMENT MORE. THIS*/ @@ -5960,16 +6044,13 @@ void Dbacc::endofshrinkbucketLab(Signal* /* WAS REMOVED 2000-05-12. */ /*--------------------------------------------------------------*/ signal->theData[0] = fragrecptr.i; - signal->theData[1] = fragrecptr.p->p; - signal->theData[2] = fragrecptr.p->maxp; - signal->theData[3] = fragrecptr.p->expandFlag; - ndbrequire(fragrecptr.p->expandFlag < 2); - fragrecptr.p->expandFlag = 2; - sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB); + ndbrequire(!fragrecptr.p->expandOrShrinkQueued); + fragrecptr.p->expandOrShrinkQueued = true; + sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 1, JBB); }//if }//if }//if - ndbrequire(fragrecptr.p->maxp >= (Uint32)((1 << fragrecptr.p->k) - 1)); + ndbrequire(fragrecptr.p->getPageNumber(fragrecptr.p->level.getSize()) > 0); return; }//Dbacc::endofshrinkbucketLab() @@ -5980,9 +6061,121 @@ void Dbacc::endofshrinkbucketLab(Signal* /* CEXC_CONTAINERPTR (ARRAY INDEX OF THE CONTAINER). */ /* CEXC_FORWARD (CONTAINER FORWARD (+1) OR BACKWARD (-1)) */ /* */ -/* DESCRIPTION: ALL ELEMENTS OF THE ACTIVE CONTAINER HAVE TO MOVE TO THE NEW */ -/* CONTAINER. */ +/* DESCRIPTION: SCAN ALL ELEMENTS IN DESTINATION BUCKET BEFORE MERGE */ +/* AND ADJUST THE STORED REDUCED HASH VALUE (SHIFT IN ZERO). */ /* --------------------------------------------------------------------------------- */ +void +Dbacc::shrink_adjust_reduced_hash_value(Uint32 bucket_number) +{ + /* + * Note: function are a copy paste from getElement() with modified inner loop + * instead of finding a specific element, scan through all and modify. + */ + Uint32 tgeElementHeader; + Uint32 tgeElemStep; + Uint32 tgeContainerhead; + Uint32 tgePageindex; + Uint32 tgeActivePageDir; + Uint32 tgeNextptrtype; + register Uint32 tgeRemLen; + register Uint32 TelemLen = fragrecptr.p->elementLength; + const Uint32 localkeylen = fragrecptr.p->localkeylen; + + tgePageindex = fragrecptr.p->getPageIndex(bucket_number); + gePageptr.i = getPagePtr(fragrecptr.p->directory, fragrecptr.p->getPageNumber(bucket_number)); + ptrCheckGuard(gePageptr, cpagesize, page8); + + ndbrequire(TelemLen == ZELEM_HEAD_SIZE + localkeylen); + tgeNextptrtype = ZLEFT; + + /* Loop through all containers in a bucket */ + do { + tgeContainerptr = mul_ZBUF_SIZE(tgePageindex); + if (tgeNextptrtype == ZLEFT) + { + jam(); + tgeContainerptr = tgeContainerptr + ZHEAD_SIZE; + tgeElementptr = tgeContainerptr + ZCON_HEAD_SIZE; + tgeElemStep = TelemLen; + tgeForward = 1; + ndbrequire(tgeContainerptr < 2048); + tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; + ndbrequire((tgeContainerptr + tgeRemLen - 1) < 2048); + } + else if (tgeNextptrtype == ZRIGHT) + { + jam(); + tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE); + tgeElementptr = tgeContainerptr - 1; + tgeElemStep = 0 - TelemLen; + tgeForward = (Uint32)-1; + ndbrequire(tgeContainerptr < 2048); + tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26; + ndbrequire((tgeContainerptr - tgeRemLen) < 2048); + } + else + { + jam(); + jamLine(tgeNextptrtype); + ndbrequire(false); + }//if + if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) + { + ndbrequire(tgeRemLen <= ZBUF_SIZE); + /* ------------------------------------------------------------------- */ + /* Loop through all elements in a container */ + do + { + tgeElementHeader = gePageptr.p->word32[tgeElementptr]; + tgeRemLen = tgeRemLen - TelemLen; + /* + * Adjust the stored reduced hash value for element, shifting in a zero + */ + if (ElementHeader::getLocked(tgeElementHeader)) + { + jam(); + OperationrecPtr oprec; + oprec.i = ElementHeader::getOpPtrI(tgeElementHeader); + ptrCheckGuard(oprec, coprecsize, operationrec); + oprec.p->reducedHashValue.shift_in(false); + } + else + { + jam(); + LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tgeElementHeader); + reducedHashValue.shift_in(false); + tgeElementHeader = ElementHeader::setReducedHashValue(tgeElementHeader, reducedHashValue); + gePageptr.p->word32[tgeElementptr] = tgeElementHeader; + } + if (tgeRemLen <= ZCON_HEAD_SIZE) + { + break; + } + tgeElementptr = tgeElementptr + tgeElemStep; + } while (true); + }//if + ndbrequire(tgeRemLen == ZCON_HEAD_SIZE); + tgeContainerhead = gePageptr.p->word32[tgeContainerptr]; + tgeNextptrtype = (tgeContainerhead >> 7) & 0x3; + if (tgeNextptrtype == 0) + { + jam(); + return; /* NO MORE CONTAINER */ + }//if + tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */ + ndbrequire(tgePageindex <= ZEMPTYLIST); + if (((tgeContainerhead >> 9) & 1) == ZFALSE) + { + jam(); + tgeActivePageDir = gePageptr.p->word32[tgeContainerptr + 1]; /* NEXT PAGE ID */ + gePageptr.i = getPagePtr(fragrecptr.p->overflowdir, tgeActivePageDir); + ptrCheckGuard(gePageptr, cpagesize, page8); + }//if + } while (1); + + return; +}//Dbacc::shrink_adjust_reduced_hash_value() + void Dbacc::shrinkcontainer(Signal* signal) { Uint32 tshrElementptr; @@ -5991,7 +6184,6 @@ void Dbacc::shrinkcontainer(Signal* sign Uint32 tshrTmp; Uint32 tshrIndex; Uint32 guard21; - tshrRemLen = cexcContainerlen - ZCON_HEAD_SIZE; tshrInc = fragrecptr.p->elementLength; if (cexcForward == ZTRUE) { @@ -6020,7 +6212,14 @@ void Dbacc::shrinkcontainer(Signal* sign /* --------------------------------------------------------------------------------- */ idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead); ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec); + idrOperationRecPtr.p->reducedHashValue.shift_in(true); }//if + else + { + LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead); + reducedHashValue.shift_in(true); + tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue); + } tshrTmp = tshrElementptr + cexcForward; guard21 = fragrecptr.p->localkeylen - 1; for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) { @@ -6094,7 +6293,6 @@ void Dbacc::initFragAdd(Signal* signal, FragmentrecPtr regFragPtr) { const AccFragReq * const req = (AccFragReq*)&signal->theData[0]; - Uint32 lhFragBits = req->lhFragBits + 1; Uint32 minLoadFactor = (req->minLoadFactor * ZBUF_SIZE) / 100; Uint32 maxLoadFactor = (req->maxLoadFactor * ZBUF_SIZE) / 100; if (ERROR_INSERTED(3003)) // use small LoadFactors to force sparse hash table @@ -6112,7 +6310,7 @@ void Dbacc::initFragAdd(Signal* signal, regFragPtr.p->myfid = req->fragId; regFragPtr.p->myTableId = req->tableId; ndbrequire(req->kValue == 6); - regFragPtr.p->k = req->kValue; /* TK_SIZE = 6 IN THIS VERSION */ + ndbrequire(req->kValue == regFragPtr.p->k); regFragPtr.p->expandCounter = 0; /** @@ -6121,24 +6319,20 @@ void Dbacc::initFragAdd(Signal* signal, * * Is later restored to 0 by LQH at end of REDO log execution */ - regFragPtr.p->expandFlag = 0; - regFragPtr.p->p = 0; - regFragPtr.p->maxp = (1 << req->kValue) - 1; + regFragPtr.p->expandOrShrinkQueued = false; + regFragPtr.p->level.setSize(1 << req->kValue); regFragPtr.p->minloadfactor = minLoadFactor; regFragPtr.p->maxloadfactor = maxLoadFactor; - regFragPtr.p->slack = (regFragPtr.p->maxp + 1) * maxLoadFactor; - regFragPtr.p->lhfragbits = lhFragBits; - regFragPtr.p->hashcheckbit = 0; //lhFragBits; + regFragPtr.p->slack = Int64(regFragPtr.p->level.getSize()) * maxLoadFactor; regFragPtr.p->localkeylen = req->localKeyLen; regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3; regFragPtr.p->lastOverIndex = 0; regFragPtr.p->keyLength = req->keyLength; ndbrequire(req->keyLength != 0); regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen; - Uint32 Tmp1 = (regFragPtr.p->maxp + 1) + regFragPtr.p->p; + Uint32 Tmp1 = regFragPtr.p->level.getSize(); Uint32 Tmp2 = regFragPtr.p->maxloadfactor - regFragPtr.p->minloadfactor; - Tmp2 = Tmp1 * Tmp2; - regFragPtr.p->slackCheck = Tmp2; + regFragPtr.p->slackCheck = Int64(Tmp1) * Tmp2; regFragPtr.p->mytabptr = req->tableId; regFragPtr.p->roothashcheck = req->kValue + req->lhFragBits; regFragPtr.p->noOfElements = 0; @@ -6318,17 +6512,14 @@ void Dbacc::checkNextBucketLab(Signal* s Uint32 tnsElementptr; Uint32 tnsContainerptr; Uint32 tnsIsLocked; - Uint32 tnsTmp1; - Uint32 tnsTmp2; Uint32 tnsCopyDir; - tnsCopyDir = scanPtr.p->nextBucketIndex >> fragrecptr.p->k; + tnsCopyDir = fragrecptr.p->getPageNumber(scanPtr.p->nextBucketIndex); tnsPageidptr.i = getPagePtr(fragrecptr.p->directory, tnsCopyDir); ptrCheckGuard(tnsPageidptr, cpagesize, page8); gnsPageidptr.i = tnsPageidptr.i; gnsPageidptr.p = tnsPageidptr.p; - tnsTmp1 = (1 << fragrecptr.p->k) - 1; - tgsePageindex = scanPtr.p->nextBucketIndex & tnsTmp1; + tgsePageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex); gsePageidptr.i = gnsPageidptr.i; gsePageidptr.p = gnsPageidptr.p; if (!getScanElement(signal)) { @@ -6344,7 +6535,7 @@ void Dbacc::checkNextBucketLab(Signal* s return; }//if } else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) { - if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) { + if (fragrecptr.p->level.getTop() < scanPtr.p->nextBucketIndex) { /* ---------------------------------------------------------------- */ // All buckets have been scanned a first time. /* ---------------------------------------------------------------- */ @@ -6366,7 +6557,7 @@ void Dbacc::checkNextBucketLab(Signal* s /* --------------------------------------------------------------------------------- */ scanPtr.p->nextBucketIndex = scanPtr.p->minBucketIndexToRescan; scanPtr.p->scanBucketState = ScanRec::SECOND_LAP; - if (scanPtr.p->maxBucketIndexToRescan > (fragrecptr.p->p + fragrecptr.p->maxp)) { + if (scanPtr.p->maxBucketIndexToRescan > fragrecptr.p->level.getTop()) { jam(); /* --------------------------------------------------------------------------------- */ // If we have had so many merges that the maximum is bigger than the number of buckets @@ -6379,7 +6570,7 @@ void Dbacc::checkNextBucketLab(Signal* s sendSystemerror(signal, __LINE__); return; }//if - scanPtr.p->maxBucketIndexToRescan = fragrecptr.p->p + fragrecptr.p->maxp; + scanPtr.p->maxBucketIndexToRescan = fragrecptr.p->level.getTop(); }//if }//if }//if @@ -6390,19 +6581,17 @@ void Dbacc::checkNextBucketLab(Signal* s // We will only reset the scan indicator on the buckets that existed at the start of the // scan. The others will be handled by the split and merge code. /* --------------------------------------------------------------------------------- */ - tnsTmp2 = (1 << fragrecptr.p->k) - 1; - trsbPageindex = scanPtr.p->nextBucketIndex & tnsTmp2; + trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex); if (trsbPageindex != 0) { jam(); rsbPageidptr.i = gnsPageidptr.i; rsbPageidptr.p = gnsPageidptr.p; } else { jam(); - tmpP = scanPtr.p->nextBucketIndex >> fragrecptr.p->k; + tmpP = fragrecptr.p->getPageNumber(scanPtr.p->nextBucketIndex); cscPageidptr.i = getPagePtr(fragrecptr.p->directory, tmpP); ptrCheckGuard(cscPageidptr, cpagesize, page8); - tmp1 = (1 << fragrecptr.p->k) - 1; - trsbPageindex = scanPtr.p->nextBucketIndex & tmp1; + trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex); rsbPageidptr.i = cscPageidptr.i; rsbPageidptr.p = cscPageidptr.p; }//if @@ -6539,12 +6728,12 @@ void Dbacc::initScanFragmentPart(Signal* scanPtr.p->activeLocalFrag = fragrecptr.i; scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */ scanPtr.p->scanBucketState = ScanRec::FIRST_LAP; - scanPtr.p->startNoOfBuckets = fragrecptr.p->p + fragrecptr.p->maxp; + scanPtr.p->startNoOfBuckets = fragrecptr.p->level.getTop(); scanPtr.p->minBucketIndexToRescan = 0xFFFFFFFF; scanPtr.p->maxBucketIndexToRescan = 0; cnfPageidptr.i = getPagePtr(fragrecptr.p->directory, 0); ptrCheckGuard(cnfPageidptr, cpagesize, page8); - trsbPageindex = scanPtr.p->nextBucketIndex & ((1 << fragrecptr.p->k) - 1); + trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex); rsbPageidptr.i = cnfPageidptr.i; rsbPageidptr.p = cnfPageidptr.p; releaseScanBucket(signal); @@ -6914,6 +7103,7 @@ void Dbacc::initScanOpRec(Signal* signal operationRecPtr.p->localdata[0] = Tkey1; operationRecPtr.p->localdata[1] = isoPageptr.p->word32[tisoLocalPtr]; } + operationRecPtr.p->hashValue.clear(); operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength; operationRecPtr.p->xfrmtupkeylen = 0; // not used }//Dbacc::initScanOpRec() @@ -7287,7 +7477,7 @@ void Dbacc::setlock(Signal* signal) arrGuard(tslElementptr, 2048); tselTmp1 = slPageidptr.p->word32[tslElementptr]; operationRecPtr.p->scanBits = ElementHeader::getScanBits(tselTmp1); - operationRecPtr.p->hashvaluePart = ElementHeader::getHashValuePart(tselTmp1); + operationRecPtr.p->reducedHashValue = ElementHeader::getReducedHashValue(tselTmp1); tselTmp1 = ElementHeader::setLocked(operationRecPtr.i); dbgWord32(slPageidptr, tslElementptr, tselTmp1); @@ -8188,10 +8378,9 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ", tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage, tmpOpPtr.p->elementPointer); - infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ", - tmpOpPtr.p->fid, tmpOpPtr.p->fragptr, - tmpOpPtr.p->hashvaluePart); - infoEvent("hashValue=%d", tmpOpPtr.p->hashValue); + infoEvent("fid=%d, fragptr=%d ", + tmpOpPtr.p->fid, tmpOpPtr.p->fragptr); + infoEvent("hashValue=%d", tmpOpPtr.p->hashValue.pack()); infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ", tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp, tmpOpPtr.p->nextParallelQue); @@ -8202,8 +8391,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue); infoEvent("prevSerialQue=%d, scanRecPtr=%d", tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr); - infoEvent("m_op_bits=0x%x, scanBits=%d ", - tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits); + infoEvent("m_op_bits=0x%x, scanBits=%d, reducedHashValue=%x ", + tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits, tmpOpPtr.p->reducedHashValue.pack()); return; } @@ -8405,17 +8594,16 @@ Dbacc::execNODE_STATE_REP(Signal* signal void Dbacc::debug_lh_vars(const char* where) { - Uint32 b = fragrecptr.p->maxp + fragrecptr.p->p; - Uint32 di = b >> fragrecptr.p->k; + Uint32 b = fragrecptr.p->level.getTop(); + Uint32 di = fragrecptr.p->getPageNumber(b); Uint32 ri = di >> 8; ndbout << "DBACC: " << where << ":" << " frag:" << fragrecptr.p->myTableId << "/" << fragrecptr.p->myfid - << " slack:" << (Int32)fragrecptr.p->slack + << " slack:" << fragrecptr.p->slack << "/" << fragrecptr.p->slackCheck - << " maxp:" << fragrecptr.p->maxp - << " p:" << fragrecptr.p->p + << " top:" << fragrecptr.p->level.getTop() << " di:" << di << " ri:" << ri << " full:" << fragrecptr.p->dirRangeFull === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-09-18 14:36:25 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-09-28 14:18:32 +0000 @@ -539,10 +539,6 @@ void Dblqh::execCONTINUEB(Signal* signal { jam(); c_lcp_complete_fragments.getPtr(fragptr); - signal->theData[0] = fragptr.p->tabRef; - signal->theData[1] = fragptr.p->fragId; - BlockReference accRef = calcInstanceBlockRef(DBACC); - sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB); Ptr save = fragptr; c_lcp_complete_fragments.next(fragptr); @@ -12290,19 +12286,6 @@ Dblqh::execPREPARE_COPY_FRAG_REQ(Signal* /** * */ - if (cstartType == NodeState::ST_SYSTEM_RESTART) - { - jam(); - signal->theData[0] = fragptr.p->tabRef; - signal->theData[1] = fragptr.p->fragId; - BlockReference accRef = calcInstanceBlockRef(DBACC); - sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB); - } - - - /** - * - */ fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED; fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION; fragptr.p->logFlag = Fragrecord::STATE_FALSE; @@ -17093,10 +17076,6 @@ void Dblqh::execSTART_FRAGREQ(Signal* si */ c_lcp_complete_fragments.add(fragptr); - signal->theData[0] = tabptr.i; - signal->theData[1] = fragId; - BlockReference accRef = calcInstanceBlockRef(DBACC); - sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB); c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL); jamEntry(); return; @@ -17249,18 +17228,9 @@ void Dblqh::execRESTORE_LCP_CONF(Signal* c_lcp_restoring_fragments.remove(fragptr); c_lcp_complete_fragments.add(fragptr); - /** - * Disable expand check in ACC - * before running REDO - */ tabptr.i = fragptr.p->tabRef; ptrCheckGuard(tabptr, ctabrecFileSize, tablerec); - signal->theData[0] = fragptr.p->tabRef; - signal->theData[1] = fragptr.p->fragId; - BlockReference accRef = calcInstanceBlockRef(DBACC); - sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB); - if (!c_lcp_waiting_fragments.isEmpty()) { send_restore_lcp(signal); === modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt' --- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-03-21 12:31:02 +0000 +++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-09-28 14:18:32 +0000 @@ -72,7 +72,7 @@ SET_TARGET_PROPERTIES(mt_thr_config-t PROPERTIES COMPILE_FLAGS "-DTEST_MT_THR_CONFIG") TARGET_LINK_LIBRARIES(mt_thr_config-t ndbgeneral) -FOREACH(testprog CountingPool DynArr256) +FOREACH(testprog CountingPool DynArr256 LHLevel) ADD_EXECUTABLE(${testprog}-t ${testprog}.cpp) SET_TARGET_PROPERTIES(${testprog}-t PROPERTIES COMPILE_FLAGS "-DTAP_TEST") TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror === added file 'storage/ndb/src/kernel/vm/LHLevel.cpp' --- a/storage/ndb/src/kernel/vm/LHLevel.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/LHLevel.cpp 2012-09-28 13:10:15 +0000 @@ -0,0 +1,277 @@ +/* + Copyright (c) 2012 Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifdef TAP_TEST + +#include +#include + +#include +#include + +#include "md5_hash.hpp" +#include "random.h" +#include "LHLevel.hpp" + +#ifndef UINT32_MAX +#define UINT32_MAX (4294967295U) +#endif + +#define BUCKSIZE 3 + +struct elem +{ + Uint32 val; + Uint16 head; + static LHBits32 hash(Uint32 val) + { + return LHBits32(md5_hash((Uint64*)&val, 1)); + } +}; + +void expand(LHLevel& lh, elem(*arr)[BUCKSIZE]); +bool shrink(LHLevel& lh, elem(*arr)[BUCKSIZE]); +bool insert_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 v); +bool delete_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 w); +int count_elem(LHLevel& lh, elem(*arr)[BUCKSIZE]); + +Uint64 c_inserts = 0; +Uint64 c_expands = 0; +Uint64 c_shrinks = 0; +Uint64 c_deletes = 0; +Uint64 c_moved = 0; +Uint64 c_rehashed = 0; + +int main(int argc, char *argv[]) +{ + unsigned int nelem = argc > 1 ? atoi(argv[1]) : 1000000; + plan(4); + elem(*arr)[BUCKSIZE] = new elem[nelem][BUCKSIZE]; + bzero(arr, nelem * sizeof(elem[BUCKSIZE])); + LHLevel lh; + lh.clear(); + expand(lh, arr); + Uint32 v = 0; + myRandom48Init(nelem); + for (int lap = 1; lap <= 2; lap++) + { + // Fill up table, with occasionally shrink + for (;v < UINT32_MAX;v++) + { + if (lh.getSize() * (BUCKSIZE - 1) < c_inserts - c_deletes) + { + if (!lh.isFull() && lh.getSize() < nelem) + expand(lh, arr); + else + break; /* Filled up */ + } + insert_elem(lh, arr, v); + if (rand() % 100 == 0) + shrink(lh, arr); + } + + // First lap, shrink to half + // Second lap, delete all + Uint32 lim = lh.getSize(); + lim = lap * lim / 2; + while (v > 0 && lim > 0) + { + if (lh.isEmpty()) break; + Uint32 w = (Uint32)myRandom48(v + 1); + delete_elem(lh, arr, w); + delete_elem(lh, arr, v); + v--; + if (lh.getSize() * BUCKSIZE * 3 > c_inserts - c_deletes) + if (shrink(lh, arr)) + lim--; + } + + // Check table consistency + if (lap == 1) + { + int n = count_elem(lh, arr); + ok((n >= 0), "all element hash values match stored hash value and bucket address"); + if (n < 0) n = -n; + ok((c_inserts == c_deletes + n), + "scanned element count (%u) matches difference between inserts (%llu) and deletes (%llu)", + n, c_inserts, c_deletes); + } + } + ok((c_inserts == c_deletes), "inserts (%llu) equals deletes (%llu)", c_inserts, c_deletes); + ok((c_expands == c_shrinks), "expands (%llu) equals shrinks (%llu)", c_expands, c_shrinks); + return exit_status(); +} + +bool delete_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 w) +{ + LHBits32 hash(elem::hash(w)); + Uint32 addr = lh.getBucketNumber(hash); + int i; + bool found = false; + for (i = 0; i < BUCKSIZE && (arr[addr][i].head != 0); i++) + { + if (arr[addr][i].val == w) + { + found = true; + break; + } + } + if (found) + { + assert(arr[addr][i].head > 0); + int j; + c_deletes += arr[addr][i].head; + for (j = i + 1; j < BUCKSIZE; j++, i++) + arr[addr][i] = arr[addr][j]; + bzero(&arr[addr][i], sizeof(arr[addr][i])); + return true; + } + else if (i < BUCKSIZE) + { + assert(arr[addr][i].head == 0); + } + return false; +} + +bool shrink(LHLevel& lh, elem(*arr)[BUCKSIZE]) +{ + assert(!lh.isEmpty()); + Uint32 from; + Uint32 to; + if (!lh.getMergeBuckets(from, to)) + { + int c = 0; + for (int i = 0; i < BUCKSIZE && arr[from][i].head != 0; i++) + c++; + // Only shrink if the only bucket is empty + if (c==0) + { + c_shrinks++; + lh.shrink(); + return true; + } + return false; + } + assert(to < from); + int i, j; + int c = 0; + for (i = 0; i < BUCKSIZE && arr[to][i].head != 0; i++) + c++; + for (j = 0; j < BUCKSIZE && arr[from][j].head != 0; j++) + c++; + // Only shrink if both buckets element can fit in one bucket + if (c <= BUCKSIZE) + { + for (j = 0; j < BUCKSIZE && arr[from][j].head != 0; j++) + { + assert(i 0 ? -elements : elements; +} + +#endif === added file 'storage/ndb/src/kernel/vm/LHLevel.hpp' --- a/storage/ndb/src/kernel/vm/LHLevel.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/LHLevel.hpp 2012-09-28 13:11:17 +0000 @@ -0,0 +1,456 @@ +/* + Copyright (c) 2012 Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef NDB_LHLEVEL_H +#define NDB_LHLEVEL_H + +/** + * LHLevel keeps level information for linear hashing, + **/ + +/** + * Supports up to UINT32_MAX number of bucket addresses. + * If support for more is needed, one also have to + * increase hash key size. + **/ + +#include +#include "Bitmask.hpp" + +template class LHBits +{ +public: + explicit LHBits(); + explicit LHBits(Int bits); + template LHBits(LHBits const& bits); + ~LHBits(); + LHBits& operator=(LHBits const&); + + void clear(); +static LHBits unpack(Int packed); + Int pack() const; + bool match(LHBits other) const; + void shift_out(); + void shift_out(Uint8 bits); + void shift_in(bool bit); + void shift_in(Uint8 bits, Int value); + Uint8 valid_bits() const; + bool valid_bits(Int bits) const; + bool valid_bit(Int bit) const; + Int get_bits(Int bits) const; + Int get_bit(Int bit) const; +private: + Int highbit() const; + Int m_bits; +}; + +typedef LHBits LHBits16; +typedef LHBits LHBits32; + +class LHLevel +{ +public: + explicit LHLevel(); + explicit LHLevel(Uint32 size); + ~LHLevel() {} +private: + LHLevel(LHLevel const&); // Not to be implemented + LHLevel& operator=(LHLevel const&); // Not to be implemented +public: + void clear(); + bool isEmpty() const; + bool isFull() const; + Uint32 getSize() const; + void setSize(Uint32 size); + Uint32 getTop() const; +public: + Uint32 getBucketNumber(LHBits32 hash_value) const; + bool getSplitBucket(Uint32& from, Uint32& to) const; // true if data move needed + bool shouldMoveBeforeExpand(LHBits32 hash_value) const; + void expand(); + bool getMergeBuckets(Uint32& from, Uint32& to) const; // true if data move needed + void shrink(); +private: + enum { + ADDR_MAX = 0xFFFFFFFEU, + MAX_SIZE = 0xFFFFFFFFU, + MAXP_EMPTY = 0xFFFFFFFFU, + }; +protected: + Uint32 max_size() const; + Uint8 hashcheckbit() const { return m_hashcheckbit; } + Uint32 maxp() const { return m_maxp; } + Uint32 p() const { return m_p; } +private: + Uint32 m_maxp; + Uint32 m_p; + Uint8 m_hashcheckbit; +}; + +class LocalLHLevel : public LHLevel +{ +public: + explicit LocalLHLevel(Uint32& size): LHLevel(size), m_src(size) {} + ~LocalLHLevel() { m_src = getSize(); } +private: + LocalLHLevel(const LocalLHLevel&); // Not to be implemented + LocalLHLevel& operator=(const LocalLHLevel&); // Not to be implemented + Uint32& m_src; +}; + +/** + * LHLevelRH is LHLevel extended with support for + * a reduced hash value suitable to store with + * element in hash table. + */ + +class LHLevelRH: public LHLevel +{ +public: + explicit LHLevelRH(): LHLevel() {} + explicit LHLevelRH(Uint32 const& size): LHLevel(size) {} + ~LHLevelRH() {} +private: + LHLevelRH(LHLevelRH const&); // Not to be implemented + LHLevelRH& operator=(LHLevelRH const&); // Not to be implemented +public: + LHBits16 reduce(LHBits32 hash_value) const; + Uint8 getNeededValidBits(Uint8 bits) const; + LHBits32 enlarge(LHBits16 reduced_hash_value, Uint32 bucket_number) const; +}; + +/** + * Implementation LHBits<> + **/ + +template inline LHBits::LHBits() +: m_bits(1) +{ +} + +template inline LHBits::LHBits(Int bits) +: m_bits(bits | highbit()) +{ +} + +template template inline LHBits::LHBits(LHBits const& bits) +: m_bits(bits.pack()) +{ + if (m_bits != bits.pack()) + m_bits |= highbit(); +} + +template inline LHBits::~LHBits() +{ +} + +template inline LHBits& LHBits::operator=(LHBits const& src) +{ + m_bits = src.m_bits; + return *this; +} + +template inline void LHBits::clear() +{ + m_bits = 1; +} + +template inline LHBits LHBits::unpack(Int packed) +{ + LHBits x; + x.m_bits = packed; + return x; +} + +template inline Int LHBits::pack() const +{ + return m_bits; +} + +template inline bool LHBits::match(LHBits other) const +{ + assert(sizeof(Int) <= sizeof(Uint32)); + assert (m_bits != 0) ; + assert (other.m_bits != 0); + assert(sizeof(Int) <= sizeof(Uint32) && + (m_bits != 0) && + (other.m_bits != 0)); + // Warning: dont reduce to one shift below. + // << is only defined for right values < 32. + // and since m_bits != 0, clz(MIN(...)) is guaranteed <= 31 + return ((Uint32(m_bits ^ other.m_bits) << BitmaskImpl::clz(MIN(m_bits, other.m_bits))) << 1) == 0; +} + +template inline void LHBits::shift_out() +{ + m_bits >>= 1; + if (m_bits == 0) + m_bits++; +} + +template inline void LHBits::shift_out(Uint8 bits) +{ + assert(bits < 8 * sizeof(m_bits)); + m_bits >>= bits; + if (m_bits == 0) + m_bits++; +} + +template inline void LHBits::shift_in(bool bit) +{ + if (m_bits >= highbit()) + m_bits |= (highbit() >> 1); + m_bits = (m_bits << 1) | (bit ? 1 : 0); +} + +template inline void LHBits::shift_in(Uint8 bits, Int value) +{ + assert(m_bits != 0); + assert(bits < 8 * sizeof(m_bits)); + assert(value < (Int(1) << bits)); + if (bits == 0) + return; + if (m_bits >= (highbit() >> (bits - 1))) + m_bits = highbit() | (m_bits << bits) | value; + else + m_bits = (m_bits << bits) | value; +} + +template inline Uint8 LHBits::valid_bits() const +{ + assert(m_bits != 0); + return BitmaskImpl::fls(m_bits); +} + +template inline bool LHBits::valid_bits(Int bits) const +{ + // Only allow bits to be on the form 2^n-1 + assert((m_bits != 0) && + (((bits + 1U) | bits) == (bits << 1U) + 1U)); // bits is 0..01..1 + return m_bits > bits; +} + +template inline bool LHBits::valid_bit(Int bit) const +{ + // Only allow bit to be on the form 2^n + assert((m_bits != 0) && + ((((bit - 1U) | bit) >> 1U) == bit - 1U)); // bits is 0..010..0 + return (m_bits >> 1U) >= bit; +} + +template inline Int LHBits::get_bits(Int bits) const +{ + assert(valid_bits(bits)); + return m_bits & bits; +} + +template inline Int LHBits::get_bit(Int bit) const +{ + assert(valid_bit(bit)); + return m_bits & bit; +} + +template inline Int LHBits::highbit() const +{ + return 1 << (sizeof(Int) * 8 - 1); +} + +/** + * Implementaion LHLevel + **/ + +inline LHLevel::LHLevel() +{ + setSize(0); +} + +inline LHLevel::LHLevel(Uint32 _size) +{ + setSize(_size); +} + +inline void LHLevel::clear() +{ + m_maxp = MAXP_EMPTY; + m_p = 0; +} + +inline bool LHLevel::isEmpty() const +{ + return maxp() == MAXP_EMPTY; +} + +inline bool LHLevel::isFull() const +{ + return !isEmpty() && (getTop() == ADDR_MAX); +} + +inline Uint32 LHLevel::max_size() const +{ + return MAX_SIZE; +} + +inline Uint32 LHLevel::getSize() const +{ + assert(!isEmpty() || (maxp() + 1 + p() == 0)); + return maxp() + 1 + p(); +} + +inline void LHLevel::setSize(Uint32 size) +{ + assert(size <= max_size()); + if (size == 0) + clear(); + else + { + m_hashcheckbit = BitmaskImpl::fls(size); + m_maxp = (1 << hashcheckbit()) - 1; + m_p = size - 1 - maxp(); + } +} + +inline Uint32 LHLevel::getTop() const +{ + assert(!isEmpty()); + return maxp() + p(); +} + +inline Uint32 LHLevel::getBucketNumber(LHBits32 hash_value) const +{ + assert(!isEmpty()); + Uint32 addr = hash_value.get_bits(maxp()); + if (addr < p()) + { + addr |= hash_value.get_bit(maxp() + 1); + } + return addr; +} + +inline bool LHLevel::getSplitBucket(Uint32& from, Uint32& to) const +{ + assert(!isFull()); + + from = p(); + to = getSize(); // == getTop() + 1 + + // true if data move needed, that is, it was not empty + return to > 0; +} + +inline void LHLevel::expand() +{ + assert(!isFull()); + + if (isEmpty()) + { + m_p = 0; + m_hashcheckbit = 0; + m_maxp = 0; + } + else if (p() == maxp()) + { + m_maxp = (maxp() << 1) | 1; + m_hashcheckbit ++; + m_p = 0; + } + else + { + m_p ++; + } +} + +inline bool LHLevel::shouldMoveBeforeExpand(LHBits32 hash_value) const +{ + return hash_value.get_bit(1 << hashcheckbit()); +} + +inline bool LHLevel::getMergeBuckets(Uint32& from, Uint32& to) const +{ + assert(!isEmpty()); + + from = getTop(); + + if (likely(p() != 0)) + { + to = p() - 1; + } + else + { + to = maxp() >> 1; + } + + // true if data move needed, that is, all buckets disappeared + return from > 0; +} + +inline void LHLevel::shrink() +{ + assert(!isEmpty()); + + if (p() != 0) + { + m_p --; + } + else if (maxp() == 0) + { + m_maxp --; // == MAXP_EMPTY + } + else + { + m_maxp >>= 1; + m_hashcheckbit --; + m_p = m_maxp; + } +} + +/** + * Implementation LHLevelRH + **/ + +inline LHBits16 LHLevelRH::reduce(LHBits32 hash_value) const +{ + assert(!isEmpty()); + + if (!hash_value.valid_bits(maxp())) + return LHBits16(); + + Uint32 addr = hash_value.get_bits(maxp()); + LHBits32 hv(hash_value); + hv.shift_out(hashcheckbit()); + if (addr < p()) + hv.shift_out(); + return LHBits16(hv); +} + +inline Uint8 LHLevelRH::getNeededValidBits(Uint8 bits) const +{ + Uint8 const usable_bits_in_hash_value = 4 * sizeof(LHBits32) - 1; // == 31 + return MIN(bits, usable_bits_in_hash_value - hashcheckbit()); +} + +inline LHBits32 LHLevelRH::enlarge(LHBits16 reduced_hash_value, Uint32 bucket_number) const +{ + assert(!isEmpty()); + + Uint32 addr = bucket_number & maxp(); + LHBits32 hv(reduced_hash_value); + Uint8 addr_bits = hashcheckbit() + ((addr < p()) ? 1 : 0); + hv.shift_in(addr_bits, bucket_number); + return hv; +} + +#endif === modified file 'storage/ndb/src/ndbjtie/jtie/jtie_tconv_array_impl.hpp' --- a/storage/ndb/src/ndbjtie/jtie/jtie_tconv_array_impl.hpp 2011-02-02 09:52:33 +0000 +++ b/storage/ndb/src/ndbjtie/jtie/jtie_tconv_array_impl.hpp 2012-09-26 05:13:59 +0000 @@ -359,17 +359,17 @@ struct ObjectArrayConvImpl { private: // Returns a new Java Object array with all elements initialized to null; // in case of a NULL return, an exception is pending. - J * + static J * newJavaObjectArray(jclass cls, jsize n, JNIEnv * env); // Copies objects referred by a Java Object array over a C object array; // a non-zero return value indicates failure with an exception pending. - cstatus + static cstatus copyToCObjectArray(C * c, jobjectArray j, jsize n, JNIEnv * env); // Initializes a Java Object array with references from a C object array; // a non-zero return value indicates failure with an exception pending. - cstatus + static cstatus copyToJavaObjectArray(jobjectArray j, C * c, jsize n, JNIEnv * env); }; No bundle (reason: useless for push emails).