#At file:///export/space/pekka/ms/ms-wl4163-70/ based on revid:pekka@stripped
4356 Pekka Nousiainen 2011-05-04
wl#4163 g03_ops.diff
index scan using packed data
modified:
storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2011-05-04 12:08:38 +0000
@@ -317,16 +317,22 @@ private:
typedef NdbPack::DataC KeyDataC;
typedef NdbPack::Data KeyData;
+ typedef NdbPack::BoundC KeyBoundC;
+ typedef NdbPack::Bound KeyBound;
// range scan
-
+
/*
- * Scan bounds are stored in linked list of segments.
+ * ScanBound instances are members of ScanOp. Bound data is stored in
+ * a separate segmented buffer pool.
*/
- typedef DataBuffer<ScanBoundSegmentSize> ScanBound;
- typedef DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator ScanBoundIterator;
- typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool;
- ScanBoundPool c_scanBoundPool;
+ struct ScanBound {
+ DataBuffer<ScanBoundSegmentSize>::Head m_head;
+ Uint16 m_cnt; // number of attributes
+ Int16 m_side;
+ ScanBound();
+ };
+ DataBuffer<ScanBoundSegmentSize>::DataBufferPool c_scanBoundPool;
// ScanLock
struct ScanLock {
@@ -395,10 +401,7 @@ private:
Uint8 m_readCommitted; // no locking
Uint8 m_lockMode;
Uint8 m_descending;
- ScanBound m_boundMin;
- ScanBound m_boundMax;
- ScanBound* m_bound[2]; // pointers to above 2
- Uint16 m_boundCnt[2]; // number of bounds in each
+ ScanBound m_scanBound[2];
TreePos m_scanPos; // position
TreeEnt m_scanEnt; // latest entry found
Uint32 m_nodeScan; // next scan at node (single-linked)
@@ -407,7 +410,7 @@ private:
Uint32 nextList;
};
Uint32 prevList;
- ScanOp(ScanBoundPool& scanBoundPool);
+ ScanOp();
};
typedef Ptr<ScanOp> ScanOpPtr;
ArrayPool<ScanOp> c_scanOpPool;
@@ -549,7 +552,7 @@ private:
void readKeyAttrs(TuxCtx&, const Frag& frag, TreeEnt ent, KeyData& keyData, Uint32 count);
void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(TuxCtx&, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
- void unpackBound(const ScanBound& bound, Data data);
+ void unpackBound(TuxCtx&, const ScanBound& bound, KeyBoundC& searchBound);
void findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr);
/*
@@ -647,9 +650,9 @@ private:
bool findPosToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
bool searchToAdd(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
bool searchToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
- void findNodeToScan(Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode);
- void findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos);
- void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
+ void findNodeToScan(Frag& frag, unsigned dir, const KeyBoundC& searchBound, NodeHandle& currNode);
+ void findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos);
+ void searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos);
/*
* DbtuxCmp.cpp
@@ -657,6 +660,7 @@ private:
int cmpSearchKey(TuxCtx&, const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpSearchKey(const KeyDataC& searchKey, const KeyDataC& entryKey, Uint32 cnt);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+ int cmpSearchBound(const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt);
/*
* DbtuxStat.cpp
@@ -947,10 +951,20 @@ Dbtux::DescPage::DescPage() :
}
}
+// Dbtux::ScanBound
+
+inline
+Dbtux::ScanBound::ScanBound() :
+ m_head(),
+ m_cnt(0),
+ m_side(0)
+{
+}
+
// Dbtux::ScanOp
inline
-Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
+Dbtux::ScanOp::ScanOp() :
m_state(Undef),
m_lockwait(false),
m_errorCode(0),
@@ -967,16 +981,11 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& sca
m_readCommitted(0),
m_lockMode(0),
m_descending(0),
- m_boundMin(scanBoundPool),
- m_boundMax(scanBoundPool),
+ m_scanBound(),
m_scanPos(),
m_scanEnt(),
m_nodeScan(RNIL)
{
- m_bound[0] = &m_boundMin;
- m_bound[1] = &m_boundMax;
- m_boundCnt[0] = 0;
- m_boundCnt[1] = 0;
}
// Dbtux::Index
@@ -1277,6 +1286,24 @@ Dbtux::cmpSearchKey(const KeyDataC& sear
debugOut << " entry:" << entryKey.print(tmp, sizeof(tmp));
debugOut << endl;
}
+#endif
+ return ret;
+}
+
+inline int
+Dbtux::cmpSearchBound(const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt)
+{
+ // compare cnt attributes from each
+ Uint32 num_eq;
+ int ret = searchBound.cmp(entryKey, cnt, num_eq);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ char tmp[MaxAttrDataSize << 2];
+ debugOut << "cmpSearchBound: res:" << ret;
+ debugOut << " search:" << searchBound.print(tmp, sizeof(tmp));
+ debugOut << " entry:" << entryKey.print(tmp, sizeof(tmp));
+ debugOut << endl;
+ }
#endif
return ret;
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2011-05-04 12:08:38 +0000
@@ -439,17 +439,16 @@ operator<<(NdbOut& out, const Dbtux::Sca
out << " [pos " << scan.m_scanPos << "]";
out << " [ent " << scan.m_scanEnt << "]";
for (unsigned i = 0; i <= 1; i++) {
- out << " [bound " << dec << i;
- Dbtux::ScanBound& bound = *scan.m_bound[i];
- Dbtux::ScanBoundIterator iter;
- bound.first(iter);
- for (unsigned j = 0; j < bound.getSize(); j++) {
- out << " " << hex << *iter.data;
- bound.next(iter);
- }
+ const Dbtux::ScanBound scanBound = scan.m_scanBound[i];
+ const Dbtux::Index& index = *tux->c_indexPool.getPtr(scan.m_indexId);
+ Dbtux::KeyDataC keyBoundData(index.m_keySpec, true);
+ Dbtux::KeyBoundC keyBound(keyBoundData);
+ tux->unpackBound(tux->c_ctx, scanBound, keyBound);
+ char tmp[Dbtux::MaxAttrDataSize << 2];
+ out << " [scanBound " << dec << i;
+ out << " " << keyBound.print(tmp, sizeof(tmp));
out << "]";
}
- out << "]";
return out;
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2011-05-04 12:08:38 +0000
@@ -398,16 +398,26 @@ Dbtux::copyAttrs(TuxCtx& ctx, const Frag
}
void
-Dbtux::unpackBound(const ScanBound& bound, Data dest)
+Dbtux::unpackBound(TuxCtx& ctx, const ScanBound& scanBound, KeyBoundC& searchBound)
{
- ScanBoundIterator iter;
- bound.first(iter);
- const unsigned n = bound.getSize();
- unsigned j;
- for (j = 0; j < n; j++) {
- dest[j] = *iter.data;
- bound.next(iter);
+ // there is no const version of LocalDataBuffer
+ DataBuffer<ScanBoundSegmentSize>::Head head = scanBound.m_head;
+ LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+ DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator iter;
+ // always use searchKey buffer
+ Uint32* const outputBuffer = ctx.c_searchKey;
+ b.first(iter);
+ const Uint32 n = b.getSize();
+ ndbrequire(n <= MaxAttrDataSize);
+ for (Uint32 i = 0; i < n; i++) {
+ outputBuffer[i] = *iter.data;
+ b.next(iter);
}
+ // set bound to the unpacked data buffer
+ KeyDataC& searchBoundData = searchBound.get_data();
+ searchBoundData.set_buf(outputBuffer, MaxAttrDataSize << 2, scanBound.m_cnt);
+ int ret = searchBound.finalize(scanBound.m_side);
+ ndbrequire(ret == 0);
}
void
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2011-05-04 12:08:38 +0000
@@ -75,7 +75,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
errorCode = AccScanRef::TuxNoFreeScanOp;
break;
}
- new (scanPtr.p) ScanOp(c_scanBoundPool);
+ new (scanPtr.p) ScanOp;
scanPtr.p->m_state = ScanOp::First;
scanPtr.p->m_userPtr = req->senderData;
scanPtr.p->m_userRef = req->senderRef;
@@ -131,9 +131,9 @@ Dbtux::execACC_SCANREQ(Signal* signal)
* Check that sets of lower and upper bounds are on initial sequences of
* keys and that all but possibly last bound is non-strict.
*
- * Finally save the sets of lower and upper bounds (i.e. start key and
- * end key). Full bound type is included but only the strict bit is
- * used since lower and upper have now been separated.
+ * Finally convert the sets of lower and upper bounds (i.e. start key
+ * and end key) to NdbPack format. The data is saved in segmented
+ * memory. The bound is reconstructed at use time via unpackBound().
*
* Error handling: Error code is set in the scan and also returned in
* EXECUTE_DIRECT (the old way).
@@ -143,175 +143,144 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal
{
jamEntry();
// get records
- TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
- const TuxBoundInfo* const req = (const TuxBoundInfo*)sig;
+ TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
const Index& index = *c_indexPool.getPtr(scan.m_indexId);
const DescHead& descHead = getDescHead(index);
const KeyType* keyTypes = getKeyTypes(descHead);
- // collect normalized lower and upper bounds
- struct BoundInfo {
- int type2; // with EQ -> LE/GE
- Uint32 offset; // offset in xfrmData
- Uint32 size;
- };
- BoundInfo boundInfo[2][MaxIndexAttributes];
- const unsigned dstSize = MaxAttrDataSize;
- // use some static buffer (they are only used within a timeslice)
- Uint32* const xfrmData = c_ctx.c_dataBuffer;
- Uint32 dstPos = 0;
- // largest attrId seen plus one
- Uint32 maxAttrId[2] = { 0, 0 };
- // walk through entries
- const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
- Uint32 offset = 0;
- while (offset + 2 <= req->boundAiLength) {
- jam();
- const unsigned type = data[offset];
- const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
- const Uint32 attrId = ah->getAttributeId();
- const Uint32 byteSize = ah->getByteSize();
- const Uint32 dataSize = ah->getDataSize();
- if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- // copy header
- xfrmData[dstPos + 0] = data[offset + 0];
- xfrmData[dstPos + 1] = data[offset + 1];
- // copy bound value
- Uint32 dstBytes = 0;
- Uint32 dstWords = 0;
- if (! ah->isNULL()) {
- jam();
- const uchar* srcPtr = (const uchar*)&data[offset + 2];
- const KeyType& keyType = keyTypes[attrId];
- Uint32 typeId = keyType.get_type_id();
- Uint32 maxBytes = keyType.get_byte_size();
- Uint32 lb, len;
- bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, maxBytes, lb, len);
- if (! ok) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- Uint32 srcBytes = lb + len;
- Uint32 srcWords = (srcBytes + 3) / 4;
- if (srcBytes != byteSize) {
+ // extract lower and upper bound in separate passes
+ for (unsigned idir = 0; idir <= 1; idir++) {
+ jam();
+ struct BoundInfo {
+ int type2; // with EQ -> LE/GE
+ Uint32 offset; // word offset in signal data
+ Uint32 bytes;
+ };
+ BoundInfo boundInfo[MaxIndexAttributes];
+ // largest attrId seen plus one
+ Uint32 maxAttrId = 0;
+ const Uint32* const data = &req->data[0];
+ Uint32 offset = 0;
+ while (offset + 2 <= req->boundAiLength) {
+ jam();
+ const Uint32 type = data[offset];
+ const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
+ const Uint32 attrId = ah->getAttributeId();
+ const Uint32 byteSize = ah->getByteSize();
+ const Uint32 dataSize = ah->getDataSize();
+ // check type
+ if (unlikely(type > 4)) {
jam();
scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
- uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2];
- if (keyType.get_cs_number() == 0) {
- memcpy(dstPtr, srcPtr, srcWords << 2);
- dstBytes = srcBytes;
- dstWords = srcWords;
- } else {
+ Uint32 type2 = type;
+ if (type2 == 4) {
jam();
- CHARSET_INFO* cs = all_charsets[keyType.get_cs_number()];
- Uint32 xmul = cs->strxfrm_multiply;
- if (xmul == 0)
- xmul = 1;
- // see comment in DbtcMain.cpp
- Uint32 dstLen = xmul * (maxBytes - lb);
- if (dstLen > ((dstSize - dstPos) << 2)) {
+ type2 = (idir << 1); // LE=0 GE=2
+ }
+ // check if attribute belongs to this bound
+ if ((type2 & 0x2) == (idir << 1)) {
+ if (unlikely(attrId >= index.m_numAttrs)) {
jam();
- scan.m_errorCode = TuxBoundInfo::TooMuchAttrInfo;
- sig->errorCode = scan.m_errorCode;
+ scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+ req->errorCode = scan.m_errorCode;
return;
}
- int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
- ndbrequire(n != -1);
- dstBytes = n;
- while ((n & 3) != 0) {
- dstPtr[n++] = 0;
+ // mark entries in any gap as undefined
+ while (maxAttrId <= attrId) {
+ jam();
+ BoundInfo& b = boundInfo[maxAttrId];
+ b.type2 = -1;
+ maxAttrId++;
}
- dstWords = n / 4;
- }
- }
- for (unsigned j = 0; j <= 1; j++) {
- jam();
- // check if lower/upper bit matches
- const unsigned luBit = (j << 1);
- if ((type & 0x2) != luBit && type != 4)
- continue;
- // EQ -> LE, GE
- const unsigned type2 = (type & 0x1) | luBit;
- // fill in any gap
- while (maxAttrId[j] <= attrId) {
- jam();
- BoundInfo& b = boundInfo[j][maxAttrId[j]];
- maxAttrId[j]++;
- b.type2 = -1;
- }
- BoundInfo& b = boundInfo[j][attrId];
- if (b.type2 != -1) {
- // compare with previously defined bound
- if (b.type2 != (int)type2 ||
- b.size != 2 + dstWords ||
- memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) {
+ BoundInfo& b = boundInfo[attrId];
+ // duplicate no longer allowed (wl#4163)
+ if (unlikely(b.type2 != -1)) {
jam();
scan.m_errorCode = TuxBoundInfo::InvalidBounds;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
- } else {
- // fix length
- AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1];
- ah->setByteSize(dstBytes);
- // enter new bound
- jam();
- b.type2 = type2;
- b.offset = dstPos;
- b.size = 2 + dstWords;
+ b.type2 = (int)type2;
+ b.offset = offset + 1; // poai
+ b.bytes = byteSize;
}
+ // jump to next
+ offset += 2 + dataSize;
}
- // jump to next
- offset += 2 + dataSize;
- dstPos += 2 + dstWords;
- }
- if (offset != req->boundAiLength) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- for (unsigned j = 0; j <= 1; j++) {
- // save lower/upper bound in index attribute id order
- for (unsigned i = 0; i < maxAttrId[j]; i++) {
- jam();
- const BoundInfo& b = boundInfo[j][i];
- // check for gap or strict bound before last
- if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidBounds;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size);
- if (! ok) {
+ if (unlikely(offset != req->boundAiLength)) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ // check and pack the bound data
+ KeyData searchBoundData(index.m_keySpec, true, 0);
+ KeyBound searchBound(searchBoundData);
+ searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
+ int strict = 0; // 0 or 1
+ Uint32 i;
+ for (i = 0; i < maxAttrId; i++) {
+ jam();
+ const BoundInfo& b = boundInfo[i];
+ // check for gap or strict bound before last
+ strict = (b.type2 & 0x1);
+ if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidBounds;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ Uint32 len;
+ if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
+ b.bytes != len)) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ }
+ int side = 0;
+ if (maxAttrId != 0) {
+ // arithmetic is faster
+ // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
+ side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
+ }
+ if (unlikely(searchBound.finalize(side) == -1)) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ ScanBound& scanBound = scan.m_scanBound[idir];
+ scanBound.m_cnt = maxAttrId;
+ scanBound.m_side = side;
+ // save data words in segmented memory
+ {
+ DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+ LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+ const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
+ Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
+ bool ok = b.append(data, size);
+ if (unlikely(!ok)) {
jam();
scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
}
- scan.m_boundCnt[j] = maxAttrId[j];
}
if (ERROR_INSERTED(12009)) {
jam();
CLEAR_ERROR_INSERT_VALUE;
scan.m_errorCode = TuxBoundInfo::InvalidBounds;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
// no error
- sig->errorCode = 0;
+ req->errorCode = 0;
}
void
@@ -758,18 +727,21 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
}
#endif
- // set up index keys for this operation
- setKeyAttrs(c_ctx, frag);
// scan direction 0, 1
const unsigned idir = scan.m_descending;
- unpackBound(*scan.m_bound[idir], c_ctx.c_dataBuffer);
+ // set up bound from segmented memory
+ const ScanBound& scanBound = scan.m_scanBound[idir];
+ KeyDataC searchBoundData(index.m_keySpec, true);
+ KeyBoundC searchBound(searchBoundData);
+ unpackBound(c_ctx, scanBound, searchBound);
TreePos treePos;
- searchToScan(frag, c_ctx.c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
+ searchToScan(frag, idir, searchBound, treePos);
if (treePos.m_loc != NullTupLoc) {
scan.m_scanPos = treePos;
// link the scan to node found
@@ -780,11 +752,15 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
jam();
// check upper bound
TreeEnt ent = node.getEnt(treePos.m_pos);
- if (scanCheck(scanPtr, ent))
+ if (scanCheck(scanPtr, ent)) {
+ jam();
scan.m_state = ScanOp::Current;
- else
+ } else {
+ jam();
scan.m_state = ScanOp::Last;
+ }
} else {
+ jam();
scan.m_state = ScanOp::Next;
}
} else {
@@ -875,8 +851,6 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool
#endif
// cannot be moved away from tuple we have locked
ndbrequire(scan.m_state != ScanOp::Locked);
- // set up index keys for this operation
- setKeyAttrs(c_ctx, frag);
// scan direction
const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1
@@ -1031,17 +1005,34 @@ Dbtux::scanCheck(ScanOpPtr scanPtr, Tree
return false;
}
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
const unsigned idir = scan.m_descending;
const int jdir = 1 - 2 * (int)idir;
- unpackBound(*scan.m_bound[1 - idir], c_ctx.c_dataBuffer);
- unsigned boundCnt = scan.m_boundCnt[1 - idir];
- readKeyAttrs(c_ctx, frag, ent, 0, c_ctx.c_entryKey);
- int ret = cmpScanBound(frag, 1 - idir, c_ctx.c_dataBuffer, boundCnt, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- if (jdir * ret > 0)
- return true;
- // hit upper bound of single range scan
- return false;
+ const ScanBound& scanBound = scan.m_scanBound[1 - idir];
+ int ret = 0;
+ if (scanBound.m_cnt != 0) {
+ jam();
+ // set up bound from segmented memory
+ KeyDataC searchBoundData(index.m_keySpec, true);
+ KeyBoundC searchBound(searchBoundData);
+ unpackBound(c_ctx, scanBound, searchBound);
+ // key data for the entry
+ KeyData entryKey(index.m_keySpec, true, 0);
+ entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
+ readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
+ // compare bound to key
+ const Uint32 boundCount = searchBound.get_data().get_cnt();
+ ret = cmpSearchBound(searchBound, entryKey, boundCount);
+ ndbrequire(ret != 0);
+ ret = (-1) * ret; // reverse for key vs bound
+ ret = jdir * ret; // reverse for descending scan
+ }
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
+ }
+#endif
+ return (ret <= 0);
}
/*
@@ -1208,8 +1199,12 @@ Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
}
#endif
Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
- scanPtr.p->m_boundMin.release();
- scanPtr.p->m_boundMax.release();
+ for (unsigned i = 0; i <= 1; i++) {
+ ScanBound& scanBound = scanPtr.p->m_scanBound[i];
+ DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+ LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+ b.release();
+ }
// unlink from per-fragment list and release from pool
frag.m_scanList.release(scanPtr);
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 2011-05-04 12:08:38 +0000
@@ -206,20 +206,25 @@ Dbtux::searchToRemove(TuxCtx& ctx, Frag&
* Search within the found node is done by caller.
*/
void
-Dbtux::findNodeToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode)
+Dbtux::findNodeToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode)
{
+ const int jdir = 1 - 2 * int(idir);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ const Uint32 numAttrs = searchBound.get_data().get_cnt();
+ KeyData entryKey(index.m_keySpec, false, 0);
+ entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
- int ret;
// wl4163_todo temp disable prefix
- if (true) {
+ int ret = (-1) * jdir;
+ if (numAttrs > 0) {
jam();
// read and compare all attributes
- readKeyAttrs(c_ctx, frag, currNode.getEnt(0), 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ readKeyAttrs(c_ctx, frag, currNode.getEnt(0), entryKey, numAttrs);
+ ret = cmpSearchBound(searchBound, entryKey, numAttrs);
+ ndbrequire(ret != 0);
}
if (ret < 0) {
// bound is left of this node
@@ -262,23 +267,26 @@ Dbtux::findNodeToScan(Frag& frag, unsign
* search similar to findPosToAdd().
*/
void
-Dbtux::findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos)
+Dbtux::findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos)
{
const int jdir = 1 - 2 * int(idir);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ const Uint32 numAttrs = searchBound.get_data().get_cnt();
int lo = -1;
int hi = (int)currNode.getOccup();
+ KeyData entryKey(index.m_keySpec, false, 0);
+ entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
while (hi - lo > 1) {
jam();
// hi - lo > 1 implies lo < j < hi
int j = (hi + lo) / 2;
int ret = (-1) * jdir;
- if (boundCount != 0) {
- // read and compare attributes
- const TreeEnt currEnt = currNode.getEnt(j);
- readKeyAttrs(c_ctx, frag, currEnt, 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
+ if (numAttrs != 0) {
+ // read and compare all attributes
+ readKeyAttrs(c_ctx, frag, currNode.getEnt(j), entryKey, numAttrs);
+ ret = cmpSearchBound(searchBound, entryKey, numAttrs);
+ ndbrequire(ret != 0);
}
- ndbrequire(ret != 0);
if (ret < 0) {
jam();
hi = j;
@@ -298,7 +306,7 @@ Dbtux::findPosToScan(Frag& frag, unsigne
* Search for scan start position.
*/
void
-Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
@@ -308,11 +316,10 @@ Dbtux::searchToScan(Frag& frag, ConstDat
jam();
return;
}
- const unsigned idir = unsigned(descending);
- findNodeToScan(frag, idir, boundInfo, boundCount, currNode);
+ findNodeToScan(frag, idir, searchBound, currNode);
treePos.m_loc = currNode.m_loc;
Uint16 pos;
- findPosToScan(frag, idir, boundInfo, boundCount, currNode, &pos);
+ findPosToScan(frag, idir, searchBound, currNode, &pos);
const unsigned occup = currNode.getOccup();
if (idir == 0) {
if (pos < occup) {
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp 2011-05-04 11:58:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp 2011-05-04 12:08:38 +0000
@@ -49,22 +49,18 @@ Dbtux::statRecordsInRange(ScanOpPtr scan
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
TreeHead& tree = frag.m_tree;
// get first and last position
TreePos pos1 = scan.m_scanPos;
TreePos pos2;
{ // as in scanFirst()
- setKeyAttrs(c_ctx, frag);
const unsigned idir = 1;
- const ScanBound& bound = *scan.m_bound[idir];
- ScanBoundIterator iter;
- bound.first(iter);
- for (unsigned j = 0; j < bound.getSize(); j++) {
- jam();
- c_ctx.c_dataBuffer[j] = *iter.data;
- bound.next(iter);
- }
- searchToScan(frag, c_ctx.c_dataBuffer, scan.m_boundCnt[idir], true, pos2);
+ const ScanBound& scanBound = scan.m_scanBound[idir];
+ KeyDataC searchBoundData(index.m_keySpec, true);
+ KeyBoundC searchBound(searchBoundData);
+ unpackBound(c_ctx, scanBound, searchBound);
+ searchToScan(frag, idir, searchBound, pos2);
// committed read (same timeslice) and range not empty
ndbrequire(pos2.m_loc != NullTupLoc);
}
Attachment: [text/bzr-bundle] bzr/pekka@mysql.com-20110504120838-kcfvxygof4gt9qrc.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4356) WL#4163 | Pekka Nousiainen | 4 May |