List: Commits
From: Frazer Clement
Date: September 29 2012 12:05am
Subject: bzr push into mysql-5.1-telco-7.1 branch (frazer.clement:4618 to 4620)
 4620 Frazer Clement	2012-09-29 [merge]
      Merge 7.0->7.1

    modified:
      scripts/mysql_system_tables.sql
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/vm/NdbinfoTables.cpp
      storage/ndb/tools/ndbinfo_sql.cpp
 4619 Frazer Clement	2012-09-29 [merge]
      Merge 7.0->7.1

    added:
      mysql-test/suite/ndb_big/bug14000373.cnf
      mysql-test/suite/ndb_big/bug14000373.result
      mysql-test/suite/ndb_big/bug14000373.test
      storage/ndb/src/kernel/vm/LHLevel.cpp
      storage/ndb/src/kernel/vm/LHLevel.hpp
    modified:
      storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
      storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/Makefile.am
 4618 Martin Zaun	2012-09-25 [merge]
      merge from ndb-7.1-jtie-array-fix

    modified:
      storage/ndb/src/ndbjtie/jtie/jtie_tconv_array_impl.hpp
=== added file 'mysql-test/suite/ndb_big/bug14000373.cnf'
--- a/mysql-test/suite/ndb_big/bug14000373.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug14000373.cnf	2012-09-28 12:58:59 +0000
@@ -0,0 +1,5 @@
+!include suite/ndb/my.cnf
+
+[cluster_config.1]
+DataMemory=300M
+IndexMemory=700M

=== added file 'mysql-test/suite/ndb_big/bug14000373.result'
--- a/mysql-test/suite/ndb_big/bug14000373.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug14000373.result	2012-09-28 12:58:59 +0000
@@ -0,0 +1,21 @@
+set max_heap_table_size = 286720000;
+create table t1 (a int key) engine=memory;
+load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 columns terminated by ' ' (a, @col2);
+insert into t1 select a + 10000 from t1;;
+insert into t1 select a + 10000 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 * 2 from t1;;
+insert into t1 select a + 10000 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 from t1;;
+select count(*) from t1;
+count(*)
+5120000
+alter table t1 engine=ndbcluster comment='NDB_TABLE=NOLOGGING' partition by key() partitions 1;
+alter table t1 engine=memory;
+select count(*) from t1;
+count(*)
+5120000
+drop table t1;

=== added file 'mysql-test/suite/ndb_big/bug14000373.test'
--- a/mysql-test/suite/ndb_big/bug14000373.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug14000373.test	2012-09-28 12:58:59 +0000
@@ -0,0 +1,25 @@
+-- source include/have_ndb.inc
+
+# Test is using error insert, check that binaries support it
+-- source suite/ndb/t/have_ndb_error_insert.inc
+
+# Use small LoadFactors to force sparse hash table
+--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all error 3003" >> $NDB_TOOLS_OUTPUT
+
+set max_heap_table_size = 286720000;
+create table t1 (a int key) engine=memory;
+load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 columns terminated by ' ' (a, @col2);
+let $i = 9;
+let $b = 10000;
+while ($i)
+{
+--eval insert into t1 select a + $b from t1;
+  let $b = $b * 2;
+  dec $i;
+}
+select count(*) from t1;
+alter table t1 engine=ndbcluster comment='NDB_TABLE=NOLOGGING' partition by key() partitions 1;
+--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all report memory" >> $NDB_TOOLS_OUTPUT
+alter table t1 engine=memory;
+select count(*) from t1;
+drop table t1;

=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql	2012-03-28 15:30:10 +0000
+++ b/scripts/mysql_system_tables.sql	2012-09-28 23:50:32 +0000
@@ -290,7 +290,7 @@ PREPARE stmt FROM @str;
 EXECUTE stmt;
 DROP PREPARE stmt;
 
-SET @str=IF(@have_ndbinfo,'CREATE TABLE `ndbinfo`.`ndb$transporters` (`node_id` INT UNSIGNED,`remote_node_id` INT UNSIGNED,`connection_status` INT UNSIGNED) COMMENT="transporter status" ENGINE=NDBINFO','SET @dummy = 0');
+SET @str=IF(@have_ndbinfo,'CREATE TABLE `ndbinfo`.`ndb$transporters` (`node_id` INT UNSIGNED,`remote_node_id` INT UNSIGNED,`connection_status` INT UNSIGNED,`remote_address` VARCHAR(512),`bytes_sent` BIGINT UNSIGNED,`bytes_received` BIGINT UNSIGNED) COMMENT="transporter status" ENGINE=NDBINFO','SET @dummy = 0');
 PREPARE stmt FROM @str;
 EXECUTE stmt;
 DROP PREPARE stmt;
@@ -450,7 +450,7 @@ EXECUTE stmt;
 DROP PREPARE stmt;
 
 # ndbinfo.transporters
-SET @str=IF(@have_ndbinfo,'CREATE OR REPLACE DEFINER=`root@localhost` SQL SECURITY INVOKER VIEW `ndbinfo`.`transporters` AS SELECT node_id, remote_node_id,  CASE connection_status  WHEN 0 THEN "CONNECTED"  WHEN 1 THEN "CONNECTING"  WHEN 2 THEN "DISCONNECTED"  WHEN 3 THEN "DISCONNECTING"  ELSE NULL  END AS status FROM `ndbinfo`.`ndb$transporters`','SET @dummy = 0');
+SET @str=IF(@have_ndbinfo,'CREATE OR REPLACE DEFINER=`root@localhost` SQL SECURITY INVOKER VIEW `ndbinfo`.`transporters` AS SELECT node_id, remote_node_id,  CASE connection_status  WHEN 0 THEN "CONNECTED"  WHEN 1 THEN "CONNECTING"  WHEN 2 THEN "DISCONNECTED"  WHEN 3 THEN "DISCONNECTING"  ELSE NULL  END AS status,  remote_address, bytes_sent, bytes_received FROM `ndbinfo`.`ndb$transporters`','SET @dummy = 0');
 PREPARE stmt FROM @str;
 EXECUTE stmt;
 DROP PREPARE stmt;

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-09-13 10:27:55 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-09-28 23:36:17 +0000
@@ -394,6 +394,9 @@ public:
 		  		 int s_port);	// signed port. <0 is dynamic
   Transporter* get_transporter(NodeId nodeId);
   struct in_addr get_connect_address(NodeId node_id) const;
+
+  Uint64 get_bytes_sent(NodeId nodeId) const;
+  Uint64 get_bytes_received(NodeId nodeId) const;
 protected:
   
 private:

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2012-06-07 14:46:55 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2012-09-28 23:36:17 +0000
@@ -379,6 +379,7 @@ ok:
   iovec_data_sent(sum_sent);
   sendCount += send_cnt;
   sendSize  += sum_sent;
+  m_bytes_sent += sum_sent;
   if(sendCount >= reportFreq)
   {
     get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
@@ -420,6 +421,7 @@ TCP_Transporter::doReceive(TransporterRe
       
       receiveCount ++;
       receiveSize  += nBytesRead;
+      m_bytes_received += nBytesRead;
       
       if(receiveCount == reportFreq){
         recvdata.reportReceiveLen(remoteNodeId,

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2012-06-07 14:46:55 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2012-09-28 23:36:17 +0000
@@ -44,6 +44,7 @@ Transporter::Transporter(TransporterRegi
     isServer(lNodeId==serverNodeId),
     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
     m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
+    m_bytes_sent(0), m_bytes_received(0),
     isMgmConnection(_isMgmConnection),
     m_connected(false),
     m_type(_type),
@@ -286,6 +287,8 @@ Transporter::doDisconnect() {
     return;
 
   m_connected = false;
+  m_bytes_sent = 0;
+  m_bytes_received = 0;
 
   disconnectImpl();
 }

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2012-06-07 14:46:55 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2012-09-28 23:36:17 +0000
@@ -158,6 +158,8 @@ protected:
   /* Overload limit, as configured with the OverloadLimit config parameter. */
   Uint32 m_overload_limit;
   Uint32 m_slowdown_limit;
+  Uint64 m_bytes_sent;
+  Uint64 m_bytes_received;
 
 private:
 

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-13 10:28:59 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-28 23:50:32 +0000
@@ -56,6 +56,18 @@ TransporterRegistry::get_connect_address
   return theTransporters[node_id]->m_connect_address;
 }
 
+Uint64
+TransporterRegistry::get_bytes_sent(NodeId node_id) const
+{
+  return theTransporters[node_id]->m_bytes_sent;
+}
+
+Uint64
+TransporterRegistry::get_bytes_received(NodeId node_id) const
+{
+  return theTransporters[node_id]->m_bytes_received;
+}
+
 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
 {
   DBUG_ENTER("SocketServer::Session * TransporterService::newSession");

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2012-03-13 11:57:27 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2012-09-28 13:15:29 +0000
@@ -26,6 +26,7 @@
 #include <pc.hpp>
 #include <DynArr256.hpp>
 #include <SimulatedBlock.hpp>
+#include <LHLevel.hpp>
 
 #ifdef DBACC_C
 // Debug Macros
@@ -199,7 +200,7 @@ class ElementHeader {
    * 
    * l = Locked    -- If true contains operation else scan bits + hash value
    * s = Scan bits
-   * h = Hash value
+   * h = Reduced hash value. The lower bits used for address is shifted away
    * o = Operation ptr I
    *
    *           1111111111222222222233
@@ -208,17 +209,16 @@ class ElementHeader {
    *  ooooooooooooooooooooooooooooooo
    */
 public:
-  STATIC_CONST( HASH_VALUE_PART_MASK = 0xFFFF );
-  
   static bool getLocked(Uint32 data);
   static bool getUnlocked(Uint32 data);
   static Uint32 getScanBits(Uint32 data);
-  static Uint32 getHashValuePart(Uint32 data);
   static Uint32 getOpPtrI(Uint32 data);
+  static LHBits16 getReducedHashValue(Uint32 data);
 
   static Uint32 setLocked(Uint32 opPtrI);
-  static Uint32 setUnlocked(Uint32 hashValuePart, Uint32 scanBits);
+  static Uint32 setUnlocked(Uint32 scanBits, LHBits16 const& reducedHashValue);
   static Uint32 setScanBit(Uint32 header, Uint32 scanBit);
+  static Uint32 setReducedHashValue(Uint32 header, LHBits16 const& reducedHashValue);
   static Uint32 clearScanBit(Uint32 header, Uint32 scanBit);
 };
 
@@ -241,11 +241,11 @@ ElementHeader::getScanBits(Uint32 data){
   return (data >> 1) & ((1 << MAX_PARALLEL_SCANS_PER_FRAG) - 1);
 }
 
-inline 
-Uint32 
-ElementHeader::getHashValuePart(Uint32 data){
+inline
+LHBits16
+ElementHeader::getReducedHashValue(Uint32 data){
   assert(getUnlocked(data));
-  return data >> 16;
+  return LHBits16::unpack(data >> 16);
 }
 
 inline
@@ -258,12 +258,15 @@ ElementHeader::getOpPtrI(Uint32 data){
 inline 
 Uint32 
 ElementHeader::setLocked(Uint32 opPtrI){
+  assert(opPtrI < 0x8000000);
   return (opPtrI << 1) + 0;
 }
 inline
 Uint32 
-ElementHeader::setUnlocked(Uint32 hashValue, Uint32 scanBits){
-  return (hashValue << 16) + (scanBits << 1) + 1;
+ElementHeader::setUnlocked(Uint32 scanBits, LHBits16 const& reducedHashValue)
+{
+  assert(scanBits < (1 << MAX_PARALLEL_SCANS_PER_FRAG));
+  return (Uint32(reducedHashValue.pack()) << 16) | (scanBits << 1) | 1;
 }
 
 inline
@@ -280,6 +283,13 @@ ElementHeader::clearScanBit(Uint32 heade
   return header & (~(scanBit << 1));
 }
 
+inline
+Uint32
+ElementHeader::setReducedHashValue(Uint32 header, LHBits16 const& reducedHashValue)
+{
+  assert(getUnlocked(header));
+  return (Uint32(reducedHashValue.pack()) << 16) | (header & 0xffff);
+}
 
 class Dbacc: public SimulatedBlock {
   friend class DbaccProxy;
@@ -401,14 +411,15 @@ struct Fragmentrec {
 // slackCheck When slack goes over this value it is time to expand.
 // slackCheck = (maxp + p + 1)*(maxloadfactor - minloadfactor) or 
 // bucketSize * hysteresis
+// Since at most RNIL 8KiB-pages can be used for a fragment, the extreme values
+// for slack will be within -2^43 and +2^43 words.
 //-----------------------------------------------------------------------------
+  LHLevelRH level;
   Uint32 localkeylen;
-  Uint32 maxp;
   Uint32 maxloadfactor;
   Uint32 minloadfactor;
-  Uint32 p;
-  Uint32 slack;
-  Uint32 slackCheck;
+  Int64 slack;
+  Int64 slackCheck;
 
 //-----------------------------------------------------------------------------
 // nextfreefrag is the next free fragment if linked into a free list
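
(A quick standalone check of the bound stated in the new comment above: with at
most RNIL 8 KiB pages per fragment, the total number of 32-bit words -- and
hence the magnitude of slack -- stays below 2^43, so Int64 is sufficient. The
constants below are assumptions for this sketch, not taken from the patch.)

#include <cstdint>

// Assumed for the sketch: RNIL = 0xffffff00 (just under 2^32 page ids) and
// an 8 KiB page holds 8192 / 4 = 2048 32-bit words.
constexpr uint64_t MAX_PAGES_PER_FRAGMENT = 0xffffff00u;
constexpr uint64_t WORDS_PER_8KIB_PAGE    = 8192 / 4;

static_assert(MAX_PAGES_PER_FRAGMENT * WORDS_PER_8KIB_PAGE < (uint64_t(1) << 43),
              "per-fragment word count, and hence |slack|, fits well inside Int64");
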
@@ -441,19 +452,17 @@ struct Fragmentrec {
   Uint16 keyLength;
 
 //-----------------------------------------------------------------------------
-// This flag is used to avoid sending a big number of expand or shrink signals
-// when simultaneously committing many inserts or deletes.
+// Only allow one expand or shrink signal in queue at the time.
 //-----------------------------------------------------------------------------
-  Uint8 expandFlag;
+  bool expandOrShrinkQueued;
 
 //-----------------------------------------------------------------------------
 // hashcheckbit is the bit to check whether to send element to split bucket or not
 // k (== 6) is the number of buckets per page
-// lhfragbits is the number of bits used to calculate the fragment id
 //-----------------------------------------------------------------------------
-  Uint8 hashcheckbit;
-  Uint8 k;
-  Uint8 lhfragbits;
+  STATIC_CONST( k = 6 );
+  STATIC_CONST( MIN_HASH_COMPARE_BITS = 7 );
+  STATIC_CONST( MAX_HASH_VALUE_BITS = 31 );
 
 //-----------------------------------------------------------------------------
 // nodetype can only be STORED in this release. Is currently only set, never read
@@ -469,6 +478,11 @@ struct Fragmentrec {
 // flag to mark that execEXPANDCHECK2 has failed due to DirRange full
 //-----------------------------------------------------------------------------
   Uint8 dirRangeFull;
+
+public:
+  Uint32 getPageNumber(Uint32 bucket_number) const;
+  Uint32 getPageIndex(Uint32 bucket_number) const;
+  bool enough_valid_bits(LHBits16 const& reduced_hash_value) const;
 };
 
   typedef Ptr<Fragmentrec> FragmentrecPtr;
@@ -484,8 +498,7 @@ struct Operationrec {
   Uint32 elementPointer;
   Uint32 fid;
   Uint32 fragptr;
-  Uint32 hashvaluePart;
-  Uint32 hashValue;
+  LHBits32 hashValue;
   Uint32 nextLockOwnerOp;
   Uint32 nextOp;
   Uint32 nextParallelQue;
@@ -511,7 +524,8 @@ struct Operationrec {
   Uint16 tupkeylen;
   Uint32 xfrmtupkeylen;
   Uint32 userblockref;
-  Uint32 scanBits;
+  Uint16 scanBits;
+  LHBits16 reducedHashValue;
 
   enum OpBits {
     OP_MASK                 = 0x0000F // 4 bits for operation type
@@ -692,8 +706,8 @@ private:
   void releaseDirIndexResources(Signal* signal, FragmentrecPtr regFragPtr);
   void releaseFragRecord(Signal* signal, FragmentrecPtr regFragPtr);
   void initScanFragmentPart(Signal* signal);
-  Uint32 checkScanExpand(Signal* signal);
-  Uint32 checkScanShrink(Signal* signal);
+  Uint32 checkScanExpand(Signal* signal, Uint32 splitBucket);
+  Uint32 checkScanShrink(Signal* signal, Uint32 sourceBucket, Uint32 destBucket);
   void initialiseFragRec(Signal* signal);
   void initialiseFsConnectionRec(Signal* signal);
   void initialiseFsOpRec(Signal* signal);
@@ -761,6 +775,10 @@ private:
   void seizeRightlist(Signal* signal);
   Uint32 readTablePk(Uint32 lkey1, Uint32 lkey2, Uint32 eh, OperationrecPtr);
   Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
+  LHBits32 getElementHash(OperationrecPtr& oprec);
+  LHBits32 getElementHash(Uint32 const* element, Int32 forward);
+  LHBits32 getElementHash(Uint32 const* element, Int32 forward, OperationrecPtr& oprec);
+  void shrink_adjust_reduced_hash_value(Uint32 bucket_number);
   Uint32 getPagePtr(DynArr256::Head&, Uint32);
   bool setPagePtr(DynArr256::Head& directory, Uint32 index, Uint32 ptri);
   Uint32 unsetPagePtr(DynArr256::Head& directory, Uint32 index);
@@ -837,8 +855,6 @@ private:
 
   void zpagesize_error(const char* where);
 
-  void reenable_expand_after_redo_log_exection_complete(Signal*);
-
   // charsets
   void xfrmKeyData(Signal* signal);
 
@@ -1000,7 +1016,6 @@ private:
   Uint32 tgeContainerptr;
   Uint32 tgeElementptr;
   Uint32 tgeForward;
-  Uint32 texpReceivedBucket;
   Uint32 texpDirInd;
   Uint32 texpDirRangeIndex;
   Uint32 texpDirPageIndex;
@@ -1034,7 +1049,6 @@ private:
   Uint32 tmp;
   Uint32 tmpP;
   Uint32 tmpP2;
-  Uint32 tmp1;
   Uint32 tmp2;
   Uint32 tgflPageindex;
   Uint32 tmpindex;
@@ -1094,4 +1108,23 @@ private:
   Uint32 c_memusage_report_frequency;
 };
 
+inline Uint32 Dbacc::Fragmentrec::getPageNumber(Uint32 bucket_number) const
+{
+  assert(bucket_number < RNIL);
+  return bucket_number >> k;
+}
+
+inline Uint32 Dbacc::Fragmentrec::getPageIndex(Uint32 bucket_number) const
+{
+  assert(bucket_number < RNIL);
+  return bucket_number & ((1 << k) - 1);
+}
+
+inline bool Dbacc::Fragmentrec::enough_valid_bits(LHBits16 const& reduced_hash_value) const
+{
+  // Forte C 5.0 needs use of intermediate constant
+  int const bits = MIN_HASH_COMPARE_BITS;
+  return level.getNeededValidBits(bits) <= reduced_hash_value.valid_bits();
+}
+
 #endif
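
For reference, a minimal standalone sketch of the 32-bit element header layout
that the ElementHeader helpers earlier in this file manipulate (simplified
names, not the NDB code; the exact scan-bit width, MAX_PARALLEL_SCANS_PER_FRAG,
is left out):

#include <cassert>
#include <cstdint>

// Locked element: bit 0 = 0, the remaining bits hold the operation record pointer.
static uint32_t pack_locked(uint32_t opPtrI)
{
  return opPtrI << 1;                    // low bit left clear marks "locked"
}

// Unlocked element: bit 0 = 1, scan bits from bit 1 upwards,
// 16-bit reduced hash value in bits 16..31.
static uint32_t pack_unlocked(uint32_t scanBits, uint16_t reducedHash)
{
  return (uint32_t(reducedHash) << 16) | (scanBits << 1) | 1u;
}

static bool is_locked(uint32_t header)
{
  return (header & 1u) == 0;
}

static uint16_t get_reduced_hash(uint32_t header)
{
  assert(!is_locked(header));            // only stored for unlocked elements
  return uint16_t(header >> 16);
}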

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2012-04-26 10:33:16 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2012-09-28 13:15:29 +0000
@@ -34,6 +34,7 @@
 #include <signaldata/TransIdAI.hpp>
 #include <KeyDescriptor.hpp>
 #include <signaldata/NodeStateSignalData.hpp>
+#include <md5_hash.hpp>
 
 #ifdef VM_TRACE
 #define DEBUG(x) ndbout << "DBACC: "<< x << endl;
@@ -902,7 +903,7 @@ void Dbacc::initOpRec(Signal* signal)
 
   Treqinfo = signal->theData[2];
 
-  operationRecPtr.p->hashValue = signal->theData[3];
+  operationRecPtr.p->hashValue = LHBits32(signal->theData[3]);
   operationRecPtr.p->tupkeylen = signal->theData[4];
   operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
   operationRecPtr.p->transId1 = signal->theData[5];
@@ -1071,7 +1072,7 @@ void Dbacc::execACCKEYREQ(Signal* signal
 	  /*---------------------------------------------------------------*/
           Uint32 eh = gePageptr.p->word32[tgeElementptr];
           operationRecPtr.p->scanBits = ElementHeader::getScanBits(eh);
-          operationRecPtr.p->hashvaluePart = ElementHeader::getHashValuePart(eh);
+          operationRecPtr.p->reducedHashValue = ElementHeader::getReducedHashValue(eh);
           operationRecPtr.p->elementPage = gePageptr.i;
           operationRecPtr.p->elementContainer = tgeContainerptr;
           operationRecPtr.p->elementPointer = tgeElementptr;
@@ -1533,10 +1534,8 @@ void Dbacc::insertelementLab(Signal* sig
   
   insertLockOwnersList(signal, operationRecPtr);
 
-  const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
-  operationRecPtr.p->hashvaluePart = 
-    (operationRecPtr.p->hashValue >> tmp) & 0xFFFF;
   operationRecPtr.p->scanBits = 0;	/* NOT ANY ACTIVE SCAN */
+  operationRecPtr.p->reducedHashValue = fragrecptr.p->level.reduce(operationRecPtr.p->hashValue);
   tidrElemhead = ElementHeader::setLocked(operationRecPtr.i);
   idrPageptr = gdiPageptr;
   tidrPageindex = tgdiPageindex;
@@ -2343,14 +2342,12 @@ void Dbacc::execACC_COMMITREQ(Signal* si
 	if (fragrecptr.p->slack > fragrecptr.p->slackCheck) { 
           /* TIME FOR JOIN BUCKETS PROCESS */
 	  if (fragrecptr.p->expandCounter > 0) {
-	    if (fragrecptr.p->expandFlag < 2) {
+            if (!fragrecptr.p->expandOrShrinkQueued)
+            {
 	      jam();
 	      signal->theData[0] = fragrecptr.i;
-	      signal->theData[1] = fragrecptr.p->p;
-	      signal->theData[2] = fragrecptr.p->maxp;
-	      signal->theData[3] = fragrecptr.p->expandFlag;
-	      fragrecptr.p->expandFlag = 2;
-	      sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
+              fragrecptr.p->expandOrShrinkQueued = true;
+              sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 1, JBB);
 	    }//if
 	  }//if
 	}//if
@@ -2359,15 +2356,15 @@ void Dbacc::execACC_COMMITREQ(Signal* si
       jam();                                                /* EXPAND PROCESS HANDLING */
       fragrecptr.p->noOfElements++;
       fragrecptr.p->slack -= fragrecptr.p->elementLength;
-      if (fragrecptr.p->slack >= (1u << 31)) { 
+      if (fragrecptr.p->slack < 0 && !fragrecptr.p->level.isFull())
+      {
 	/* IT MEANS THAT IF SLACK < ZERO */
-	if (fragrecptr.p->expandFlag == 0) {
+        if (!fragrecptr.p->expandOrShrinkQueued)
+        {
 	  jam();
-	  fragrecptr.p->expandFlag = 2;
 	  signal->theData[0] = fragrecptr.i;
-	  signal->theData[1] = fragrecptr.p->p;
-	  signal->theData[2] = fragrecptr.p->maxp;
-	  sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
+          fragrecptr.p->expandOrShrinkQueued = true;
+          sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 1, JBB);
 	}//if
       }//if
     }
@@ -3181,21 +3178,11 @@ Uint32 Dbacc::unsetPagePtr(DynArr256::He
 
 void Dbacc::getdirindex(Signal* signal) 
 {
-  Uint32 tgdiTmp;
   Uint32 tgdiAddress;
 
-  tgdiTmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;	/* OBS K = 6 */
-  tgdiPageindex = operationRecPtr.p->hashValue & ((1 << fragrecptr.p->k) - 1);
-  tgdiTmp = operationRecPtr.p->hashValue >> tgdiTmp;
-  tgdiTmp = (tgdiTmp << fragrecptr.p->k) | tgdiPageindex;
-  tgdiAddress = tgdiTmp & fragrecptr.p->maxp;
-  if (tgdiAddress < fragrecptr.p->p) {
-    jam();
-    tgdiAddress = tgdiTmp & ((fragrecptr.p->maxp << 1) | 1);
-  }//if
-  tgdiTmp = tgdiAddress >> fragrecptr.p->k;
-  ndbassert(tgdiTmp <= ElementHeader::HASH_VALUE_PART_MASK);
-  gdiPageptr.i = getPagePtr(fragrecptr.p->directory, tgdiTmp);
+  tgdiAddress = fragrecptr.p->level.getBucketNumber(operationRecPtr.p->hashValue);
+  tgdiPageindex = fragrecptr.p->getPageIndex(tgdiAddress);
+  gdiPageptr.i = getPagePtr(fragrecptr.p->directory, fragrecptr.p->getPageNumber(tgdiAddress));
   ptrCheckGuard(gdiPageptr, cpagesize, page8);
 }//Dbacc::getdirindex()
 
@@ -3278,6 +3265,7 @@ Dbacc::getElement(Signal* signal, Operat
   register Uint32 TelemLen = fragrecptr.p->elementLength;
   register Uint32* Tkeydata = (Uint32*)&signal->theData[7];
   const Uint32 localkeylen = fragrecptr.p->localkeylen;
+  Uint32 bucket_number = fragrecptr.p->level.getBucketNumber(operationRecPtr.p->hashValue);
 
   getdirindex(signal);
   tgePageindex = tgdiPageindex;
@@ -3292,8 +3280,6 @@ Dbacc::getElement(Signal* signal, Operat
   ndbrequire(TelemLen == ZELEM_HEAD_SIZE + localkeylen);
   tgeNextptrtype = ZLEFT;
 
-  const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
-  const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
   do {
     tgeContainerptr = mul_ZBUF_SIZE(tgePageindex);
     if (tgeNextptrtype == ZLEFT) {
@@ -3345,23 +3331,25 @@ Dbacc::getElement(Signal* signal, Operat
       // Check if it is the element searched for.
       /* ------------------------------------------------------------------- */
       do {
+        bool possible_match;
         tgeElementHeader = gePageptr.p->word32[tgeElementptr];
         tgeRemLen = tgeRemLen - TelemLen;
-        Uint32 hashValuePart;
 	Uint32 localkey1, localkey2;
 	lockOwnerPtr.i = RNIL;
 	lockOwnerPtr.p = NULL;
+        LHBits16 reducedHashValue;
         if (ElementHeader::getLocked(tgeElementHeader)) {
           jam();
 	  lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
           ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
-          hashValuePart = lockOwnerPtr.p->hashvaluePart;
+          possible_match = lockOwnerPtr.p->hashValue.match(operationRecPtr.p->hashValue);
+          reducedHashValue = lockOwnerPtr.p->reducedHashValue;
 	  localkey1 = lockOwnerPtr.p->localdata[0];
 	  localkey2 = lockOwnerPtr.p->localdata[1];
         } else {
           jam();
+          reducedHashValue = ElementHeader::getReducedHashValue(tgeElementHeader);
           Uint32 pos = tgeElementptr + tgeForward;
-          hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
           localkey1 = gePageptr.p->word32[pos];
           if (likely(localkeylen == 1))
           {
@@ -3372,8 +3360,11 @@ Dbacc::getElement(Signal* signal, Operat
           {
             localkey2 = gePageptr.p->word32[pos + tgeForward];
           }
+          possible_match = true;
         }
-        if (hashValuePart == opHashValuePart) {
+        if (possible_match &&
+            operationRecPtr.p->hashValue.match(fragrecptr.p->level.enlarge(reducedHashValue, bucket_number)))
+        {
           jam();
           bool found;
           if (! searchLocalKey) 
@@ -3514,8 +3505,7 @@ void Dbacc::commitdelete(Signal* signal)
   // We thus update the element header to ensure we log an unlocked element. We do not
   // need to restore it later since it is deleted immediately anyway.
   /* --------------------------------------------------------------------------------- */
-  const Uint32 hv = operationRecPtr.p->hashvaluePart;
-  const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
+  const Uint32 eh = ElementHeader::setUnlocked(0, LHBits16());
   delPageptr.p->word32[tdelElementptr] = eh;
   if (operationRecPtr.p->elementPage == lastPageptr.i) {
     if (operationRecPtr.p->elementPointer == tlastElementptr) {
@@ -3588,8 +3578,7 @@ void Dbacc::deleteElement(Signal* signal
       // An undo of the delete will reinstall the moved record. We have to ensure that the
       // lock is removed to ensure that no such thing happen.
       /* --------------------------------------------------------------------------------- */
-      Uint32 eh = ElementHeader::setUnlocked(deOperationRecPtr.p->hashvaluePart,
-					     0);
+      Uint32 eh = ElementHeader::setUnlocked(0, LHBits16());
       lastPageptr.p->word32[tlastElementptr] = eh;
     }//if
     return;
@@ -4303,8 +4292,7 @@ void Dbacc::abortOperation(Signal* signa
 
         taboElementptr = operationRecPtr.p->elementPointer;
         aboPageidptr.i = operationRecPtr.p->elementPage;
-        tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
-					     operationRecPtr.p->scanBits);
+        tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->scanBits, operationRecPtr.p->reducedHashValue);
         ptrCheckGuard(aboPageidptr, cpagesize, page8);
         dbgWord32(aboPageidptr, taboElementptr, tmp2Olq);
         arrGuard(taboElementptr, 2048);
@@ -4336,7 +4324,7 @@ Dbacc::commitDeleteCheck()
   OperationrecPtr deleteOpPtr;
   Uint32 elementDeleted = 0;
   bool deleteCheckOngoing = true;
-  Uint32 hashValue = 0;
+  LHBits32 hashValue;
   lastOpPtr = operationRecPtr;
   opPtr.i = operationRecPtr.p->nextParallelQue;
   while (opPtr.i != RNIL) {
@@ -4463,8 +4451,7 @@ void Dbacc::commitOperation(Signal* sign
       
       coPageidptr.i = operationRecPtr.p->elementPage;
       tcoElementptr = operationRecPtr.p->elementPointer;
-      tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
-					   operationRecPtr.p->scanBits);   
+      tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->scanBits, operationRecPtr.p->reducedHashValue);
       ptrCheckGuard(coPageidptr, cpagesize, page8);
       dbgWord32(coPageidptr, tcoElementptr, tmp2Olq);
       arrGuard(tcoElementptr, 2048);
@@ -4785,7 +4772,7 @@ Dbacc::release_lockowner(Signal* signal,
     newOwner.p->elementPointer = opPtr.p->elementPointer;
     newOwner.p->elementContainer = opPtr.p->elementContainer;
     newOwner.p->scanBits = opPtr.p->scanBits;
-    newOwner.p->hashvaluePart = opPtr.p->hashvaluePart;
+    newOwner.p->reducedHashValue = opPtr.p->reducedHashValue;
     newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED);
     if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
     {
@@ -5095,7 +5082,7 @@ void Dbacc::allocOverflowPage(Signal* si
 /* BE EXPANDED ACORDING TO LH3,    */
 /* AND COMMIT TRANSACTION PROCESS  */
 /* WILL BE CONTINUED */
-Uint32 Dbacc::checkScanExpand(Signal* signal)
+Uint32 Dbacc::checkScanExpand(Signal* signal, Uint32 splitBucket)
 {
   Uint32 Ti;
   Uint32 TreturnCode = 0;
@@ -5108,7 +5095,7 @@ Uint32 Dbacc::checkScanExpand(Signal* si
   Page8Ptr TPageptr;
   ScanRecPtr TscanPtr;
 
-  TSplit = fragrecptr.p->p;
+  TSplit = splitBucket;
   for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
     TreleaseScanIndicator[Ti] = 0;
     if (fragrecptr.p->scan[Ti] != RNIL) {
@@ -5162,8 +5149,8 @@ Uint32 Dbacc::checkScanExpand(Signal* si
   }//for
   if (TreleaseInd == 1) {
     TreleaseScanBucket = TSplit;
-    TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1);	/* PAGE INDEX OBS K = 6 */
-    TDirInd = TreleaseScanBucket >> fragrecptr.p->k;	/* DIRECTORY INDEX OBS K = 6 */
+    TPageIndex = fragrecptr.p->getPageIndex(TreleaseScanBucket);
+    TDirInd = fragrecptr.p->getPageNumber(TreleaseScanBucket);
     TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
     ptrCheckGuard(TPageptr, cpagesize, page8);
     for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
@@ -5192,11 +5179,9 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
 
   fragrecptr.i = signal->theData[0];
   tresult = 0;	/* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
-  Uint32 tmp = 1;
-  tmp = tmp << 31;
   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
-  fragrecptr.p->expandFlag = 0;
-  if (fragrecptr.p->slack < tmp) {
+  fragrecptr.p->expandOrShrinkQueued = false;
+  if (fragrecptr.p->slack > 0) {
     jam();
     /* IT MEANS THAT IF SLACK > ZERO */
     /*--------------------------------------------------------------*/
@@ -5232,7 +5217,24 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
     /*--------------------------------------------------------------*/
     return;
   }//if
-  if (checkScanExpand(signal) == 1) {
+
+  if (fragrecptr.p->level.isFull())
+  {
+    jam();
+    /*
+     * The level structure does not allow more buckets.
+     * Do not expand.
+     */
+    return;
+  }
+
+  Uint32 splitBucket;
+  Uint32 receiveBucket;
+
+  bool doSplit = fragrecptr.p->level.getSplitBucket(splitBucket, receiveBucket);
+
+  // Check that splitted bucket is not currently scanned
+  if (doSplit && checkScanExpand(signal, splitBucket) == 1) {
     jam();
     /*--------------------------------------------------------------*/
     // A scan state was inconsistent with performing an expand
@@ -5247,9 +5249,9 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
   /*       THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO      */
   /*       DECIDE WHICH ELEMENT GOES WHERE.                                   */
   /*--------------------------------------------------------------------------*/
-  texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1;	/* RECEIVED BUCKET */
-  texpDirInd = texpReceivedBucket >> fragrecptr.p->k;
-  if ((texpReceivedBucket & ((1 << fragrecptr.p->k) - 1)) == 0)
+
+  texpDirInd = fragrecptr.p->getPageNumber(receiveBucket);
+  if (fragrecptr.p->getPageIndex(receiveBucket) == 0)
   { // Need new bucket
     expPageptr.i = RNIL;
   }
@@ -5262,15 +5264,6 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
   }
   if (expPageptr.i == RNIL) {
     jam();
-    // We cannot expand if the new page index cannot be
-    // represented in the stored hash bits.
-    if (texpDirInd > ElementHeader::HASH_VALUE_PART_MASK)
-    {
-      jam();
-      fragrecptr.p->dirRangeFull = ZTRUE;
-      tresult = ZDIR_RANGE_FULL_ERROR;
-      return;
-    }
     seizePage(signal);
     if (tresult > ZLIMIT_OF_ERROR) {
       jam();
@@ -5279,6 +5272,7 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
     if (!setPagePtr(fragrecptr.p->directory, texpDirInd, spPageptr.i))
     {
       jam();
+      // TODO: should release seized page
       tresult = ZDIR_RANGE_FULL_ERROR;
       return;
     }
@@ -5291,13 +5285,13 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
   }//if
 
   fragrecptr.p->expReceivePageptr = expPageptr.i;
-  fragrecptr.p->expReceiveIndex = texpReceivedBucket & ((1 << fragrecptr.p->k) - 1);
+  fragrecptr.p->expReceiveIndex = fragrecptr.p->getPageIndex(receiveBucket);
   /*--------------------------------------------------------------------------*/
   /*       THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE   */
   /*       DIRECTORY OF THE BUCKET TO BE SPLIT.                               */
   /*--------------------------------------------------------------------------*/
-  cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);	/* PAGE INDEX OBS K = 6 */
-  texpDirInd = fragrecptr.p->p >> fragrecptr.p->k;	/* DIRECTORY INDEX OBS K = 6 */
+  cexcPageindex = fragrecptr.p->getPageIndex(splitBucket);
+  texpDirInd = fragrecptr.p->getPageNumber(splitBucket);
   excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
 #ifdef VM_TRACE
   require(excPageptr.i != RNIL);
@@ -5318,71 +5312,27 @@ void Dbacc::execEXPANDCHECK2(Signal* sig
   
 void Dbacc::endofexpLab(Signal* signal) 
 {
-  fragrecptr.p->p++;
   fragrecptr.p->slack += fragrecptr.p->maxloadfactor;
   fragrecptr.p->expandCounter++;
-  if (fragrecptr.p->p > fragrecptr.p->maxp) {
-    jam();
-    fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1;
-    fragrecptr.p->hashcheckbit++;
-    fragrecptr.p->p = 0;
-  }//if
-  Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
+  fragrecptr.p->level.expand();
+  Uint32 noOfBuckets = fragrecptr.p->level.getSize();
   Uint32 Thysteres = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
-  fragrecptr.p->slackCheck = noOfBuckets * Thysteres;
-  if (fragrecptr.p->slack > (1u << 31)) {
+  fragrecptr.p->slackCheck = Int64(noOfBuckets) * Thysteres;
+  if (fragrecptr.p->slack < 0 && !fragrecptr.p->level.isFull())
+  {
     jam();
     /* IT MEANS THAT IF SLACK < ZERO */
     /* --------------------------------------------------------------------------------- */
     /*       IT IS STILL NECESSARY TO EXPAND THE FRAGMENT EVEN MORE. START IT FROM HERE  */
     /*       WITHOUT WAITING FOR NEXT COMMIT ON THE FRAGMENT.                            */
     /* --------------------------------------------------------------------------------- */
-    fragrecptr.p->expandFlag = 2;
     signal->theData[0] = fragrecptr.i;
-    signal->theData[1] = fragrecptr.p->p;
-    signal->theData[2] = fragrecptr.p->maxp;
-    sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
+    fragrecptr.p->expandOrShrinkQueued = true;
+    sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 1, JBB);
   }//if
   return;
 }//Dbacc::endofexpLab()
 
-void Dbacc::reenable_expand_after_redo_log_exection_complete(Signal* signal){
-
-  tabptr.i = signal->theData[0];
-  Uint32 fragId = signal->theData[1];
-
-  ptrCheckGuard(tabptr, ctablesize, tabrec);
-  ndbrequire(getfragmentrec(signal, fragrecptr, fragId));
-#if 0
-  ndbout_c("reenable expand check for table %d fragment: %d", 
-	   tabptr.i, fragId);
-#endif
-
-  switch(fragrecptr.p->expandFlag){
-  case 0:
-    /**
-     * Hmm... this means that it's alreay has been reenabled...
-     */
-    fragrecptr.p->expandFlag = 1;
-    break;
-  case 1:
-    /**
-     * Nothing is going on start expand check
-     */
-  case 2:
-    /**
-     * A shrink is running, do expand check anyway
-     *  (to reset expandFlag)
-     */
-    fragrecptr.p->expandFlag = 2; 
-    signal->theData[0] = fragrecptr.i;
-    signal->theData[1] = fragrecptr.p->p;
-    signal->theData[2] = fragrecptr.p->maxp;
-    sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
-    break;
-  }
-}
-
 void Dbacc::execDEBUG_SIG(Signal* signal) 
 {
   jamEntry();
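
The maxp/p arithmetic removed in getdirindex() and endofexpLab() above is the
classic linear-hashing (LH) bookkeeping that the new LHLevel classes (added in
this merge as LHLevel.hpp/LHLevel.cpp) now encapsulate. A minimal standalone
sketch of that scheme, with illustrative names rather than the NDB classes:

#include <cstdint>

// Bucket addressing as in the removed Dbacc::getdirindex() code: maxp is
// 2^n - 1 for the current doubling round, p is the next bucket to be split.
static uint32_t lh_bucket(uint32_t hash, uint32_t maxp, uint32_t p)
{
  uint32_t addr = hash & maxp;           // address using the current bit count
  if (addr < p)                          // bucket already split this round:
    addr = hash & ((maxp << 1) | 1);     // use one more hash bit
  return addr;
}

// One expand step as in the removed Dbacc::endofexpLab() code: bucket p is
// split into buckets p and maxp + p + 1; once p passes maxp the table has
// doubled and the round starts over.
static void lh_expand(uint32_t& maxp, uint32_t& p)
{
  p++;
  if (p > maxp)
  {
    maxp = (maxp << 1) | 1;
    p = 0;
  }
}
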
@@ -5392,6 +5342,86 @@ void Dbacc::execDEBUG_SIG(Signal* signal
   return;
 }//Dbacc::execDEBUG_SIG()
 
+LHBits32 Dbacc::getElementHash(OperationrecPtr& oprec)
+{
+  jam();
+  ndbassert(!oprec.isNull());
+
+  // Only calculate hash value if operation does not already have a complete hash value
+  if (oprec.p->hashValue.valid_bits() < fragrecptr.p->MAX_HASH_VALUE_BITS)
+  {
+    jam();
+    Uint32 localkey[2];
+    localkey[0] = oprec.p->localdata[0];
+    localkey[1] = oprec.p->localdata[1];
+    Uint32 len = readTablePk(localkey[0], localkey[1], ElementHeader::setLocked(oprec.i), oprec);
+    if (len > 0)
+      oprec.p->hashValue = LHBits32(md5_hash((Uint64*)ckeys, len));
+  }
+  return oprec.p->hashValue;
+}
+
+LHBits32 Dbacc::getElementHash(Uint32 const* elemptr, Int32 forward)
+{
+  jam();
+  assert(ElementHeader::getUnlocked(*elemptr));
+
+  Uint32 elemhead = *elemptr;
+  Uint32 localkey[2];
+  elemptr += forward;
+  localkey[0] = *elemptr;
+  if (likely(fragrecptr.p->localkeylen == 1))
+  {
+    jam();
+    localkey[1] = Local_key::ref2page_idx(localkey[0]);
+    localkey[0] = Local_key::ref2page_id(localkey[0]);
+  }
+  else
+  {
+    jam();
+    elemptr += forward;
+    localkey[1] = *elemptr;
+  }
+  OperationrecPtr oprec;
+  oprec.i = RNIL;
+  Uint32 len = readTablePk(localkey[0], localkey[1], elemhead, oprec);
+  if (len > 0)
+  {
+    jam();
+    return LHBits32(md5_hash((Uint64*)ckeys, len));
+  }
+  else
+  { // Return an invalid hash value if no data
+    jam();
+    return LHBits32();
+  }
+}
+
+LHBits32 Dbacc::getElementHash(Uint32 const* elemptr, Int32 forward, OperationrecPtr& oprec)
+{
+  jam();
+
+  if (!oprec.isNull())
+  {
+    jam();
+    return getElementHash(oprec);
+  }
+
+  Uint32 elemhead = *elemptr;
+  if (ElementHeader::getUnlocked(elemhead))
+  {
+    jam();
+    return getElementHash(elemptr, forward);
+  }
+  else
+  {
+    jam();
+    oprec.i = ElementHeader::getOpPtrI(elemhead);
+    ptrCheckGuard(oprec, coprecsize, operationrec);
+    return getElementHash(oprec);
+  }
+}
+
 /* --------------------------------------------------------------------------------- */
 /* EXPANDCONTAINER                                                                   */
 /*        INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD)                     */
@@ -5402,7 +5432,7 @@ void Dbacc::execDEBUG_SIG(Signal* signal
 /* --------------------------------------------------------------------------------- */
 void Dbacc::expandcontainer(Signal* signal) 
 {
-  Uint32 texcHashvalue;
+  LHBits32 texcHashvalue;
   Uint32 texcTmp;
   Uint32 texcIndex;
   Uint32 guard20;
@@ -5444,17 +5474,42 @@ void Dbacc::expandcontainer(Signal* sign
   /* --------------------------------------------------------------------------------- */
   arrGuard(cexcElementptr, 2048);
   tidrElemhead = excPageptr.p->word32[cexcElementptr];
-  if (ElementHeader::getUnlocked(tidrElemhead)){
-    jam();
-    texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
-  } else {
+  bool move;
+  if (ElementHeader::getLocked(tidrElemhead))
+  {
     jam();
     idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
     ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
-    texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
-  }//if
-  if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
+    ndbassert(idrOperationRecPtr.p->reducedHashValue.valid_bits() >= 1);
+    move = idrOperationRecPtr.p->reducedHashValue.get_bit(1);
+    idrOperationRecPtr.p->reducedHashValue.shift_out();
+    if (!fragrecptr.p->enough_valid_bits(idrOperationRecPtr.p->reducedHashValue))
+    {
+      jam();
+      idrOperationRecPtr.p->reducedHashValue =
+        fragrecptr.p->level.reduce(getElementHash(idrOperationRecPtr));
+    }
+  }
+  else
+  {
+    jam();
+    LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead);
+    ndbassert(reducedHashValue.valid_bits() >= 1);
+    move = reducedHashValue.get_bit(1);
+    reducedHashValue.shift_out();
+    if (!fragrecptr.p->enough_valid_bits(reducedHashValue))
+    {
+      jam();
+      reducedHashValue =
+        fragrecptr.p->level.reduce(getElementHash(&excPageptr.p->word32[cexcElementptr], cexcForward));
+    }
+    tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue);
+  }
+  if (!move)
+  {
     jam();
+    if (ElementHeader::getUnlocked(tidrElemhead))
+      excPageptr.p->word32[cexcElementptr] = tidrElemhead;
     /* --------------------------------------------------------------------------------- */
     /*       THIS ELEMENT IS NOT TO BE MOVED. WE CALCULATE THE WHEREABOUTS OF THE NEXT   */
     /*       ELEMENT AND PROCEED WITH THAT OR END THE SEARCH IF THERE ARE NO MORE        */
@@ -5519,17 +5574,41 @@ void Dbacc::expandcontainer(Signal* sign
   ptrNull(idrOperationRecPtr);
   arrGuard(tlastElementptr, 2048);
   tidrElemhead = lastPageptr.p->word32[tlastElementptr];
-  if (ElementHeader::getUnlocked(tidrElemhead)) {
-    jam();
-    texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
-  } else {
+  if (ElementHeader::getLocked(tidrElemhead))
+  {
     jam();
     idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
     ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
-    texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
-  }//if
-  if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
+    ndbassert(idrOperationRecPtr.p->reducedHashValue.valid_bits() >= 1);
+    move = idrOperationRecPtr.p->reducedHashValue.get_bit(1);
+    idrOperationRecPtr.p->reducedHashValue.shift_out();
+    if (!fragrecptr.p->enough_valid_bits(idrOperationRecPtr.p->reducedHashValue))
+    {
+      jam();
+      idrOperationRecPtr.p->reducedHashValue =
+        fragrecptr.p->level.reduce(getElementHash(idrOperationRecPtr));
+    }
+  }
+  else
+  {
     jam();
+    LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead);
+    ndbassert(reducedHashValue.valid_bits() > 0);
+    move = reducedHashValue.get_bit(1);
+    reducedHashValue.shift_out();
+    if (!fragrecptr.p->enough_valid_bits(reducedHashValue))
+    {
+      jam();
+      reducedHashValue =
+        fragrecptr.p->level.reduce(getElementHash(&lastPageptr.p->word32[tlastElementptr], tlastForward));
+    }
+    tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue);
+  }
+  if (!move)
+  {
+    jam();
+    if (ElementHeader::getUnlocked(tidrElemhead))
+      lastPageptr.p->word32[tlastElementptr] = tidrElemhead;
     /* --------------------------------------------------------------------------------- */
     /*       THE LAST ELEMENT IS NOT TO BE MOVED. WE COPY IT TO THE CURRENT ELEMENT.     */
     /* --------------------------------------------------------------------------------- */
@@ -5602,7 +5681,7 @@ void Dbacc::expandcontainer(Signal* sign
 /* WILL BE JOINED  ACORDING TO LH3 */
 /* AND COMMIT TRANSACTION PROCESS  */
 /* WILL BE CONTINUED */
-Uint32 Dbacc::checkScanShrink(Signal* signal)
+Uint32 Dbacc::checkScanShrink(Signal* signal, Uint32 sourceBucket, Uint32 destBucket)
 {
   Uint32 Ti;
   Uint32 TreturnCode = 0;
@@ -5616,14 +5695,8 @@ Uint32 Dbacc::checkScanShrink(Signal* si
   Page8Ptr TPageptr;
   ScanRecPtr TscanPtr;
 
-  if (fragrecptr.p->p == 0) {
-    jam();
-    TmergeDest = fragrecptr.p->maxp >> 1;
-  } else {
-    jam();
-    TmergeDest = fragrecptr.p->p - 1;
-  }//if
-  TmergeSource = fragrecptr.p->maxp + fragrecptr.p->p;
+  TmergeDest = destBucket;
+  TmergeSource = sourceBucket;
   for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
     TreleaseScanIndicator[Ti] = 0;
     if (fragrecptr.p->scan[Ti] != RNIL) {
@@ -5677,8 +5750,8 @@ Uint32 Dbacc::checkScanShrink(Signal* si
   if (TreleaseInd == 1) {
     jam();
     TreleaseScanBucket = TmergeSource;
-    TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1);	/* PAGE INDEX OBS K = 6 */
-    TDirInd = TreleaseScanBucket >> fragrecptr.p->k;	/* DIRECTORY INDEX OBS K = 6 */
+    TPageIndex = fragrecptr.p->getPageIndex(TreleaseScanBucket);
+    TDirInd = fragrecptr.p->getPageNumber(TreleaseScanBucket);
     TPageptr.i = getPagePtr(fragrecptr.p->directory, TDirInd);
     ptrCheckGuard(TPageptr, cpagesize, page8);
     for (Ti = 0; Ti < MAX_PARALLEL_SCANS_PER_FRAG; Ti++) {
@@ -5718,9 +5791,8 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
 
   jamEntry();
   fragrecptr.i = signal->theData[0];
-  Uint32 oldFlag = signal->theData[3];
   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
-  fragrecptr.p->expandFlag = oldFlag;
+  fragrecptr.p->expandOrShrinkQueued = false;
   tresult = 0;	/* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
   if (fragrecptr.p->slack <= fragrecptr.p->slackCheck) {
     jam();
@@ -5730,7 +5802,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
     /*--------------------------------------------------------------*/
     return;
   }//if
-  if (fragrecptr.p->slack > (1u << 31)) {
+  if (fragrecptr.p->slack < 0) {
     jam();
     /*--------------------------------------------------------------*/
     /* THE SLACK IS NEGATIVE, IN THIS CASE WE WILL NOT NEED ANY     */
@@ -5738,7 +5810,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
     /*--------------------------------------------------------------*/
     return;
   }//if
-  texpDirInd = (fragrecptr.p->maxp + fragrecptr.p->p) >> fragrecptr.p->k;
   if (fragrecptr.p->firstOverflowRec == RNIL) {
     jam();
     allocOverflowPage(signal);
@@ -5757,7 +5828,26 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
     /*--------------------------------------------------------------*/
     return;
   }//if
-  if (checkScanShrink(signal) == 1) {
+
+  if (fragrecptr.p->level.isEmpty())
+  {
+    jam();
+    /* no need to shrink empty hash table */
+    return;
+  }
+
+  // Since expandCounter guards more shrinks than expands and
+  // all fragments starts with a full page of buckets
+  ndbassert(fragrecptr.p->getPageNumber(fragrecptr.p->level.getTop()) > 0);
+
+  Uint32 mergeSourceBucket;
+  Uint32 mergeDestBucket;
+  bool doMerge = fragrecptr.p->level.getMergeBuckets(mergeSourceBucket, mergeDestBucket);
+
+  ndbassert(doMerge); // Merge always needed since we never shrink below one page of buckets
+
+  /* check that neither of source or destination bucket are currently scanned */
+  if (doMerge && checkScanShrink(signal, mergeSourceBucket, mergeDestBucket) == 1) {
     jam();
     /*--------------------------------------------------------------*/
     // A scan state was inconsistent with performing a shrink
@@ -5765,15 +5855,6 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
     /*--------------------------------------------------------------*/
     return;
   }//if
-  if (fragrecptr.p->p == 0) {
-    jam();
-    fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
-    fragrecptr.p->p = fragrecptr.p->maxp;
-    fragrecptr.p->hashcheckbit--;
-  } else {
-    jam();
-    fragrecptr.p->p--;
-  }//if
   
   if (ERROR_INSERTED(3002))
     debug_lh_vars("SHR");
@@ -5782,12 +5863,14 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
     fragrecptr.p->dirRangeFull = ZFALSE;
   }
 
+  shrink_adjust_reduced_hash_value(mergeDestBucket);
+
   /*--------------------------------------------------------------------------*/
   /*       WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE  */
   /*       REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET.      */
   /*--------------------------------------------------------------------------*/
-  cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1);
-  texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k;
+  cexcPageindex = fragrecptr.p->getPageIndex(mergeSourceBucket);
+  texpDirInd = fragrecptr.p->getPageNumber(mergeSourceBucket);
   excPageptr.i = getPagePtr(fragrecptr.p->directory, texpDirInd);
   fragrecptr.p->expSenderIndex = cexcPageindex;
   fragrecptr.p->expSenderPageptr = excPageptr.i;
@@ -5796,9 +5879,9 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
   /*       WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE      */
   /*       RECEIVING BUCKET.                                                  */
   /*--------------------------------------------------------------------------*/
-  texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k;
-  fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpReceivedBucket);
-  fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);
+  texpDirInd = fragrecptr.p->getPageNumber(mergeDestBucket);
+  fragrecptr.p->expReceivePageptr = getPagePtr(fragrecptr.p->directory, texpDirInd);
+  fragrecptr.p->expReceiveIndex = fragrecptr.p->getPageIndex(mergeDestBucket);
   fragrecptr.p->expReceiveForward = ZTRUE;
   if (excPageptr.i == RNIL) {
     jam();
@@ -5904,6 +5987,7 @@ void Dbacc::execSHRINKCHECK2(Signal* sig
 
 void Dbacc::endofshrinkbucketLab(Signal* signal) 
 {
+  fragrecptr.p->level.shrink();
   fragrecptr.p->expandCounter--;
   fragrecptr.p->slack -= fragrecptr.p->maxloadfactor;
   if (fragrecptr.p->expSenderIndex == 0) {
@@ -5915,7 +5999,7 @@ void Dbacc::endofshrinkbucketLab(Signal*
       releasePage(signal);
       unsetPagePtr(fragrecptr.p->directory, fragrecptr.p->expSenderDirIndex);
     }//if
-    if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) {
+    if ((fragrecptr.p->getPageNumber(fragrecptr.p->level.getSize()) & 0xff) == 0) {
       jam();
       DynArr256 dir(directoryPool, fragrecptr.p->directory);
       DynArr256::ReleaseIterator iter;
@@ -5933,15 +6017,15 @@ void Dbacc::endofshrinkbucketLab(Signal*
       }
     }//if
   }//if
-  if (fragrecptr.p->slack < (1u << 31)) {
+  if (fragrecptr.p->slack > 0) {
     jam();
     /*--------------------------------------------------------------*/
     /* THE SLACK IS POSITIVE, IN THIS CASE WE WILL CHECK WHETHER    */
     /* WE WILL CONTINUE PERFORM ANOTHER SHRINK.                     */
     /*--------------------------------------------------------------*/
-    Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
+    Uint32 noOfBuckets = fragrecptr.p->level.getSize();
     Uint32 Thysteresis = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
-    fragrecptr.p->slackCheck = noOfBuckets * Thysteresis;
+    fragrecptr.p->slackCheck = Int64(noOfBuckets) * Thysteresis;
     if (fragrecptr.p->slack > Thysteresis) {
       /*--------------------------------------------------------------*/
       /*       IT IS STILL NECESSARY TO SHRINK THE FRAGMENT MORE. THIS*/
@@ -5960,16 +6044,13 @@ void Dbacc::endofshrinkbucketLab(Signal*
 	/*       WAS REMOVED 2000-05-12.                                */
 	/*--------------------------------------------------------------*/
         signal->theData[0] = fragrecptr.i;
-        signal->theData[1] = fragrecptr.p->p;
-        signal->theData[2] = fragrecptr.p->maxp;
-        signal->theData[3] = fragrecptr.p->expandFlag;
-	ndbrequire(fragrecptr.p->expandFlag < 2);
-        fragrecptr.p->expandFlag = 2;
-        sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
+        ndbrequire(!fragrecptr.p->expandOrShrinkQueued);
+        fragrecptr.p->expandOrShrinkQueued = true;
+        sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 1, JBB);
       }//if
     }//if
   }//if
-  ndbrequire(fragrecptr.p->maxp >= (Uint32)((1 << fragrecptr.p->k) - 1));
+  ndbrequire(fragrecptr.p->getPageNumber(fragrecptr.p->level.getSize()) > 0);
   return;
 }//Dbacc::endofshrinkbucketLab()
 
@@ -5980,9 +6061,121 @@ void Dbacc::endofshrinkbucketLab(Signal*
 /*               CEXC_CONTAINERPTR (ARRAY INDEX OF THE CONTAINER).                   */
 /*               CEXC_FORWARD (CONTAINER FORWARD (+1) OR BACKWARD (-1))              */
 /*                                                                                   */
-/*        DESCRIPTION: ALL ELEMENTS OF THE ACTIVE CONTAINER HAVE TO MOVE TO THE NEW  */
-/*                  CONTAINER.                                                       */
+/*        DESCRIPTION: SCAN ALL ELEMENTS IN DESTINATION BUCKET BEFORE MERGE          */
+/*               AND ADJUST THE STORED REDUCED HASH VALUE (SHIFT IN ZERO).           */
 /* --------------------------------------------------------------------------------- */
+void
+Dbacc::shrink_adjust_reduced_hash_value(Uint32 bucket_number)
+{
+  /*
+   * Note: function are a copy paste from getElement() with modified inner loop
+   * instead of finding a specific element, scan through all and modify.
+   */
+  Uint32 tgeElementHeader;
+  Uint32 tgeElemStep;
+  Uint32 tgeContainerhead;
+  Uint32 tgePageindex;
+  Uint32 tgeActivePageDir;
+  Uint32 tgeNextptrtype;
+  register Uint32 tgeRemLen;
+  register Uint32 TelemLen = fragrecptr.p->elementLength;
+  const Uint32 localkeylen = fragrecptr.p->localkeylen;
+
+  tgePageindex = fragrecptr.p->getPageIndex(bucket_number);
+  gePageptr.i = getPagePtr(fragrecptr.p->directory, fragrecptr.p->getPageNumber(bucket_number));
+  ptrCheckGuard(gePageptr, cpagesize, page8);
+
+  ndbrequire(TelemLen == ZELEM_HEAD_SIZE + localkeylen);
+  tgeNextptrtype = ZLEFT;
+
+  /* Loop through all containers in a bucket */
+  do {
+    tgeContainerptr = mul_ZBUF_SIZE(tgePageindex);
+    if (tgeNextptrtype == ZLEFT)
+    {
+      jam();
+      tgeContainerptr = tgeContainerptr + ZHEAD_SIZE;
+      tgeElementptr = tgeContainerptr + ZCON_HEAD_SIZE;
+      tgeElemStep = TelemLen;
+      tgeForward = 1;
+      ndbrequire(tgeContainerptr < 2048);
+      tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
+      ndbrequire((tgeContainerptr + tgeRemLen - 1) < 2048);
+    }
+    else if (tgeNextptrtype == ZRIGHT)
+    {
+      jam();
+      tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
+      tgeElementptr = tgeContainerptr - 1;
+      tgeElemStep = 0 - TelemLen;
+      tgeForward = (Uint32)-1;
+      ndbrequire(tgeContainerptr < 2048);
+      tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
+      ndbrequire((tgeContainerptr - tgeRemLen) < 2048);
+    }
+    else
+    {
+      jam();
+      jamLine(tgeNextptrtype);
+      ndbrequire(false);
+    }//if
+    if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen)
+    {
+      ndbrequire(tgeRemLen <= ZBUF_SIZE);
+      /* ------------------------------------------------------------------- */
+      /* Loop through all elements in a container */
+      do
+      {
+        tgeElementHeader = gePageptr.p->word32[tgeElementptr];
+        tgeRemLen = tgeRemLen - TelemLen;
+        /*
+         * Adjust the stored reduced hash value for element, shifting in a zero
+         */
+        if (ElementHeader::getLocked(tgeElementHeader))
+        {
+          jam();
+          OperationrecPtr oprec;
+          oprec.i = ElementHeader::getOpPtrI(tgeElementHeader);
+          ptrCheckGuard(oprec, coprecsize, operationrec);
+          oprec.p->reducedHashValue.shift_in(false);
+        }
+        else
+        {
+          jam();
+          LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tgeElementHeader);
+          reducedHashValue.shift_in(false);
+          tgeElementHeader = ElementHeader::setReducedHashValue(tgeElementHeader, reducedHashValue);
+          gePageptr.p->word32[tgeElementptr] = tgeElementHeader;
+        }
+        if (tgeRemLen <= ZCON_HEAD_SIZE)
+        {
+          break;
+        }
+        tgeElementptr = tgeElementptr + tgeElemStep;
+      } while (true);
+    }//if
+    ndbrequire(tgeRemLen == ZCON_HEAD_SIZE);
+    tgeContainerhead = gePageptr.p->word32[tgeContainerptr];
+    tgeNextptrtype = (tgeContainerhead >> 7) & 0x3;
+    if (tgeNextptrtype == 0)
+    {
+      jam();
+      return;	/* NO MORE CONTAINER */
+    }//if
+    tgePageindex = tgeContainerhead & 0x7f;	/* NEXT CONTAINER PAGE INDEX 7 BITS */
+    ndbrequire(tgePageindex <= ZEMPTYLIST);
+    if (((tgeContainerhead >> 9) & 1) == ZFALSE)
+    {
+      jam();
+      tgeActivePageDir = gePageptr.p->word32[tgeContainerptr + 1];	/* NEXT PAGE ID */
+      gePageptr.i = getPagePtr(fragrecptr.p->overflowdir, tgeActivePageDir);
+      ptrCheckGuard(gePageptr, cpagesize, page8);
+    }//if
+  } while (1);
+
+  return;
+}//Dbacc::shrink_adjust_reduced_hash_value()
+
 void Dbacc::shrinkcontainer(Signal* signal) 
 {
   Uint32 tshrElementptr;
@@ -5991,7 +6184,6 @@ void Dbacc::shrinkcontainer(Signal* sign
   Uint32 tshrTmp;
   Uint32 tshrIndex;
   Uint32 guard21;
-
   tshrRemLen = cexcContainerlen - ZCON_HEAD_SIZE;
   tshrInc = fragrecptr.p->elementLength;
   if (cexcForward == ZTRUE) {
@@ -6020,7 +6212,14 @@ void Dbacc::shrinkcontainer(Signal* sign
     /* --------------------------------------------------------------------------------- */
     idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
     ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
+    idrOperationRecPtr.p->reducedHashValue.shift_in(true);
   }//if
+  else
+  {
+    LHBits16 reducedHashValue = ElementHeader::getReducedHashValue(tidrElemhead);
+    reducedHashValue.shift_in(true);
+    tidrElemhead = ElementHeader::setReducedHashValue(tidrElemhead, reducedHashValue);
+  }
   tshrTmp = tshrElementptr + cexcForward;
   guard21 = fragrecptr.p->localkeylen - 1;
   for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
@@ -6094,7 +6293,6 @@ void Dbacc::initFragAdd(Signal* signal,
                         FragmentrecPtr regFragPtr) 
 {
   const AccFragReq * const req = (AccFragReq*)&signal->theData[0];  
-  Uint32 lhFragBits = req->lhFragBits + 1;
   Uint32 minLoadFactor = (req->minLoadFactor * ZBUF_SIZE) / 100;
   Uint32 maxLoadFactor = (req->maxLoadFactor * ZBUF_SIZE) / 100;
   if (ERROR_INSERTED(3003)) // use small LoadFactors to force sparse hash table
@@ -6112,7 +6310,7 @@ void Dbacc::initFragAdd(Signal* signal,
   regFragPtr.p->myfid = req->fragId;
   regFragPtr.p->myTableId = req->tableId;
   ndbrequire(req->kValue == 6);
-  regFragPtr.p->k = req->kValue;	/* TK_SIZE = 6 IN THIS VERSION */
+  ndbrequire(req->kValue == regFragPtr.p->k);
   regFragPtr.p->expandCounter = 0;
 
   /**
@@ -6121,24 +6319,20 @@ void Dbacc::initFragAdd(Signal* signal,
    *
    * Is later restored to 0 by LQH at end of REDO log execution
    */
-  regFragPtr.p->expandFlag = 0;
-  regFragPtr.p->p = 0;
-  regFragPtr.p->maxp = (1 << req->kValue) - 1;
+  regFragPtr.p->expandOrShrinkQueued = false;
+  regFragPtr.p->level.setSize(1 << req->kValue);
   regFragPtr.p->minloadfactor = minLoadFactor;
   regFragPtr.p->maxloadfactor = maxLoadFactor;
-  regFragPtr.p->slack = (regFragPtr.p->maxp + 1) * maxLoadFactor;
-  regFragPtr.p->lhfragbits = lhFragBits;
-  regFragPtr.p->hashcheckbit = 0; //lhFragBits;
+  regFragPtr.p->slack = Int64(regFragPtr.p->level.getSize()) * maxLoadFactor;
   regFragPtr.p->localkeylen = req->localKeyLen;
   regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3;
   regFragPtr.p->lastOverIndex = 0;
   regFragPtr.p->keyLength = req->keyLength;
   ndbrequire(req->keyLength != 0);
   regFragPtr.p->elementLength = ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen;
-  Uint32 Tmp1 = (regFragPtr.p->maxp + 1) + regFragPtr.p->p;
+  Uint32 Tmp1 = regFragPtr.p->level.getSize();
   Uint32 Tmp2 = regFragPtr.p->maxloadfactor - regFragPtr.p->minloadfactor;
-  Tmp2 = Tmp1 * Tmp2;
-  regFragPtr.p->slackCheck = Tmp2;
+  regFragPtr.p->slackCheck = Int64(Tmp1) * Tmp2;
   regFragPtr.p->mytabptr = req->tableId;
   regFragPtr.p->roothashcheck = req->kValue + req->lhFragBits;
   regFragPtr.p->noOfElements = 0;
@@ -6318,17 +6512,14 @@ void Dbacc::checkNextBucketLab(Signal* s
   Uint32 tnsElementptr;
   Uint32 tnsContainerptr;
   Uint32 tnsIsLocked;
-  Uint32 tnsTmp1;
-  Uint32 tnsTmp2;
   Uint32 tnsCopyDir;
 
-  tnsCopyDir = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
+  tnsCopyDir = fragrecptr.p->getPageNumber(scanPtr.p->nextBucketIndex);
   tnsPageidptr.i = getPagePtr(fragrecptr.p->directory, tnsCopyDir);
   ptrCheckGuard(tnsPageidptr, cpagesize, page8);
   gnsPageidptr.i = tnsPageidptr.i;
   gnsPageidptr.p = tnsPageidptr.p;
-  tnsTmp1 = (1 << fragrecptr.p->k) - 1;
-  tgsePageindex = scanPtr.p->nextBucketIndex & tnsTmp1;
+  tgsePageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
   gsePageidptr.i = gnsPageidptr.i;
   gsePageidptr.p = gnsPageidptr.p;
   if (!getScanElement(signal)) {
@@ -6344,7 +6535,7 @@ void Dbacc::checkNextBucketLab(Signal* s
         return;
       }//if
     } else if (scanPtr.p->scanBucketState ==  ScanRec::FIRST_LAP) {
-      if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) {
+      if (fragrecptr.p->level.getTop() < scanPtr.p->nextBucketIndex) {
 	/* ---------------------------------------------------------------- */
 	// All buckets have been scanned a first time.
 	/* ---------------------------------------------------------------- */
@@ -6366,7 +6557,7 @@ void Dbacc::checkNextBucketLab(Signal* s
 	  /* --------------------------------------------------------------------------------- */
           scanPtr.p->nextBucketIndex = scanPtr.p->minBucketIndexToRescan;
 	  scanPtr.p->scanBucketState =  ScanRec::SECOND_LAP;
-          if (scanPtr.p->maxBucketIndexToRescan > (fragrecptr.p->p + fragrecptr.p->maxp)) {
+          if (scanPtr.p->maxBucketIndexToRescan > fragrecptr.p->level.getTop()) {
             jam();
 	    /* --------------------------------------------------------------------------------- */
 	    // If we have had so many merges that the maximum is bigger than the number of buckets
@@ -6379,7 +6570,7 @@ void Dbacc::checkNextBucketLab(Signal* s
               sendSystemerror(signal, __LINE__);
               return;
             }//if
-            scanPtr.p->maxBucketIndexToRescan = fragrecptr.p->p + fragrecptr.p->maxp;
+            scanPtr.p->maxBucketIndexToRescan = fragrecptr.p->level.getTop();
           }//if
         }//if
       }//if
@@ -6390,19 +6581,17 @@ void Dbacc::checkNextBucketLab(Signal* s
       // We will only reset the scan indicator on the buckets that existed at the start of the
       // scan. The others will be handled by the split and merge code.
       /* --------------------------------------------------------------------------------- */
-      tnsTmp2 = (1 << fragrecptr.p->k) - 1;
-      trsbPageindex = scanPtr.p->nextBucketIndex & tnsTmp2;
+      trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
       if (trsbPageindex != 0) {
         jam();
         rsbPageidptr.i = gnsPageidptr.i;
         rsbPageidptr.p = gnsPageidptr.p;
       } else {
         jam();
-        tmpP = scanPtr.p->nextBucketIndex >> fragrecptr.p->k;
+        tmpP = fragrecptr.p->getPageNumber(scanPtr.p->nextBucketIndex);
         cscPageidptr.i = getPagePtr(fragrecptr.p->directory, tmpP);
         ptrCheckGuard(cscPageidptr, cpagesize, page8);
-        tmp1 = (1 << fragrecptr.p->k) - 1;
-        trsbPageindex = scanPtr.p->nextBucketIndex & tmp1;
+        trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
         rsbPageidptr.i = cscPageidptr.i;
         rsbPageidptr.p = cscPageidptr.p;
       }//if
@@ -6539,12 +6728,12 @@ void Dbacc::initScanFragmentPart(Signal*
   scanPtr.p->activeLocalFrag = fragrecptr.i;
   scanPtr.p->nextBucketIndex = 0;	/* INDEX OF SCAN BUCKET */
   scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
-  scanPtr.p->startNoOfBuckets = fragrecptr.p->p + fragrecptr.p->maxp;
+  scanPtr.p->startNoOfBuckets = fragrecptr.p->level.getTop();
   scanPtr.p->minBucketIndexToRescan = 0xFFFFFFFF;
   scanPtr.p->maxBucketIndexToRescan = 0;
   cnfPageidptr.i = getPagePtr(fragrecptr.p->directory, 0);
   ptrCheckGuard(cnfPageidptr, cpagesize, page8);
-  trsbPageindex = scanPtr.p->nextBucketIndex & ((1 << fragrecptr.p->k) - 1);
+  trsbPageindex = fragrecptr.p->getPageIndex(scanPtr.p->nextBucketIndex);
   rsbPageidptr.i = cnfPageidptr.i;
   rsbPageidptr.p = cnfPageidptr.p;
   releaseScanBucket(signal);
@@ -6914,6 +7103,7 @@ void Dbacc::initScanOpRec(Signal* signal
     operationRecPtr.p->localdata[0] = Tkey1;
     operationRecPtr.p->localdata[1] = isoPageptr.p->word32[tisoLocalPtr];
   }
+  operationRecPtr.p->hashValue.clear();
   operationRecPtr.p->tupkeylen = fragrecptr.p->keyLength;
   operationRecPtr.p->xfrmtupkeylen = 0; // not used
 }//Dbacc::initScanOpRec()
@@ -7287,7 +7477,7 @@ void Dbacc::setlock(Signal* signal)
   arrGuard(tslElementptr, 2048);
   tselTmp1 = slPageidptr.p->word32[tslElementptr];
   operationRecPtr.p->scanBits = ElementHeader::getScanBits(tselTmp1);
-  operationRecPtr.p->hashvaluePart = ElementHeader::getHashValuePart(tselTmp1);
+  operationRecPtr.p->reducedHashValue = ElementHeader::getReducedHashValue(tselTmp1);
 
   tselTmp1 = ElementHeader::setLocked(operationRecPtr.i);
   dbgWord32(slPageidptr, tslElementptr, tselTmp1);
@@ -8188,10 +8378,9 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal
     infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ",
 	      tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage, 
 	      tmpOpPtr.p->elementPointer);
-    infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ",
-	      tmpOpPtr.p->fid, tmpOpPtr.p->fragptr, 
-	      tmpOpPtr.p->hashvaluePart);
-    infoEvent("hashValue=%d", tmpOpPtr.p->hashValue);
+    infoEvent("fid=%d, fragptr=%d ",
+              tmpOpPtr.p->fid, tmpOpPtr.p->fragptr);
+    infoEvent("hashValue=%d", tmpOpPtr.p->hashValue.pack());
     infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ",
 	      tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp, 
 	      tmpOpPtr.p->nextParallelQue);
@@ -8202,8 +8391,8 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal
 	      tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue);
     infoEvent("prevSerialQue=%d, scanRecPtr=%d",
 	      tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr);
-    infoEvent("m_op_bits=0x%x, scanBits=%d ",
-	      tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits);
+    infoEvent("m_op_bits=0x%x, scanBits=%d, reducedHashValue=%x ",
+              tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits, tmpOpPtr.p->reducedHashValue.pack());
     return;
   }
 
@@ -8405,17 +8594,16 @@ Dbacc::execNODE_STATE_REP(Signal* signal
 void
 Dbacc::debug_lh_vars(const char* where)
 {
-  Uint32 b = fragrecptr.p->maxp + fragrecptr.p->p;
-  Uint32 di = b >> fragrecptr.p->k;
+  Uint32 b = fragrecptr.p->level.getTop();
+  Uint32 di = fragrecptr.p->getPageNumber(b);
   Uint32 ri = di >> 8;
   ndbout
     << "DBACC: " << where << ":"
     << " frag:" << fragrecptr.p->myTableId
     << "/" << fragrecptr.p->myfid
-    << " slack:" << (Int32)fragrecptr.p->slack
+    << " slack:" << fragrecptr.p->slack
     << "/" << fragrecptr.p->slackCheck
-    << " maxp:" << fragrecptr.p->maxp
-    << " p:" << fragrecptr.p->p
+    << " top:" << fragrecptr.p->level.getTop()
     << " di:" << di
     << " ri:" << ri
     << " full:" << fragrecptr.p->dirRangeFull

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-09-18 13:43:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-09-28 23:43:50 +0000
@@ -539,10 +539,6 @@ void Dblqh::execCONTINUEB(Signal* signal
     {
       jam();
       c_lcp_complete_fragments.getPtr(fragptr);
-      signal->theData[0] = fragptr.p->tabRef;
-      signal->theData[1] = fragptr.p->fragId;
-      BlockReference accRef = calcInstanceBlockRef(DBACC);
-      sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
       Ptr<Fragrecord> save = fragptr;
 
       c_lcp_complete_fragments.next(fragptr);
@@ -12282,19 +12278,6 @@ Dblqh::execPREPARE_COPY_FRAG_REQ(Signal*
     /**
      *
      */
-    if (cstartType == NodeState::ST_SYSTEM_RESTART)
-    {
-      jam();
-      signal->theData[0] = fragptr.p->tabRef;
-      signal->theData[1] = fragptr.p->fragId;
-      BlockReference accRef = calcInstanceBlockRef(DBACC);
-      sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
-    }
-    
-    
-    /**
-     *
-     */
     fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
     fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
     fragptr.p->logFlag = Fragrecord::STATE_FALSE;
@@ -17085,10 +17068,6 @@ void Dblqh::execSTART_FRAGREQ(Signal* si
      */
     c_lcp_complete_fragments.add(fragptr);
 
-    signal->theData[0] = tabptr.i;
-    signal->theData[1] = fragId;
-    BlockReference accRef = calcInstanceBlockRef(DBACC);
-    sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
     c_tup->disk_restart_lcp_id(tabptr.i, fragId, RNIL);
     jamEntry();
     return;
@@ -17241,18 +17220,9 @@ void Dblqh::execRESTORE_LCP_CONF(Signal*
   c_lcp_restoring_fragments.remove(fragptr);
   c_lcp_complete_fragments.add(fragptr);
 
-  /**
-   * Disable expand check in ACC
-   *   before running REDO
-   */
   tabptr.i = fragptr.p->tabRef;
   ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
 
-  signal->theData[0] = fragptr.p->tabRef;
-  signal->theData[1] = fragptr.p->fragId;
-  BlockReference accRef = calcInstanceBlockRef(DBACC);
-  sendSignal(accRef, GSN_EXPANDCHECK2, signal, 2, JBB);
-  
   if (!c_lcp_waiting_fragments.isEmpty())
   {
     send_restore_lcp(signal);

=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	2012-01-23 20:23:08 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	2012-09-28 23:36:17 +0000
@@ -403,8 +403,32 @@ Trpman::execDBINFO_SCANREQ(Signal *signa
         row.write_uint32(getOwnNodeId()); // Node id
         row.write_uint32(rnode); // Remote node id
         row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State
+
+        /* Connect address */
+        if (globalTransporterRegistry.get_transporter(rnode) != NULL &&
+            globalTransporterRegistry.get_connect_address(rnode).s_addr != 0)
+        {
+          row.write_string(inet_ntoa(globalTransporterRegistry.get_connect_address(rnode)));
+        }
+        else
+        {
+          row.write_string("-");
+        }
+
+        /* Bytes sent/received */
+        if (globalTransporterRegistry.get_transporter(rnode) != NULL)
+        {
+          row.write_uint64(globalTransporterRegistry.get_bytes_sent(rnode));
+          row.write_uint64(globalTransporterRegistry.get_bytes_received(rnode));
+        }
+        else
+        {
+          row.write_uint64(0);
+          row.write_uint64(0);
+        }
+
         ndbinfo_send_row(signal, req, row, rl);
-       break;
+        break;
       }
 
       case NodeInfo::INVALID:

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	2012-01-23 17:37:12 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	2012-09-28 12:55:26 +0000
@@ -72,7 +72,7 @@ SET_TARGET_PROPERTIES(mt_thr_config-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_MT_THR_CONFIG")
 TARGET_LINK_LIBRARIES(mt_thr_config-t ndbgeneral)
 
-FOREACH(testprog CountingPool DynArr256)
+FOREACH(testprog CountingPool DynArr256 LHLevel)
   ADD_EXECUTABLE(${testprog}-t ${testprog}.cpp)
   SET_TARGET_PROPERTIES(${testprog}-t PROPERTIES COMPILE_FLAGS "-DTAP_TEST")
   TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror

=== added file 'storage/ndb/src/kernel/vm/LHLevel.cpp'
--- a/storage/ndb/src/kernel/vm/LHLevel.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LHLevel.cpp	2012-09-28 13:10:15 +0000
@@ -0,0 +1,277 @@
+/*
+   Copyright (c) 2012 Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifdef TAP_TEST
+
+#include <assert.h>
+#include <stdlib.h>
+
+#include <ndb_global.h>
+#include <NdbTap.hpp>
+
+#include "md5_hash.hpp"
+#include "random.h"
+#include "LHLevel.hpp"
+
+#ifndef UINT32_MAX
+#define UINT32_MAX (4294967295U)
+#endif
+
+#define BUCKSIZE 3
+
+struct elem
+{
+  Uint32 val;
+  Uint16 head;
+  static LHBits32 hash(Uint32 val)
+  {
+    return LHBits32(md5_hash((Uint64*)&val, 1));
+  }
+};
+
+void expand(LHLevel& lh, elem(*arr)[BUCKSIZE]);
+bool shrink(LHLevel& lh, elem(*arr)[BUCKSIZE]);
+bool insert_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 v);
+bool delete_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 w);
+int count_elem(LHLevel& lh, elem(*arr)[BUCKSIZE]);
+
+Uint64 c_inserts = 0;
+Uint64 c_expands = 0;
+Uint64 c_shrinks = 0;
+Uint64 c_deletes = 0;
+Uint64 c_moved = 0;
+Uint64 c_rehashed = 0;
+
+int main(int argc, char *argv[])
+{
+  unsigned int nelem = argc > 1 ? atoi(argv[1]) : 1000000;
+  plan(4);
+  elem(*arr)[BUCKSIZE] = new elem[nelem][BUCKSIZE];
+  bzero(arr, nelem * sizeof(elem[BUCKSIZE]));
+  LHLevel lh;
+  lh.clear();
+  expand(lh, arr);
+  Uint32 v = 0;
+  myRandom48Init(nelem);
+  for (int lap = 1; lap <= 2; lap++)
+  {
+    // Fill up table, with occasionally shrink
+    for (;v < UINT32_MAX;v++)
+    {
+      if (lh.getSize() * (BUCKSIZE - 1) < c_inserts - c_deletes)
+      {
+        if (!lh.isFull() && lh.getSize() < nelem)
+          expand(lh, arr);
+        else
+          break; /* Filled up */
+      }
+      insert_elem(lh, arr, v);
+      if (rand() % 100 == 0)
+        shrink(lh, arr);
+    }
+
+    // First lap, shrink to half
+    // Second lap, delete all
+    Uint32 lim = lh.getSize();
+    lim = lap * lim / 2;
+    while (v > 0 && lim > 0)
+    {
+      if (lh.isEmpty()) break;
+      Uint32 w = (Uint32)myRandom48(v + 1);
+      delete_elem(lh, arr, w);
+      delete_elem(lh, arr, v);
+      v--;
+      if (lh.getSize() * BUCKSIZE * 3 > c_inserts - c_deletes)
+        if (shrink(lh, arr))
+          lim--;
+    }
+
+    // Check table consistency
+    if (lap == 1)
+    {
+      int n = count_elem(lh, arr);
+      ok((n >= 0), "all element hash values match stored hash value and bucket address");
+      if (n < 0) n = -n;
+      ok((c_inserts == c_deletes + n),
+         "scanned element count (%u) matches difference between inserts (%llu) and deletes (%llu)",
+         n, c_inserts, c_deletes);
+    }
+  }
+  ok((c_inserts == c_deletes), "inserts (%llu) equals deletes (%llu)", c_inserts, c_deletes);
+  ok((c_expands == c_shrinks), "expands (%llu) equals shrinks (%llu)", c_expands, c_shrinks);
+  return exit_status();
+}
+
+bool delete_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 w)
+{
+  LHBits32 hash(elem::hash(w));
+  Uint32 addr = lh.getBucketNumber(hash);
+  int i;
+  bool found = false;
+  for (i = 0; i < BUCKSIZE && (arr[addr][i].head != 0); i++)
+  {
+    if (arr[addr][i].val == w)
+    {
+      found = true;
+      break;
+    }
+  }
+  if (found)
+  {
+    assert(arr[addr][i].head > 0);
+    int j;
+    c_deletes += arr[addr][i].head;
+    for (j = i + 1; j < BUCKSIZE; j++, i++)
+      arr[addr][i] = arr[addr][j];
+    bzero(&arr[addr][i], sizeof(arr[addr][i]));
+    return true;
+  }
+  else if (i < BUCKSIZE)
+  {
+    assert(arr[addr][i].head == 0);
+  }
+  return false;
+}
+
+bool shrink(LHLevel& lh, elem(*arr)[BUCKSIZE])
+{
+  assert(!lh.isEmpty());
+  Uint32 from;
+  Uint32 to;
+  if (!lh.getMergeBuckets(from, to))
+  {
+    int c = 0;
+    for (int i = 0; i < BUCKSIZE && arr[from][i].head != 0; i++)
+      c++;
+    // Only shrink if the only bucket is empty
+    if (c==0)
+    {
+      c_shrinks++;
+      lh.shrink();
+      return true;
+    }
+    return false;
+  }
+  assert(to < from);
+  int i, j;
+  int c = 0;
+  for (i = 0; i < BUCKSIZE && arr[to][i].head != 0; i++)
+    c++;
+  for (j = 0; j < BUCKSIZE && arr[from][j].head != 0; j++)
+    c++;
+  // Only shrink if both buckets' elements can fit in one bucket
+  if (c <= BUCKSIZE)
+  {
+    for (j = 0; j < BUCKSIZE && arr[from][j].head != 0; j++)
+    {
+      assert(i<BUCKSIZE);
+      arr[to][i] = arr[from][j];
+      bzero(&arr[from][j], sizeof(arr[from][j]));
+      i++;
+    }
+    c_shrinks++;
+    lh.shrink();
+    return true;
+  }
+  return false;
+}
+
+void expand(LHLevel& lh, elem(*arr)[BUCKSIZE])
+{
+  assert(!lh.isFull());
+  Uint32 from;
+  Uint32 to;
+  if (!lh.getSplitBucket(from, to))
+  {
+    // empty hash table, trivially expands to one bucket
+    c_expands++;
+    lh.expand();
+    return;
+  }
+  int i, j, k;
+  for (i = j = k = 0; i < BUCKSIZE && (arr[from][i].head != 0);
+       i++)
+  {
+    LHBits32 hash = elem::hash(arr[from][i].val);
+    if (lh.shouldMoveBeforeExpand(hash))
+    {
+      c_moved++;
+      arr[to][j] = arr[from][i];
+      j++;
+    }
+    else
+    {
+      if (k < i)
+        arr[from][k] = arr[from][i];
+      k++;
+    }
+  }
+  for (; j < BUCKSIZE; j++)
+    bzero(&arr[to][j], sizeof(arr[to][j]));
+  for (; k < BUCKSIZE; k++)
+    bzero(&arr[from][k], sizeof(arr[from][k]));
+  lh.expand();
+  c_expands++;
+}
+
+bool insert_elem(LHLevel& lh, elem(*arr)[BUCKSIZE], Uint32 v)
+{
+  LHBits32 hash = elem::hash(v);
+  Uint32 addr = lh.getBucketNumber(hash);
+  int i;
+  bool found = false;
+  for (i = 0; i < BUCKSIZE && (arr[addr][i].head != 0); i++)
+  {
+    if (arr[addr][i].val == v)
+    {
+      found = true;
+      break;
+    }
+  }
+  if (found)
+  {
+    arr[addr][i].head++;
+    c_inserts++;
+  }
+  else if (i < BUCKSIZE)
+  {
+    arr[addr][i].head = 1;
+    arr[addr][i].val = v;
+    c_inserts++;
+  }
+  else
+  {
+    return false;
+  }
+  return true;
+}
+
+int count_elem(LHLevel& lh, elem(*arr)[BUCKSIZE])
+{
+  int elements = 0;
+  int failures = 0;
+  if (!lh.isEmpty()) for (Uint32 addr = 0; addr <= lh.getTop(); addr++)
+    {
+      for (int i = 0; i < BUCKSIZE && arr[addr][i].head != 0; i++)
+      {
+        // Check that the element still hashes to the bucket it is stored in
+        if (lh.getBucketNumber(elem::hash(arr[addr][i].val)) != addr)
+          failures++;
+        elements++;
+      }
+    }
+  return failures > 0 ? -elements : elements;
+}
+
+#endif

=== added file 'storage/ndb/src/kernel/vm/LHLevel.hpp'
--- a/storage/ndb/src/kernel/vm/LHLevel.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/LHLevel.hpp	2012-09-28 13:11:17 +0000
@@ -0,0 +1,456 @@
+/*
+   Copyright (c) 2012 Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_LHLEVEL_H
+#define NDB_LHLEVEL_H
+
+/**
+ * LHLevel keeps level information for linear hashing.
+ **/
+
+/**
+ * Supports up to UINT32_MAX number of bucket addresses.
+ * If support for more is needed, one also has to
+ * increase hash key size.
+ **/
+
+#include <assert.h>
+#include "Bitmask.hpp"
+
+template<typename Int> class LHBits
+{
+public:
+  explicit LHBits();
+  explicit LHBits(Int bits);
+  template<typename Int2> LHBits(LHBits<Int2> const& bits);
+  ~LHBits();
+  LHBits& operator=(LHBits const&);
+
+  void clear();
+  static LHBits<Int> unpack(Int packed);
+  Int pack() const;
+  bool match(LHBits other) const;
+  void shift_out();
+  void shift_out(Uint8 bits);
+  void shift_in(bool bit);
+  void shift_in(Uint8 bits, Int value);
+  Uint8 valid_bits() const;
+  bool valid_bits(Int bits) const;
+  bool valid_bit(Int bit) const;
+  Int get_bits(Int bits) const;
+  Int get_bit(Int bit) const;
+private:
+  Int highbit() const;
+  Int m_bits;
+};
+
+typedef LHBits<Uint16> LHBits16;
+typedef LHBits<Uint32> LHBits32;
+
+class LHLevel
+{
+public:
+  explicit LHLevel();
+  explicit LHLevel(Uint32 size);
+  ~LHLevel() {}
+private:
+  LHLevel(LHLevel const&); // Not to be implemented
+  LHLevel&  operator=(LHLevel const&); // Not to be implemented
+public:
+  void clear();
+  bool isEmpty() const;
+  bool isFull() const;
+  Uint32 getSize() const;
+  void setSize(Uint32 size);
+  Uint32 getTop() const;
+public:
+  Uint32 getBucketNumber(LHBits32 hash_value) const;
+  bool getSplitBucket(Uint32& from, Uint32& to) const; // true if data move needed
+  bool shouldMoveBeforeExpand(LHBits32 hash_value) const;
+  void expand();
+  bool getMergeBuckets(Uint32& from, Uint32& to) const; // true if data move needed
+  void shrink();
+private:
+  enum {
+    ADDR_MAX = 0xFFFFFFFEU,
+    MAX_SIZE = 0xFFFFFFFFU,
+    MAXP_EMPTY = 0xFFFFFFFFU,
+  };
+protected:
+  Uint32 max_size() const;
+  Uint8 hashcheckbit() const { return m_hashcheckbit; }
+  Uint32 maxp() const { return m_maxp; }
+  Uint32 p() const { return m_p; }
+private:
+  Uint32 m_maxp;
+  Uint32 m_p;
+  Uint8 m_hashcheckbit;
+};
+
+class LocalLHLevel : public LHLevel
+{
+public:
+  explicit LocalLHLevel(Uint32& size): LHLevel(size), m_src(size) {}
+  ~LocalLHLevel() { m_src = getSize(); }
+private:
+  LocalLHLevel(const LocalLHLevel&); // Not to be implemented
+  LocalLHLevel&  operator=(const LocalLHLevel&); // Not to be implemented
+  Uint32& m_src;
+};
+
+/**
+ * LHLevelRH is LHLevel extended with support for
+ * a reduced hash value suitable to store with
+ * element in hash table.
+ */
+
+class LHLevelRH: public LHLevel
+{
+public:
+  explicit LHLevelRH(): LHLevel() {}
+  explicit LHLevelRH(Uint32 const& size): LHLevel(size) {}
+  ~LHLevelRH() {}
+private:
+  LHLevelRH(LHLevelRH const&); // Not to be implemented
+  LHLevelRH&  operator=(LHLevelRH const&); // Not to be implemented
+public:
+  LHBits16 reduce(LHBits32 hash_value) const;
+  Uint8 getNeededValidBits(Uint8 bits) const;
+  LHBits32 enlarge(LHBits16 reduced_hash_value, Uint32 bucket_number) const;
+};
+
+/**
+ * Implementation LHBits<>
+ **/
+
+template<typename Int> inline LHBits<Int>::LHBits()
+: m_bits(1)
+{
+}
+
+template<typename Int> inline LHBits<Int>::LHBits(Int bits)
+: m_bits(bits | highbit())
+{
+}
+
+template<typename Int> template<typename Int2> inline LHBits<Int>::LHBits(LHBits<Int2> const& bits)
+: m_bits(bits.pack())
+{
+  if (m_bits != bits.pack())
+    m_bits |= highbit();
+}
+
+template<typename Int> inline LHBits<Int>::~LHBits()
+{
+}
+
+template<typename Int> inline LHBits<Int>& LHBits<Int>::operator=(LHBits const& src)
+{
+  m_bits = src.m_bits;
+  return *this;
+}
+
+template<typename Int> inline void LHBits<Int>::clear()
+{
+  m_bits = 1;
+}
+
+template<typename Int> inline LHBits<Int> LHBits<Int>::unpack(Int packed)
+{
+  LHBits<Int> x;
+  x.m_bits = packed;
+  return x;
+}
+
+template<typename Int> inline Int LHBits<Int>::pack() const
+{
+  return m_bits;
+}
+
+template<typename Int> inline bool LHBits<Int>::match(LHBits<Int> other) const
+{
+  assert(sizeof(Int) <= sizeof(Uint32));
+  assert(m_bits != 0);
+  assert(other.m_bits != 0);
+  // Warning: don't reduce to one shift below.
+  // << is only defined for right values < 32.
+  // and since m_bits != 0, clz(MIN(...)) is guaranteed <= 31
+  return ((Uint32(m_bits ^ other.m_bits) << BitmaskImpl::clz(MIN(m_bits, other.m_bits))) << 1) == 0;
+}
+
+template<typename Int> inline void LHBits<Int>::shift_out()
+{
+  m_bits >>= 1;
+  if (m_bits == 0)
+    m_bits++;
+}
+
+template<typename Int> inline void LHBits<Int>::shift_out(Uint8 bits)
+{
+  assert(bits < 8 * sizeof(m_bits));
+  m_bits >>= bits;
+  if (m_bits == 0)
+    m_bits++;
+}
+
+template<typename Int> inline void LHBits<Int>::shift_in(bool bit)
+{
+  if (m_bits >= highbit())
+    m_bits |= (highbit() >> 1);
+  m_bits = (m_bits << 1) | (bit ? 1 : 0);
+}
+
+template<typename Int> inline void LHBits<Int>::shift_in(Uint8 bits, Int value)
+{
+  assert(m_bits != 0);
+  assert(bits < 8 * sizeof(m_bits));
+  assert(value < (Int(1) << bits));
+  if (bits == 0)
+    return;
+  if (m_bits >= (highbit() >> (bits - 1)))
+    m_bits = highbit() | (m_bits << bits) | value;
+  else
+    m_bits = (m_bits << bits) | value;
+}
+
+template<typename Int> inline Uint8 LHBits<Int>::valid_bits() const
+{
+  assert(m_bits != 0);
+  return BitmaskImpl::fls(m_bits);
+}
+
+template<typename Int> inline bool LHBits<Int>::valid_bits(Int bits) const
+{
+  // Only allow bits to be on the form 2^n-1
+  assert((m_bits != 0) &&
+         (((bits + 1U) | bits) == (bits << 1U) + 1U)); // bits is 0..01..1
+  return m_bits > bits;
+}
+
+template<typename Int> inline bool LHBits<Int>::valid_bit(Int bit) const
+{
+  // Only allow bit to be on the form 2^n
+  assert((m_bits != 0) &&
+         ((((bit - 1U) | bit) >> 1U) == bit - 1U)); // bits is 0..010..0
+  return (m_bits >> 1U) >= bit;
+}
+
+template<typename Int> inline Int LHBits<Int>::get_bits(Int bits) const
+{
+  assert(valid_bits(bits));
+  return m_bits & bits;
+}
+
+template<typename Int> inline Int LHBits<Int>::get_bit(Int bit) const
+{
+  assert(valid_bit(bit));
+  return m_bits & bit;
+}
+
+template<typename Int> inline Int LHBits<Int>::highbit() const
+{
+  return 1 << (sizeof(Int) * 8 - 1);
+}
+
+/**
+ * Implementation LHLevel
+ **/
+
+inline LHLevel::LHLevel()
+{
+  setSize(0);
+}
+
+inline LHLevel::LHLevel(Uint32 _size)
+{
+  setSize(_size);
+}
+
+inline void LHLevel::clear()
+{
+  m_maxp = MAXP_EMPTY;
+  m_p = 0;
+}
+
+inline bool LHLevel::isEmpty() const
+{
+  return maxp() == MAXP_EMPTY;
+}
+
+inline bool LHLevel::isFull() const
+{
+  return !isEmpty() && (getTop() == ADDR_MAX);
+}
+
+inline Uint32 LHLevel::max_size() const
+{
+  return MAX_SIZE;
+}
+
+inline Uint32 LHLevel::getSize() const
+{
+  assert(!isEmpty() || (maxp() + 1 + p() == 0));
+  return maxp() + 1 + p();
+}
+
+inline void LHLevel::setSize(Uint32 size)
+{
+  assert(size <= max_size());
+  if (size == 0)
+    clear();
+  else
+  {
+    m_hashcheckbit = BitmaskImpl::fls(size);
+    m_maxp = (1 << hashcheckbit()) - 1;
+    m_p = size - 1 - maxp();
+  }
+}
+
+inline Uint32 LHLevel::getTop() const
+{
+  assert(!isEmpty());
+  return maxp() + p();
+}
+
+inline Uint32 LHLevel::getBucketNumber(LHBits32 hash_value) const
+{
+  assert(!isEmpty());
+  Uint32 addr = hash_value.get_bits(maxp());
+  if (addr < p())
+  {
+    addr |= hash_value.get_bit(maxp() + 1);
+  }
+  return addr;
+}
+
+inline bool LHLevel::getSplitBucket(Uint32& from, Uint32& to) const
+{
+  assert(!isFull());
+
+  from = p();
+  to = getSize(); // == getTop() + 1
+
+  // true if data move needed, that is, it was not empty
+  return to > 0;
+}
+
+inline void LHLevel::expand()
+{
+  assert(!isFull());
+
+  if (isEmpty())
+  {
+    m_p = 0;
+    m_hashcheckbit = 0;
+    m_maxp = 0;
+  }
+  else if (p() == maxp())
+  {
+    m_maxp = (maxp() << 1) | 1;
+    m_hashcheckbit ++;
+    m_p = 0;
+  }
+  else
+  {
+    m_p ++;
+  }
+}
+
+inline bool LHLevel::shouldMoveBeforeExpand(LHBits32 hash_value) const
+{
+  return hash_value.get_bit(1 << hashcheckbit());
+}
+
+inline bool LHLevel::getMergeBuckets(Uint32& from, Uint32& to) const
+{
+  assert(!isEmpty());
+
+  from = getTop();
+
+  if (likely(p() != 0))
+  {
+    to = p() - 1;
+  }
+  else
+  {
+    to = maxp() >> 1;
+  }
+
+  // true if data move needed, that is, the table does not become empty
+  return from > 0;
+}
+
+inline void LHLevel::shrink()
+{
+  assert(!isEmpty());
+
+  if (p() != 0)
+  {
+    m_p --;
+  }
+  else if (maxp() == 0)
+  {
+    m_maxp --; // == MAXP_EMPTY
+  }
+  else
+  {
+    m_maxp >>= 1;
+    m_hashcheckbit --;
+    m_p = m_maxp;
+  }
+}
+
+/**
+ * Implementation LHLevelRH
+ **/
+
+inline LHBits16 LHLevelRH::reduce(LHBits32 hash_value) const
+{
+  assert(!isEmpty());
+
+  if (!hash_value.valid_bits(maxp()))
+    return LHBits16();
+
+  Uint32 addr = hash_value.get_bits(maxp());
+  LHBits32 hv(hash_value);
+  hv.shift_out(hashcheckbit());
+  if (addr < p())
+    hv.shift_out();
+  return LHBits16(hv);
+}
+
+inline Uint8 LHLevelRH::getNeededValidBits(Uint8 bits) const
+{
+  Uint8 const usable_bits_in_hash_value = 8 * sizeof(LHBits32) - 1; // == 31
+  return MIN(bits, usable_bits_in_hash_value - hashcheckbit());
+}
+
+inline LHBits32 LHLevelRH::enlarge(LHBits16 reduced_hash_value, Uint32 bucket_number) const
+{
+  assert(!isEmpty());
+
+  Uint32 addr = bucket_number & maxp();
+  LHBits32 hv(reduced_hash_value);
+  Uint8 addr_bits = hashcheckbit() + ((addr < p()) ? 1 : 0);
+  hv.shift_in(addr_bits, bucket_number);
+  return hv;
+}
+
+#endif
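
LHLevel above packages the linear-hashing state (maxp, p, hashcheckbit) that Dbacc previously kept as loose fragment fields. A minimal standalone sketch of how a caller is expected to drive it; the literal hash value is only an example, Dbacc feeds it md5-based hash values as the TAP test above does:

#include <assert.h>
#include "LHLevel.hpp"

void lhlevel_demo()
{
  LHLevel lh;
  lh.clear();                          // no buckets at all
  assert(lh.isEmpty());

  lh.expand();                         // first bucket, nothing to move
  lh.expand();                         // two buckets

  LHBits32 hash(0x12345678U);          // example value only
  Uint32 addr = lh.getBucketNumber(hash);
  assert(addr <= lh.getTop());

  Uint32 from, to;
  if (lh.getMergeBuckets(from, to))
  {
    // caller moves the elements of bucket 'from' into bucket 'to' here
  }
  lh.shrink();                         // back to one bucket
}

Note that every LHBits value keeps its highest set bit as a marker above the valid bits (clear() sets m_bits to 1, and the value constructor ORs in the high bit), which is how pack()/unpack() can carry the number of valid bits along with the value itself.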

=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2012-03-20 13:50:15 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2012-09-28 12:55:26 +0000
@@ -113,6 +113,11 @@ DynArr256_t_SOURCES = DynArr256.cpp
 DynArr256_t_LDFLAGS = @ndb_bin_am_ldflags@
 DynArr256_t_LDADD = $(test_ldadd)
 
+LHLevel_t_CXXFLAGS = -DTAP_TEST
+LHLevel_t_SOURCES = LHLevel.cpp
+LHLevel_t_LDFLAGS = @ndb_bin_am_ldflags@
+LHLevel_t_LDADD = $(test_ldadd)
+
 testSectionReader_CXXFLAGS = -DUNIT_TEST
 testSectionReader_SOURCES = SectionReader.cpp
 testSectionReader_LDFLAGS = @ndb_bin_am_ldflags@
@@ -143,7 +148,7 @@ mt_thr_config_t_LDADD = \
 	$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \
 	$(top_builddir)/mysys/libmysyslt.la
 
-noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t
+noinst_PROGRAMS = mt_thr_config-t CountingPool-t DynArr256-t LHLevel-t
 
 mt_send_t_CXXFLAGS = -DTAP_TEST
 mt_send_t_SOURCES = mt-send-t.cpp

=== modified file 'storage/ndb/src/kernel/vm/NdbinfoTables.cpp'
--- a/storage/ndb/src/kernel/vm/NdbinfoTables.cpp	2011-10-17 16:46:12 +0000
+++ b/storage/ndb/src/kernel/vm/NdbinfoTables.cpp	2012-09-28 23:50:32 +0000
@@ -78,13 +78,17 @@ DECLARE_NDBINFO_TABLE(POOLS,12) =
   }
 };
 
-DECLARE_NDBINFO_TABLE(TRANSPORTERS, 3) =
-{ { "transporters", 3, 0, "transporter status" },
+DECLARE_NDBINFO_TABLE(TRANSPORTERS, 6) =
+{ { "transporters", 6, 0, "transporter status" },
   {
     {"node_id",            Ndbinfo::Number, ""},
     {"remote_node_id",     Ndbinfo::Number, ""},
 
-    {"connection_status",  Ndbinfo::Number, ""}
+    {"connection_status",  Ndbinfo::Number, ""},
+    
+    {"remote_address",     Ndbinfo::String, ""},
+    {"bytes_sent",         Ndbinfo::Number64, ""},
+    {"bytes_received",     Ndbinfo::Number64, ""}
   }
 };
 

=== modified file 'storage/ndb/tools/ndbinfo_sql.cpp'
--- a/storage/ndb/tools/ndbinfo_sql.cpp	2011-12-13 18:32:26 +0000
+++ b/storage/ndb/tools/ndbinfo_sql.cpp	2012-09-28 23:50:32 +0000
@@ -66,7 +66,8 @@ struct view {
     "  WHEN 2 THEN \"DISCONNECTED\""
     "  WHEN 3 THEN \"DISCONNECTING\""
     "  ELSE NULL "
-    " END AS status "
+    " END AS status, "
+    " remote_address, bytes_sent, bytes_received "
     "FROM `<NDBINFO_DB>`.`<TABLE_PREFIX>transporters`"
   },
   { "logspaces",

No bundle (reason: useless for push emails).