List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:May 4 2011 12:08pm
Subject:bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4356) WL#4163
View as plain text  
#At file:///export/space/pekka/ms/ms-wl4163-70/ based on revid:pekka@stripped

 4356 Pekka Nousiainen	2011-05-04
      wl#4163 g03_ops.diff
      index scan using packed data

    modified:
      storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-05-04 12:08:38 +0000
@@ -317,16 +317,22 @@ private:
 
   typedef NdbPack::DataC KeyDataC;
   typedef NdbPack::Data KeyData;
+  typedef NdbPack::BoundC KeyBoundC;
+  typedef NdbPack::Bound KeyBound;
 
   // range scan
- 
+
   /*
-   * Scan bounds are stored in linked list of segments.
+   * ScanBound instances are members of ScanOp.  Bound data is stored in
+   * a separate segmented buffer pool.
    */
-  typedef DataBuffer<ScanBoundSegmentSize> ScanBound;
-  typedef DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator ScanBoundIterator;
-  typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool;
-  ScanBoundPool c_scanBoundPool;
+  struct ScanBound {
+    DataBuffer<ScanBoundSegmentSize>::Head m_head;
+    Uint16 m_cnt;       // number of attributes
+    Int16 m_side;
+    ScanBound();
+  };
+  DataBuffer<ScanBoundSegmentSize>::DataBufferPool c_scanBoundPool;
 
   // ScanLock
   struct ScanLock {
@@ -395,10 +401,7 @@ private:
     Uint8 m_readCommitted;      // no locking
     Uint8 m_lockMode;
     Uint8 m_descending;
-    ScanBound m_boundMin;
-    ScanBound m_boundMax;
-    ScanBound* m_bound[2];      // pointers to above 2
-    Uint16 m_boundCnt[2];       // number of bounds in each
+    ScanBound m_scanBound[2];
     TreePos m_scanPos;          // position
     TreeEnt m_scanEnt;          // latest entry found
     Uint32 m_nodeScan;          // next scan at node (single-linked)
@@ -407,7 +410,7 @@ private:
     Uint32 nextList;
     };
     Uint32 prevList;
-    ScanOp(ScanBoundPool& scanBoundPool);
+    ScanOp();
   };
   typedef Ptr<ScanOp> ScanOpPtr;
   ArrayPool<ScanOp> c_scanOpPool;
