List:Commits« Previous MessageNext Message »
From:Mauritz Sundell Date:March 13 2012 12:05pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (mauritz.sundell:4876 to 4881)
View as plain text  
 4881 Mauritz Sundell	2012-03-13
      ndb - dbacc - use DynArr256 for page directory
      
      this will relax one size limitation of hash index
      of one partition, that is the number of rows are
      now not limited to ~45M, but note where are other
      limitations that can be more restrictive.
      
      pages are still from static pool (IndexMemory).
      localkey length into dbtup are still only one word.

    modified:
      storage/ndb/include/kernel/kernel_config_parameters.h
      storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
      storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
      storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
      storage/ndb/src/kernel/blocks/record_types.hpp
      storage/ndb/src/kernel/vm/Configuration.cpp
 4880 Mauritz Sundell	2012-03-13
      ndb - dbacc refactoring
      
      remove some dead code (members and methods).
      introduce inline mul_ZBUF_SIZE() function and use
      it instead of shift arithmetics.

    modified:
      storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
      storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
 4879 Mauritz Sundell	2012-03-13
      ndb - make DynArr256 release empty pages
      
      DynArr256 use pages with 30 nodes each.
      Each node can contain 256 words.
      
      DynArr256Pool are used for seize and release
      of nodes and it allocates new pages if necessary
      and now also releases pages when no nodes on the
      page are allocated.
      
      The pool keeps a double linked list of pages with
      at least one node free. Allocation of a node are
      made from the first page in that list.
      
      Within a page the first free node are used based
      on its index.
      The last free node are used to keep the links of
      the double linked list of pages with free nodes.
      Which nodes that are free are determined by bits
      in the headers magic fields.

    modified:
      storage/ndb/src/kernel/vm/DynArr256.cpp
      storage/ndb/src/kernel/vm/DynArr256.hpp
 4878 Mauritz Sundell	2012-03-13
      ndb - stricter DynArr256 in debug
      
      If one compile with VM_TRACE one can not
      get values from index above the highest
      index with value set.

    modified:
      storage/ndb/src/kernel/vm/DynArr256.cpp
      storage/ndb/src/kernel/vm/DynArr256.hpp
 4877 Mauritz Sundell	2012-03-13
      ndb - add trim function to DynArr256
      
      free memory from end of array and downwards as long
      as values are missing or RNIL.

    modified:
      storage/ndb/src/kernel/vm/DynArr256.cpp
      storage/ndb/src/kernel/vm/DynArr256.hpp
 4876 Frazer Clement	2012-03-12
      Bug#13428909 CONFLICT DETECTION OPERATION EXECUTION ERROR HANDLING INCOMPLETE
      
      - Error handling in some conflict detection/resolution scenarios was incomplete
      - This patch improves error handling and reporting
      - Also, reporting of misconfiguration is improved.
      - Infrastructure for manual testing is added, automated testing not setup yet.

    modified:
      mysql-test/suite/ndb_rpl/t/disabled.def
      sql/ha_ndbcluster.cc
      sql/ndb_conflict.cc
      sql/ndb_conflict_trans.cc
      storage/ndb/test/ndbapi/bench/asyncGenerator.cpp
      storage/ndb/test/ndbapi/bench/dbPopulate.cpp
      storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp
      storage/ndb/test/ndbapi/bench/mainPopulate.cpp
      storage/ndb/test/ndbapi/bench/ndb_error.hpp
      storage/ndb/test/ndbapi/bench/testData.h
      storage/ndb/test/ndbapi/bench/userInterface.h
=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h	2012-03-13 11:57:27 +0000
@@ -21,8 +21,6 @@
 
 #define PRIVATE_BASE          14000
 
-#define CFG_ACC_DIR_RANGE     (PRIVATE_BASE +  1)
-#define CFG_ACC_DIR_ARRAY     (PRIVATE_BASE +  2)
 #define CFG_ACC_FRAGMENT      (PRIVATE_BASE +  3)
 #define CFG_ACC_OP_RECS       (PRIVATE_BASE +  4)
 #define CFG_ACC_OVERFLOW_RECS (PRIVATE_BASE +  5)

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2011-12-01 10:45:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2012-03-13 11:57:27 +0000
@@ -24,6 +24,7 @@
 #endif
 
 #include <pc.hpp>
+#include <DynArr256.hpp>
 #include <SimulatedBlock.hpp>
 
 #ifdef DBACC_C
@@ -103,8 +104,6 @@ ndbout << "Ptr: " << ptr.p->word32 << "
 #define ZDEFAULT_LIST 3
 #define ZWORDS_IN_PAGE 2048
 #define ZADDFRAG 0
-#define ZDIRARRAY 68
-#define ZDIRRANGESIZE 65
 //#define ZEMPTY_FRAGMENT 0
 #define ZFRAGMENTSIZE 64
 #define ZFIRSTTIME 1
@@ -154,6 +153,7 @@ ndbout << "Ptr: " << ptr.p->word32 << "
 #define ZREL_ROOT_FRAG 5
 #define ZREL_FRAG 6
 #define ZREL_DIR 7
+#define ZREL_ODIR 8
 
 /* ------------------------------------------------------------------------- */
 /* ERROR CODES                                                               */
@@ -180,6 +180,18 @@ ndbout << "Ptr: " << ptr.p->word32 << "
 #define ZTO_OP_STATE_ERROR 631
 #define ZTOO_EARLY_ACCESS_ERROR 632
 #define ZDIR_RANGE_FULL_ERROR 633 // on fragment
