List:Commits« Previous MessageNext Message »
From:jonas oreland Date:April 27 2011 9:13pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (jonas:4342 to 4343)
View as plain text  
 4343 jonas oreland	2011-04-27 [merge]
      ndb - merge first parts of wl4163

    modified:
      storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
 4342 Jonas Oreland	2011-04-27
      ndb - fix 0-len key (fails on my new brillant assert in ACC)

    modified:
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-04-25 16:46:59 +0000
@@ -219,16 +219,11 @@ private:
   static const TreeEnt NullTreeEnt;
 
   /*
-   * Tree node has 1) fixed part 2) a prefix of index key data for min
-   * entry 3) max and min entries 4) rest of entries 5) one extra entry
-   * used as work space.
+   * Tree node has 3 parts:
    *
-   * struct TreeNode            part 1, size 6 words
-   * min prefix                 part 2, size TreeHead::m_prefSize
-   * max entry                  part 3
-   * min entry                  part 3
-   * rest of entries            part 4
-   * work entry                 part 5
+   * 1) struct TreeNode - the header (6 words)
+   * 2) some key values for min entry - the min prefix
+   * 3) list of TreeEnt (each 2 words)
    *
    * There are 3 links to other nodes: left child, right child, parent.
    * Occupancy (number of entries) is at least 1 except temporarily when
@@ -248,17 +243,6 @@ private:
   STATIC_CONST( NodeHeadSize = sizeof(TreeNode) >> 2 );
 
   /*
-   * Tree node "access size" was for an early version with signal
-   * interface to TUP.  It is now used only to compute sizes.
-   */
-  enum AccSize {
-    AccNone = 0,
-    AccHead = 1,                // part 1
-    AccPref = 2,                // parts 1-3
-    AccFull = 3                 // parts 1-5
-  };
-
-  /*
    * Tree header.  There is one in each fragment.  Contains tree
    * parameters and address of root node.
    */
@@ -273,7 +257,6 @@ private:
     TupLoc m_root;              // root node
     TreeHead();
     // methods
-    unsigned getSize(AccSize acc) const;
     Data getPref(TreeNode* node) const;
     TreeEnt* getEntList(TreeNode* node) const;
   };
@@ -562,7 +545,6 @@ private:
     // access other parts of the node
     Data getPref();
     TreeEnt getEnt(unsigned pos);
-    TreeEnt getMinMax(unsigned i);
     // for ndbrequire and ndbassert
     void progError(int line, int cause, const char* file);
   };
@@ -583,6 +565,7 @@ private:
   void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
   void copyAttrs(TuxCtx&, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
   void unpackBound(const ScanBound& bound, Data data);
+  void findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr);
 
   /*
    * DbtuxMeta.cpp
@@ -674,11 +657,14 @@ private:
   /*
    * DbtuxSearch.cpp
    */
+  void findNodeToUpdate(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode);
+  bool findPosToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
+  bool findPosToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
   bool searchToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
-  bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+  bool searchToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+  void findNodeToScan(Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode);
+  void findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos);
   void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