@@ -549,7 +552,7 @@ private:
   void readKeyAttrs(TuxCtx&, const Frag& frag, TreeEnt ent, KeyData& keyData, Uint32 count);
   void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
   void copyAttrs(TuxCtx&, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
-  void unpackBound(const ScanBound& bound, Data data);
+  void unpackBound(TuxCtx&, const ScanBound& bound, KeyBoundC& searchBound);
   void findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr);
 
   /*
@@ -647,9 +650,9 @@ private:
   bool findPosToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
   bool searchToAdd(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
   bool searchToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
-  void findNodeToScan(Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode);
-  void findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos);
-  void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
+  void findNodeToScan(Frag& frag, unsigned dir, const KeyBoundC& searchBound, NodeHandle& currNode);
+  void findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos);
+  void searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos);
 
   /*
    * DbtuxCmp.cpp
@@ -657,6 +660,7 @@ private:
   int cmpSearchKey(TuxCtx&, const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
   int cmpSearchKey(const KeyDataC& searchKey, const KeyDataC& entryKey, Uint32 cnt);
   int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+  int cmpSearchBound(const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt);
 
   /*
    * DbtuxStat.cpp
@@ -947,10 +951,20 @@ Dbtux::DescPage::DescPage() :
   }
 }
 
+// Dbtux::ScanBound
+
+inline
+Dbtux::ScanBound::ScanBound() :
+  m_head(),
+  m_cnt(0),
+  m_side(0)
+{
+}
+
 // Dbtux::ScanOp
 
 inline
-Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
+Dbtux::ScanOp::ScanOp() :
   m_state(Undef),
   m_lockwait(false),
   m_errorCode(0),
@@ -967,16 +981,11 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& sca
   m_readCommitted(0),
   m_lockMode(0),
   m_descending(0),
-  m_boundMin(scanBoundPool),
-  m_boundMax(scanBoundPool),
+  m_scanBound(),
   m_scanPos(),
   m_scanEnt(),
   m_nodeScan(RNIL)
 {
-  m_bound[0] = &m_boundMin;
-  m_bound[1] = &m_boundMax;
-  m_boundCnt[0] = 0;
-  m_boundCnt[1] = 0;
 }
 
 // Dbtux::Index
@@ -1277,6 +1286,24 @@ Dbtux::cmpSearchKey(const KeyDataC& sear
     debugOut << " entry:" << entryKey.print(tmp, sizeof(tmp));
     debugOut << endl;
   }
+#endif
+  return ret;
+}
+
+inline int
+Dbtux::cmpSearchBound(const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt)
+{
+  // compare cnt attributes from each
+  Uint32 num_eq;
+  int ret = searchBound.cmp(entryKey, cnt, num_eq);
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    char tmp[MaxAttrDataSize << 2];
+    debugOut << "cmpSearchBound: res:" << ret;
+    debugOut << " search:" << searchBound.print(tmp, sizeof(tmp));
+    debugOut << " entry:" << entryKey.print(tmp, sizeof(tmp));
+    debugOut << endl;
+  }
 #endif
   return ret;
 }

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2011-05-04 12:08:38 +0000
@@ -439,17 +439,16 @@ operator<<(NdbOut& out, const Dbtux::Sca
   out << " [pos " << scan.m_scanPos << "]";
   out << " [ent " << scan.m_scanEnt << "]";
   for (unsigned i = 0; i <= 1; i++) {
-    out << " [bound " << dec << i;
-    Dbtux::ScanBound& bound = *scan.m_bound[i];
-    Dbtux::ScanBoundIterator iter;
-    bound.first(iter);
-    for (unsigned j = 0; j < bound.getSize(); j++) {
-      out << " " << hex << *iter.data;
-      bound.next(iter);
-    }
+    const Dbtux::ScanBound scanBound = scan.m_scanBound[i];
+    const Dbtux::Index& index = *tux->c_indexPool.getPtr(scan.m_indexId);
+    Dbtux::KeyDataC keyBoundData(index.m_keySpec, true);
+    Dbtux::KeyBoundC keyBound(keyBoundData);
+    tux->unpackBound(tux->c_ctx, scanBound, keyBound);
+    char tmp[Dbtux::MaxAttrDataSize << 2];
+    out << " [scanBound " << dec << i;
+    out << " " << keyBound.print(tmp, sizeof(tmp));
     out << "]";
   }
-  out << "]";
   return out;
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2011-05-04 12:08:38 +0000
@@ -398,16 +398,26 @@ Dbtux::copyAttrs(TuxCtx& ctx, const Frag
 }
 
 void
-Dbtux::unpackBound(const ScanBound& bound, Data dest)
+Dbtux::unpackBound(TuxCtx& ctx, const ScanBound& scanBound, KeyBoundC& searchBound)
 {
-  ScanBoundIterator iter;
-  bound.first(iter);
-  const unsigned n = bound.getSize();
-  unsigned j;
-  for (j = 0; j < n; j++) {
-    dest[j] = *iter.data;
-    bound.next(iter);
+  // there is no const version of LocalDataBuffer
+  DataBuffer<ScanBoundSegmentSize>::Head head = scanBound.m_head;
+  LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+  DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator iter;
+  // always use searchKey buffer
+  Uint32* const outputBuffer = ctx.c_searchKey;
+  b.first(iter);
+  const Uint32 n = b.getSize();
+  ndbrequire(n <= MaxAttrDataSize);
+  for (Uint32 i = 0; i < n; i++) {
+    outputBuffer[i] = *iter.data;
+    b.next(iter);
   }
+  // set bound to the unpacked data buffer
+  KeyDataC& searchBoundData = searchBound.get_data();
+  searchBoundData.set_buf(outputBuffer, MaxAttrDataSize << 2, scanBound.m_cnt);
+  int ret = searchBound.finalize(scanBound.m_side);
+  ndbrequire(ret == 0);
 }
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-05-04 12:08:38 +0000
@@ -75,7 +75,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
       errorCode = AccScanRef::TuxNoFreeScanOp;
       break;
     }
-    new (scanPtr.p) ScanOp(c_scanBoundPool);
+    new (scanPtr.p) ScanOp;
     scanPtr.p->m_state = ScanOp::First;
     scanPtr.p->m_userPtr = req->senderData;
     scanPtr.p->m_userRef = req->senderRef;
@@ -131,9 +131,9 @@ Dbtux::execACC_SCANREQ(Signal* signal)
  * Check that sets of lower and upper bounds are on initial sequences of
  * keys and that all but possibly last bound is non-strict.
  *
- * Finally save the sets of lower and upper bounds (i.e. start key and
- * end key).  Full bound type is included but only the strict bit is
- * used since lower and upper have now been separated.
+ * Finally convert the sets of lower and upper bounds (i.e. start key
+ * and end key) to NdbPack format.  The data is saved in segmented
+ * memory.  The bound is reconstructed at use time via unpackBound().
  *
  * Error handling:  Error code is set in the scan and also returned in
  * EXECUTE_DIRECT (the old way).
@@ -143,175 +143,144 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal
 {
   jamEntry();
   // get records
-  TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
-  const TuxBoundInfo* const req = (const TuxBoundInfo*)sig;
+  TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
   ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
   const Index& index = *c_indexPool.getPtr(scan.m_indexId);
   const DescHead& descHead = getDescHead(index);
   const KeyType* keyTypes = getKeyTypes(descHead);
-  // collect normalized lower and upper bounds
-  struct BoundInfo {
-    int type2;     // with EQ -> LE/GE
-    Uint32 offset; // offset in xfrmData
-    Uint32 size;
-  };
-  BoundInfo boundInfo[2][MaxIndexAttributes];
-  const unsigned dstSize = MaxAttrDataSize;
-  // use some static buffer (they are only used within a timeslice)
-  Uint32* const xfrmData = c_ctx.c_dataBuffer;
-  Uint32 dstPos = 0;
-  // largest attrId seen plus one
-  Uint32 maxAttrId[2] = { 0, 0 };
-  // walk through entries
-  const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
-  Uint32 offset = 0;
-  while (offset + 2 <= req->boundAiLength) {
-    jam();
-    const unsigned type = data[offset];
-    const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
-    const Uint32 attrId = ah->getAttributeId();
-    const Uint32 byteSize = ah->getByteSize();
-    const Uint32 dataSize = ah->getDataSize();
-    if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) {
-      jam();
-      scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
-      sig->errorCode = scan.m_errorCode;
-      return;
-    }
-    // copy header
-    xfrmData[dstPos + 0] = data[offset + 0];
-    xfrmData[dstPos + 1] = data[offset + 1];
-    // copy bound value
-    Uint32 dstBytes = 0;
-    Uint32 dstWords = 0;
-    if (! ah->isNULL()) {
-      jam();
-      const uchar* srcPtr = (const uchar*)&data[offset + 2];
-      const KeyType& keyType = keyTypes[attrId];
-      Uint32 typeId = keyType.get_type_id();
-      Uint32 maxBytes = keyType.get_byte_size();
-      Uint32 lb, len;
-      bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, maxBytes, lb, len);
-      if (! ok) {
-        jam();
-        scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
-        sig->errorCode = scan.m_errorCode;
-        return;
-      }
-      Uint32 srcBytes = lb + len;
-      Uint32 srcWords = (srcBytes + 3) / 4;
-      if (srcBytes != byteSize) {
+  // extract lower and upper bound in separate passes
+  for (unsigned idir = 0; idir <= 1; idir++) {
+    jam();
+    struct BoundInfo {
+      int type2;      // with EQ -> LE/GE
+      Uint32 offset;  // word offset in signal data
+      Uint32 bytes;
+    };
+    BoundInfo boundInfo[MaxIndexAttributes];
+    // largest attrId seen plus one
+    Uint32 maxAttrId = 0;
+    const Uint32* const data = &req->data[0];
+    Uint32 offset = 0;
+    while (offset + 2 <= req->boundAiLength) {
+      jam();
+      const Uint32 type = data[offset];
+      const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
+      const Uint32 attrId = ah->getAttributeId();
+      const Uint32 byteSize = ah->getByteSize();
+      const Uint32 dataSize = ah->getDataSize();
+      // check type
+      if (unlikely(type > 4)) {
         jam();
         scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
-        sig->errorCode = scan.m_errorCode;
+        req->errorCode = scan.m_errorCode;
         return;
       }
-      uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2];
-      if (keyType.get_cs_number() == 0) {
-        memcpy(dstPtr, srcPtr, srcWords << 2);
-        dstBytes = srcBytes;
-        dstWords = srcWords;
-      } else {
+      Uint32 type2 = type;
+      if (type2 == 4) {
         jam();
-        CHARSET_INFO* cs = all_charsets[keyType.get_cs_number()];
-        Uint32 xmul = cs->strxfrm_multiply;
-        if (xmul == 0)
-          xmul = 1;
-        // see comment in DbtcMain.cpp
-        Uint32 dstLen = xmul * (maxBytes - lb);
-        if (dstLen > ((dstSize - dstPos) << 2)) {
+        type2 = (idir << 1); // LE=0 GE=2
+      }
+      // check if attribute belongs to this bound
+      if ((type2 & 0x2) == (idir << 1)) {
+        if (unlikely(attrId >= index.m_numAttrs)) {
           jam();
-          scan.m_errorCode = TuxBoundInfo::TooMuchAttrInfo;
-          sig->errorCode = scan.m_errorCode;
+          scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+          req->errorCode = scan.m_errorCode;
           return;
         }
-        int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
-        ndbrequire(n != -1);
-        dstBytes = n;
-        while ((n & 3) != 0) {
-          dstPtr[n++] = 0;
+        // mark entries in any gap as undefined
+        while (maxAttrId <= attrId) {
+          jam();
+          BoundInfo& b = boundInfo[maxAttrId];
+          b.type2 = -1;
+          maxAttrId++;
         }
-        dstWords = n / 4;
-      }
-    }
-    for (unsigned j = 0; j <= 1; j++) {
-      jam();
-      // check if lower/upper bit matches
-      const unsigned luBit = (j << 1);
-      if ((type & 0x2) != luBit && type != 4)
-        continue;
-      // EQ -> LE, GE
-      const unsigned type2 = (type & 0x1) | luBit;
-      // fill in any gap
-      while (maxAttrId[j] <= attrId) {
-        jam();
-        BoundInfo& b = boundInfo[j][maxAttrId[j]];
-        maxAttrId[j]++;
-        b.type2 = -1;
-      }
-      BoundInfo& b = boundInfo[j][attrId];
-      if (b.type2 != -1) {
-        // compare with previously defined bound
-        if (b.type2 != (int)type2 ||
-            b.size != 2 + dstWords ||
-            memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) {
+        BoundInfo& b = boundInfo[attrId];
+        // duplicate no longer allowed (wl#4163)
+        if (unlikely(b.type2 != -1)) {
           jam();
           scan.m_errorCode = TuxBoundInfo::InvalidBounds;
-          sig->errorCode = scan.m_errorCode;
+          req->errorCode = scan.m_errorCode;
           return;
         }
-      } else {
-        // fix length
-        AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1];
-        ah->setByteSize(dstBytes);
-        // enter new bound
-        jam();
-        b.type2 = type2;
-        b.offset = dstPos;
-        b.size = 2 + dstWords;
+        b.type2 = (int)type2;
+        b.offset = offset + 1; // poai
+        b.bytes = byteSize;
       }
+      // jump to next
+      offset += 2 + dataSize;
     }
-    // jump to next
-    offset += 2 + dataSize;
-    dstPos += 2 + dstWords;
-  }
-  if (offset != req->boundAiLength) {
-    jam();
-    scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
-    sig->errorCode = scan.m_errorCode;
-    return;
-  }
-  for (unsigned j = 0; j <= 1; j++) {
-    // save lower/upper bound in index attribute id order
-    for (unsigned i = 0; i < maxAttrId[j]; i++) {
-      jam();
-      const BoundInfo& b = boundInfo[j][i];
-      // check for gap or strict bound before last
-      if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) {
-        jam();
-        scan.m_errorCode = TuxBoundInfo::InvalidBounds;
-        sig->errorCode = scan.m_errorCode;
-        return;
-      }
-      bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size);
-      if (! ok) {
+    if (unlikely(offset != req->boundAiLength)) {
+      jam();
+      scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+      req->errorCode = scan.m_errorCode;
+      return;
+    }
+    // check and pack the bound data
+    KeyData searchBoundData(index.m_keySpec, true, 0);
+    KeyBound searchBound(searchBoundData);
+    searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
+    int strict = 0; // 0 or 1
+    Uint32 i;
+    for (i = 0; i < maxAttrId; i++) {
+      jam();
+      const BoundInfo& b = boundInfo[i];
+       // check for gap or strict bound before last
+       strict = (b.type2 & 0x1);
+       if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
+         jam();
+         scan.m_errorCode = TuxBoundInfo::InvalidBounds;
+         req->errorCode = scan.m_errorCode;
+         return;
+       }
+       Uint32 len;
+       if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
+           b.bytes != len)) {
+         jam();
+         scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+         req->errorCode = scan.m_errorCode;
+         return;
+       }
+    }
+    int side = 0;
+    if (maxAttrId != 0) {
+      // arithmetic is faster
+      // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
+      side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
+    }
+    if (unlikely(searchBound.finalize(side) == -1)) {
+      jam();
+      scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+      req->errorCode = scan.m_errorCode;
+      return;
+    }
+    ScanBound& scanBound = scan.m_scanBound[idir];
+    scanBound.m_cnt = maxAttrId;
+    scanBound.m_side = side;
+    // save data words in segmented memory
+    {
+      DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+      LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+      const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
+      Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
+      bool ok = b.append(data, size);
+      if (unlikely(!ok)) {
         jam();
         scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
-        sig->errorCode = scan.m_errorCode;
+        req->errorCode = scan.m_errorCode;
         return;
       }
     }
-    scan.m_boundCnt[j] = maxAttrId[j];
   }
   if (ERROR_INSERTED(12009)) {
     jam();
     CLEAR_ERROR_INSERT_VALUE;
     scan.m_errorCode = TuxBoundInfo::InvalidBounds;
-    sig->errorCode = scan.m_errorCode;
+    req->errorCode = scan.m_errorCode;
     return;
   }
   // no error
-  sig->errorCode = 0;
+  req->errorCode = 0;
 }
 
 void
@@ -758,18 +727,21 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
 {
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
 #ifdef VM_TRACE
   if (debugFlags & DebugScan) {
     debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
   }
 #endif
-  // set up index keys for this operation
-  setKeyAttrs(c_ctx, frag);
   // scan direction 0, 1
   const unsigned idir = scan.m_descending;
-  unpackBound(*scan.m_bound[idir], c_ctx.c_dataBuffer);
+  // set up bound from segmented memory
+  const ScanBound& scanBound = scan.m_scanBound[idir];
+  KeyDataC searchBoundData(index.m_keySpec, true);
+  KeyBoundC searchBound(searchBoundData);
+  unpackBound(c_ctx, scanBound, searchBound);
   TreePos treePos;
-  searchToScan(frag, c_ctx.c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
+  searchToScan(frag, idir, searchBound, treePos);
   if (treePos.m_loc != NullTupLoc) {
     scan.m_scanPos = treePos;
     // link the scan to node found
@@ -780,11 +752,15 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
       jam();
       // check upper bound
       TreeEnt ent = node.getEnt(treePos.m_pos);
-      if (scanCheck(scanPtr, ent))
+      if (scanCheck(scanPtr, ent)) {
+        jam();
         scan.m_state = ScanOp::Current;
-      else
+      } else {
+        jam();
         scan.m_state = ScanOp::Last;
+      }
     } else {
+      jam();
       scan.m_state = ScanOp::Next;
     }
   } else {
@@ -875,8 +851,6 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool
 #endif
   // cannot be moved away from tuple we have locked
   ndbrequire(scan.m_state != ScanOp::Locked);
-  // set up index keys for this operation
-  setKeyAttrs(c_ctx, frag);
   // scan direction
   const unsigned idir = scan.m_descending; // 0, 1
   const int jdir = 1 - 2 * (int)idir;      // 1, -1
@@ -1031,17 +1005,34 @@ Dbtux::scanCheck(ScanOpPtr scanPtr, Tree
     return false;
   }
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
   const unsigned idir = scan.m_descending;
   const int jdir = 1 - 2 * (int)idir;
-  unpackBound(*scan.m_bound[1 - idir], c_ctx.c_dataBuffer);
-  unsigned boundCnt = scan.m_boundCnt[1 - idir];
-  readKeyAttrs(c_ctx, frag, ent, 0, c_ctx.c_entryKey);
-  int ret = cmpScanBound(frag, 1 - idir, c_ctx.c_dataBuffer, boundCnt, c_ctx.c_entryKey);
-  ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-  if (jdir * ret > 0)
-    return true;
-  // hit upper bound of single range scan
-  return false;
+  const ScanBound& scanBound = scan.m_scanBound[1 - idir];
+  int ret = 0;
+  if (scanBound.m_cnt != 0) {
+    jam();
+    // set up bound from segmented memory
+    KeyDataC searchBoundData(index.m_keySpec, true);
+    KeyBoundC searchBound(searchBoundData);
+    unpackBound(c_ctx, scanBound, searchBound);
+    // key data for the entry
+    KeyData entryKey(index.m_keySpec, true, 0);
+    entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
+    readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
+    // compare bound to key
+    const Uint32 boundCount = searchBound.get_data().get_cnt();
+    ret = cmpSearchBound(searchBound, entryKey, boundCount);
+    ndbrequire(ret != 0);
+    ret = (-1) * ret; // reverse for key vs bound
+    ret = jdir * ret; // reverse for descending scan
+  }
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
+  }
+#endif
+  return (ret <= 0);
 }
 
 /*
@@ -1208,8 +1199,12 @@ Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
   }
 #endif
   Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
-  scanPtr.p->m_boundMin.release();
-  scanPtr.p->m_boundMax.release();
+  for (unsigned i = 0; i <= 1; i++) {
+    ScanBound& scanBound = scanPtr.p->m_scanBound[i];
+    DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+    LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+    b.release();
+  }
   // unlink from per-fragment list and release from pool
   frag.m_scanList.release(scanPtr);
 }

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2011-05-04 12:08:38 +0000
@@ -206,20 +206,25 @@ Dbtux::searchToRemove(TuxCtx& ctx, Frag&
  * Search within the found node is done by caller.
  */
 void
