List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:August 9 2011 3:39pm
Subject:bzr push into mysql-5.1-telco-7.0-wl4124-new1 branch (pekka.nousiainen:4419
to 4421) WL#4124
View as plain text  
 4421 Pekka Nousiainen	2011-08-09
      wl#4124 x18_fix.diff
      bugfix: index stats on big-endian

    modified:
      storage/ndb/include/util/NdbPack.hpp
      storage/ndb/src/common/util/NdbPack.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
 4420 Pekka Nousiainen	2011-08-09
      wl#4124 x17_fix.diff
      testIndexStat: NdbRecord nullbits, align, const

    modified:
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/test/ndbapi/testIndexStat.cpp
 4419 Pekka Nousiainen	2011-07-31
      wl#4124 x16_fix.diff
      small changes inspired by sol10

    modified:
      storage/ndb/include/ndbapi/NdbIndexStat.hpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
      storage/ndb/tools/ndb_index_stat.cpp
=== modified file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp	2011-05-04 09:44:18 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp	2011-08-09 15:37:45 +0000
@@ -117,7 +117,7 @@ public:
   class Endian {
   public:
     enum Value {
-      Native = 0,
+      Native = 0, // replaced by actual value
       Little = 1,
       Big = 2
     };
@@ -300,6 +300,10 @@ public:
    * Instance of an array of data values.  The values are packed into
    * a byte buffer.  The buffer is also maintained as a single varbinary
    * value if non-zero var bytes (length bytes) is specified.
+   *
+   * Data instances can be received from another source (such as table
+   * in database) and may not be native-endian.  Such instances must
+   * first be completed with desc_all() and convert().
    */
   class Data : public DataC {
   public:
@@ -324,7 +328,7 @@ public:
     // convert endian
     int convert(Endian::Value to_endian);
     // create complete instance from buffer contents
-    int desc_all(Uint32 cnt);
+    int desc_all(Uint32 cnt, Endian::Value from_endian);
     // getters
     Uint32 get_max_len() const;
     Uint32 get_max_len4() const;
@@ -351,7 +355,7 @@ public:
     const Uint32 m_varBytes;
     Uint8* m_buf;
     Uint32 m_bufMaxLen;
-    Endian::Value m_endian; // Native until finalize()
+    Endian::Value m_endian;
     // iterator on items added
     Iter m_iter;
   };
@@ -685,7 +689,7 @@ NdbPack::Data::Data(const Spec& spec, bo
 {
   m_buf = 0;
   m_bufMaxLen = 0;
-  m_endian = Endian::Native;
+  m_endian = Endian::get_endian();
 }
 
 inline void
@@ -704,7 +708,7 @@ NdbPack::Data::reset()
   m_cnt = 0;  // in DataC
   const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
   memset(m_buf, 0, bytes);
-  m_endian = Endian::Native;
+  m_endian = Endian::get_endian();
   m_iter.reset();
 }
 
@@ -713,17 +717,14 @@ NdbPack::Data::finalize()
 {
   if (m_varBytes == 0 ||
       finalize_impl() == 0)
-  {
-    m_endian = Endian::get_endian();
     return 0;
-  }
   return -1;
 }
 
 inline int
 NdbPack::Data::convert(Endian::Value to_endian)
 {
-  if (unlikely(to_endian == Endian::Native))
+  if (to_endian == Endian::Native)
     to_endian = Endian::get_endian();
   if (m_endian == to_endian)
     return 0;

=== modified file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp	2011-05-09 15:35:25 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp	2011-08-09 15:37:45 +0000
@@ -560,8 +560,11 @@ NdbPack::Data::finalize_impl()
 }
 
 int
-NdbPack::Data::desc_all(Uint32 cnt)
+NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian)
 {
+  if (from_endian == NdbPack::Endian::Native)
+    from_endian = NdbPack::Endian::get_endian();
+  m_endian = from_endian;
   assert(m_cnt == 0); // reset() would destroy nullmask
   for (Uint32 i = 0; i < cnt; i++)
   {
@@ -757,7 +760,7 @@ NdbPack::Spec::print(char* buf, Uint32 b
 
 // print DataC
 
-bool g_ndb_pack_print_hex_always = false;
+bool g_ndb_pack_print_hex_always = true;
 
 NdbOut&
 operator<<(NdbOut& out, const NdbPack::DataC& a)
@@ -1836,7 +1839,7 @@ testdesc(const Tdata& tdata)
   Uint8 buf_new[Tspec::MaxBuf];
   data_new.set_buf(buf_new, sizeof(buf_new));
   memcpy(buf_new, buf_old, fullLen);
-  chk2(data_new.desc_all(cnt) == 0, data_new);
+  chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new);
   chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
   chk1(data_new.get_data_len() == data.get_data_len());
   chk1(data_new.get_cnt() == data.get_cnt());
@@ -1903,28 +1906,17 @@ testconvert(const Tdata& tdata)
   require(data.get_cnt() == data_new.get_cnt());
   const Uint32 cnt = tdata.m_cnt;
   Uint32 num_eq;
-  switch (NdbPack::Endian::get_endian()) {
-  case NdbPack::Endian::Little:
-    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
-    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    break;
-  case NdbPack::Endian::Big:
-    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
-    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    break;
-  default:
-    require(false);
-    break;
+  int i;
+  for (i = 0; i < 10; i++) {
+    int k = getrandom(3); // assumes Endian::Value 0,1,2
+    NdbPack::Endian::Value v = (NdbPack::Endian::Value)k;
+    chk2(data_new.convert(v) == 0, data_new);
+    if (v == NdbPack::Endian::Native ||
+        v == NdbPack::Endian::get_endian()) {
+      num_eq = ~(Uint32)0;
+      chk1(data.cmp(data_new, cnt, num_eq) == 0);
+      require(num_eq == cnt);
+    }
   }
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-07-31 17:40:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-08-09 15:37:45 +0000
@@ -742,7 +742,7 @@ NdbIndexStatImpl::set_index(const NdbDic
     // rir + rpk
     if (m_valueSpec.add(type, m_valueAttrs) == -1)
     {
-      setError(InternalError, __LINE__);
+      setError(InternalError, __LINE__, m_valueSpec.get_error_code());
       return -1;
     }
   }
@@ -1153,15 +1153,32 @@ NdbIndexStatImpl::read_next(Con& con)
       setError(con, __LINE__);
     return ret;
   }
-  // create consistent NdbPack::Data instances
-  if (m_keyData.desc_all(m_keyAttrs) == -1)
+
+  /*
+   * Key and value are raw data and little-endian.  Create the complete
+   * NdbPack::Data instance and convert it to native-endian.
+   */
+  const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little;
+  const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native;
+
+  if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1)
   {
-    setError(InternalError, __LINE__);
+    setError(InternalError, __LINE__, m_keyData.get_error_code());
     return -1;
   }
-  if (m_valueData.desc_all(m_valueAttrs) == -1)
+  if (m_keyData.convert(to_endian) == -1)
   {
-    setError(InternalError, __LINE__);
+    setError(InternalError, __LINE__, m_keyData.get_error_code());
+    return -1;
+  }
+  if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1)
+  {
+    setError(InternalError, __LINE__, m_valueData.get_error_code());
+    return -1;
+  }
+  if (m_valueData.convert(to_endian) == -1)
+  {
+    setError(InternalError, __LINE__, m_valueData.get_error_code());
     return -1;
   }
   return 0;
