Below is the list of changes that have just been committed into a local
4.1 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://www.mysql.com/doc/I/n/Installing_source_tree.html
ChangeSet
1.1897 04/07/08 14:35:19 pekka@stripped +13 -0
tux optim 12 - remove max prefix + related
ndb/test/ndbapi/testOIBasic.cpp
1.9 04/07/08 14:34:31 pekka@stripped +3 -1
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/Times.txt
1.13 04/07/08 14:34:31 pekka@stripped +3 -0
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/Makefile.am
1.3 04/07/08 14:34:31 pekka@stripped +1 -0
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
1.8 04/07/08 14:34:31 pekka@stripped +0 -106
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
1.2 04/07/08 14:34:31 pekka@stripped +0 -0
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
1.8 04/07/08 14:34:31 pekka@stripped +23 -122
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
1.9 04/07/08 14:34:31 pekka@stripped +13 -22
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
1.10 04/07/08 14:34:31 pekka@stripped +13 -9
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
1.8 04/07/08 14:34:31 pekka@stripped +8 -0
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
1.7 04/07/08 14:34:31 pekka@stripped +78 -17
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
1.5 04/07/08 14:34:31 pekka@stripped +134 -72
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
1.17 04/07/08 14:34:31 pekka@stripped +55 -23
tux optim 12 - remove max prefix + related
ndb/include/kernel/ndb_limits.h
1.3 04/07/08 14:34:31 pekka@stripped +1 -1
tux optim 12 - remove max prefix + related
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
1.1 04/07/08 14:33:43 pekka@stripped +333 -0
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
1.0 04/07/08 14:33:43 pekka@stripped +0 -0
BitKeeper file /space/pekka/ndb/version/my41/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: pekka
# Host: orca.ndb.mysql.com
# Root: /space/pekka/ndb/version/my41
--- 1.2/ndb/include/kernel/ndb_limits.h 2004-06-10 12:03:51 +02:00
+++ 1.3/ndb/include/kernel/ndb_limits.h 2004-07-08 14:34:31 +02:00
@@ -88,7 +88,7 @@
* Ordered index constants. Make configurable per index later.
*/
#define MAX_TTREE_NODE_SIZE 64 // total words in node
-#define MAX_TTREE_PREF_SIZE 4 // words in min/max prefix each
+#define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
/*
--- 1.16/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2004-06-19 13:31:53 +02:00
+++ 1.17/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2004-07-08 14:34:31 +02:00
@@ -77,10 +77,14 @@
#define jam() jamLine(60000 + __LINE__)
#define jamEntry() jamEntryLine(60000 + __LINE__)
#endif
-#ifdef DBTUX_CMP_CPP
+#ifdef DBTUX_SEARCH_CPP
#define jam() jamLine(70000 + __LINE__)
#define jamEntry() jamEntryLine(70000 + __LINE__)
#endif
+#ifdef DBTUX_CMP_CPP
+#define jam() jamLine(80000 + __LINE__)
+#define jamEntry() jamEntryLine(80000 + __LINE__)
+#endif
#ifdef DBTUX_DEBUG_CPP
#define jam() jamLine(90000 + __LINE__)
#define jamEntry() jamEntryLine(90000 + __LINE__)
@@ -112,6 +116,7 @@
static const unsigned DescPageSize = 256;
private:
static const unsigned MaxTreeNodeSize = MAX_TTREE_NODE_SIZE;
+ static const unsigned MaxPrefSize = MAX_TTREE_PREF_SIZE;
static const unsigned ScanBoundSegmentSize = 7;
static const unsigned MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN;
BLOCK_DEFINES(Dbtux);
@@ -206,19 +211,19 @@
unsigned m_fragBit : 1; // which duplicated table fragment
TreeEnt();
// methods
+ bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const;
};
static const unsigned TreeEntSize = sizeof(TreeEnt) >> 2;
static const TreeEnt NullTreeEnt;
/*
- * Tree node has 1) fixed part 2) actual table data for min and max
- * prefix 3) max and min entries 4) rest of entries 5) one extra entry
+ * Tree node has 1) fixed part 2) a prefix of index key data for min
+ * entry 3) max and min entries 4) rest of entries 5) one extra entry
* used as work space.
*
* struct TreeNode part 1, size 6 words
* min prefix part 2, size TreeHead::m_prefSize
- * max prefix part 2, size TreeHead::m_prefSize
* max entry part 3
* min entry part 3
* rest of entries part 4
@@ -265,14 +270,14 @@
friend struct TreeHead;
struct TreeHead {
Uint8 m_nodeSize; // words in tree node
- Uint8 m_prefSize; // words in min/max prefix each
+ Uint8 m_prefSize; // words in min prefix
Uint8 m_minOccup; // min entries in internal node
Uint8 m_maxOccup; // max entries in node
TupLoc m_root; // root node
TreeHead();
// methods
unsigned getSize(AccSize acc) const;
- Data getPref(TreeNode* node, unsigned i) const;
+ Data getPref(TreeNode* node) const;
TreeEnt* getEntList(TreeNode* node) const;
};
@@ -514,6 +519,8 @@
NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
+ // check if unassigned
+ bool isNull();
// getters
TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell
@@ -528,7 +535,7 @@
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
- Data getPref(unsigned i);
+ Data getPref();
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
@@ -618,7 +625,7 @@
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void deleteNode(Signal* signal, NodeHandle& node);
- void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
+ void setNodePref(Signal* signal, NodeHandle& node);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
@@ -633,7 +640,6 @@
/*
* DbtuxTree.cpp
*/
- void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
@@ -658,11 +664,19 @@
void releaseScanOp(ScanOpPtr& scanPtr);
/*
+ * DbtuxSearch.cpp
+ */
+ void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
+
+ /*
* DbtuxCmp.cpp
*/
- int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize);
- int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2);
- int cmpScanBound(const Frag& frag, const BoundPar boundPar);
+ int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+ int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
+ int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+ int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/*
* DbtuxDebug.cpp
@@ -675,6 +689,7 @@
TupLoc m_parent; // expected parent address
int m_depth; // returned depth
unsigned m_occup; // returned occupancy
+ TreeEnt m_minmax[2]; // returned subtree min and max
bool m_ok; // returned status
PrintPar();
};
@@ -699,6 +714,8 @@
DebugTree = 4, // log and check tree after each op
DebugScan = 8 // log scans
};
+ static const int DataFillByte = 0xa2;
+ static const int NodeFillByte = 0xa4;
#endif
// start up info
@@ -859,13 +876,18 @@
{
}
+inline bool
+Dbtux::TreeEnt::eq(const TreeEnt ent) const
+{
+ return
+ m_tupLoc == ent.m_tupLoc &&
+ m_tupVersion == ent.m_tupVersion &&
+ m_fragBit == ent.m_fragBit;
+}
+
inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{
- if (m_fragBit < ent.m_fragBit)
- return -1;
- if (m_fragBit > ent.m_fragBit)
- return +1;
if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId)
return -1;
if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId)
@@ -878,6 +900,10 @@
return -1;
if (m_tupVersion > ent.m_tupVersion)
return +1;
+ if (m_fragBit < ent.m_fragBit)
+ return -1;
+ if (m_fragBit > ent.m_fragBit)
+ return +1;
return 0;
}
@@ -920,7 +946,7 @@
case AccHead:
return NodeHeadSize;
case AccPref:
- return NodeHeadSize + 2 * m_prefSize + 2 * TreeEntSize;
+ return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
case AccFull:
return m_nodeSize;
}
@@ -929,16 +955,16 @@
}
inline Dbtux::Data
-Dbtux::TreeHead::getPref(TreeNode* node, unsigned i) const
+Dbtux::TreeHead::getPref(TreeNode* node) const
{
- Uint32* ptr = (Uint32*)node + NodeHeadSize + i * m_prefSize;
+ Uint32* ptr = (Uint32*)node + NodeHeadSize;
return ptr;
}
inline Dbtux::TreeEnt*
Dbtux::TreeHead::getEntList(TreeNode* node) const
{
- Uint32* ptr = (Uint32*)node + NodeHeadSize + 2 * m_prefSize;
+ Uint32* ptr = (Uint32*)node + NodeHeadSize + m_prefSize;
return (TreeEnt*)ptr;
}
@@ -1087,6 +1113,12 @@
return *this;
}
+inline bool
+Dbtux::NodeHandle::isNull()
+{
+ return m_node == 0;
+}
+
inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i)
{
@@ -1161,11 +1193,11 @@
}
inline Dbtux::Data
-Dbtux::NodeHandle::getPref(unsigned i)
+Dbtux::NodeHandle::getPref()
{
TreeHead& tree = m_frag.m_tree;
- ndbrequire(m_acc >= AccPref && i <= 1);
- return tree.getPref(m_node, i);
+ ndbrequire(m_acc >= AccPref);
+ return tree.getPref(m_node);
}
inline Dbtux::TreeEnt
--- 1.4/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp 2004-06-19 13:31:53 +02:00
+++ 1.5/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp 2004-07-08 14:34:31 +02:00
@@ -25,14 +25,14 @@
* prefix may be partial in which case CmpUnknown may be returned.
*/
int
-Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2)
+Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left
- unsigned len2 = maxlen2;
+ unsigned len2 = maxlen;
// skip to right position in search key
- data1 += start;
+ searchKey += start;
int ret = 0;
while (start < numAttrs) {
if (len2 < AttributeHeaderSize) {
@@ -41,20 +41,20 @@
break;
}
len2 -= AttributeHeaderSize;
- if (*data1 != 0) {
- if (! data2.ah().isNULL()) {
+ if (*searchKey != 0) {
+ if (! entryData.ah().isNULL()) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId;
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
- ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
+ ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
- const Uint32* const p1 = *data1;
- const Uint32* const p2 = &data2[AttributeHeaderSize];
+ const Uint32* const p1 = *searchKey;
+ const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
if (ret != 0) {
jam();
@@ -67,15 +67,15 @@
break;
}
} else {
- if (! data2.ah().isNULL()) {
+ if (! entryData.ah().isNULL()) {
jam();
// NULL > not NULL
ret = +1;
break;
}
}
- data1 += 1;
- data2 += AttributeHeaderSize + data2.ah().getDataSize();
+ searchKey += 1;
+ entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++;
}
// XXX until data format errors are handled
@@ -89,17 +89,17 @@
* Start position is updated as in previous routine.
*/
int
-Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2)
+Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// skip to right position
- data1 += start;
- data2 += start;
+ searchKey += start;
+ entryKey += start;
int ret = 0;
while (start < numAttrs) {
- if (*data1 != 0) {
- if (*data2 != 0) {
+ if (*searchKey != 0) {
+ if (*entryKey != 0) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
@@ -107,8 +107,8 @@
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
- const Uint32* const p1 = *data1;
- const Uint32* const p2 = *data2;
+ const Uint32* const p1 = *searchKey;
+ const Uint32* const p2 = *entryKey;
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
if (ret != 0) {
jam();
@@ -121,15 +121,15 @@
break;
}
} else {
- if (*data2 != 0) {
+ if (*entryKey != 0) {
jam();
// NULL > not NULL
ret = +1;
break;
}
}
- data1 += 1;
- data2 += 1;
+ searchKey += 1;
+ entryKey += 1;
start++;
}
// XXX until data format errors are handled
@@ -137,71 +137,68 @@
return ret;
}
-
/*
- * Scan bound vs tree entry.
+ * Scan bound vs node prefix.
*
* Compare lower or upper bound and index attribute data. The attribute
* data may be partial in which case CmpUnknown may be returned.
- * Returns -1 if the boundary is to the left of the compared key and +1 if
- * the boundary is to the right of the compared key.
+ * Returns -1 if the boundary is to the left of the compared key and +1
+ * if the boundary is to the right of the compared key.
*
- * To get this behaviour we treat equality a little bit special.
- * If the boundary is a lower bound then the boundary is to the left of all
- * equal keys and if it is an upper bound then the boundary is to the right
- * of all equal keys.
+ * To get this behaviour we treat equality a little bit special. If the
+ * boundary is a lower bound then the boundary is to the left of all
+ * equal keys and if it is an upper bound then the boundary is to the
+ * right of all equal keys.
*
* When searching for the first key we are using the lower bound to try
- * to find the first key that is to the right of the boundary.
- * Then we start scanning from this tuple (including the tuple itself)
- * until we find the first key which is to the right of the boundary. Then
- * we stop and do not include that key in the scan result.
+ * to find the first key that is to the right of the boundary. Then we
+ * start scanning from this tuple (including the tuple itself) until we
+ * find the first key which is to the right of the boundary. Then we
+ * stop and do not include that key in the scan result.
*/
int
-Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
+Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
{
- unsigned type = 4;
- int ret = 0;
- /*
- No boundary means full scan, low boundary is to the right of all keys.
- Thus we should always return -1. For upper bound we are to the right of
- all keys, thus we should always return +1. We achieve this behaviour
- by initialising return value to 0 and set type to 4.
- */
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- ConstData data1 = boundPar.m_data1;
- ConstData data2 = boundPar.m_data2;
// direction 0-lower 1-upper
- const unsigned dir = boundPar.m_dir;
ndbrequire(dir <= 1);
// number of words of data left
- unsigned len2 = boundPar.m_len2;
- for (unsigned i = 0; i < boundPar.m_count1; i++) {
+ unsigned len2 = maxlen;
+ /*
+ * No boundary means full scan, low boundary is to the right of all
+ * keys. Thus we should always return -1. For upper bound we are to
+ * the right of all keys, thus we should always return +1. We achieve
+ * this behaviour by initializing type to 4.
+ */
+ unsigned type = 4;
+ while (boundCount != 0) {
if (len2 < AttributeHeaderSize) {
jam();
return NdbSqlUtil::CmpUnknown;
}
len2 -= AttributeHeaderSize;
// get and skip bound type
- type = data1[0];
- data1 += 1;
- ndbrequire(! data1.ah().isNULL());
- if (! data2.ah().isNULL()) {
+ type = boundInfo[0];
+ boundInfo += 1;
+ ndbrequire(! boundInfo.ah().isNULL());
+ if (! entryData.ah().isNULL()) {
jam();
// current attribute
- const unsigned index = data1.ah().getAttributeId();
+ const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId;
- ndbrequire(data2.ah().getAttributeId() == descAttr.m_primaryAttrId);
+ ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size
- const unsigned size1 = data1.ah().getDataSize();
- ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
+ const unsigned size1 = boundInfo.ah().getDataSize();
+ ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
- const Uint32* const p1 = &data1[AttributeHeaderSize];
- const Uint32* const p2 = &data2[AttributeHeaderSize];
- ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
+ const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
+ const Uint32* const p2 = &entryData[AttributeHeaderSize];
+ int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
+ // XXX until data format errors are handled
+ ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) {
jam();
return ret;
@@ -209,22 +206,22 @@
} else {
jam();
/*
- NULL is bigger than any bound, thus the boundary is always to the
- left of NULL
- */
+ * NULL is bigger than any bound, thus the boundary is always to
+ * the left of NULL.
+ */
return -1;
}
- data1 += AttributeHeaderSize + data1.ah().getDataSize();
- data2 += AttributeHeaderSize + data2.ah().getDataSize();
+ boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
+ entryData += AttributeHeaderSize + entryData.ah().getDataSize();
+ boundCount -= 1;
}
- ndbassert(ret == 0);
if (dir == 0) {
jam();
/*
- Looking for the lower bound. If strict lower bound then the boundary is
- to the right of the compared key and otherwise (equal included in range)
- then the boundary is to the left of the key.
- */
+ * Looking for the lower bound. If strict lower bound then the
+ * boundary is to the right of the compared key and otherwise (equal
+ * included in range) then the boundary is to the left of the key.
+ */
if (type == 1) {
jam();
return +1;
@@ -233,10 +230,11 @@
} else {
jam();
/*
- Looking for the upper bound. If strict upper bound then the boundary is
- to the left of all equal keys and otherwise (equal included in the
- range) then the boundary is to the right of all equal keys.
- */
+ * Looking for the upper bound. If strict upper bound then the
+ * boundary is to the left of all equal keys and otherwise (equal
+ * included in the range) then the boundary is to the right of all
+ * equal keys.
+ */
if (type == 3) {
jam();
return -1;
@@ -245,3 +243,67 @@
}
}
+/*
+ * Scan bound vs tree entry.
+ */
+int
+Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
+{
+ const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
+ // direction 0-lower 1-upper
+ ndbrequire(dir <= 1);
+ // initialize type to equality
+ unsigned type = 4;
+ while (boundCount != 0) {
+ // get and skip bound type
+ type = boundInfo[0];
+ boundInfo += 1;
+ ndbrequire(! boundInfo.ah().isNULL());
+ if (*entryKey != 0) {
+ jam();
+ // current attribute
+ const unsigned index = boundInfo.ah().getAttributeId();
+ const DescAttr& descAttr = descEnt.m_descAttr[index];
+ const unsigned typeId = descAttr.m_typeId;
+ // full data size
+ const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
+ // compare
+ const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
+ const Uint32* const p2 = *entryKey;
+ int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
+ // XXX until data format errors are handled
+ ndbrequire(ret != NdbSqlUtil::CmpError);
+ if (ret != 0) {
+ jam();
+ return ret;
+ }
+ } else {
+ jam();
+ /*
+ * NULL is bigger than any bound, thus the boundary is always to
+ * the left of NULL.
+ */
+ return -1;
+ }
+ boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
+ entryKey += 1;
+ boundCount -= 1;
+ }
+ if (dir == 0) {
+ // lower bound
+ jam();
+ if (type == 1) {
+ jam();
+ return +1;
+ }
+ return -1;
+ } else {
+ // upper bound
+ jam();
+ if (type == 3) {
+ jam();
+ return -1;
+ }
+ return +1;
+ }
+}
--- 1.6/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2004-06-17 10:50:58 +02:00
+++ 1.7/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2004-07-08 14:34:31 +02:00
@@ -137,16 +137,17 @@
par.m_ok = false;
}
}
+ static const char* const sep = " *** ";
// check child-parent links
if (node.getLink(2) != par.m_parent) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl;
}
if (node.getSide() != par.m_side) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl;
}
@@ -154,26 +155,26 @@
const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (node.getBalance() != balance) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "balance " << node.getBalance();
out << " should be " << balance << endl;
}
if (abs(node.getBalance()) > 1) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "balance " << node.getBalance() << " is invalid" << endl;
}
// check occupancy
- if (node.getOccup() > tree.m_maxOccup) {
+ if (node.getOccup() == 0 || node.getOccup() > tree.m_maxOccup) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "occupancy " << node.getOccup();
- out << " greater than max " << tree.m_maxOccup << endl;
+ out << " zero or greater than max " << tree.m_maxOccup << endl;
}
// check for occupancy of interior node
if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
@@ -183,13 +184,74 @@
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false;
- out << par.m_path << " *** ";
+ out << par.m_path << sep;
out << "missed merge with child " << i << endl;
}
}
+ // check inline prefix
+ { ConstData data1 = node.getPref();
+ Uint32 data2[MaxPrefSize];
+ memset(data2, DataFillByte, MaxPrefSize << 2);
+ readKeyAttrs(frag, node.getMinMax(0), 0, c_searchKey);
+ copyAttrs(frag, c_searchKey, data2, tree.m_prefSize);
+ for (unsigned n = 0; n < tree.m_prefSize; n++) {
+ if (data1[n] != data2[n]) {
+ par.m_ok = false;
+ out << par.m_path << sep;
+ out << "inline prefix mismatch word " << n;
+ out << " value " << hex << data1[n];
+ out << " should be " << hex << data2[n] << endl;
+ break;
+ }
+ }
+ }
+ // check ordering within node
+ for (unsigned j = 1; j < node.getOccup(); j++) {
+ unsigned start = 0;
+ const TreeEnt ent1 = node.getEnt(j - 1);
+ const TreeEnt ent2 = node.getEnt(j);
+ if (j == 1) {
+ readKeyAttrs(frag, ent1, start, c_searchKey);
+ } else {
+ memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
+ }
+ readKeyAttrs(frag, ent2, start, c_entryKey);
+ int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
+ if (ret == 0)
+ ret = ent1.cmp(ent2);
+ if (ret != -1) {
+ par.m_ok = false;
+ out << par.m_path << sep;
+ out << " disorder within node at pos " << j << endl;
+ }
+ }
+ // check ordering wrt subtrees
+ for (unsigned i = 0; i <= 1; i++) {
+ if (node.getLink(i) == NullTupLoc)
+ continue;
+ const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
+ const TreeEnt ent2 = node.getMinMax(i);
+ unsigned start = 0;
+ readKeyAttrs(frag, ent1, start, c_searchKey);
+ readKeyAttrs(frag, ent2, start, c_entryKey);
+ int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
+ if (ret == 0)
+ ret = ent1.cmp(ent2);
+ if (ret != (i == 0 ? -1 : +1)) {
+ par.m_ok = false;
+ out << par.m_path << sep;
+ out << " disorder wrt subtree " << i << endl;
+ }
+ }
// return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = node.getOccup();
+ for (unsigned i = 0; i <= 1; i++) {
+ if (node.getLink(i) == NullTupLoc)
+ par.m_minmax[i] = node.getMinMax(i);
+ else
+ par.m_minmax[i] = cpar[i].m_minmax[i];
+ }
}
NdbOut&
@@ -355,20 +417,19 @@
out << " [acc " << dec << node.m_acc << "]";
out << " [node " << *node.m_node << "]";
if (node.m_acc >= Dbtux::AccPref) {
- for (unsigned i = 0; i <= 1; i++) {
- out << " [pref " << dec << i;
- const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + i * tree.m_prefSize;
- for (unsigned j = 0; j < node.m_frag.m_tree.m_prefSize; j++)
- out << " " << hex << data[j];
- out << "]";
- }
+ const Uint32* data;
+ out << " [pref";
+ data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
+ for (unsigned j = 0; j < tree.m_prefSize; j++)
+ out << " " << hex << data[j];
+ out << "]";
out << " [entList";
unsigned numpos = node.m_node->m_occup;
if (node.m_acc < Dbtux::AccFull && numpos > 2) {
numpos = 2;
out << "(" << dec << numpos << ")";
}
- const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + 2 * tree.m_prefSize;
+ data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
for (unsigned pos = 0; pos < numpos; pos++)
out << " " << entList[pos];
--- 1.7/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2004-06-19 13:31:53 +02:00
+++ 1.8/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2004-07-08 14:34:31 +02:00
@@ -26,8 +26,13 @@
#ifdef VM_TRACE
debugFile(0),
debugOut(*new NullOutputStream()),
+ // until ndb_mgm supports dump
+#ifdef DBTUX_DEBUG_TREE
+ debugFlags(DebugTree),
+#else
debugFlags(0),
#endif
+#endif
c_internalStartPhase(0),
c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
c_dataBuffer(0)
@@ -314,6 +319,9 @@
keyAttrs += 1;
data1 += 1;
}
+#ifdef VM_TRACE
+ memset(data2, DataFillByte, len2 << 2);
+#endif
}
BLOCK_FUNCTIONS(Dbtux);
--- 1.9/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp 2004-06-19 13:31:53 +02:00
+++ 1.10/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp 2004-07-08 14:34:31 +02:00
@@ -111,19 +111,18 @@
debugOut << endl;
}
#endif
- // find position in tree
- TreePos treePos;
- treeSearch(signal, frag, c_searchKey, ent, treePos);
-#ifdef VM_TRACE
- if (debugFlags & DebugMaint) {
- debugOut << treePos << endl;
- }
-#endif
// do the operation
req->errorCode = 0;
+ TreePos treePos;
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
+ searchToAdd(signal, frag, c_searchKey, ent, treePos);
+#ifdef VM_TRACE
+ if (debugFlags & DebugMaint) {
+ debugOut << treePos << endl;
+ }
+#endif
if (treePos.m_match) {
jam();
// there is no "Building" state so this will have to do
@@ -152,6 +151,12 @@
break;
case TuxMaintReq::OpRemove:
jam();
+ searchToRemove(signal, frag, c_searchKey, ent, treePos);
+#ifdef VM_TRACE
+ if (debugFlags & DebugMaint) {
+ debugOut << treePos << endl;
+ }
+#endif
if (! treePos.m_match) {
jam();
// there is no "Building" state so this will have to do
@@ -167,7 +172,6 @@
ndbrequire(false);
break;
}
- // commit and release nodes
#ifdef VM_TRACE
if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut);
--- 1.8/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2004-06-19 13:31:53 +02:00
+++ 1.9/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2004-07-08 14:34:31 +02:00
@@ -85,10 +85,9 @@
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
- memset(node.getPref(0), 0xa2, tree.m_prefSize << 2);
- memset(node.getPref(1), 0xa2, tree.m_prefSize << 2);
+ memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node);
- memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
+ memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
@@ -116,12 +115,12 @@
* attribute headers for now. XXX use null mask instead
*/
void
-Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
+Dbtux::setNodePref(Signal* signal, NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
- readKeyAttrs(frag, node.getMinMax(i), 0, c_entryKey);
- copyAttrs(frag, c_entryKey, node.getPref(i), tree.m_prefSize);
+ readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
+ copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
}
// node operations
@@ -173,11 +172,9 @@
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
- // fix prefixes
+ // fix prefix
if (occup == 0 || pos == 0)
- setNodePref(signal, node, 0);
- if (occup == 0 || pos == occup)
- setNodePref(signal, node, 1);
+ setNodePref(signal, node);
}
/*
@@ -248,11 +245,9 @@
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
- // fix prefixes
+ // fix prefix
if (occup != 1 && pos == 0)
- setNodePref(signal, node, 0);
- if (occup != 1 && pos == occup - 1)
- setNodePref(signal, node, 1);
+ setNodePref(signal, node);
}
/*
@@ -325,11 +320,9 @@
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
- // fix prefixes
+ // fix prefix
if (true)
- setNodePref(signal, node, 0);
- if (occup == 1 || pos == occup - 1)
- setNodePref(signal, node, 1);
+ setNodePref(signal, node);
}
/*
@@ -403,11 +396,9 @@
}
tmpList[0] = newMin;
entList[0] = entList[occup];
- // fix prefixes
+ // fix prefix
if (true)
- setNodePref(signal, node, 0);
- if (occup == 1 || pos == occup - 1)
- setNodePref(signal, node, 1);
+ setNodePref(signal, node);
}
/*
--- 1.7/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2004-06-18 13:46:05 +02:00
+++ 1.8/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2004-07-08 14:34:31 +02:00
@@ -689,16 +689,9 @@
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
TreeHead& tree = frag.m_tree;
- if (tree.m_root == NullTupLoc) {
- // tree may have become empty
- jam();
- scan.m_state = ScanOp::Last;
- return;
- }
- TreePos pos;
- pos.m_loc = tree.m_root;
- NodeHandle node(frag);
- // unpack lower bound
+ // set up index keys for this operation
+ setKeyAttrs(frag);
+ // unpack lower bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter;
bound.first(iter);
@@ -707,103 +700,22 @@
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
- // comparison parameters
- BoundPar boundPar;
- boundPar.m_data1 = c_dataBuffer;
- boundPar.m_count1 = scan.m_boundCnt[0];
- boundPar.m_dir = 0;
-loop: {
+ // search for scan start position
+ TreePos treePos;
+ searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
+ if (treePos.m_loc == NullTupLoc) {
+ // empty tree
jam();
- selectNode(signal, node, pos.m_loc, AccPref);
- const unsigned occup = node.getOccup();
- ndbrequire(occup != 0);
- for (unsigned i = 0; i <= 1; i++) {
- jam();
- // compare prefix
- boundPar.m_data2 = node.getPref(i);
- boundPar.m_len2 = tree.m_prefSize;
- int ret = cmpScanBound(frag, boundPar);
- if (ret == NdbSqlUtil::CmpUnknown) {
- jam();
- // read full value
- ReadPar readPar;
- readPar.m_ent = node.getMinMax(i);
- readPar.m_first = 0;
- readPar.m_count = frag.m_numAttrs;
- readPar.m_data = 0; // leave in signal data
- tupReadAttrs(signal, frag, readPar);
- // compare full value
- boundPar.m_data2 = readPar.m_data;
- boundPar.m_len2 = ZNIL; // big
- ret = cmpScanBound(frag, boundPar);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- }
- if (i == 0 && ret < 0) {
- jam();
- const TupLoc loc = node.getLink(i);
- if (loc != NullTupLoc) {
- jam();
- // continue to left subtree
- pos.m_loc = loc;
- goto loop;
- }
- // start scanning this node
- pos.m_pos = 0;
- pos.m_match = false;
- pos.m_dir = 3;
- scan.m_scanPos = pos;
- scan.m_state = ScanOp::Next;
- linkScan(node, scanPtr);
- return;
- }
- if (i == 1 && ret > 0) {
- jam();
- const TupLoc loc = node.getLink(i);
- if (loc != NullTupLoc) {
- jam();
- // continue to right subtree
- pos.m_loc = loc;
- goto loop;
- }
- // start scanning upwards
- pos.m_dir = 1;
- scan.m_scanPos = pos;
- scan.m_state = ScanOp::Next;
- linkScan(node, scanPtr);
- return;
- }
- }
- // read rest of current node
- accessNode(signal, node, AccFull);
- // look for first entry
- ndbrequire(occup >= 2);
- for (unsigned j = 1; j < occup; j++) {
- jam();
- ReadPar readPar;
- readPar.m_ent = node.getEnt(j);
- readPar.m_first = 0;
- readPar.m_count = frag.m_numAttrs;
- readPar.m_data = 0; // leave in signal data
- tupReadAttrs(signal, frag, readPar);
- // compare
- boundPar.m_data2 = readPar.m_data;
- boundPar.m_len2 = ZNIL; // big
- int ret = cmpScanBound(frag, boundPar);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- if (ret < 0) {
- jam();
- // start scanning this node
- pos.m_pos = j;
- pos.m_match = false;
- pos.m_dir = 3;
- scan.m_scanPos = pos;
- scan.m_state = ScanOp::Next;
- linkScan(node, scanPtr);
- return;
- }
- }
- ndbrequire(false);
+ scan.m_state = ScanOp::Last;
+ return;
}
+ // set position and state
+ scan.m_scanPos = treePos;
+ scan.m_state = ScanOp::Next;
+ // link the scan to node found
+ NodeHandle node(frag);
+ selectNode(signal, node, treePos.m_loc, AccFull);
+ linkScan(node, scanPtr);
}
/*
@@ -841,7 +753,9 @@
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
- // unpack upper bound
+ // set up index keys for this operation
+ setKeyAttrs(frag);
+ // unpack upper bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1];
ScanBoundIterator iter;
bound.first(iter);
@@ -850,11 +764,6 @@
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
- // comparison parameters
- BoundPar boundPar;
- boundPar.m_data1 = c_dataBuffer;
- boundPar.m_count1 = scan.m_boundCnt[1];
- boundPar.m_dir = 1;
// use copy of position
TreePos pos = scan.m_scanPos;
// get and remember original node
@@ -912,17 +821,9 @@
jam();
pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
- // XXX implement prefix optimization
- ReadPar readPar;
- readPar.m_ent = pos.m_ent;
- readPar.m_first = 0;
- readPar.m_count = frag.m_numAttrs;
- readPar.m_data = 0; // leave in signal data
- tupReadAttrs(signal, frag, readPar);
- // compare
- boundPar.m_data2 = readPar.m_data;
- boundPar.m_len2 = ZNIL; // big
- int ret = cmpScanBound(frag, boundPar);
+ // read and compare all attributes
+ readKeyAttrs(frag, pos.m_ent, 0, c_entryKey);
+ int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
jam();
--- 1.7/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp 2004-06-19 13:31:53 +02:00
+++ 1.8/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp 2004-07-08 14:34:31 +02:00
@@ -18,112 +18,6 @@
#include "Dbtux.hpp"
/*
- * Search for entry.
- *
- * Search key is index attribute data and tree entry value. Start from
- * root node and compare the key to min/max of each node. Use linear
- * search on the final (bounding) node. Initial attributes which are
- * same in min/max need not be checked.
- */
-void
-Dbtux::treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
-{
- const TreeHead& tree = frag.m_tree;
- const unsigned numAttrs = frag.m_numAttrs;
- treePos.m_loc = tree.m_root;
- if (treePos.m_loc == NullTupLoc) {
- // empty tree
- jam();
- treePos.m_pos = 0;
- treePos.m_match = false;
- return;
- }
- NodeHandle node(frag);
-loop: {
- jam();
- selectNode(signal, node, treePos.m_loc, AccPref);
- const unsigned occup = node.getOccup();
- ndbrequire(occup != 0);
- // number of equal initial attributes in bounding node
- unsigned start = ZNIL;
- for (unsigned i = 0; i <= 1; i++) {
- jam();
- unsigned start1 = 0;
- // compare prefix
- int ret = cmpSearchKey(frag, start1, searchKey, node.getPref(i), tree.m_prefSize);
- if (ret == NdbSqlUtil::CmpUnknown) {
- jam();
- // read and compare remaining attributes
- readKeyAttrs(frag, node.getMinMax(i), start1, c_entryKey);
- ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- }
- if (start > start1)
- start = start1;
- if (ret == 0) {
- jam();
- // keys are equal, compare entry values
- ret = searchEnt.cmp(node.getMinMax(i));
- }
- if (i == 0 ? (ret < 0) : (ret > 0)) {
- jam();
- const TupLoc loc = node.getLink(i);
- if (loc != NullTupLoc) {
- jam();
- // continue to left/right subtree
- treePos.m_loc = loc;
- goto loop;
- }
- // position is immediately before/after this node
- treePos.m_pos = (i == 0 ? 0 : occup);
- treePos.m_match = false;
- return;
- }
- if (ret == 0) {
- jam();
- // position is at first/last entry
- treePos.m_pos = (i == 0 ? 0 : occup - 1);
- treePos.m_match = true;
- return;
- }
- }
- // access rest of the bounding node
- accessNode(signal, node, AccFull);
- // position is strictly within the node
- ndbrequire(occup >= 2);
- const unsigned numWithin = occup - 2;
- for (unsigned j = 1; j <= numWithin; j++) {
- jam();
- int ret = 0;
- if (start < numAttrs) {
- jam();
- // read and compare remaining attributes
- unsigned start1 = start;
- readKeyAttrs(frag, node.getEnt(j), start1, c_entryKey);
- ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- }
- if (ret == 0) {
- jam();
- // keys are equal, compare entry values
- ret = searchEnt.cmp(node.getEnt(j));
- }
- if (ret <= 0) {
- jam();
- // position is before or at this entry
- treePos.m_pos = j;
- treePos.m_match = (ret == 0);
- return;
- }
- }
- // position is before last entry
- treePos.m_pos = occup - 1;
- treePos.m_match = false;
- return;
- }
-}
-
-/*
* Add entry.
*/
void
--- 1.8/ndb/test/ndbapi/testOIBasic.cpp 2004-06-23 11:49:22 +02:00
+++ 1.9/ndb/test/ndbapi/testOIBasic.cpp 2004-07-08 14:34:31 +02:00
@@ -2525,7 +2525,7 @@
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
- RUNSTEP(par, readverify, MT);
+ RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST);
}
return 0;
@@ -2564,9 +2564,11 @@
t1.off(par.m_totrows);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
+ RUNSTEP(par, readverify, ST);
t2.on();
RUNSTEP(par, pkupdate, MT);
t2.off(par.m_totrows);
+ RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST);
}
LL1("update - " << t1.time());
--- New file ---
+++ ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 04/07/08 14:33:43
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define DBTUX_SEARCH_CPP
#include "Dbtux.hpp"
/*
* Search for entry to add.
*
* Similar to searchToRemove (see below).
*
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
/*
* In order to not (yet) change old behaviour, a position between
* 2 nodes returns the one at the bottom of the tree.
*/
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
unsigned start = 0;
readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getEnt(j));
}
if (ret <= 0) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
if (! bottomNode.isNull()) {
jam();
// backwards compatible for now
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for entry to remove.
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
// pos 0 was handled above
for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
jam();
// compare only the entry
if (searchEnt.eq(currNode.getEnt(j))) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = true;
return;
}
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for scan start position.
*
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
} else {
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
ndbassert(false);
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
// start scanning from current entry
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
}
if (! bottomNode.isNull()) {
jam();
// start scanning the l.u.b
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
// start scanning upwards (pretend we came from right child)
treePos.m_loc = currNode.m_loc;
treePos.m_dir = 1;
}
--- 1.12/ndb/src/kernel/blocks/dbtux/Times.txt 2004-06-19 13:31:53 +02:00
+++ 1.13/ndb/src/kernel/blocks/dbtux/Times.txt 2004-07-08 14:34:31 +02:00
@@ -49,4 +49,7 @@
optim 11 mc02/a 43 ms 63 ms 46 pct
mc02/b 52 ms 86 ms 63 pct
+optim 12 mc02/a 38 ms 55 ms 43 pct
+ mc02/b 47 ms 77 ms 63 pct
+
vim: set et:
--- 1.2/ndb/src/kernel/blocks/dbtux/Makefile.am 2004-06-16 16:27:56 +02:00
+++ 1.3/ndb/src/kernel/blocks/dbtux/Makefile.am 2004-07-08 14:34:31 +02:00
@@ -7,6 +7,7 @@
DbtuxNode.cpp \
DbtuxTree.cpp \
DbtuxScan.cpp \
+ DbtuxSearch.cpp \
DbtuxCmp.cpp \
DbtuxDebug.cpp
| Thread |
|---|
| • bk commit into 4.1 tree (pekka:1.1897) | Pekka Nousiainen | 9 Jul |