4421 Pekka Nousiainen 2011-08-09
wl#4124 x18_fix.diff
bugfix: index stats on big-endian
modified:
storage/ndb/include/util/NdbPack.hpp
storage/ndb/src/common/util/NdbPack.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
4420 Pekka Nousiainen 2011-08-09
wl#4124 x17_fix.diff
testIndexStat: NdbRecord nullbits, align, const
modified:
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/test/ndbapi/testIndexStat.cpp
4419 Pekka Nousiainen 2011-07-31
wl#4124 x16_fix.diff
small changes inspired by sol10
modified:
storage/ndb/include/ndbapi/NdbIndexStat.hpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
storage/ndb/tools/ndb_index_stat.cpp
=== modified file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp 2011-05-04 09:44:18 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp 2011-08-09 15:37:45 +0000
@@ -117,7 +117,7 @@ public:
class Endian {
public:
enum Value {
- Native = 0,
+ Native = 0, // replaced by actual value
Little = 1,
Big = 2
};
@@ -300,6 +300,10 @@ public:
* Instance of an array of data values. The values are packed into
* a byte buffer. The buffer is also maintained as a single varbinary
* value if non-zero var bytes (length bytes) is specified.
+ *
+ * Data instances can be received from another source (such as table
+ * in database) and may not be native-endian. Such instances must
+ * first be completed with desc_all() and convert().
*/
class Data : public DataC {
public:
@@ -324,7 +328,7 @@ public:
// convert endian
int convert(Endian::Value to_endian);
// create complete instance from buffer contents
- int desc_all(Uint32 cnt);
+ int desc_all(Uint32 cnt, Endian::Value from_endian);
// getters
Uint32 get_max_len() const;
Uint32 get_max_len4() const;
@@ -351,7 +355,7 @@ public:
const Uint32 m_varBytes;
Uint8* m_buf;
Uint32 m_bufMaxLen;
- Endian::Value m_endian; // Native until finalize()
+ Endian::Value m_endian;
// iterator on items added
Iter m_iter;
};
@@ -685,7 +689,7 @@ NdbPack::Data::Data(const Spec& spec, bo
{
m_buf = 0;
m_bufMaxLen = 0;
- m_endian = Endian::Native;
+ m_endian = Endian::get_endian();
}
inline void
@@ -704,7 +708,7 @@ NdbPack::Data::reset()
m_cnt = 0; // in DataC
const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
memset(m_buf, 0, bytes);
- m_endian = Endian::Native;
+ m_endian = Endian::get_endian();
m_iter.reset();
}
@@ -713,17 +717,14 @@ NdbPack::Data::finalize()
{
if (m_varBytes == 0 ||
finalize_impl() == 0)
- {
- m_endian = Endian::get_endian();
return 0;
- }
return -1;
}
inline int
NdbPack::Data::convert(Endian::Value to_endian)
{
- if (unlikely(to_endian == Endian::Native))
+ if (to_endian == Endian::Native)
to_endian = Endian::get_endian();
if (m_endian == to_endian)
return 0;
=== modified file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp 2011-05-09 15:35:25 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp 2011-08-09 15:37:45 +0000
@@ -560,8 +560,11 @@ NdbPack::Data::finalize_impl()
}
int
-NdbPack::Data::desc_all(Uint32 cnt)
+NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian)
{
+ if (from_endian == NdbPack::Endian::Native)
+ from_endian = NdbPack::Endian::get_endian();
+ m_endian = from_endian;
assert(m_cnt == 0); // reset() would destroy nullmask
for (Uint32 i = 0; i < cnt; i++)
{
@@ -757,7 +760,7 @@ NdbPack::Spec::print(char* buf, Uint32 b
// print DataC
-bool g_ndb_pack_print_hex_always = false;
+bool g_ndb_pack_print_hex_always = true;
NdbOut&
operator<<(NdbOut& out, const NdbPack::DataC& a)
@@ -1836,7 +1839,7 @@ testdesc(const Tdata& tdata)
Uint8 buf_new[Tspec::MaxBuf];
data_new.set_buf(buf_new, sizeof(buf_new));
memcpy(buf_new, buf_old, fullLen);
- chk2(data_new.desc_all(cnt) == 0, data_new);
+ chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new);
chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
chk1(data_new.get_data_len() == data.get_data_len());
chk1(data_new.get_cnt() == data.get_cnt());
@@ -1903,28 +1906,17 @@ testconvert(const Tdata& tdata)
require(data.get_cnt() == data_new.get_cnt());
const Uint32 cnt = tdata.m_cnt;
Uint32 num_eq;
- switch (NdbPack::Endian::get_endian()) {
- case NdbPack::Endian::Little:
- chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
- num_eq = ~(Uint32)0;
- chk1(data.cmp(data_new, cnt, num_eq) == 0);
- chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
- chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
- num_eq = ~(Uint32)0;
- chk1(data.cmp(data_new, cnt, num_eq) == 0);
- break;
- case NdbPack::Endian::Big:
- chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
- num_eq = ~(Uint32)0;
- chk1(data.cmp(data_new, cnt, num_eq) == 0);
- chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
- chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
- num_eq = ~(Uint32)0;
- chk1(data.cmp(data_new, cnt, num_eq) == 0);
- break;
- default:
- require(false);
- break;
+ int i;
+ for (i = 0; i < 10; i++) {
+ int k = getrandom(3); // assumes Endian::Value 0,1,2
+ NdbPack::Endian::Value v = (NdbPack::Endian::Value)k;
+ chk2(data_new.convert(v) == 0, data_new);
+ if (v == NdbPack::Endian::Native ||
+ v == NdbPack::Endian::get_endian()) {
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ require(num_eq == cnt);
+ }
}
}
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-07-31 17:40:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-08-09 15:37:45 +0000
@@ -742,7 +742,7 @@ NdbIndexStatImpl::set_index(const NdbDic
// rir + rpk
if (m_valueSpec.add(type, m_valueAttrs) == -1)
{
- setError(InternalError, __LINE__);
+ setError(InternalError, __LINE__, m_valueSpec.get_error_code());
return -1;
}
}
@@ -1153,15 +1153,32 @@ NdbIndexStatImpl::read_next(Con& con)
setError(con, __LINE__);
return ret;
}
- // create consistent NdbPack::Data instances
- if (m_keyData.desc_all(m_keyAttrs) == -1)
+
+ /*
+ * Key and value are raw data and little-endian. Create the complete
+ * NdbPack::Data instance and convert it to native-endian.
+ */
+ const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little;
+ const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native;
+
+ if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1)
{
- setError(InternalError, __LINE__);
+ setError(InternalError, __LINE__, m_keyData.get_error_code());
return -1;
}
- if (m_valueData.desc_all(m_valueAttrs) == -1)
+ if (m_keyData.convert(to_endian) == -1)
{
- setError(InternalError, __LINE__);
+ setError(InternalError, __LINE__, m_keyData.get_error_code());
+ return -1;
+ }
+ if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1)
+ {
+ setError(InternalError, __LINE__, m_valueData.get_error_code());
+ return -1;
+ }
+ if (m_valueData.convert(to_endian) == -1)
+ {
+ setError(InternalError, __LINE__, m_valueData.get_error_code());
return -1;
}
return 0;
@@ -1856,7 +1873,7 @@ NdbIndexStatImpl::convert_range(Range& r
}
if (bound.m_data.add(data, &len_out) == -1)
{
- setError(InternalError, __LINE__);
+ setError(InternalError, __LINE__, bound.m_data.get_error_code());
return -1;
}
}
@@ -1864,7 +1881,7 @@ NdbIndexStatImpl::convert_range(Range& r
{
if (bound.m_data.add_null(&len_out) == -1)
{
- setError(InternalError, __LINE__);
+ setError(InternalError, __LINE__, bound.m_data.get_error_code());
return -1;
}
}
=== modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp'
--- a/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-07-04 13:37:56 +0000
+++ b/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-08-09 15:36:25 +0000
@@ -110,7 +110,8 @@ static const NdbRecord* g_ind_rec = 0;
struct my_record
{
- Uint32 m_null_bm;
+ Uint8 m_null_bm;
+ Uint8 fill[3];
Uint32 m_a;
Uint32 m_b;
char m_c[1+g_charlen];
@@ -376,6 +377,7 @@ struct Val {
void copy(const Val& val2);
void make(uint numattrs, const Lim& lim);
int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const;
+ void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
private:
Val& operator=(const Val&);
@@ -556,6 +558,40 @@ Val::cmp(const Val& val2, uint numattrs,
return k;
}
+void
+Val::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
+{
+ const char* key = (j == 0 ? ib.low_key : ib.high_key);
+ const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
+ const Uint8 nullbits = *(const Uint8*)key;
+ require(numattrs <= g_numattrs);
+ if (numattrs >= 1) {
+ if (nullbits & (1 << g_ndbrec_b_nb_offset))
+ b_null = 1;
+ else {
+ memcpy(&b, &key[g_ndbrec_b_offset], sizeof(b));
+ b_null = 0;
+ }
+ }
+ if (numattrs >= 2) {
+ if (nullbits & (1 << g_ndbrec_c_nb_offset))
+ c_null = 1;
+ else {
+ memcpy(c, &key[g_ndbrec_c_offset], sizeof(c));
+ c_null = 0;
+ }
+ }
+ if (numattrs >= 3) {
+ if (nullbits & (1 << g_ndbrec_d_nb_offset))
+ d_null = 1;
+ else {
+ memcpy(&d, &key[g_ndbrec_d_offset], sizeof(d));
+ d_null = 0;
+ }
+ }
+ m_numattrs = numattrs;
+}
+
// index keys
struct Key {
@@ -844,7 +880,9 @@ struct Bnd {
Bnd& make(uint minattrs);
Bnd& make(uint minattrs, const Val& theval);
int cmp(const Key& key) const;
+ int cmp(const Bnd& bnd2);
int type(uint colno) const; // for setBound
+ void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
private:
Bnd& operator=(const Bnd&);
@@ -937,6 +975,52 @@ Bnd::cmp(const Key& key) const
}
int
+Bnd::cmp(const Bnd& bnd2)
+{
+ int place; // debug
+ int ret;
+ const Bnd& bnd1 = *this;
+ const Val& val1 = bnd1.m_val;
+ const Val& val2 = bnd2.m_val;
+ const uint numattrs1 = val1.m_numattrs;
+ const uint numattrs2 = val2.m_numattrs;
+ const uint n = (numattrs1 < numattrs2 ? numattrs1 : numattrs2);
+ do {
+ int k = val1.cmp(val2, n);
+ if (k != 0) {
+ place = 1;
+ ret = k;
+ break;
+ }
+ if (numattrs1 < numattrs2) {
+ place = 2;
+ ret = (+1) * bnd1.m_side;
+ break;
+ }
+ if (numattrs1 > numattrs2) {
+ place = 3;
+ ret = (-1) * bnd1.m_side;
+ break;
+ }
+ if (bnd1.m_side < bnd2.m_side) {
+ place = 4;
+ ret = -1;
+ break;
+ }
+ if (bnd1.m_side > bnd2.m_side) {
+ place = 5;
+ ret = +1;
+ break;
+ }
+ place = 6;
+ ret = 0;
+ } while (0);
+ ll3("bnd: " << *this << " cmp bnd: " << bnd2
+ << " ret: " << ret << " place: " << place);
+ return ret;
+}
+
+int
Bnd::type(uint colno) const
{
int t;
@@ -960,6 +1044,21 @@ Bnd::type(uint colno) const
return t;
}
+void
+Bnd::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
+{
+ Val& val = m_val;
+ val.fromib(ib, j);
+ const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
+ const bool inclusive = (j == 0 ? ib.low_inclusive : ib.high_inclusive);
+ if (numattrs == 0) {
+ m_side = 0;
+ } else {
+ m_side = (j == 0 ? (inclusive ? -1 : +1) : (inclusive ? +1 : -1));
+ }
+ m_lohi = j;
+}
+
// stats values
struct Stval {
@@ -1016,6 +1115,7 @@ struct Rng {
void copy(const Rng& rng2);
int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range
uint rowcount() const;
+ void fromib(const NdbIndexScanOperation::IndexBound& ib);
private:
Rng& operator=(const Rng&);
@@ -1164,6 +1264,15 @@ Rng::rowcount() const
return count;
}
+void
+Rng::fromib(const NdbIndexScanOperation::IndexBound& ib)
+{
+ for (uint j = 0; j <= 1; j++) {
+ Bnd& bnd = m_bnd[j];
+ bnd.fromib(ib, j);
+ }
+}
+
static Rng* g_rnglist = 0;
static void
@@ -1475,7 +1584,8 @@ queryscan(Rng& rng)
*/
static int
initialiseIndexBound(const Rng& rng,
- NdbIndexScanOperation::IndexBound& ib)
+ NdbIndexScanOperation::IndexBound& ib,
+ my_record* low_key, my_record* high_key)
{
ll3("initialiseIndexBound: " << rng);
uint i;
@@ -1483,9 +1593,13 @@ initialiseIndexBound(const Rng& rng,
Uint32 colsInBound[2]= {0, 0};
bool boundInclusive[2]= {false, false};
+ memset(&ib, 0xf1, sizeof(ib));
+ memset(low_key, 0xf2, sizeof(*low_key));
+ memset(high_key, 0xf3, sizeof(*high_key));
+
// Clear nullbit storage
- *((char *)ib.low_key) =
- *((char *)ib.high_key) = 0;
+ low_key->m_null_bm = 0;
+ high_key->m_null_bm = 0;
for (i = 0; i < g_numattrs; i++) {
const Uint32 no = i; // index attribute number
@@ -1499,7 +1613,7 @@ initialiseIndexBound(const Rng& rng,
}
for (j = 0; j <= 1; j++) {
/* Get ptr to key storage space for this bound */
- my_record* keyBuf= (my_record *)( (j==0) ? ib.low_key : ib.high_key);
+ my_record* keyBuf= (j==0) ? low_key : high_key;
int t = type[j];
if (t == -1)
continue;
@@ -1542,8 +1656,10 @@ initialiseIndexBound(const Rng& rng,
}
/* Now have everything we need to initialise the IndexBound */
+ ib.low_key = (char*)low_key;
ib.low_key_count= colsInBound[0];
ib.low_inclusive= boundInclusive[0];
+ ib.high_key = (char*)high_key;
ib.high_key_count= colsInBound[1];
ib.high_inclusive= boundInclusive[1];
ib.range_no= 0;
@@ -1554,11 +1670,18 @@ initialiseIndexBound(const Rng& rng,
" high_inc=" << ib.high_inclusive);
ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) <<
" d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) <<
- " first byte=%xu" << ib.low_key[0]);
+ " first byte=" << ib.low_key[0]);
ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) <<
" d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) <<
- " first byte=%xu" << ib.high_key[0]);
+ " first byte=" << ib.high_key[0]);
+ // verify by reverse
+ {
+ Rng rng;
+ rng.fromib(ib);
+ require(rng.m_bnd[0].cmp(bnd[0]) == 0);
+ require(rng.m_bnd[1].cmp(bnd[1]) == 0);
+ }
return 0;
}
@@ -1568,13 +1691,12 @@ querystat_v2(Rng& rng)
ll3("querystat_v2");
/* Create IndexBound and key storage space */
- char keySpace[2][g_ndbrecord_bytes];
NdbIndexScanOperation::IndexBound ib;
- ib.low_key= keySpace[0];
- ib.high_key= keySpace[1];
+ my_record low_key;
+ my_record high_key;
chkdb((g_con = g_ndb->startTransaction()) != 0);
- chkrc(initialiseIndexBound(rng, ib) == 0);
+ chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
Uint64 count = ~(Uint64)0;
chkdb(g_is->records_in_range(g_ind,
@@ -1610,10 +1732,9 @@ querystat(Rng& rng)
// convert to IndexBound (like in mysqld)
NdbIndexScanOperation::IndexBound ib;
- char keySpace[2][g_ndbrecord_bytes];
- ib.low_key = keySpace[0];
- ib.high_key = keySpace[1];
- chkrc(initialiseIndexBound(rng, ib) == 0);
+ my_record low_key;
+ my_record high_key;
+ chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0);
// index stat query
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0-wl4124-new1 branch (pekka.nousiainen:4419to 4421) WL#4124 | Pekka Nousiainen | 10 Aug |