4881 Mauritz Sundell 2012-03-13
ndb - dbacc - use DynArr256 for page directory
this will relax one size limitation of hash index
of one partition, that is the number of rows are
now not limited to ~45M, but note where are other
limitations that can be more restrictive.
pages are still from static pool (IndexMemory).
localkey length into dbtup are still only one word.
modified:
storage/ndb/include/kernel/kernel_config_parameters.h
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
storage/ndb/src/kernel/blocks/record_types.hpp
storage/ndb/src/kernel/vm/Configuration.cpp
4880 Mauritz Sundell 2012-03-13
ndb - dbacc refactoring
remove some dead code (members and methods).
introduce inline mul_ZBUF_SIZE() function and use
it instead of shift arithmetics.
modified:
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
4879 Mauritz Sundell 2012-03-13
ndb - make DynArr256 release empty pages
DynArr256 use pages with 30 nodes each.
Each node can contain 256 words.
DynArr256Pool are used for seize and release
of nodes and it allocates new pages if necessary
and now also releases pages when no nodes on the
page are allocated.
The pool keeps a double linked list of pages with
at least one node free. Allocation of a node are
made from the first page in that list.
Within a page the first free node are used based
on its index.
The last free node are used to keep the links of
the double linked list of pages with free nodes.
Which nodes that are free are determined by bits
in the headers magic fields.
modified:
storage/ndb/src/kernel/vm/DynArr256.cpp
storage/ndb/src/kernel/vm/DynArr256.hpp
4878 Mauritz Sundell 2012-03-13
ndb - stricter DynArr256 in debug
If one compile with VM_TRACE one can not
get values from index above the highest
index with value set.
modified:
storage/ndb/src/kernel/vm/DynArr256.cpp
storage/ndb/src/kernel/vm/DynArr256.hpp
4877 Mauritz Sundell 2012-03-13
ndb - add trim function to DynArr256
free memory from end of array and downwards as long
as values are missing or RNIL.
modified:
storage/ndb/src/kernel/vm/DynArr256.cpp
storage/ndb/src/kernel/vm/DynArr256.hpp
4876 Frazer Clement 2012-03-12
Bug#13428909 CONFLICT DETECTION OPERATION EXECUTION ERROR HANDLING INCOMPLETE
- Error handling in some conflict detection/resolution scenarios was incomplete
- This patch improves error handling and reporting
- Also, reporting of misconfiguration is improved.
- Infrastructure for manual testing is added, automated testing not setup yet.
modified:
mysql-test/suite/ndb_rpl/t/disabled.def
sql/ha_ndbcluster.cc
sql/ndb_conflict.cc
sql/ndb_conflict_trans.cc
storage/ndb/test/ndbapi/bench/asyncGenerator.cpp
storage/ndb/test/ndbapi/bench/dbPopulate.cpp
storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp
storage/ndb/test/ndbapi/bench/mainPopulate.cpp
storage/ndb/test/ndbapi/bench/ndb_error.hpp
storage/ndb/test/ndbapi/bench/testData.h
storage/ndb/test/ndbapi/bench/userInterface.h
=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h 2012-03-13 11:57:27 +0000
@@ -21,8 +21,6 @@
#define PRIVATE_BASE 14000
-#define CFG_ACC_DIR_RANGE (PRIVATE_BASE + 1)
-#define CFG_ACC_DIR_ARRAY (PRIVATE_BASE + 2)
#define CFG_ACC_FRAGMENT (PRIVATE_BASE + 3)
#define CFG_ACC_OP_RECS (PRIVATE_BASE + 4)
#define CFG_ACC_OVERFLOW_RECS (PRIVATE_BASE + 5)
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2011-12-01 10:45:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2012-03-13 11:57:27 +0000
@@ -24,6 +24,7 @@
#endif
#include <pc.hpp>
+#include <DynArr256.hpp>
#include <SimulatedBlock.hpp>
#ifdef DBACC_C
@@ -103,8 +104,6 @@ ndbout << "Ptr: " << ptr.p->word32 << "
#define ZDEFAULT_LIST 3
#define ZWORDS_IN_PAGE 2048
#define ZADDFRAG 0
-#define ZDIRARRAY 68
-#define ZDIRRANGESIZE 65
//#define ZEMPTY_FRAGMENT 0
#define ZFRAGMENTSIZE 64
#define ZFIRSTTIME 1
@@ -154,6 +153,7 @@ ndbout << "Ptr: " << ptr.p->word32 << "
#define ZREL_ROOT_FRAG 5
#define ZREL_FRAG 6
#define ZREL_DIR 7
+#define ZREL_ODIR 8
/* ------------------------------------------------------------------------- */
/* ERROR CODES */
@@ -180,6 +180,18 @@ ndbout << "Ptr: " << ptr.p->word32 << "
#define ZTO_OP_STATE_ERROR 631
#define ZTOO_EARLY_ACCESS_ERROR 632
#define ZDIR_RANGE_FULL_ERROR 633 // on fragment
+
+#if ZBUF_SIZE != ((1 << ZSHIFT_PLUS) - (1 << ZSHIFT_MINUS))
+#error ZBUF_SIZE != ((1 << ZSHIFT_PLUS) - (1 << ZSHIFT_MINUS))
+#endif
+
+static
+inline
+Uint32 mul_ZBUF_SIZE(Uint32 i)
+{
+ return (i << ZSHIFT_PLUS) - (i << ZSHIFT_MINUS);
+}
+
#endif
class ElementHeader {
@@ -305,24 +317,6 @@ enum State {
// Records
/* --------------------------------------------------------------------------------- */
-/* DIRECTORY RANGE */
-/* --------------------------------------------------------------------------------- */
- struct DirRange {
- Uint32 dirArray[256];
- }; /* p2c: size = 1024 bytes */
-
- typedef Ptr<DirRange> DirRangePtr;
-
-/* --------------------------------------------------------------------------------- */
-/* DIRECTORYARRAY */
-/* --------------------------------------------------------------------------------- */
-struct Directoryarray {
- Uint32 pagep[256];
-}; /* p2c: size = 1024 bytes */
-
- typedef Ptr<Directoryarray> DirectoryarrayPtr;
-
-/* --------------------------------------------------------------------------------- */
/* FRAGMENTREC. ALL INFORMATION ABOUT FRAMENT AND HASH TABLE IS SAVED IN FRAGMENT */
/* REC A POINTER TO FRAGMENT RECORD IS SAVED IN ROOTFRAGMENTREC FRAGMENT */
/* --------------------------------------------------------------------------------- */
@@ -355,7 +349,6 @@ struct Fragmentrec {
Uint32 expReceiveIndex;
Uint32 expReceiveForward;
Uint32 expSenderDirIndex;
- Uint32 expSenderDirptr;
Uint32 expSenderIndex;
Uint32 expSenderPageptr;
@@ -369,9 +362,8 @@ struct Fragmentrec {
// in its turn references the pages) for the bucket pages and the overflow
// bucket pages.
//-----------------------------------------------------------------------------
- Uint32 directory;
- Uint32 dirsize;
- Uint32 overflowdir;
+ DynArr256::Head directory;
+ DynArr256::Head overflowdir;
Uint32 lastOverIndex;
//-----------------------------------------------------------------------------
@@ -458,12 +450,10 @@ struct Fragmentrec {
// hashcheckbit is the bit to check whether to send element to split bucket or not
// k (== 6) is the number of buckets per page
// lhfragbits is the number of bits used to calculate the fragment id
-// lhdirbits is the number of bits used to calculate the page id
//-----------------------------------------------------------------------------
Uint8 hashcheckbit;
Uint8 k;
Uint8 lhfragbits;
- Uint8 lhdirbits;
//-----------------------------------------------------------------------------
// nodetype can only be STORED in this release. Is currently only set, never read
@@ -692,10 +682,7 @@ private:
void releaseFragResources(Signal* signal, Uint32 fragIndex);
void releaseRootFragRecord(Signal* signal, RootfragmentrecPtr rootPtr);
void releaseRootFragResources(Signal* signal, Uint32 tableId);
- void releaseDirResources(Signal* signal,
- Uint32 fragIndex,
- Uint32 dirIndex,
- Uint32 startIndex);
+ void releaseDirResources(Signal* signal);
void releaseDirectoryResources(Signal* signal,
Uint32 fragIndex,
Uint32 dirIndex,
@@ -707,8 +694,6 @@ private:
void initScanFragmentPart(Signal* signal);
Uint32 checkScanExpand(Signal* signal);
Uint32 checkScanShrink(Signal* signal);
- void initialiseDirRec(Signal* signal);
- void initialiseDirRangeRec(Signal* signal);
void initialiseFragRec(Signal* signal);
void initialiseFsConnectionRec(Signal* signal);
void initialiseFsOpRec(Signal* signal);
@@ -776,6 +761,9 @@ private:
void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 lkey1, Uint32 lkey2, Uint32 eh, OperationrecPtr);
Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
+ Uint32 getPagePtr(DynArr256::Head&, Uint32);
+ bool setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri);
+ Uint32 unsetPagePtr(DynArr256::Head& directory, Uint32 index);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal);
void deleteElement(Signal* signal);
@@ -809,17 +797,13 @@ private:
void putOpInFragWaitQue(Signal* signal);
void putOverflowRecInFrag(Signal* signal);
void putRecInFreeOverdir(Signal* signal);
- void releaseDirectory(Signal* signal);
- void releaseDirrange(Signal* signal);
void releaseFsConnRec(Signal* signal);
void releaseFsOpRec(Signal* signal);
void releaseOpRec(Signal* signal);
void releaseOverflowRec(Signal* signal);
void releaseOverpage(Signal* signal);
void releasePage(Signal* signal);
- void releaseLogicalPage(Fragmentrec * fragP, Uint32 logicalPageId);
void seizeDirectory(Signal* signal);
- void seizeDirrange(Signal* signal);
void seizeFragrec(Signal* signal);
void seizeFsConnectRec(Signal* signal);
void seizeFsOpRec(Signal* signal);
@@ -870,27 +854,9 @@ private:
// Variables
/* --------------------------------------------------------------------------------- */
-/* DIRECTORY RANGE */
+/* DIRECTORY */
/* --------------------------------------------------------------------------------- */
- DirRange *dirRange;
- DirRangePtr expDirRangePtr;
- DirRangePtr gnsDirRangePtr;
- DirRangePtr newDirRangePtr;
- DirRangePtr rdDirRangePtr;
- DirRangePtr nciOverflowrangeptr;
- Uint32 cdirrangesize;
- Uint32 cfirstfreeDirrange;
-/* --------------------------------------------------------------------------------- */
-/* DIRECTORYARRAY */
-/* --------------------------------------------------------------------------------- */
- Directoryarray *directoryarray;
- DirectoryarrayPtr expDirptr;
- DirectoryarrayPtr rdDirptr;
- DirectoryarrayPtr sdDirptr;
- DirectoryarrayPtr nciOverflowDirptr;
- Uint32 cdirarraysize;
- Uint32 cdirmemory;
- Uint32 cfirstfreedir;
+ DynArr256Pool directoryPool;
/* --------------------------------------------------------------------------------- */
/* FRAGMENTREC. ALL INFORMATION ABOUT FRAMENT AND HASH TABLE IS SAVED IN FRAGMENT */
/* REC A POINTER TO FRAGMENT RECORD IS SAVED IN ROOTFRAGMENTREC FRAGMENT */
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2011-12-01 10:45:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2012-03-13 11:57:27 +0000
@@ -24,18 +24,17 @@
void Dbacc::initData()
{
- cdirarraysize = ZDIRARRAY;
coprecsize = ZOPRECSIZE;
cpagesize = ZPAGESIZE;
ctablesize = ZTABLESIZE;
cfragmentsize = ZFRAGMENTSIZE;
- cdirrangesize = ZDIRRANGESIZE;
coverflowrecsize = ZOVERFLOWRECSIZE;
cscanRecSize = ZSCAN_REC_SIZE;
-
- dirRange = 0;
- directoryarray = 0;
+ Pool_context pc;
+ pc.m_block = this;
+ directoryPool.init(RT_DBACC_DIRECTORY, pc);
+
fragmentrec = 0;
operationrec = 0;
overflowRecord = 0;
@@ -102,14 +101,6 @@ void Dbacc::initRecords()
sizeof(Operationrec),
coprecsize);
- dirRange = (DirRange*)allocRecord("DirRange",
- sizeof(DirRange),
- cdirrangesize);
-
- directoryarray = (Directoryarray*)allocRecord("Directoryarray",
- sizeof(Directoryarray),
- cdirarraysize);
-
fragmentrec = (Fragmentrec*)allocRecord("Fragmentrec",
sizeof(Fragmentrec),
cfragmentsize);
@@ -166,16 +157,7 @@ Dbacc::Dbacc(Block_context& ctx, Uint32
#ifdef VM_TRACE
{
- void* tmp[] = { &expDirRangePtr,
- &gnsDirRangePtr,
- &newDirRangePtr,
- &rdDirRangePtr,
- &nciOverflowrangeptr,
- &expDirptr,
- &rdDirptr,
- &sdDirptr,
- &nciOverflowDirptr,
- &fragrecptr,
+ void* tmp[] = { &fragrecptr,
&operationRecPtr,
&idrOperationRecPtr,
&mlpqOperPtr,
@@ -231,14 +213,6 @@ Dbacc::Dbacc(Block_context& ctx, Uint32
Dbacc::~Dbacc()
{
- deallocRecord((void **)&dirRange, "DirRange",
- sizeof(DirRange),
- cdirrangesize);
-
- deallocRecord((void **)&directoryarray, "Directoryarray",
- sizeof(Directoryarray),
- cdirarraysize);
-
deallocRecord((void **)&fragmentrec, "Fragmentrec",
sizeof(Fragmentrec),
cfragmentsize);
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2011-12-23 14:34:57 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2012-03-13 11:57:27 +0000
@@ -99,12 +99,10 @@ void Dbacc::execCONTINUEB(Signal* signal
break;
}
case ZREL_DIR:
+ case ZREL_ODIR:
{
jam();
- Uint32 fragIndex = signal->theData[1];
- Uint32 dirIndex = signal->theData[2];
- Uint32 startIndex = signal->theData[3];
- releaseDirResources(signal, fragIndex, dirIndex, startIndex);
+ releaseDirResources(signal);
break;
}
@@ -215,11 +213,9 @@ void Dbacc::initialiseRecordsLab(Signal*
break;
case 4:
jam();
- initialiseDirRec(signal);
break;
case 5:
jam();
- initialiseDirRangeRec(signal);
break;
case 6:
jam();
@@ -296,8 +292,6 @@ void Dbacc::execREAD_CONFIG_REQ(Signal*
m_ctx.m_config.getOwnConfigIterator();
ndbrequire(p != 0);
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_DIR_RANGE, &cdirrangesize));
- ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_DIR_ARRAY, &cdirarraysize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_FRAGMENT, &cfragmentsize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_OP_RECS, &coprecsize));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_OVERFLOW_RECS,
@@ -336,48 +330,6 @@ void Dbacc::sttorrysignalLab(Signal* sig
}//Dbacc::sttorrysignalLab()
/* --------------------------------------------------------------------------------- */
-/* INITIALISE_DIR_REC */
-/* INITIALATES THE DIRECTORY RECORDS. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseDirRec(Signal* signal)
-{
- DirectoryarrayPtr idrDirptr;
- ndbrequire(cdirarraysize > 0);
- for (idrDirptr.i = 0; idrDirptr.i < cdirarraysize; idrDirptr.i++) {
- refresh_watch_dog();
- ptrAss(idrDirptr, directoryarray);
- for (Uint32 i = 0; i <= 255; i++) {
- idrDirptr.p->pagep[i] = RNIL;
- }//for
- }//for
- cdirmemory = 0;
- cfirstfreedir = RNIL;
-}//Dbacc::initialiseDirRec()
-
-/* --------------------------------------------------------------------------------- */
-/* INITIALISE_DIR_RANGE_REC */
-/* INITIALATES THE DIR_RANGE RECORDS. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseDirRangeRec(Signal* signal)
-{
- DirRangePtr idrDirRangePtr;
-
- ndbrequire(cdirrangesize > 0);
- for (idrDirRangePtr.i = 0; idrDirRangePtr.i < cdirrangesize; idrDirRangePtr.i++) {
- refresh_watch_dog();
- ptrAss(idrDirRangePtr, dirRange);
- idrDirRangePtr.p->dirArray[0] = idrDirRangePtr.i + 1;
- for (Uint32 i = 1; i < 256; i++) {
- idrDirRangePtr.p->dirArray[i] = RNIL;
- }//for
- }//for
- idrDirRangePtr.i = cdirrangesize - 1;
- ptrAss(idrDirRangePtr, dirRange);
- idrDirRangePtr.p->dirArray[0] = RNIL;
- cfirstfreeDirrange = 0;
-}//Dbacc::initialiseDirRangeRec()
-
-/* --------------------------------------------------------------------------------- */
/* INITIALISE_FRAG_REC */
/* INITIALATES THE FRAGMENT RECORDS. */
/* --------------------------------------------------------------------------------- */
@@ -545,55 +497,22 @@ void Dbacc::execACCFRAGREQ(Signal* signa
addFragRefuse(signal, ZFULL_FRAGRECORD_ERROR);
return;
}//if
- if (cfirstfreeDirrange == RNIL) {
- jam();
- releaseFragRecord(signal, fragrecptr);
- addFragRefuse(signal, ZDIR_RANGE_ERROR);
- return;
- } else {
- jam();
- seizeDirrange(signal);
- }//if
-
- fragrecptr.p->directory = newDirRangePtr.i;
- seizeDirectory(signal);
- if (tresult < ZLIMIT_OF_ERROR) {
- jam();
- newDirRangePtr.p->dirArray[0] = sdDirptr.i;
- } else {
- jam();
- addFragRefuse(signal, tresult);
- return;
- }//if
-
seizePage(signal);
if (tresult > ZLIMIT_OF_ERROR) {
jam();
addFragRefuse(signal, tresult);
return;
}//if
- sdDirptr.p->pagep[0] = spPageptr.i;
+ if (!setPagePtr(fragrecptr.p->directory, 0, spPageptr.i))
+ {
+ jam();
+ addFragRefuse(signal, ZDIR_RANGE_FULL_ERROR);
+ return;
+ }
+
tipPageId = 0;
inpPageptr = spPageptr;
initPage(signal);
- if (cfirstfreeDirrange == RNIL) {
- jam();
- addFragRefuse(signal, ZDIR_RANGE_ERROR);
- return;
- } else {
- jam();
- seizeDirrange(signal);
- }//if
- fragrecptr.p->overflowdir = newDirRangePtr.i;
- seizeDirectory(signal);
- if (tresult < ZLIMIT_OF_ERROR) {
- jam();
- newDirRangePtr.p->dirArray[0] = sdDirptr.i;
- } else {
- jam();
- addFragRefuse(signal, tresult);
- return;
- }//if
Uint32 userPtr = req->userPtr;
BlockReference retRef = req->userRef;
@@ -723,14 +642,24 @@ void Dbacc::releaseFragResources(Signal*
regFragPtr.i = fragIndex;
ptrCheckGuard(regFragPtr, cfragmentsize, fragmentrec);
verifyFragCorrect(regFragPtr);
- if (regFragPtr.p->directory != RNIL) {
- jam();
- releaseDirResources(signal, regFragPtr.i, regFragPtr.p->directory, 0);
- regFragPtr.p->directory = RNIL;
- } else if (regFragPtr.p->overflowdir != RNIL) {
+ if (!regFragPtr.p->directory.isEmpty()) {
jam();
- releaseDirResources(signal, regFragPtr.i, regFragPtr.p->overflowdir, 0);
- regFragPtr.p->overflowdir = RNIL;
+ DynArr256::ReleaseIterator iter;
+ DynArr256 dir(directoryPool, regFragPtr.p->directory);
+ dir.init(iter);
+ signal->theData[0] = ZREL_DIR;
+ signal->theData[1] = regFragPtr.i;
+ memcpy(&signal->theData[2], &iter, sizeof(iter));
+ sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2 + sizeof(iter) / 4, JBB);
+ } else if (!regFragPtr.p->overflowdir.isEmpty()) {
+ jam();
+ DynArr256::ReleaseIterator iter;
+ DynArr256 dir(directoryPool, regFragPtr.p->overflowdir);
+ dir.init(iter);
+ signal->theData[0] = ZREL_ODIR;
+ signal->theData[1] = regFragPtr.i;
+ memcpy(&signal->theData[2], &iter, sizeof(iter));
+ sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2 + sizeof(iter) / 4, JBB);
} else if (regFragPtr.p->firstOverflowRec != RNIL) {
jam();
releaseOverflowResources(signal, regFragPtr);
@@ -752,58 +681,68 @@ void Dbacc::verifyFragCorrect(Fragmentre
ndbrequire(regFragPtr.p->lockOwnersList == RNIL);
}//Dbacc::verifyFragCorrect()
-void Dbacc::releaseDirResources(Signal* signal,
- Uint32 fragIndex,
- Uint32 dirIndex,
- Uint32 startIndex)
-{
- DirRangePtr regDirRangePtr;
- regDirRangePtr.i = dirIndex;
- ptrCheckGuard(regDirRangePtr, cdirrangesize, dirRange);
- for (Uint32 i = startIndex; i < 256; i++) {
- jam();
- if (regDirRangePtr.p->dirArray[i] != RNIL) {
- jam();
- Uint32 directoryIndex = regDirRangePtr.p->dirArray[i];
- regDirRangePtr.p->dirArray[i] = RNIL;
- releaseDirectoryResources(signal, fragIndex, dirIndex, (i + 1), directoryIndex);
- return;
- }//if
- }//for
- rdDirRangePtr = regDirRangePtr;
- releaseDirrange(signal);
- signal->theData[0] = ZREL_FRAG;
- signal->theData[1] = fragIndex;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
-}//Dbacc::releaseDirResources()
+void Dbacc::releaseDirResources(Signal* signal)
+{
+ jam();
+ Uint32 fragIndex = signal->theData[1];
+
+ DynArr256::ReleaseIterator iter;
+ memcpy(&iter, &signal->theData[2], sizeof(iter));
+
+ FragmentrecPtr regFragPtr;
+ regFragPtr.i = fragIndex;
+ ptrCheckGuard(regFragPtr, cfragmentsize, fragmentrec);
+ verifyFragCorrect(regFragPtr);
+
+ DynArr256::Head* directory;
+ switch (signal->theData[0])
+ {
+ case ZREL_DIR:
+ jam();
+ directory = ®FragPtr.p->directory;
+ break;
+ case ZREL_ODIR:
+ jam();
+ directory = ®FragPtr.p->overflowdir;
+ break;
+ default:
+ ndbrequire(false);
+ }
-void Dbacc::releaseDirectoryResources(Signal* signal,
- Uint32 fragIndex,
- Uint32 dirIndex,
- Uint32 startIndex,
- Uint32 directoryIndex)
-{
- DirectoryarrayPtr regDirPtr;
- regDirPtr.i = directoryIndex;
- ptrCheckGuard(regDirPtr, cdirarraysize, directoryarray);
- for (Uint32 i = 0; i < 256; i++) {
+ DynArr256 dir(directoryPool, *directory);
+ Uint32 ret;
+ Uint32 pagei;
+ /* TODO: find a good value for count
+ * bigger value means quicker release of big index,
+ * but longer time slice so less concurrent
+ */
+ int count = 10;
+ while (count > 0 &&
+ (ret = dir.release(iter, &pagei)) != 0)
+ {
jam();
- if (regDirPtr.p->pagep[i] != RNIL) {
+ count--;
+ if (ret == 1 && pagei != RNIL)
+ {
jam();
- rpPageptr.i = regDirPtr.p->pagep[i];
+ rpPageptr.i = pagei;
ptrCheckGuard(rpPageptr, cpagesize, page8);
releasePage(signal);
- regDirPtr.p->pagep[i] = RNIL;
- }//if
- }//for
- rdDirptr = regDirPtr;
- releaseDirectory(signal);
- signal->theData[0] = ZREL_DIR;
- signal->theData[1] = fragIndex;
- signal->theData[2] = dirIndex;
- signal->theData[3] = startIndex;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
-}//Dbacc::releaseDirectoryResources()
+ }
+ }
+ if (ret != 0)
+ {
+ jam();
+ memcpy(&signal->theData[2], &iter, sizeof(iter));
+ sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2 + sizeof(iter) / 4, JBB);
+ }
+ else
+ {
+ jam();
+ signal->theData[0] = ZREL_FRAG;
+ sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
+ }
+}//Dbacc::releaseDirResources()
void Dbacc::releaseOverflowResources(Signal* signal, FragmentrecPtr regFragPtr)
{
@@ -2662,8 +2601,6 @@ void Dbacc::execACC_LOCKREQ(Signal* sign
/* --------------------------------------------------------------------------------- */
void Dbacc::insertElement(Signal* signal)
{
- DirRangePtr inrOverflowrangeptr;
- DirectoryarrayPtr inrOverflowDirptr;
OverflowRecordPtr inrOverflowRecPtr;
Page8Ptr inrNewPageptr;
Uint32 tinrNextSamePage;
@@ -2694,12 +2631,7 @@ void Dbacc::insertElement(Signal* signal
if (tinrNextSamePage == ZFALSE) {
jam(); /* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
tinrTmp = idrPageptr.p->word32[tidrContainerptr + 1];
- inrOverflowrangeptr.i = fragrecptr.p->overflowdir;
- ptrCheckGuard(inrOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard((tinrTmp >> 8), 256);
- inrOverflowDirptr.i = inrOverflowrangeptr.p->dirArray[tinrTmp >> 8];
- ptrCheckGuard(inrOverflowDirptr, cdirarraysize, directoryarray);
- idrPageptr.i = inrOverflowDirptr.p->pagep[tinrTmp & 0xff];
+ idrPageptr.i = getPagePtr(fragrecptr.p->overflowdir, tinrTmp);
ptrCheckGuard(idrPageptr, cpagesize, page8);
}//if
ndbrequire(tidrPageindex < ZEMPTYLIST);
@@ -2791,8 +2723,7 @@ void Dbacc::insertContainer(Signal* sign
Uint32 guard26;
tidrResult = ZFALSE;
- tidrContainerptr = (tidrPageindex << ZSHIFT_PLUS) - (tidrPageindex << ZSHIFT_MINUS);
- tidrContainerptr = tidrContainerptr + ZHEAD_SIZE;
+ tidrContainerptr = mul_ZBUF_SIZE(tidrPageindex) + ZHEAD_SIZE;
/* --------------------------------------------------------------------------------- */
/* CALCULATE THE POINTER TO THE ELEMENT TO BE INSERTED AND THE POINTER TO THE */
/* CONTAINER HEADER OF THE OTHER SIDE OF THE BUFFER. */
@@ -3027,7 +2958,7 @@ void Dbacc::seizeLeftlist(Signal* signal
Uint32 tsllHeadIndex;
Uint32 tsllTmp;
- tsllHeadIndex = ((tslPageindex << ZSHIFT_PLUS) - (tslPageindex << ZSHIFT_MINUS)) + ZHEAD_SIZE;
+ tsllHeadIndex = mul_ZBUF_SIZE(tslPageindex) + ZHEAD_SIZE;
arrGuard(tsllHeadIndex + 1, 2048);
tslNextfree = slPageptr.p->word32[tsllHeadIndex];
tslPrevfree = slPageptr.p->word32[tsllHeadIndex + 1];
@@ -3043,13 +2974,13 @@ void Dbacc::seizeLeftlist(Signal* signal
} else {
ndbrequire(tslPrevfree < ZEMPTYLIST);
jam();
- tsllTmp = ((tslPrevfree << ZSHIFT_PLUS) - (tslPrevfree << ZSHIFT_MINUS)) + ZHEAD_SIZE;
+ tsllTmp = mul_ZBUF_SIZE(tslPrevfree) + ZHEAD_SIZE;
dbgWord32(slPageptr, tsllTmp, tslNextfree);
slPageptr.p->word32[tsllTmp] = tslNextfree;
}//if
if (tslNextfree < ZEMPTYLIST) {
jam();
- tsllTmp = (((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ZHEAD_SIZE) + 1;
+ tsllTmp = mul_ZBUF_SIZE(tslNextfree) + ZHEAD_SIZE + 1;
dbgWord32(slPageptr, tsllTmp, tslPrevfree);
slPageptr.p->word32[tsllTmp] = tslPrevfree;
} else {
@@ -3084,7 +3015,7 @@ void Dbacc::seizeLeftlist(Signal* signal
slPageptr.p->word32[ZPOS_EMPTY_LIST] = tsllTmp;
if (tslNextfree < ZEMPTYLIST) {
jam();
- tsllTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ZHEAD_SIZE;
+ tsllTmp = mul_ZBUF_SIZE(tslNextfree) + ZHEAD_SIZE;
tsllTmp1 = slPageptr.p->word32[tsllTmp] & 0xfe03ffff;
tsllTmp1 = tsllTmp1 | (tslPageindex << 18);
dbgWord32(slPageptr, tsllTmp, tsllTmp1);
@@ -3112,7 +3043,7 @@ void Dbacc::seizeRightlist(Signal* signa
Uint32 tsrlHeadIndex;
Uint32 tsrlTmp;
- tsrlHeadIndex = ((tslPageindex << ZSHIFT_PLUS) - (tslPageindex << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+ tsrlHeadIndex = mul_ZBUF_SIZE(tslPageindex) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
arrGuard(tsrlHeadIndex + 1, 2048);
tslNextfree = slPageptr.p->word32[tsrlHeadIndex];
tslPrevfree = slPageptr.p->word32[tsrlHeadIndex + 1];
@@ -3124,13 +3055,13 @@ void Dbacc::seizeRightlist(Signal* signa
} else {
ndbrequire(tslPrevfree < ZEMPTYLIST);
jam();
- tsrlTmp = ((tslPrevfree << ZSHIFT_PLUS) - (tslPrevfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+ tsrlTmp = mul_ZBUF_SIZE(tslPrevfree) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
dbgWord32(slPageptr, tsrlTmp, tslNextfree);
slPageptr.p->word32[tsrlTmp] = tslNextfree;
}//if
if (tslNextfree < ZEMPTYLIST) {
jam();
- tsrlTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
+ tsrlTmp = mul_ZBUF_SIZE(tslNextfree) + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
dbgWord32(slPageptr, tsrlTmp, tslPrevfree);
slPageptr.p->word32[tsrlTmp] = tslPrevfree;
} else {
@@ -3163,7 +3094,7 @@ void Dbacc::seizeRightlist(Signal* signa
slPageptr.p->word32[ZPOS_EMPTY_LIST] = tsrlTmp | (tslPageindex << 16);
if (tslNextfree < ZEMPTYLIST) {
jam();
- tsrlTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+ tsrlTmp = mul_ZBUF_SIZE(tslNextfree) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
tsrlTmp1 = slPageptr.p->word32[tsrlTmp] & 0xfe03ffff;
dbgWord32(slPageptr, tsrlTmp, tsrlTmp1 | (tslPageindex << 18));
slPageptr.p->word32[tsrlTmp] = tsrlTmp1 | (tslPageindex << 18);
@@ -3223,10 +3154,33 @@ void Dbacc::seizeRightlist(Signal* signa
/* THE ADDRESS OF THE ELEMENT IN THE HASH TABLE,(GDI_PAGEPTR, */
/* TGDI_PAGEINDEX) ACCORDING TO LH3. */
/* --------------------------------------------------------------------------------- */
+Uint32 Dbacc::getPagePtr(DynArr256::Head& directory, Uint32 index)
+{
+ DynArr256 dir(directoryPool, directory);
+ Uint32* ptr = dir.get(index);
+ return *ptr;
+}
+
+bool Dbacc::setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri)
+{
+ DynArr256 dir(directoryPool, directory);
+ Uint32* ptr = dir.set(index);
+ if (ptr == NULL) return false;
+ *ptr = ptri;
+ return true;
+}
+
+Uint32 Dbacc::unsetPagePtr(DynArr256::Head& directory, Uint32 index)
+{
+ DynArr256 dir(directoryPool, directory);
+ Uint32* ptr = dir.get(index);
+ Uint32 ptri = *ptr;
+ *ptr = RNIL;
+ return ptri;
+}
+
void Dbacc::getdirindex(Signal* signal)
{
- DirRangePtr gdiDirRangePtr;
- DirectoryarrayPtr gdiDirptr;
Uint32 tgdiTmp;
Uint32 tgdiAddress;
@@ -3235,17 +3189,12 @@ void Dbacc::getdirindex(Signal* signal)
tgdiTmp = operationRecPtr.p->hashValue >> tgdiTmp;
tgdiTmp = (tgdiTmp << fragrecptr.p->k) | tgdiPageindex;
tgdiAddress = tgdiTmp & fragrecptr.p->maxp;
- gdiDirRangePtr.i = fragrecptr.p->directory;
- ptrCheckGuard(gdiDirRangePtr, cdirrangesize, dirRange);
if (tgdiAddress < fragrecptr.p->p) {
jam();
tgdiAddress = tgdiTmp & ((fragrecptr.p->maxp << 1) | 1);
}//if
tgdiTmp = tgdiAddress >> fragrecptr.p->k;
- arrGuard((tgdiTmp >> 8), 256);
- gdiDirptr.i = gdiDirRangePtr.p->dirArray[tgdiTmp >> 8];
- ptrCheckGuard(gdiDirptr, cdirarraysize, directoryarray);
- gdiPageptr.i = gdiDirptr.p->pagep[tgdiTmp & 0xff]; /* DIRECTORY INDEX OF SEND BUCKET PAGE */
+ gdiPageptr.i = getPagePtr(fragrecptr.p->directory, tgdiTmp);
ptrCheckGuard(gdiPageptr, cpagesize, page8);
}//Dbacc::getdirindex()
@@ -3318,15 +3267,12 @@ Uint32
Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
{
Uint32 errcode;
- DirRangePtr geOverflowrangeptr;
- DirectoryarrayPtr geOverflowDirptr;
Uint32 tgeElementHeader;
Uint32 tgeElemStep;
Uint32 tgeContainerhead;
Uint32 tgePageindex;
Uint32 tgeActivePageDir;
Uint32 tgeNextptrtype;
- register Uint32 tgeKeyptr;
register Uint32 tgeRemLen;
register Uint32 TelemLen = fragrecptr.p->elementLength;
register Uint32* Tkeydata = (Uint32*)&signal->theData[7];
@@ -3348,12 +3294,11 @@ Dbacc::getElement(Signal* signal, Operat
const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
do {
- tgeContainerptr = (tgePageindex << ZSHIFT_PLUS) - (tgePageindex << ZSHIFT_MINUS);
+ tgeContainerptr = mul_ZBUF_SIZE(tgePageindex);
if (tgeNextptrtype == ZLEFT) {
jam();
tgeContainerptr = tgeContainerptr + ZHEAD_SIZE;
tgeElementptr = tgeContainerptr + ZCON_HEAD_SIZE;
- tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + localkeylen;
tgeElemStep = TelemLen;
tgeForward = 1;
if (unlikely(tgeContainerptr >= 2048))
@@ -3371,7 +3316,6 @@ Dbacc::getElement(Signal* signal, Operat
jam();
tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
tgeElementptr = tgeContainerptr - 1;
- tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - localkeylen;
tgeElemStep = 0 - TelemLen;
tgeForward = (Uint32)-1;
if (unlikely(tgeContainerptr >= 2048))
@@ -3475,12 +3419,7 @@ Dbacc::getElement(Signal* signal, Operat
if (((tgeContainerhead >> 9) & 1) == ZFALSE) {
jam();
tgeActivePageDir = gePageptr.p->word32[tgeContainerptr + 1]; /* NEXT PAGE ID */
- geOverflowrangeptr.i = fragrecptr.p->overflowdir;
- ptrCheckGuard(geOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard((tgeActivePageDir >> 8), 256);
- geOverflowDirptr.i = geOverflowrangeptr.p->dirArray[tgeActivePageDir >> 8];
- ptrCheckGuard(geOverflowDirptr, cdirarraysize, directoryarray);
- gePageptr.i = geOverflowDirptr.p->pagep[tgeActivePageDir & 0xff];
+ gePageptr.i = getPagePtr(fragrecptr.p->overflowdir, tgeActivePageDir);
ptrCheckGuard(gePageptr, cpagesize, page8);
}//if
} while (1);
@@ -3555,7 +3494,7 @@ void Dbacc::commitdelete(Signal* signal)
lastPageptr.i = gdiPageptr.i;
lastPageptr.p = gdiPageptr.p;
tlastForward = ZTRUE;
- tlastContainerptr = (tlastPageindex << ZSHIFT_PLUS) - (tlastPageindex << ZSHIFT_MINUS);
+ tlastContainerptr = mul_ZBUF_SIZE(tlastPageindex);
tlastContainerptr = tlastContainerptr + ZHEAD_SIZE;
arrGuard(tlastContainerptr, 2048);
tlastContainerhead = lastPageptr.p->word32[tlastContainerptr];
@@ -3685,8 +3624,6 @@ void Dbacc::deleteElement(Signal* signal
/* --------------------------------------------------------------------------------- */
void Dbacc::getLastAndRemove(Signal* signal)
{
- DirRangePtr glrOverflowrangeptr;
- DirectoryarrayPtr glrOverflowDirptr;
Uint32 tglrHead;
Uint32 tglrTmp;
@@ -3701,15 +3638,10 @@ void Dbacc::getLastAndRemove(Signal* sig
jam();
arrGuard(tlastContainerptr + 1, 2048);
tglrTmp = lastPageptr.p->word32[tlastContainerptr + 1];
- glrOverflowrangeptr.i = fragrecptr.p->overflowdir;
- ptrCheckGuard(glrOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard((tglrTmp >> 8), 256);
- glrOverflowDirptr.i = glrOverflowrangeptr.p->dirArray[tglrTmp >> 8];
- ptrCheckGuard(glrOverflowDirptr, cdirarraysize, directoryarray);
- lastPageptr.i = glrOverflowDirptr.p->pagep[tglrTmp & 0xff];
+ lastPageptr.i = getPagePtr(fragrecptr.p->overflowdir, tglrTmp);
ptrCheckGuard(lastPageptr, cpagesize, page8);
}//if
- tlastContainerptr = (tlastPageindex << ZSHIFT_PLUS) - (tlastPageindex << ZSHIFT_MINUS);
+ tlastContainerptr = mul_ZBUF_SIZE(tlastPageindex);
if (((tlastContainerhead >> 7) & 3) == ZLEFT) {
jam();
tlastForward = ZTRUE;
@@ -3827,7 +3759,7 @@ void Dbacc::releaseLeftlist(Signal* sign
trlPrevused = (trlHead >> 18) & 0x7f;
if (trlNextused < ZEMPTYLIST) {
jam();
- tullTmp1 = (trlNextused << ZSHIFT_PLUS) - (trlNextused << ZSHIFT_MINUS);
+ tullTmp1 = mul_ZBUF_SIZE(trlNextused);
tullTmp1 = tullTmp1 + ZHEAD_SIZE;
tullTmp = rlPageptr.p->word32[tullTmp1] & 0xfe03ffff;
dbgWord32(rlPageptr, tullTmp1, tullTmp | (trlPrevused << 18));
@@ -3838,7 +3770,7 @@ void Dbacc::releaseLeftlist(Signal* sign
}//if
if (trlPrevused < ZEMPTYLIST) {
jam();
- tullTmp1 = (trlPrevused << ZSHIFT_PLUS) - (trlPrevused << ZSHIFT_MINUS);
+ tullTmp1 = mul_ZBUF_SIZE(trlPrevused);
tullTmp1 = tullTmp1 + ZHEAD_SIZE;
tullTmp = rlPageptr.p->word32[tullTmp1] & 0xfffc07ff;
dbgWord32(rlPageptr, tullTmp1, tullTmp | (trlNextused << 11));
@@ -3863,7 +3795,7 @@ void Dbacc::releaseLeftlist(Signal* sign
rlPageptr.p->word32[tullIndex] = tullTmp1;
if (tullTmp1 < ZEMPTYLIST) {
jam();
- tullTmp1 = (tullTmp1 << ZSHIFT_PLUS) - (tullTmp1 << ZSHIFT_MINUS);
+ tullTmp1 = mul_ZBUF_SIZE(tullTmp1);
tullTmp1 = (tullTmp1 + ZHEAD_SIZE) + 1;
dbgWord32(rlPageptr, tullTmp1, trlPageindex);
rlPageptr.p->word32[tullTmp1] = trlPageindex; /* UPDATES PREV POINTER IN THE NEXT FREE */
@@ -3920,7 +3852,7 @@ void Dbacc::releaseRightlist(Signal* sig
trlPrevused = (trlHead >> 18) & 0x7f;
if (trlNextused < ZEMPTYLIST) {
jam();
- turlTmp1 = (trlNextused << ZSHIFT_PLUS) - (trlNextused << ZSHIFT_MINUS);
+ turlTmp1 = mul_ZBUF_SIZE(trlNextused);
turlTmp1 = turlTmp1 + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
turlTmp = rlPageptr.p->word32[turlTmp1] & 0xfe03ffff;
dbgWord32(rlPageptr, turlTmp1, turlTmp | (trlPrevused << 18));
@@ -3931,7 +3863,7 @@ void Dbacc::releaseRightlist(Signal* sig
}//if
if (trlPrevused < ZEMPTYLIST) {
jam();
- turlTmp1 = (trlPrevused << ZSHIFT_PLUS) - (trlPrevused << ZSHIFT_MINUS);
+ turlTmp1 = mul_ZBUF_SIZE(trlPrevused);
turlTmp1 = turlTmp1 + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
turlTmp = rlPageptr.p->word32[turlTmp1] & 0xfffc07ff;
dbgWord32(rlPageptr, turlTmp1, turlTmp | (trlNextused << 11));
@@ -3957,7 +3889,7 @@ void Dbacc::releaseRightlist(Signal* sig
rlPageptr.p->word32[turlIndex] = turlTmp1;
if (turlTmp1 < ZEMPTYLIST) {
jam();
- turlTmp = (turlTmp1 << ZSHIFT_PLUS) - (turlTmp1 << ZSHIFT_MINUS);
+ turlTmp = mul_ZBUF_SIZE(turlTmp1);
turlTmp = turlTmp + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
dbgWord32(rlPageptr, turlTmp, trlPageindex);
rlPageptr.p->word32[turlTmp] = trlPageindex; /* UPDATES PREV POINTER IN THE NEXT FREE */
@@ -5084,12 +5016,8 @@ void Dbacc::insertLockOwnersList(Signal*
/* --------------------------------------------------------------------------------- */
void Dbacc::allocOverflowPage(Signal* signal)
{
- DirRangePtr aopDirRangePtr;
- DirectoryarrayPtr aopOverflowDirptr;
OverflowRecordPtr aopOverflowRecPtr;
Uint32 taopTmp1;
- Uint32 taopTmp2;
- Uint32 taopTmp3;
tresult = 0;
if (cfirstfreepage == RNIL)
@@ -5112,11 +5040,6 @@ void Dbacc::allocOverflowPage(Signal* si
jam();
tresult = ZOVER_REC_ERROR;
return;
- } else if ((cfirstfreedir == RNIL) &&
- (cdirarraysize <= cdirmemory)) {
- jam();
- tresult = ZDIRSIZE_ERROR;
- return;
} else {
jam();
seizeOverRec(signal);
@@ -5128,22 +5051,14 @@ void Dbacc::allocOverflowPage(Signal* si
fragrecptr.p->firstOverflowRec = aopOverflowRecPtr.i;
fragrecptr.p->lastOverflowRec = aopOverflowRecPtr.i;
taopTmp1 = aopOverflowRecPtr.p->dirindex;
- aopDirRangePtr.i = fragrecptr.p->overflowdir;
- taopTmp2 = taopTmp1 >> 8;
- taopTmp3 = taopTmp1 & 0xff;
- ptrCheckGuard(aopDirRangePtr, cdirrangesize, dirRange);
- arrGuard(taopTmp2, 256);
- if (aopDirRangePtr.p->dirArray[taopTmp2] == RNIL) {
- jam();
- seizeDirectory(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- aopDirRangePtr.p->dirArray[taopTmp2] = sdDirptr.i;
- }//if
- aopOverflowDirptr.i = aopDirRangePtr.p->dirArray[taopTmp2];
seizePage(signal);
ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- ptrCheckGuard(aopOverflowDirptr, cdirarraysize, directoryarray);
- aopOverflowDirptr.p->pagep[taopTmp3] = spPageptr.i;
+ if (!setPagePtr(fragrecptr.p->overflowdir, taopTmp1, spPageptr.i))
+ {
+ jam();
+ tresult = ZOVER_REC_ERROR;
+ }
+ ndbrequire(tresult <= ZLIMIT_OF_ERROR);
tiopPageId = aopOverflowRecPtr.p->dirindex;
iopOverflowRecPtr = aopOverflowRecPtr;
iopPageptr = spPageptr;
@@ -5189,8 +5104,6 @@ Uint32 Dbacc::checkScanExpand(Signal* si
Uint32 TreleaseInd = 0;
Uint32 TreleaseScanBucket;
Uint32 TreleaseScanIndicator[MAX_PARALLEL_SCANS_PER_FRAG];
- DirectoryarrayPtr TDirptr;
- DirRangePtr TDirRangePtr;
Page8Ptr TPageptr;
ScanRecPtr TscanPtr;
@@ -5248,14 +5161,9 @@ Uint32 Dbacc::checkScanExpand(Signal* si
}//for
if (TreleaseInd == 1) {
TreleaseScanBucket = TSplit;
- TDirRangePtr.i = fragrecptr.p->directory;
TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
- ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
- arrGuard((TDirInd >> 8), 256);
- TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
- ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
- TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
+ TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
ptrCheckGuard(TPageptr, cpagesize, page8);
for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
if (TreleaseScanIndicator[Ti] == 1) {
@@ -5281,8 +5189,6 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
return;
}
- DirectoryarrayPtr newDirptr;
-
fragrecptr.i = signal->theData[0];
tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
Uint32 tmp = 1;
@@ -5340,62 +5246,35 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
/* THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO */
/* DECIDE WHICH ELEMENT GOES WHERE. */
/*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1; /* RECEIVED BUCKET */
texpDirInd = texpReceivedBucket >> fragrecptr.p->k;
- newDirptr.i = RNIL;
- ptrNull(newDirptr);
- texpDirRangeIndex = texpDirInd >> 8;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- Uint32 max_dir_range_size = (256 * (100 - m_free_pct)) / 100;
- if (ERROR_INSERTED(3002)) {
- debug_lh_vars("EXP");
- max_dir_range_size = 2;
+ if ((texpReceivedBucket & ((1 << fragrecptr.p->k) - 1)) == 0)
+ { // Need new bucket
+ expPageptr.i = RNIL;
}
- if (texpDirRangeIndex >= max_dir_range_size) {
- jam();
- ndbrequire(texpDirRangeIndex == max_dir_range_size);
- if (fragrecptr.p->dirRangeFull == ZFALSE) {
- jam();
- fragrecptr.p->dirRangeFull = ZTRUE;
- }
- return;
+ else
+ {
+ expPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
+#ifdef VM_TRACE
+ require(expPageptr.i != RNIL);
+#endif
}
- expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
- if (expDirptr.i == RNIL) {
- jam();
- seizeDirectory(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- return;
- } else {
- jam();
- newDirptr = sdDirptr;
- expDirptr = sdDirptr;
- expDirRangePtr.p->dirArray[texpDirRangeIndex] = sdDirptr.i;
- }//if
- } else {
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- }//if
- texpDirPageIndex = texpDirInd & 0xff;
- expPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
if (expPageptr.i == RNIL) {
jam();
seizePage(signal);
if (tresult > ZLIMIT_OF_ERROR) {
jam();
- if (newDirptr.i != RNIL) {
- jam();
- rdDirptr.i = newDirptr.i;
- releaseDirectory(signal);
- }//if
return;
}//if
- expDirptr.p->pagep[texpDirPageIndex] = spPageptr.i;
+ if (!setPagePtr(fragrecptr.p->directory, texpDirInd, spPageptr.i))
+ {
+ jam();
+ tresult = ZDIR_RANGE_FULL_ERROR;
+ return;
+ }
tipPageId = texpDirInd;
inpPageptr = spPageptr;
initPage(signal);
- fragrecptr.p->dirsize++;
expPageptr = spPageptr;
} else {
ptrCheckGuard(expPageptr, cpagesize, page8);
@@ -5407,14 +5286,12 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
/* THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE */
/* DIRECTORY OF THE BUCKET TO BE SPLIT. */
/*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
texpDirInd = fragrecptr.p->p >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard((texpDirInd >> 8), 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpDirInd >> 8];
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- excPageptr.i = expDirptr.p->pagep[texpDirInd & 0xff];
+ excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
+#ifdef VM_TRACE
+ require(excPageptr.i != RNIL);
+#endif
fragrecptr.p->expSenderIndex = cexcPageindex;
fragrecptr.p->expSenderPageptr = excPageptr.i;
if (excPageptr.i == RNIL) {
@@ -5437,7 +5314,6 @@ void Dbacc::endofexpLab(Signal* signal)
if (fragrecptr.p->p > fragrecptr.p->maxp) {
jam();
fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1;
- fragrecptr.p->lhdirbits++;
fragrecptr.p->hashcheckbit++;
fragrecptr.p->p = 0;
}//if
@@ -5525,7 +5401,7 @@ void Dbacc::expandcontainer(Signal* sign
cexcPrevconptr = 0;
cexcForward = ZTRUE;
EXP_CONTAINER_LOOP:
- cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
+ cexcContainerptr = mul_ZBUF_SIZE(cexcPageindex);
if (cexcForward == ZTRUE) {
jam();
cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
@@ -5727,8 +5603,6 @@ Uint32 Dbacc::checkScanShrink(Signal* si
Uint32 TreleaseScanBucket;
Uint32 TreleaseInd = 0;
Uint32 TreleaseScanIndicator[MAX_PARALLEL_SCANS_PER_FRAG];
- DirectoryarrayPtr TDirptr;
- DirRangePtr TDirRangePtr;
Page8Ptr TPageptr;
ScanRecPtr TscanPtr;
@@ -5793,14 +5667,9 @@ Uint32 Dbacc::checkScanShrink(Signal* si
if (TreleaseInd == 1) {
jam();
TreleaseScanBucket = TmergeSource;
- TDirRangePtr.i = fragrecptr.p->directory;
TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
- ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
- arrGuard((TDirInd >> 8), 256);
- TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
- ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
- TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
+ TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
ptrCheckGuard(TPageptr, cpagesize, page8);
for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
if (TreleaseScanIndicator[Ti] == 1) {
@@ -5890,7 +5759,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
jam();
fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
fragrecptr.p->p = fragrecptr.p->maxp;
- fragrecptr.p->lhdirbits--;
fragrecptr.p->hashcheckbit--;
} else {
jam();
@@ -5908,17 +5776,9 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/* WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE */
/* REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET. */
/*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1);
texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k;
- texpDirRangeIndex = texpDirInd >> 8;
- texpDirPageIndex = texpDirInd & 0xff;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard(texpDirRangeIndex, 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- excPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
- fragrecptr.p->expSenderDirptr = expDirptr.i;
+ excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
fragrecptr.p->expSenderIndex = cexcPageindex;
fragrecptr.p->expSenderPageptr = excPageptr.i;
fragrecptr.p->expSenderDirIndex = texpDirInd;
@@ -5926,13 +5786,8 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/* WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE */
/* RECEIVING BUCKET. */
/*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard((texpReceivedBucket >> 8), 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpReceivedBucket >> 8];
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- fragrecptr.p->expReceivePageptr = expDirptr.p->pagep[texpReceivedBucket & 0xff];
+ fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpReceivedBucket);
fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);
fragrecptr.p->expReceiveForward = ZTRUE;
if (excPageptr.i == RNIL) {
@@ -5945,7 +5800,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
/*--------------------------------------------------------------------------*/
ptrCheckGuard(excPageptr, cpagesize, page8);
cexcForward = ZTRUE;
- cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
+ cexcContainerptr = mul_ZBUF_SIZE(cexcPageindex);
cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
arrGuard(cexcContainerptr, 2048);
cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
@@ -5979,7 +5834,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
}//if
nextcontainerinfoExp(signal);
do {
- cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
+ cexcContainerptr = mul_ZBUF_SIZE(cexcPageindex);
if (cexcForward == ZTRUE) {
jam();
cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
@@ -6043,24 +5898,29 @@ void Dbacc::endofshrinkbucketLab(Signal*
fragrecptr.p->slack -= fragrecptr.p->maxloadfactor;
if (fragrecptr.p->expSenderIndex == 0) {
jam();
- fragrecptr.p->dirsize--;
if (fragrecptr.p->expSenderPageptr != RNIL) {
jam();
rpPageptr.i = fragrecptr.p->expSenderPageptr;
ptrCheckGuard(rpPageptr, cpagesize, page8);
releasePage(signal);
- expDirptr.i = fragrecptr.p->expSenderDirptr;
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- expDirptr.p->pagep[fragrecptr.p->expSenderDirIndex & 0xff] = RNIL;
+ unsetPagePtr(fragrecptr.p->directory, fragrecptr.p->expSenderDirIndex);
}//if
if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) {
jam();
- rdDirptr.i = fragrecptr.p->expSenderDirptr;
- releaseDirectory(signal);
- expDirRangePtr.i = fragrecptr.p->directory;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard((fragrecptr.p->expSenderDirIndex >> 8), 256);
- expDirRangePtr.p->dirArray[fragrecptr.p->expSenderDirIndex >> 8] = RNIL;
+ DynArr256 dir(directoryPool, fragrecptr.p->directory);
+ DynArr256::ReleaseIterator iter;
+ Uint32 relcode;
+#ifdef VM_TRACE
+ Uint32 count = 0;
+#endif
+ dir.init(iter);
+ while ((relcode = dir.trim(fragrecptr.p->expSenderDirIndex, iter)) != 0)
+ {
+#ifdef VM_TRACE
+ count++;
+ ndbrequire(count <= 256);
+#endif
+ }
}//if
}//if
if (fragrecptr.p->slack < (1u << 31)) {
@@ -6215,12 +6075,7 @@ void Dbacc::nextcontainerinfoExp(Signal*
/* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
arrGuard(cexcContainerptr + 1, 2048);
tnciTmp = excPageptr.p->word32[cexcContainerptr + 1];
- nciOverflowrangeptr.i = fragrecptr.p->overflowdir;
- ptrCheckGuard(nciOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard((tnciTmp >> 8), 256);
- nciOverflowDirptr.i = nciOverflowrangeptr.p->dirArray[tnciTmp >> 8];
- ptrCheckGuard(nciOverflowDirptr, cdirarraysize, directoryarray);
- excPageptr.i = nciOverflowDirptr.p->pagep[tnciTmp & 0xff];
+ excPageptr.i = getPagePtr(fragrecptr.p->overflowdir, tnciTmp);
ptrCheckGuard(excPageptr, cpagesize, page8);
}//if
}//Dbacc::nextcontainerinfoExp()
@@ -6257,12 +6112,10 @@ void Dbacc::initFragAdd(Signal* signal,
regFragPtr.p->maxloadfactor = maxLoadFactor;
regFragPtr.p->slack = (regFragPtr.p->maxp + 1) * maxLoadFactor;
regFragPtr.p->lhfragbits = lhFragBits;
- regFragPtr.p->lhdirbits = 0;
regFragPtr.p->hashcheckbit = 0; //lhFragBits;
regFragPtr.p->localkeylen = req->localKeyLen;
regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3;
regFragPtr.p->lastOverIndex = 0;
- regFragPtr.p->dirsize = 1;
regFragPtr.p->keyLength = req->keyLength;
ndbrequire(req->keyLength != 0);
regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen;
@@ -6284,8 +6137,9 @@ void Dbacc::initFragAdd(Signal* signal,
void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
{
- regFragPtr.p->directory = RNIL;
- regFragPtr.p->overflowdir = RNIL;
+ new (®FragPtr.p->directory) DynArr256::Head();
+ new (®FragPtr.p->overflowdir) DynArr256::Head();
+
regFragPtr.p->firstOverflowRec = RNIL;
regFragPtr.p->lastOverflowRec = RNIL;
regFragPtr.p->lockOwnersList = RNIL;
@@ -6299,29 +6153,6 @@ void Dbacc::initFragGeneral(FragmentrecP
}//Dbacc::initFragGeneral()
-void
-Dbacc::releaseLogicalPage(Fragmentrec * fragP, Uint32 logicalPageId){
- Ptr<struct DirRange> dirRangePtr;
- dirRangePtr.i = fragP->directory;
- ptrCheckGuard(dirRangePtr, cdirrangesize, dirRange);
-
- const Uint32 lp1 = logicalPageId >> 8;
- const Uint32 lp2 = logicalPageId & 0xFF;
- ndbrequire(lp1 < 256);
-
- Ptr<struct Directoryarray> dirArrPtr;
- dirArrPtr.i = dirRangePtr.p->dirArray[lp1];
- ptrCheckGuard(dirArrPtr, cdirarraysize, directoryarray);
-
- const Uint32 physicalPageId = dirArrPtr.p->pagep[lp2];
-
- rpPageptr.i = physicalPageId;
- ptrCheckGuard(rpPageptr, cpagesize, page8);
- releasePage(0);
-
- dirArrPtr.p->pagep[lp2] = RNIL;
-}
-
void Dbacc::execACC_SCANREQ(Signal* signal)
{
jamEntry();
@@ -6464,9 +6295,6 @@ void Dbacc::execNEXT_SCANREQ(Signal* sig
void Dbacc::checkNextBucketLab(Signal* signal)
{
- DirRangePtr cscDirRangePtr;
- DirectoryarrayPtr cscDirptr;
- DirectoryarrayPtr tnsDirptr;
Page8Ptr nsPageptr;
Page8Ptr cscPageidptr;
Page8Ptr gnsPageidptr;
@@ -6476,17 +6304,10 @@ void Dbacc::checkNextBucketLab(Signal* s
Uint32 tnsIsLocked;
Uint32 tnsTmp1;
Uint32 tnsTmp2;
- Uint32 tnsCopyIndex1;
- Uint32 tnsCopyIndex2;
Uint32 tnsCopyDir;
tnsCopyDir = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
- tnsCopyIndex1 = tnsCopyDir >> 8;
- tnsCopyIndex2 = tnsCopyDir & 0xff;
- arrGuard(tnsCopyIndex1, 256);
- tnsDirptr.i = gnsDirRangePtr.p->dirArray[tnsCopyIndex1];
- ptrCheckGuard(tnsDirptr, cdirarraysize, directoryarray);
- tnsPageidptr.i = tnsDirptr.p->pagep[tnsCopyIndex2];
+ tnsPageidptr.i = getPagePtr(fragrecptr.p->directory, tnsCopyDir);
ptrCheckGuard(tnsPageidptr, cpagesize, page8);
gnsPageidptr.i = tnsPageidptr.i;
gnsPageidptr.p = tnsPageidptr.p;
@@ -6561,15 +6382,8 @@ void Dbacc::checkNextBucketLab(Signal* s
rsbPageidptr.p = gnsPageidptr.p;
} else {
jam();
- cscDirRangePtr.i = fragrecptr.p->directory;
tmpP = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(cscDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- cscDirptr.i = cscDirRangePtr.p->dirArray[tmpP2];
- ptrCheckGuard(cscDirptr, cdirarraysize, directoryarray);
- cscPageidptr.i = cscDirptr.p->pagep[tmpP];
+ cscPageidptr.i = getPagePtr(fragrecptr.p->directory, tmpP);
ptrCheckGuard(cscPageidptr, cpagesize, page8);
tmp1 = (1 << fragrecptr.p->k) - 1;
trsbPageindex = scanPtr.p->nextBucketIndex & tmp1;
@@ -6696,8 +6510,6 @@ void Dbacc::checkNextFragmentLab(Signal*
void Dbacc::initScanFragmentPart(Signal* signal)
{
- DirRangePtr cnfDirRangePtr;
- DirectoryarrayPtr cnfDirptr;
Page8Ptr cnfPageidptr;
/* ----------------------------------------------------------------------- */
// Set the active fragment part.
@@ -6714,11 +6526,7 @@ void Dbacc::initScanFragmentPart(Signal*
scanPtr.p->startNoOfBuckets = fragrecptr.p->p + fragrecptr.p->maxp;
scanPtr.p->minBucketIndexToRescan = 0xFFFFFFFF;
scanPtr.p->maxBucketIndexToRescan = 0;
- cnfDirRangePtr.i = fragrecptr.p->directory;
- ptrCheckGuard(cnfDirRangePtr, cdirrangesize, dirRange);
- cnfDirptr.i = cnfDirRangePtr.p->dirArray[0];
- ptrCheckGuard(cnfDirptr, cdirarraysize, directoryarray);
- cnfPageidptr.i = cnfDirptr.p->pagep[0];
+ cnfPageidptr.i = getPagePtr(fragrecptr.p->directory, 0);
ptrCheckGuard(cnfPageidptr, cpagesize, page8);
trsbPageindex = scanPtr.p->nextBucketIndex & ((1 << fragrecptr.p->k) - 1);
rsbPageidptr.i = cnfPageidptr.i;
@@ -6939,8 +6747,6 @@ void Dbacc::execACC_CHECK_SCAN(Signal* s
fragrecptr.i = scanPtr.p->activeLocalFrag;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- gnsDirRangePtr.i = fragrecptr.p->directory;
- ptrCheckGuard(gnsDirRangePtr, cdirrangesize, dirRange);
checkNextBucketLab(signal);
return;
}//Dbacc::execACC_CHECK_SCAN()
@@ -6986,7 +6792,7 @@ void Dbacc::execACC_TO_REQ(Signal* signa
/* --------------------------------------------------------------------------------- */
void Dbacc::containerinfo(Signal* signal)
{
- tciContainerptr = (tciPageindex << ZSHIFT_PLUS) - (tciPageindex << ZSHIFT_MINUS);
+ tciContainerptr = mul_ZBUF_SIZE(tciPageindex);
if (tciIsforward == ZTRUE) {
jam();
tciContainerptr = tciContainerptr + ZHEAD_SIZE;
@@ -7125,12 +6931,7 @@ void Dbacc::nextcontainerinfo(Signal* si
/* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
arrGuard(tnciContainerptr + 1, 2048);
tnciTmp = nciPageidptr.p->word32[tnciContainerptr + 1];
- nciOverflowrangeptr.i = fragrecptr.p->overflowdir;
- ptrCheckGuard(nciOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard((tnciTmp >> 8), 256);
- nciOverflowDirptr.i = nciOverflowrangeptr.p->dirArray[tnciTmp >> 8];
- ptrCheckGuard(nciOverflowDirptr, cdirarraysize, directoryarray);
- nciPageidptr.i = nciOverflowDirptr.p->pagep[tnciTmp & 0xff];
+ nciPageidptr.i = getPagePtr(fragrecptr.p->overflowdir, tnciTmp);
ptrCheckGuard(nciPageidptr, cpagesize, page8);
}//if
}//Dbacc::nextcontainerinfo()
@@ -7873,26 +7674,6 @@ void Dbacc::putRecInFreeOverdir(Signal*
}//Dbacc::putRecInFreeOverdir()
/* --------------------------------------------------------------------------------- */
-/* RELEASE_DIRECTORY */
-/* --------------------------------------- ----------------------------------------- */
-void Dbacc::releaseDirectory(Signal* signal)
-{
- ptrCheckGuard(rdDirptr, cdirarraysize, directoryarray);
- rdDirptr.p->pagep[0] = cfirstfreedir;
- cfirstfreedir = rdDirptr.i;
-}//Dbacc::releaseDirectory()
-
-/* --------------------------------------------------------------------------------- */
-/* RELEASE_DIRRANGE */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::releaseDirrange(Signal* signal)
-{
- ptrCheckGuard(rdDirRangePtr, cdirrangesize, dirRange);
- rdDirRangePtr.p->dirArray[0] = cfirstfreeDirrange;
- cfirstfreeDirrange = rdDirRangePtr.i;
-}//Dbacc::releaseDirrange()
-
-/* --------------------------------------------------------------------------------- */
/* RELEASE OP RECORD */
/* PUT A FREE OPERATION IN A FREE LIST OF THE OPERATIONS */
/* --------------------------------------------------------------------------------- */
@@ -7940,13 +7721,9 @@ void Dbacc::releaseOverflowRec(Signal* s
/* --------------------------------------------------------------------------------- */
void Dbacc::releaseOverpage(Signal* signal)
{
- DirRangePtr ropOverflowrangeptr;
- DirectoryarrayPtr ropOverflowDirptr;
OverflowRecordPtr ropOverflowRecPtr;
OverflowRecordPtr tuodOverflowRecPtr;
Uint32 tropTmp;
- Uint32 tropTmp1;
- Uint32 tropTmp2;
ropOverflowRecPtr.i = ropPageptr.p->word32[ZPOS_OVERFLOWREC];
ndbrequire(ropOverflowRecPtr.i != RNIL);
@@ -7959,28 +7736,6 @@ void Dbacc::releaseOverpage(Signal* sign
jam();
return; /* THERE IS ONLY ONE OVERFLOW PAGE */
}//if
-#if kalle
- logicalPage = 0;
-
- i = fragrecptr.p->directory;
- p = dirRange.getPtr(i);
-
- i1 = logicalPage >> 8;
- i2 = logicalPage & 0xFF;
-
- ndbrequire(i1 < 256);
-
- i = p->dirArray[i1];
- p = directoryarray.getPtr(i);
-
- physicPageId = p->pagep[i2];
- physicPageP = page8.getPtr(physicPageId);
-
- p->pagep[i2] = RNIL;
- rpPageptr = { physicPageId, physicPageP };
- releasePage(signal);
-
-#endif
/* ----------------------------------------------------------------------- */
/* IT WAS OK TO RELEASE THE PAGE. */
@@ -7992,14 +7747,7 @@ void Dbacc::releaseOverpage(Signal* sign
priOverflowRecPtr = ropOverflowRecPtr;
putRecInFreeOverdir(signal);
tropTmp = ropPageptr.p->word32[ZPOS_PAGE_ID];
- ropOverflowrangeptr.i = fragrecptr.p->overflowdir;
- tropTmp1 = tropTmp >> 8;
- tropTmp2 = tropTmp & 0xff;
- ptrCheckGuard(ropOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard(tropTmp1, 256);
- ropOverflowDirptr.i = ropOverflowrangeptr.p->dirArray[tropTmp1];
- ptrCheckGuard(ropOverflowDirptr, cdirarraysize, directoryarray);
- ropOverflowDirptr.p->pagep[tropTmp2] = RNIL;
+ unsetPagePtr(fragrecptr.p->overflowdir, tropTmp);
rpPageptr = ropPageptr;
releasePage(signal);
if (ropOverflowRecPtr.p->dirindex != (fragrecptr.p->lastOverIndex - 1)) {
@@ -8010,23 +7758,12 @@ void Dbacc::releaseOverpage(Signal* sign
/* THE LAST PAGE IN THE DIRECTORY WAS RELEASED IT IS NOW NECESSARY
* TO REMOVE ALL RELEASED OVERFLOW DIRECTORIES AT THE END OF THE LIST.
* ---------------------------------------------------------------------- */
- do {
- fragrecptr.p->lastOverIndex--;
- if (tropTmp2 == 0) {
- jam();
- ndbrequire(tropTmp1 != 0);
- ropOverflowrangeptr.p->dirArray[tropTmp1] = RNIL;
- rdDirptr.i = ropOverflowDirptr.i;
- releaseDirectory(signal);
- tropTmp1--;
- tropTmp2 = 255;
- } else {
- jam();
- tropTmp2--;
- }//if
- ropOverflowDirptr.i = ropOverflowrangeptr.p->dirArray[tropTmp1];
- ptrCheckGuard(ropOverflowDirptr, cdirarraysize, directoryarray);
- } while (ropOverflowDirptr.p->pagep[tropTmp2] == RNIL);
+ DynArr256 dir(directoryPool, fragrecptr.p->overflowdir);
+ DynArr256::ReleaseIterator iter;
+ dir.init(iter);
+ Uint32 rc;
+ while ((rc = dir.trim(0, iter))!=0);
+ fragrecptr.p->lastOverIndex = iter.m_sz ? iter.m_pos + 1 : 0;
/* ----------------------------------------------------------------------- */
/* RELEASE ANY OVERFLOW RECORDS THAT ARE PART OF THE FREE INDEX LIST WHICH */
/* DIRECTORY INDEX NOW HAS BEEN RELEASED. */
@@ -8086,56 +7823,6 @@ void Dbacc::releasePage(Signal* signal)
}//Dbacc::releasePage()
/* --------------------------------------------------------------------------------- */
-/* SEIZE_DIRECTORY */
-/* DESCRIPTION: A DIRECTORY BLOCK (ZDIRBLOCKSIZE NUMBERS OF DIRECTORY */
-/* RECORDS WILL BE ALLOCATED AND RETURNED. */
-/* SIZE OF DIRECTORY ERROR_CODE, WILL BE RETURNED IF THERE IS NO ANY */
-/* FREE BLOCK */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeDirectory(Signal* signal)
-{
- Uint32 tsdyIndex;
-
- if (cfirstfreedir == RNIL) {
- jam();
- if (cdirarraysize <= cdirmemory) {
- jam();
- tresult = ZDIRSIZE_ERROR;
- return;
- } else {
- jam();
- sdDirptr.i = cdirmemory;
- ptrCheckGuard(sdDirptr, cdirarraysize, directoryarray);
- cdirmemory = cdirmemory + 1;
- }//if
- } else {
- jam();
- sdDirptr.i = cfirstfreedir;
- ptrCheckGuard(sdDirptr, cdirarraysize, directoryarray);
- cfirstfreedir = sdDirptr.p->pagep[0];
- sdDirptr.p->pagep[0] = RNIL;
- }//if
- for (tsdyIndex = 0; tsdyIndex <= 255; tsdyIndex++) {
- sdDirptr.p->pagep[tsdyIndex] = RNIL;
- }//for
-}//Dbacc::seizeDirectory()
-
-/* --------------------------------------------------------------------------------- */
-/* SEIZE_DIRRANGE */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeDirrange(Signal* signal)
-{
- Uint32 tsdeIndex;
-
- newDirRangePtr.i = cfirstfreeDirrange;
- ptrCheckGuard(newDirRangePtr, cdirrangesize, dirRange);
- cfirstfreeDirrange = newDirRangePtr.p->dirArray[0];
- for (tsdeIndex = 0; tsdeIndex <= 255; tsdeIndex++) {
- newDirRangePtr.p->dirArray[tsdeIndex] = RNIL;
- }//for
-}//Dbacc::seizeDirrange()
-
-/* --------------------------------------------------------------------------------- */
/* SEIZE FRAGREC */
/* --------------------------------------------------------------------------------- */
void Dbacc::seizeFragrec(Signal* signal)
=== modified file 'storage/ndb/src/kernel/blocks/record_types.hpp'
--- a/storage/ndb/src/kernel/blocks/record_types.hpp 2011-10-07 13:15:08 +0000
+++ b/storage/ndb/src/kernel/blocks/record_types.hpp 2012-03-13 11:57:27 +0000
@@ -94,6 +94,7 @@
#define RT_DBTUP_PAGE MAKE_TID( 1, RG_DATAMEM)
#define RT_DBTUP_PAGE_MAP MAKE_TID( 2, RG_DATAMEM)
+#define RT_DBACC_DIRECTORY MAKE_TID( 3, RG_DATAMEM)
#define RT_JOB_BUFFER MAKE_TID( 1, RG_JOBBUFFER)
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-30 14:28:55 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2012-03-13 11:57:27 +0000
@@ -840,13 +840,6 @@ Configuration::calcSizeAlt(ConfigValues
* Acc Size Alt values
*/
// Can keep 65536 pages (= 0.5 GByte)
- cfg.put(CFG_ACC_DIR_RANGE,
- 2 * NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas);
-
- cfg.put(CFG_ACC_DIR_ARRAY,
- (noOfIndexPages >> 8) +
- 2 * NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas);
-
cfg.put(CFG_ACC_FRAGMENT,
NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas);
=== modified file 'storage/ndb/src/kernel/vm/DynArr256.cpp'
--- a/storage/ndb/src/kernel/vm/DynArr256.cpp 2012-02-07 15:43:54 +0000
+++ b/storage/ndb/src/kernel/vm/DynArr256.cpp 2012-03-13 11:55:23 +0000
@@ -33,6 +33,7 @@ struct DA256Free
{
Uint32 m_magic;
Uint32 m_next_free;
+ Uint32 m_prev_free;
};
struct DA256Node
@@ -46,8 +47,43 @@ struct DA256Page
struct DA256Node m_nodes[30];
bool get(Uint32 node, Uint32 idx, Uint32 type_id, Uint32*& val_ptr) const;
+ bool is_empty() const;
+ bool is_full() const;
+ Uint32 first_free() const;
+ Uint32 last_free() const;
};
+inline
+bool DA256Page::is_empty() const
+{
+ return !(0x7fff & (m_header[0].m_magic | m_header[1].m_magic));
+}
+
+inline
+bool DA256Page::is_full() const
+{
+ return !(0x7fff & ~(m_header[0].m_magic & m_header[1].m_magic));
+}
+
+inline
+Uint32 DA256Page::first_free() const
+{
+ Uint32 node = BitmaskImpl::ctz(~m_header[0].m_magic | 0x8000);
+ if (node == 15)
+ node = 15 + BitmaskImpl::ctz(~m_header[1].m_magic | 0x8000);
+ return node;
+}
+
+inline
+Uint32 DA256Page::last_free() const
+{
+ Uint32 node = 29 - BitmaskImpl::clz((~m_header[1].m_magic << 17) | 0x10000);
+ if (node == 14)
+ node = 14 - BitmaskImpl::clz((~m_header[0].m_magic << 17) | 0x10000);
+ return node;
+}
+
+
#undef require
#define require(x) require_exit_or_core_with_printer((x), 0, ndbout_printer)
//#define DA256_USE_PX
@@ -68,8 +104,11 @@ struct DA256Page
#endif
Uint32 verbose = 0;
Uint32 allocatedpages = 0;
+Uint32 releasedpages = 0;
+Uint32 maxallocatedpages = 0;
Uint32 allocatednodes = 0;
Uint32 releasednodes = 0;
+Uint32 maxallocatednodes = 0;
#endif
static
@@ -94,6 +133,7 @@ DynArr256Pool::DynArr256Pool()
{
m_type_id = RNIL;
m_first_free = RNIL;
+ m_last_free = RNIL;
m_memroot = 0;
}
@@ -163,6 +203,9 @@ DynArr256::get(Uint32 pos) const
return 0;
}
+#ifdef VM_TRACE
+ require((m_head.m_sz > 0) && (pos <= m_head.m_high_pos));
+#endif
#ifdef DA256_USE_PX
Uint32 px[4] = { (pos >> 24) & 255,
(pos >> 16) & 255,
@@ -251,6 +294,11 @@ DynArr256::set(Uint32 pos)
ptrI = * retVal;
}
+#ifdef VM_TRACE
+ if (pos > m_head.m_high_pos)
+ m_head.m_high_pos = pos;
+#endif
+
return retVal;
err:
@@ -296,16 +344,14 @@ initpage(DA256Page* p, Uint32 page_no, U
{
free = (DA256Free*)(p->m_nodes+i);
free->m_magic = type_id;
- free->m_next_free = (page_no << DA256_BITS) + (i + 1);
+ free->m_next_free = RNIL;
+ free->m_prev_free = RNIL;
#ifdef DA256_EXTRA_SAFE
DA256Node* node = p->m_nodes+i;
for (j = 0; j<17; j++)
node->m_lines[j].m_magic = type_id;
#endif
}
-
- free = (DA256Free*)(p->m_nodes+29);
- free->m_next_free = RNIL;
}
bool
@@ -370,9 +416,12 @@ DynArr256::init(ReleaseIterator &iter)
* 0 - done
* 1 - data
* 2 - no data
+ *
+ * if ptrVal is NULL, truncate work in trim mode and will stop
+ * (return 0) as soon value is not RNIL
*/
Uint32
-DynArr256::truncate(Uint32 keep_pos, ReleaseIterator& iter, Uint32* ptrVal)
+DynArr256::truncate(Uint32 trunc_pos, ReleaseIterator& iter, Uint32* ptrVal)
{
Uint32 type_id = (~m_pool.m_type_id) & 0xFFFF;
DA256Page * memroot = m_pool.m_memroot;
@@ -380,7 +429,7 @@ DynArr256::truncate(Uint32 keep_pos, Rel
for (;;)
{
if (iter.m_sz == 0 ||
- iter.m_pos < keep_pos ||
+ iter.m_pos < trunc_pos ||
m_head.m_sz == 0)
{
return 0;
@@ -391,7 +440,8 @@ DynArr256::truncate(Uint32 keep_pos, Rel
Uint32 page_no = ptrI >> DA256_BITS;
Uint32 page_idx = (ptrI & DA256_MASK) ;
DA256Page* page = memroot + page_no;
- Uint32 node_index = (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz))) & 255;
+ Uint32 node_addr = (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz)));
+ Uint32 node_index = node_addr & 255;
bool is_value = (iter.m_sz == m_head.m_sz);
if (unlikely(! page->get(page_idx, node_index, type_id, refPtr)))
@@ -399,10 +449,17 @@ DynArr256::truncate(Uint32 keep_pos, Rel
require(false);
}
assert(refPtr != NULL);
- *ptrVal = *refPtr;
+ if (ptrVal != NULL)
+ {
+ *ptrVal = *refPtr;
+ }
+ else if (is_value && *refPtr != RNIL)
+ {
+ return 0;
+ }
if (iter.m_sz == 1 &&
- (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz))) == 0)
+ node_addr == 0)
{
assert(iter.m_ptr_i[iter.m_sz] == m_head.m_ptr_i);
assert(iter.m_ptr_i[iter.m_sz + 1] == RNIL);
@@ -410,10 +467,10 @@ DynArr256::truncate(Uint32 keep_pos, Rel
m_pool.release(m_head.m_ptr_i);
m_head.m_sz --;
m_head.m_ptr_i = iter.m_ptr_i[iter.m_sz];
- return is_value ? 1 : 2;
+ if (is_value)
+ return 1;
}
-
- if (is_value || iter.m_ptr_i[iter.m_sz + 1] == *refPtr)
+ else if (is_value || iter.m_ptr_i[iter.m_sz + 1] == *refPtr)
{ // sz--
Uint32 ptrI = *refPtr;
if (!is_value)
@@ -438,7 +495,11 @@ DynArr256::truncate(Uint32 keep_pos, Rel
assert((iter.m_pos & ~(0xffffffff << (8 * (m_head.m_sz - iter.m_sz)))) == 0);
iter.m_pos --;
}
- if (is_value)
+#ifdef VM_TRACE
+ if (iter.m_pos < m_head.m_high_pos)
+ m_head.m_high_pos = iter.m_pos;
+#endif
+ if (is_value && ptrVal != NULL)
return 1;
}
else
@@ -502,6 +563,8 @@ seizenode(DA256Page* page, Uint32 idx, U
#ifdef UNIT_TEST
allocatednodes++;
+ if (allocatednodes - releasednodes > maxallocatednodes)
+ maxallocatednodes = allocatednodes - releasednodes;
#endif
return true;
}
@@ -574,28 +637,45 @@ DynArr256Pool::seize()
initpage(page, page_no, type_id);
#ifdef UNIT_TEST
allocatedpages++;
+ if (allocatedpages - releasedpages > maxallocatedpages)
+ maxallocatedpages = allocatedpages - releasedpages;
#endif
}
else
{
return RNIL;
}
- ff = (page_no << DA256_BITS);
+ m_last_free = m_first_free = ff = page_no;
}
else
{
- page = memroot + (ff >> DA256_BITS);
+ page = memroot + ff;
}
- Uint32 idx = ff & DA256_MASK;
+ Uint32 idx = page->first_free();
DA256Free * ptr = (DA256Free*)(page->m_nodes + idx);
if (likely(ptr->m_magic == type_id))
{
- Uint32 next = ptr->m_next_free;
+ Uint32 last_free = page->last_free();
+ Uint32 next_page = ((DA256Free*)(page->m_nodes + last_free))->m_next_free;
if (likely(seizenode(page, idx, type_id)))
{
- m_first_free = next;
- return ff;
+ if (page->is_full())
+ {
+ assert(m_first_free == ff);
+ m_first_free = next_page;
+ if (m_first_free == RNIL)
+ {
+ assert(m_last_free == ff);
+ m_last_free = RNIL;
+ }
+ else
+ {
+ page = memroot + next_page;
+ ((DA256Free*)(page->m_nodes + page->last_free()))->m_prev_free = RNIL;
+ }
+ }
+ return (ff << DA256_BITS) | idx;
}
}
@@ -613,15 +693,61 @@ DynArr256Pool::release(Uint32 ptrI)
Uint32 page_idx = ptrI & DA256_MASK;
DA256Page * memroot = m_memroot;
DA256Page * page = memroot + page_no;
-
DA256Free * ptr = (DA256Free*)(page->m_nodes + page_idx);
+
+ Guard2 g(m_mutex);
+ Uint32 last_free = page->last_free();
if (likely(releasenode(page, page_idx, type_id)))
{
ptr->m_magic = type_id;
- Guard2 g(m_mutex);
- Uint32 ff = m_first_free;
- ptr->m_next_free = ff;
- m_first_free = ptrI;
+ if (last_free > 29)
+ { // Add last to page free list
+ Uint32 lf = m_last_free;
+ ptr->m_prev_free = lf;
+ ptr->m_next_free = RNIL;
+ m_last_free = page_no;
+ if (m_first_free == RNIL)
+ m_first_free = page_no;
+
+ if (lf != RNIL)
+ {
+ page = memroot + lf;
+ DA256Free* pptr = (DA256Free*)(page->m_nodes + page->last_free());
+ pptr->m_next_free = page_no;
+ }
+ }
+ else if (page->is_empty())
+ {
+ // TODO msundell: release_page
+ // unlink from free page list
+ Uint32 nextpage = ((DA256Free*)(page->m_nodes + last_free))->m_next_free;
+ Uint32 prevpage = ((DA256Free*)(page->m_nodes + last_free))->m_prev_free;
+ m_ctx.release_page(type_id, page_no);
+#ifdef UNIT_TEST
+ releasedpages ++;
+#endif
+ if (nextpage != RNIL)
+ {
+ page = memroot + nextpage;
+ ((DA256Free*)(page->m_nodes + page->last_free()))->m_prev_free = prevpage;
+ }
+ if (prevpage != RNIL)
+ {
+ page = memroot + prevpage;
+ ((DA256Free*)(page->m_nodes + page->last_free()))->m_next_free = nextpage;
+ }
+ if (m_first_free == page_no)
+ {
+ m_first_free = nextpage;
+ }
+ if (m_last_free == page_no)
+ m_last_free = prevpage;
+ }
+ else if (page_idx > last_free)
+ { // last free node in page tracks free page list links
+ ptr->m_next_free = ((DA256Free*)(page->m_nodes + last_free))->m_next_free;
+ ptr->m_prev_free = ((DA256Free*)(page->m_nodes + last_free))->m_prev_free;
+ }
return;
}
require(false);
@@ -631,6 +757,36 @@ DynArr256Pool::release(Uint32 ptrI)
static
bool
+release(DynArr256& arr)
+{
+ DynArr256::ReleaseIterator iter;
+ arr.init(iter);
+ Uint32 val;
+ Uint32 cnt=0;
+ Uint64 start;
+ if (verbose > 2)
+ ndbout_c("allocatedpages: %d (max %d) releasedpages: %d allocatednodes: %d (max %d) releasednodes: %d",
+ allocatedpages, maxallocatedpages,
+ releasedpages,
+ allocatednodes, maxallocatednodes,
+ releasednodes);
+ start = my_micro_time();
+ while (arr.release(iter, &val))
+ cnt++;
+ start = my_micro_time() - start;
+ if (verbose > 1)
+ ndbout_c("allocatedpages: %d (max %d) releasedpages: %d allocatednodes: %d (max %d) releasednodes: %d (%llu us)"
+ " releasecnt: %d",
+ allocatedpages, maxallocatedpages,
+ releasedpages,
+ allocatednodes, maxallocatednodes,
+ releasednodes,
+ start, cnt);
+ return true;
+}
+
+static
+bool
simple(DynArr256 & arr, int argc, char* argv[])
{
if (verbose) ndbout_c("argc: %d", argc);
@@ -863,16 +1019,15 @@ write(DynArr256& arr, int argc, char **
{
Uint32 idx = ((rand() & (~seqmask)) + ((i + seq) & seqmask)) % maxidx;
Uint32 *ptr = arr.set(idx);
+ if (ptr == NULL) break; /* out of memory */
*ptr = i;
}
start = my_micro_time() - start;
float uspg = (float)start; uspg /= cnt;
if (verbose)
ndbout_c("Elapsed %lldus -> %f us/set", start, uspg);
- DynArr256::ReleaseIterator iter;
- arr.init(iter);
- Uint32 val;
- while(arr.release(iter, &val));
+ if (!release(arr))
+ return false;
}
return true;
}
@@ -923,6 +1078,12 @@ main(int argc, char** argv)
#else
verbose = 0;
#endif
+ while (argc > 1 && strcmp(argv[1], "-v") == 0)
+ {
+ verbose++;
+ argc--;
+ argv++;
+ }
Pool_context pc = test_context(10000 /* pages */);
@@ -983,20 +1144,18 @@ main(int argc, char** argv)
}
#endif
- DynArr256::ReleaseIterator iter;
- arr.init(iter);
- Uint32 cnt = 0, val;
- while (arr.release(iter, &val)) cnt++;
-
+ release(arr);
if (verbose)
- ndbout_c("allocatedpages: %d allocatednodes: %d releasednodes: %d"
- " releasecnt: %d",
- allocatedpages,
- allocatednodes,
- releasednodes,
- cnt);
+ ndbout_c("allocatedpages: %d (max %d) releasedpages: %d allocatednodes: %d (max %d) releasednodes: %d",
+ allocatedpages, maxallocatedpages,
+ releasedpages,
+ allocatednodes, maxallocatednodes,
+ releasednodes);
+
#ifdef TAP_TEST
- ok(allocatednodes == releasednodes, "release");
+ ok(allocatednodes == releasednodes &&
+ allocatedpages == releasedpages,
+ "release");
return exit_status();
#else
return 0;
=== modified file 'storage/ndb/src/kernel/vm/DynArr256.hpp'
--- a/storage/ndb/src/kernel/vm/DynArr256.hpp 2012-02-07 15:43:54 +0000
+++ b/storage/ndb/src/kernel/vm/DynArr256.hpp 2012-03-13 11:55:23 +0000
@@ -37,6 +37,7 @@ public:
protected:
Uint32 m_type_id;
Uint32 m_first_free;
+ Uint32 m_last_free;
Pool_context m_ctx;
struct DA256Page* m_memroot;
NdbMutex * m_mutex;
@@ -51,10 +52,17 @@ class DynArr256
public:
struct Head
{
+#ifdef VM_TRACE
+ Head() { m_ptr_i = RNIL; m_sz = 0; m_high_pos = 0; }
+#else
Head() { m_ptr_i = RNIL; m_sz = 0;}
+#endif
Uint32 m_ptr_i;
Uint32 m_sz;
+#ifdef VM_TRACE
+ Uint32 m_high_pos;
+#endif
bool isEmpty() const { return m_sz == 0;}
};
@@ -79,7 +87,8 @@ public:
* 2 - nodata
*/
Uint32 release(ReleaseIterator&, Uint32* retptr);
- Uint32 truncate(Uint32 keep_pos, ReleaseIterator&, Uint32* retptr);
+ Uint32 trim(Uint32 trim_pos, ReleaseIterator&);
+ Uint32 truncate(Uint32 trunc_pos, ReleaseIterator&, Uint32* retptr);
protected:
Head & m_head;
DynArr256Pool & m_pool;
@@ -94,4 +103,10 @@ Uint32 DynArr256::release(ReleaseIterato
return truncate(0, iter, retptr);
}
+inline
+Uint32 DynArr256::trim(Uint32 pos, ReleaseIterator& iter)
+{
+ return truncate(pos, iter, NULL);
+}
+
#endif
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (mauritz.sundell:4876 to 4881) | Mauritz Sundell | 13 Mar |