MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:pekka Date:October 9 2006 2:13pm
Subject:bk commit into 5.0 tree (pekka:1.2266) BUG#20446
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2006-10-09 16:13:32+02:00, pekka@stripped +6 -0
  ndb - bug#20446: fix + other TUX scan improvements

  ndb/src/kernel/blocks/dbtux/Dbtux.hpp@stripped, 2006-10-09 16:12:09+02:00, pekka@stripped +46 -19
    fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)

  ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp@stripped, 2006-10-09 16:12:10+02:00, pekka@stripped +0 -1
    fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)

  ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp@stripped, 2006-10-09 16:12:10+02:00, pekka@stripped +13 -0
    fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)

  ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp@stripped, 2006-10-09 16:12:10+02:00, pekka@stripped +7 -6
    fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)

  ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp@stripped, 2006-10-09 16:12:10+02:00, pekka@stripped +139 -82
    fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)

  ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp@stripped, 2006-10-09 16:12:10+02:00, pekka@stripped +20 -37
    fixes to move scan when entry removed: 1. do not check visibility to transaction 2. keep Blocked if moved to another version of same tuple 3. general improvements (remove m_match etc)

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/export/home/space/pekka/ndb/version/my50-bug20446

--- 1.41/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2006-10-09 16:13:48 +02:00
+++ 1.42/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2006-10-09 16:13:48 +02:00
@@ -188,6 +188,7 @@
     unsigned m_fragBit : 1;     // which duplicated table fragment
     TreeEnt();
     // methods
+    bool eqtuple(const TreeEnt ent) const;
     bool eq(const TreeEnt ent) const;
     int cmp(const TreeEnt ent) const;
   };
@@ -265,8 +266,7 @@
   struct TreePos {
     TupLoc m_loc;               // physical node address
     Uint16 m_pos;               // position 0 to m_occup
-    Uint8 m_match;              // at an existing entry
-    Uint8 m_dir;                // see scanNext()
+    Uint8 m_dir;                // see scanNext
     TreePos();
   };
 
@@ -357,12 +357,13 @@
     enum {
       Undef = 0,
       First = 1,                // before first entry
-      Current = 2,              // at current before locking
-      Blocked = 3,              // at current waiting for ACC lock
-      Locked = 4,               // at current and locked or no lock needed
-      Next = 5,                 // looking for next extry
-      Last = 6,                 // after last entry
-      Aborting = 7,             // lock wait at scan close
+      Current = 2,              // at some entry
+      Found = 3,                // return current as next scan result
+      Blocked = 4,              // found and waiting for ACC lock
+      Locked = 5,               // found and locked or no lock needed
+      Next = 6,                 // looking for next extry
+      Last = 7,                 // after last entry
+      Aborting = 8,             // lock wait at scan close
       Invalid = 9               // cannot return REF to LQH currently
     };
     Uint16 m_state;
@@ -539,6 +540,7 @@
   void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
   void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
   void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
+  void unpackBound(const ScanBound& bound, Data data);
 
   /*
    * DbtuxMeta.cpp
@@ -613,7 +615,9 @@
   void execACCKEYREF(Signal* signal);
   void execACC_ABORTCONF(Signal* signal);
   void scanFirst(ScanOpPtr scanPtr);
+  void scanFind(ScanOpPtr scanPtr);
   void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
+  bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent);
   bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
   void scanClose(Signal* signal, ScanOpPtr scanPtr);
   void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
@@ -623,8 +627,8 @@
   /*
    * DbtuxSearch.cpp
    */
-  void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
-  void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+  bool searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+  bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
   void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
   void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
   void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
@@ -782,6 +786,14 @@
 }
 
 inline bool
