From: Pekka Nousiainen Date: August 9 2011 3:39pm Subject: bzr push into mysql-5.1-telco-7.0-wl4124-new1 branch (pekka.nousiainen:4419 to 4421) WL#4124 List-Archive: http://lists.mysql.com/commits/140555 Message-Id: <20110809153952.6BA7455875@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4421 Pekka Nousiainen 2011-08-09 wl#4124 x18_fix.diff bugfix: index stats on big-endian modified: storage/ndb/include/util/NdbPack.hpp storage/ndb/src/common/util/NdbPack.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 4420 Pekka Nousiainen 2011-08-09 wl#4124 x17_fix.diff testIndexStat: NdbRecord nullbits, align, const modified: storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp storage/ndb/test/ndbapi/testIndexStat.cpp 4419 Pekka Nousiainen 2011-07-31 wl#4124 x16_fix.diff small changes inspired by sol10 modified: storage/ndb/include/ndbapi/NdbIndexStat.hpp storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp storage/ndb/tools/ndb_index_stat.cpp === modified file 'storage/ndb/include/util/NdbPack.hpp' --- a/storage/ndb/include/util/NdbPack.hpp 2011-05-04 09:44:18 +0000 +++ b/storage/ndb/include/util/NdbPack.hpp 2011-08-09 15:37:45 +0000 @@ -117,7 +117,7 @@ public: class Endian { public: enum Value { - Native = 0, + Native = 0, // replaced by actual value Little = 1, Big = 2 }; @@ -300,6 +300,10 @@ public: * Instance of an array of data values. The values are packed into * a byte buffer. The buffer is also maintained as a single varbinary * value if non-zero var bytes (length bytes) is specified. + * + * Data instances can be received from another source (such as table + * in database) and may not be native-endian. Such instances must + * first be completed with desc_all() and convert(). */ class Data : public DataC { public: @@ -324,7 +328,7 @@ public: // convert endian int convert(Endian::Value to_endian); // create complete instance from buffer contents - int desc_all(Uint32 cnt); + int desc_all(Uint32 cnt, Endian::Value from_endian); // getters Uint32 get_max_len() const; Uint32 get_max_len4() const; @@ -351,7 +355,7 @@ public: const Uint32 m_varBytes; Uint8* m_buf; Uint32 m_bufMaxLen; - Endian::Value m_endian; // Native until finalize() + Endian::Value m_endian; // iterator on items added Iter m_iter; }; @@ -685,7 +689,7 @@ NdbPack::Data::Data(const Spec& spec, bo { m_buf = 0; m_bufMaxLen = 0; - m_endian = Endian::Native; + m_endian = Endian::get_endian(); } inline void @@ -704,7 +708,7 @@ NdbPack::Data::reset() m_cnt = 0; // in DataC const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable); memset(m_buf, 0, bytes); - m_endian = Endian::Native; + m_endian = Endian::get_endian(); m_iter.reset(); } @@ -713,17 +717,14 @@ NdbPack::Data::finalize() { if (m_varBytes == 0 || finalize_impl() == 0) - { - m_endian = Endian::get_endian(); return 0; - } return -1; } inline int NdbPack::Data::convert(Endian::Value to_endian) { - if (unlikely(to_endian == Endian::Native)) + if (to_endian == Endian::Native) to_endian = Endian::get_endian(); if (m_endian == to_endian) return 0; === modified file 'storage/ndb/src/common/util/NdbPack.cpp' --- a/storage/ndb/src/common/util/NdbPack.cpp 2011-05-09 15:35:25 +0000 +++ b/storage/ndb/src/common/util/NdbPack.cpp 2011-08-09 15:37:45 +0000 @@ -560,8 +560,11 @@ NdbPack::Data::finalize_impl() } int -NdbPack::Data::desc_all(Uint32 cnt) +NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian) { + if (from_endian == NdbPack::Endian::Native) + from_endian = NdbPack::Endian::get_endian(); + m_endian = from_endian; assert(m_cnt == 0); // reset() would destroy nullmask for (Uint32 i = 0; i < cnt; i++) { @@ -757,7 +760,7 @@ NdbPack::Spec::print(char* buf, Uint32 b // print DataC -bool g_ndb_pack_print_hex_always = false; +bool g_ndb_pack_print_hex_always = true; NdbOut& operator<<(NdbOut& out, const NdbPack::DataC& a) @@ -1836,7 +1839,7 @@ testdesc(const Tdata& tdata) Uint8 buf_new[Tspec::MaxBuf]; data_new.set_buf(buf_new, sizeof(buf_new)); memcpy(buf_new, buf_old, fullLen); - chk2(data_new.desc_all(cnt) == 0, data_new); + chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new); chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0); chk1(data_new.get_data_len() == data.get_data_len()); chk1(data_new.get_cnt() == data.get_cnt()); @@ -1903,28 +1906,17 @@ testconvert(const Tdata& tdata) require(data.get_cnt() == data_new.get_cnt()); const Uint32 cnt = tdata.m_cnt; Uint32 num_eq; - switch (NdbPack::Endian::get_endian()) { - case NdbPack::Endian::Little: - chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new); - num_eq = ~(Uint32)0; - chk1(data.cmp(data_new, cnt, num_eq) == 0); - chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new); - chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new); - num_eq = ~(Uint32)0; - chk1(data.cmp(data_new, cnt, num_eq) == 0); - break; - case NdbPack::Endian::Big: - chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new); - num_eq = ~(Uint32)0; - chk1(data.cmp(data_new, cnt, num_eq) == 0); - chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new); - chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new); - num_eq = ~(Uint32)0; - chk1(data.cmp(data_new, cnt, num_eq) == 0); - break; - default: - require(false); - break; + int i; + for (i = 0; i < 10; i++) { + int k = getrandom(3); // assumes Endian::Value 0,1,2 + NdbPack::Endian::Value v = (NdbPack::Endian::Value)k; + chk2(data_new.convert(v) == 0, data_new); + if (v == NdbPack::Endian::Native || + v == NdbPack::Endian::get_endian()) { + num_eq = ~(Uint32)0; + chk1(data.cmp(data_new, cnt, num_eq) == 0); + require(num_eq == cnt); + } } } === modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp' --- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-07-31 17:40:01 +0000 +++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 2011-08-09 15:37:45 +0000 @@ -742,7 +742,7 @@ NdbIndexStatImpl::set_index(const NdbDic // rir + rpk if (m_valueSpec.add(type, m_valueAttrs) == -1) { - setError(InternalError, __LINE__); + setError(InternalError, __LINE__, m_valueSpec.get_error_code()); return -1; } } @@ -1153,15 +1153,32 @@ NdbIndexStatImpl::read_next(Con& con) setError(con, __LINE__); return ret; } - // create consistent NdbPack::Data instances - if (m_keyData.desc_all(m_keyAttrs) == -1) + + /* + * Key and value are raw data and little-endian. Create the complete + * NdbPack::Data instance and convert it to native-endian. + */ + const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little; + const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native; + + if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1) { - setError(InternalError, __LINE__); + setError(InternalError, __LINE__, m_keyData.get_error_code()); return -1; } - if (m_valueData.desc_all(m_valueAttrs) == -1) + if (m_keyData.convert(to_endian) == -1) { - setError(InternalError, __LINE__); + setError(InternalError, __LINE__, m_keyData.get_error_code()); + return -1; + } + if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1) + { + setError(InternalError, __LINE__, m_valueData.get_error_code()); + return -1; + } + if (m_valueData.convert(to_endian) == -1) + { + setError(InternalError, __LINE__, m_valueData.get_error_code()); return -1; } return 0; @@ -1856,7 +1873,7 @@ NdbIndexStatImpl::convert_range(Range& r } if (bound.m_data.add(data, &len_out) == -1) { - setError(InternalError, __LINE__); + setError(InternalError, __LINE__, bound.m_data.get_error_code()); return -1; } } @@ -1864,7 +1881,7 @@ NdbIndexStatImpl::convert_range(Range& r { if (bound.m_data.add_null(&len_out) == -1) { - setError(InternalError, __LINE__); + setError(InternalError, __LINE__, bound.m_data.get_error_code()); return -1; } } === modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp' --- a/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/test/ndbapi/testIndexStat.cpp 2011-08-09 15:36:25 +0000 @@ -110,7 +110,8 @@ static const NdbRecord* g_ind_rec = 0; struct my_record { - Uint32 m_null_bm; + Uint8 m_null_bm; + Uint8 fill[3]; Uint32 m_a; Uint32 m_b; char m_c[1+g_charlen]; @@ -376,6 +377,7 @@ struct Val { void copy(const Val& val2); void make(uint numattrs, const Lim& lim); int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const; + void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j); private: Val& operator=(const Val&); @@ -556,6 +558,40 @@ Val::cmp(const Val& val2, uint numattrs, return k; } +void +Val::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j) +{ + const char* key = (j == 0 ? ib.low_key : ib.high_key); + const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count); + const Uint8 nullbits = *(const Uint8*)key; + require(numattrs <= g_numattrs); + if (numattrs >= 1) { + if (nullbits & (1 << g_ndbrec_b_nb_offset)) + b_null = 1; + else { + memcpy(&b, &key[g_ndbrec_b_offset], sizeof(b)); + b_null = 0; + } + } + if (numattrs >= 2) { + if (nullbits & (1 << g_ndbrec_c_nb_offset)) + c_null = 1; + else { + memcpy(c, &key[g_ndbrec_c_offset], sizeof(c)); + c_null = 0; + } + } + if (numattrs >= 3) { + if (nullbits & (1 << g_ndbrec_d_nb_offset)) + d_null = 1; + else { + memcpy(&d, &key[g_ndbrec_d_offset], sizeof(d)); + d_null = 0; + } + } + m_numattrs = numattrs; +} + // index keys struct Key { @@ -844,7 +880,9 @@ struct Bnd { Bnd& make(uint minattrs); Bnd& make(uint minattrs, const Val& theval); int cmp(const Key& key) const; + int cmp(const Bnd& bnd2); int type(uint colno) const; // for setBound + void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j); private: Bnd& operator=(const Bnd&); @@ -937,6 +975,52 @@ Bnd::cmp(const Key& key) const } int +Bnd::cmp(const Bnd& bnd2) +{ + int place; // debug + int ret; + const Bnd& bnd1 = *this; + const Val& val1 = bnd1.m_val; + const Val& val2 = bnd2.m_val; + const uint numattrs1 = val1.m_numattrs; + const uint numattrs2 = val2.m_numattrs; + const uint n = (numattrs1 < numattrs2 ? numattrs1 : numattrs2); + do { + int k = val1.cmp(val2, n); + if (k != 0) { + place = 1; + ret = k; + break; + } + if (numattrs1 < numattrs2) { + place = 2; + ret = (+1) * bnd1.m_side; + break; + } + if (numattrs1 > numattrs2) { + place = 3; + ret = (-1) * bnd1.m_side; + break; + } + if (bnd1.m_side < bnd2.m_side) { + place = 4; + ret = -1; + break; + } + if (bnd1.m_side > bnd2.m_side) { + place = 5; + ret = +1; + break; + } + place = 6; + ret = 0; + } while (0); + ll3("bnd: " << *this << " cmp bnd: " << bnd2 + << " ret: " << ret << " place: " << place); + return ret; +} + +int Bnd::type(uint colno) const { int t; @@ -960,6 +1044,21 @@ Bnd::type(uint colno) const return t; } +void +Bnd::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j) +{ + Val& val = m_val; + val.fromib(ib, j); + const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count); + const bool inclusive = (j == 0 ? ib.low_inclusive : ib.high_inclusive); + if (numattrs == 0) { + m_side = 0; + } else { + m_side = (j == 0 ? (inclusive ? -1 : +1) : (inclusive ? +1 : -1)); + } + m_lohi = j; +} + // stats values struct Stval { @@ -1016,6 +1115,7 @@ struct Rng { void copy(const Rng& rng2); int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range uint rowcount() const; + void fromib(const NdbIndexScanOperation::IndexBound& ib); private: Rng& operator=(const Rng&); @@ -1164,6 +1264,15 @@ Rng::rowcount() const return count; } +void +Rng::fromib(const NdbIndexScanOperation::IndexBound& ib) +{ + for (uint j = 0; j <= 1; j++) { + Bnd& bnd = m_bnd[j]; + bnd.fromib(ib, j); + } +} + static Rng* g_rnglist = 0; static void @@ -1475,7 +1584,8 @@ queryscan(Rng& rng) */ static int initialiseIndexBound(const Rng& rng, - NdbIndexScanOperation::IndexBound& ib) + NdbIndexScanOperation::IndexBound& ib, + my_record* low_key, my_record* high_key) { ll3("initialiseIndexBound: " << rng); uint i; @@ -1483,9 +1593,13 @@ initialiseIndexBound(const Rng& rng, Uint32 colsInBound[2]= {0, 0}; bool boundInclusive[2]= {false, false}; + memset(&ib, 0xf1, sizeof(ib)); + memset(low_key, 0xf2, sizeof(*low_key)); + memset(high_key, 0xf3, sizeof(*high_key)); + // Clear nullbit storage - *((char *)ib.low_key) = - *((char *)ib.high_key) = 0; + low_key->m_null_bm = 0; + high_key->m_null_bm = 0; for (i = 0; i < g_numattrs; i++) { const Uint32 no = i; // index attribute number @@ -1499,7 +1613,7 @@ initialiseIndexBound(const Rng& rng, } for (j = 0; j <= 1; j++) { /* Get ptr to key storage space for this bound */ - my_record* keyBuf= (my_record *)( (j==0) ? ib.low_key : ib.high_key); + my_record* keyBuf= (j==0) ? low_key : high_key; int t = type[j]; if (t == -1) continue; @@ -1542,8 +1656,10 @@ initialiseIndexBound(const Rng& rng, } /* Now have everything we need to initialise the IndexBound */ + ib.low_key = (char*)low_key; ib.low_key_count= colsInBound[0]; ib.low_inclusive= boundInclusive[0]; + ib.high_key = (char*)high_key; ib.high_key_count= colsInBound[1]; ib.high_inclusive= boundInclusive[1]; ib.range_no= 0; @@ -1554,11 +1670,18 @@ initialiseIndexBound(const Rng& rng, " high_inc=" << ib.high_inclusive); ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) << " d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) << - " first byte=%xu" << ib.low_key[0]); + " first byte=" << ib.low_key[0]); ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) << " d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) << - " first byte=%xu" << ib.high_key[0]); + " first byte=" << ib.high_key[0]); + // verify by reverse + { + Rng rng; + rng.fromib(ib); + require(rng.m_bnd[0].cmp(bnd[0]) == 0); + require(rng.m_bnd[1].cmp(bnd[1]) == 0); + } return 0; } @@ -1568,13 +1691,12 @@ querystat_v2(Rng& rng) ll3("querystat_v2"); /* Create IndexBound and key storage space */ - char keySpace[2][g_ndbrecord_bytes]; NdbIndexScanOperation::IndexBound ib; - ib.low_key= keySpace[0]; - ib.high_key= keySpace[1]; + my_record low_key; + my_record high_key; chkdb((g_con = g_ndb->startTransaction()) != 0); - chkrc(initialiseIndexBound(rng, ib) == 0); + chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0); Uint64 count = ~(Uint64)0; chkdb(g_is->records_in_range(g_ind, @@ -1610,10 +1732,9 @@ querystat(Rng& rng) // convert to IndexBound (like in mysqld) NdbIndexScanOperation::IndexBound ib; - char keySpace[2][g_ndbrecord_bytes]; - ib.low_key = keySpace[0]; - ib.high_key = keySpace[1]; - chkrc(initialiseIndexBound(rng, ib) == 0); + my_record low_key; + my_record high_key; + chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0); chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0); // index stat query No bundle (reason: useless for push emails).