-Dbtux::findNodeToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode)
+Dbtux::findNodeToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode)
 {
+  const int jdir = 1 - 2 * int(idir);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  const Uint32 numAttrs = searchBound.get_data().get_cnt();
+  KeyData entryKey(index.m_keySpec, false, 0);
+  entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   while (true) {
     jam();
     selectNode(currNode, currNode.m_loc);
-    int ret;
     // wl4163_todo temp disable prefix
-    if (true) {
+    int ret = (-1) * jdir;
+    if (numAttrs > 0) {
       jam();
       // read and compare all attributes
-      readKeyAttrs(c_ctx, frag, currNode.getEnt(0), 0, c_ctx.c_entryKey);
-      ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
-      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+      readKeyAttrs(c_ctx, frag, currNode.getEnt(0), entryKey, numAttrs);
+      ret = cmpSearchBound(searchBound, entryKey, numAttrs);
+      ndbrequire(ret != 0);
     }
     if (ret < 0) {
       // bound is left of this node
@@ -262,23 +267,26 @@ Dbtux::findNodeToScan(Frag& frag, unsign
  * search similar to findPosToAdd().
  */
 void
-Dbtux::findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos)
+Dbtux::findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos)
 {
   const int jdir = 1 - 2 * int(idir);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  const Uint32 numAttrs = searchBound.get_data().get_cnt();
   int lo = -1;
   int hi = (int)currNode.getOccup();
+  KeyData entryKey(index.m_keySpec, false, 0);
+  entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
   while (hi - lo > 1) {
     jam();
     // hi - lo > 1 implies lo < j < hi
     int j = (hi + lo) / 2;
     int ret = (-1) * jdir;
-    if (boundCount != 0) {
-      // read and compare attributes
-      const TreeEnt currEnt = currNode.getEnt(j);
-      readKeyAttrs(c_ctx, frag, currEnt, 0, c_ctx.c_entryKey);
-      ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
+    if (numAttrs != 0) {
+      // read and compare all attributes
+      readKeyAttrs(c_ctx, frag, currNode.getEnt(j), entryKey, numAttrs);
+      ret = cmpSearchBound(searchBound, entryKey, numAttrs);
+      ndbrequire(ret != 0);
     }
-    ndbrequire(ret != 0);
     if (ret < 0) {
       jam();
       hi = j;
@@ -298,7 +306,7 @@ Dbtux::findPosToScan(Frag& frag, unsigne
  * Search for scan start position.
  */
 void
-Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   NodeHandle currNode(frag);
@@ -308,11 +316,10 @@ Dbtux::searchToScan(Frag& frag, ConstDat
     jam();
     return;
   }
-  const unsigned idir = unsigned(descending);
-  findNodeToScan(frag, idir, boundInfo, boundCount, currNode);
+  findNodeToScan(frag, idir, searchBound, currNode);
   treePos.m_loc = currNode.m_loc;
   Uint16 pos;
-  findPosToScan(frag, idir, boundInfo, boundCount, currNode, &pos);
+  findPosToScan(frag, idir, searchBound, currNode, &pos);
   const unsigned occup = currNode.getOccup();
   if (idir == 0) {
     if (pos < occup) {

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp	2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp	2011-05-04 12:08:38 +0000
@@ -49,22 +49,18 @@ Dbtux::statRecordsInRange(ScanOpPtr scan
 {
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
   TreeHead& tree = frag.m_tree;
   // get first and last position
   TreePos pos1 = scan.m_scanPos;
   TreePos pos2;
   { // as in scanFirst()
-    setKeyAttrs(c_ctx, frag);
     const unsigned idir = 1;
-    const ScanBound& bound = *scan.m_bound[idir];
-    ScanBoundIterator iter;
-    bound.first(iter);
-    for (unsigned j = 0; j < bound.getSize(); j++) {
-      jam();
-      c_ctx.c_dataBuffer[j] = *iter.data;
-      bound.next(iter);
-    }
-    searchToScan(frag, c_ctx.c_dataBuffer, scan.m_boundCnt[idir], true, pos2);
+    const ScanBound& scanBound = scan.m_scanBound[idir];
+    KeyDataC searchBoundData(index.m_keySpec, true);
+    KeyBoundC searchBound(searchBoundData);
+    unpackBound(c_ctx, scanBound, searchBound);
+    searchToScan(frag, idir, searchBound, pos2);
     // committed read (same timeslice) and range not empty
     ndbrequire(pos2.m_loc != NullTupLoc);
   }


Attachment: [text/bzr-bundle] bzr/pekka@mysql.com-20110504120838-kcfvxygof4gt9qrc.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4356) WL#4163Pekka Nousiainen4 May