List:Internals« Previous MessageNext Message »
From:Pekka Nousiainen Date:July 9 2004 10:42am
Subject:bk commit into 4.1 tree (pekka:1.1897)
View as plain text  
Below is the list of changes that have just been committed into a local
4.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://www.mysql.com/doc/I/n/Installing_source_tree.html

ChangeSet
  1.1897 04/07/08 14:35:19 pekka@stripped +13 -0
  tux optim 12 - remove max prefix + related

  ndb/test/ndbapi/testOIBasic.cpp
    1.9 04/07/08 14:34:31 pekka@stripped +3 -1
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/Times.txt
    1.13 04/07/08 14:34:31 pekka@stripped +3 -0
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/Makefile.am
    1.3 04/07/08 14:34:31 pekka@stripped +1 -0
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
    1.8 04/07/08 14:34:31 pekka@stripped +0 -106
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
    1.2 04/07/08 14:34:31 pekka@stripped +0 -0
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
    1.8 04/07/08 14:34:31 pekka@stripped +23 -122
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
    1.9 04/07/08 14:34:31 pekka@stripped +13 -22
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
    1.10 04/07/08 14:34:31 pekka@stripped +13 -9
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
    1.8 04/07/08 14:34:31 pekka@stripped +8 -0
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
    1.7 04/07/08 14:34:31 pekka@stripped +78 -17
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
    1.5 04/07/08 14:34:31 pekka@stripped +134 -72
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/Dbtux.hpp
    1.17 04/07/08 14:34:31 pekka@stripped +55 -23
    tux optim 12 - remove max prefix + related

  ndb/include/kernel/ndb_limits.h
    1.3 04/07/08 14:34:31 pekka@stripped +1 -1
    tux optim 12 - remove max prefix + related

  ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
    1.1 04/07/08 14:33:43 pekka@stripped +333 -0

  ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
    1.0 04/07/08 14:33:43 pekka@stripped +0 -0
    BitKeeper file /space/pekka/ndb/version/my41/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	orca.ndb.mysql.com
# Root:	/space/pekka/ndb/version/my41

--- 1.2/ndb/include/kernel/ndb_limits.h	2004-06-10 12:03:51 +02:00
+++ 1.3/ndb/include/kernel/ndb_limits.h	2004-07-08 14:34:31 +02:00
@@ -88,7 +88,7 @@
  * Ordered index constants.  Make configurable per index later.
  */
 #define MAX_TTREE_NODE_SIZE 64		// total words in node
-#define MAX_TTREE_PREF_SIZE 4		// words in min/max prefix each
+#define MAX_TTREE_PREF_SIZE 4		// words in min prefix
 #define MAX_TTREE_NODE_SLACK 3		// diff between max and min occupancy
 
 /*

--- 1.16/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2004-06-19 13:31:53 +02:00
+++ 1.17/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2004-07-08 14:34:31 +02:00
@@ -77,10 +77,14 @@
 #define jam()           jamLine(60000 + __LINE__)
 #define jamEntry()      jamEntryLine(60000 + __LINE__)
 #endif
-#ifdef DBTUX_CMP_CPP
+#ifdef DBTUX_SEARCH_CPP
 #define jam()           jamLine(70000 + __LINE__)
 #define jamEntry()      jamEntryLine(70000 + __LINE__)
 #endif
+#ifdef DBTUX_CMP_CPP
+#define jam()           jamLine(80000 + __LINE__)
+#define jamEntry()      jamEntryLine(80000 + __LINE__)
+#endif
 #ifdef DBTUX_DEBUG_CPP
 #define jam()           jamLine(90000 + __LINE__)
 #define jamEntry()      jamEntryLine(90000 + __LINE__)
@@ -112,6 +116,7 @@
   static const unsigned DescPageSize = 256;
 private:
   static const unsigned MaxTreeNodeSize = MAX_TTREE_NODE_SIZE;
+  static const unsigned MaxPrefSize = MAX_TTREE_PREF_SIZE;
   static const unsigned ScanBoundSegmentSize = 7;
   static const unsigned MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN;
   BLOCK_DEFINES(Dbtux);
@@ -206,19 +211,19 @@
     unsigned m_fragBit : 1;     // which duplicated table fragment
     TreeEnt();
     // methods
+    bool eq(const TreeEnt ent) const;
     int cmp(const TreeEnt ent) const;
   };
   static const unsigned TreeEntSize = sizeof(TreeEnt) >> 2;
   static const TreeEnt NullTreeEnt;
 
   /*
-   * Tree node has 1) fixed part 2) actual table data for min and max
-   * prefix 3) max and min entries 4) rest of entries 5) one extra entry
+   * Tree node has 1) fixed part 2) a prefix of index key data for min
+   * entry 3) max and min entries 4) rest of entries 5) one extra entry
    * used as work space.
    *
    * struct TreeNode            part 1, size 6 words
    * min prefix                 part 2, size TreeHead::m_prefSize
-   * max prefix                 part 2, size TreeHead::m_prefSize
    * max entry                  part 3
    * min entry                  part 3
    * rest of entries            part 4
@@ -265,14 +270,14 @@
   friend struct TreeHead;
   struct TreeHead {
     Uint8 m_nodeSize;           // words in tree node
-    Uint8 m_prefSize;           // words in min/max prefix each
+    Uint8 m_prefSize;           // words in min prefix
     Uint8 m_minOccup;           // min entries in internal node
     Uint8 m_maxOccup;           // max entries in node
     TupLoc m_root;              // root node
     TreeHead();
     // methods
     unsigned getSize(AccSize acc) const;
-    Data getPref(TreeNode* node, unsigned i) const;
+    Data getPref(TreeNode* node) const;
     TreeEnt* getEntList(TreeNode* node) const;
   };
 
@@ -514,6 +519,8 @@
     NodeHandle(Frag& frag);
     NodeHandle(const NodeHandle& node);
     NodeHandle& operator=(const NodeHandle& node);
+    // check if unassigned
+    bool isNull();
     // getters
     TupLoc getLink(unsigned i);
     unsigned getChilds();       // cannot spell
@@ -528,7 +535,7 @@
     void setBalance(int b);
     void setNodeScan(Uint32 scanPtrI);
     // access other parts of the node
-    Data getPref(unsigned i);
+    Data getPref();
     TreeEnt getEnt(unsigned pos);
     TreeEnt getMinMax(unsigned i);
     // for ndbrequire and ndbassert
@@ -618,7 +625,7 @@
   void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
   void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
   void deleteNode(Signal* signal, NodeHandle& node);
-  void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
+  void setNodePref(Signal* signal, NodeHandle& node);
   // node operations
   void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
   void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
@@ -633,7 +640,6 @@
   /*
    * DbtuxTree.cpp
    */
