4992 Mauritz Sundell 2012-09-28 [merge]
ndb - removes the ~45Mrow limit per partition
Bug #13844405 - FRAGMENTS ARE LIMITED TO ~45M ROWS. (GOT ERROR 633)
Bug #14000373 SILENT DATA INCONSISTENCY WITH BIG FRAGMENTS (MORE THAN ~45MROWS)
There have been a limit on the number of rows in a partition due to the
limitations in the implementation of hash index used for primary key/unique key
hash index, and the primary key hash index is mandatory for a table.
added:
mysql-test/suite/ndb_big/bug14000373.cnf
mysql-test/suite/ndb_big/bug14000373.result
mysql-test/suite/ndb_big/bug14000373.test
storage/ndb/src/kernel/vm/LHLevel.cpp
storage/ndb/src/kernel/vm/LHLevel.hpp
modified:
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/vm/CMakeLists.txt
storage/ndb/src/kernel/vm/Makefile.am
4991 Ole John Aske 2012-09-25
Fix for bug#14550056 BUSY WAIT IN DBTC::REMOVEMARKERFORFAILEDAPI IF CLIENT DISCONNECT WITH OPEN TXNS
In case Dbtc::removeMarkerForFailedAPI() waits for open transactions to be terminated,
it should do that with a 'sendSignalWithDelay(..., GSN_CONTINUEB, 1ms, ...)', instead of
a plain 'sendSignal()' which will cause a busy loop.
modified:
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
=== added file 'mysql-test/suite/ndb_big/bug14000373.cnf'
--- a/mysql-test/suite/ndb_big/bug14000373.cnf 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug14000373.cnf 2012-09-28 12:58:59 +0000
@@ -0,0 +1,5 @@
+!include suite/ndb/my.cnf
+
+[cluster_config.1]
+DataMemory=300M
+IndexMemory=700M
=== added file 'mysql-test/suite/ndb_big/bug14000373.result'
--- a/mysql-test/suite/ndb_big/bug14000373.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug14000373.result 2012-09-28 12:58:59 +0000
@@ -0,0 +1,21 @@
+set max_heap_table_size = 286720000;
+create table t1 (a int key) engine=memory;
+load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 columns terminated by ' ' (a, @col2);
+insert into t1 select a + 10000 from t1;;
+insert into t1 select a + 10000 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 from t1;;
+select count(*) from t1;
+count(*)
+5120000
+alter table t1 engine=ndbcluster comment='NDB_TABLE=NOLOGGING' partition by key() partitions 1;
+alter table t1 engine=memory;
+select count(*) from t1;
+count(*)
+5120000
+drop table t1;
=== added file 'mysql-test/suite/ndb_big/bug14000373.test'
--- a/mysql-test/suite/ndb_big/bug14000373.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug14000373.test 2012-09-28 12:58:59 +0000
@@ -0,0 +1,25 @@
+-- source include/have_ndb.inc
+
+# Test is using error insert, check that binaries support it
+-- source suite/ndb/t/have_ndb_error_insert.inc
+
+# Use small LoadFactors to force sparse hash table
+--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all error 3003" >> $NDB_TOOLS_OUTPUT
+
+set max_heap_table_size = 286720000;
+create table t1 (a int key) engine=memory;
+load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 columns terminated by ' ' (a, @col2);
+let $i = 9;
+let $b = 10000;
+while ($i)
+{
+--eval insert into t1 select a + $b from t1;
+ let $b = $b * 2;
+ dec $i;
+}
+select count(*) from t1;
+alter table t1 engine=ndbcluster comment='NDB_TABLE=NOLOGGING' partition by key() partitions 1;
+--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all report memory" >> $NDB_TOOLS_OUTPUT
+alter table t1 engine=memory;
+select count(*) from t1;
+drop table t1;
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2012-03-13 11:57:27 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2012-09-28 13:15:29 +0000
@@ -26,6 +26,7 @@
#include <pc.hpp>
#include <DynArr256.hpp>
#include <SimulatedBlock.hpp>
+#include <LHLevel.hpp>
#ifdef DBACC_C
// Debug Macros
@@ -199,7 +200,7 @@ class ElementHeader {
*
* l = Locked -- If true contains operation else scan bits + hash value
* s = Scan bits
- * h = Hash value
+ * h = Reduced hash value. The lower bits used for address is shifted away
* o = Operation ptr I
*
* 1111111111222222222233
@@ -208,17 +209,16 @@ class ElementHeader {
* ooooooooooooooooooooooooooooooo
*/
public:
- STATIC_CONST( HASH_VALUE_PART_MASK = 0xFFFF );
-
static bool getLocked(Uint32 data);
static bool getUnlocked(Uint32 data);
static Uint32 getScanBits(Uint32 data);
- static Uint32 getHashValuePart(Uint32 data);
static Uint32 getOpPtrI(Uint32 data);
+ static LHBits16 getReducedHashValue(Uint32 data);
static Uint32 setLocked(Uint32 opPtrI);
- static Uint32 setUnlocked(Uint32 hashValuePart, Uint32 scanBits);
+ static Uint32 setUnlocked(Uint32 scanBits, LHBits16 const& reducedHashValue);
static Uint32 setScanBit(Uint32 header, Uint32 scanBit);
+ static Uint32 setReducedHashValue(Uint32 header, LHBits16 const& reducedHashValue);
static Uint32 clearScanBit(Uint32 header, Uint32 scanBit);
};
@@ -241,11 +241,11 @@ ElementHeader::getScanBits(Uint32 data){
return (data >> 1) & ((1 << MAX_PARALLEL_SCANS_PER_FRAG) - 1);
}
-inline
-Uint32
-ElementHeader::getHashValuePart(Uint32 data){
+inline
+LHBits16
+ElementHeader::getReducedHashValue(Uint32 data){
assert(getUnlocked(data));
- return data >> 16;
+ return LHBits16::unpack(data >> 16);
}
inline
@@ -258,12 +258,15 @@ ElementHeader::getOpPtrI(Uint32 data){
inline
Uint32
ElementHeader::setLocked(Uint32 opPtrI){
+ assert(opPtrI < 0x8000000);
return (opPtrI << 1) + 0;
}
inline
Uint32
-ElementHeader::setUnlocked(Uint32 hashValue, Uint32 scanBits){
- return (hashValue << 16) + (scanBits << 1) + 1;
+ElementHeader::setUnlocked(Uint32 scanBits, LHBits16 const& reducedHashValue)
+{
+ assert(scanBits < (1 << MAX_PARALLEL_SCANS_PER_FRAG));
+ return (Uint32(reducedHashValue.pack()) << 16) | (scanBits << 1) | 1;
}
inline
@@ -280,6 +283,13 @@ ElementHeader::clearScanBit(Uint32 heade
return header & (~(scanBit << 1));
}
+inline
+Uint32
+ElementHeader::setReducedHashValue(Uint32 header, LHBits16 const& reducedHashValue)
+{
+ assert(getUnlocked(header));
+ return (Uint32(reducedHashValue.pack()) << 16) | (header & 0xffff);
+}
class Dbacc: public SimulatedBlock {
friend class DbaccProxy;
@@ -401,14 +411,15 @@ struct Fragmentrec {
// slackCheck When slack goes over this value it is time to expand.
// slackCheck = (maxp + p + 1)*(maxloadfactor - minloadfactor) or
// bucketSize * hysteresis
+// Since at most RNIL 8KiB-pages can be used for a fragment, the extreme values
+// for slack will be within -2^43 and +2^43 words.
//-----------------------------------------------------------------------------
+ LHLevelRH level;
Uint32 localkeylen;
- Uint32 maxp;
Uint32 maxloadfactor;
Uint32 minloadfactor;
- Uint32 p;
- Uint32 slack;
- Uint32 slackCheck;
+ Int64 slack;
+ Int64 slackCheck;
//-----------------------------------------------------------------------------
// nextfreefrag is the next free fragment if linked into a free list
@@ -441,19 +452,17 @@ struct Fragmentrec {
Uint16 keyLength;
//-----------------------------------------------------------------------------
-// This flag is used to avoid sending a big number of expand or shrink signals
-// when simultaneously committing many inserts or deletes.
+// Only allow one expand or shrink signal in queue at the time.
//-----------------------------------------------------------------------------
- Uint8 expandFlag;
+ bool expandOrShrinkQueued;
//-----------------------------------------------------------------------------
// hashcheckbit is the bit to check whether to send element to split bucket or not
// k (== 6) is the number of buckets per page
-// lhfragbits is the number of bits used to calculate the fragment id
//-----------------------------------------------------------------------------
- Uint8 hashcheckbit;
- Uint8 k;
- Uint8 lhfragbits;
+ STATIC_CONST( k = 6 );
+ STATIC_CONST( MIN_HASH_COMPARE_BITS = 7 );
+ STATIC_CONST( MAX_HASH_VALUE_BITS = 31 );
//-----------------------------------------------------------------------------
// nodetype can only be STORED in this release. Is currently only set, never read
@@ -469,6 +478,11 @@ struct Fragmentrec {
// flag to mark that execEXPANDCHECK2 has failed due to DirRange full
//-----------------------------------------------------------------------------
Uint8 dirRangeFull;
+
+public:
+ Uint32 getPageNumber(Uint32 bucket_number) const;
+ Uint32 getPageIndex(Uint32 bucket_number) const;
+ bool enough_valid_bits(LHBits16 const& reduced_hash_value) const;
};
typedef Ptr<Fragmentrec> FragmentrecPtr;
@@ -484,8 +498,7 @@ struct Operationrec {
Uint32 elementPointer;
Uint32 fid;
Uint32 fragptr;
- Uint32 hashvaluePart;
- Uint32 hashValue;
+ LHBits32 hashValue;
Uint32 nextLockOwnerOp;
Uint32 nextOp;
Uint32 nextParallelQue;
@@ -511,7 +524,8 @@ struct Operationrec {
Uint16 tupkeylen;
Uint32 xfrmtupkeylen;
Uint32 userblockref;
- Uint32 scanBits;
+ Uint16 scanBits;
+ LHBits16 reducedHashValue;
enum OpBits {
OP_MASK = 0x0000F // 4 bits for operation type
@@ -692,8 +706,8 @@ private:
void releaseDirIndexResources(Signal* signal, FragmentrecPtr regFragPtr);
void releaseFragRecord(Signal* signal, FragmentrecPtr regFragPtr);
void initScanFragmentPart(Signal* signal);
- Uint32 checkScanExpand(Signal* signal);
- Uint32 checkScanShrink(Signal* signal);
+ Uint32 checkScanExpand(Signal* signal, Uint32 splitBucket);
+ Uint32 checkScanShrink(Signal* signal, Uint32 sourceBucket, Uint32 destBucket);
void initialiseFragRec(Signal* signal);
void initialiseFsConnectionRec(Signal* signal);
void initialiseFsOpRec(Signal* signal);
@@ -761,6 +775,10 @@ private:
void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 lkey1, Uint32 lkey2, Uint32 eh, OperationrecPtr);
Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
+ LHBits32 getElementHash(OperationrecPtr& oprec);
+ LHBits32 getElementHash(Uint32 const* element, Int32 forward);
+ LHBits32 getElementHash(Uint32 const* element, Int32 forward, OperationrecPtr& oprec);
+ void shrink_adjust_reduced_hash_value(Uint32 bucket_number);
Uint32 getPagePtr(DynArr256::Head&, Uint32);
bool setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri);
Uint32 unsetPagePtr(DynArr256::Head& directory, Uint32 index);
@@ -837,8 +855,6 @@ private:
void zpagesize_error(const char* where);
- void reenable_expand_after_redo_log_exection_complete(Signal*);
-
// charsets
void xfrmKeyData(Signal* signal);
@@ -1000,7 +1016,6 @@ private:
Uint32 tgeContainerptr;
Uint32 tgeElementptr;
Uint32 tgeForward;
- Uint32 texpReceivedBucket;
Uint32 texpDirInd;
Uint32 texpDirRangeIndex;
Uint32 texpDirPageIndex;
@@ -1034,7 +1049,6 @@ private:
Uint32 tmp;
Uint32 tmpP;
Uint32 tmpP2;
- Uint32 tmp1;
Uint32 tmp2;
Uint32 tgflPageindex;
Uint32 tmpindex;
@@ -1094,4 +1108,23 @@ private:
Uint32 c_memusage_report_frequency;
};
+inline Uint32 Dbacc::Fragmentrec::getPageNumber(Uint32 bucket_number) const
+{
+ assert(bucket_number < RNIL);
+ return bucket_number >> k;
+}
+
+inline Uint32 Dbacc::Fragmentrec::getPageIndex(Uint32 bucket_number) const
+{
+ assert(bucket_number < RNIL);
+ return bucket_number & ((1 << k) - 1);
+}
+
+inline bool Dbacc::Fragmentrec::enough_valid_bits(LHBits16 const& reduced_hash_value) const
+{
+ // Forte C 5.0 needs use of intermediate constant
+ int const bits = MIN_HASH_COMPARE_BITS;
+ return level.getNeededValidBits(bits) <= reduced_hash_value.valid_bits();
+}
+
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2012-04-26 10:33:16 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2012-09-28 13:15:29 +0000
@@ -34,6 +34,7 @@
#include <signaldata/TransIdAI.hpp>
#include <KeyDescriptor.hpp>
#include <signaldata/NodeStateSignalData.hpp>
+#include <md5_hash.hpp>
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBACC: "<< x << endl;
@@ -902,7 +903,7 @@ void Dbacc::initOpRec(Signal* signal)
Treqinfo = signal->theData[2];
- operationRecPtr.p->hashValue = signal->theData[3];
+ operationRecPtr.p->hashValue = LHBits32(signal->theData[3]);
operationRecPtr.p->tupkeylen = signal->theData[4];
operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
operationRecPtr.p->transId1 = signal->theData[5];
@@ -1071,7 +1072,7 @@ void Dbacc::execACCKEYREQ(Signal* signal
/*---------------------------------------------------------------*/
Uint32 eh = gePageptr.p->word32[tgeElementptr];
operationRecPtr.p->scanBits = ElementHeader::getScanBits(eh);
- operationRecPtr.p->hashvaluePart = ElementHeader::getHashValuePart(eh);
+ operationRecPtr.p->reducedHashValue = ElementHeader::getReducedHashValue(eh);
operationRecPtr.p->elementPage = gePageptr.i;
operationRecPtr.p->elementContainer = tgeContainerptr;
operationRecPtr.p->elementPointer = tgeElementptr;
@@ -1533,10 +1534,8 @@ void Dbacc::insertelementLab(Signal* sig
insertLockOwnersList(signal, operationRecPtr);
- const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
- operationRecPtr.p->hashvaluePart =
- (operationRecPtr.p->hashValue >> tmp) & 0xFFFF;
operationRecPtr.p->scanBits = 0; /* NOT ANY ACTIVE SCAN */
+ operationRecPtr.p->reducedHashValue = fragrecptr.p->level.reduce(operationRecPtr.p->hashValue);
tidrElemhead = ElementHeader::setLocked(operationRecPtr.i);
idrPageptr = gdiPageptr;
tidrPageindex = tgdiPageindex;
@@ -2343,14 +2342,12 @@ void Dbacc::execACC_COMMITREQ(Signal* si
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) {
/* TIME FOR JOIN BUCKETS PROCESS */
if (fragrecptr.p->expandCounter > 0) {
- if (fragrecptr.p->expandFlag < 2) {
+ if (!fragrecptr.p->expandOrShrinkQueued)
+ {
jam();
signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- signal->theData[3] = fragrecptr.p->expandFlag;
- fragrecptr.p->expandFlag = 2;
- sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
+ fragrecptr.p->expandOrShrinkQueued = true;
+ sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 1, JBB);
}//if
}//if
}//if
@@ -2359,15 +2356,15 @@ void Dbacc::execACC_COMMITREQ(Signal* si
jam(); /* EXPAND PROCESS HANDLING */
fragrecptr.p->noOfElements++;
fragrecptr.p->slack -= fragrecptr.p->elementLength;
- if (fragrecptr.p->slack >= (1u << 31)) {
+ if (fragrecptr.p->slack < 0 && !fragrecptr.p->level.isFull())
+ {
/* IT MEANS THAT IF SLACK < ZERO */
- if (fragrecptr.p->expandFlag == 0) {
+ if (!fragrecptr.p->expandOrShrinkQueued)
+ {
jam();
- fragrecptr.p->expandFlag = 2;
signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
+ fragrecptr.p->expandOrShrinkQueued = true;
+ sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 1, JBB);
}//if
}//if
}
@@ -3181,21 +3178,11 @@ Uint32 Dbacc::unsetPagePtr(DynArr256::He
void Dbacc::getdirindex(Signal* signal)
{
- Uint32 tgdiTmp;
Uint32 tgdiAddress;
- tgdiTmp = fragrecptr.p->k + fragrecptr.p->lhfragbits; /* OBS K = 6 */
- tgdiPageindex = operationRecPtr.p->hashValue & ((1 << fragrecptr.p->k) - 1);
- tgdiTmp = operationRecPtr.p->hashValue >> tgdiTmp;
- tgdiTmp = (tgdiTmp << fragrecptr.p->k) | tgdiPageindex;
- tgdiAddress = tgdiTmp & fragrecptr.p->maxp;
- if (tgdiAddress < fragrecptr.p->p) {
- jam();
- tgdiAddress = tgdiTmp & ((fragrecptr.p->maxp << 1) | 1);
- }//if
- tgdiTmp = tgdiAddress >> fragrecptr.p->k;
- ndbassert(tgdiTmp <= ElementHeader::HASH_VALUE_PART_MASK);
- gdiPageptr.i = getPagePtr(fragrecptr.p->directory, tgdiTmp);
+ tgdiAddress = fragrecptr.p->level.getBucketNumber(operationRecPtr.p->hashValue);
+ tgdiPageindex = fragrecptr.p->getPageIndex(tgdiAddress);
+ gdiPageptr.i = getPagePtr(fragrecptr.p->directory, fragrecptr.p->getPageNumber(tgdiAddress));
ptrCheckGuard(gdiPageptr, cpagesize, page8);
}//Dbacc::getdirindex()
@@ -3278,6 +3265,7 @@ Dbacc::getElement(Signal* signal, Operat
register Uint32 TelemLen = fragrecptr.p->elementLength;
register Uint32* Tkeydata = (Uint32*)&signal->theData[7];
const Uint32 localkeylen = fragrecptr.p->localkeylen;
+ Uint32 bucket_number = fragrecptr.p->level.getBucketNumber(operationRecPtr.p->hashValue);
getdirindex(signal);
tgePageindex = tgdiPageindex;
@@ -3292,8 +3280,6 @@ Dbacc::getElement(Signal* signal, Operat
ndbrequire(TelemLen == ZELEM_HEAD_SIZE + localkeylen);
tgeNextptrtype = ZLEFT;
- const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
- const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
do {
tgeContainerptr = mul_ZBUF_SIZE(tgePageindex);
if (tgeNextptrtype == ZLEFT) {
@@ -3345,23 +3331,25 @@ Dbacc::getElement(Signal* signal, Operat
// Check if it is the element searched for.
/* ------------------------------------------------------------------- */
do {
+ bool possible_match;
tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen;
- Uint32 hashValuePart;
Uint32 localkey1, localkey2;
lockOwnerPtr.i = RNIL;
lockOwnerPtr.p = NULL;
+ LHBits16 reducedHashValue;
if (ElementHeader::getLocked(tgeElementHeader)) {
jam();
lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
- hashValuePart = lockOwnerPtr.p->hashvaluePart;
+ possible_match = lockOwnerPtr.p->hashValue.match(operationRecPtr.p->hashValue);
+ reducedHashValue = lockOwnerPtr.p->reducedHashValue;
localkey1 = lockOwnerPtr.p->localdata[0];
localkey2 = lockOwnerPtr.p->localdata[1];
} else {
jam();
+ reducedHashValue = ElementHeader::getReducedHashValue(tgeElementHeader);
Uint32 pos = tgeElementptr + tgeForward;
- hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
localkey1 = gePageptr.p->word32[pos];
if (likely(localkeylen == 1))
{
@@ -3372,8 +3360,11 @@ Dbacc::getElement(Signal* signal, Operat
{
localkey2 = gePageptr.p->word32[pos + tgeForward];
}
+ possible_match = true;
}
- if (hashValuePart == opHashValuePart) {
+ if (possible_match &&
+ operationRecPtr.p->hashValue.match(fragrecptr.p->level.enlarge(reducedHashValue, bucket_number)))
+ {
jam();
bool found;
if (! searchLocalKey)
@@ -3514,8 +3505,7 @@ void Dbacc::commitdelete(Signal* signal)
// We thus update the element header to ensure we log an unlocked element. We do not
// need to restore it later since it is deleted immediately anyway.
/* --------------------------------------------------------------------------------- */
- const Uint32 hv = operationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
+ const Uint32 eh = ElementHeader::setUnlocked(0, LHBits16());
delPageptr.p->word32[tdelElementptr] = eh;
if (operationRecPtr.p->elementPage == lastPageptr.i) {
if (operationRecPtr.p->elementPointer == tlastElementptr) {
@@ -3588,8 +3578,7 @@ void Dbacc::deleteElement(Signal* signal
// An undo of the delete will reinstall the moved record. We have to ensure that the
// lock is removed to ensure that no such thing happen.
/* --------------------------------------------------------------------------------- */
- Uint32 eh = ElementHeader::setUnlocked(deOperationRecPtr.p->hashvaluePart,
- 0);
+ Uint32 eh = ElementHeader::setUnlocked(0, LHBits16());
lastPageptr.p->word32[tlastElementptr] = eh;
}//if
return;
@@ -4303,8 +4292,7 @@ void Dbacc::abortOperation(Signal* signa
taboElementptr = operationRecPtr.p->elementPointer;
aboPageidptr.i = operationRecPtr.p->elementPage;
- tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
- operationRecPtr.p->scanBits);
+ tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->scanBits, operationRecPtr.p->reducedHashValue);
ptrCheckGuard(aboPageidptr, cpagesize, page8);
dbgWord32(aboPageidptr, taboElementptr, tmp2Olq);
arrGuard(taboElementptr, 2048);
@@ -4336,7 +4324,7 @@ Dbacc::commitDeleteCheck()
OperationrecPtr deleteOpPtr;
Uint32 elementDeleted = 0;
bool deleteCheckOngoing = true;
- Uint32 hashValue = 0;
+ LHBits32 hashValue;
lastOpPtr = operationRecPtr;
opPtr.i = operationRecPtr.p->nextParallelQue;
while (opPtr.i != RNIL) {
@@ -4463,8 +4451,7 @@ void Dbacc::commitOperation(Signal* sign
coPageidptr.i = operationRecPtr.p->elementPage;
tcoElementptr = operationRecPtr.p->elementPointer;
- tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
- operationRecPtr.p->scanBits);
+ tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->scanBits, operationRecPtr.p->reducedHashValue);
ptrCheckGuard(coPageidptr, cpagesize, page8);
dbgWord32(coPageidptr, tcoElementptr, tmp2Olq);
arrGuard(tcoElementptr, 2048);
@@ -4785,7 +4772,7 @@ Dbacc::release_lockowner(Signal* signal,
newOwner.p->elementPointer = opPtr.p->elementPointer;
newOwner.p->elementContainer = opPtr.p->elementContainer;
newOwner.p->scanBits = opPtr.p->scanBits;
- newOwner.p->hashvaluePart = opPtr.p->hashvaluePart;
+ newOwner.p->reducedHashValue = opPtr.p->reducedHashValue;
newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED);
if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
@@ -5095,7 +5082,7 @@ void Dbacc::allocOverflowPage(Signal* si
/* BE EXPANDED ACORDING TO LH3, */
/* AND COMMIT TRANSACTION PROCESS */
/* WILL BE CONTINUED */
-Uint32 Dbacc::checkScanExpand(Signal* signal)
+Uint32 Dbacc::checkScanExpand(Signal* signal, Uint32 splitBucket)
{
Uint32 Ti;
Uint32 TreturnCode = 0;
@@ -5108,7 +5095,7 @@ Uint32 Dbacc::checkScanExpand(Signal* si
Page8Ptr TPageptr;
ScanRecPtr TscanPtr;
- TSplit = fragrecptr.p->p;
+ TSplit = splitBucket;
for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
TreleaseScanIndicator[Ti] = 0;
if (fragrecptr.p->scan[Ti] != RNIL) {
@@ -5162,8 +5149,8 @@ Uint32 Dbacc::checkScanExpand(Signal* si
}//for
if (TreleaseInd == 1) {
TreleaseScanBucket = TSplit;
- TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
- TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
+ TPageIndex = fragrecptr.p->getPageIndex(TreleaseScanBucket);
+ TDirInd = fragrecptr.p->getPageNumber(TreleaseScanBucket);
TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
ptrCheckGuard(TPageptr, cpagesize, page8);
for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
@@ -5192,11 +5179,9 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
fragrecptr.i = signal->theData[0];
tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- Uint32 tmp = 1;
- tmp = tmp << 31;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->expandFlag = 0;
- if (fragrecptr.p->slack < tmp) {
+ fragrecptr.p->expandOrShrinkQueued = false;
+ if (fragrecptr.p->slack > 0) {
jam();
/* IT MEANS THAT IF SLACK > ZERO */
/*--------------------------------------------------------------*/
@@ -5232,7 +5217,24 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
/*--------------------------------------------------------------*/
return;
}//if
- if (checkScanExpand(signal) == 1) {
+
+ if (fragrecptr.p->level.isFull())
+ {
+ jam();
+ /*
+ * The level structure does not allow more buckets.
+ * Do not expand.
+ */
+ return;
+ }
+
+ Uint32 splitBucket;
+ Uint32 receiveBucket;
+
+ bool doSplit = fragrecptr.p->level.getSplitBucket(splitBucket, receiveBucket);
+
+ // Check that splitted bucket is not currently scanned
+ if (doSplit && checkScanExpand(signal, splitBucket) == 1) {
jam();
/*--------------------------------------------------------------*/
// A scan state was inconsistent with performing an expand
@@ -5247,9 +5249,9 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
/* THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO */
/* DECIDE WHICH ELEMENT GOES WHERE. */
/*--------------------------------------------------------------------------*/
- texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1; /* RECEIVED BUCKET */
- texpDirInd = texpReceivedBucket >> fragrecptr.p->k;
- if ((texpReceivedBucket & ((1 << fragrecptr.p->k) - 1)) == 0)
+
+ texpDirInd = fragrecptr.p->getPageNumber(receiveBucket);
+ if (fragrecptr.p->getPageIndex(receiveBucket) == 0)
{ // Need new bucket
expPageptr.i = RNIL;
}
@@ -5262,15 +5264,6 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
}
if (expPageptr.i == RNIL) {
jam();
- // We cannot expand if the new page index cannot be
- // represented in the stored hash bits.
- if (texpDirInd > ElementHeader::HASH_VALUE_PART_MASK)
- {
- jam();
- fragrecptr.p->dirRangeFull = ZTRUE;
- tresult = ZDIR_RANGE_FULL_ERROR;
- return;
- }
seizePage(signal);
if (tresult > ZLIMIT_OF_ERROR) {
jam();
@@ -5279,6 +5272,7 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
if (!setPagePtr(fragrecptr.p->directory, texpDirInd, spPageptr.i))
{
jam();
+ // TODO: should release seized page
tresult = ZDIR_RANGE_FULL_ERROR;
return;
}
@@ -5291,13 +5285,13 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
}//if
fragrecptr.p->expReceivePageptr = expPageptr.i;
- fragrecptr.p->expReceiveIndex = texpReceivedBucket & ((1 << fragrecptr.p->k) - 1);
+ fragrecptr.p->expReceiveIndex = fragrecptr.p->getPageIndex(receiveBucket);
/*--------------------------------------------------------------------------*/
/* THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE */
/* DIRECTORY OF THE BUCKET TO BE SPLIT. */
/*--------------------------------------------------------------------------*/
- cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
- texpDirInd = fragrecptr.p->p >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
+ cexcPageindex = fragrecptr.p->getPageIndex(splitBucket);
+ texpDirInd = fragrecptr.p->getPageNumber(splitBucket);
excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
#ifdef VM_TRACE
require(excPageptr.i != RNIL);
@@ -5318,71 +5312,27 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
void Dbacc::endofexpLab(Signal* signal)
{
- fragrecptr.p->p++;
fragrecptr.p->slack += fragrecptr.p->maxloadfactor;
fragrecptr.p->expandCounter++;
- if (fragrecptr.p->p > fragrecptr.p->maxp) {
- jam();
- fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1;
- fragrecptr.p->hashcheckbit++;
- fragrecptr.p->p = 0;
- }//if
- Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
+ fragrecptr.p->level.expand();
+ Uint32 noOfBuckets = fragrecptr.p->level.getSize();
Uint32 Thysteres = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
- fragrecptr.p->slackCheck = noOfBuckets * Thysteres;
- if (fragrecptr.p->slack > (1u << 31)) {
+ fragrecptr.p->slackCheck = Int64(noOfBuckets) * Thysteres;
+ if (fragrecptr.p->slack < 0 && !fragrecptr.p->level.isFull())
+ {
jam();
/* IT MEANS THAT IF SLACK < ZERO */
/* --------------------------------------------------------------------------------- */
/* IT IS STILL NECESSARY TO EXPAND THE FRAGMENT EVEN MORE. START IT FROM HERE */
/* WITHOUT WAITING FOR NEXT COMMIT ON THE FRAGMENT. */
/* --------------------------------------------------------------------------------- */
- fragrecptr.p->expandFlag = 2;
signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
+ fragrecptr.p->expandOrShrinkQueued = true;
+ sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 1, JBB);
}//if
return;
}//Dbacc::endofexpLab()
-void Dbacc::reenable_expand_after_redo_log_exection_complete(Signal* signal){
-
- tabptr.i = signal->theData[0];
- Uint32 fragId = signal->theData[1];
-
- ptrCheckGuard(tabptr, ctablesize, tabrec);
- ndbrequire(getfragmentrec(signal, fragrecptr, fragId));
-#if 0
- ndbout_c("reenable expand check for table %d fragment: %d",
- tabptr.i, fragId);
-#endif
-
- switch(fragrecptr.p->expandFlag){
- case 0:
- /**
- * Hmm... this means that it's alreay has been reenabled...
- */
- fragrecptr.p->expandFlag = 1;
- break;
- case 1:
- /**
- * Nothing is going on start expand check
- */
- case 2:
- /**
- * A shrink is running, do expand check anyway
- * (to reset expandFlag)
- */
- fragrecptr.p->expandFlag = 2;
- signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
- break;
- }
-}
-
void Dbacc::execDEBUG_SIG(Signal* signal)
{
jamEntry();
@@ -5392,6 +5342,86 @@ void Dbacc::execDEBUG_SIG(Signal* signal
return;
}//Dbacc::execDEBUG_SIG()
+LHBits32 Dbacc::getElementHash(OperationrecPtr& oprec)
+{
+ jam();
+ ndbassert(!oprec.isNull());
+
+ // Only calculate hash value if operation does not already have a complete hash value
+ if (oprec.p->hashValue.valid_bits() < fragrecptr.p->MAX_HASH_VALUE_BITS)
+ {
+ jam();
+ Uint32 localkey[2];
+ localkey[0] = oprec.p->localdata[0];
+ localkey[1] = oprec.p->localdata[1];
+ Uint32 len = readTablePk(localkey[0], localkey[1], ElementHeader::setLocked(oprec.i), oprec);
+ if (len > 0)
+ oprec.p->hashValue = LHBits32(md5_hash((Uint64*)ckeys, len));
+ }
+ return oprec.p->hashValue;
+}
+
+LHBits32 Dbacc::getElementHash(Uint32 const* elemptr, Int32 forward)
+{
+ jam();
+ assert(ElementHeader::getUnlocked(*elemptr));
+
+ Uint32 elemhead = *elemptr;
+ Uint32 localkey[2];
+ elemptr += forward;
+ localkey[0] = *elemptr;
+ if (likely(fragrecptr.p->localkeylen == 1))
+ {
+ jam();
+ localkey[1] = Local_key::ref2page_idx(localkey[0]);
+ localkey[0] = Local_key::ref2page_id(localkey[0]);
+ }
+ else
+ {
+ jam();
+ elemptr += forward;
+ localkey[1] = *elemptr;
+ }
+ OperationrecPtr oprec;
+ oprec.i = RNIL;
+ Uint32 len = readTablePk(localkey[0], localkey[1], elemhead, oprec);
+ if (len > 0)
+ {
+ jam();
+ return LHBits32(md5_hash((Uint64*)ckeys, len));
+ }
+ else
+ { // Return an invalid hash value if no data
+ jam();
+ return LHBits32();
+ }
+}
+
+LHBits32 Dbacc::getElementHash(Uint32 const* elemptr, Int32 forward, OperationrecPtr& oprec)
+{
+ jam();
+
+ if (!oprec.isNull())
+ {
+ jam();
+ return getElementHash(oprec);
+ }
+
+ Uint32 elemhead = *elemptr;
+ if (ElementHeader::getUnlocked(elemhead))
+ {
+ jam();
+ return getElementHash(elemptr, forward);
+ }
+ else
+ {
+ jam();
+ oprec.i = ElementHeader::getOpPtrI(elemhead);
+ ptrCheckGuard(oprec, coprecsize, operationrec);
+ return getElementHash(oprec);
+ }
+}
+
/* --------------------------------------------------------------------------------- */
/* EXPANDCONTAINER */
/* INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD) */
@@ -5402,7 +5432,7 @@ void Dbacc::execDEBUG_SIG(Signal* signal
/* --------------------------------------------------------------------------------- */
void Dbacc::expandcontainer(Signal* signal)
{
- Uint32 texcHashvalue;
+ LHBits32 texcHashvalue;
Uint32 texcTmp;
Uint32 texcIndex;
Uint32 guard20;
@@ -5444,17 +5474,42 @@ void Dbacc::expandcontainer(Signal* sign
/* --------------------------------------------------------------------------------- */
arrGuard(cexcElementptr, 2048);
tidrElemhead = excPageptr.p->word32[cexcElementptr];
- if (ElementHeader::getUnlocked(tidrElemhead)){
- jam();
- texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
- } else {
+ bool move;
+ if (ElementHeader::getLocked(tidrElemhead))
+ {
jam();
idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
- texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
- }//if
- if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
+ ndbassert(idrOperationRecPtr.p->reducedHashValue.valid_bits() >= 1);
+ move = idrOperationRecPtr.p->reducedHashValue.get_bit(1);
+ idrOperationRecPtr.p->reducedHashValue.shift_out();
+ if (!fragrecptr.p->enough_valid_bits(idrOperationRecPtr.p->reducedHashValue))
+ {
+ jam();
+ idrOperationRecPtr.p->reducedHashValue =
+ fragrecptr.p->level.reduce(getElementHash(idrOperationRecPtr));
+ }
+ }
+ else
+ {
+ jam();
+ LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead);
+ ndbassert(reducedHashValue.valid_bits() >= 1);
+ move = reducedHashValue.get_bit(1);
+ reducedHashValue.shift_out();
+ if (!fragrecptr.p->enough_valid_bits(reducedHashValue))
+ {
+ jam();
+ reducedHashValue =
+ fragrecptr.p->level.reduce(getElementHash(&excPageptr.p->word32[cexcElementptr], cexcForward));
+ }
+ tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue);
+ }
+ if (!move)
+ {
jam();
+ if (ElementHeader::getUnlocked(tidrElemhead))
+ excPageptr.p->word32[cexcElementptr] = tidrElemhead;
/* --------------------------------------------------------------------------------- */
/* THIS ELEMENT IS NOT TO BE MOVED. WE CALCULATE THE WHEREABOUTS OF THE NEXT */
/* ELEMENT AND PROCEED WITH THAT OR END THE SEARCH IF THERE ARE NO MORE */
@@ -5519,17 +5574,41 @@ void Dbacc::expandcontainer(Signal* sign
ptrNull(idrOperationRecPtr);
arrGuard(tlastElementptr, 2048);
tidrElemhead = lastPageptr.p->word32[tlastElementptr];
- if (ElementHeader::getUnlocked(tidrElemhead)) {
- jam();
- texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
- } else {
+ if (ElementHeader::getLocked(tidrElemhead))
+ {
jam();
idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
- texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
- }//if
- if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
+ ndbassert(idrOperationRecPtr.p->reducedHashValue.valid_bits() >= 1);
+ move = idrOperationRecPtr.p->reducedHashValue.get_bit(1);
+ idrOperationRecPtr.p->reducedHashValue.shift_out();
+ if (!fragrecptr.p->enough_valid_bits(idrOperationRecPtr.p->reducedHashValue))
+ {
+ jam();
+ idrOperationRecPtr.p->reducedHashValue =
+ fragrecptr.p->level.reduce(getElementHash(idrOperationRecPtr));
+ }
+ }
+ else
+ {
jam();
+ LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead);
+ ndbassert(reducedHashValue.valid_bits() > 0);
+ move = reducedHashValue.get_bit(1);
+ reducedHashValue.shift_out();
+ if (!fragrecptr.p->enough_valid_bits(reducedHashValue))
+ {
+ jam();
+ reducedHashValue =
+ fragrecptr.p->level.reduce(getElementHash(&lastPageptr.p->word32[tlastElementptr], tlastForward));
+ }
+ tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue);
+ }
+ if (!move)
+ {
+ jam();
+ if (ElementHeader::getUnlocked(tidrElemhead))
+ lastPageptr.p->word32[tlastElementptr] = tidrElemhead;
/* --------------------------------------------------------------------------------- */
/* THE LAST ELEMENT IS NOT TO BE MOVED. WE COPY IT TO THE CURRENT ELEMENT. */
/* --------------------------------------------------------------------------------- */
@@ -5602,7 +5681,7 @@ void Dbacc::expandcontainer(Signal* sign
/* WILL BE JOINED ACORDING TO LH3 */
/* AND COMMIT TRANSACTION PROCESS */
/* WILL BE CONTINUED */
-Uint32 Dbacc::checkScanShrink(Signal* signal)
+Uint32 Dbacc::checkScanShrink(Signal* signal, Uint32 sourceBucket, Uint32 destBucket)
{
Uint32 Ti;
Uint32 TreturnCode = 0;
@@ -5616,14 +5695,8 @@ Uint32 Dbacc::checkScanShrink(Signal* si
Page8Ptr TPageptr;
ScanRecPtr TscanPtr;
- if (fragrecptr.p->p == 0) {
- jam();
- TmergeDest = fragrecptr.p->maxp >> 1;
- } else {
- jam();
- TmergeDest = fragrecptr.p->p - 1;
- }//if
- TmergeSource = fragrecptr.p->maxp + fragrecptr.p->p;
+ TmergeDest = destBucket;
+ TmergeSource = sourceBucket;
for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
TreleaseScanIndicator[Ti] = 0;
if (fragrecptr.p->scan[Ti] != RNIL) {
@@ -5677,8 +5750,8 @@ Uint32 Dbacc::checkScanShrink(Signal* si
if (TreleaseInd == 1) {
jam();
TreleaseScanBucket = TmergeSource;
- TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
- TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
+ TPageIndex = fragrecptr.p->getPageIndex(TreleaseScanBucket);
+ TDirInd = fragrecptr.p->getPageNumber(TreleaseScanBucket);
TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
ptrCheckGuard(TPageptr, cpagesize, page8);
for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
@@ -5718,9 +5791,8 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
jamEntry();
fragrecptr.i = signal->theData[0];
- Uint32 oldFlag = signal->theData[3];
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->expandFlag = oldFlag;
+ fragrecptr.p->expandOrShrinkQueued = false;
tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
if (fragrecptr.p->slack <= fragrecptr.p->slackCheck) {
jam();
@@ -5730,7 +5802,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/*--------------------------------------------------------------*/
return;
}//if
- if (fragrecptr.p->slack > (1u << 31)) {
+ if (fragrecptr.p->slack < 0) {
jam();
/*--------------------------------------------------------------*/
/* THE SLACK IS NEGATIVE, IN THIS CASE WE WILL NOT NEED ANY */
@@ -5738,7 +5810,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/*--------------------------------------------------------------*/
return;
}//if
- texpDirInd = (fragrecptr.p->maxp + fragrecptr.p->p) >> fragrecptr.p->k;
if (fragrecptr.p->firstOverflowRec == RNIL) {
jam();
allocOverflowPage(signal);
@@ -5757,7 +5828,26 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/*--------------------------------------------------------------*/
return;
}//if
- if (checkScanShrink(signal) == 1) {
+
+ if (fragrecptr.p->level.isEmpty())
+ {
+ jam();
+ /* no need to shrink empty hash table */
+ return;
+ }
+
+ // Since expandCounter guards more shrinks than expands and
+ // all fragments starts with a full page of buckets
+ ndbassert(fragrecptr.p->getPageNumber(fragrecptr.p->level.getTop()) > 0);
+
+ Uint32 mergeSourceBucket;
+ Uint32 mergeDestBucket;
+ bool doMerge = fragrecptr.p->level.getMergeBuckets(mergeSourceBucket, mergeDestBucket);
+
+ ndbassert(doMerge); // Merge always needed since we never shrink below one page of buckets
+
+ /* check that neither of source or destination bucket are currently scanned */
+ if (doMerge && checkScanShrink(signal, mergeSourceBucket, mergeDestBucket) == 1) {
jam();
/*--------------------------------------------------------------*/
// A scan state was inconsistent with performing a shrink
@@ -5765,15 +5855,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/*--------------------------------------------------------------*/
return;
}//if
- if (fragrecptr.p->p == 0) {
- jam();
- fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
- fragrecptr.p->p = fragrecptr.p->maxp;
- fragrecptr.p->hashcheckbit--;
- } else {
- jam();
- fragrecptr.p->p--;
- }//if
if (ERROR_INSERTED(3002))
debug_lh_vars("SHR");
@@ -5782,12 +5863,14 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
fragrecptr.p->dirRangeFull = ZFALSE;
}
+ shrink_adjust_reduced_hash_value(mergeDestBucket);
+
/*--------------------------------------------------------------------------*/
/* WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE */
/* REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET. */
/*--------------------------------------------------------------------------*/
- cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1);
- texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k;
+ cexcPageindex = fragrecptr.p->getPageIndex(mergeSourceBucket);
+ texpDirInd = fragrecptr.p->getPageNumber(mergeSourceBucket);
excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
fragrecptr.p->expSenderIndex = cexcPageindex;
fragrecptr.p->expSenderPageptr = excPageptr.i;
@@ -5796,9 +5879,9 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/* WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE */
/* RECEIVING BUCKET. */
/*--------------------------------------------------------------------------*/
- texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k;
- fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpReceivedBucket);
- fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);
+ texpDirInd = fragrecptr.p->getPageNumber(mergeDestBucket);
+ fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpDirInd);
+ fragrecptr.p->expReceiveIndex = fragrecptr.p->getPageIndex(mergeDestBucket);
fragrecptr.p->expReceiveForward = ZTRUE;
if (excPageptr.i == RNIL) {
jam();
@@ -5904,6 +5987,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
void Dbacc::endofshrinkbucketLab(Signal* signal)
{
+ fragrecptr.p->level.shrink();
fragrecptr.p->expandCounter--;
fragrecptr.p->slack -= fragrecptr.p->maxloadfactor;
if (fragrecptr.p->expSenderIndex == 0) {
@@ -5915,7 +5999,7 @@ void Dbacc::endofshrinkbucketLab(Signal*
releasePage(signal);
unsetPagePtr(fragrecptr.p->directory, fragrecptr.p->expSenderDirIndex);
}//if
- if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) {
+ if ((fragrecptr.p->getPageNumber(fragrecptr.p->level.getSize()) & 0xff) == 0) {
jam();
DynArr256 dir(directoryPool, fragrecptr.p->directory);
DynArr256::ReleaseIterator iter;
@@ -5933,15 +6017,15 @@ void Dbacc::endofshrinkbucketLab(Signal*
}
}//if
}//if
- if (fragrecptr.p->slack < (1u << 31)) {
+ if (fragrecptr.p->slack > 0) {
jam();
/*--------------------------------------------------------------*/
/* THE SLACK IS POSITIVE, IN THIS CASE WE WILL CHECK WHETHER */
/* WE WILL CONTINUE PERFORM ANOTHER SHRINK. */
/*--------------------------------------------------------------*/
- Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
+ Uint32 noOfBuckets = fragrecptr.p->level.getSize();
Uint32 Thysteresis = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
- fragrecptr.p->slackCheck = noOfBuckets * Thysteresis;
+ fragrecptr.p->slackCheck = Int64(noOfBuckets) * Thysteresis;
if (fragrecptr.p->slack > Thysteresis) {
/*--------------------------------------------------------------*/
/* IT IS STILL NECESSARY TO SHRINK THE FRAGMENT MORE. THIS*/
@@ -5960,16 +6044,13 @@ void Dbacc::endofshrinkbucketLab(Signal*
/* WAS REMOVED 2000-05-12. */
/*--------------------------------------------------------------*/
signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- signal->theData[3] = fragrecptr.p->expandFlag;
- ndbrequire(fragrecptr.p->expandFlag < 2);
- fragrecptr.p->expandFlag = 2;
- sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
+ ndbrequire(!fragrecptr.p->expandOrShrinkQueued);
+ fragrecptr.p->expandOrShrinkQueued = true;
+ sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 1, JBB);
}//if
}//if
}//if
- ndbrequire(fragrecptr.p->maxp >= (Uint32)((1 << fragrecptr.p->k) - 1));
+ ndbrequire(fragrecptr.p->getPageNumber(fragrecptr.p->level.getSize()) > 0);
return;
}//Dbacc::endofshrinkbucketLab()
@@ -5980,9 +6061,121 @@ void Dbacc::endofshrinkbucketLab(Signal*
/* CEXC_CONTAINERPTR (ARRAY INDEX OF THE CONTAINER). */
/* CEXC_FORWARD (CONTAINER FORWARD (+1) OR BACKWARD (-1)) */
/* */
-/* DESCRIPTION: ALL ELEMENTS OF THE ACTIVE CONTAINER HAVE TO MOVE TO THE NEW */
-/* CONTAINER. */
+/* DESCRIPTION: SCAN ALL ELEMENTS IN DESTINATION BUCKET BEFORE MERGE */
+/* AND ADJUST THE STORED REDUCED HASH VALUE (SHIFT IN ZERO). */
/* --------------------------------------------------------------------------------- */
+void
+Dbacc::shrink_adjust_reduced_hash_value(Uint32 bucket_number)
+{
+ /*
+ * Note: function are a copy paste from getElement() with modified inner loop
+ * instead of finding a specific element, scan through all and modify.
+ */
+ Uint32 tgeElementHeader;
+ Uint32 tgeElemStep;
+ Uint32 tgeContainerhead;
+ Uint32 tgePageindex;
+ Uint32 tgeActivePageDir;
+ Uint32 tgeNextptrtype;
+ register Uint32 tgeRemLen;
+ register Uint32 TelemLen = fragrecptr.p->elementLength;
+ const Uint32 localkeylen = fragrecptr.p->localkeylen;
+
+ tgePageindex = fragrecptr.p->getPageIndex(bucket_number);
+ gePageptr.i = getPagePtr(fragrecptr.p->directory, fragrecptr.p->getPageNumber(bucket_number));
+ ptrCheckGuard(gePageptr, cpagesize, page8);
+
+ ndbrequire(TelemLen == ZELEM_HEAD_SIZE + localkeylen);
+ tgeNextptrtype = ZLEFT;
+
+ /* Loop through all containers in a bucket */
+ do {
+ tgeContainerptr = mul_ZBUF_SIZE(tgePageindex);
+ if (tgeNextptrtype == ZLEFT)
+ {
+ jam();
+ tgeContainerptr = tgeContainerptr + ZHEAD_SIZE;
+ tgeElementptr = tgeContainerptr + ZCON_HEAD_SIZE;
+ tgeElemStep = TelemLen;
+ tgeForward = 1;
+ ndbrequire(tgeContainerptr < 2048);
+ tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
+ ndbrequire((tgeContainerptr + tgeRemLen - 1) < 2048);
+ }
+ else if (tgeNextptrtype == ZRIGHT)
+ {
+ jam();
+ tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+ tgeElementptr = tgeContainerptr - 1;
+ tgeElemStep = 0 - TelemLen;
+ tgeForward = (Uint32)-1;
+ ndbrequire(tgeContainerptr < 2048);
+ tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
+ ndbrequire((tgeContainerptr - tgeRemLen) < 2048);
+ }
+ else
+ {
+ jam();
+ jamLine(tgeNextptrtype);
+ ndbrequire(false);
+ }//if
+ if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen)
+ {
+ ndbrequire(tgeRemLen <= ZBUF_SIZE);
+ /* ------------------------------------------------------------------- */
+ /* Loop through all elements in a container */
+ do
+ {
+ tgeElementHeader = gePageptr.p->word32[tgeElementptr];
+ tgeRemLen = tgeRemLen - TelemLen;
+ /*
+ * Adjust the stored reduced hash value for element, shifting in a zero
+ */
+ if (ElementHeader::getLocked(tgeElementHeader))
+ {
+ jam();
+ OperationrecPtr oprec;
+ oprec.i = ElementHeader::getOpPtrI(tgeElementHeader);
+ ptrCheckGuard(oprec, coprecsize, operationrec);
+ oprec.p->reducedHashValue.shift_in(false);
+ }
+ else
+ {
+ jam();
+ LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tgeElementHeader);
+ reducedHashValue.shift_in(false);
+ tgeElementHeader = ElementHeader::setReducedHashValue(tgeElementHeader, reducedHashValue);
+ gePageptr.p->word32[tgeElementptr] = tgeElementHeader;
+ }
+ if (tgeRemLen <= ZCON_HEAD_SIZE)
+ {
+ break;
+ }
+ tgeElementptr = tgeElementptr + tgeElemStep;
+ } while (true);
+ }//if
+ ndbrequire(tgeRemLen == ZCON_HEAD_SIZE);
+ tgeContainerhead = gePageptr.p->word32[tgeContainerptr];
+ tgeNextptrtype = (tgeContainerhead >> 7) & 0x3;
+ if (tgeNextptrtype == 0)
+ {
+ jam();
+ return; /* NO MORE CONTAINER */
+ }//if
+ tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */
+ ndbrequire(tgePageindex <= ZEMPTYLIST);
+ if (((tgeContainerhead >> 9) & 1) == ZFALSE)
+ {
+ jam();
+ tgeActivePageDir = gePageptr.p->word32[tgeContainerptr + 1]; /* NEXT PAGE ID */
+ gePageptr.i = getPagePtr(fragrecptr.p->overflowdir, tgeActivePageDir);
+ ptrCheckGuard(gePageptr, cpagesize, page8);
+ }//if
+ } while (1);
+
+ return;
+}//Dbacc::shrink_adjust_reduced_hash_value()
+
void Dbacc::shrinkcontainer(Signal* signal)
{
Uint32 tshrElementptr;
@@ -5991,7 +6184,6 @@ void Dbacc::shrinkcontainer(Signal* sign
Uint32 tshrTmp;
Uint32 tshrIndex;
Uint32 guard21;
-
tshrRemLen = cexcContainerlen - ZCON_HEAD_SIZE;
tshrInc = fragrecptr.p->elementLength;
if (cexcForward == ZTRUE) {
@@ -6020,7 +6212,14 @@ void Dbacc::shrinkcontainer(Signal* sign
/* --------------------------------------------------------------------------------- */
idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
+ idrOperationRecPtr.p->reducedHashValue.shift_in(true);
}//if
+ else
+ {
+ LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead);
+ reducedHashValue.shift_in(true);
+ tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue);
+ }
tshrTmp = tshrElementptr + cexcForward;
guard21 = fragrecptr.p->localkeylen - 1;
for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
@@ -6094,7 +6293,6 @@ void Dbacc::initFragAdd(Signal* signal,
FragmentrecPtr regFragPtr)
{
const AccFragReq * const req = (AccFragReq*)&signal->theData[0];
- Uint32 lhFragBits = req->lhFragBits + 1;
Uint32 minLoadFactor = (req->minLoadFactor * ZBUF_SIZE) / 100;
Uint32 maxLoadFactor = (req->maxLoadFactor * ZBUF_SIZE) / 100;
if (ERROR_INSERTED(3003)) // use small LoadFactors to force sparse hash table
@@ -6112,7 +6310,7 @@ void Dbacc::initFragAdd(Signal* signal,
regFragPtr.p->myfid = req->fragId;
regFragPtr.p->myTableId = req->tableId;
ndbrequire(req->kValue == 6);
- regFragPtr.p->k = req->kValue; /* TK_SIZE = 6 IN THIS VERSION */
+ ndbrequire(req->kValue == regFragPtr.p->k);
regFragPtr.p->expandCounter = 0;
/**
@@ -6121,24 +6319,20 @@ void Dbacc::initFragAdd(Signal* signal,
*
* Is later restored to 0 by LQH at end of REDO log execution
*/
- regFragPtr.p->expandFlag = 0;
- regFragPtr.p->p = 0;
- regFragPtr.p->maxp = (1 << req->kValue) - 1;
+ regFragPtr.p->expandOrShrinkQueued = false;
+ regFragPtr.p->level.setSize(1 << req->kValue);
regFragPtr.p->minloadfactor = minLoadFactor;
regFragPtr.p->maxloadfactor = maxLoadFactor;
- regFragPtr.p->slack = (regFragPtr.p->maxp + 1) * maxLoadFactor;
- regFragPtr.p->lhfragbits = lhFragBits;
- regFragPtr.p->hashcheckbit = 0; //lhFragBits;
+ regFragPtr.p->slack = Int64(regFragPtr.p->level.getSize()) * maxLoadFactor;
regFragPtr.p->localkeylen = req->localKeyLen;
regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3;
regFragPtr.p->lastOverIndex = 0;
regFragPtr.p->keyLength = req->keyLength;
ndbrequire(req->keyLength != 0);
regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen;
- Uint32 Tmp1 = (regFragPtr.p->maxp + 1) + regFragPtr.p->p;
+ Uint32 Tmp1 = regFragPtr.p->level.getSize();
Uint32 Tmp2 = regFragPtr.p->maxloadfactor - regFragPtr.p->minloadfactor;
- Tmp2 = Tmp1 * Tmp2;
- regFragPtr.p->slackCheck = Tmp2;
+ regFragPtr.p->slackCheck = Int64(Tmp1) * Tmp2;
regFragPtr.p->mytabptr = req->tableId;
regFragPtr.p->roothashcheck = req->kValue + req->lhFragBits;
regFragPtr.p->noOfElements = 0;
@@ -6318,17 +6512,14 @@ void Dbacc::checkNextBucketLab(Signal* s
Uint32 tnsElementptr;
Uint32 tnsContainerptr;
Uint32 tnsIsLocked;
- Uint32 tnsTmp1;
- Uint32 tnsTmp2;
Uint32 tnsCopyDir;
- tnsCopyDir = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
+ tnsCopyDir = fragrecptr.p->getPageNumber(scanPtr.p->nextBucketIndex);
tnsPageidptr.i = getPagePtr(fragrecptr.p->directory, tnsCopyDir);
ptrCheckGuard(tnsPageidptr, cpagesize, page8);
gnsPageidptr.i = tnsPageidptr.i;
gnsPageidptr.p = tnsPageidptr.p;
- tnsTmp1 = (1 << fragrecptr.p->k) - 1;
- tgsePageindex = scanPtr.p->nextBucketIndex & tnsTmp1;
+ tgsePageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
gsePageidptr.i = gnsPageidptr.i;
gsePageidptr.p = gnsPageidptr.p;
if (!getScanElement(signal)) {
@@ -6344,7 +6535,7 @@ void Dbacc::checkNextBucketLab(Signal* s
return;
}//if
} else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
- if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) {
+ if (fragrecptr.p->level.getTop() < scanPtr.p->nextBucketIndex) {
/* ---------------------------------------------------------------- */
// All buckets have been scanned a first time.
/* ---------------------------------------------------------------- */
@@ -6366,7 +6557,7 @@ void Dbacc::checkNextBucketLab(Signal* s
/* --------------------------------------------------------------------------------- */
scanPtr.p->nextBucketIndex = scanPtr.p->minBucketIndexToRescan;
scanPtr.p->scanBucketState = ScanRec::SECOND_LAP;
- if (scanPtr.p->maxBucketIndexToRescan > (fragrecptr.p->p + fragrecptr.p->maxp)) {
+ if (scanPtr.p->maxBucketIndexToRescan > fragrecptr.p->level.getTop()) {
jam();
/* --------------------------------------------------------------------------------- */
// If we have had so many merges that the maximum is bigger than the number of buckets
@@ -6379,7 +6570,7 @@ void Dbacc::checkNextBucketLab(Signal* s
sendSystemerror(signal, __LINE__);
return;
}//if
- scanPtr.p->maxBucketIndexToRescan = fragrecptr.p->p + fragrecptr.p->maxp;
+ scanPtr.p->maxBucketIndexToRescan = fragrecptr.p->level.getTop();
}//if
}//if
}//if
@@ -6390,19 +6581,17 @@ void Dbacc::checkNextBucketLab(Signal* s
// We will only reset the scan indicator on the buckets that existed at the start of the
// scan. The others will be handled by the split and merge code.
/* --------------------------------------------------------------------------------- */
- tnsTmp2 = (1 << fragrecptr.p->k) - 1;
- trsbPageindex = scanPtr.p->nextBucketIndex & tnsTmp2;
+ trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
if (trsbPageindex != 0) {
jam();
rsbPageidptr.i = gnsPageidptr.i;
rsbPageidptr.p = gnsPageidptr.p;
} else {
jam();
- tmpP = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
+ tmpP = fragrecptr.p->getPageNumber(scanPtr.p->nextBucketIndex);
cscPageidptr.i = getPagePtr(fragrecptr.p->directory, tmpP);
ptrCheckGuard(cscPageidptr, cpagesize, page8);
- tmp1 = (1 << fragrecptr.p->k) - 1;
- trsbPageindex = scanPtr.p->nextBucketIndex & tmp1;
+ trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
rsbPageidptr.i = cscPageidptr.i;
rsbPageidptr.p = cscPageidptr.p;
}//if
@@ -6539,12 +6728,12 @@ void Dbacc::initScanFragmentPart(Signal*
scanPtr.p->activeLocalFrag = fragrecptr.i;
scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */
scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
- scanPtr.p->startNoOfBuckets = fragrecptr.p->p + fragrecptr.p->maxp;
+ scanPtr.p->startNoOfBuckets = fragrecptr.p->level.getTop();
scanPtr.p->minBucketIndexToRescan = 0xFFFFFFFF;
scanPtr.p->maxBucketIndexToRescan = 0;
cnfPageidptr.i = getPagePtr(fragrecptr.p->directory, 0);
ptrCheckGuard(cnfPageidptr, cpagesize, page8);
- trsbPageindex = scanPtr.p->nextBucketIndex & ((1 << fragrecptr.p->k) - 1);
+ trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
rsbPageidptr.i = cnfPageidptr.i;
rsbPageidptr.p = cnfPageidptr.p;
releaseScanBucket(signal);
@@ -6914,6 +7103,7 @@ void Dbacc::initScanOpRec(Signal* signal
operationRecPtr.p->localdata[0] = Tkey1;
operationRecPtr.p->localdata[1] = isoPageptr.p->word32[tisoLocalPtr];
}
+ operationRecPtr.p->hashValue.clear();
operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
operationRecPtr.p->xfrmtupkeylen = 0; // not used
}//Dbacc::initScanOpRec()
@@ -7287,7 +7477,7 @@ void Dbacc::setlock(Signal* signal)
arrGuard(tslElementptr, 2048);
tselTmp1 = slPageidptr.p->word32[tslElementptr];
operationRecPtr.p->scanBits = ElementHeader::getScanBits(tselTmp1);
- operationRecPtr.p->hashvaluePart = ElementHeader::getHashValuePart(tselTmp1);
+ operationRecPtr.p->reducedHashValue = ElementHeader::getReducedHashValue(tselTmp1);
tselTmp1 = ElementHeader::setLocked(operationRecPtr.i);
dbgWord32(slPageidptr, tslElementptr, tselTmp1);
@@ -8188,10 +8378,9 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal
infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ",
tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage,
tmpOpPtr.p->elementPointer);
- infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ",
- tmpOpPtr.p->fid, tmpOpPtr.p->fragptr,
- tmpOpPtr.p->hashvaluePart);
- infoEvent("hashValue=%d", tmpOpPtr.p->hashValue);
+ infoEvent("fid=%d, fragptr=%d ",
+ tmpOpPtr.p->fid, tmpOpPtr.p->fragptr);
+ infoEvent("hashValue=%d", tmpOpPtr.p->hashValue.pack());
infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ",
tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp,
tmpOpPtr.p->nextParallelQue);
@@ -8202,8 +8391,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal
tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue);
infoEvent("prevSerialQue=%d, scanRecPtr=%d",
tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr);
- infoEvent("m_op_bits=0x%x, scanBits=%d ",
- tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits);
+ infoEvent("m_op_bits=0x%x, scanBits=%d, reducedHashValue=%x ",
+ tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits, tmpOpPtr.p->reducedHashValue.pack());
return;
}
@@ -8405,17 +8594,16 @@ Dbacc::execNODE_STATE_REP(Signal* signal
void
Dbacc::debug_lh_vars(const char* where)
{
- Uint32 b = fragrecptr.p->maxp + fragrecptr.p->p;
- Uint32 di = b >> fragrecptr.p->k;
+ Uint32 b = fragrecptr.p->level.getTop();
+ Uint32 di = fragrecptr.p->getPageNumber(b);
Uint32 ri = di >> 8;
ndbout
<< "DBACC: " << where << ":"
<< " frag:" << fragrecptr.p->myTableId
<< "/" << fragrecptr.p->myfid
- << " slack:" << (Int32)fragrecptr.p->slack
+ << " slack:" << fragrecptr.p->slack
<< "/" << fragrecptr.p->slackCheck
- << " maxp:" << fragrecptr.p->maxp
- << " p:" << fragrecptr.p->p
+ << " top:" << fragrecptr.p->level.getTop()
<< " di:" << di
<< " ri:" << ri
<< " full:" << fragrecptr.p->dirRangeFull
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-09-18 12:00:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2012-09-28 12:38:38 +0000
@@ -539,10 +539,6 @@ void Dblqh::execCONTINUEB(Signal* signal
{
jam();
c_lcp_complete_fragments.getPtr(fragptr);
- signal->theData[0] = fragptr.p->tabRef;
- signal->theData[1] = fragptr.p->fragId;
- BlockReference accRef = calcInstanceBlockRef(DBACC);
- sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
Ptr<Fragrecord> save = fragptr;
c_lcp_complete_fragments.next(fragptr);
@@ -12282,19 +12278,6 @@ Dblqh::execPREPARE_COPY_FRAG_REQ(Signal*
/**
*
*/
- if (cstartType == NodeState::ST_SYSTEM_RESTART)
- {
- jam();
- signal->theData[0] = fragptr.p->tabRef;
- signal->theData[1] = fragptr.p->fragId;
- BlockReference accRef = calcInstanceBlockRef(DBACC);
- sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
- }
-
-
- /**
- *
- */
fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
fragptr.p->logFlag = Fragrecord::STATE_FALSE;
@@ -17085,10 +17068,6 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
*/
c_lcp_complete_fragments.add(fragptr);
- signal->theData[0] = tabptr.i;
- signal->theData[1] = fragId;
- BlockReference accRef = calcInstanceBlockRef(DBACC);
- sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL);
jamEntry();
return;
@@ -17241,18 +17220,9 @@ void Dblqh::execRESTORE_LCP_CONF(Signal*
c_lcp_restoring_fragments.remove(fragptr);
c_lcp_complete_fragments.add(fragptr);
- /**
- * Disable expand check in ACC
- * before running REDO
- */
tabptr.i = fragptr.p->tabRef;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
- signal->theData[0] = fragptr.p->tabRef;
- signal->theData[1] = fragptr.p->fragId;
- BlockReference accRef = calcInstanceBlockRef(DBACC);
- sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
-
if (!c_lcp_waiting_fragments.isEmpty())
{
send_restore_lcp(signal);
=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-01-23 17:37:12 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-09-28 12:55:26 +0000
@@ -72,7 +72,7 @@ SET_TARGET_PROPERTIES(mt_thr_config-t
PROPERTIES COMPILE_FLAGS "-DTEST_MT_THR_CONFIG")
TARGET_LINK_LIBRARIES(mt_thr_config-t ndbgeneral)
-FOREACH(testprog CountingPool DynArr256)
+FOREACH(testprog CountingPool DynArr256 LHLevel)
ADD_EXECUTABLE(${testprog}-t ${testprog}.cpp)
SET_TARGET_PROPERTIES(${testprog}-t PROPERTIES COMPILE_FLAGS "-DTAP_TEST")
TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror
=== added file 'storage/ndb/src/kernel/vm/LHLevel.cpp'
--- a/storage/ndb/src/kernel/vm/LHLevel.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LHLevel.cpp 2012-09-28 13:10:15 +0000
@@ -0,0 +1,277 @@
+/*
+ Copyright (c) 2012 Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifdef TAP_TEST
+
+#include <assert.h>
+#include <stdlib.h>
+
+#include <ndb_global.h>
+#include <NdbTap.hpp>
+
+#include "md5_hash.hpp"
+#include "random.h"
+#include "LHLevel.hpp"
+
+#ifndef UINT32_MAX
+#define UINT32_MAX (4294967295U)
+#endif
+
+#define BUCKSIZE 3
+
+struct elem
+{
+ Uint32 val;
+ Uint16 head;
+ static LHBits32 hash(Uint32 val)
+ {
+ return LHBits32(md5_hash((Uint64*)&val, 1));
+ }
+};
+
+void expand(LHLevel& lh, elem(*arr)[BUCKSIZE]);
+bool shrink(LHLevel& lh, elem(*arr)[BUCKSIZE]);
+bool insert_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 v);
+bool delete_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 w);
+int count_elem(LHLevel& lh, elem(*arr)[BUCKSIZE]);
+
+Uint64 c_inserts = 0;
+Uint64 c_expands = 0;
+Uint64 c_shrinks = 0;
+Uint64 c_deletes = 0;
+Uint64 c_moved = 0;
+Uint64 c_rehashed = 0;
+
+int main(int argc, char *argv[])
+{
+ unsigned int nelem = argc > 1 ? atoi(argv[1]) : 1000000;
+ plan(4);
+ elem(*arr)[BUCKSIZE] = new elem[nelem][BUCKSIZE];
+ bzero(arr, nelem * sizeof(elem[BUCKSIZE]));
+ LHLevel lh;
+ lh.clear();
+ expand(lh, arr);
+ Uint32 v = 0;
+ myRandom48Init(nelem);
+ for (int lap = 1; lap <= 2; lap++)
+ {
+ // Fill up table, with occasionally shrink
+ for (;v < UINT32_MAX;v++)
+ {
+ if (lh.getSize() * (BUCKSIZE - 1) < c_inserts - c_deletes)
+ {
+ if (!lh.isFull() && lh.getSize() < nelem)
+ expand(lh, arr);
+ else
+ break; /* Filled up */
+ }
+ insert_elem(lh, arr, v);
+ if (rand() % 100 == 0)
+ shrink(lh, arr);
+ }
+
+ // First lap, shrink to half
+ // Second lap, delete all
+ Uint32 lim = lh.getSize();
+ lim = lap * lim / 2;
+ while (v > 0 && lim > 0)
+ {
+ if (lh.isEmpty()) break;
+ Uint32 w = (Uint32)myRandom48(v + 1);
+ delete_elem(lh, arr, w);
+ delete_elem(lh, arr, v);
+ v--;
+ if (lh.getSize() * BUCKSIZE * 3 > c_inserts - c_deletes)
+ if (shrink(lh, arr))
+ lim--;
+ }
+
+ // Check table consistency
+ if (lap == 1)
+ {
+ int n = count_elem(lh, arr);
+ ok((n >= 0), "all element hash values match stored hash value and bucket address");
+ if (n < 0) n = -n;
+ ok((c_inserts == c_deletes + n),
+ "scanned element count (%u) matches difference between inserts (%llu) and deletes (%llu)",
+ n, c_inserts, c_deletes);
+ }
+ }
+ ok((c_inserts == c_deletes), "inserts (%llu) equals deletes (%llu)", c_inserts, c_deletes);
+ ok((c_expands == c_shrinks), "expands (%llu) equals shrinks (%llu)", c_expands, c_shrinks);
+ return exit_status();
+}
+
+bool delete_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 w)
+{
+ LHBits32 hash(elem::hash(w));
+ Uint32 addr = lh.getBucketNumber(hash);
+ int i;
+ bool found = false;
+ for (i = 0; i < BUCKSIZE && (arr[addr][i].head != 0); i++)
+ {
+ if (arr[addr][i].val == w)
+ {
+ found = true;
+ break;
+ }
+ }
+ if (found)
+ {
+ assert(arr[addr][i].head > 0);
+ int j;
+ c_deletes += arr[addr][i].head;
+ for (j = i + 1; j < BUCKSIZE; j++, i++)
+ arr[addr][i] = arr[addr][j];
+ bzero(&arr[addr][i], sizeof(arr[addr][i]));
+ return true;
+ }
+ else if (i < BUCKSIZE)
+ {
+ assert(arr[addr][i].head == 0);
+ }
+ return false;
+}
+
+bool shrink(LHLevel& lh, elem(*arr)[BUCKSIZE])
+{
+ assert(!lh.isEmpty());
+ Uint32 from;
+ Uint32 to;
+ if (!lh.getMergeBuckets(from, to))
+ {
+ int c = 0;
+ for (int i = 0; i < BUCKSIZE && arr[from][i].head != 0; i++)
+ c++;
+ // Only shrink if the only bucket is empty
+ if (c==0)
+ {
+ c_shrinks++;
+ lh.shrink();
+ return true;
+ }
+ return false;
+ }
+ assert(to < from);
+ int i, j;
+ int c = 0;
+ for (i = 0; i < BUCKSIZE && arr[to][i].head != 0; i++)
+ c++;
+ for (j = 0; j < BUCKSIZE && arr[from][j].head != 0; j++)
+ c++;
+ // Only shrink if both buckets element can fit in one bucket
+ if (c <= BUCKSIZE)
+ {
+ for (j = 0; j < BUCKSIZE && arr[from][j].head != 0; j++)
+ {
+ assert(i<BUCKSIZE);
+ arr[to][i] = arr[from][j];
+ bzero(&arr[from][j], sizeof(arr[from][j]));
+ i++;
+ }
+ c_shrinks++;
+ lh.shrink();
+ return true;
+ }
+ return false;
+}
+
+void expand(LHLevel& lh, elem(*arr)[BUCKSIZE])
+{
+ assert(!lh.isFull());
+ Uint32 from;
+ Uint32 to;
+ if (!lh.getSplitBucket(from, to))
+ {
+ // empty hash table, trivially expands to one bucket
+ c_expands++;
+ lh.expand();
+ return;
+ }
+ int i, j, k;
+ for (i = j = k = 0; i < BUCKSIZE && (arr[from][i].head != 0);
+ i++)
+ {
+ LHBits32 hash = elem::hash(arr[from][i].val);
+ if (lh.shouldMoveBeforeExpand(hash))
+ {
+ c_moved++;
+ arr[to][j] = arr[from][i];
+ j++;
+ }
+ else
+ {
+ if (k < i)
+ arr[from][k] = arr[from][i];
+ k++;
+ }
+ }
+ for (; j < BUCKSIZE; j++)
+ bzero(&arr[to][j], sizeof(arr[to][j]));
+ for (; k < BUCKSIZE; k++)
+ bzero(&arr[from][k], sizeof(arr[from][k]));
+ lh.expand();
+ c_expands++;
+}
+
+bool insert_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 v)
+{
+ LHBits32 hash = elem::hash(v);
+ Uint32 addr = lh.getBucketNumber(hash);
+ int i;
+ bool found = false;
+ for (i = 0; i < BUCKSIZE && (arr[addr][i].head != 0); i++)
+ {
+ if (arr[addr][i].val == v)
+ {
+ found = true;
+ break;
+ }
+ }
+ if (found)
+ {
+ arr[addr][i].head++;
+ c_inserts++;
+ }
+ else if (i < BUCKSIZE)
+ {
+ arr[addr][i].head = 1;
+ arr[addr][i].val = v;
+ c_inserts++;
+ }
+ else
+ {
+ return false;
+ }
+ return true;
+}
+
+int count_elem(LHLevel& lh, elem(*arr)[BUCKSIZE])
+{
+ int elements = 0;
+ int failures = 0;
+ if (!lh.isEmpty()) for (Uint32 addr = 0; addr <= lh.getTop(); addr++)
+ {
+ for (int i = 0; i < BUCKSIZE && arr[addr][i].head != 0; i++)
+ {
+ elements++;
+ }
+ }
+ return failures > 0 ? -elements : elements;
+}
+
+#endif
=== added file 'storage/ndb/src/kernel/vm/LHLevel.hpp'
--- a/storage/ndb/src/kernel/vm/LHLevel.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LHLevel.hpp 2012-09-28 13:11:17 +0000
@@ -0,0 +1,456 @@
+/*
+ Copyright (c) 2012 Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_LHLEVEL_H
+#define NDB_LHLEVEL_H
+
+/**
+ * LHLevel keeps level information for linear hashing,
+ **/
+
+/**
+ * Supports up to UINT32_MAX number of bucket addresses.
+ * If support for more is needed, one also have to
+ * increase hash key size.
+ **/
+
+#include <assert.h>
+#include "Bitmask.hpp"
+
+template<typename Int> class LHBits
+{
+public:
+ explicit LHBits();
+ explicit LHBits(Int bits);
+ template<typename Int2> LHBits(LHBits<Int2> const& bits);
+ ~LHBits();
+ LHBits& operator=(LHBits const&);
+
+ void clear();
+static LHBits<Int> unpack(Int packed);
+ Int pack() const;
+ bool match(LHBits other) const;
+ void shift_out();
+ void shift_out(Uint8 bits);
+ void shift_in(bool bit);
+ void shift_in(Uint8 bits, Int value);
+ Uint8 valid_bits() const;
+ bool valid_bits(Int bits) const;
+ bool valid_bit(Int bit) const;
+ Int get_bits(Int bits) const;
+ Int get_bit(Int bit) const;
+private:
+ Int highbit() const;
+ Int m_bits;
+};
+
+typedef LHBits<Uint16> LHBits16;
+typedef LHBits<Uint32> LHBits32;
+
+class LHLevel
+{
+public:
+ explicit LHLevel();
+ explicit LHLevel(Uint32 size);
+ ~LHLevel() {}
+private:
+ LHLevel(LHLevel const&); // Not to be implemented
+ LHLevel& operator=(LHLevel const&); // Not to be implemented
+public:
+ void clear();
+ bool isEmpty() const;
+ bool isFull() const;
+ Uint32 getSize() const;
+ void setSize(Uint32 size);
+ Uint32 getTop() const;
+public:
+ Uint32 getBucketNumber(LHBits32 hash_value) const;
+ bool getSplitBucket(Uint32& from, Uint32& to) const; // true if data move needed
+ bool shouldMoveBeforeExpand(LHBits32 hash_value) const;
+ void expand();
+ bool getMergeBuckets(Uint32& from, Uint32& to) const; // true if data move needed
+ void shrink();
+private:
+ enum {
+ ADDR_MAX = 0xFFFFFFFEU,
+ MAX_SIZE = 0xFFFFFFFFU,
+ MAXP_EMPTY = 0xFFFFFFFFU,
+ };
+protected:
+ Uint32 max_size() const;
+ Uint8 hashcheckbit() const { return m_hashcheckbit; }
+ Uint32 maxp() const { return m_maxp; }
+ Uint32 p() const { return m_p; }
+private:
+ Uint32 m_maxp;
+ Uint32 m_p;
+ Uint8 m_hashcheckbit;
+};
+
+class LocalLHLevel : public LHLevel
+{
+public:
+ explicit LocalLHLevel(Uint32& size): LHLevel(size), m_src(size) {}
+ ~LocalLHLevel() { m_src = getSize(); }
+private:
+ LocalLHLevel(const LocalLHLevel&); // Not to be implemented
+ LocalLHLevel& operator=(const LocalLHLevel&); // Not to be implemented
+ Uint32& m_src;
+};
+
+/**
+ * LHLevelRH is LHLevel extended with support for
+ * a reduced hash value suitable to store with
+ * element in hash table.
+ */
+
+class LHLevelRH: public LHLevel
+{
+public:
+ explicit LHLevelRH(): LHLevel() {}
+ explicit LHLevelRH(Uint32 const& size): LHLevel(size) {}
+ ~LHLevelRH() {}
+private:
+ LHLevelRH(LHLevelRH const&); // Not to be implemented
+ LHLevelRH& operator=(LHLevelRH const&); // Not to be implemented
+public:
+ LHBits16 reduce(LHBits32 hash_value) const;
+ Uint8 getNeededValidBits(Uint8 bits) const;
+ LHBits32 enlarge(LHBits16 reduced_hash_value, Uint32 bucket_number) const;
+};
+
+/**
+ * Implementation LHBits<>
+ **/
+
+template<typename Int> inline LHBits<Int>::LHBits()
+: m_bits(1)
+{
+}
+
+template<typename Int> inline LHBits<Int>::LHBits(Int bits)
+: m_bits(bits | highbit())
+{
+}
+
+template<typename Int> template<typename Int2> inline LHBits<Int>::LHBits(LHBits<Int2> const& bits)
+: m_bits(bits.pack())
+{
+ if (m_bits != bits.pack())
+ m_bits |= highbit();
+}
+
+template<typename Int> inline LHBits<Int>::~LHBits()
+{
+}
+
+template<typename Int> inline LHBits<Int>& LHBits<Int>::operator=(LHBits const& src)
+{
+ m_bits = src.m_bits;
+ return *this;
+}
+
+template<typename Int> inline void LHBits<Int>::clear()
+{
+ m_bits = 1;
+}
+
+template<typename Int> inline LHBits<Int> LHBits<Int>::unpack(Int packed)
+{
+ LHBits<Int> x;
+ x.m_bits = packed;
+ return x;
+}
+
+template<typename Int> inline Int LHBits<Int>::pack() const
+{
+ return m_bits;
+}
+
+template<typename Int> inline bool LHBits<Int>::match(LHBits<Int> other) const
+{
+ assert(sizeof(Int) <= sizeof(Uint32));
+ assert (m_bits != 0) ;
+ assert (other.m_bits != 0);
+ assert(sizeof(Int) <= sizeof(Uint32) &&
+ (m_bits != 0) &&
+ (other.m_bits != 0));
+ // Warning: dont reduce to one shift below.
+ // << is only defined for right values < 32.
+ // and since m_bits != 0, clz(MIN(...)) is guaranteed <= 31
+ return ((Uint32(m_bits ^ other.m_bits) << BitmaskImpl::clz(MIN(m_bits, other.m_bits))) << 1) == 0;
+}
+
+template<typename Int> inline void LHBits<Int>::shift_out()
+{
+ m_bits >>= 1;
+ if (m_bits == 0)
+ m_bits++;
+}
+
+template<typename Int> inline void LHBits<Int>::shift_out(Uint8 bits)
+{
+ assert(bits < 8 * sizeof(m_bits));
+ m_bits >>= bits;
+ if (m_bits == 0)
+ m_bits++;
+}
+
+template<typename Int> inline void LHBits<Int>::shift_in(bool bit)
+{
+ if (m_bits >= highbit())
+ m_bits |= (highbit() >> 1);
+ m_bits = (m_bits << 1) | (bit ? 1 : 0);
+}
+
+template<typename Int> inline void LHBits<Int>::shift_in(Uint8 bits, Int value)
+{
+ assert(m_bits != 0);
+ assert(bits < 8 * sizeof(m_bits));
+ assert(value < (Int(1) << bits));
+ if (bits == 0)
+ return;
+ if (m_bits >= (highbit() >> (bits - 1)))
+ m_bits = highbit() | (m_bits << bits) | value;
+ else
+ m_bits = (m_bits << bits) | value;
+}
+
+template<typename Int> inline Uint8 LHBits<Int>::valid_bits() const
+{
+ assert(m_bits != 0);
+ return BitmaskImpl::fls(m_bits);
+}
+
+template<typename Int> inline bool LHBits<Int>::valid_bits(Int bits) const
+{
+ // Only allow bits to be on the form 2^n-1
+ assert((m_bits != 0) &&
+ (((bits + 1U) | bits) == (bits << 1U) + 1U)); // bits is 0..01..1
+ return m_bits > bits;
+}
+
+template<typename Int> inline bool LHBits<Int>::valid_bit(Int bit) const
+{
+ // Only allow bit to be on the form 2^n
+ assert((m_bits != 0) &&
+ ((((bit - 1U) | bit) >> 1U) == bit - 1U)); // bits is 0..010..0
+ return (m_bits >> 1U) >= bit;
+}
+
+template<typename Int> inline Int LHBits<Int>::get_bits(Int bits) const
+{
+ assert(valid_bits(bits));
+ return m_bits & bits;
+}
+
+template<typename Int> inline Int LHBits<Int>::get_bit(Int bit) const
+{
+ assert(valid_bit(bit));
+ return m_bits & bit;
+}
+
+template<typename Int> inline Int LHBits<Int>::highbit() const
+{
+ return 1 << (sizeof(Int) * 8 - 1);
+}
+
+/**
+ * Implementaion LHLevel
+ **/
+
+inline LHLevel::LHLevel()
+{
+ setSize(0);
+}
+
+inline LHLevel::LHLevel(Uint32 _size)
+{
+ setSize(_size);
+}
+
+inline void LHLevel::clear()
+{
+ m_maxp = MAXP_EMPTY;
+ m_p = 0;
+}
+
+inline bool LHLevel::isEmpty() const
+{
+ return maxp() == MAXP_EMPTY;
+}
+
+inline bool LHLevel::isFull() const
+{
+ return !isEmpty() && (getTop() == ADDR_MAX);
+}
+
+inline Uint32 LHLevel::max_size() const
+{
+ return MAX_SIZE;
+}
+
+inline Uint32 LHLevel::getSize() const
+{
+ assert(!isEmpty() || (maxp() + 1 + p() == 0));
+ return maxp() + 1 + p();
+}
+
+inline void LHLevel::setSize(Uint32 size)
+{
+ assert(size <= max_size());
+ if (size == 0)
+ clear();
+ else
+ {
+ m_hashcheckbit = BitmaskImpl::fls(size);
+ m_maxp = (1 << hashcheckbit()) - 1;
+ m_p = size - 1 - maxp();
+ }
+}
+
+inline Uint32 LHLevel::getTop() const
+{
+ assert(!isEmpty());
+ return maxp() + p();
+}
+
+inline Uint32 LHLevel::getBucketNumber(LHBits32 hash_value) const
+{
+ assert(!isEmpty());
+ Uint32 addr = hash_value.get_bits(maxp());
+ if (addr < p())
+ {
+ addr |= hash_value.get_bit(maxp() + 1);
+ }
+ return addr;
+}
+
+inline bool LHLevel::getSplitBucket(Uint32& from, Uint32& to) const
+{
+ assert(!isFull());
+
+ from = p();
+ to = getSize(); // == getTop() + 1
+
+ // true if data move needed, that is, it was not empty
+ return to > 0;
+}
+
+inline void LHLevel::expand()
+{
+ assert(!isFull());
+
+ if (isEmpty())
+ {
+ m_p = 0;
+ m_hashcheckbit = 0;
+ m_maxp = 0;
+ }
+ else if (p() == maxp())
+ {
+ m_maxp = (maxp() << 1) | 1;
+ m_hashcheckbit ++;
+ m_p = 0;
+ }
+ else
+ {
+ m_p ++;
+ }
+}
+
+inline bool LHLevel::shouldMoveBeforeExpand(LHBits32 hash_value) const
+{
+ return hash_value.get_bit(1 << hashcheckbit());
+}
+
+inline bool LHLevel::getMergeBuckets(Uint32& from, Uint32& to) const
+{
+ assert(!isEmpty());
+
+ from = getTop();
+
+ if (likely(p() != 0))
+ {
+ to = p() - 1;
+ }
+ else
+ {
+ to = maxp() >> 1;
+ }
+
+ // true if data move needed, that is, all buckets disappeared
+ return from > 0;
+}
+
+inline void LHLevel::shrink()
+{
+ assert(!isEmpty());
+
+ if (p() != 0)
+ {
+ m_p --;
+ }
+ else if (maxp() == 0)
+ {
+ m_maxp --; // == MAXP_EMPTY
+ }
+ else
+ {
+ m_maxp >>= 1;
+ m_hashcheckbit --;
+ m_p = m_maxp;
+ }
+}
+
+/**
+ * Implementation LHLevelRH
+ **/
+
+inline LHBits16 LHLevelRH::reduce(LHBits32 hash_value) const
+{
+ assert(!isEmpty());
+
+ if (!hash_value.valid_bits(maxp()))
+ return LHBits16();
+
+ Uint32 addr = hash_value.get_bits(maxp());
+ LHBits32 hv(hash_value);
+ hv.shift_out(hashcheckbit());
+ if (addr < p())
+ hv.shift_out();
+ return LHBits16(hv);
+}
+
+inline Uint8 LHLevelRH::getNeededValidBits(Uint8 bits) const
+{
+ Uint8 const usable_bits_in_hash_value = 4 * sizeof(LHBits32) - 1; // == 31
+ return MIN(bits, usable_bits_in_hash_value - hashcheckbit());
+}
+
+inline LHBits32 LHLevelRH::enlarge(LHBits16 reduced_hash_value, Uint32 bucket_number) const
+{
+ assert(!isEmpty());
+
+ Uint32 addr = bucket_number & maxp();
+ LHBits32 hv(reduced_hash_value);
+ Uint8 addr_bits = hashcheckbit() + ((addr < p()) ? 1 : 0);
+ hv.shift_in(addr_bits, bucket_number);
+ return hv;
+}
+
+#endif
=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am 2012-03-20 13:50:15 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am 2012-09-28 12:55:26 +0000
@@ -113,6 +113,11 @@ DynArr256_t_SOURCES = DynArr256.cpp
DynArr256_t_LDFLAGS = @ndb_bin_am_ldflags@
DynArr256_t_LDADD = $(test_ldadd)
+LHLevel_t_CXXFLAGS = -DTAP_TEST
+LHLevel_t_SOURCES = LHLevel.cpp
+LHLevel_t_LDFLAGS = @ndb_bin_am_ldflags@
+LHLevel_t_LDADD = $(test_ldadd)
+
testSectionReader_CXXFLAGS = -DUNIT_TEST
testSectionReader_SOURCES = SectionReader.cpp
testSectionReader_LDFLAGS = @ndb_bin_am_ldflags@
@@ -143,7 +148,7 @@ mt_thr_config_t_LDADD = \
$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \
$(top_builddir)/mysys/libmysyslt.la
-noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t
+noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t LHLevel-t
mt_send_t_CXXFLAGS = -DTAP_TEST
mt_send_t_SOURCES = mt-send-t.cpp
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (mauritz.sundell:4991 to 4992) | Mauritz Sundell | 28 Sep |