-  void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
-  void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
 
   /*
    * DbtuxCmp.cpp
@@ -929,22 +915,6 @@ Dbtux::TreeHead::TreeHead() :
 {
 }
 
-inline unsigned
-Dbtux::TreeHead::getSize(AccSize acc) const
-{
-  switch (acc) {
-  case AccNone:
-    return 0;
-  case AccHead:
-    return NodeHeadSize;
-  case AccPref:
-    return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
-  case AccFull:
-    return m_nodeSize;
-  }
-  return 0;
-}
-
 inline Dbtux::Data
 Dbtux::TreeHead::getPref(TreeNode* node) const
 {
@@ -1201,15 +1171,7 @@ Dbtux::NodeHandle::getEnt(unsigned pos)
   TreeEnt* entList = tree.getEntList(m_node);
   const unsigned occup = m_node->m_occup;
   ndbrequire(pos < occup);
-  return entList[(1 + pos) % occup];
-}
-
-inline Dbtux::TreeEnt
-Dbtux::NodeHandle::getMinMax(unsigned i)
-{
-  const unsigned occup = m_node->m_occup;
-  ndbrequire(i <= 1 && occup != 0);
-  return getEnt(i == 0 ? 0 : occup - 1);
+  return entList[pos];
 }
 
 // parameters for methods

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp	2011-01-30 20:56:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp	2011-04-24 13:10:50 +0000
@@ -83,15 +83,7 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
   const Uint32 fragId = req->fragId;
   // get the fragment
   FragPtr fragPtr;
-  fragPtr.i = RNIL;
-  for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
-    jam();
-    if (indexPtr.p->m_fragId[i] == fragId) {
-      jam();
-      c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
-      break;
-    }
-  }
+  findFrag(*indexPtr.p, fragId, fragPtr);
   ndbrequire(fragPtr.i != RNIL);
   Frag& frag = *fragPtr.p;
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2011-04-24 16:20:23 +0000
@@ -286,7 +286,7 @@ Dbtux::printNode(TuxCtx & ctx,
   { ConstData data1 = node.getPref();
     Uint32 data2[MaxPrefSize];
     memset(data2, DataFillByte, MaxPrefSize << 2);
-    readKeyAttrs(ctx, frag, node.getMinMax(0), 0, ctx.c_searchKey);
+    readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_searchKey);
     copyAttrs(ctx, frag, ctx.c_searchKey, data2, tree.m_prefSize);
     for (unsigned n = 0; n < tree.m_prefSize; n++) {
       if (data1[n] != data2[n]) {
@@ -320,7 +320,8 @@ Dbtux::printNode(TuxCtx & ctx,
     if (node.getLink(i) == NullTupLoc)
       continue;
     const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
-    const TreeEnt ent2 = node.getMinMax(i);
+    const unsigned pos = (i == 0 ? 0 : node.getOccup() - 1);
+    const TreeEnt ent2 = node.getEnt(pos);
     unsigned start = 0;
     readKeyAttrs(ctx, frag, ent1, start, ctx.c_searchKey);
     readKeyAttrs(ctx, frag, ent2, start, ctx.c_entryKey);
@@ -337,9 +338,10 @@ Dbtux::printNode(TuxCtx & ctx,
   par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
   par.m_occup = node.getOccup();
   for (unsigned i = 0; i <= 1; i++) {
-    if (node.getLink(i) == NullTupLoc)
-      par.m_minmax[i] = node.getMinMax(i);
-    else
+    if (node.getLink(i) == NullTupLoc) {
+      const unsigned pos = (i == 0 ? 0 : node.getOccup() - 1);
+      par.m_minmax[i] = node.getEnt(pos);
+    } else
       par.m_minmax[i] = cpar[i].m_minmax[i];
   }
 }
@@ -387,9 +389,6 @@ operator<<(NdbOut& out, const Dbtux::Tre
   out << " [prefSize " << dec << tree.m_prefSize << "]";
   out << " [minOccup " << dec << tree.m_minOccup << "]";
   out << " [maxOccup " << dec << tree.m_maxOccup << "]";
-  out << " [AccHead " << dec << tree.getSize(Dbtux::AccHead) << "]";
-  out << " [AccPref " << dec << tree.getSize(Dbtux::AccPref) << "]";
-  out << " [AccFull " << dec << tree.getSize(Dbtux::AccFull) << "]";
   out << " [root " << hex << tree.m_root << "]";
   out << "]";
   return out;
@@ -528,9 +527,8 @@ operator<<(NdbOut& out, const Dbtux::Nod
   unsigned numpos = node.m_node->m_occup;
   data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
   const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
-  // print entries in logical order
-  for (unsigned pos = 1; pos <= numpos; pos++)
-    out << " " << entList[pos % numpos];
+  for (unsigned pos = 0; pos < numpos; pos++)
+    out << " " << entList[pos];
   out << "]";
   out << "]";
   return out;

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2011-04-24 13:10:50 +0000
@@ -366,4 +366,20 @@ Dbtux::unpackBound(const ScanBound& boun
   }
 }
 
+void
+Dbtux::findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr)
+{
+  const Uint32 numFrags = index.m_numFrags;
+  for (Uint32 i = 0; i < numFrags; i++) {
+    jam();
+    if (index.m_fragId[i] == fragId) {
+      jam();
+      fragPtr.i = index.m_fragPtrI[i];
+      c_fragPool.getPtr(fragPtr);
+      return;
+    }
+  }
+  fragPtr.i = RNIL;
+}
+
 BLOCK_FUNCTIONS(Dbtux)

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2011-02-01 21:05:11 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2011-04-25 14:42:38 +0000
@@ -63,15 +63,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
   const Uint32 fragId = req->fragId;
   // get the fragment
   FragPtr fragPtr;
-  fragPtr.i = RNIL;
-  for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
-    jam();
-    if (indexPtr.p->m_fragId[i] == fragId) {
-      jam();
-      c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
-      break;
-    }
-  }
+  findFrag(*indexPtr.p, fragId, fragPtr);
   ndbrequire(fragPtr.i != RNIL);
   Frag& frag = *fragPtr.p;
   // set up index keys for this operation
@@ -152,7 +144,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
     break;
   case TuxMaintReq::OpRemove:
     jam();
-    ok = searchToRemove(frag, c_ctx.c_searchKey, ent, treePos);
+    ok = searchToRemove(c_ctx, frag, c_ctx.c_searchKey, ent, treePos);
 #ifdef VM_TRACE
     if (debugFlags & DebugMaint) {
       debugOut << treePos << (! ok ? " - error" : "") << endl;

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp	2011-04-24 16:20:23 +0000
@@ -313,21 +313,20 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
     tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
     tree.m_prefSize = MAX_TTREE_PREF_SIZE;
     const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
-    // size up to and including first 2 entries
-    const unsigned pref = tree.getSize(AccPref);
-    if (! (pref <= tree.m_nodeSize)) {
+    // size of header and min prefix
+    const unsigned fixedSize = NodeHeadSize + tree.m_prefSize;
+    if (! (fixedSize <= tree.m_nodeSize)) {
       jam();
       errorCode = (TuxFragRef::ErrorCode)TuxAddAttrRef::InvalidNodeSize;
       break;
     }
-    const unsigned slots = (tree.m_nodeSize - pref) / TreeEntSize;
-    // leave out work space entry
-    tree.m_maxOccup = 2 + slots - 1;
+    const unsigned slots = (tree.m_nodeSize - fixedSize) / TreeEntSize;
+    tree.m_maxOccup = slots;
     // min occupancy of interior node must be at least 2
     if (! (2 + maxSlack <= tree.m_maxOccup)) {
       jam();
-        errorCode = (TuxFragRef::ErrorCode)TuxAddAttrRef::InvalidNodeSize;
-        break;
+      errorCode = (TuxFragRef::ErrorCode)TuxAddAttrRef::InvalidNodeSize;
+      break;
     }
     tree.m_minOccup = tree.m_maxOccup - maxSlack;
     // root node does not exist (also set by ctor)

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-04-24 16:20:23 +0000
@@ -102,7 +102,7 @@ Dbtux::insertNode(NodeHandle& node)
   TreeHead& tree = frag.m_tree;
   memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
   TreeEnt* entList = tree.getEntList(node.m_node);
-  memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
+  memset(entList, NodeFillByte, tree.m_maxOccup * (TreeEntSize << 2));
 #endif
 }
 
@@ -156,7 +156,7 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
 {
   const Frag& frag = node.m_frag;
   const TreeHead& tree = frag.m_tree;
-  readKeyAttrs(ctx, frag, node.getMinMax(0), 0, ctx.c_entryKey);
+  readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_entryKey);
   copyAttrs(ctx, frag, ctx.c_entryKey, node.getPref(), tree.m_prefSize);
 }
 
@@ -185,14 +185,11 @@ Dbtux::nodePushUp(TuxCtx & ctx, NodeHand
     nodePushUpScans(node, pos);
   // fix node
   TreeEnt* const entList = tree.getEntList(node.m_node);
-  entList[occup] = entList[0];
-  TreeEnt* const tmpList = entList + 1;
   for (unsigned i = occup; i > pos; i--) {
     thrjam(ctx.jamBuffer);
-    tmpList[i] = tmpList[i - 1];
+    entList[i] = entList[i - 1];
   }
-  tmpList[pos] = ent;
-  entList[0] = entList[occup + 1];
+  entList[pos] = ent;
   node.setOccup(occup + 1);
   // add new scans
   if (scanList != RNIL)
@@ -258,14 +255,11 @@ Dbtux::nodePopDown(TuxCtx& ctx, NodeHand
   }
   // fix node
   TreeEnt* const entList = tree.getEntList(node.m_node);
-  entList[occup] = entList[0];
-  TreeEnt* const tmpList = entList + 1;
-  ent = tmpList[pos];
+  ent = entList[pos];
   for (unsigned i = pos; i < occup - 1; i++) {
     thrjam(ctx.jamBuffer);
-    tmpList[i] = tmpList[i + 1];
+    entList[i] = entList[i + 1];
   }
-  entList[0] = entList[occup - 1];
   node.setOccup(occup - 1);
   // fix prefix
   if (occup != 1 && pos == 0)
@@ -326,16 +320,13 @@ Dbtux::nodePushDown(TuxCtx& ctx, NodeHan
   }
   // fix node
   TreeEnt* const entList = tree.getEntList(node.m_node);
-  entList[occup] = entList[0];
-  TreeEnt* const tmpList = entList + 1;
-  TreeEnt oldMin = tmpList[0];
+  TreeEnt oldMin = entList[0];
   for (unsigned i = 0; i < pos; i++) {
     thrjam(ctx.jamBuffer);
-    tmpList[i] = tmpList[i + 1];
+    entList[i] = entList[i + 1];
   }
-  tmpList[pos] = ent;
+  entList[pos] = ent;
   ent = oldMin;
-  entList[0] = entList[occup];
   // fix prefix
   if (true)
     setNodePref(ctx, node);
@@ -396,16 +387,13 @@ Dbtux::nodePopUp(TuxCtx& ctx, NodeHandle
   }
   // fix node
   TreeEnt* const entList = tree.getEntList(node.m_node);
-  entList[occup] = entList[0];
-  TreeEnt* const tmpList = entList + 1;
   TreeEnt newMin = ent;
-  ent = tmpList[pos];
+  ent = entList[pos];
   for (unsigned i = pos; i > 0; i--) {
     thrjam(ctx.jamBuffer);
-    tmpList[i] = tmpList[i - 1];
+    entList[i] = entList[i - 1];
   }
-  tmpList[0] = newMin;
-  entList[0] = entList[occup];
+  entList[0] = newMin;
   // add scans
   if (scanList != RNIL)
     addScanList(node, 0, scanList);

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-04-25 15:57:28 +0000
@@ -39,15 +39,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
     c_indexPool.getPtr(indexPtr, req->tableId);
     // get the fragment
     FragPtr fragPtr;
-    fragPtr.i = RNIL;
-    for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
-      jam();
-      if (indexPtr.p->m_fragId[i] == req->fragmentNo) {
-        jam();
-        c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
-        break;
-      }
-    }
+    findFrag(*indexPtr.p, req->fragmentNo, fragPtr);
     ndbrequire(fragPtr.i != RNIL);
     Frag& frag = *fragPtr.p;
     // check for index not Online (i.e. Dropping)
@@ -858,8 +850,9 @@ Dbtux::scanFind(ScanOpPtr scanPtr)
  * 0 - up from left child (scan this node next)
  * 1 - up from right child (proceed to parent)
  * 2 - up from root (the scan ends)
- * 3 - left to right within node (at end proceed to right child)
+ * 3 - left to right within node (at end set state 5)
  * 4 - down from parent (proceed to left child)
+ * 5 - at node end proceed to right child (state becomes 4)
  *
  * If an entry was found, scan direction is 3.  Therefore tree
  * re-organizations need not worry about scan direction.
@@ -926,6 +919,19 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool 
       // pretend we came from left child
       pos.m_dir = idir;
     }
+    if (pos.m_dir == 5) {
+      // at node end proceed to right child
+      jam();
+      TupLoc loc = node.getLink(1 - idir);
+      if (loc != NullTupLoc) {
+        jam();
+        pos.m_loc = loc;
+        pos.m_dir = 4;  // down from parent as usual
+        continue;
+      }
+      // pretend we came from right child
+      pos.m_dir = 1 - idir;
+    }
     const unsigned occup = node.getOccup();
     if (occup == 0) {
       jam();
@@ -957,15 +963,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool 
         break;
       }
       // after node proceed to right child
-      TupLoc loc = node.getLink(1 - idir);
-      if (loc != NullTupLoc) {
-        jam();
-        pos.m_loc = loc;
-        pos.m_dir = 4;
-        continue;
-      }
-      // pretend we came from right child
-      pos.m_dir = 1 - idir;
+      pos.m_dir = 5;
+      continue;
     }
     if (pos.m_dir == 1 - idir) {
       // coming up from right child proceed to parent

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2011-04-27 21:13:10 +0000
@@ -20,28 +20,23 @@
 #include "Dbtux.hpp"
 
 /*
- * Search for entry to add.
+ * Search down non-empty tree for node to update.  Compare search key to
+ * each node minimum.  If greater, move to right subtree.  This can
+ * overshoot target node.  The last such node is saved.  The search ends
+ * at a final node which is a semi-leaf or leaf.  If search key is less
+ * than final node minimum then the saved node (if any) is the g.l.b of
+ * the final node and we move back to it.
  *
- * Similar to searchToRemove (see below).
+ * Search within the found node is done by caller.  On add, search key
+ * may be before minimum or after maximum entry.  On remove, search key
+ * is within the node.
  */
