4343 jonas oreland 2011-04-27 [merge]
ndb - merge first parts of wl4163
modified:
storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
4342 Jonas Oreland 2011-04-27
ndb - fix 0-len key (fails on my new brillant assert in ACC)
modified:
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2011-04-25 16:46:59 +0000
@@ -219,16 +219,11 @@ private:
static const TreeEnt NullTreeEnt;
/*
- * Tree node has 1) fixed part 2) a prefix of index key data for min
- * entry 3) max and min entries 4) rest of entries 5) one extra entry
- * used as work space.
+ * Tree node has 3 parts:
*
- * struct TreeNode part 1, size 6 words
- * min prefix part 2, size TreeHead::m_prefSize
- * max entry part 3
- * min entry part 3
- * rest of entries part 4
- * work entry part 5
+ * 1) struct TreeNode - the header (6 words)
+ * 2) some key values for min entry - the min prefix
+ * 3) list of TreeEnt (each 2 words)
*
* There are 3 links to other nodes: left child, right child, parent.
* Occupancy (number of entries) is at least 1 except temporarily when
@@ -248,17 +243,6 @@ private:
STATIC_CONST( NodeHeadSize = sizeof(TreeNode) >> 2 );
/*
- * Tree node "access size" was for an early version with signal
- * interface to TUP. It is now used only to compute sizes.
- */
- enum AccSize {
- AccNone = 0,
- AccHead = 1, // part 1
- AccPref = 2, // parts 1-3
- AccFull = 3 // parts 1-5
- };
-
- /*
* Tree header. There is one in each fragment. Contains tree
* parameters and address of root node.
*/
@@ -273,7 +257,6 @@ private:
TupLoc m_root; // root node
TreeHead();
// methods
- unsigned getSize(AccSize acc) const;
Data getPref(TreeNode* node) const;
TreeEnt* getEntList(TreeNode* node) const;
};
@@ -562,7 +545,6 @@ private:
// access other parts of the node
Data getPref();
TreeEnt getEnt(unsigned pos);
- TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
void progError(int line, int cause, const char* file);
};
@@ -583,6 +565,7 @@ private:
void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(TuxCtx&, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
void unpackBound(const ScanBound& bound, Data data);
+ void findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr);
/*
* DbtuxMeta.cpp
@@ -674,11 +657,14 @@ private:
/*
* DbtuxSearch.cpp
*/
+ void findNodeToUpdate(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode);
+ bool findPosToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
+ bool findPosToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
bool searchToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
- bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ bool searchToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void findNodeToScan(Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode);
+ void findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
- void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
- void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
@@ -929,22 +915,6 @@ Dbtux::TreeHead::TreeHead() :
{
}
-inline unsigned
-Dbtux::TreeHead::getSize(AccSize acc) const
-{
- switch (acc) {
- case AccNone:
- return 0;
- case AccHead:
- return NodeHeadSize;
- case AccPref:
- return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
- case AccFull:
- return m_nodeSize;
- }
- return 0;
-}
-
inline Dbtux::Data
Dbtux::TreeHead::getPref(TreeNode* node) const
{
@@ -1201,15 +1171,7 @@ Dbtux::NodeHandle::getEnt(unsigned pos)
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
- return entList[(1 + pos) % occup];
-}
-
-inline Dbtux::TreeEnt
-Dbtux::NodeHandle::getMinMax(unsigned i)
-{
- const unsigned occup = m_node->m_occup;
- ndbrequire(i <= 1 && occup != 0);
- return getEnt(i == 0 ? 0 : occup - 1);
+ return entList[pos];
}
// parameters for methods
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp 2011-01-30 20:56:00 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp 2011-04-24 13:10:50 +0000
@@ -83,15 +83,7 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
const Uint32 fragId = req->fragId;
// get the fragment
FragPtr fragPtr;
- fragPtr.i = RNIL;
- for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
- jam();
- if (indexPtr.p->m_fragId[i] == fragId) {
- jam();
- c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
- break;
- }
- }
+ findFrag(*indexPtr.p, fragId, fragPtr);
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2011-04-24 16:20:23 +0000
@@ -286,7 +286,7 @@ Dbtux::printNode(TuxCtx & ctx,
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
memset(data2, DataFillByte, MaxPrefSize << 2);
- readKeyAttrs(ctx, frag, node.getMinMax(0), 0, ctx.c_searchKey);
+ readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_searchKey);
copyAttrs(ctx, frag, ctx.c_searchKey, data2, tree.m_prefSize);
for (unsigned n = 0; n < tree.m_prefSize; n++) {
if (data1[n] != data2[n]) {
@@ -320,7 +320,8 @@ Dbtux::printNode(TuxCtx & ctx,
if (node.getLink(i) == NullTupLoc)
continue;
const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
- const TreeEnt ent2 = node.getMinMax(i);
+ const unsigned pos = (i == 0 ? 0 : node.getOccup() - 1);
+ const TreeEnt ent2 = node.getEnt(pos);
unsigned start = 0;
readKeyAttrs(ctx, frag, ent1, start, ctx.c_searchKey);
readKeyAttrs(ctx, frag, ent2, start, ctx.c_entryKey);
@@ -337,9 +338,10 @@ Dbtux::printNode(TuxCtx & ctx,
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = node.getOccup();
for (unsigned i = 0; i <= 1; i++) {
- if (node.getLink(i) == NullTupLoc)
- par.m_minmax[i] = node.getMinMax(i);
- else
+ if (node.getLink(i) == NullTupLoc) {
+ const unsigned pos = (i == 0 ? 0 : node.getOccup() - 1);
+ par.m_minmax[i] = node.getEnt(pos);
+ } else
par.m_minmax[i] = cpar[i].m_minmax[i];
}
}
@@ -387,9 +389,6 @@ operator<<(NdbOut& out, const Dbtux::Tre
out << " [prefSize " << dec << tree.m_prefSize << "]";
out << " [minOccup " << dec << tree.m_minOccup << "]";
out << " [maxOccup " << dec << tree.m_maxOccup << "]";
- out << " [AccHead " << dec << tree.getSize(Dbtux::AccHead) << "]";
- out << " [AccPref " << dec << tree.getSize(Dbtux::AccPref) << "]";
- out << " [AccFull " << dec << tree.getSize(Dbtux::AccFull) << "]";
out << " [root " << hex << tree.m_root << "]";
out << "]";
return out;
@@ -528,9 +527,8 @@ operator<<(NdbOut& out, const Dbtux::Nod
unsigned numpos = node.m_node->m_occup;
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
- // print entries in logical order
- for (unsigned pos = 1; pos <= numpos; pos++)
- out << " " << entList[pos % numpos];
+ for (unsigned pos = 0; pos < numpos; pos++)
+ out << " " << entList[pos];
out << "]";
out << "]";
return out;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2011-04-24 13:10:50 +0000
@@ -366,4 +366,20 @@ Dbtux::unpackBound(const ScanBound& boun
}
}
+void
+Dbtux::findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr)
+{
+ const Uint32 numFrags = index.m_numFrags;
+ for (Uint32 i = 0; i < numFrags; i++) {
+ jam();
+ if (index.m_fragId[i] == fragId) {
+ jam();
+ fragPtr.i = index.m_fragPtrI[i];
+ c_fragPool.getPtr(fragPtr);
+ return;
+ }
+ }
+ fragPtr.i = RNIL;
+}
+
BLOCK_FUNCTIONS(Dbtux)
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp 2011-02-01 21:05:11 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp 2011-04-25 14:42:38 +0000
@@ -63,15 +63,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
const Uint32 fragId = req->fragId;
// get the fragment
FragPtr fragPtr;
- fragPtr.i = RNIL;
- for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
- jam();
- if (indexPtr.p->m_fragId[i] == fragId) {
- jam();
- c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
- break;
- }
- }
+ findFrag(*indexPtr.p, fragId, fragPtr);
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
// set up index keys for this operation
@@ -152,7 +144,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
case TuxMaintReq::OpRemove:
jam();
- ok = searchToRemove(frag, c_ctx.c_searchKey, ent, treePos);
+ ok = searchToRemove(c_ctx, frag, c_ctx.c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! ok ? " - error" : "") << endl;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp 2011-04-24 16:20:23 +0000
@@ -313,21 +313,20 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE;
const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
- // size up to and including first 2 entries
- const unsigned pref = tree.getSize(AccPref);
- if (! (pref <= tree.m_nodeSize)) {
+ // size of header and min prefix
+ const unsigned fixedSize = NodeHeadSize + tree.m_prefSize;
+ if (! (fixedSize <= tree.m_nodeSize)) {
jam();
errorCode = (TuxFragRef::ErrorCode)TuxAddAttrRef::InvalidNodeSize;
break;
}
- const unsigned slots = (tree.m_nodeSize - pref) / TreeEntSize;
- // leave out work space entry
- tree.m_maxOccup = 2 + slots - 1;
+ const unsigned slots = (tree.m_nodeSize - fixedSize) / TreeEntSize;
+ tree.m_maxOccup = slots;
// min occupancy of interior node must be at least 2
if (! (2 + maxSlack <= tree.m_maxOccup)) {
jam();
- errorCode = (TuxFragRef::ErrorCode)TuxAddAttrRef::InvalidNodeSize;
- break;
+ errorCode = (TuxFragRef::ErrorCode)TuxAddAttrRef::InvalidNodeSize;
+ break;
}
tree.m_minOccup = tree.m_maxOccup - maxSlack;
// root node does not exist (also set by ctor)
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-04-24 16:20:23 +0000
@@ -102,7 +102,7 @@ Dbtux::insertNode(NodeHandle& node)
TreeHead& tree = frag.m_tree;
memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node);
- memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
+ memset(entList, NodeFillByte, tree.m_maxOccup * (TreeEntSize << 2));
#endif
}
@@ -156,7 +156,7 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
- readKeyAttrs(ctx, frag, node.getMinMax(0), 0, ctx.c_entryKey);
+ readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_entryKey);
copyAttrs(ctx, frag, ctx.c_entryKey, node.getPref(), tree.m_prefSize);
}
@@ -185,14 +185,11 @@ Dbtux::nodePushUp(TuxCtx & ctx, NodeHand
nodePushUpScans(node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
thrjam(ctx.jamBuffer);
- tmpList[i] = tmpList[i - 1];
+ entList[i] = entList[i - 1];
}
- tmpList[pos] = ent;
- entList[0] = entList[occup + 1];
+ entList[pos] = ent;
node.setOccup(occup + 1);
// add new scans
if (scanList != RNIL)
@@ -258,14 +255,11 @@ Dbtux::nodePopDown(TuxCtx& ctx, NodeHand
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- ent = tmpList[pos];
+ ent = entList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
thrjam(ctx.jamBuffer);
- tmpList[i] = tmpList[i + 1];
+ entList[i] = entList[i + 1];
}
- entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
@@ -326,16 +320,13 @@ Dbtux::nodePushDown(TuxCtx& ctx, NodeHan
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- TreeEnt oldMin = tmpList[0];
+ TreeEnt oldMin = entList[0];
for (unsigned i = 0; i < pos; i++) {
thrjam(ctx.jamBuffer);
- tmpList[i] = tmpList[i + 1];
+ entList[i] = entList[i + 1];
}
- tmpList[pos] = ent;
+ entList[pos] = ent;
ent = oldMin;
- entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(ctx, node);
@@ -396,16 +387,13 @@ Dbtux::nodePopUp(TuxCtx& ctx, NodeHandle
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
- ent = tmpList[pos];
+ ent = entList[pos];
for (unsigned i = pos; i > 0; i--) {
thrjam(ctx.jamBuffer);
- tmpList[i] = tmpList[i - 1];
+ entList[i] = entList[i - 1];
}
- tmpList[0] = newMin;
- entList[0] = entList[occup];
+ entList[0] = newMin;
// add scans
if (scanList != RNIL)
addScanList(node, 0, scanList);
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2011-04-25 15:57:28 +0000
@@ -39,15 +39,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
c_indexPool.getPtr(indexPtr, req->tableId);
// get the fragment
FragPtr fragPtr;
- fragPtr.i = RNIL;
- for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
- jam();
- if (indexPtr.p->m_fragId[i] == req->fragmentNo) {
- jam();
- c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
- break;
- }
- }
+ findFrag(*indexPtr.p, req->fragmentNo, fragPtr);
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
// check for index not Online (i.e. Dropping)
@@ -858,8 +850,9 @@ Dbtux::scanFind(ScanOpPtr scanPtr)
* 0 - up from left child (scan this node next)
* 1 - up from right child (proceed to parent)
* 2 - up from root (the scan ends)
- * 3 - left to right within node (at end proceed to right child)
+ * 3 - left to right within node (at end set state 5)
* 4 - down from parent (proceed to left child)
+ * 5 - at node end proceed to right child (state becomes 4)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
@@ -926,6 +919,19 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool
// pretend we came from left child
pos.m_dir = idir;
}
+ if (pos.m_dir == 5) {
+ // at node end proceed to right child
+ jam();
+ TupLoc loc = node.getLink(1 - idir);
+ if (loc != NullTupLoc) {
+ jam();
+ pos.m_loc = loc;
+ pos.m_dir = 4; // down from parent as usual
+ continue;
+ }
+ // pretend we came from right child
+ pos.m_dir = 1 - idir;
+ }
const unsigned occup = node.getOccup();
if (occup == 0) {
jam();
@@ -957,15 +963,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool
break;
}
// after node proceed to right child
- TupLoc loc = node.getLink(1 - idir);
- if (loc != NullTupLoc) {
- jam();
- pos.m_loc = loc;
- pos.m_dir = 4;
- continue;
- }
- // pretend we came from right child
- pos.m_dir = 1 - idir;
+ pos.m_dir = 5;
+ continue;
}
if (pos.m_dir == 1 - idir) {
// coming up from right child proceed to parent
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 2011-04-27 21:13:10 +0000
@@ -20,28 +20,23 @@
#include "Dbtux.hpp"
/*
- * Search for entry to add.
+ * Search down non-empty tree for node to update. Compare search key to
+ * each node minimum. If greater, move to right subtree. This can
+ * overshoot target node. The last such node is saved. The search ends
+ * at a final node which is a semi-leaf or leaf. If search key is less
+ * than final node minimum then the saved node (if any) is the g.l.b of
+ * the final node and we move back to it.
*
- * Similar to searchToRemove (see below).
+ * Search within the found node is done by caller. On add, search key
+ * may be before minimum or after maximum entry. On remove, search key
+ * is within the node.
*/
-bool
-Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+void
+Dbtux::findNodeToUpdate(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode)
{
const TreeHead& tree = frag.m_tree;
- const unsigned numAttrs = frag.m_numAttrs;
- NodeHandle currNode(frag);
- currNode.m_loc = tree.m_root;
- if (currNode.m_loc == NullTupLoc) {
- // empty tree
- thrjam(ctx.jamBuffer);
- return true;
- }
+ const Uint32 numAttrs = frag.m_numAttrs;
NodeHandle glbNode(frag); // potential g.l.b of final node
- /*
- * In order to not (yet) change old behaviour, a position between
- * 2 nodes returns the one at the bottom of the tree.
- */
- NodeHandle bottomNode(frag);
while (true) {
thrjam(ctx.jamBuffer);
selectNode(currNode, currNode.m_loc);
@@ -53,14 +48,14 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
thrjam(ctx.jamBuffer);
// read and compare remaining attributes
ndbrequire(start < numAttrs);
- readKeyAttrs(ctx, frag, currNode.getMinMax(0), start, ctx.c_entryKey);
+ readKeyAttrs(ctx, frag, currNode.getEnt(0), start, ctx.c_entryKey);
ret = cmpSearchKey(ctx, frag, start, searchKey, ctx.c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
thrjam(ctx.jamBuffer);
// keys are equal, compare entry values
- ret = searchEnt.cmp(currNode.getMinMax(0));
+ ret = searchEnt.cmp(currNode.getEnt(0));
}
if (ret < 0) {
thrjam(ctx.jamBuffer);
@@ -73,11 +68,12 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
}
if (! glbNode.isNull()) {
thrjam(ctx.jamBuffer);
- // move up to the g.l.b but remember the bottom node
- bottomNode = currNode;
+ // move up to the g.l.b
currNode = glbNode;
}
- } else if (ret > 0) {
+ break;
+ }
+ if (ret > 0) {
thrjam(ctx.jamBuffer);
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
@@ -88,26 +84,28 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
currNode.m_loc = loc;
continue;
}
- } else {
- thrjam(ctx.jamBuffer);
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = 0;
- // entry found - error
- return false;
+ break;
}
+ // ret == 0
+ thrjam(ctx.jamBuffer);
break;
}
- // anticipate
- treePos.m_loc = currNode.m_loc;
- // binary search
+}
+
+/*
+ * Find position within the final node to add entry to. Use binary
+ * search. Return true if ok i.e. entry to add is not a duplicate.
+ */
+bool
+Dbtux::findPosToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+{
int lo = -1;
- int hi = currNode.getOccup();
+ int hi = (int)currNode.getOccup();
int ret;
- while (1) {
+ while (hi - lo > 1) {
thrjam(ctx.jamBuffer);
// hi - lo > 1 implies lo < j < hi
int j = (hi + lo) / 2;
-
// read and compare attributes
unsigned start = 0;
readKeyAttrs(ctx, frag, currNode.getEnt(j), start, ctx.c_entryKey);
@@ -118,171 +116,113 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getEnt(j));
}
- if (ret < 0)
+ if (ret < 0) {
+ thrjam(ctx.jamBuffer);
hi = j;
- else if (ret > 0)
+ } else if (ret > 0) {
+ thrjam(ctx.jamBuffer);
lo = j;
- else {
+ } else {
treePos.m_pos = j;
// entry found - error
return false;
}
- if (hi - lo == 1)
- break;
}
- if (ret < 0) {
+ ndbrequire(hi - lo == 1);
+ // return hi pos, see treeAdd() for next step
+ treePos.m_pos = hi;
+ return true;
+}
+
+/*
+ * Find position within the final node to remove entry from. Use linear
+ * search. Return true if ok i.e. the entry was found.
+ */
+bool
+Dbtux::findPosToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+{
+ const unsigned occup = currNode.getOccup();
+ for (unsigned j = 0; j < occup; j++) {
thrjam(ctx.jamBuffer);
- treePos.m_pos = hi;
- return true;
+ // compare only the entry
+ if (searchEnt.eq(currNode.getEnt(j))) {
+ thrjam(ctx.jamBuffer);
+ treePos.m_pos = j;
+ return true;
+ }
}
- if ((uint) hi < currNode.getOccup()) {
+ treePos.m_pos = occup;
+ // not found - failed
+ return false;
+}
+
+/*
+ * Search for entry to add.
+ */
+bool
+Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+{
+ const TreeHead& tree = frag.m_tree;
+ NodeHandle currNode(frag);
+ currNode.m_loc = tree.m_root;
+ if (unlikely(currNode.m_loc == NullTupLoc)) {
+ // empty tree
thrjam(ctx.jamBuffer);
- treePos.m_pos = hi;
return true;
}
- if (bottomNode.isNull()) {
+ findNodeToUpdate(ctx, frag, searchKey, searchEnt, currNode);
+ treePos.m_loc = currNode.m_loc;
+ if (! findPosToAdd(ctx, frag, searchKey, searchEnt, currNode, treePos)) {
thrjam(ctx.jamBuffer);
- treePos.m_pos = hi;
- return true;
+ return false;
}
- thrjam(ctx.jamBuffer);
- // backwards compatible for now
- treePos.m_loc = bottomNode.m_loc;
- treePos.m_pos = 0;
return true;
}
/*
* Search for entry to remove.
- *
- * Compares search key to each node min. A move to right subtree can
- * overshoot target node. The last such node is saved. The final node
- * is a semi-leaf or leaf. If search key is less than final node min
- * then the saved node is the g.l.b of the final node and we move back
- * to it.
*/
bool
-Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
- const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
- if (currNode.m_loc == NullTupLoc) {
+ if (unlikely(currNode.m_loc == NullTupLoc)) {
// empty tree - failed
- jam();
+ thrjam(ctx.jamBuffer);
return false;
}
- NodeHandle glbNode(frag); // potential g.l.b of final node
- while (true) {
- jam();
- selectNode(currNode, currNode.m_loc);
- int ret;
- // compare prefix
- unsigned start = 0;
- ret = cmpSearchKey(c_ctx, frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
- if (ret == NdbSqlUtil::CmpUnknown) {
- jam();
- // read and compare remaining attributes
- ndbrequire(start < numAttrs);
- readKeyAttrs(c_ctx, frag, currNode.getMinMax(0), start, c_ctx.c_entryKey);
- ret = cmpSearchKey(c_ctx, frag, start, searchKey, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- }
- if (ret == 0) {
- jam();
- // keys are equal, compare entry values
- ret = searchEnt.cmp(currNode.getMinMax(0));
- }
- if (ret < 0) {
- jam();
- const TupLoc loc = currNode.getLink(0);
- if (loc != NullTupLoc) {
- jam();
- // continue to left subtree
- currNode.m_loc = loc;
- continue;
- }
- if (! glbNode.isNull()) {
- jam();
- // move up to the g.l.b
- currNode = glbNode;
- }
- } else if (ret > 0) {
- jam();
- const TupLoc loc = currNode.getLink(1);
- if (loc != NullTupLoc) {
- jam();
- // save potential g.l.b
- glbNode = currNode;
- // continue to right subtree
- currNode.m_loc = loc;
- continue;
- }
- } else {
- jam();
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = 0;
- return true;
- }
- break;
- }
- // anticipate
+ findNodeToUpdate(ctx, frag, searchKey, searchEnt, currNode);
treePos.m_loc = currNode.m_loc;
- // pos 0 was handled above
- for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
- jam();
- // compare only the entry
- if (searchEnt.eq(currNode.getEnt(j))) {
- jam();
- treePos.m_pos = j;
- return true;
- }
+ if (! findPosToRemove(ctx, frag, searchKey, searchEnt, currNode, treePos)) {
+ thrjam(ctx.jamBuffer);
+ return false;
}
- treePos.m_pos = currNode.getOccup();
- // not found - failed
- return false;
+ return true;
}
/*
- * Search for scan start position.
- *
- * Similar to searchToAdd. The routines differ somewhat depending on
- * scan direction and are done by separate methods.
+ * Search down non-empty tree for node to start scan from. Similar to
+ * findNodeToUpdate(). Direction is 0-ascending or 1-descending.
+ * Search within the found node is done by caller.
*/
void
-Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
-{
- const TreeHead& tree = frag.m_tree;
- if (tree.m_root != NullTupLoc) {
- if (! descending)
- searchToScanAscending(frag, boundInfo, boundCount, treePos);
- else
- searchToScanDescending(frag, boundInfo, boundCount, treePos);
- return;
- }
- // empty tree
-}
-
-void
-Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
+Dbtux::findNodeToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode)
{
const TreeHead& tree = frag.m_tree;
- NodeHandle currNode(frag);
- currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node
- NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
- ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
+ ret = cmpScanBound(frag, idir, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
- readKeyAttrs(c_ctx, frag, currNode.getMinMax(0), 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_ctx.c_entryKey);
+ readKeyAttrs(c_ctx, frag, currNode.getEnt(0), 0, c_ctx.c_entryKey);
+ ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
@@ -297,17 +237,12 @@ Dbtux::searchToScanAscending(Frag& frag,
}
if (! glbNode.isNull()) {
jam();
- // move up to the g.l.b but remember the bottom node
- bottomNode = currNode;
+ // move up to the g.l.b
currNode = glbNode;
- } else {
- // start scanning this node
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = 0;
- treePos.m_dir = 3;
- return;
}
- } else {
+ break;
+ }
+ if (ret > 0) {
// bound is at or right of this node
jam();
const TupLoc loc = currNode.getLink(1);
@@ -319,117 +254,89 @@ Dbtux::searchToScanAscending(Frag& frag,
currNode.m_loc = loc;
continue;
}
+ break;
}
- break;
+ // ret == 0 never
+ ndbrequire(false);
}
- for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
+}
+
+/*
+ * Search across final node for position to start scan from. Use binary
+ * search similar to findPosToAdd().
+ */
+void
+Dbtux::findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos)
+{
+ const int jdir = 1 - 2 * int(idir);
+ int lo = -1;
+ int hi = (int)currNode.getOccup();
+ while (hi - lo > 1) {
jam();
- int ret;
- // read and compare attributes
- readKeyAttrs(c_ctx, frag, currNode.getEnt(j), 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ // hi - lo > 1 implies lo < j < hi
+ int j = (hi + lo) / 2;
+ int ret = (-1) * jdir;
+ if (boundCount != 0) {
+ // read and compare attributes
+ const TreeEnt currEnt = currNode.getEnt(j);
+ readKeyAttrs(c_ctx, frag, currEnt, 0, c_ctx.c_entryKey);
+ ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
+ }
+ ndbrequire(ret != 0);
if (ret < 0) {
- // found first entry satisfying the bound
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = j;
- treePos.m_dir = 3;
- return;
+ jam();
+ hi = j;
+ } else if (ret > 0) {
+ jam();
+ lo = j;
+ } else {
+ // ret == 0 never
+ ndbrequire(false);
}
}
- // bound is to right of this node
- if (! bottomNode.isNull()) {
- jam();
- // start scanning the l.u.b
- treePos.m_loc = bottomNode.m_loc;
- treePos.m_pos = 0;
- treePos.m_dir = 3;
- return;
- }
- // start scanning upwards (pretend we came from right child)
- treePos.m_loc = currNode.m_loc;
- treePos.m_dir = 1;
+ // return hi pos, caller handles ascending vs descending
+ *pos = hi;
}
+/*
+ * Search for scan start position.
+ */
void
-Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
- NodeHandle glbNode(frag); // potential g.l.b of final node
- NodeHandle bottomNode(frag);
- while (true) {
+ if (unlikely(currNode.m_loc == NullTupLoc)) {
+ // empty tree
jam();
- selectNode(currNode, currNode.m_loc);
- int ret;
- // compare prefix
- ret = cmpScanBound(frag, 1, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
- if (ret == NdbSqlUtil::CmpUnknown) {
+ return;
+ }
+ const unsigned idir = unsigned(descending);
+ findNodeToScan(frag, idir, boundInfo, boundCount, currNode);
+ treePos.m_loc = currNode.m_loc;
+ Uint16 pos;
+ findPosToScan(frag, idir, boundInfo, boundCount, currNode, &pos);
+ const unsigned occup = currNode.getOccup();
+ if (idir == 0) {
+ if (pos < occup) {
jam();
- // read and compare all attributes
- readKeyAttrs(c_ctx, frag, currNode.getMinMax(0), 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ treePos.m_pos = pos;
+ treePos.m_dir = 3;
+ } else {
+ // start scan after node end i.e. proceed to right child
+ treePos.m_pos = ZNIL;
+ treePos.m_dir = 5;
}
- if (ret < 0) {
- // bound is left of this node
+ } else {
+ if (pos > 0) {
jam();
- const TupLoc loc = currNode.getLink(0);
- if (loc != NullTupLoc) {
- jam();
- // continue to left subtree
- currNode.m_loc = loc;
- continue;
- }
- if (! glbNode.isNull()) {
- jam();
- // move up to the g.l.b but remember the bottom node
- bottomNode = currNode;
- currNode = glbNode;
- } else {
- // empty result set
- return;
- }
+ // start scan from previous entry
+ treePos.m_pos = pos - 1;
+ treePos.m_dir = 3;
} else {
- // bound is at or right of this node
- jam();
- const TupLoc loc = currNode.getLink(1);
- if (loc != NullTupLoc) {
- jam();
- // save potential g.l.b
- glbNode = currNode;
- // continue to right subtree
- currNode.m_loc = loc;
- continue;
- }
- }
- break;
- }
- for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
- jam();
- int ret;
- // read and compare attributes
- readKeyAttrs(c_ctx, frag, currNode.getEnt(j), 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- if (ret < 0) {
- if (j > 0) {
- // start scanning from previous entry
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = j - 1;
- treePos.m_dir = 3;
- return;
- }
- // start scanning upwards (pretend we came from left child)
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = 0;
+ treePos.m_pos = ZNIL;
treePos.m_dir = 0;
- return;
}
}
- // start scanning this node
- treePos.m_loc = currNode.m_loc;
- treePos.m_pos = currNode.getOccup() - 1;
- treePos.m_dir = 3;
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas:4342 to 4343) | jonas oreland | 27 Apr |