+
+#if ZBUF_SIZE != ((1 << ZSHIFT_PLUS) - (1 << ZSHIFT_MINUS))
+#error ZBUF_SIZE != ((1 << ZSHIFT_PLUS) - (1 << ZSHIFT_MINUS))
+#endif
+
+static
+inline
+Uint32 mul_ZBUF_SIZE(Uint32 i)
+{
+  return (i << ZSHIFT_PLUS) - (i << ZSHIFT_MINUS);
+}
+
 #endif
 
 class ElementHeader {
@@ -305,24 +317,6 @@ enum State {
 // Records
 
 /* --------------------------------------------------------------------------------- */
-/* DIRECTORY RANGE                                                                   */
-/* --------------------------------------------------------------------------------- */
-  struct DirRange {
-    Uint32 dirArray[256];
-  }; /* p2c: size = 1024 bytes */
-  
-  typedef Ptr<DirRange> DirRangePtr;
-
-/* --------------------------------------------------------------------------------- */
-/* DIRECTORYARRAY                                                                    */
-/* --------------------------------------------------------------------------------- */
-struct Directoryarray {
-  Uint32 pagep[256];
-}; /* p2c: size = 1024 bytes */
-
-  typedef Ptr<Directoryarray> DirectoryarrayPtr;
-
-/* --------------------------------------------------------------------------------- */
 /* FRAGMENTREC. ALL INFORMATION ABOUT FRAMENT AND HASH TABLE IS SAVED IN FRAGMENT    */
 /*         REC  A POINTER TO FRAGMENT RECORD IS SAVED IN ROOTFRAGMENTREC FRAGMENT    */
 /* --------------------------------------------------------------------------------- */
@@ -355,7 +349,6 @@ struct Fragmentrec {
   Uint32 expReceiveIndex;
   Uint32 expReceiveForward;
   Uint32 expSenderDirIndex;
-  Uint32 expSenderDirptr;
   Uint32 expSenderIndex;
   Uint32 expSenderPageptr;
 
@@ -369,9 +362,8 @@ struct Fragmentrec {
 // in its turn references the pages) for the bucket pages and the overflow
 // bucket pages.
 //-----------------------------------------------------------------------------
-  Uint32 directory;
-  Uint32 dirsize;
-  Uint32 overflowdir;
+  DynArr256::Head directory;
+  DynArr256::Head overflowdir;
   Uint32 lastOverIndex;
 
 //-----------------------------------------------------------------------------
@@ -458,12 +450,10 @@ struct Fragmentrec {
 // hashcheckbit is the bit to check whether to send element to split bucket or not
 // k (== 6) is the number of buckets per page
 // lhfragbits is the number of bits used to calculate the fragment id
-// lhdirbits is the number of bits used to calculate the page id
 //-----------------------------------------------------------------------------
   Uint8 hashcheckbit;
   Uint8 k;
   Uint8 lhfragbits;
-  Uint8 lhdirbits;
 
 //-----------------------------------------------------------------------------
 // nodetype can only be STORED in this release. Is currently only set, never read
@@ -692,10 +682,7 @@ private:
   void releaseFragResources(Signal* signal, Uint32 fragIndex);
   void releaseRootFragRecord(Signal* signal, RootfragmentrecPtr rootPtr);
   void releaseRootFragResources(Signal* signal, Uint32 tableId);
-  void releaseDirResources(Signal* signal,
-                           Uint32 fragIndex,
-                           Uint32 dirIndex,
-                           Uint32 startIndex);
+  void releaseDirResources(Signal* signal);
   void releaseDirectoryResources(Signal* signal,
                                  Uint32 fragIndex,
                                  Uint32 dirIndex,
@@ -707,8 +694,6 @@ private:
   void initScanFragmentPart(Signal* signal);
   Uint32 checkScanExpand(Signal* signal);
   Uint32 checkScanShrink(Signal* signal);
-  void initialiseDirRec(Signal* signal);
-  void initialiseDirRangeRec(Signal* signal);
   void initialiseFragRec(Signal* signal);
   void initialiseFsConnectionRec(Signal* signal);
   void initialiseFsOpRec(Signal* signal);
@@ -776,6 +761,9 @@ private:
   void seizeRightlist(Signal* signal);
   Uint32 readTablePk(Uint32 lkey1, Uint32 lkey2, Uint32 eh, OperationrecPtr);
   Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
+  Uint32 getPagePtr(DynArr256::Head&, Uint32);
+  bool setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri);
+  Uint32 unsetPagePtr(DynArr256::Head& directory, Uint32 index);
   void getdirindex(Signal* signal);
   void commitdelete(Signal* signal);
   void deleteElement(Signal* signal);
@@ -809,17 +797,13 @@ private:
   void putOpInFragWaitQue(Signal* signal);
   void putOverflowRecInFrag(Signal* signal);
   void putRecInFreeOverdir(Signal* signal);
-  void releaseDirectory(Signal* signal);
-  void releaseDirrange(Signal* signal);
   void releaseFsConnRec(Signal* signal);
   void releaseFsOpRec(Signal* signal);
   void releaseOpRec(Signal* signal);
   void releaseOverflowRec(Signal* signal);
   void releaseOverpage(Signal* signal);
   void releasePage(Signal* signal);
-  void releaseLogicalPage(Fragmentrec * fragP, Uint32 logicalPageId);
   void seizeDirectory(Signal* signal);
-  void seizeDirrange(Signal* signal);
   void seizeFragrec(Signal* signal);
   void seizeFsConnectRec(Signal* signal);
   void seizeFsOpRec(Signal* signal);
@@ -870,27 +854,9 @@ private:
 
   // Variables
 /* --------------------------------------------------------------------------------- */
-/* DIRECTORY RANGE                                                                   */
+/* DIRECTORY                                                                         */
 /* --------------------------------------------------------------------------------- */
-  DirRange *dirRange;
-  DirRangePtr expDirRangePtr;
-  DirRangePtr gnsDirRangePtr;
-  DirRangePtr newDirRangePtr;
-  DirRangePtr rdDirRangePtr;
-  DirRangePtr nciOverflowrangeptr;
-  Uint32 cdirrangesize;
-  Uint32 cfirstfreeDirrange;
-/* --------------------------------------------------------------------------------- */
-/* DIRECTORYARRAY                                                                    */
-/* --------------------------------------------------------------------------------- */
-  Directoryarray *directoryarray;
-  DirectoryarrayPtr expDirptr;
-  DirectoryarrayPtr rdDirptr;
-  DirectoryarrayPtr sdDirptr;
-  DirectoryarrayPtr nciOverflowDirptr;
-  Uint32 cdirarraysize;
-  Uint32 cdirmemory;
-  Uint32 cfirstfreedir;
+  DynArr256Pool   directoryPool;
 /* --------------------------------------------------------------------------------- */
 /* FRAGMENTREC. ALL INFORMATION ABOUT FRAMENT AND HASH TABLE IS SAVED IN FRAGMENT    */
 /*         REC  A POINTER TO FRAGMENT RECORD IS SAVED IN ROOTFRAGMENTREC FRAGMENT    */

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp	2011-12-01 10:45:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp	2012-03-13 11:57:27 +0000
@@ -24,18 +24,17 @@
 
 void Dbacc::initData() 
 {
-  cdirarraysize = ZDIRARRAY;
   coprecsize = ZOPRECSIZE;
   cpagesize = ZPAGESIZE;
   ctablesize = ZTABLESIZE;
   cfragmentsize = ZFRAGMENTSIZE;
-  cdirrangesize = ZDIRRANGESIZE;
   coverflowrecsize = ZOVERFLOWRECSIZE;
   cscanRecSize = ZSCAN_REC_SIZE;
 
-  
-  dirRange = 0;
-  directoryarray = 0;
+  Pool_context pc;
+  pc.m_block = this;
+  directoryPool.init(RT_DBACC_DIRECTORY, pc);
+
   fragmentrec = 0;
   operationrec = 0;
   overflowRecord = 0;
@@ -102,14 +101,6 @@ void Dbacc::initRecords()
 					    sizeof(Operationrec),
 					    coprecsize);
 
-  dirRange = (DirRange*)allocRecord("DirRange",
-				    sizeof(DirRange), 
-				    cdirrangesize);
-
-  directoryarray = (Directoryarray*)allocRecord("Directoryarray",
-						sizeof(Directoryarray), 
-						cdirarraysize);
-
   fragmentrec = (Fragmentrec*)allocRecord("Fragmentrec",
 					  sizeof(Fragmentrec), 
 					  cfragmentsize);
@@ -166,16 +157,7 @@ Dbacc::Dbacc(Block_context& ctx, Uint32
 
 #ifdef VM_TRACE
   {
-    void* tmp[] = { &expDirRangePtr,
-		    &gnsDirRangePtr,
-		    &newDirRangePtr,
-		    &rdDirRangePtr,
-		    &nciOverflowrangeptr,
-                    &expDirptr,
-                    &rdDirptr,
-                    &sdDirptr,
-                    &nciOverflowDirptr,
-                    &fragrecptr,
+    void* tmp[] = { &fragrecptr,
                     &operationRecPtr,
                     &idrOperationRecPtr,
                     &mlpqOperPtr,
@@ -231,14 +213,6 @@ Dbacc::Dbacc(Block_context& ctx, Uint32
 
 Dbacc::~Dbacc() 
 {
-  deallocRecord((void **)&dirRange, "DirRange",
-		sizeof(DirRange), 
-		cdirrangesize);
-  
-  deallocRecord((void **)&directoryarray, "Directoryarray",
-		sizeof(Directoryarray), 
-		cdirarraysize);
-  
   deallocRecord((void **)&fragmentrec, "Fragmentrec",
 		sizeof(Fragmentrec), 
 		cfragmentsize);

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2011-12-23 14:34:57 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2012-03-13 11:57:27 +0000
@@ -99,12 +99,10 @@ void Dbacc::execCONTINUEB(Signal* signal
       break;
     }
   case ZREL_DIR:
+  case ZREL_ODIR:
     {
       jam();
-      Uint32 fragIndex = signal->theData[1];
-      Uint32 dirIndex = signal->theData[2];
-      Uint32 startIndex = signal->theData[3];
-      releaseDirResources(signal, fragIndex, dirIndex, startIndex);
+      releaseDirResources(signal);
       break;
     }
 
@@ -215,11 +213,9 @@ void Dbacc::initialiseRecordsLab(Signal*
     break;
   case 4:
     jam();
-    initialiseDirRec(signal);
     break;
   case 5:
     jam();
-    initialiseDirRangeRec(signal);
     break;
   case 6:
     jam();
@@ -296,8 +292,6 @@ void Dbacc::execREAD_CONFIG_REQ(Signal*
     m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
   
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_DIR_RANGE, &cdirrangesize));
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_DIR_ARRAY, &cdirarraysize));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_FRAGMENT, &cfragmentsize));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_OP_RECS, &coprecsize));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_ACC_OVERFLOW_RECS, 
@@ -336,48 +330,6 @@ void Dbacc::sttorrysignalLab(Signal* sig
 }//Dbacc::sttorrysignalLab()
 
 /* --------------------------------------------------------------------------------- */
-/* INITIALISE_DIR_REC                                                                */
-/*              INITIALATES THE DIRECTORY RECORDS.                                   */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseDirRec(Signal* signal) 
-{
-  DirectoryarrayPtr idrDirptr;
-  ndbrequire(cdirarraysize > 0);
-  for (idrDirptr.i = 0; idrDirptr.i < cdirarraysize; idrDirptr.i++) {
-    refresh_watch_dog();
-    ptrAss(idrDirptr, directoryarray);
-    for (Uint32 i = 0; i <= 255; i++) {
-      idrDirptr.p->pagep[i] = RNIL;
-    }//for
-  }//for
-  cdirmemory = 0;
-  cfirstfreedir = RNIL;
-}//Dbacc::initialiseDirRec()
-
-/* --------------------------------------------------------------------------------- */
-/* INITIALISE_DIR_RANGE_REC                                                          */
-/*              INITIALATES THE DIR_RANGE RECORDS.                                   */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::initialiseDirRangeRec(Signal* signal) 
-{
-  DirRangePtr idrDirRangePtr;
-
-  ndbrequire(cdirrangesize > 0);
-  for (idrDirRangePtr.i = 0; idrDirRangePtr.i < cdirrangesize; idrDirRangePtr.i++) {
-    refresh_watch_dog();
-    ptrAss(idrDirRangePtr, dirRange);
-    idrDirRangePtr.p->dirArray[0] = idrDirRangePtr.i + 1;
-    for (Uint32 i = 1; i < 256; i++) {
-      idrDirRangePtr.p->dirArray[i] = RNIL;
-    }//for
-  }//for
-  idrDirRangePtr.i = cdirrangesize - 1;
-  ptrAss(idrDirRangePtr, dirRange);
-  idrDirRangePtr.p->dirArray[0] = RNIL;
-  cfirstfreeDirrange = 0;
-}//Dbacc::initialiseDirRangeRec()
-
-/* --------------------------------------------------------------------------------- */
 /* INITIALISE_FRAG_REC                                                               */
 /*              INITIALATES THE FRAGMENT RECORDS.                                    */
 /* --------------------------------------------------------------------------------- */
@@ -545,55 +497,22 @@ void Dbacc::execACCFRAGREQ(Signal* signa
     addFragRefuse(signal, ZFULL_FRAGRECORD_ERROR);
     return;
   }//if
-  if (cfirstfreeDirrange == RNIL) {
-    jam();
-    releaseFragRecord(signal, fragrecptr);
-    addFragRefuse(signal, ZDIR_RANGE_ERROR);
-    return;
-  } else {
-    jam();
-    seizeDirrange(signal);
-  }//if
-
-  fragrecptr.p->directory = newDirRangePtr.i;
-  seizeDirectory(signal);
-  if (tresult < ZLIMIT_OF_ERROR) {
-    jam();
-    newDirRangePtr.p->dirArray[0] = sdDirptr.i;
-  } else {
-    jam();
-    addFragRefuse(signal, tresult);
-    return;
-  }//if
-
   seizePage(signal);
   if (tresult > ZLIMIT_OF_ERROR) {
     jam();
     addFragRefuse(signal, tresult);
     return;
   }//if
-  sdDirptr.p->pagep[0] = spPageptr.i;
+  if (!setPagePtr(fragrecptr.p->directory, 0, spPageptr.i))
+  {
+    jam();
+    addFragRefuse(signal, ZDIR_RANGE_FULL_ERROR);
+    return;
+  }
+
   tipPageId = 0;
   inpPageptr = spPageptr;
   initPage(signal);
-  if (cfirstfreeDirrange == RNIL) {
-    jam();
-    addFragRefuse(signal, ZDIR_RANGE_ERROR);
-    return;
-  } else {
-    jam();
-    seizeDirrange(signal);
-  }//if
-  fragrecptr.p->overflowdir = newDirRangePtr.i;
-  seizeDirectory(signal);
-  if (tresult < ZLIMIT_OF_ERROR) {
-    jam();
-    newDirRangePtr.p->dirArray[0] = sdDirptr.i;
-  } else {
-    jam();
-    addFragRefuse(signal, tresult);
-    return;
-  }//if
 
   Uint32 userPtr = req->userPtr;
   BlockReference retRef = req->userRef;
@@ -723,14 +642,24 @@ void Dbacc::releaseFragResources(Signal*
   regFragPtr.i = fragIndex;
   ptrCheckGuard(regFragPtr, cfragmentsize, fragmentrec);
   verifyFragCorrect(regFragPtr);
-  if (regFragPtr.p->directory != RNIL) {
-    jam();
-    releaseDirResources(signal, regFragPtr.i, regFragPtr.p->directory, 0);
-    regFragPtr.p->directory = RNIL;
-  } else if (regFragPtr.p->overflowdir != RNIL) {
+  if (!regFragPtr.p->directory.isEmpty()) {
     jam();
-    releaseDirResources(signal, regFragPtr.i, regFragPtr.p->overflowdir, 0);
-    regFragPtr.p->overflowdir = RNIL;
+    DynArr256::ReleaseIterator iter;
+    DynArr256 dir(directoryPool, regFragPtr.p->directory);
+    dir.init(iter);
+    signal->theData[0] = ZREL_DIR;
+    signal->theData[1] = regFragPtr.i;
+    memcpy(&signal->theData[2], &iter, sizeof(iter));
+    sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2 + sizeof(iter) / 4, JBB);
+  } else if (!regFragPtr.p->overflowdir.isEmpty()) {
+    jam();
+    DynArr256::ReleaseIterator iter;
+    DynArr256 dir(directoryPool, regFragPtr.p->overflowdir);
+    dir.init(iter);
+    signal->theData[0] = ZREL_ODIR;
+    signal->theData[1] = regFragPtr.i;
+    memcpy(&signal->theData[2], &iter, sizeof(iter));
+    sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2 + sizeof(iter) / 4, JBB);
   } else if (regFragPtr.p->firstOverflowRec != RNIL) {
     jam();
     releaseOverflowResources(signal, regFragPtr);
@@ -752,58 +681,68 @@ void Dbacc::verifyFragCorrect(Fragmentre
   ndbrequire(regFragPtr.p->lockOwnersList == RNIL);
 }//Dbacc::verifyFragCorrect()
 
-void Dbacc::releaseDirResources(Signal* signal, 
-				Uint32 fragIndex, 
-				Uint32 dirIndex, 
-				Uint32 startIndex)
-{
-  DirRangePtr regDirRangePtr;
-  regDirRangePtr.i = dirIndex;
-  ptrCheckGuard(regDirRangePtr, cdirrangesize, dirRange);
-  for (Uint32 i = startIndex; i < 256; i++) {
-    jam();
-    if (regDirRangePtr.p->dirArray[i] != RNIL) {
-      jam();
-      Uint32 directoryIndex = regDirRangePtr.p->dirArray[i];
-      regDirRangePtr.p->dirArray[i] = RNIL;
-      releaseDirectoryResources(signal, fragIndex, dirIndex, (i + 1), directoryIndex);
-      return;
-    }//if
-  }//for
-  rdDirRangePtr = regDirRangePtr;
-  releaseDirrange(signal);
-  signal->theData[0] = ZREL_FRAG;
-  signal->theData[1] = fragIndex;
-  sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
-}//Dbacc::releaseDirResources()
+void Dbacc::releaseDirResources(Signal* signal)
+{
+  jam();
+  Uint32 fragIndex = signal->theData[1];
+
+  DynArr256::ReleaseIterator iter;
+  memcpy(&iter, &signal->theData[2], sizeof(iter));
+
+  FragmentrecPtr regFragPtr;
+  regFragPtr.i = fragIndex;
+  ptrCheckGuard(regFragPtr, cfragmentsize, fragmentrec);
+  verifyFragCorrect(regFragPtr);
+
+  DynArr256::Head* directory;
+  switch (signal->theData[0])
+  {
+  case ZREL_DIR:
+    jam();
+    directory = &regFragPtr.p->directory;
+    break;
+  case ZREL_ODIR:
+    jam();
+    directory = &regFragPtr.p->overflowdir;
+    break;
+  default:
+    ndbrequire(false);
+  }
 
-void Dbacc::releaseDirectoryResources(Signal* signal,
-                                      Uint32 fragIndex,
-                                      Uint32 dirIndex,
-                                      Uint32 startIndex,
-                                      Uint32 directoryIndex)
-{
-  DirectoryarrayPtr regDirPtr;
-  regDirPtr.i = directoryIndex;
-  ptrCheckGuard(regDirPtr, cdirarraysize, directoryarray);
-  for (Uint32 i = 0; i < 256; i++) {
+  DynArr256 dir(directoryPool, *directory);
+  Uint32 ret;
+  Uint32 pagei;
+  /* TODO: find a good value for count
+   * bigger value means quicker release of big index,
+   * but longer time slice so less concurrent
+   */
+  int count = 10;
+  while (count > 0 &&
+         (ret = dir.release(iter, &pagei)) != 0)
+  {
     jam();
-    if (regDirPtr.p->pagep[i] != RNIL) {
+    count--;
+    if (ret == 1 && pagei != RNIL)
+    {
       jam();
-      rpPageptr.i = regDirPtr.p->pagep[i];
+      rpPageptr.i = pagei;
       ptrCheckGuard(rpPageptr, cpagesize, page8);
       releasePage(signal);
-      regDirPtr.p->pagep[i] = RNIL;
-    }//if
-  }//for
-  rdDirptr = regDirPtr;
-  releaseDirectory(signal);
-  signal->theData[0] = ZREL_DIR;
-  signal->theData[1] = fragIndex;
-  signal->theData[2] = dirIndex;
-  signal->theData[3] = startIndex;
-  sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
-}//Dbacc::releaseDirectoryResources()
+    }
+  }
+  if (ret != 0)
+  {
+    jam();
+    memcpy(&signal->theData[2], &iter, sizeof(iter));
+    sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2 + sizeof(iter) / 4, JBB);
+  }
+  else
+  {
+    jam();
+    signal->theData[0] = ZREL_FRAG;
+    sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
+  }
+}//Dbacc::releaseDirResources()
 
 void Dbacc::releaseOverflowResources(Signal* signal, FragmentrecPtr regFragPtr)
 {
@@ -2662,8 +2601,6 @@ void Dbacc::execACC_LOCKREQ(Signal* sign
 /* --------------------------------------------------------------------------------- */
 void Dbacc::insertElement(Signal* signal) 
 {
-  DirRangePtr inrOverflowrangeptr;
-  DirectoryarrayPtr inrOverflowDirptr;
   OverflowRecordPtr inrOverflowRecPtr;
   Page8Ptr inrNewPageptr;
   Uint32 tinrNextSamePage;
@@ -2694,12 +2631,7 @@ void Dbacc::insertElement(Signal* signal
       if (tinrNextSamePage == ZFALSE) {
         jam();     /* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
         tinrTmp = idrPageptr.p->word32[tidrContainerptr + 1];
-        inrOverflowrangeptr.i = fragrecptr.p->overflowdir;
-        ptrCheckGuard(inrOverflowrangeptr, cdirrangesize, dirRange);
-        arrGuard((tinrTmp >> 8), 256);
-        inrOverflowDirptr.i = inrOverflowrangeptr.p->dirArray[tinrTmp >> 8];
-        ptrCheckGuard(inrOverflowDirptr, cdirarraysize, directoryarray);
-        idrPageptr.i = inrOverflowDirptr.p->pagep[tinrTmp & 0xff];
+        idrPageptr.i = getPagePtr(fragrecptr.p->overflowdir, tinrTmp);
         ptrCheckGuard(idrPageptr, cpagesize, page8);
       }//if
       ndbrequire(tidrPageindex < ZEMPTYLIST);
@@ -2791,8 +2723,7 @@ void Dbacc::insertContainer(Signal* sign
   Uint32 guard26;
 
   tidrResult = ZFALSE;
-  tidrContainerptr = (tidrPageindex << ZSHIFT_PLUS) - (tidrPageindex << ZSHIFT_MINUS);
-  tidrContainerptr = tidrContainerptr + ZHEAD_SIZE;
+  tidrContainerptr = mul_ZBUF_SIZE(tidrPageindex) + ZHEAD_SIZE;
   /* --------------------------------------------------------------------------------- */
   /*       CALCULATE THE POINTER TO THE ELEMENT TO BE INSERTED AND THE POINTER TO THE  */
   /*       CONTAINER HEADER OF THE OTHER SIDE OF THE BUFFER.                           */
@@ -3027,7 +2958,7 @@ void Dbacc::seizeLeftlist(Signal* signal
   Uint32 tsllHeadIndex;
   Uint32 tsllTmp;
 
-  tsllHeadIndex = ((tslPageindex << ZSHIFT_PLUS) - (tslPageindex << ZSHIFT_MINUS)) + ZHEAD_SIZE;
+  tsllHeadIndex = mul_ZBUF_SIZE(tslPageindex) + ZHEAD_SIZE;
   arrGuard(tsllHeadIndex + 1, 2048);
   tslNextfree = slPageptr.p->word32[tsllHeadIndex];
   tslPrevfree = slPageptr.p->word32[tsllHeadIndex + 1];
@@ -3043,13 +2974,13 @@ void Dbacc::seizeLeftlist(Signal* signal
   } else {
     ndbrequire(tslPrevfree < ZEMPTYLIST);
     jam();
-    tsllTmp = ((tslPrevfree << ZSHIFT_PLUS) - (tslPrevfree << ZSHIFT_MINUS)) + ZHEAD_SIZE;
+    tsllTmp = mul_ZBUF_SIZE(tslPrevfree) + ZHEAD_SIZE;
     dbgWord32(slPageptr, tsllTmp, tslNextfree);
     slPageptr.p->word32[tsllTmp] = tslNextfree;
   }//if
   if (tslNextfree < ZEMPTYLIST) {
     jam();
-    tsllTmp = (((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ZHEAD_SIZE) + 1;
+    tsllTmp = mul_ZBUF_SIZE(tslNextfree) + ZHEAD_SIZE + 1;
     dbgWord32(slPageptr, tsllTmp, tslPrevfree);
     slPageptr.p->word32[tsllTmp] = tslPrevfree;
   } else {
@@ -3084,7 +3015,7 @@ void Dbacc::seizeLeftlist(Signal* signal
     slPageptr.p->word32[ZPOS_EMPTY_LIST] = tsllTmp;
     if (tslNextfree < ZEMPTYLIST) {
       jam();
-      tsllTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ZHEAD_SIZE;
+      tsllTmp = mul_ZBUF_SIZE(tslNextfree) + ZHEAD_SIZE;
       tsllTmp1 = slPageptr.p->word32[tsllTmp] & 0xfe03ffff;
       tsllTmp1 = tsllTmp1 | (tslPageindex << 18);
       dbgWord32(slPageptr, tsllTmp, tsllTmp1);
@@ -3112,7 +3043,7 @@ void Dbacc::seizeRightlist(Signal* signa
   Uint32 tsrlHeadIndex;
   Uint32 tsrlTmp;
 
-  tsrlHeadIndex = ((tslPageindex << ZSHIFT_PLUS) - (tslPageindex << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+  tsrlHeadIndex = mul_ZBUF_SIZE(tslPageindex) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
   arrGuard(tsrlHeadIndex + 1, 2048);
   tslNextfree = slPageptr.p->word32[tsrlHeadIndex];
   tslPrevfree = slPageptr.p->word32[tsrlHeadIndex + 1];
@@ -3124,13 +3055,13 @@ void Dbacc::seizeRightlist(Signal* signa
   } else {
     ndbrequire(tslPrevfree < ZEMPTYLIST);
     jam();
-    tsrlTmp = ((tslPrevfree << ZSHIFT_PLUS) - (tslPrevfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+    tsrlTmp = mul_ZBUF_SIZE(tslPrevfree) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
     dbgWord32(slPageptr, tsrlTmp, tslNextfree);
     slPageptr.p->word32[tsrlTmp] = tslNextfree;
   }//if
   if (tslNextfree < ZEMPTYLIST) {
     jam();
-    tsrlTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
+    tsrlTmp = mul_ZBUF_SIZE(tslNextfree) + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
     dbgWord32(slPageptr, tsrlTmp, tslPrevfree);
     slPageptr.p->word32[tsrlTmp] = tslPrevfree;
   } else {
@@ -3163,7 +3094,7 @@ void Dbacc::seizeRightlist(Signal* signa
     slPageptr.p->word32[ZPOS_EMPTY_LIST] = tsrlTmp | (tslPageindex << 16);
     if (tslNextfree < ZEMPTYLIST) {
       jam();
-      tsrlTmp = ((tslNextfree << ZSHIFT_PLUS) - (tslNextfree << ZSHIFT_MINUS)) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+      tsrlTmp = mul_ZBUF_SIZE(tslNextfree) + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
       tsrlTmp1 = slPageptr.p->word32[tsrlTmp] & 0xfe03ffff;
       dbgWord32(slPageptr, tsrlTmp, tsrlTmp1 | (tslPageindex << 18));
       slPageptr.p->word32[tsrlTmp] = tsrlTmp1 | (tslPageindex << 18);
@@ -3223,10 +3154,33 @@ void Dbacc::seizeRightlist(Signal* signa
 /*                     THE ADDRESS OF THE ELEMENT IN THE HASH TABLE,(GDI_PAGEPTR,    */
 /*                     TGDI_PAGEINDEX) ACCORDING TO LH3.                             */
 /* --------------------------------------------------------------------------------- */
+Uint32 Dbacc::getPagePtr(DynArr256::Head& directory, Uint32 index)
+{
+  DynArr256 dir(directoryPool, directory);
+  Uint32* ptr = dir.get(index);
+  return *ptr;
+}
+
+bool Dbacc::setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri)
+{
+  DynArr256 dir(directoryPool, directory);
+  Uint32* ptr = dir.set(index);
+  if (ptr == NULL) return false;
+  *ptr = ptri;
+  return true;
+}
+
+Uint32 Dbacc::unsetPagePtr(DynArr256::Head& directory, Uint32 index)
+{
+  DynArr256 dir(directoryPool, directory);
+  Uint32* ptr = dir.get(index);
+  Uint32 ptri = *ptr;
+  *ptr = RNIL;
+  return ptri;
+}
+
 void Dbacc::getdirindex(Signal* signal) 
 {
-  DirRangePtr gdiDirRangePtr;
-  DirectoryarrayPtr gdiDirptr;
   Uint32 tgdiTmp;
   Uint32 tgdiAddress;
 
@@ -3235,17 +3189,12 @@ void Dbacc::getdirindex(Signal* signal)
   tgdiTmp = operationRecPtr.p->hashValue >> tgdiTmp;
   tgdiTmp = (tgdiTmp << fragrecptr.p->k) | tgdiPageindex;
   tgdiAddress = tgdiTmp & fragrecptr.p->maxp;
-  gdiDirRangePtr.i = fragrecptr.p->directory;
-  ptrCheckGuard(gdiDirRangePtr, cdirrangesize, dirRange);
   if (tgdiAddress < fragrecptr.p->p) {
     jam();
     tgdiAddress = tgdiTmp & ((fragrecptr.p->maxp << 1) | 1);
   }//if
   tgdiTmp = tgdiAddress >> fragrecptr.p->k;
-  arrGuard((tgdiTmp >> 8), 256);
-  gdiDirptr.i = gdiDirRangePtr.p->dirArray[tgdiTmp >> 8];
-  ptrCheckGuard(gdiDirptr, cdirarraysize, directoryarray);
-  gdiPageptr.i = gdiDirptr.p->pagep[tgdiTmp & 0xff];	/* DIRECTORY INDEX OF SEND BUCKET PAGE */
+  gdiPageptr.i = getPagePtr(fragrecptr.p->directory, tgdiTmp);
   ptrCheckGuard(gdiPageptr, cpagesize, page8);
 }//Dbacc::getdirindex()
 
@@ -3318,15 +3267,12 @@ Uint32
 Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr) 
 {
   Uint32 errcode;
-  DirRangePtr geOverflowrangeptr;
-  DirectoryarrayPtr geOverflowDirptr;
   Uint32 tgeElementHeader;
   Uint32 tgeElemStep;
   Uint32 tgeContainerhead;
   Uint32 tgePageindex;
   Uint32 tgeActivePageDir;
   Uint32 tgeNextptrtype;
-  register Uint32 tgeKeyptr;
   register Uint32 tgeRemLen;
   register Uint32 TelemLen = fragrecptr.p->elementLength;
   register Uint32* Tkeydata = (Uint32*)&signal->theData[7];
@@ -3348,12 +3294,11 @@ Dbacc::getElement(Signal* signal, Operat
   const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
   const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
   do {
-    tgeContainerptr = (tgePageindex << ZSHIFT_PLUS) - (tgePageindex << ZSHIFT_MINUS);
+    tgeContainerptr = mul_ZBUF_SIZE(tgePageindex);
     if (tgeNextptrtype == ZLEFT) {
       jam();
       tgeContainerptr = tgeContainerptr + ZHEAD_SIZE;
       tgeElementptr = tgeContainerptr + ZCON_HEAD_SIZE;
-      tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + localkeylen;
       tgeElemStep = TelemLen;
       tgeForward = 1;
       if (unlikely(tgeContainerptr >= 2048)) 
@@ -3371,7 +3316,6 @@ Dbacc::getElement(Signal* signal, Operat
       jam();
       tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
       tgeElementptr = tgeContainerptr - 1;
-      tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - localkeylen;
       tgeElemStep = 0 - TelemLen;
       tgeForward = (Uint32)-1;
       if (unlikely(tgeContainerptr >= 2048)) 
@@ -3475,12 +3419,7 @@ Dbacc::getElement(Signal* signal, Operat
     if (((tgeContainerhead >> 9) & 1) == ZFALSE) {
       jam();
       tgeActivePageDir = gePageptr.p->word32[tgeContainerptr + 1];	/* NEXT PAGE ID */
-      geOverflowrangeptr.i = fragrecptr.p->overflowdir;
-      ptrCheckGuard(geOverflowrangeptr, cdirrangesize, dirRange);
-      arrGuard((tgeActivePageDir >> 8), 256);
-      geOverflowDirptr.i = geOverflowrangeptr.p->dirArray[tgeActivePageDir >> 8];
-      ptrCheckGuard(geOverflowDirptr, cdirarraysize, directoryarray);
-      gePageptr.i = geOverflowDirptr.p->pagep[tgeActivePageDir & 0xff];
+      gePageptr.i = getPagePtr(fragrecptr.p->overflowdir, tgeActivePageDir);
       ptrCheckGuard(gePageptr, cpagesize, page8);
     }//if
   } while (1);
@@ -3555,7 +3494,7 @@ void Dbacc::commitdelete(Signal* signal)
   lastPageptr.i = gdiPageptr.i;
   lastPageptr.p = gdiPageptr.p;
   tlastForward = ZTRUE;
-  tlastContainerptr = (tlastPageindex << ZSHIFT_PLUS) - (tlastPageindex << ZSHIFT_MINUS);
+  tlastContainerptr = mul_ZBUF_SIZE(tlastPageindex);
   tlastContainerptr = tlastContainerptr + ZHEAD_SIZE;
   arrGuard(tlastContainerptr, 2048);
   tlastContainerhead = lastPageptr.p->word32[tlastContainerptr];
@@ -3685,8 +3624,6 @@ void Dbacc::deleteElement(Signal* signal
 /* --------------------------------------------------------------------------------- */
 void Dbacc::getLastAndRemove(Signal* signal) 
 {
-  DirRangePtr glrOverflowrangeptr;
-  DirectoryarrayPtr glrOverflowDirptr;
   Uint32 tglrHead;
   Uint32 tglrTmp;
 
@@ -3701,15 +3638,10 @@ void Dbacc::getLastAndRemove(Signal* sig
       jam();
       arrGuard(tlastContainerptr + 1, 2048);
       tglrTmp = lastPageptr.p->word32[tlastContainerptr + 1];
-      glrOverflowrangeptr.i = fragrecptr.p->overflowdir;
-      ptrCheckGuard(glrOverflowrangeptr, cdirrangesize, dirRange);
-      arrGuard((tglrTmp >> 8), 256);
-      glrOverflowDirptr.i = glrOverflowrangeptr.p->dirArray[tglrTmp >> 8];
-      ptrCheckGuard(glrOverflowDirptr, cdirarraysize, directoryarray);
-      lastPageptr.i = glrOverflowDirptr.p->pagep[tglrTmp & 0xff];
+      lastPageptr.i = getPagePtr(fragrecptr.p->overflowdir, tglrTmp);
       ptrCheckGuard(lastPageptr, cpagesize, page8);
     }//if
-    tlastContainerptr = (tlastPageindex << ZSHIFT_PLUS) - (tlastPageindex << ZSHIFT_MINUS);
+    tlastContainerptr = mul_ZBUF_SIZE(tlastPageindex);
     if (((tlastContainerhead >> 7) & 3) == ZLEFT) {
       jam();
       tlastForward = ZTRUE;
@@ -3827,7 +3759,7 @@ void Dbacc::releaseLeftlist(Signal* sign
     trlPrevused = (trlHead >> 18) & 0x7f;
     if (trlNextused < ZEMPTYLIST) {
       jam();
-      tullTmp1 = (trlNextused << ZSHIFT_PLUS) - (trlNextused << ZSHIFT_MINUS);
+      tullTmp1 = mul_ZBUF_SIZE(trlNextused);
       tullTmp1 = tullTmp1 + ZHEAD_SIZE;
       tullTmp = rlPageptr.p->word32[tullTmp1] & 0xfe03ffff;
       dbgWord32(rlPageptr, tullTmp1, tullTmp | (trlPrevused << 18));
@@ -3838,7 +3770,7 @@ void Dbacc::releaseLeftlist(Signal* sign
     }//if
     if (trlPrevused < ZEMPTYLIST) {
       jam();
-      tullTmp1 = (trlPrevused << ZSHIFT_PLUS) - (trlPrevused << ZSHIFT_MINUS);
+      tullTmp1 = mul_ZBUF_SIZE(trlPrevused);
       tullTmp1 = tullTmp1 + ZHEAD_SIZE;
       tullTmp = rlPageptr.p->word32[tullTmp1] & 0xfffc07ff;
       dbgWord32(rlPageptr, tullTmp1, tullTmp | (trlNextused << 11));
@@ -3863,7 +3795,7 @@ void Dbacc::releaseLeftlist(Signal* sign
   rlPageptr.p->word32[tullIndex] = tullTmp1;
   if (tullTmp1 < ZEMPTYLIST) {
     jam();
-    tullTmp1 = (tullTmp1 << ZSHIFT_PLUS) - (tullTmp1 << ZSHIFT_MINUS);
+    tullTmp1 = mul_ZBUF_SIZE(tullTmp1);
     tullTmp1 = (tullTmp1 + ZHEAD_SIZE) + 1;
     dbgWord32(rlPageptr, tullTmp1, trlPageindex);
     rlPageptr.p->word32[tullTmp1] = trlPageindex;	/* UPDATES PREV POINTER IN THE NEXT FREE */
@@ -3920,7 +3852,7 @@ void Dbacc::releaseRightlist(Signal* sig
     trlPrevused = (trlHead >> 18) & 0x7f;
     if (trlNextused < ZEMPTYLIST) {
       jam();
-      turlTmp1 = (trlNextused << ZSHIFT_PLUS) - (trlNextused << ZSHIFT_MINUS);
+      turlTmp1 = mul_ZBUF_SIZE(trlNextused);
       turlTmp1 = turlTmp1 + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
       turlTmp = rlPageptr.p->word32[turlTmp1] & 0xfe03ffff;
       dbgWord32(rlPageptr, turlTmp1, turlTmp | (trlPrevused << 18));
@@ -3931,7 +3863,7 @@ void Dbacc::releaseRightlist(Signal* sig
     }//if
     if (trlPrevused < ZEMPTYLIST) {
       jam();
-      turlTmp1 = (trlPrevused << ZSHIFT_PLUS) - (trlPrevused << ZSHIFT_MINUS);
+      turlTmp1 = mul_ZBUF_SIZE(trlPrevused);
       turlTmp1 = turlTmp1 + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
       turlTmp = rlPageptr.p->word32[turlTmp1] & 0xfffc07ff;
       dbgWord32(rlPageptr, turlTmp1, turlTmp | (trlNextused << 11));
@@ -3957,7 +3889,7 @@ void Dbacc::releaseRightlist(Signal* sig
   rlPageptr.p->word32[turlIndex] = turlTmp1;
   if (turlTmp1 < ZEMPTYLIST) {
     jam();
-    turlTmp = (turlTmp1 << ZSHIFT_PLUS) - (turlTmp1 << ZSHIFT_MINUS);
+    turlTmp = mul_ZBUF_SIZE(turlTmp1);
     turlTmp = turlTmp + ((ZHEAD_SIZE + ZBUF_SIZE) - (ZCON_HEAD_SIZE - 1));
     dbgWord32(rlPageptr, turlTmp, trlPageindex);
     rlPageptr.p->word32[turlTmp] = trlPageindex;	/* UPDATES PREV POINTER IN THE NEXT FREE */
@@ -5084,12 +5016,8 @@ void Dbacc::insertLockOwnersList(Signal*
 /* --------------------------------------------------------------------------------- */
 void Dbacc::allocOverflowPage(Signal* signal) 
 {
-  DirRangePtr aopDirRangePtr;
-  DirectoryarrayPtr aopOverflowDirptr;
   OverflowRecordPtr aopOverflowRecPtr;
   Uint32 taopTmp1;
-  Uint32 taopTmp2;
-  Uint32 taopTmp3;
 
   tresult = 0;
   if (cfirstfreepage == RNIL)
@@ -5112,11 +5040,6 @@ void Dbacc::allocOverflowPage(Signal* si
     jam();
     tresult = ZOVER_REC_ERROR;
     return;
-  } else if ((cfirstfreedir == RNIL) &&
-             (cdirarraysize <= cdirmemory)) {
-    jam();
-    tresult = ZDIRSIZE_ERROR;
-    return;
   } else {
     jam();
     seizeOverRec(signal);
@@ -5128,22 +5051,14 @@ void Dbacc::allocOverflowPage(Signal* si
   fragrecptr.p->firstOverflowRec = aopOverflowRecPtr.i;
   fragrecptr.p->lastOverflowRec = aopOverflowRecPtr.i;
   taopTmp1 = aopOverflowRecPtr.p->dirindex;
-  aopDirRangePtr.i = fragrecptr.p->overflowdir;
-  taopTmp2 = taopTmp1 >> 8;
-  taopTmp3 = taopTmp1 & 0xff;
-  ptrCheckGuard(aopDirRangePtr, cdirrangesize, dirRange);
-  arrGuard(taopTmp2, 256);
-  if (aopDirRangePtr.p->dirArray[taopTmp2] == RNIL) {
-    jam();
-    seizeDirectory(signal);
-    ndbrequire(tresult <= ZLIMIT_OF_ERROR);
-    aopDirRangePtr.p->dirArray[taopTmp2] = sdDirptr.i;
-  }//if
-  aopOverflowDirptr.i = aopDirRangePtr.p->dirArray[taopTmp2];
   seizePage(signal);
   ndbrequire(tresult <= ZLIMIT_OF_ERROR);
-  ptrCheckGuard(aopOverflowDirptr, cdirarraysize, directoryarray);
-  aopOverflowDirptr.p->pagep[taopTmp3] = spPageptr.i;
+  if (!setPagePtr(fragrecptr.p->overflowdir, taopTmp1, spPageptr.i))
+  {
+    jam();
+    tresult = ZOVER_REC_ERROR;
+  }
+  ndbrequire(tresult <= ZLIMIT_OF_ERROR);
   tiopPageId = aopOverflowRecPtr.p->dirindex;
   iopOverflowRecPtr = aopOverflowRecPtr;
   iopPageptr = spPageptr;
@@ -5189,8 +5104,6 @@ Uint32 Dbacc::checkScanExpand(Signal* si
   Uint32 TreleaseInd = 0;
   Uint32 TreleaseScanBucket;
   Uint32 TreleaseScanIndicator[MAX_PARALLEL_SCANS_PER_FRAG];
-  DirectoryarrayPtr TDirptr;
-  DirRangePtr TDirRangePtr;
   Page8Ptr TPageptr;
   ScanRecPtr TscanPtr;
 
@@ -5248,14 +5161,9 @@ Uint32 Dbacc::checkScanExpand(Signal* si
   }//for
   if (TreleaseInd == 1) {
     TreleaseScanBucket = TSplit;
-    TDirRangePtr.i = fragrecptr.p->directory;
     TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1);	/* PAGE INDEX OBS K = 6 */
     TDirInd = TreleaseScanBucket >> fragrecptr.p->k;	/* DIRECTORY INDEX OBS K = 6 */
-    ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
-    arrGuard((TDirInd >> 8), 256);
-    TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
-    ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
-    TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
+    TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
     ptrCheckGuard(TPageptr, cpagesize, page8);
     for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
       if (TreleaseScanIndicator[Ti] == 1) {
@@ -5281,8 +5189,6 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
     return;
   }
 
-  DirectoryarrayPtr newDirptr;
-
   fragrecptr.i = signal->theData[0];
   tresult = 0;	/* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
   Uint32 tmp = 1;
@@ -5340,62 +5246,35 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
   /*       THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO      */
   /*       DECIDE WHICH ELEMENT GOES WHERE.                                   */
   /*--------------------------------------------------------------------------*/
-  expDirRangePtr.i = fragrecptr.p->directory;
   texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1;	/* RECEIVED BUCKET */
   texpDirInd = texpReceivedBucket >> fragrecptr.p->k;
-  newDirptr.i = RNIL;
-  ptrNull(newDirptr);
-  texpDirRangeIndex = texpDirInd >> 8;
-  ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
-  Uint32 max_dir_range_size = (256 * (100 - m_free_pct)) / 100;
-  if (ERROR_INSERTED(3002)) {
-      debug_lh_vars("EXP");
-      max_dir_range_size = 2;
+  if ((texpReceivedBucket & ((1 << fragrecptr.p->k) - 1)) == 0)
+  { // Need new bucket
+    expPageptr.i = RNIL;
   }
-  if (texpDirRangeIndex >= max_dir_range_size) {
-    jam();
-    ndbrequire(texpDirRangeIndex == max_dir_range_size);
-    if (fragrecptr.p->dirRangeFull == ZFALSE) {
-      jam();
-      fragrecptr.p->dirRangeFull = ZTRUE;
-    }
-    return;
+  else
+  {
+    expPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
+#ifdef VM_TRACE
+    require(expPageptr.i != RNIL);
+#endif
   }
-  expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
-  if (expDirptr.i == RNIL) {
-    jam();
-    seizeDirectory(signal);
-    if (tresult > ZLIMIT_OF_ERROR) {
-      jam();
-      return;
-    } else {
-      jam();
-      newDirptr = sdDirptr;
-      expDirptr = sdDirptr;
-      expDirRangePtr.p->dirArray[texpDirRangeIndex] = sdDirptr.i;
-    }//if
-  } else {
-    ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
-  }//if
-  texpDirPageIndex = texpDirInd & 0xff;
-  expPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
   if (expPageptr.i == RNIL) {
     jam();
     seizePage(signal);
     if (tresult > ZLIMIT_OF_ERROR) {
       jam();
-      if (newDirptr.i != RNIL) {
-        jam();
-        rdDirptr.i = newDirptr.i;
-        releaseDirectory(signal);
-      }//if
       return;
     }//if
-    expDirptr.p->pagep[texpDirPageIndex] = spPageptr.i;
+    if (!setPagePtr(fragrecptr.p->directory, texpDirInd, spPageptr.i))
+    {
+      jam();
+      tresult = ZDIR_RANGE_FULL_ERROR;
+      return;
+    }
     tipPageId = texpDirInd;
     inpPageptr = spPageptr;
     initPage(signal);
-    fragrecptr.p->dirsize++;
     expPageptr = spPageptr;
   } else {
     ptrCheckGuard(expPageptr, cpagesize, page8);
@@ -5407,14 +5286,12 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
   /*       THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE   */
   /*       DIRECTORY OF THE BUCKET TO BE SPLIT.                               */
   /*--------------------------------------------------------------------------*/
-  expDirRangePtr.i = fragrecptr.p->directory;
   cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);	/* PAGE INDEX OBS K = 6 */
   texpDirInd = fragrecptr.p->p >> fragrecptr.p->k;	/* DIRECTORY INDEX OBS K = 6 */
-  ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
-  arrGuard((texpDirInd >> 8), 256);
-  expDirptr.i = expDirRangePtr.p->dirArray[texpDirInd >> 8];
-  ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
-  excPageptr.i = expDirptr.p->pagep[texpDirInd & 0xff];
+  excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
+#ifdef VM_TRACE
+  require(excPageptr.i != RNIL);
+#endif
   fragrecptr.p->expSenderIndex = cexcPageindex;
   fragrecptr.p->expSenderPageptr = excPageptr.i;
   if (excPageptr.i == RNIL) {
@@ -5437,7 +5314,6 @@ void Dbacc::endofexpLab(Signal* signal)
   if (fragrecptr.p->p > fragrecptr.p->maxp) {
     jam();
     fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1;
-    fragrecptr.p->lhdirbits++;
     fragrecptr.p->hashcheckbit++;
     fragrecptr.p->p = 0;
   }//if
@@ -5525,7 +5401,7 @@ void Dbacc::expandcontainer(Signal* sign
   cexcPrevconptr = 0;
   cexcForward = ZTRUE;
  EXP_CONTAINER_LOOP:
-  cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
+  cexcContainerptr = mul_ZBUF_SIZE(cexcPageindex);
   if (cexcForward == ZTRUE) {
     jam();
     cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
@@ -5727,8 +5603,6 @@ Uint32 Dbacc::checkScanShrink(Signal* si
   Uint32 TreleaseScanBucket;
   Uint32 TreleaseInd = 0;
   Uint32 TreleaseScanIndicator[MAX_PARALLEL_SCANS_PER_FRAG];
-  DirectoryarrayPtr TDirptr;
-  DirRangePtr TDirRangePtr;
   Page8Ptr TPageptr;
   ScanRecPtr TscanPtr;
 
@@ -5793,14 +5667,9 @@ Uint32 Dbacc::checkScanShrink(Signal* si
   if (TreleaseInd == 1) {
     jam();
     TreleaseScanBucket = TmergeSource;
-    TDirRangePtr.i = fragrecptr.p->directory;
     TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1);	/* PAGE INDEX OBS K = 6 */
     TDirInd = TreleaseScanBucket >> fragrecptr.p->k;	/* DIRECTORY INDEX OBS K = 6 */
-    ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
-    arrGuard((TDirInd >> 8), 256);
-    TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
-    ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
-    TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
+    TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
     ptrCheckGuard(TPageptr, cpagesize, page8);
     for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
       if (TreleaseScanIndicator[Ti] == 1) {
@@ -5890,7 +5759,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
     jam();
     fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
     fragrecptr.p->p = fragrecptr.p->maxp;
-    fragrecptr.p->lhdirbits--;
     fragrecptr.p->hashcheckbit--;
   } else {
     jam();
@@ -5908,17 +5776,9 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
   /*       WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE  */
   /*       REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET.      */
   /*--------------------------------------------------------------------------*/
-  expDirRangePtr.i = fragrecptr.p->directory;
   cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1);
   texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k;
-  texpDirRangeIndex = texpDirInd >> 8;
-  texpDirPageIndex = texpDirInd & 0xff;
-  ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
-  arrGuard(texpDirRangeIndex, 256);
-  expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
-  ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
-  excPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
-  fragrecptr.p->expSenderDirptr = expDirptr.i;
+  excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
   fragrecptr.p->expSenderIndex = cexcPageindex;
   fragrecptr.p->expSenderPageptr = excPageptr.i;
   fragrecptr.p->expSenderDirIndex = texpDirInd;
@@ -5926,13 +5786,8 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
   /*       WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE      */
   /*       RECEIVING BUCKET.                                                  */
   /*--------------------------------------------------------------------------*/
-  expDirRangePtr.i = fragrecptr.p->directory;
   texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k;
-  ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
-  arrGuard((texpReceivedBucket >> 8), 256);
-  expDirptr.i = expDirRangePtr.p->dirArray[texpReceivedBucket >> 8];
-  ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
-  fragrecptr.p->expReceivePageptr = expDirptr.p->pagep[texpReceivedBucket & 0xff];
+  fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpReceivedBucket);
   fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);
   fragrecptr.p->expReceiveForward = ZTRUE;
   if (excPageptr.i == RNIL) {
@@ -5945,7 +5800,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
   /*--------------------------------------------------------------------------*/
   ptrCheckGuard(excPageptr, cpagesize, page8);
   cexcForward = ZTRUE;
-  cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
+  cexcContainerptr = mul_ZBUF_SIZE(cexcPageindex);
   cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
   arrGuard(cexcContainerptr, 2048);
   cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
@@ -5979,7 +5834,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
   }//if
   nextcontainerinfoExp(signal);
   do {
-    cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
+    cexcContainerptr = mul_ZBUF_SIZE(cexcPageindex);
     if (cexcForward == ZTRUE) {
       jam();
       cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
@@ -6043,24 +5898,29 @@ void Dbacc::endofshrinkbucketLab(Signal*
   fragrecptr.p->slack -= fragrecptr.p->maxloadfactor;
   if (fragrecptr.p->expSenderIndex == 0) {
     jam();
-    fragrecptr.p->dirsize--;
     if (fragrecptr.p->expSenderPageptr != RNIL) {
       jam();
       rpPageptr.i = fragrecptr.p->expSenderPageptr;
       ptrCheckGuard(rpPageptr, cpagesize, page8);
       releasePage(signal);
-      expDirptr.i = fragrecptr.p->expSenderDirptr;
-      ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
-      expDirptr.p->pagep[fragrecptr.p->expSenderDirIndex & 0xff] = RNIL;
+      unsetPagePtr(fragrecptr.p->directory, fragrecptr.p->expSenderDirIndex);
     }//if
     if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) {
       jam();
-      rdDirptr.i = fragrecptr.p->expSenderDirptr;
-      releaseDirectory(signal);
-      expDirRangePtr.i = fragrecptr.p->directory;
-      ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
-      arrGuard((fragrecptr.p->expSenderDirIndex >> 8), 256);
-      expDirRangePtr.p->dirArray[fragrecptr.p->expSenderDirIndex >> 8] = RNIL;
+      DynArr256 dir(directoryPool, fragrecptr.p->directory);
+      DynArr256::ReleaseIterator iter;
+      Uint32 relcode;
+#ifdef VM_TRACE
+      Uint32 count = 0;
+#endif
+      dir.init(iter);
+      while ((relcode = dir.trim(fragrecptr.p->expSenderDirIndex, iter)) != 0)
+      {
+#ifdef VM_TRACE
+        count++;
+        ndbrequire(count <= 256);
+#endif
+      }
     }//if
   }//if
   if (fragrecptr.p->slack < (1u << 31)) {
@@ -6215,12 +6075,7 @@ void Dbacc::nextcontainerinfoExp(Signal*
     /* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
     arrGuard(cexcContainerptr + 1, 2048);
     tnciTmp = excPageptr.p->word32[cexcContainerptr + 1];
-    nciOverflowrangeptr.i = fragrecptr.p->overflowdir;
-    ptrCheckGuard(nciOverflowrangeptr, cdirrangesize, dirRange);
-    arrGuard((tnciTmp >> 8), 256);
-    nciOverflowDirptr.i = nciOverflowrangeptr.p->dirArray[tnciTmp >> 8];
-    ptrCheckGuard(nciOverflowDirptr, cdirarraysize, directoryarray);
-    excPageptr.i = nciOverflowDirptr.p->pagep[tnciTmp & 0xff];
+    excPageptr.i = getPagePtr(fragrecptr.p->overflowdir, tnciTmp);
     ptrCheckGuard(excPageptr, cpagesize, page8);
   }//if
 }//Dbacc::nextcontainerinfoExp()
@@ -6257,12 +6112,10 @@ void Dbacc::initFragAdd(Signal* signal,
   regFragPtr.p->maxloadfactor = maxLoadFactor;
   regFragPtr.p->slack = (regFragPtr.p->maxp + 1) * maxLoadFactor;
   regFragPtr.p->lhfragbits = lhFragBits;
-  regFragPtr.p->lhdirbits = 0;
   regFragPtr.p->hashcheckbit = 0; //lhFragBits;
   regFragPtr.p->localkeylen = req->localKeyLen;
   regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3;
   regFragPtr.p->lastOverIndex = 0;
-  regFragPtr.p->dirsize = 1;
   regFragPtr.p->keyLength = req->keyLength;
   ndbrequire(req->keyLength != 0);
   regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen;
@@ -6284,8 +6137,9 @@ void Dbacc::initFragAdd(Signal* signal,
 
 void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
 {
-  regFragPtr.p->directory = RNIL;
-  regFragPtr.p->overflowdir = RNIL;
+  new (&regFragPtr.p->directory) DynArr256::Head();
+  new (&regFragPtr.p->overflowdir) DynArr256::Head();
+
   regFragPtr.p->firstOverflowRec = RNIL;
   regFragPtr.p->lastOverflowRec = RNIL;
   regFragPtr.p->lockOwnersList = RNIL;
@@ -6299,29 +6153,6 @@ void Dbacc::initFragGeneral(FragmentrecP
 }//Dbacc::initFragGeneral()
 
 
-void
-Dbacc::releaseLogicalPage(Fragmentrec * fragP, Uint32 logicalPageId){
-  Ptr<struct DirRange> dirRangePtr;
-  dirRangePtr.i = fragP->directory;
-  ptrCheckGuard(dirRangePtr, cdirrangesize, dirRange);
-
-  const Uint32 lp1 = logicalPageId >> 8;
-  const Uint32 lp2 = logicalPageId & 0xFF;
-  ndbrequire(lp1 < 256);
-
-  Ptr<struct Directoryarray> dirArrPtr;
-  dirArrPtr.i = dirRangePtr.p->dirArray[lp1];
-  ptrCheckGuard(dirArrPtr, cdirarraysize, directoryarray);
-
-  const Uint32 physicalPageId = dirArrPtr.p->pagep[lp2];
-  
-  rpPageptr.i = physicalPageId;
-  ptrCheckGuard(rpPageptr, cpagesize, page8);
-  releasePage(0);
-
-  dirArrPtr.p->pagep[lp2] = RNIL;
-}
-
 void Dbacc::execACC_SCANREQ(Signal* signal) 
 {
   jamEntry();
@@ -6464,9 +6295,6 @@ void Dbacc::execNEXT_SCANREQ(Signal* sig
 
 void Dbacc::checkNextBucketLab(Signal* signal) 
 {
-  DirRangePtr cscDirRangePtr;
-  DirectoryarrayPtr cscDirptr;
-  DirectoryarrayPtr tnsDirptr;
   Page8Ptr nsPageptr;
   Page8Ptr cscPageidptr;
   Page8Ptr gnsPageidptr;
@@ -6476,17 +6304,10 @@ void Dbacc::checkNextBucketLab(Signal* s
   Uint32 tnsIsLocked;
   Uint32 tnsTmp1;
   Uint32 tnsTmp2;
-  Uint32 tnsCopyIndex1;
-  Uint32 tnsCopyIndex2;
   Uint32 tnsCopyDir;
 
   tnsCopyDir = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
-  tnsCopyIndex1 = tnsCopyDir >> 8;
-  tnsCopyIndex2 = tnsCopyDir & 0xff;
-  arrGuard(tnsCopyIndex1, 256);
-  tnsDirptr.i = gnsDirRangePtr.p->dirArray[tnsCopyIndex1];
-  ptrCheckGuard(tnsDirptr, cdirarraysize, directoryarray);
-  tnsPageidptr.i = tnsDirptr.p->pagep[tnsCopyIndex2];
+  tnsPageidptr.i = getPagePtr(fragrecptr.p->directory, tnsCopyDir);
   ptrCheckGuard(tnsPageidptr, cpagesize, page8);
   gnsPageidptr.i = tnsPageidptr.i;
   gnsPageidptr.p = tnsPageidptr.p;
@@ -6561,15 +6382,8 @@ void Dbacc::checkNextBucketLab(Signal* s
         rsbPageidptr.p = gnsPageidptr.p;
       } else {
         jam();
-        cscDirRangePtr.i = fragrecptr.p->directory;
         tmpP = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
-        tmpP2 = tmpP >> 8;
-        tmpP = tmpP & 0xff;
-        ptrCheckGuard(cscDirRangePtr, cdirrangesize, dirRange);
-        arrGuard(tmpP2, 256);
-        cscDirptr.i = cscDirRangePtr.p->dirArray[tmpP2];
-        ptrCheckGuard(cscDirptr, cdirarraysize, directoryarray);
-        cscPageidptr.i = cscDirptr.p->pagep[tmpP];
+        cscPageidptr.i = getPagePtr(fragrecptr.p->directory, tmpP);
         ptrCheckGuard(cscPageidptr, cpagesize, page8);
         tmp1 = (1 << fragrecptr.p->k) - 1;
         trsbPageindex = scanPtr.p->nextBucketIndex & tmp1;
@@ -6696,8 +6510,6 @@ void Dbacc::checkNextFragmentLab(Signal*
 
 void Dbacc::initScanFragmentPart(Signal* signal)
 {
-  DirRangePtr cnfDirRangePtr;
-  DirectoryarrayPtr cnfDirptr;
   Page8Ptr cnfPageidptr;
   /* ----------------------------------------------------------------------- */
   // Set the active fragment part.
@@ -6714,11 +6526,7 @@ void Dbacc::initScanFragmentPart(Signal*
   scanPtr.p->startNoOfBuckets = fragrecptr.p->p + fragrecptr.p->maxp;
   scanPtr.p->minBucketIndexToRescan = 0xFFFFFFFF;
   scanPtr.p->maxBucketIndexToRescan = 0;
-  cnfDirRangePtr.i = fragrecptr.p->directory;
-  ptrCheckGuard(cnfDirRangePtr, cdirrangesize, dirRange);
-  cnfDirptr.i = cnfDirRangePtr.p->dirArray[0];
-  ptrCheckGuard(cnfDirptr, cdirarraysize, directoryarray);
-  cnfPageidptr.i = cnfDirptr.p->pagep[0];
+  cnfPageidptr.i = getPagePtr(fragrecptr.p->directory, 0);
   ptrCheckGuard(cnfPageidptr, cpagesize, page8);
   trsbPageindex = scanPtr.p->nextBucketIndex & ((1 << fragrecptr.p->k) - 1);
   rsbPageidptr.i = cnfPageidptr.i;
@@ -6939,8 +6747,6 @@ void Dbacc::execACC_CHECK_SCAN(Signal* s
 
   fragrecptr.i = scanPtr.p->activeLocalFrag;
   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
-  gnsDirRangePtr.i = fragrecptr.p->directory;
-  ptrCheckGuard(gnsDirRangePtr, cdirrangesize, dirRange);
   checkNextBucketLab(signal);
   return;
 }//Dbacc::execACC_CHECK_SCAN()
@@ -6986,7 +6792,7 @@ void Dbacc::execACC_TO_REQ(Signal* signa
 /* --------------------------------------------------------------------------------- */
 void Dbacc::containerinfo(Signal* signal) 
 {
-  tciContainerptr = (tciPageindex << ZSHIFT_PLUS) - (tciPageindex << ZSHIFT_MINUS);
+  tciContainerptr = mul_ZBUF_SIZE(tciPageindex);
   if (tciIsforward == ZTRUE) {
     jam();
     tciContainerptr = tciContainerptr + ZHEAD_SIZE;
@@ -7125,12 +6931,7 @@ void Dbacc::nextcontainerinfo(Signal* si
     /* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
     arrGuard(tnciContainerptr + 1, 2048);
     tnciTmp = nciPageidptr.p->word32[tnciContainerptr + 1];
-    nciOverflowrangeptr.i = fragrecptr.p->overflowdir;
-    ptrCheckGuard(nciOverflowrangeptr, cdirrangesize, dirRange);
-    arrGuard((tnciTmp >> 8), 256);
-    nciOverflowDirptr.i = nciOverflowrangeptr.p->dirArray[tnciTmp >> 8];
-    ptrCheckGuard(nciOverflowDirptr, cdirarraysize, directoryarray);
-    nciPageidptr.i = nciOverflowDirptr.p->pagep[tnciTmp & 0xff];
+    nciPageidptr.i = getPagePtr(fragrecptr.p->overflowdir, tnciTmp);
     ptrCheckGuard(nciPageidptr, cpagesize, page8);
   }//if
 }//Dbacc::nextcontainerinfo()
@@ -7873,26 +7674,6 @@ void Dbacc::putRecInFreeOverdir(Signal*
 }//Dbacc::putRecInFreeOverdir()
 
 /* --------------------------------------------------------------------------------- */
-/* RELEASE_DIRECTORY                                                                 */
-/* --------------------------------------- ----------------------------------------- */
-void Dbacc::releaseDirectory(Signal* signal) 
-{
-  ptrCheckGuard(rdDirptr, cdirarraysize, directoryarray);
-  rdDirptr.p->pagep[0] = cfirstfreedir;
-  cfirstfreedir = rdDirptr.i;
-}//Dbacc::releaseDirectory()
-
-/* --------------------------------------------------------------------------------- */
-/* RELEASE_DIRRANGE                                                                  */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::releaseDirrange(Signal* signal) 
-{
-  ptrCheckGuard(rdDirRangePtr, cdirrangesize, dirRange);
-  rdDirRangePtr.p->dirArray[0] = cfirstfreeDirrange;
-  cfirstfreeDirrange = rdDirRangePtr.i;
-}//Dbacc::releaseDirrange()
-
-/* --------------------------------------------------------------------------------- */
 /* RELEASE OP RECORD                                                                 */
 /*         PUT A FREE OPERATION IN A FREE LIST OF THE OPERATIONS                     */
 /* --------------------------------------------------------------------------------- */
@@ -7940,13 +7721,9 @@ void Dbacc::releaseOverflowRec(Signal* s
 /* --------------------------------------------------------------------------------- */
 void Dbacc::releaseOverpage(Signal* signal) 
 {
-  DirRangePtr ropOverflowrangeptr;
-  DirectoryarrayPtr ropOverflowDirptr;
   OverflowRecordPtr ropOverflowRecPtr;
   OverflowRecordPtr tuodOverflowRecPtr;
   Uint32 tropTmp;
-  Uint32 tropTmp1;
-  Uint32 tropTmp2;
 
   ropOverflowRecPtr.i = ropPageptr.p->word32[ZPOS_OVERFLOWREC];
   ndbrequire(ropOverflowRecPtr.i != RNIL);
@@ -7959,28 +7736,6 @@ void Dbacc::releaseOverpage(Signal* sign
     jam();
     return;	/* THERE IS ONLY ONE OVERFLOW PAGE */
   }//if
-#if kalle
-  logicalPage = 0;
-
-  i = fragrecptr.p->directory;
-  p = dirRange.getPtr(i);
-
-  i1 = logicalPage >> 8;
-  i2 = logicalPage & 0xFF;
-
-  ndbrequire(i1 < 256);
-  
-  i = p->dirArray[i1];
-  p = directoryarray.getPtr(i);
-
-  physicPageId = p->pagep[i2];
-  physicPageP = page8.getPtr(physicPageId);
-  
-  p->pagep[i2] = RNIL;
-  rpPageptr = { physicPageId, physicPageP };
-  releasePage(signal);
-  
-#endif
 
   /* ----------------------------------------------------------------------- */
   /* IT WAS OK TO RELEASE THE PAGE.                                          */
@@ -7992,14 +7747,7 @@ void Dbacc::releaseOverpage(Signal* sign
   priOverflowRecPtr = ropOverflowRecPtr;
   putRecInFreeOverdir(signal);
   tropTmp = ropPageptr.p->word32[ZPOS_PAGE_ID];
-  ropOverflowrangeptr.i = fragrecptr.p->overflowdir;
-  tropTmp1 = tropTmp >> 8;
-  tropTmp2 = tropTmp & 0xff;
-  ptrCheckGuard(ropOverflowrangeptr, cdirrangesize, dirRange);
-  arrGuard(tropTmp1, 256);
-  ropOverflowDirptr.i = ropOverflowrangeptr.p->dirArray[tropTmp1];
-  ptrCheckGuard(ropOverflowDirptr, cdirarraysize, directoryarray);
-  ropOverflowDirptr.p->pagep[tropTmp2] = RNIL;
+  unsetPagePtr(fragrecptr.p->overflowdir, tropTmp);
   rpPageptr = ropPageptr;
   releasePage(signal);
   if (ropOverflowRecPtr.p->dirindex != (fragrecptr.p->lastOverIndex - 1)) {
@@ -8010,23 +7758,12 @@ void Dbacc::releaseOverpage(Signal* sign
   /* THE LAST PAGE IN THE DIRECTORY WAS RELEASED IT IS NOW NECESSARY 
    * TO REMOVE ALL RELEASED OVERFLOW DIRECTORIES AT THE END OF THE LIST.   
    * ---------------------------------------------------------------------- */
-  do {
-    fragrecptr.p->lastOverIndex--;
-    if (tropTmp2 == 0) {
-      jam();
-      ndbrequire(tropTmp1 != 0);
-      ropOverflowrangeptr.p->dirArray[tropTmp1] = RNIL;
-      rdDirptr.i = ropOverflowDirptr.i;
-      releaseDirectory(signal);
-      tropTmp1--;
-      tropTmp2 = 255;
-    } else {
-      jam();
-      tropTmp2--;
-    }//if
-    ropOverflowDirptr.i = ropOverflowrangeptr.p->dirArray[tropTmp1];
-    ptrCheckGuard(ropOverflowDirptr, cdirarraysize, directoryarray);
-  } while (ropOverflowDirptr.p->pagep[tropTmp2] == RNIL);
+  DynArr256 dir(directoryPool, fragrecptr.p->overflowdir);
+  DynArr256::ReleaseIterator iter;
+  dir.init(iter);
+  Uint32 rc;
+  while ((rc = dir.trim(0, iter))!=0);
+  fragrecptr.p->lastOverIndex = iter.m_sz ? iter.m_pos + 1 : 0;
   /* ----------------------------------------------------------------------- */
   /* RELEASE ANY OVERFLOW RECORDS THAT ARE PART OF THE FREE INDEX LIST WHICH */
   /* DIRECTORY INDEX NOW HAS BEEN RELEASED.                                  */
@@ -8086,56 +7823,6 @@ void Dbacc::releasePage(Signal* signal)
 }//Dbacc::releasePage()
 
 /* --------------------------------------------------------------------------------- */
-/* SEIZE_DIRECTORY                                                                   */
-/*          DESCRIPTION: A DIRECTORY BLOCK (ZDIRBLOCKSIZE NUMBERS OF DIRECTORY       */
-/*               RECORDS WILL BE ALLOCATED AND RETURNED.                             */
-/*               SIZE OF DIRECTORY ERROR_CODE, WILL BE RETURNED IF THERE IS NO ANY   */
-/*               FREE BLOCK                                                          */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeDirectory(Signal* signal) 
-{
-  Uint32 tsdyIndex;
-
-  if (cfirstfreedir == RNIL) {
-    jam();
-    if (cdirarraysize <= cdirmemory) {
-      jam();
-      tresult = ZDIRSIZE_ERROR;
-      return;
-    } else {
-      jam();
-      sdDirptr.i = cdirmemory;
-      ptrCheckGuard(sdDirptr, cdirarraysize, directoryarray);
-      cdirmemory = cdirmemory + 1;
-    }//if
-  } else {
-    jam();
-    sdDirptr.i = cfirstfreedir;
-    ptrCheckGuard(sdDirptr, cdirarraysize, directoryarray);
-    cfirstfreedir = sdDirptr.p->pagep[0];
-    sdDirptr.p->pagep[0] = RNIL;
-  }//if
-  for (tsdyIndex = 0; tsdyIndex <= 255; tsdyIndex++) {
-    sdDirptr.p->pagep[tsdyIndex] = RNIL;
-  }//for
-}//Dbacc::seizeDirectory()
-
-/* --------------------------------------------------------------------------------- */
-/* SEIZE_DIRRANGE                                                                    */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::seizeDirrange(Signal* signal) 
-{
-  Uint32 tsdeIndex;
-
-  newDirRangePtr.i = cfirstfreeDirrange;
-  ptrCheckGuard(newDirRangePtr, cdirrangesize, dirRange);
-  cfirstfreeDirrange = newDirRangePtr.p->dirArray[0];
-  for (tsdeIndex = 0; tsdeIndex <= 255; tsdeIndex++) {
-    newDirRangePtr.p->dirArray[tsdeIndex] = RNIL;
-  }//for
-}//Dbacc::seizeDirrange()
-
-/* --------------------------------------------------------------------------------- */
 /* SEIZE    FRAGREC                                                                  */
 /* --------------------------------------------------------------------------------- */
 void Dbacc::seizeFragrec(Signal* signal) 

=== modified file 'storage/ndb/src/kernel/blocks/record_types.hpp'
--- a/storage/ndb/src/kernel/blocks/record_types.hpp	2011-10-07 13:15:08 +0000
+++ b/storage/ndb/src/kernel/blocks/record_types.hpp	2012-03-13 11:57:27 +0000
@@ -94,6 +94,7 @@
 
 #define RT_DBTUP_PAGE              MAKE_TID( 1, RG_DATAMEM)
 #define RT_DBTUP_PAGE_MAP          MAKE_TID( 2, RG_DATAMEM)
+#define RT_DBACC_DIRECTORY         MAKE_TID( 3, RG_DATAMEM)
 
 #define RT_JOB_BUFFER              MAKE_TID( 1, RG_JOBBUFFER)
 

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2012-01-30 14:28:55 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2012-03-13 11:57:27 +0000
@@ -840,13 +840,6 @@ Configuration::calcSizeAlt(ConfigValues
      * Acc Size Alt values
      */
     // Can keep 65536 pages (= 0.5 GByte)
-    cfg.put(CFG_ACC_DIR_RANGE, 
-	    2 * NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas); 
-    
-    cfg.put(CFG_ACC_DIR_ARRAY,
-	    (noOfIndexPages >> 8) + 
-	    2 * NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas);
-    
     cfg.put(CFG_ACC_FRAGMENT,
 	    NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas);
     

=== modified file 'storage/ndb/src/kernel/vm/DynArr256.cpp'
--- a/storage/ndb/src/kernel/vm/DynArr256.cpp	2012-02-07 15:43:54 +0000
+++ b/storage/ndb/src/kernel/vm/DynArr256.cpp	2012-03-13 11:55:23 +0000
@@ -33,6 +33,7 @@ struct DA256Free
 {
   Uint32 m_magic;
   Uint32 m_next_free;
+  Uint32 m_prev_free;
 };
 
 struct DA256Node
@@ -46,8 +47,43 @@ struct DA256Page
   struct DA256Node m_nodes[30];
 
   bool get(Uint32 node, Uint32 idx, Uint32 type_id, Uint32*& val_ptr) const;
+  bool is_empty() const;
+  bool is_full() const;
+  Uint32 first_free() const;
+  Uint32 last_free() const;
 };
 
+inline
+bool DA256Page::is_empty() const
+{
+  return !(0x7fff & (m_header[0].m_magic | m_header[1].m_magic));
+}
+
+inline
+bool DA256Page::is_full() const
+{
+  return !(0x7fff & ~(m_header[0].m_magic & m_header[1].m_magic));
+}
+
+inline
+Uint32 DA256Page::first_free() const
+{
+  Uint32 node = BitmaskImpl::ctz(~m_header[0].m_magic | 0x8000);
+  if (node == 15)
+    node = 15 + BitmaskImpl::ctz(~m_header[1].m_magic | 0x8000);
+  return node;
+}
+
+inline
+Uint32 DA256Page::last_free() const
+{
+  Uint32 node = 29 - BitmaskImpl::clz((~m_header[1].m_magic << 17) | 0x10000);
+  if (node == 14)
+    node = 14 - BitmaskImpl::clz((~m_header[0].m_magic << 17) | 0x10000);
+  return node;
+}
+
+
 #undef require
 #define require(x) require_exit_or_core_with_printer((x), 0, ndbout_printer)
 //#define DA256_USE_PX
@@ -68,8 +104,11 @@ struct DA256Page
 #endif
 Uint32 verbose = 0;
 Uint32 allocatedpages = 0;
+Uint32 releasedpages = 0;
+Uint32 maxallocatedpages = 0;
 Uint32 allocatednodes = 0;
 Uint32 releasednodes = 0;
+Uint32 maxallocatednodes = 0;
 #endif
 
 static
@@ -94,6 +133,7 @@ DynArr256Pool::DynArr256Pool()
 {
   m_type_id = RNIL;
   m_first_free = RNIL;
+  m_last_free = RNIL;
   m_memroot = 0;
 }
 
@@ -163,6 +203,9 @@ DynArr256::get(Uint32 pos) const
     return 0;
   }
   
+#ifdef VM_TRACE
+  require((m_head.m_sz > 0) && (pos <= m_head.m_high_pos));
+#endif
 #ifdef DA256_USE_PX
   Uint32 px[4] = { (pos >> 24) & 255, 
 		   (pos >> 16) & 255, 
@@ -251,6 +294,11 @@ DynArr256::set(Uint32 pos)
     ptrI = * retVal;
   } 
   
+#ifdef VM_TRACE
+  if (pos > m_head.m_high_pos)
+    m_head.m_high_pos = pos;
+#endif
+
   return retVal;
   
 err:
@@ -296,16 +344,14 @@ initpage(DA256Page* p, Uint32 page_no, U
   {
     free = (DA256Free*)(p->m_nodes+i);
     free->m_magic = type_id;
-    free->m_next_free = (page_no << DA256_BITS) + (i + 1);
+    free->m_next_free = RNIL;
+    free->m_prev_free = RNIL;
 #ifdef DA256_EXTRA_SAFE
     DA256Node* node = p->m_nodes+i;
     for (j = 0; j<17; j++)
       node->m_lines[j].m_magic = type_id;
 #endif
   }
-  
-  free = (DA256Free*)(p->m_nodes+29);
-  free->m_next_free = RNIL;
 }
 
 bool
@@ -370,9 +416,12 @@ DynArr256::init(ReleaseIterator &iter)
  * 0 - done
  * 1 - data
  * 2 - no data
+ *
+ * if ptrVal is NULL, truncate work in trim mode and will stop
+ * (return 0) as soon value is not RNIL
  */
 Uint32
-DynArr256::truncate(Uint32 keep_pos, ReleaseIterator& iter, Uint32* ptrVal)
+DynArr256::truncate(Uint32 trunc_pos, ReleaseIterator& iter, Uint32* ptrVal)
 {
   Uint32 type_id = (~m_pool.m_type_id) & 0xFFFF;
   DA256Page * memroot = m_pool.m_memroot;
@@ -380,7 +429,7 @@ DynArr256::truncate(Uint32 keep_pos, Rel
   for (;;)
   {
     if (iter.m_sz == 0 ||
-        iter.m_pos < keep_pos ||
+        iter.m_pos < trunc_pos ||
         m_head.m_sz == 0)
     {
       return 0;
@@ -391,7 +440,8 @@ DynArr256::truncate(Uint32 keep_pos, Rel
     Uint32 page_no = ptrI >> DA256_BITS;
     Uint32 page_idx = (ptrI & DA256_MASK) ;
     DA256Page* page = memroot + page_no;
-    Uint32 node_index = (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz))) & 255;
+    Uint32 node_addr = (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz)));
+    Uint32 node_index = node_addr & 255;
     bool is_value = (iter.m_sz == m_head.m_sz);
 
     if (unlikely(! page->get(page_idx, node_index, type_id, refPtr)))
@@ -399,10 +449,17 @@ DynArr256::truncate(Uint32 keep_pos, Rel
       require(false);
     }
     assert(refPtr != NULL);
-    *ptrVal = *refPtr;
+    if (ptrVal != NULL)
+    {
+      *ptrVal = *refPtr;
+    }
+    else if (is_value && *refPtr != RNIL)
+    {
+      return 0;
+    }
 
     if (iter.m_sz == 1 &&
-        (iter.m_pos >> (8 * (m_head.m_sz - iter.m_sz))) == 0)
+        node_addr == 0)
     {
       assert(iter.m_ptr_i[iter.m_sz] == m_head.m_ptr_i);
       assert(iter.m_ptr_i[iter.m_sz + 1] == RNIL);
@@ -410,10 +467,10 @@ DynArr256::truncate(Uint32 keep_pos, Rel
       m_pool.release(m_head.m_ptr_i);
       m_head.m_sz --;
       m_head.m_ptr_i = iter.m_ptr_i[iter.m_sz];
-      return is_value ? 1 : 2;
+      if (is_value)
+        return 1;
     }
-
-    if (is_value || iter.m_ptr_i[iter.m_sz + 1] == *refPtr)
+    else if (is_value || iter.m_ptr_i[iter.m_sz + 1] == *refPtr)
     { // sz--
       Uint32 ptrI = *refPtr;
       if (!is_value)
@@ -438,7 +495,11 @@ DynArr256::truncate(Uint32 keep_pos, Rel
         assert((iter.m_pos & ~(0xffffffff << (8 * (m_head.m_sz - iter.m_sz)))) == 0);
         iter.m_pos --;
       }
-      if (is_value)
+#ifdef VM_TRACE
+      if (iter.m_pos < m_head.m_high_pos)
+        m_head.m_high_pos = iter.m_pos;
+#endif
+      if (is_value && ptrVal != NULL)
         return 1;
     }
     else
@@ -502,6 +563,8 @@ seizenode(DA256Page* page, Uint32 idx, U
 
 #ifdef UNIT_TEST
   allocatednodes++;
+  if (allocatednodes - releasednodes > maxallocatednodes)
+    maxallocatednodes = allocatednodes - releasednodes;
 #endif
   return true;
 }
@@ -574,28 +637,45 @@ DynArr256Pool::seize()
       initpage(page, page_no, type_id);
 #ifdef UNIT_TEST
       allocatedpages++;
+      if (allocatedpages - releasedpages > maxallocatedpages)
+        maxallocatedpages = allocatedpages - releasedpages;
 #endif
     }
     else
     {
       return RNIL;
     }
-    ff = (page_no << DA256_BITS);
+    m_last_free = m_first_free = ff = page_no;
   }
   else
   {
-    page = memroot + (ff >> DA256_BITS);
+    page = memroot + ff;
   }
   
-  Uint32 idx = ff & DA256_MASK;
+  Uint32 idx = page->first_free();
   DA256Free * ptr = (DA256Free*)(page->m_nodes + idx);
   if (likely(ptr->m_magic == type_id))
   {
-    Uint32 next = ptr->m_next_free;
+    Uint32 last_free = page->last_free();
+    Uint32 next_page = ((DA256Free*)(page->m_nodes + last_free))->m_next_free;
     if (likely(seizenode(page, idx, type_id)))
     {
-      m_first_free = next;    
-      return ff;
+      if (page->is_full())
+      {
+        assert(m_first_free == ff);
+        m_first_free = next_page;
+        if (m_first_free == RNIL)
+        {
+          assert(m_last_free == ff);
+          m_last_free = RNIL;
+        }
+        else
+        {
+          page = memroot + next_page;
+          ((DA256Free*)(page->m_nodes + page->last_free()))->m_prev_free = RNIL;
+        }
+      }
+      return (ff << DA256_BITS) | idx;
     }
   }
   
@@ -613,15 +693,61 @@ DynArr256Pool::release(Uint32 ptrI)
   Uint32 page_idx = ptrI & DA256_MASK;
   DA256Page * memroot = m_memroot;
   DA256Page * page = memroot + page_no;
-  
   DA256Free * ptr = (DA256Free*)(page->m_nodes + page_idx);
+
+  Guard2 g(m_mutex);
+  Uint32 last_free = page->last_free();
   if (likely(releasenode(page, page_idx, type_id)))
   {
     ptr->m_magic = type_id;
-    Guard2 g(m_mutex);
-    Uint32 ff = m_first_free;
-    ptr->m_next_free = ff;
-    m_first_free = ptrI;
+    if (last_free > 29)
+    { // Add last to page free list
+      Uint32 lf = m_last_free;
+      ptr->m_prev_free = lf;
+      ptr->m_next_free = RNIL;
+      m_last_free = page_no;
+      if (m_first_free == RNIL)
+        m_first_free = page_no;
+
+      if (lf != RNIL)
+      {
+        page = memroot + lf;
+        DA256Free* pptr = (DA256Free*)(page->m_nodes + page->last_free());
+        pptr->m_next_free = page_no;
+      }
+    }
+    else if (page->is_empty())
+    {
+      // TODO msundell: release_page
+      // unlink from free page list
+      Uint32 nextpage = ((DA256Free*)(page->m_nodes + last_free))->m_next_free;
+      Uint32 prevpage = ((DA256Free*)(page->m_nodes + last_free))->m_prev_free;
+      m_ctx.release_page(type_id, page_no);
+#ifdef UNIT_TEST
+      releasedpages ++;
+#endif
+      if (nextpage != RNIL)
+      {
+        page = memroot + nextpage;
+        ((DA256Free*)(page->m_nodes + page->last_free()))->m_prev_free = prevpage;
+      }
+      if (prevpage != RNIL)
+      {
+        page = memroot + prevpage;
+        ((DA256Free*)(page->m_nodes + page->last_free()))->m_next_free = nextpage;
+      }
+      if (m_first_free == page_no)
+      {
+        m_first_free = nextpage;
+      }
+      if (m_last_free == page_no)
+        m_last_free = prevpage;
+    }
+    else if (page_idx > last_free)
+    { // last free node in page tracks free page list links
+      ptr->m_next_free = ((DA256Free*)(page->m_nodes + last_free))->m_next_free;
+      ptr->m_prev_free = ((DA256Free*)(page->m_nodes + last_free))->m_prev_free;
+    }
     return;
   }
   require(false);
@@ -631,6 +757,36 @@ DynArr256Pool::release(Uint32 ptrI)
 
 static
 bool
+release(DynArr256& arr)
+{
+  DynArr256::ReleaseIterator iter;
+  arr.init(iter);
+  Uint32 val;
+  Uint32 cnt=0;
+  Uint64 start;
+  if (verbose > 2)
+    ndbout_c("allocatedpages: %d (max %d) releasedpages: %d allocatednodes: %d (max %d) releasednodes: %d",
+           allocatedpages, maxallocatedpages,
+           releasedpages,
+           allocatednodes, maxallocatednodes,
+           releasednodes);
+  start = my_micro_time();
+  while (arr.release(iter, &val))
+    cnt++;
+  start = my_micro_time() - start;
+  if (verbose > 1)
+    ndbout_c("allocatedpages: %d (max %d) releasedpages: %d allocatednodes: %d (max %d) releasednodes: %d (%llu us)"
+             " releasecnt: %d",
+             allocatedpages, maxallocatedpages,
+             releasedpages,
+             allocatednodes, maxallocatednodes,
+             releasednodes,
+             start, cnt);
+  return true;
+}
+
+static
+bool
 simple(DynArr256 & arr, int argc, char* argv[])
 {
   if (verbose) ndbout_c("argc: %d", argc);
@@ -863,16 +1019,15 @@ write(DynArr256& arr, int argc, char **
     {
       Uint32 idx = ((rand() & (~seqmask)) + ((i + seq) & seqmask)) % maxidx;
       Uint32 *ptr = arr.set(idx);
+      if (ptr == NULL) break; /* out of memory */
       *ptr = i;
     }
     start = my_micro_time() - start;
     float uspg = (float)start; uspg /= cnt;
     if (verbose)
       ndbout_c("Elapsed %lldus -> %f us/set", start, uspg);
-    DynArr256::ReleaseIterator iter;
-    arr.init(iter);
-    Uint32 val;
-    while(arr.release(iter, &val));
+    if (!release(arr))
+      return false;
   }
   return true;
 }
@@ -923,6 +1078,12 @@ main(int argc, char** argv)
 #else
   verbose = 0;
 #endif
+  while (argc > 1 && strcmp(argv[1], "-v") == 0)
+  {
+    verbose++;
+    argc--;
+    argv++;
+  }
 
   Pool_context pc = test_context(10000 /* pages */);
 
@@ -983,20 +1144,18 @@ main(int argc, char** argv)
   }
 #endif
 
-  DynArr256::ReleaseIterator iter;
-  arr.init(iter);
-  Uint32 cnt = 0, val;
-  while (arr.release(iter, &val)) cnt++;
-  
+  release(arr);
   if (verbose)
-    ndbout_c("allocatedpages: %d allocatednodes: %d releasednodes: %d"
-	   " releasecnt: %d",
-	   allocatedpages, 
-	   allocatednodes,
-	   releasednodes,
-	   cnt);
+    ndbout_c("allocatedpages: %d (max %d) releasedpages: %d allocatednodes: %d (max %d) releasednodes: %d",
+           allocatedpages, maxallocatedpages,
+           releasedpages,
+           allocatednodes, maxallocatednodes,
+           releasednodes);
+
 #ifdef TAP_TEST
-  ok(allocatednodes == releasednodes, "release");
+  ok(allocatednodes == releasednodes &&
+     allocatedpages == releasedpages,
+     "release");
   return exit_status();
 #else
   return 0;

=== modified file 'storage/ndb/src/kernel/vm/DynArr256.hpp'
--- a/storage/ndb/src/kernel/vm/DynArr256.hpp	2012-02-07 15:43:54 +0000
+++ b/storage/ndb/src/kernel/vm/DynArr256.hpp	2012-03-13 11:55:23 +0000
@@ -37,6 +37,7 @@ public:
 protected:
   Uint32 m_type_id;
   Uint32 m_first_free;
+  Uint32 m_last_free;
   Pool_context m_ctx;
   struct DA256Page* m_memroot;
   NdbMutex * m_mutex;
@@ -51,10 +52,17 @@ class DynArr256
 public:
   struct Head
   {
+#ifdef VM_TRACE
+    Head() { m_ptr_i = RNIL; m_sz = 0; m_high_pos = 0; }
+#else
     Head() { m_ptr_i = RNIL; m_sz = 0;}
+#endif
     
     Uint32 m_ptr_i;
     Uint32 m_sz;
+#ifdef VM_TRACE
+    Uint32 m_high_pos;
+#endif
 
     bool isEmpty() const { return m_sz == 0;}
   };
@@ -79,7 +87,8 @@ public:
    *        2 - nodata
    */
   Uint32 release(ReleaseIterator&, Uint32* retptr);
-  Uint32 truncate(Uint32 keep_pos, ReleaseIterator&, Uint32* retptr);
+  Uint32 trim(Uint32 trim_pos, ReleaseIterator&);
+  Uint32 truncate(Uint32 trunc_pos, ReleaseIterator&, Uint32* retptr);
 protected:
   Head & m_head;
   DynArr256Pool & m_pool;
@@ -94,4 +103,10 @@ Uint32 DynArr256::release(ReleaseIterato
   return truncate(0, iter, retptr);
 }
 
+inline
+Uint32 DynArr256::trim(Uint32 pos, ReleaseIterator& iter)
+{
+  return truncate(pos, iter, NULL);
+}
+
 #endif

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (mauritz.sundell:4876 to 4881) Mauritz Sundell13 Mar