-  void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
   void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
   void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
   void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
@@ -658,11 +664,19 @@
   void releaseScanOp(ScanOpPtr& scanPtr);
 
   /*
+   * DbtuxSearch.cpp
+   */
+  void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
+  void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
+  void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
+
+  /*
    * DbtuxCmp.cpp
    */
-  int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize);
-  int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2);
-  int cmpScanBound(const Frag& frag, const BoundPar boundPar);
+  int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+  int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
+  int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+  int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
 
   /*
    * DbtuxDebug.cpp
@@ -675,6 +689,7 @@
     TupLoc m_parent;            // expected parent address
     int m_depth;                // returned depth
     unsigned m_occup;           // returned occupancy
+    TreeEnt m_minmax[2];        // returned subtree min and max
     bool m_ok;                  // returned status
     PrintPar();
   };
@@ -699,6 +714,8 @@
     DebugTree = 4,              // log and check tree after each op
     DebugScan = 8               // log scans
   };
+  static const int DataFillByte = 0xa2;
+  static const int NodeFillByte = 0xa4;
 #endif
 
   // start up info
@@ -859,13 +876,18 @@
 {
 }
 
+inline bool
+Dbtux::TreeEnt::eq(const TreeEnt ent) const
+{
+  return
+    m_tupLoc == ent.m_tupLoc &&
+    m_tupVersion == ent.m_tupVersion &&
+    m_fragBit == ent.m_fragBit;
+}
+
 inline int
 Dbtux::TreeEnt::cmp(const TreeEnt ent) const
 {
-  if (m_fragBit < ent.m_fragBit)
-    return -1;
-  if (m_fragBit > ent.m_fragBit)
-    return +1;
   if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId)
     return -1;
   if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId)
@@ -878,6 +900,10 @@
     return -1;
   if (m_tupVersion > ent.m_tupVersion)
     return +1;
+  if (m_fragBit < ent.m_fragBit)
+    return -1;
+  if (m_fragBit > ent.m_fragBit)
+    return +1;
   return 0;
 }
 
@@ -920,7 +946,7 @@
   case AccHead:
     return NodeHeadSize;
   case AccPref:
-    return NodeHeadSize + 2 * m_prefSize + 2 * TreeEntSize;
+    return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
   case AccFull:
     return m_nodeSize;
   }
@@ -929,16 +955,16 @@
 }
 
 inline Dbtux::Data
-Dbtux::TreeHead::getPref(TreeNode* node, unsigned i) const
+Dbtux::TreeHead::getPref(TreeNode* node) const
 {
-  Uint32* ptr = (Uint32*)node + NodeHeadSize + i * m_prefSize;
+  Uint32* ptr = (Uint32*)node + NodeHeadSize;
   return ptr;
 }
 
 inline Dbtux::TreeEnt*
 Dbtux::TreeHead::getEntList(TreeNode* node) const
 {
-  Uint32* ptr = (Uint32*)node + NodeHeadSize + 2 * m_prefSize;
+  Uint32* ptr = (Uint32*)node + NodeHeadSize + m_prefSize;
   return (TreeEnt*)ptr;
 }
 
@@ -1087,6 +1113,12 @@
   return *this;
 }
 
+inline bool
+Dbtux::NodeHandle::isNull()
+{
+  return m_node == 0;
+}
+
 inline Dbtux::TupLoc
 Dbtux::NodeHandle::getLink(unsigned i)
 {
@@ -1161,11 +1193,11 @@
 }
 
 inline Dbtux::Data
-Dbtux::NodeHandle::getPref(unsigned i)
+Dbtux::NodeHandle::getPref()
 {
   TreeHead& tree = m_frag.m_tree;
-  ndbrequire(m_acc >= AccPref && i <= 1);
-  return tree.getPref(m_node, i);
+  ndbrequire(m_acc >= AccPref);
+  return tree.getPref(m_node);
 }
 
 inline Dbtux::TreeEnt

--- 1.4/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp	2004-06-19 13:31:53 +02:00
+++ 1.5/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp	2004-07-08 14:34:31 +02:00
@@ -25,14 +25,14 @@
  * prefix may be partial in which case CmpUnknown may be returned.
  */
 int
-Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2)
+Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen)
 {
   const unsigned numAttrs = frag.m_numAttrs;
   const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
   // number of words of attribute data left
-  unsigned len2 = maxlen2;
+  unsigned len2 = maxlen;
   // skip to right position in search key
-  data1 += start;
+  searchKey += start;
   int ret = 0;
   while (start < numAttrs) {
     if (len2 < AttributeHeaderSize) {
@@ -41,20 +41,20 @@
       break;
     }
     len2 -= AttributeHeaderSize;
-    if (*data1 != 0) {
-      if (! data2.ah().isNULL()) {
+    if (*searchKey != 0) {
+      if (! entryData.ah().isNULL()) {
         jam();
         // current attribute
         const DescAttr& descAttr = descEnt.m_descAttr[start];
         const unsigned typeId = descAttr.m_typeId;
         // full data size
         const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
-        ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
+        ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
         const unsigned size2 = min(size1, len2);
         len2 -= size2;
         // compare
-        const Uint32* const p1 = *data1;
-        const Uint32* const p2 = &data2[AttributeHeaderSize];
+        const Uint32* const p1 = *searchKey;
+        const Uint32* const p2 = &entryData[AttributeHeaderSize];
         ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
         if (ret != 0) {
           jam();
@@ -67,15 +67,15 @@
         break;
       }
     } else {
-      if (! data2.ah().isNULL()) {
+      if (! entryData.ah().isNULL()) {
         jam();
         // NULL > not NULL
         ret = +1;
         break;
       }
     }
-    data1 += 1;
-    data2 += AttributeHeaderSize + data2.ah().getDataSize();
+    searchKey += 1;
+    entryData += AttributeHeaderSize + entryData.ah().getDataSize();
     start++;
   }
   // XXX until data format errors are handled
@@ -89,17 +89,17 @@
  * Start position is updated as in previous routine.
  */
 int
-Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2)
+Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
 {
   const unsigned numAttrs = frag.m_numAttrs;
   const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
   // skip to right position
-  data1 += start;
-  data2 += start;
+  searchKey += start;
+  entryKey += start;
   int ret = 0;
   while (start < numAttrs) {
-    if (*data1 != 0) {
-      if (*data2 != 0) {
+    if (*searchKey != 0) {
+      if (*entryKey != 0) {
         jam();
         // current attribute
         const DescAttr& descAttr = descEnt.m_descAttr[start];
@@ -107,8 +107,8 @@
         // full data size
         const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
         // compare
-        const Uint32* const p1 = *data1;
-        const Uint32* const p2 = *data2;
+        const Uint32* const p1 = *searchKey;
+        const Uint32* const p2 = *entryKey;
         ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
         if (ret != 0) {
           jam();
@@ -121,15 +121,15 @@
         break;
       }
     } else {
-      if (*data2 != 0) {
+      if (*entryKey != 0) {
         jam();
         // NULL > not NULL
         ret = +1;
         break;
       }
     }
-    data1 += 1;
-    data2 += 1;
+    searchKey += 1;
+    entryKey += 1;
     start++;
   }
   // XXX until data format errors are handled
@@ -137,71 +137,68 @@
   return ret;
 }
 
-
 /*
- * Scan bound vs tree entry.
+ * Scan bound vs node prefix.
  *
  * Compare lower or upper bound and index attribute data.  The attribute
  * data may be partial in which case CmpUnknown may be returned.
- * Returns -1 if the boundary is to the left of the compared key and +1 if
- * the boundary is to the right of the compared key.
+ * Returns -1 if the boundary is to the left of the compared key and +1
+ * if the boundary is to the right of the compared key.
  *
- * To get this behaviour we treat equality a little bit special.
- * If the boundary is a lower bound then the boundary is to the left of all
- * equal keys and if it is an upper bound then the boundary is to the right
- * of all equal keys.
+ * To get this behaviour we treat equality a little bit special.  If the
+ * boundary is a lower bound then the boundary is to the left of all
+ * equal keys and if it is an upper bound then the boundary is to the
+ * right of all equal keys.
  *
  * When searching for the first key we are using the lower bound to try
- * to find the first key that is to the right of the boundary.
- * Then we start scanning from this tuple (including the tuple itself)
- * until we find the first key which is to the right of the boundary. Then
- * we stop and do not include that key in the scan result.
+ * to find the first key that is to the right of the boundary.  Then we
+ * start scanning from this tuple (including the tuple itself) until we
+ * find the first key which is to the right of the boundary.  Then we
+ * stop and do not include that key in the scan result.
  */
 int
-Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
+Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
 {
-  unsigned type = 4;
-  int ret = 0;
-  /*
-  No boundary means full scan, low boundary is to the right of all keys.
-  Thus we should always return -1. For upper bound we are to the right of
-  all keys, thus we should always return +1. We achieve this behaviour
-  by initialising return value to 0 and set type to 4.
-  */
   const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
-  ConstData data1 = boundPar.m_data1;
-  ConstData data2 = boundPar.m_data2;
   // direction 0-lower 1-upper
-  const unsigned dir = boundPar.m_dir;
   ndbrequire(dir <= 1);
   // number of words of data left
-  unsigned len2 = boundPar.m_len2;
-  for (unsigned i = 0; i < boundPar.m_count1; i++) {
+  unsigned len2 = maxlen;
+  /*
+   * No boundary means full scan, low boundary is to the right of all
+   * keys.  Thus we should always return -1.  For upper bound we are to
+   * the right of all keys, thus we should always return +1.  We achieve
+   * this behaviour by initializing type to 4.
+   */
+  unsigned type = 4;
+  while (boundCount != 0) {
     if (len2 < AttributeHeaderSize) {
       jam();
       return NdbSqlUtil::CmpUnknown;
     }
     len2 -= AttributeHeaderSize;
     // get and skip bound type
-    type = data1[0];
-    data1 += 1;
-    ndbrequire(! data1.ah().isNULL());
-    if (! data2.ah().isNULL()) {
+    type = boundInfo[0];
+    boundInfo += 1;
+    ndbrequire(! boundInfo.ah().isNULL());
+    if (! entryData.ah().isNULL()) {
       jam();
       // current attribute
-      const unsigned index = data1.ah().getAttributeId();
+      const unsigned index = boundInfo.ah().getAttributeId();
       const DescAttr& descAttr = descEnt.m_descAttr[index];
       const unsigned typeId = descAttr.m_typeId;
-      ndbrequire(data2.ah().getAttributeId() == descAttr.m_primaryAttrId);
+      ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
       // full data size
-      const unsigned size1 = data1.ah().getDataSize();
-      ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
+      const unsigned size1 = boundInfo.ah().getDataSize();
+      ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
       const unsigned size2 = min(size1, len2);
       len2 -= size2;
       // compare
-      const Uint32* const p1 = &data1[AttributeHeaderSize];
-      const Uint32* const p2 = &data2[AttributeHeaderSize];
-      ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
+      const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
+      const Uint32* const p2 = &entryData[AttributeHeaderSize];
+      int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
+      // XXX until data format errors are handled
+      ndbrequire(ret != NdbSqlUtil::CmpError);
       if (ret != 0) {
         jam();
         return ret;
@@ -209,22 +206,22 @@
     } else {
       jam();
       /*
-      NULL is bigger than any bound, thus the boundary is always to the
-      left of NULL
-      */
+       * NULL is bigger than any bound, thus the boundary is always to
+       * the left of NULL.
+       */
       return -1;
     }
-    data1 += AttributeHeaderSize + data1.ah().getDataSize();
-    data2 += AttributeHeaderSize + data2.ah().getDataSize();
+    boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
+    entryData += AttributeHeaderSize + entryData.ah().getDataSize();
+    boundCount -= 1;
   }
-  ndbassert(ret == 0);
   if (dir == 0) {
     jam();
     /*
-    Looking for the lower bound. If strict lower bound then the boundary is
-    to the right of the compared key and otherwise (equal included in range)
-    then the boundary is to the left of the key.
-    */
+     * Looking for the lower bound.  If strict lower bound then the
+     * boundary is to the right of the compared key and otherwise (equal
+     * included in range) then the boundary is to the left of the key.
+     */
     if (type == 1) {
       jam();
       return +1;
@@ -233,10 +230,11 @@
   } else {
     jam();
     /*
-    Looking for the upper bound. If strict upper bound then the boundary is
-    to the left of all equal keys and otherwise (equal included in the
-    range) then the boundary is to the right of all equal keys.
-    */
+     * Looking for the upper bound.  If strict upper bound then the
+     * boundary is to the left of all equal keys and otherwise (equal
+     * included in the range) then the boundary is to the right of all
+     * equal keys.
+     */
     if (type == 3) {
       jam();
       return -1;
@@ -245,3 +243,67 @@
   }
 }
 
+/*
+ * Scan bound vs tree entry.
+ */
+int
+Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
+{
+  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
+  // direction 0-lower 1-upper
+  ndbrequire(dir <= 1);
+  // initialize type to equality
+  unsigned type = 4;
+  while (boundCount != 0) {
+    // get and skip bound type
+    type = boundInfo[0];
+    boundInfo += 1;
+    ndbrequire(! boundInfo.ah().isNULL());
+    if (*entryKey != 0) {
+      jam();
+      // current attribute
+      const unsigned index = boundInfo.ah().getAttributeId();
+      const DescAttr& descAttr = descEnt.m_descAttr[index];
+      const unsigned typeId = descAttr.m_typeId;
+      // full data size
+      const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
+      // compare
+      const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
+      const Uint32* const p2 = *entryKey;
+      int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
+      // XXX until data format errors are handled
+      ndbrequire(ret != NdbSqlUtil::CmpError);
+      if (ret != 0) {
+        jam();
+        return ret;
+      }
+    } else {
+      jam();
+      /*
+       * NULL is bigger than any bound, thus the boundary is always to
+       * the left of NULL.
+       */
+      return -1;
+    }
+    boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
+    entryKey += 1;
+    boundCount -= 1;
+  }
+  if (dir == 0) {
+    // lower bound
+    jam();
+    if (type == 1) {
+      jam();
+      return +1;
+    }
+    return -1;
+  } else {
+    // upper bound
+    jam();
+    if (type == 3) {
+      jam();
+      return -1;
+    }
+    return +1;
+  }
+}

--- 1.6/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2004-06-17 10:50:58 +02:00
+++ 1.7/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2004-07-08 14:34:31 +02:00
@@ -137,16 +137,17 @@
       par.m_ok = false;
     }
   }
+  static const char* const sep = " *** ";
   // check child-parent links
   if (node.getLink(2) != par.m_parent) {
     par.m_ok = false;
-    out << par.m_path << " *** ";
+    out << par.m_path << sep;
     out << "parent loc " << hex << node.getLink(2);
     out << " should be " << hex << par.m_parent << endl;
   }
   if (node.getSide() != par.m_side) {
     par.m_ok = false;
-    out << par.m_path << " *** ";
+    out << par.m_path << sep;
     out << "side " << dec << node.getSide();
     out << " should be " << dec << par.m_side << endl;
   }
@@ -154,26 +155,26 @@
   const int balance = -cpar[0].m_depth + cpar[1].m_depth;
   if (node.getBalance() != balance) {
     par.m_ok = false;
-    out << par.m_path << " *** ";
+    out << par.m_path << sep;
     out << "balance " << node.getBalance();
     out << " should be " << balance << endl;
   }
   if (abs(node.getBalance()) > 1) {
     par.m_ok = false;
-    out << par.m_path << " *** ";
+    out << par.m_path << sep;
     out << "balance " << node.getBalance() << " is invalid" << endl;
   }
   // check occupancy
-  if (node.getOccup() > tree.m_maxOccup) {
+  if (node.getOccup() == 0 || node.getOccup() > tree.m_maxOccup) {
     par.m_ok = false;
-    out << par.m_path << " *** ";
+    out << par.m_path << sep;
     out << "occupancy " << node.getOccup();
-    out << " greater than max " << tree.m_maxOccup << endl;
+    out << " zero or greater than max " << tree.m_maxOccup << endl;
   }
   // check for occupancy of interior node
   if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
     par.m_ok = false;
-    out << par.m_path << " *** ";
+    out << par.m_path << sep;
     out << "occupancy " << node.getOccup() << " of interior node";
     out << " less than min " << tree.m_minOccup << endl;
   }
@@ -183,13 +184,74 @@
         node.getLink(1 - i) == NullTupLoc &&
         node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
       par.m_ok = false;
-      out << par.m_path << " *** ";
+      out << par.m_path << sep;
       out << "missed merge with child " << i << endl;
     }
   }
+  // check inline prefix
+  { ConstData data1 = node.getPref();
+    Uint32 data2[MaxPrefSize];
+    memset(data2, DataFillByte, MaxPrefSize << 2);
+    readKeyAttrs(frag, node.getMinMax(0), 0, c_searchKey);
+    copyAttrs(frag, c_searchKey, data2, tree.m_prefSize);
+    for (unsigned n = 0; n < tree.m_prefSize; n++) {
+      if (data1[n] != data2[n]) {
+        par.m_ok = false;
+        out << par.m_path << sep;
+        out << "inline prefix mismatch word " << n;
+        out << " value " << hex << data1[n];
+        out << " should be " << hex << data2[n] << endl;
+        break;
+      }
+    }
+  }
+  // check ordering within node
+  for (unsigned j = 1; j < node.getOccup(); j++) {
+    unsigned start = 0;
+    const TreeEnt ent1 = node.getEnt(j - 1);
+    const TreeEnt ent2 = node.getEnt(j);
+    if (j == 1) {
+      readKeyAttrs(frag, ent1, start, c_searchKey);
+    } else {
+      memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
+    }
+    readKeyAttrs(frag, ent2, start, c_entryKey);
+    int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
+    if (ret == 0)
+      ret = ent1.cmp(ent2);
+    if (ret != -1) {
+      par.m_ok = false;
+      out << par.m_path << sep;
+      out << " disorder within node at pos " << j << endl;
+    }
+  }
+  // check ordering wrt subtrees
+  for (unsigned i = 0; i <= 1; i++) {
+    if (node.getLink(i) == NullTupLoc)
+      continue;
+    const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
+    const TreeEnt ent2 = node.getMinMax(i);
+    unsigned start = 0;
+    readKeyAttrs(frag, ent1, start, c_searchKey);
+    readKeyAttrs(frag, ent2, start, c_entryKey);
+    int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
+    if (ret == 0)
+      ret = ent1.cmp(ent2);
+    if (ret != (i == 0 ? -1 : +1)) {
+      par.m_ok = false;
+      out << par.m_path << sep;
+      out << " disorder wrt subtree " << i << endl;
+    }
+  }
   // return values
   par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
   par.m_occup = node.getOccup();
+  for (unsigned i = 0; i <= 1; i++) {
+    if (node.getLink(i) == NullTupLoc)
+      par.m_minmax[i] = node.getMinMax(i);
+    else
+      par.m_minmax[i] = cpar[i].m_minmax[i];
+  }
 }
 
 NdbOut&
@@ -355,20 +417,19 @@
   out << " [acc " << dec << node.m_acc << "]";
   out << " [node " << *node.m_node << "]";
   if (node.m_acc >= Dbtux::AccPref) {
-    for (unsigned i = 0; i <= 1; i++) {
-      out << " [pref " << dec << i;
-      const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + i * tree.m_prefSize;
-      for (unsigned j = 0; j < node.m_frag.m_tree.m_prefSize; j++)
-        out << " " << hex << data[j];
-      out << "]";
-    }
+    const Uint32* data;
+    out << " [pref";
+    data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
+    for (unsigned j = 0; j < tree.m_prefSize; j++)
+      out << " " << hex << data[j];
+    out << "]";
     out << " [entList";
     unsigned numpos = node.m_node->m_occup;
     if (node.m_acc < Dbtux::AccFull && numpos > 2) {
       numpos = 2;
       out << "(" << dec << numpos << ")";
     }
-    const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + 2 * tree.m_prefSize;
+    data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
     const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
     for (unsigned pos = 0; pos < numpos; pos++)
       out << " " << entList[pos];

--- 1.7/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2004-06-19 13:31:53 +02:00
+++ 1.8/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2004-07-08 14:34:31 +02:00
@@ -26,8 +26,13 @@
 #ifdef VM_TRACE
   debugFile(0),
   debugOut(*new NullOutputStream()),
+  // until ndb_mgm supports dump
+#ifdef DBTUX_DEBUG_TREE
+  debugFlags(DebugTree),
+#else
   debugFlags(0),
 #endif
+#endif
   c_internalStartPhase(0),
   c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
   c_dataBuffer(0)
@@ -314,6 +319,9 @@
     keyAttrs += 1;
     data1 += 1;
   }
+#ifdef VM_TRACE
+  memset(data2, DataFillByte, len2 << 2);
+#endif
 }
 
 BLOCK_FUNCTIONS(Dbtux);

--- 1.9/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2004-06-19 13:31:53 +02:00
+++ 1.10/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2004-07-08 14:34:31 +02:00
@@ -111,19 +111,18 @@
     debugOut << endl;
   }
 #endif
-  // find position in tree
-  TreePos treePos;
-  treeSearch(signal, frag, c_searchKey, ent, treePos);
-#ifdef VM_TRACE
-  if (debugFlags & DebugMaint) {
-    debugOut << treePos << endl;
-  }
-#endif
   // do the operation
   req->errorCode = 0;
+  TreePos treePos;
   switch (opCode) {
   case TuxMaintReq::OpAdd:
     jam();
+    searchToAdd(signal, frag, c_searchKey, ent, treePos);
+#ifdef VM_TRACE
+    if (debugFlags & DebugMaint) {
+      debugOut << treePos << endl;
+    }
+#endif
     if (treePos.m_match) {
       jam();
       // there is no "Building" state so this will have to do
@@ -152,6 +151,12 @@
     break;
   case TuxMaintReq::OpRemove:
     jam();
+    searchToRemove(signal, frag, c_searchKey, ent, treePos);
+#ifdef VM_TRACE
+    if (debugFlags & DebugMaint) {
+      debugOut << treePos << endl;
+    }
+#endif
     if (! treePos.m_match) {
       jam();
       // there is no "Building" state so this will have to do
@@ -167,7 +172,6 @@
     ndbrequire(false);
     break;
   }
-  // commit and release nodes
 #ifdef VM_TRACE
   if (debugFlags & DebugTree) {
     printTree(signal, frag, debugOut);

--- 1.8/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2004-06-19 13:31:53 +02:00
+++ 1.9/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2004-07-08 14:34:31 +02:00
@@ -85,10 +85,9 @@
   new (node.m_node) TreeNode();
 #ifdef VM_TRACE
   TreeHead& tree = frag.m_tree;
-  memset(node.getPref(0), 0xa2, tree.m_prefSize << 2);
-  memset(node.getPref(1), 0xa2, tree.m_prefSize << 2);
+  memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
   TreeEnt* entList = tree.getEntList(node.m_node);
-  memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
+  memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
 #endif
 }
 
@@ -116,12 +115,12 @@
  * attribute headers for now.  XXX use null mask instead
  */
 void
-Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
+Dbtux::setNodePref(Signal* signal, NodeHandle& node)
 {
   const Frag& frag = node.m_frag;
   const TreeHead& tree = frag.m_tree;
-  readKeyAttrs(frag, node.getMinMax(i), 0, c_entryKey);
-  copyAttrs(frag, c_entryKey, node.getPref(i), tree.m_prefSize);
+  readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
+  copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
 }
 
 // node operations
@@ -173,11 +172,9 @@
   tmpList[pos] = ent;
   entList[0] = entList[occup + 1];
   node.setOccup(occup + 1);
-  // fix prefixes
+  // fix prefix
   if (occup == 0 || pos == 0)
-    setNodePref(signal, node, 0);
-  if (occup == 0 || pos == occup)
-    setNodePref(signal, node, 1);
+    setNodePref(signal, node);
 }
 
 /*
@@ -248,11 +245,9 @@
   }
   entList[0] = entList[occup - 1];
   node.setOccup(occup - 1);
-  // fix prefixes
+  // fix prefix
   if (occup != 1 && pos == 0)
-    setNodePref(signal, node, 0);
-  if (occup != 1 && pos == occup - 1)
-    setNodePref(signal, node, 1);
+    setNodePref(signal, node);
 }
 
 /*
@@ -325,11 +320,9 @@
   tmpList[pos] = ent;
   ent = oldMin;
   entList[0] = entList[occup];
-  // fix prefixes
+  // fix prefix
   if (true)
-    setNodePref(signal, node, 0);
-  if (occup == 1 || pos == occup - 1)
-    setNodePref(signal, node, 1);
+    setNodePref(signal, node);
 }
 
 /*
@@ -403,11 +396,9 @@
   }
   tmpList[0] = newMin;
   entList[0] = entList[occup];
-  // fix prefixes
+  // fix prefix
   if (true)
-    setNodePref(signal, node, 0);
-  if (occup == 1 || pos == occup - 1)
-    setNodePref(signal, node, 1);
+    setNodePref(signal, node);
 }
 
 /*

--- 1.7/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2004-06-18 13:46:05 +02:00
+++ 1.8/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2004-07-08 14:34:31 +02:00
@@ -689,16 +689,9 @@
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
   TreeHead& tree = frag.m_tree;
-  if (tree.m_root == NullTupLoc) {
-    // tree may have become empty
-    jam();
-    scan.m_state = ScanOp::Last;
-    return;
-  }
-  TreePos pos;
-  pos.m_loc = tree.m_root;
-  NodeHandle node(frag);
-  // unpack lower bound
+  // set up index keys for this operation
+  setKeyAttrs(frag);
+  // unpack lower bound into c_dataBuffer
   const ScanBound& bound = *scan.m_bound[0];
   ScanBoundIterator iter;
   bound.first(iter);
@@ -707,103 +700,22 @@
     c_dataBuffer[j] = *iter.data;
     bound.next(iter);
   }
-  // comparison parameters
-  BoundPar boundPar;
-  boundPar.m_data1 = c_dataBuffer;
-  boundPar.m_count1 = scan.m_boundCnt[0];
-  boundPar.m_dir = 0;
-loop: {
+  // search for scan start position
+  TreePos treePos;
+  searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
+  if (treePos.m_loc == NullTupLoc) {
+    // empty tree
     jam();
-    selectNode(signal, node, pos.m_loc, AccPref);
-    const unsigned occup = node.getOccup();
-    ndbrequire(occup != 0);
-    for (unsigned i = 0; i <= 1; i++) {
-      jam();
-      // compare prefix
-      boundPar.m_data2 = node.getPref(i);
-      boundPar.m_len2 = tree.m_prefSize;
-      int ret = cmpScanBound(frag, boundPar);
-      if (ret == NdbSqlUtil::CmpUnknown) {
-        jam();
-        // read full value
-        ReadPar readPar;
-        readPar.m_ent = node.getMinMax(i);
-        readPar.m_first = 0;
-        readPar.m_count = frag.m_numAttrs;
-        readPar.m_data = 0;     // leave in signal data
-        tupReadAttrs(signal, frag, readPar);
-        // compare full value
-        boundPar.m_data2 = readPar.m_data;
-        boundPar.m_len2 = ZNIL; // big
-        ret = cmpScanBound(frag, boundPar);
-        ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-      }
-      if (i == 0 && ret < 0) {
-        jam();
-        const TupLoc loc = node.getLink(i);
-        if (loc != NullTupLoc) {
-          jam();
-          // continue to left subtree
-          pos.m_loc = loc;
-          goto loop;
-        }
-        // start scanning this node
-        pos.m_pos = 0;
-        pos.m_match = false;
-        pos.m_dir = 3;
-        scan.m_scanPos = pos;
-        scan.m_state = ScanOp::Next;
-        linkScan(node, scanPtr);
-        return;
-      }
-      if (i == 1 && ret > 0) {
-        jam();
-        const TupLoc loc = node.getLink(i);
-        if (loc != NullTupLoc) {
-          jam();
-          // continue to right subtree
-          pos.m_loc = loc;
-          goto loop;
-        }
-        // start scanning upwards
-        pos.m_dir = 1;
-        scan.m_scanPos = pos;
-        scan.m_state = ScanOp::Next;
-        linkScan(node, scanPtr);
-        return;
-      }
-    }
-    // read rest of current node
-    accessNode(signal, node, AccFull);
-    // look for first entry
-    ndbrequire(occup >= 2);
-    for (unsigned j = 1; j < occup; j++) {
-      jam();
-      ReadPar readPar;
-      readPar.m_ent = node.getEnt(j);
-      readPar.m_first = 0;
-      readPar.m_count = frag.m_numAttrs;
-      readPar.m_data = 0;       // leave in signal data
-      tupReadAttrs(signal, frag, readPar);
-      // compare
-      boundPar.m_data2 = readPar.m_data;
-      boundPar.m_len2 = ZNIL;   // big
-      int ret = cmpScanBound(frag, boundPar);
-      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-      if (ret < 0) {
-        jam();
-        // start scanning this node
-        pos.m_pos = j;
-        pos.m_match = false;
-        pos.m_dir = 3;
-        scan.m_scanPos = pos;
-        scan.m_state = ScanOp::Next;
-        linkScan(node, scanPtr);
-        return;
-      }
-    }
-    ndbrequire(false);
+    scan.m_state = ScanOp::Last;
+    return;
   }
+  // set position and state
+  scan.m_scanPos = treePos;
+  scan.m_state = ScanOp::Next;
+  // link the scan to node found
+  NodeHandle node(frag);
+  selectNode(signal, node, treePos.m_loc, AccFull);
+  linkScan(node, scanPtr);
 }
 
 /*
@@ -841,7 +753,9 @@
     scan.m_accLockOp = RNIL;
     scan.m_state = ScanOp::Current;
   }
-  // unpack upper bound
+  // set up index keys for this operation
+  setKeyAttrs(frag);
+  // unpack upper bound into c_dataBuffer
   const ScanBound& bound = *scan.m_bound[1];
   ScanBoundIterator iter;
   bound.first(iter);
@@ -850,11 +764,6 @@
     c_dataBuffer[j] = *iter.data;
     bound.next(iter);
   }
-  // comparison parameters
-  BoundPar boundPar;
-  boundPar.m_data1 = c_dataBuffer;
-  boundPar.m_count1 = scan.m_boundCnt[1];
-  boundPar.m_dir = 1;
   // use copy of position
   TreePos pos = scan.m_scanPos;
   // get and remember original node
@@ -912,17 +821,9 @@
         jam();
         pos.m_ent = node.getEnt(pos.m_pos);
         pos.m_dir = 3;  // unchanged
-        // XXX implement prefix optimization
-        ReadPar readPar;
-        readPar.m_ent = pos.m_ent;
-        readPar.m_first = 0;
-        readPar.m_count = frag.m_numAttrs;
-        readPar.m_data = 0;     // leave in signal data
-        tupReadAttrs(signal, frag, readPar);
-        // compare
-        boundPar.m_data2 = readPar.m_data;
-        boundPar.m_len2 = ZNIL; // big
-        int ret = cmpScanBound(frag, boundPar);
+        // read and compare all attributes
+        readKeyAttrs(frag, pos.m_ent, 0, c_entryKey);
+        int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
         ndbrequire(ret != NdbSqlUtil::CmpUnknown);
         if (ret < 0) {
           jam();

--- 1.7/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp	2004-06-19 13:31:53 +02:00
+++ 1.8/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp	2004-07-08 14:34:31 +02:00
@@ -18,112 +18,6 @@
 #include "Dbtux.hpp"
 
 /*
- * Search for entry.
- *
- * Search key is index attribute data and tree entry value.  Start from
- * root node and compare the key to min/max of each node.  Use linear
- * search on the final (bounding) node.  Initial attributes which are
- * same in min/max need not be checked.
- */
-void
-Dbtux::treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
-{
-  const TreeHead& tree = frag.m_tree;
-  const unsigned numAttrs = frag.m_numAttrs;
-  treePos.m_loc = tree.m_root;
-  if (treePos.m_loc == NullTupLoc) {
-    // empty tree
-    jam();
-    treePos.m_pos = 0;
-    treePos.m_match = false;
-    return;
-  }
-  NodeHandle node(frag);
-loop: {
-    jam();
-    selectNode(signal, node, treePos.m_loc, AccPref);
-    const unsigned occup = node.getOccup();
-    ndbrequire(occup != 0);
-    // number of equal initial attributes in bounding node
-    unsigned start = ZNIL;
-    for (unsigned i = 0; i <= 1; i++) {
-      jam();
-      unsigned start1 = 0;
-      // compare prefix
-      int ret = cmpSearchKey(frag, start1, searchKey, node.getPref(i), tree.m_prefSize);
-      if (ret == NdbSqlUtil::CmpUnknown) {
-        jam();
-        // read and compare remaining attributes
-        readKeyAttrs(frag, node.getMinMax(i), start1, c_entryKey);
-        ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
-        ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-      }
-      if (start > start1)
-        start = start1;
-      if (ret == 0) {
-        jam();
-        // keys are equal, compare entry values
-        ret = searchEnt.cmp(node.getMinMax(i));
-      }
-      if (i == 0 ? (ret < 0) : (ret > 0)) {
-        jam();
-        const TupLoc loc = node.getLink(i);
-        if (loc != NullTupLoc) {
-          jam();
-          // continue to left/right subtree
-          treePos.m_loc = loc;
-          goto loop;
-        }
-        // position is immediately before/after this node
-        treePos.m_pos = (i == 0 ? 0 : occup);
-        treePos.m_match = false;
-        return;
-      }
-      if (ret == 0) {
-        jam();
-        // position is at first/last entry
-        treePos.m_pos = (i == 0 ? 0 : occup - 1);
-        treePos.m_match = true;
-        return;
-      }
-    }
-    // access rest of the bounding node
-    accessNode(signal, node, AccFull);
-    // position is strictly within the node
-    ndbrequire(occup >= 2);
-    const unsigned numWithin = occup - 2;
-    for (unsigned j = 1; j <= numWithin; j++) {
-      jam();
-      int ret = 0;
-      if (start < numAttrs) {
-        jam();
-        // read and compare remaining attributes
-        unsigned start1 = start;
-        readKeyAttrs(frag, node.getEnt(j), start1, c_entryKey);
-        ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
-        ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-      }
-      if (ret == 0) {
-        jam();
-        // keys are equal, compare entry values
-        ret = searchEnt.cmp(node.getEnt(j));
-      }
-      if (ret <= 0) {
-        jam();
-        // position is before or at this entry
-        treePos.m_pos = j;
-        treePos.m_match = (ret == 0);
-        return;
-      }
-    }
-    // position is before last entry
-    treePos.m_pos = occup - 1;
-    treePos.m_match = false;
-    return;
-  }
-}
-
-/*
  * Add entry.
  */
 void

--- 1.8/ndb/test/ndbapi/testOIBasic.cpp	2004-06-23 11:49:22 +02:00
+++ 1.9/ndb/test/ndbapi/testOIBasic.cpp	2004-07-08 14:34:31 +02:00
@@ -2525,7 +2525,7 @@
   for (unsigned i = 0; i < par.m_subloop; i++) {
     RUNSTEP(par, pkupdateindexbuild, MT);
     RUNSTEP(par, invalidateindex, MT);
-    RUNSTEP(par, readverify, MT);
+    RUNSTEP(par, readverify, ST);
     RUNSTEP(par, dropindex, ST);
   }
   return 0;
@@ -2564,9 +2564,11 @@
     t1.off(par.m_totrows);
     RUNSTEP(par, createindex, ST);
     RUNSTEP(par, invalidateindex, MT);
+    RUNSTEP(par, readverify, ST);
     t2.on();
     RUNSTEP(par, pkupdate, MT);
     t2.off(par.m_totrows);
+    RUNSTEP(par, readverify, ST);
     RUNSTEP(par, dropindex, ST);
   }
   LL1("update - " << t1.time());
--- New file ---
+++ ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	04/07/08 14:33:43
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#define DBTUX_SEARCH_CPP
#include "Dbtux.hpp"

/*
 * Search for entry to add.
 *
 * Similar to searchToRemove (see below).
 *
 * TODO optimize for initial equal attrs in node min/max
 */
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
  const TreeHead& tree = frag.m_tree;
  const unsigned numAttrs = frag.m_numAttrs;
  NodeHandle currNode(frag);
  currNode.m_loc = tree.m_root;
  if (currNode.m_loc == NullTupLoc) {
    // empty tree
    jam();
    treePos.m_match = false;
    return;
  }
  NodeHandle glbNode(frag);     // potential g.l.b of final node
  /*
   * In order to not (yet) change old behaviour, a position between
   * 2 nodes returns the one at the bottom of the tree.
   */
  NodeHandle bottomNode(frag);
  while (true) {
    jam();
    selectNode(signal, currNode, currNode.m_loc, AccPref);
    int ret;
    // compare prefix
    unsigned start = 0;
    ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
    if (ret == NdbSqlUtil::CmpUnknown) {
      jam();
      // read and compare remaining attributes
      ndbrequire(start < numAttrs);
      readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
      ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
    }
    if (ret == 0) {
      jam();
      // keys are equal, compare entry values
      ret = searchEnt.cmp(currNode.getMinMax(0));
    }
    if (ret < 0) {
      jam();
      const TupLoc loc = currNode.getLink(0);
      if (loc != NullTupLoc) {
        jam();
        // continue to left subtree
        currNode.m_loc = loc;
        continue;
      }
      if (! glbNode.isNull()) {
        jam();
        // move up to the g.l.b but remember the bottom node
        bottomNode = currNode;
        currNode = glbNode;
      }
    } else if (ret > 0) {
      jam();
      const TupLoc loc = currNode.getLink(1);
      if (loc != NullTupLoc) {
        jam();
        // save potential g.l.b
        glbNode = currNode;
        // continue to right subtree
        currNode.m_loc = loc;
        continue;
      }
    } else {
      jam();
      treePos.m_loc = currNode.m_loc;
      treePos.m_pos = 0;
      treePos.m_match = true;
      return;
    }
    break;
  }
  // access rest of current node
  accessNode(signal, currNode, AccFull);
  for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
    jam();
    int ret;
    // read and compare attributes
    unsigned start = 0;
    readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey);
    ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
    ndbrequire(ret != NdbSqlUtil::CmpUnknown);
    if (ret == 0) {
      jam();
      // keys are equal, compare entry values
      ret = searchEnt.cmp(currNode.getEnt(j));
    }
    if (ret <= 0) {
      jam();
      treePos.m_loc = currNode.m_loc;
      treePos.m_pos = j;
      treePos.m_match = (ret == 0);
      return;
    }
  }
  if (! bottomNode.isNull()) {
    jam();
    // backwards compatible for now
    treePos.m_loc = bottomNode.m_loc;
    treePos.m_pos = 0;
    treePos.m_match = false;
    return;
  }
  treePos.m_loc = currNode.m_loc;
  treePos.m_pos = currNode.getOccup();
  treePos.m_match = false;
}

/*
 * Search for entry to remove.
 *
 * Compares search key to each node min.  A move to right subtree can
 * overshoot target node.  The last such node is saved.  The final node
 * is a half-leaf or leaf.  If search key is less than final node min
 * then the saved node is the g.l.b of the final node and we move back
 * to it.
 */
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
  const TreeHead& tree = frag.m_tree;
  const unsigned numAttrs = frag.m_numAttrs;
  NodeHandle currNode(frag);
  currNode.m_loc = tree.m_root;
  if (currNode.m_loc == NullTupLoc) {
    // empty tree
    jam();
    treePos.m_match = false;
    return;
  }
  NodeHandle glbNode(frag);     // potential g.l.b of final node
  while (true) {
    jam();
    selectNode(signal, currNode, currNode.m_loc, AccPref);
    int ret;
    // compare prefix
    unsigned start = 0;
    ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
    if (ret == NdbSqlUtil::CmpUnknown) {
      jam();
      // read and compare remaining attributes
      ndbrequire(start < numAttrs);
      readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
      ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
    }
    if (ret == 0) {
      jam();
      // keys are equal, compare entry values
      ret = searchEnt.cmp(currNode.getMinMax(0));
    }
    if (ret < 0) {
      jam();
      const TupLoc loc = currNode.getLink(0);
      if (loc != NullTupLoc) {
        jam();
        // continue to left subtree
        currNode.m_loc = loc;
        continue;
      }
      if (! glbNode.isNull()) {
        jam();
        // move up to the g.l.b
        currNode = glbNode;
      }
    } else if (ret > 0) {
      jam();
      const TupLoc loc = currNode.getLink(1);
      if (loc != NullTupLoc) {
        jam();
        // save potential g.l.b
        glbNode = currNode;
        // continue to right subtree
        currNode.m_loc = loc;
        continue;
      }
    } else {
      jam();
      treePos.m_loc = currNode.m_loc;
      treePos.m_pos = 0;
      treePos.m_match = true;
      return;
    }
    break;
  }
  // access rest of current node
  accessNode(signal, currNode, AccFull);
  // pos 0 was handled above
  for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
    jam();
    // compare only the entry
    if (searchEnt.eq(currNode.getEnt(j))) {
      jam();
      treePos.m_loc = currNode.m_loc;
      treePos.m_pos = j;
      treePos.m_match = true;
      return;
    }
  }
  treePos.m_loc = currNode.m_loc;
  treePos.m_pos = currNode.getOccup();
  treePos.m_match = false;
}

/*
 * Search for scan start position.
 *
 * Similar to searchToAdd.
 */
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
  const TreeHead& tree = frag.m_tree;
  NodeHandle currNode(frag);
  currNode.m_loc = tree.m_root;
  if (currNode.m_loc == NullTupLoc) {
    // empty tree
    jam();
    treePos.m_match = false;
    return;
  }
  NodeHandle glbNode(frag);     // potential g.l.b of final node
  NodeHandle bottomNode(frag);
  while (true) {
    jam();
    selectNode(signal, currNode, currNode.m_loc, AccPref);
    int ret;
    // compare prefix
    ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
    if (ret == NdbSqlUtil::CmpUnknown) {
      jam();
      // read and compare all attributes
      readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
      ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
    }
    if (ret < 0) {
      jam();
      const TupLoc loc = currNode.getLink(0);
      if (loc != NullTupLoc) {
        jam();
        // continue to left subtree
        currNode.m_loc = loc;
        continue;
      }
      if (! glbNode.isNull()) {
        jam();
        // move up to the g.l.b but remember the bottom node
        bottomNode = currNode;
        currNode = glbNode;
      } else {
        // start scanning this node
        treePos.m_loc = currNode.m_loc;
        treePos.m_pos = 0;
        treePos.m_match = false;
        treePos.m_dir = 3;
        return;
      }
    } else if (ret > 0) {
      jam();
      const TupLoc loc = currNode.getLink(1);
      if (loc != NullTupLoc) {
        jam();
        // save potential g.l.b
        glbNode = currNode;
        // continue to right subtree
        currNode.m_loc = loc;
        continue;
      }
    } else {
      ndbassert(false);
    }
    break;
  }
  // access rest of current node
  accessNode(signal, currNode, AccFull);
  for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
    jam();
    int ret;
    // read and compare attributes
    readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
    ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
    ndbrequire(ret != NdbSqlUtil::CmpUnknown);
    if (ret < 0) {
      // start scanning from current entry
      treePos.m_loc = currNode.m_loc;
      treePos.m_pos = j;
      treePos.m_match = false;
      treePos.m_dir = 3;
      return;
    }
  }
  if (! bottomNode.isNull()) {
    jam();
    // start scanning the l.u.b
    treePos.m_loc = bottomNode.m_loc;
    treePos.m_pos = 0;
    treePos.m_match = false;
    treePos.m_dir = 3;
    return;
  }
  // start scanning upwards (pretend we came from right child)
  treePos.m_loc = currNode.m_loc;
  treePos.m_dir = 1;
}


--- 1.12/ndb/src/kernel/blocks/dbtux/Times.txt	2004-06-19 13:31:53 +02:00
+++ 1.13/ndb/src/kernel/blocks/dbtux/Times.txt	2004-07-08 14:34:31 +02:00
@@ -49,4 +49,7 @@
 optim 11        mc02/a  43 ms   63 ms    46 pct
                 mc02/b  52 ms   86 ms    63 pct
 
+optim 12        mc02/a  38 ms   55 ms    43 pct
+                mc02/b  47 ms   77 ms    63 pct
+
 vim: set et:

--- 1.2/ndb/src/kernel/blocks/dbtux/Makefile.am	2004-06-16 16:27:56 +02:00
+++ 1.3/ndb/src/kernel/blocks/dbtux/Makefile.am	2004-07-08 14:34:31 +02:00
@@ -7,6 +7,7 @@
 			DbtuxNode.cpp \
 			DbtuxTree.cpp \
 			DbtuxScan.cpp \
+			DbtuxSearch.cpp \
 			DbtuxCmp.cpp \
 			DbtuxDebug.cpp
 
Thread
bk commit into 4.1 tree (pekka:1.1897)Pekka Nousiainen9 Jul