-bool
-Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+void
+Dbtux::findNodeToUpdate(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode)
 {
   const TreeHead& tree = frag.m_tree;
-  const unsigned numAttrs = frag.m_numAttrs;
-  NodeHandle currNode(frag);
-  currNode.m_loc = tree.m_root;
-  if (currNode.m_loc == NullTupLoc) {
-    // empty tree
-    thrjam(ctx.jamBuffer);
-    return true;
-  }
+  const Uint32 numAttrs = frag.m_numAttrs;
   NodeHandle glbNode(frag);     // potential g.l.b of final node
-  /*
-   * In order to not (yet) change old behaviour, a position between
-   * 2 nodes returns the one at the bottom of the tree.
-   */
-  NodeHandle bottomNode(frag);
   while (true) {
     thrjam(ctx.jamBuffer);
     selectNode(currNode, currNode.m_loc);
@@ -53,14 +48,14 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
       thrjam(ctx.jamBuffer);
       // read and compare remaining attributes
       ndbrequire(start < numAttrs);
-      readKeyAttrs(ctx, frag, currNode.getMinMax(0), start, ctx.c_entryKey);
+      readKeyAttrs(ctx, frag, currNode.getEnt(0), start, ctx.c_entryKey);
       ret = cmpSearchKey(ctx, frag, start, searchKey, ctx.c_entryKey);
       ndbrequire(ret != NdbSqlUtil::CmpUnknown);
     }
     if (ret == 0) {
       thrjam(ctx.jamBuffer);
       // keys are equal, compare entry values
-      ret = searchEnt.cmp(currNode.getMinMax(0));
+      ret = searchEnt.cmp(currNode.getEnt(0));
     }
     if (ret < 0) {
       thrjam(ctx.jamBuffer);
@@ -73,11 +68,12 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
       }
       if (! glbNode.isNull()) {
         thrjam(ctx.jamBuffer);
-        // move up to the g.l.b but remember the bottom node
-        bottomNode = currNode;
+        // move up to the g.l.b
         currNode = glbNode;
       }
-    } else if (ret > 0) {
+      break;
+    }
+    if (ret > 0) {
       thrjam(ctx.jamBuffer);
       const TupLoc loc = currNode.getLink(1);
       if (loc != NullTupLoc) {
@@ -88,26 +84,28 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
         currNode.m_loc = loc;
         continue;
       }
-    } else {
-      thrjam(ctx.jamBuffer);
-      treePos.m_loc = currNode.m_loc;
-      treePos.m_pos = 0;
-      // entry found - error
-      return false;
+      break;
     }