@@ -1856,7 +1873,7 @@ NdbIndexStatImpl::convert_range(Range& r
         }
         if (bound.m_data.add(data, &len_out) == -1)
         {
-          setError(InternalError, __LINE__);
+          setError(InternalError, __LINE__, bound.m_data.get_error_code());
           return -1;
         }
       }
@@ -1864,7 +1881,7 @@ NdbIndexStatImpl::convert_range(Range& r
       {
         if (bound.m_data.add_null(&len_out) == -1)
         {
-          setError(InternalError, __LINE__);
+          setError(InternalError, __LINE__, bound.m_data.get_error_code());
           return -1;
         }
       }

=== modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp'
--- a/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-08-09 15:36:25 +0000
@@ -110,7 +110,8 @@ static const NdbRecord* g_ind_rec = 0;
 
 struct my_record
 {
-  Uint32 m_null_bm;
+  Uint8 m_null_bm;
+  Uint8 fill[3];
   Uint32 m_a;
   Uint32 m_b;
   char m_c[1+g_charlen];
@@ -376,6 +377,7 @@ struct Val {
   void copy(const Val& val2);
   void make(uint numattrs, const Lim& lim);
   int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const;
+  void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
 
 private:
   Val& operator=(const Val&);
@@ -556,6 +558,40 @@ Val::cmp(const Val& val2, uint numattrs,
   return k;
 }
 
+void
+Val::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
+{
+  const char* key = (j == 0 ? ib.low_key : ib.high_key);
+  const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
+  const Uint8 nullbits = *(const Uint8*)key;
+  require(numattrs <= g_numattrs);
+  if (numattrs >= 1) {
+    if (nullbits & (1 << g_ndbrec_b_nb_offset))
+      b_null = 1;
+    else {
+      memcpy(&b, &key[g_ndbrec_b_offset], sizeof(b));
+      b_null = 0;
+    }
+  }
+  if (numattrs >= 2) {
+    if (nullbits & (1 << g_ndbrec_c_nb_offset))
+      c_null = 1;
+    else {
+      memcpy(c, &key[g_ndbrec_c_offset], sizeof(c));
+      c_null = 0;
+    }
+  }
+  if (numattrs >= 3) {
+    if (nullbits & (1 << g_ndbrec_d_nb_offset))
+      d_null = 1;
+    else {
+      memcpy(&d, &key[g_ndbrec_d_offset], sizeof(d));
+      d_null = 0;
+    }
+  }
+  m_numattrs = numattrs;
+}
+
 // index keys
 
 struct Key {
@@ -844,7 +880,9 @@ struct Bnd {
   Bnd& make(uint minattrs);
   Bnd& make(uint minattrs, const Val& theval);
   int cmp(const Key& key) const;
+  int cmp(const Bnd& bnd2);
   int type(uint colno) const; // for setBound
+  void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
 
 private:
   Bnd& operator=(const Bnd&);
@@ -937,6 +975,52 @@ Bnd::cmp(const Key& key) const
 }
 
 int
+Bnd::cmp(const Bnd& bnd2)
+{
+  int place; // debug
+  int ret;
+  const Bnd& bnd1 = *this;
+  const Val& val1 = bnd1.m_val;
+  const Val& val2 = bnd2.m_val;
+  const uint numattrs1 = val1.m_numattrs;
+  const uint numattrs2 = val2.m_numattrs;
+  const uint n = (numattrs1 < numattrs2 ? numattrs1 : numattrs2);
+  do {
+    int k = val1.cmp(val2, n);
+    if (k != 0) {
+      place = 1;
+      ret = k;
+      break;
+    }
+    if (numattrs1 < numattrs2) {
+      place = 2;
+      ret = (+1) * bnd1.m_side;
+      break;
+    }
+    if (numattrs1 > numattrs2) {
+      place = 3;
+      ret = (-1) * bnd1.m_side;
+      break;
+    }
+    if (bnd1.m_side < bnd2.m_side) {
+      place = 4;
+      ret = -1;
+      break;
+    }
+    if (bnd1.m_side > bnd2.m_side) {
+      place = 5;
+      ret = +1;
+      break;
+    }
+    place = 6;
+    ret = 0;
+  } while (0);
+  ll3("bnd: " << *this << " cmp bnd: " << bnd2
+      << " ret: " << ret << " place: " << place);
+  return ret;
+}
+
+int
 Bnd::type(uint colno) const
 {
   int t;
@@ -960,6 +1044,21 @@ Bnd::type(uint colno) const
   return t;
 }
 
+void
+Bnd::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
+{
+  Val& val = m_val;
+  val.fromib(ib, j);
+  const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
+  const bool inclusive = (j == 0 ? ib.low_inclusive : ib.high_inclusive);
+  if (numattrs == 0) {
+    m_side = 0;
+  } else {
+    m_side = (j == 0 ? (inclusive ? -1 : +1) : (inclusive ? +1 : -1));
+  }
+  m_lohi = j;
+}
+
 // stats values
 
 struct Stval {
@@ -1016,6 +1115,7 @@ struct Rng {
   void copy(const Rng& rng2);
   int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range
   uint rowcount() const;
+  void fromib(const NdbIndexScanOperation::IndexBound& ib);
 
 private:
   Rng& operator=(const Rng&);
@@ -1164,6 +1264,15 @@ Rng::rowcount() const
   return count;
 }
 
+void
+Rng::fromib(const NdbIndexScanOperation::IndexBound& ib)
+{
+  for (uint j = 0; j <= 1; j++) {
+    Bnd& bnd = m_bnd[j];
+    bnd.fromib(ib, j);
+  }
+}
+
 static Rng* g_rnglist = 0;
 
 static void
@@ -1475,7 +1584,8 @@ queryscan(Rng& rng)
  */
 static int
 initialiseIndexBound(const Rng& rng, 
-                     NdbIndexScanOperation::IndexBound& ib)
+                     NdbIndexScanOperation::IndexBound& ib,
+                     my_record* low_key, my_record* high_key)
 {
   ll3("initialiseIndexBound: " << rng);
   uint i;
@@ -1483,9 +1593,13 @@ initialiseIndexBound(const Rng& rng,
   Uint32 colsInBound[2]= {0, 0};
   bool boundInclusive[2]= {false, false};
 
+  memset(&ib, 0xf1, sizeof(ib));
+  memset(low_key, 0xf2, sizeof(*low_key));
+  memset(high_key, 0xf3, sizeof(*high_key));
+
   // Clear nullbit storage
-  *((char *)ib.low_key) = 
-    *((char *)ib.high_key) = 0;
+  low_key->m_null_bm = 0;
+  high_key->m_null_bm = 0;
 
   for (i = 0; i < g_numattrs; i++) {
     const Uint32 no = i; // index attribute number
@@ -1499,7 +1613,7 @@ initialiseIndexBound(const Rng& rng,
     }
     for (j = 0; j <= 1; j++) {
       /* Get ptr to key storage space for this bound */
-      my_record* keyBuf= (my_record *)( (j==0) ? ib.low_key : ib.high_key);
+      my_record* keyBuf= (j==0) ? low_key : high_key;
       int t = type[j];
       if (t == -1)
         continue;
@@ -1542,8 +1656,10 @@ initialiseIndexBound(const Rng& rng,
   }
 
   /* Now have everything we need to initialise the IndexBound */
+  ib.low_key = (char*)low_key;
   ib.low_key_count= colsInBound[0];
   ib.low_inclusive= boundInclusive[0];
+  ib.high_key = (char*)high_key;
   ib.high_key_count= colsInBound[1];
   ib.high_inclusive= boundInclusive[1];
   ib.range_no= 0;
@@ -1554,11 +1670,18 @@ initialiseIndexBound(const Rng& rng,
       " high_inc=" << ib.high_inclusive);
   ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) <<
       " d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) <<
-      " first byte=%xu" << ib.low_key[0]);
+      " first byte=" << ib.low_key[0]);
   ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) <<
       " d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) <<
-      " first byte=%xu" << ib.high_key[0]);  
+      " first byte=" << ib.high_key[0]);  
 
+  // verify by reverse
+  {
+    Rng rng;
+    rng.fromib(ib);
+    require(rng.m_bnd[0].cmp(bnd[0]) == 0);
+    require(rng.m_bnd[1].cmp(bnd[1]) == 0);
+  }
   return 0;
 }
 
@@ -1568,13 +1691,12 @@ querystat_v2(Rng& rng)
   ll3("querystat_v2");
   
   /* Create IndexBound and key storage space */
-  char keySpace[2][g_ndbrecord_bytes];
   NdbIndexScanOperation::IndexBound ib;
-  ib.low_key= keySpace[0];
-  ib.high_key= keySpace[1];
+  my_record low_key;
+  my_record high_key;
 
   chkdb((g_con = g_ndb->startTransaction()) != 0);
-  chkrc(initialiseIndexBound(rng, ib) == 0);
+  chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
 
   Uint64 count = ~(Uint64)0;
   chkdb(g_is->records_in_range(g_ind, 
@@ -1610,10 +1732,9 @@ querystat(Rng& rng)
 
   // convert to IndexBound (like in mysqld)
   NdbIndexScanOperation::IndexBound ib;
-  char keySpace[2][g_ndbrecord_bytes];
-  ib.low_key = keySpace[0];
-  ib.high_key = keySpace[1];
-  chkrc(initialiseIndexBound(rng, ib) == 0);
+  my_record low_key;
+  my_record high_key;
+  chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
   chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0);
 
   // index stat query

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-wl4124-new1 branch (pekka.nousiainen:4419to 4421) WL#4124Pekka Nousiainen10 Aug