+Dbtux::TreeEnt::eqtuple(const TreeEnt ent) const
+{
+  return
+    m_tupLoc == ent.m_tupLoc &&
+    m_fragBit == ent.m_fragBit;
+}
+
+inline bool
 Dbtux::TreeEnt::eq(const TreeEnt ent) const
 {
   return
@@ -793,6 +805,11 @@
 inline int
 Dbtux::TreeEnt::cmp(const TreeEnt ent) const
 {
+  // compare frag first to improve cacheing in 5.0
+  if (m_fragBit < ent.m_fragBit)
+    return -1;
+  if (m_fragBit > ent.m_fragBit)
+    return +1;
   if (m_tupLoc.getPageId() < ent.m_tupLoc.getPageId())
     return -1;
   if (m_tupLoc.getPageId() > ent.m_tupLoc.getPageId())
@@ -801,14 +818,25 @@
     return -1;
   if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset())
     return +1;
-  if (m_tupVersion < ent.m_tupVersion)
-    return -1;
-  if (m_tupVersion > ent.m_tupVersion)
-    return +1;
-  if (m_fragBit < ent.m_fragBit)
-    return -1;
-  if (m_fragBit > ent.m_fragBit)
-    return +1;
+  /*
+   * Guess if one tuple version has wrapped around.  This is well
+   * defined ordering on existing versions since versions are assigned
+   * consecutively and different versions exists only on uncommitted
+   * tuple.  Assuming max 2**14 uncommitted ops on same tuple.
+   */
+  const unsigned version_wrap_limit = (1 << (ZTUP_VERSION_BITS - 1));
+  if (m_tupVersion < ent.m_tupVersion) {
+    if (ent.m_tupVersion - m_tupVersion < version_wrap_limit)
+      return -1;
+    else
+      return +1;
+  }
+  if (m_tupVersion > ent.m_tupVersion) {
+    if (m_tupVersion - ent.m_tupVersion < version_wrap_limit)
+      return +1;
+    else
+      return -1;
+  }
   return 0;
 }
 
@@ -875,7 +903,6 @@
 Dbtux::TreePos::TreePos() :
   m_loc(),
   m_pos(ZNIL),
-  m_match(false),
   m_dir(255)
 {
 }

--- 1.20/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2006-10-09 16:13:48 +02:00
+++ 1.21/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2006-10-09 16:13:48 +02:00
@@ -311,7 +311,6 @@
   out << "[TreePos " << hex << &pos;
   out << " [loc " << pos.m_loc << "]";
   out << " [pos " << dec << pos.m_pos << "]";
-  out << " [match " << dec << pos.m_match << "]";
   out << " [dir " << dec << pos.m_dir << "]";
   out << "]";
   return out;

--- 1.19/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2006-10-09 16:13:48 +02:00
+++ 1.20/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2006-10-09 16:13:48 +02:00
@@ -314,4 +314,17 @@
 #endif
 }
 
+void
+Dbtux::unpackBound(const ScanBound& bound, Data dest)
+{
+  ScanBoundIterator iter;
+  bound.first(iter);
+  const unsigned n = bound.getSize();
+  unsigned j;
+  for (j = 0; j < n; j++) {
+    dest[j] = *iter.data;
+    bound.next(iter);
+  }
+}
+
 BLOCK_FUNCTIONS(Dbtux)

--- 1.16/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2006-10-09 16:13:48 +02:00
+++ 1.17/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2006-10-09 16:13:48 +02:00
@@ -113,16 +113,17 @@
   // do the operation
   req->errorCode = 0;
   TreePos treePos;