+    // ret == 0
+    thrjam(ctx.jamBuffer);
     break;
   }
-  // anticipate
-  treePos.m_loc = currNode.m_loc;
-  // binary search
+}
+
+/*
+ * Find position within the final node to add entry to.  Use binary
+ * search.  Return true if ok i.e. entry to add is not a duplicate.
+ */
+bool
+Dbtux::findPosToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+{
   int lo = -1;
-  int hi = currNode.getOccup();
+  int hi = (int)currNode.getOccup();
   int ret;
-  while (1) {
+  while (hi - lo > 1) {
     thrjam(ctx.jamBuffer);
     // hi - lo > 1 implies lo < j < hi
     int j = (hi + lo) / 2;
-
     // read and compare attributes
     unsigned start = 0;
     readKeyAttrs(ctx, frag, currNode.getEnt(j), start, ctx.c_entryKey);
@@ -118,171 +116,113 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
       // keys are equal, compare entry values
       ret = searchEnt.cmp(currNode.getEnt(j));
     }
-    if (ret < 0)
+    if (ret < 0) {
+      thrjam(ctx.jamBuffer);
       hi = j;
-    else if (ret > 0)
+    } else if (ret > 0) {
+      thrjam(ctx.jamBuffer);
       lo = j;
-    else {
+    } else {
       treePos.m_pos = j;
       // entry found - error
       return false;
     }
-    if (hi - lo == 1)
-      break;
   }
-  if (ret < 0) {
+  ndbrequire(hi - lo == 1);
+  // return hi pos, see treeAdd() for next step
+  treePos.m_pos = hi;
+  return true;
+}
+
+/*
+ * Find position within the final node to remove entry from.  Use linear
+ * search.  Return true if ok i.e. the entry was found.
+ */
+bool
+Dbtux::findPosToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+{
+  const unsigned occup = currNode.getOccup();
+  for (unsigned j = 0; j < occup; j++) {
     thrjam(ctx.jamBuffer);
-    treePos.m_pos = hi;
-    return true;
+    // compare only the entry
+    if (searchEnt.eq(currNode.getEnt(j))) {
+      thrjam(ctx.jamBuffer);
+      treePos.m_pos = j;
+      return true;
+    }
   }
-  if ((uint) hi < currNode.getOccup()) {
+  treePos.m_pos = occup;
+  // not found - failed
+  return false;
+}
+
+/*
+ * Search for entry to add.
+ */
+bool
+Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+{
+  const TreeHead& tree = frag.m_tree;
+  NodeHandle currNode(frag);
+  currNode.m_loc = tree.m_root;
+  if (unlikely(currNode.m_loc == NullTupLoc)) {
+    // empty tree
     thrjam(ctx.jamBuffer);
-    treePos.m_pos = hi;
     return true;
   }
-  if (bottomNode.isNull()) {
+  findNodeToUpdate(ctx, frag, searchKey, searchEnt, currNode);
+  treePos.m_loc = currNode.m_loc;
+  if (! findPosToAdd(ctx, frag, searchKey, searchEnt, currNode, treePos)) {
     thrjam(ctx.jamBuffer);
-    treePos.m_pos = hi;
-    return true;
+    return false;
   }
-  thrjam(ctx.jamBuffer);
-  // backwards compatible for now
-  treePos.m_loc = bottomNode.m_loc;
-  treePos.m_pos = 0;
   return true;
 }
 
 /*
  * Search for entry to remove.
- *
- * Compares search key to each node min.  A move to right subtree can
- * overshoot target node.  The last such node is saved.  The final node
- * is a semi-leaf or leaf.  If search key is less than final node min
- * then the saved node is the g.l.b of the final node and we move back
- * to it.
  */
 bool
-Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
-  const unsigned numAttrs = frag.m_numAttrs;
   NodeHandle currNode(frag);
   currNode.m_loc = tree.m_root;
-  if (currNode.m_loc == NullTupLoc) {
+  if (unlikely(currNode.m_loc == NullTupLoc)) {
     // empty tree - failed
-    jam();
+    thrjam(ctx.jamBuffer);
     return false;
   }
-  NodeHandle glbNode(frag);     // potential g.l.b of final node
-  while (true) {
-    jam();
-    selectNode(currNode, currNode.m_loc);
-    int ret;
-    // compare prefix
-    unsigned start = 0;
-    ret = cmpSearchKey(c_ctx, frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
-    if (ret == NdbSqlUtil::CmpUnknown) {
-      jam();
-      // read and compare remaining attributes
-      ndbrequire(start < numAttrs);
-      readKeyAttrs(c_ctx, frag, currNode.getMinMax(0), start, c_ctx.c_entryKey);
-      ret = cmpSearchKey(c_ctx, frag, start, searchKey, c_ctx.c_entryKey);
-      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-    }
-    if (ret == 0) {
-      jam();
-      // keys are equal, compare entry values
-      ret = searchEnt.cmp(currNode.getMinMax(0));
-    }
-    if (ret < 0) {
-      jam();
-      const TupLoc loc = currNode.getLink(0);
-      if (loc != NullTupLoc) {
-        jam();
-        // continue to left subtree
-        currNode.m_loc = loc;
-        continue;
-      }
-      if (! glbNode.isNull()) {
-        jam();
-        // move up to the g.l.b
-        currNode = glbNode;
-      }
-    } else if (ret > 0) {
-      jam();
-      const TupLoc loc = currNode.getLink(1);
-      if (loc != NullTupLoc) {
-        jam();
-        // save potential g.l.b
-        glbNode = currNode;
-        // continue to right subtree
-        currNode.m_loc = loc;
-        continue;
-      }
-    } else {
-      jam();
-      treePos.m_loc = currNode.m_loc;
-      treePos.m_pos = 0;
-      return true;
-    }
-    break;
-  }
-  // anticipate
+  findNodeToUpdate(ctx, frag, searchKey, searchEnt, currNode);
   treePos.m_loc = currNode.m_loc;
-  // pos 0 was handled above
-  for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
-    jam();
-    // compare only the entry
-    if (searchEnt.eq(currNode.getEnt(j))) {
-      jam();
-      treePos.m_pos = j;
-      return true;
-    }
+  if (! findPosToRemove(ctx, frag, searchKey, searchEnt, currNode, treePos)) {
+    thrjam(ctx.jamBuffer);
+    return false;
   }
-  treePos.m_pos = currNode.getOccup();
-  // not found - failed
-  return false;
+  return true;
 }
 
 /*
- * Search for scan start position.
- *
- * Similar to searchToAdd.  The routines differ somewhat depending on
- * scan direction and are done by separate methods.
+ * Search down non-empty tree for node to start scan from.  Similar to
+ * findNodeToUpdate().  Direction is 0-ascending or 1-descending.
+ * Search within the found node is done by caller.
  */
 void
-Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
-{
-  const TreeHead& tree = frag.m_tree;
-  if (tree.m_root != NullTupLoc) {
-    if (! descending)
-      searchToScanAscending(frag, boundInfo, boundCount, treePos);
-    else
-      searchToScanDescending(frag, boundInfo, boundCount, treePos);
-    return;
-  }
-  // empty tree
-}
-
-void
-Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
+Dbtux::findNodeToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode)
 {
   const TreeHead& tree = frag.m_tree;
-  NodeHandle currNode(frag);
-  currNode.m_loc = tree.m_root;
   NodeHandle glbNode(frag);     // potential g.l.b of final node
-  NodeHandle bottomNode(frag);
   while (true) {
     jam();
     selectNode(currNode, currNode.m_loc);
     int ret;
     // compare prefix
-    ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
+    ret = cmpScanBound(frag, idir, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
     if (ret == NdbSqlUtil::CmpUnknown) {
       jam();
       // read and compare all attributes
-      readKeyAttrs(c_ctx, frag, currNode.getMinMax(0), 0, c_ctx.c_entryKey);
-      ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_ctx.c_entryKey);
+      readKeyAttrs(c_ctx, frag, currNode.getEnt(0), 0, c_ctx.c_entryKey);
+      ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
       ndbrequire(ret != NdbSqlUtil::CmpUnknown);
     }
     if (ret < 0) {
@@ -297,17 +237,12 @@ Dbtux::searchToScanAscending(Frag& frag,
       }
       if (! glbNode.isNull()) {
         jam();
-        // move up to the g.l.b but remember the bottom node
-        bottomNode = currNode;
+        // move up to the g.l.b
         currNode = glbNode;
-      } else {
-        // start scanning this node
-        treePos.m_loc = currNode.m_loc;
-        treePos.m_pos = 0;
-        treePos.m_dir = 3;
-        return;
       }
-    } else {
+      break;
+    }
+    if (ret > 0) {
       // bound is at or right of this node
       jam();
       const TupLoc loc = currNode.getLink(1);
@@ -319,117 +254,89 @@ Dbtux::searchToScanAscending(Frag& frag,
         currNode.m_loc = loc;
         continue;
       }
+      break;
     }
-    break;
+    // ret == 0 never
+    ndbrequire(false);
   }
-  for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
+}
+
+/*
+ * Search across final node for position to start scan from.  Use binary
+ * search similar to findPosToAdd().
+ */
+void
+Dbtux::findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos)
+{
+  const int jdir = 1 - 2 * int(idir);
+  int lo = -1;
+  int hi = (int)currNode.getOccup();
+  while (hi - lo > 1) {
     jam();
-    int ret;
-    // read and compare attributes
-    readKeyAttrs(c_ctx, frag, currNode.getEnt(j), 0, c_ctx.c_entryKey);
-    ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_ctx.c_entryKey);
-    ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+    // hi - lo > 1 implies lo < j < hi
+    int j = (hi + lo) / 2;
+    int ret = (-1) * jdir;
+    if (boundCount != 0) {
+      // read and compare attributes
+      const TreeEnt currEnt = currNode.getEnt(j);
+      readKeyAttrs(c_ctx, frag, currEnt, 0, c_ctx.c_entryKey);
+      ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
+    }
+    ndbrequire(ret != 0);
     if (ret < 0) {
-      // found first entry satisfying the bound
-      treePos.m_loc = currNode.m_loc;
-      treePos.m_pos = j;
-      treePos.m_dir = 3;
-      return;
+      jam();
+      hi = j;
+    } else if (ret > 0) {
+      jam();
+      lo = j;
+    } else {
+      // ret == 0 never
+      ndbrequire(false);
     }
   }
-  // bound is to right of this node
-  if (! bottomNode.isNull()) {
-    jam();
-    // start scanning the l.u.b
-    treePos.m_loc = bottomNode.m_loc;
-    treePos.m_pos = 0;
-    treePos.m_dir = 3;
-    return;
-  }
-  // start scanning upwards (pretend we came from right child)
-  treePos.m_loc = currNode.m_loc;
-  treePos.m_dir = 1;
+  // return hi pos, caller handles ascending vs descending
+  *pos = hi;
 }
 
+/*
+ * Search for scan start position.
+ */
 void
-Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   NodeHandle currNode(frag);
   currNode.m_loc = tree.m_root;
-  NodeHandle glbNode(frag);     // potential g.l.b of final node
-  NodeHandle bottomNode(frag);
-  while (true) {
+  if (unlikely(currNode.m_loc == NullTupLoc)) {
+    // empty tree
     jam();
-    selectNode(currNode, currNode.m_loc);
-    int ret;
-    // compare prefix
-    ret = cmpScanBound(frag, 1, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
-    if (ret == NdbSqlUtil::CmpUnknown) {
+    return;
+  }
+  const unsigned idir = unsigned(descending);
+  findNodeToScan(frag, idir, boundInfo, boundCount, currNode);
+  treePos.m_loc = currNode.m_loc;
+  Uint16 pos;
+  findPosToScan(frag, idir, boundInfo, boundCount, currNode, &pos);
+  const unsigned occup = currNode.getOccup();
+  if (idir == 0) {
+    if (pos < occup) {
       jam();
-      // read and compare all attributes
-      readKeyAttrs(c_ctx, frag, currNode.getMinMax(0), 0, c_ctx.c_entryKey);
-      ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_ctx.c_entryKey);
-      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+      treePos.m_pos = pos;
+      treePos.m_dir = 3;
+    } else {
+      // start scan after node end i.e. proceed to right child
+      treePos.m_pos = ZNIL;
+      treePos.m_dir = 5;
     }
-    if (ret < 0) {
-      // bound is left of this node
+  } else {
+    if (pos > 0) {
       jam();
-      const TupLoc loc = currNode.getLink(0);
-      if (loc != NullTupLoc) {
-        jam();
-        // continue to left subtree
-        currNode.m_loc = loc;
-        continue;
-      }
-      if (! glbNode.isNull()) {
-        jam();
-        // move up to the g.l.b but remember the bottom node
-        bottomNode = currNode;
-        currNode = glbNode;
-      } else {
-        // empty result set
-        return;
-      }
+      // start scan from previous entry
+      treePos.m_pos = pos - 1;
+      treePos.m_dir = 3;
     } else {
-      // bound is at or right of this node
-      jam();
-      const TupLoc loc = currNode.getLink(1);
-      if (loc != NullTupLoc) {
-        jam();
-        // save potential g.l.b
-        glbNode = currNode;
-        // continue to right subtree
-        currNode.m_loc = loc;
-        continue;
-      }
-    }
-    break;
-  }
-  for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
-    jam();
-    int ret;
-    // read and compare attributes
-    readKeyAttrs(c_ctx, frag, currNode.getEnt(j), 0, c_ctx.c_entryKey);
-    ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_ctx.c_entryKey);
-    ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-    if (ret < 0) {
-      if (j > 0) {
-        // start scanning from previous entry
-        treePos.m_loc = currNode.m_loc;
-        treePos.m_pos = j - 1;
-        treePos.m_dir = 3;
-        return;
-      }
-      // start scanning upwards (pretend we came from left child)
-      treePos.m_loc = currNode.m_loc;
-      treePos.m_pos = 0;
+      treePos.m_pos = ZNIL;
       treePos.m_dir = 0;
-      return;
     }
   }
-  // start scanning this node
-  treePos.m_loc = currNode.m_loc;
-  treePos.m_pos = currNode.getOccup() - 1;
-  treePos.m_dir = 3;
 }

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (jonas:4342 to 4343) jonas oreland27 Apr