+  bool ok;
   switch (opCode) {
   case TuxMaintReq::OpAdd:
     jam();
-    searchToAdd(frag, c_searchKey, ent, treePos);
+    ok = searchToAdd(frag, c_searchKey, ent, treePos);
 #ifdef VM_TRACE
     if (debugFlags & DebugMaint) {
-      debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
+      debugOut << treePos << (! ok ? " - error" : "") << endl;
     }
 #endif
-    if (treePos.m_match) {
+    if (! ok) {
       jam();
       // there is no "Building" state so this will have to do
       if (indexPtr.p->m_state == Index::Online) {
@@ -152,13 +153,13 @@
     break;
   case TuxMaintReq::OpRemove:
     jam();
-    searchToRemove(frag, c_searchKey, ent, treePos);
+    ok = searchToRemove(frag, c_searchKey, ent, treePos);
 #ifdef VM_TRACE
     if (debugFlags & DebugMaint) {
-      debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
+      debugOut << treePos << (! ok ? " - error" : "") << endl;
     }
 #endif
-    if (! treePos.m_match) {
+    if (! ok) {
       jam();
       // there is no "Building" state so this will have to do
       if (indexPtr.p->m_state == Index::Online) {

--- 1.27/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2006-10-09 16:13:48 +02:00
+++ 1.28/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2006-10-09 16:13:48 +02:00
@@ -421,21 +421,17 @@
     jam();
     // search is done only once in single range scan
     scanFirst(scanPtr);
-#ifdef VM_TRACE
-    if (debugFlags & DebugScan) {
-      debugOut << "First scan " << scanPtr.i << " " << scan << endl;
-    }
-#endif
   }
-  if (scan.m_state == ScanOp::Next) {
+  if (scan.m_state == ScanOp::Current ||
+      scan.m_state == ScanOp::Next) {
     jam();
     // look for next
-    scanNext(scanPtr, false);
+    scanFind(scanPtr);
   }
-  // for reading tuple key in Current or Locked state
+  // for reading tuple key in Found or Locked state
   Data pkData = c_dataBuffer;
   unsigned pkSize = 0; // indicates not yet done
-  if (scan.m_state == ScanOp::Current) {
+  if (scan.m_state == ScanOp::Found) {
     // found an entry to return
     jam();
     ndbrequire(scan.m_accLockOp == RNIL);
@@ -509,8 +505,8 @@
         jam();
         // max ops should depend on max scans (assert only)
         ndbassert(false);
-        // stay in Current state
-        scan.m_state = ScanOp::Current;
+        // stay in Found state
+        scan.m_state = ScanOp::Found;
         signal->theData[0] = scan.m_userPtr;
         signal->theData[1] = true;
         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
@@ -697,44 +693,95 @@
 }
 
 /*
- * Find start position for single range scan.  If it exists, sets state
- * to Next and links the scan to the node.  The first entry is returned
- * by scanNext.
+ * Find start position for single range scan.
  */
 void
 Dbtux::scanFirst(ScanOpPtr scanPtr)
 {
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
+  }
+#endif
   TreeHead& tree = frag.m_tree;
   // set up index keys for this operation
   setKeyAttrs(frag);
   // scan direction 0, 1
   const unsigned idir = scan.m_descending;
-  // unpack start key into c_dataBuffer
-  const ScanBound& bound = *scan.m_bound[idir];
-  ScanBoundIterator iter;
-  bound.first(iter);
-  for (unsigned j = 0; j < bound.getSize(); j++) {
-    jam();
-    c_dataBuffer[j] = *iter.data;
-    bound.next(iter);
-  }
+  unpackBound(*scan.m_bound[idir], c_dataBuffer);
   TreePos treePos;
   searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
-  if (treePos.m_loc == NullTupLoc) {
-    // empty result set
+  if (treePos.m_loc != NullTupLoc) {
+    scan.m_scanPos = treePos;
+    // link the scan to node found
+    NodeHandle node(frag);
+    selectNode(node, treePos.m_loc);
+    linkScan(node, scanPtr);
+    if (treePos.m_dir == 3) {
+      jam();
+      // check upper bound
+      TreeEnt ent = node.getEnt(treePos.m_pos);
+      if (scanCheck(scanPtr, ent))
+        scan.m_state = ScanOp::Current;
+      else
+        scan.m_state = ScanOp::Last;
+    } else {
+      scan.m_state = ScanOp::Next;
+    }
+  } else {
     jam();
     scan.m_state = ScanOp::Last;
-    return;
   }
-  // set position and state
-  scan.m_scanPos = treePos;
-  scan.m_state = ScanOp::Next;
-  // link the scan to node found
-  NodeHandle node(frag);
-  selectNode(node, treePos.m_loc);
-  linkScan(node, scanPtr);
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
+  }
+#endif
+}
+
+/*
+ * Look for entry to return as scan result.
+ */
+void
+Dbtux::scanFind(ScanOpPtr scanPtr)
+{
+  ScanOp& scan = *scanPtr.p;
+  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
+  }
+#endif
+  ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
+  while (1) {
+    jam();
+    if (scan.m_state == ScanOp::Next)
+      scanNext(scanPtr, false);
+    if (scan.m_state == ScanOp::Current) {
+      jam();
+      const TreePos pos = scan.m_scanPos;
+      NodeHandle node(frag);
+      selectNode(node, pos.m_loc);
+      const TreeEnt ent = node.getEnt(pos.m_pos);
+      if (scanVisible(scanPtr, ent)) {
+        jam();
+        scan.m_state = ScanOp::Found;
+        scan.m_scanEnt = ent;
+        break;
+      }
+    } else {
+      jam();
+      break;
+    }
+    scan.m_state = ScanOp::Next;
+  }
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
+  }
+#endif
 }
 
 /*
@@ -752,6 +799,11 @@
  *
  * If an entry was found, scan direction is 3.  Therefore tree
  * re-organizations need not worry about scan direction.
+ *
+ * This method is also used to move a scan when its entry is removed
+ * (see moveScanList).  If the scan is Blocked, we check if it remains
+ * Blocked on a different version of the tuple.  Otherwise the tuple is
+ * lost and state becomes Current.
  */
 void
 Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
@@ -759,8 +811,8 @@
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
 #ifdef VM_TRACE
-  if (debugFlags & DebugScan) {
-    debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
+  if (debugFlags & (DebugMaint | DebugScan)) {
+    debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
   }
 #endif
   // cannot be moved away from tuple we have locked
@@ -770,15 +822,7 @@
   // scan direction
   const unsigned idir = scan.m_descending; // 0, 1
   const int jdir = 1 - 2 * (int)idir;      // 1, -1
-  // unpack end key into c_dataBuffer
-  const ScanBound& bound = *scan.m_bound[1 - idir];
-  ScanBoundIterator iter;
-  bound.first(iter);
-  for (unsigned j = 0; j < bound.getSize(); j++) {
-    jam();
-    c_dataBuffer[j] = *iter.data;
-    bound.next(iter);
-  }
+  unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
   // use copy of position
   TreePos pos = scan.m_scanPos;
   // get and remember original node
@@ -792,15 +836,14 @@
   while (true) {
     jam();
 #ifdef VM_TRACE
-    if (debugFlags & DebugScan) {
-      debugOut << "Scan next pos " << pos << " " << node << endl;
+    if (debugFlags & (DebugMaint | DebugScan)) {
+      debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
     }
 #endif
     if (pos.m_dir == 2) {
       // coming up from root ends the scan
       jam();
       pos.m_loc = NullTupLoc;
-      scan.m_state = ScanOp::Last;
       break;
     }
     if (node.m_loc != pos.m_loc) {
@@ -832,41 +875,22 @@
     if (pos.m_dir == idir) {
       // coming up from left child scan current node
       jam();
-      pos.m_pos = idir == 0 ? 0 : occup - 1;
-      pos.m_match = false;
+      pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
       pos.m_dir = 3;
     }
     if (pos.m_dir == 3) {
-      // within node
+      // before or within node
       jam();
-      // advance position
-      if (! pos.m_match)
-        pos.m_match = true;
-      else
-        // becomes ZNIL (which is > occup) if 0 and scan descending
-        pos.m_pos += jdir;
+      // advance position - becomes ZNIL (> occup) if 0 and descending
+      pos.m_pos += jdir;
       if (pos.m_pos < occup) {
         jam();
-        ent = node.getEnt(pos.m_pos);
         pos.m_dir = 3;  // unchanged
-        // read and compare all attributes
-        readKeyAttrs(frag, ent, 0, c_entryKey);
-        int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey);
-        ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-        if (jdir * ret < 0) {
+        ent = node.getEnt(pos.m_pos);
+        if (! scanCheck(scanPtr, ent)) {
           jam();
-          // hit upper bound of single range scan
           pos.m_loc = NullTupLoc;
-          scan.m_state = ScanOp::Last;
-          break;
         }
-        // can we see it
-        if (! scanVisible(scanPtr, ent)) {
-          jam();
-          continue;
-        }
-        // found entry
-        scan.m_state = ScanOp::Current;
         break;
       }
       // after node proceed to right child
@@ -892,28 +916,61 @@
   // copy back position
   scan.m_scanPos = pos;
   // relink
-  if (scan.m_state == ScanOp::Current) {
-    ndbrequire(pos.m_match == true && pos.m_dir == 3);
+  if (pos.m_loc != NullTupLoc) {
+    ndbrequire(pos.m_dir == 3);
     ndbrequire(pos.m_loc == node.m_loc);
     if (origNode.m_loc != node.m_loc) {
       jam();
       unlinkScan(origNode, scanPtr);
       linkScan(node, scanPtr);
     }
-    // copy found entry
-    scan.m_scanEnt = ent;
-  } else if (scan.m_state == ScanOp::Last) {
+    if (scan.m_state != ScanOp::Blocked) {
+      scan.m_state = ScanOp::Current;
+    } else {
+      jam();
+      ndbrequire(fromMaintReq);
+      TreeEnt& scanEnt = scan.m_scanEnt;
+      ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
+      if (scanEnt.eqtuple(ent)) {
+        // remains blocked on another version
+        scanEnt = ent;
+      } else {
+        jam();
+        scanEnt.m_tupLoc = NullTupLoc;
+        scan.m_state = ScanOp::Current;
+      }
+    }
+  } else {
     jam();
-    ndbrequire(pos.m_loc == NullTupLoc);
     unlinkScan(origNode, scanPtr);
-  } else {
-    ndbrequire(false);
+    scan.m_state = ScanOp::Last;
   }
 #ifdef VM_TRACE
-  if (debugFlags & DebugScan) {
-    debugOut << "Next out scan " << scanPtr.i << " " << scan << endl;
+  if (debugFlags & (DebugMaint | DebugScan)) {
+    debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
   }
 #endif
+}
+
+/*
+ * Check end key.  Return true if scan is still within range.
+ */
+bool
+Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
+{
+  ScanOp& scan = *scanPtr.p;
+  Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const unsigned idir = scan.m_descending;
+  const int jdir = 1 - 2 * (int)idir;
+  unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
+  unsigned boundCnt = scan.m_boundCnt[1 - idir];
+  readKeyAttrs(frag, ent, 0, c_entryKey);
+  int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_entryKey);
+  ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+  if (jdir * ret > 0)
+    return true;
+  // hit upper bound of single range scan
+  return false;
 }
 
 /*

--- 1.9/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2006-10-09 16:13:48 +02:00
+++ 1.10/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2006-10-09 16:13:48 +02:00
@@ -21,22 +21,18 @@
  * Search for entry to add.
  *
  * Similar to searchToRemove (see below).
- *
- * TODO optimize for initial equal attrs in node min/max
  */
-void
+bool
 Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   const unsigned numAttrs = frag.m_numAttrs;
   NodeHandle currNode(frag);
   currNode.m_loc = tree.m_root;
-  // assume success
-  treePos.m_match = false;
   if (currNode.m_loc == NullTupLoc) {
     // empty tree
     jam();
-    return;
+    return true;
   }
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   /*
@@ -94,9 +90,8 @@
       jam();
       treePos.m_loc = currNode.m_loc;
       treePos.m_pos = 0;
-      // failed
-      treePos.m_match = true;
-      return;
+      // entry found - error
+      return false;
     }
     break;
   }
@@ -104,7 +99,7 @@
   treePos.m_loc = currNode.m_loc;
   // binary search
   int lo = -1;
-  unsigned hi = currNode.getOccup();
+  int hi = currNode.getOccup();
   int ret;
   while (1) {
     jam();
@@ -126,9 +121,8 @@
       lo = j;
     else {
       treePos.m_pos = j;
-      // failed
-      treePos.m_match = true;
-      return;
+      // entry found - error
+      return false;
     }
     if (hi - lo == 1)
       break;
@@ -136,22 +130,23 @@
   if (ret < 0) {
     jam();
     treePos.m_pos = hi;
-    return;
+    return true;
   }
   if (hi < currNode.getOccup()) {
     jam();
     treePos.m_pos = hi;
-    return;
+    return true;
   }
   if (bottomNode.isNull()) {
     jam();
     treePos.m_pos = hi;
-    return;
+    return true;
   }
   jam();
   // backwards compatible for now
   treePos.m_loc = bottomNode.m_loc;
   treePos.m_pos = 0;
+  return true;
 }
 
 /*
@@ -163,21 +158,17 @@
  * then the saved node is the g.l.b of the final node and we move back
  * to it.
  */
-void
+bool
 Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   const unsigned numAttrs = frag.m_numAttrs;
   NodeHandle currNode(frag);
   currNode.m_loc = tree.m_root;
-  // assume success
-  treePos.m_match = true;
   if (currNode.m_loc == NullTupLoc) {
-    // empty tree
+    // empty tree - failed
     jam();
-    // failed
-    treePos.m_match = false;
-    return;
+    return false;
   }
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   while (true) {
@@ -229,7 +220,7 @@
       jam();
       treePos.m_loc = currNode.m_loc;
       treePos.m_pos = 0;
-      return;
+      return true;
     }
     break;
   }
@@ -242,12 +233,12 @@
     if (searchEnt.eq(currNode.getEnt(j))) {
       jam();
       treePos.m_pos = j;
-      return;
+      return true;
     }
   }
   treePos.m_pos = currNode.getOccup();
-  // failed
-  treePos.m_match = false;
+  // not found - failed
+  return false;
 }
 
 /*
@@ -278,8 +269,6 @@
   currNode.m_loc = tree.m_root;
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   NodeHandle bottomNode(frag);
-  // always before entry
-  treePos.m_match = false;
   while (true) {
     jam();
     selectNode(currNode, currNode.m_loc);
@@ -315,7 +304,7 @@
         treePos.m_dir = 3;
         return;
       }
-    } else if (ret > 0) {
+    } else {
       // bound is at or right of this node
       jam();
       const TupLoc loc = currNode.getLink(1);
@@ -327,8 +316,6 @@
         currNode.m_loc = loc;
         continue;
       }
-    } else {
-      ndbrequire(false);
     }
     break;
   }
@@ -369,8 +356,6 @@
   currNode.m_loc = tree.m_root;
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   NodeHandle bottomNode(frag);
-  // always before entry
-  treePos.m_match = false;
   while (true) {
     jam();
     selectNode(currNode, currNode.m_loc);
@@ -403,7 +388,7 @@
         // empty result set
         return;
       }
-    } else if (ret > 0) {
+    } else {
       // bound is at or right of this node
       jam();
       const TupLoc loc = currNode.getLink(1);
@@ -415,8 +400,6 @@
         currNode.m_loc = loc;
         continue;
       }
-    } else {
-      ndbrequire(false);
     }
     break;
   }
Thread
bk commit into 5.0 tree (pekka:1.2266) BUG#20446pekka9 Oct