List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:May 4 2011 9:44am
Subject:bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4351) WL#4163
View as plain text  
#At file:///export/space/pekka/ms/ms-wl4163-70/ based on revid:pekka@stripped

 4351 Pekka Nousiainen	2011-05-04
      wl#4163 e01_pack.diff
      pack ndb data according to spec

    added:
      storage/ndb/include/util/NdbPack.hpp
      storage/ndb/src/common/util/NdbPack.cpp
    modified:
      storage/ndb/include/util/NdbSqlUtil.hpp
      storage/ndb/src/common/util/CMakeLists.txt
      storage/ndb/src/common/util/Makefile.am
=== added file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp	2011-05-04 09:44:18 +0000
@@ -0,0 +1,857 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_PACK_HPP
+#define NDB_PACK_HPP
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <kernel/AttributeHeader.hpp>
+#include <NdbSqlUtil.hpp>
+#include <NdbEnv.h>
+class NdbOut;
+
+/*
+ * Pack an array of NDB data values.  The types are specified by an
+ * array of data types.  There is no associated table or attribute ids.
+ * All or an initial sequence of the specified values are present.
+ *
+ * Currently used for ordered index keys and bounds in kernel (DBTUX)
+ * and in index statistics (mysqld).  The comparison methods use the
+ * primitive type comparisons from NdbSqlUtil.
+ *
+ * Keys and bounds use same spec.  However a value in an index bound can
+ * be NULL even if the key attribute is not nullable.  Therefore bounds
+ * set the "allNullable" property and have a longer null mask.
+ *
+ * There are two distinct use occasions: 1) construction of data or
+ * bound 2) operating on previously constructed data or bound.  There
+ * are classes Data/DataC and Bound/BoundC for these uses.  The latter
+ * often can return a result without interpreting the full value.
+ *
+ * Methods return -1 on error and 0 on success.  Comparison methods
+ * assume well-formed data and return negative, zero, positive for less,
+ * equal, greater.
+ */
+
+class NdbPack {
+public:
+  class Endian;
+  class Type;
+  class Spec;
+  class Iter;
+  class DataC;
+  class Data;
+  class BoundC;
+  class Bound;
+
+  /*
+   * Get SQL type.
+   */
+  static const NdbSqlUtil::Type& getSqlType(Uint32 typeId);
+
+  /*
+   * Error codes for core dumps.
+   */
+  class Error {
+  public:
+    enum {
+      TypeNotSet = -101,          // type id was not set
+      TypeOutOfRange = -102,      // type id is out of range
+      TypeNotSupported = -103,    // blob (and for now bit) types
+      TypeSizeZero = -104,        // max size was set to zero
+      TypeFixSizeInvalid = -105,  // fixed size specified wrong
+      TypeNullableNotBool = -106, // nullable must be 0 or 1
+      CharsetNotSpecified = -107, // char type with no charset number
+      CharsetNotFound = -108,     // cannot install in all_charsets[]
+      CharsetNotAllowed = -109,   // non-char type with charset
+      SpecBufOverflow = -201,     // more spec items than allocated
+      DataCntOverflow = -301,     // more data items than in spec
+      DataBufOverflow = -302,     // more data bytes than allocated
+      DataValueOverflow = -303,   // var length exceeds max size
+      DataNotNullable = -304,     // NULL value to not-nullable type
+      InvalidAttrInfo = -305,     // invalid plain old attr info
+      BoundEmptySide = -401,      // side not 0 for empty bound
+      BoundNonemptySide = -402,   // side not -1,+1 for non-empty bound
+      InternalError = -901,
+      ValidationError = -902,
+      NoError = 0
+    };
+    Error();
+    ~Error() {}
+    int get_error_code() const;
+    int get_error_line() const;
+
+  private:
+    friend class Endian;
+    friend class Type;
+    friend class Spec;
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    friend class Bound;
+    void set_error(int code, int line) const;
+    void set_error(const Error& e2) const;
+    mutable int m_error_code;
+    mutable int m_error_line;
+  };
+
+  /*
+   * Endian definitions.
+   */
+  class Endian {
+  public:
+    enum Value {
+      Native = 0,
+      Little = 1,
+      Big = 2
+    };
+    static Value get_endian();
+    static void convert(void* ptr, Uint32 len);
+  };
+
+  /*
+   * Data type.
+   */
+  class Type : public Error {
+  public:
+    Type();
+    Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+    ~Type() {}
+    /*
+     * Define the type.  Size is fixed or max size.  Values of variable
+     * length have length bytes.  The definition is verified when the
+     * type is added to the specification.  This also installs missing
+     * CHARSET_INFO* into all_charsets[].
+     */
+    void set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+    // getters
+    Uint32 get_type_id() const;
+    Uint32 get_byte_size() const;
+    bool get_nullable() const;
+    Uint32 get_cs_number() const;
+    Uint32 get_array_type() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Type&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Spec;
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    // verify and complete when added to specification
+    int complete();
+    Uint16 m_typeId;
+    Uint16 m_byteSize;    // fixed or max size in bytes
+    Uint16 m_nullable;
+    Uint16 m_csNumber;
+    Uint16 m_arrayType;   // 0,1,2 length bytes
+    Uint16 m_nullbitPos;  // computed as part of Spec
+  };
+
+  /*
+   * Data specification i.e. array of types.  Usually constructed on the
+   * heap, so keep fairly small.  Used for boths keys and bounds.
+   */
+  class Spec : public Error {
+  public:
+    Spec();
+    ~Spec() {}
+    // set initial buffer (calls reset)
+    void set_buf(Type* buf, Uint32 bufMaxCnt);
+    // use if buffer is relocated
+    void set_buf(Type* buf);
+    // reset but keep buffer
+    void reset();
+    // add type to specification once or number of times
+    int add(Type type);
+    int add(Type type, Uint32 cnt);
+    // copy from
+    void copy(const Spec& s2);
+    // getters (bounds set allNullable)
+    const Type& get_type(Uint32 i) const;
+    Uint32 get_cnt() const;
+    Uint32 get_nullable_cnt(bool allNullable) const;
+    Uint32 get_nullmask_len(bool allNullable) const;
+    // max data length including null mask
+    Uint32 get_max_data_len(bool allNullable) const;
+    // minimum var bytes (if used by Data instance)
+    Uint32 get_min_var_bytes(bool allNullable) const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Spec&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    // undefined
+    Spec(const Spec&);
+    Spec& operator=(const Spec&);
+    Type* m_buf;
+    Uint16 m_bufMaxCnt;
+    Uint16 m_cnt;
+    Uint16 m_nullableCnt;
+    Uint16 m_varsizeCnt;
+    Uint32 m_maxByteSize; // excludes null mask
+  };
+
+  /*
+   * Iterator over data items.  DataC uses external Iter instances in
+   * comparison methods etc.  Data contains an Iter instance which
+   * iterates on items added.
+   */
+  class Iter : public Error {
+  public:
+    // the data instance is only used to set metadata
+    Iter(const DataC& data);
+    ~Iter() {}
+    void reset();
+
+  private:
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    // undefined
+    Iter(const Iter&);
+    Iter& operator=(const Iter&);
+    // describe next non-null or null item and advance iterator
+    int desc(const Uint8* item);
+    int desc_null();
+    // compare current items (DataC buffers are passed)
+    int cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const;
+
+    const Spec& m_spec;
+    const bool m_allNullable;
+    // iterator
+    Uint32 m_itemPos;     // position of current item in DataC buffer
+    Uint32 m_cnt;         // number of items described so far
+    Uint32 m_nullCnt;
+    // current item
+    Uint32 m_lenBytes;    // 0-2
+    Uint32 m_bareLen;     // excludes length bytes
+    Uint32 m_itemLen;     // full length, value zero means null
+  };
+
+  /*
+   * Read-only superclass of Data.  Initialized from a previously
+   * constructed Data buffer (any var bytes skipped).  Methods interpret
+   * one data item at a time.  Values are native endian.
+   */
+  class DataC : public Error {
+  public:
+    DataC(const Spec& spec, bool allNullable);
+    // set buffer to previously constructed one with given item count
+    void set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt);
+    // interpret next data item
+    int desc(Iter& r) const;
+    // compare cnt attrs and also return number of initial equal attrs
+    int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+    // getters
+    const Spec& get_spec() const;
+    bool get_all_nullable() const;
+    const void* get_data_buf() const;
+    Uint32 get_cnt() const;
+    bool is_empty() const;
+    bool is_full() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const DataC&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz, bool convert_flag = false) const;
+    int validate() const { return 0; }
+
+  private:
+    friend class Iter;
+    friend class Data;
+    friend class BoundC;
+    // undefined
+    DataC(const Data&);
+    DataC& operator=(const DataC&);
+    const Spec& m_spec;
+    const bool m_allNullable;
+    const Uint8* m_buf;
+    Uint32 m_bufMaxLen;
+    // can be updated as part of Data instance
+    Uint32 m_cnt;
+  };
+
+  /*
+   * Instance of an array of data values.  The values are packed into
+   * a byte buffer.  The buffer is also maintained as a single varbinary
+   * value if non-zero var bytes (length bytes) is specified.
+   */
+  class Data : public DataC {
+  public:
+    Data(const Spec& spec, bool allNullable, Uint32 varBytes);
+    // set buffer (calls reset)
+    void set_buf(void* buf, Uint32 bufMaxLen);
+    // reset but keep buffer (header is zeroed)
+    void reset();
+    // add non-null data items and return length in bytes
+    int add(const void* data, Uint32* len_out);
+    int add(const void* data, Uint32 cnt, Uint32* len_out);
+    // add null data items and return length 0 bytes
+    int add_null(Uint32* len_out);
+    int add_null(Uint32 cnt, Uint32* len_out);
+    // add from "plain old attr info"
+    int add_poai(const Uint32* poai, Uint32* len_out);
+    int add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out);
+    // call this before first use
+    int finalize();
+    // copy from
+    int copy(const DataC& d2);
+    // convert endian
+    int convert(Endian::Value to_endian);
+    // create complete instance from buffer contents
+    int desc_all(Uint32 cnt);
+    // getters
+    Uint32 get_max_len() const;
+    Uint32 get_max_len4() const;
+    Uint32 get_var_bytes() const;
+    const void* get_full_buf() const;
+    Uint32 get_full_len() const;
+    Uint32 get_data_len() const;
+    Uint32 get_null_cnt() const;
+    Endian::Value get_endian() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Data&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Iter;
+    friend class Bound;
+    // undefined
+    Data(const Data&);
+    Data& operator=(const Data&);
+    int finalize_impl();
+    int convert_impl(Endian::Value to_endian);
+    const Uint32 m_varBytes;
+    Uint8* m_buf;
+    Uint32 m_bufMaxLen;
+    Endian::Value m_endian; // Native until finalize()
+    // iterator on items added
+    Iter m_iter;
+  };
+
+  /*
+   * Read-only superclass of BoundC, analogous to DataC.  Initialized
+   * from a previously constructed Bound or DataC buffer.
+   */
+  class BoundC : public Error {
+  public:
+    BoundC(DataC& data);
+    ~BoundC() {}
+    // call this before first use
+    int finalize(int side);
+    // compare bound to key (may return 0 if bound is longer)
+    int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+    // compare bounds (may return 0 if cnt is less than min length)
+    int cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const;
+    // getters
+    DataC& get_data() const;
+    int get_side() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const BoundC&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Bound;
+    // undefined
+    BoundC(const BoundC&);
+    BoundC& operator=(const BoundC&);
+    DataC& m_data;
+    int m_side;
+  };
+
+  /*
+   * Ordered index range bound consists of a partial key and a "side".
+   * The partial key is a Data instance where some initial number of
+   * values are present.  It is defined separately by the caller and
+   * passed to Bound ctor by reference.
+   */
+  class Bound : public BoundC {
+  public:
+    Bound(Data& data);
+    ~Bound() {}
+    void reset();
+    // call this before first use
+    int finalize(int side);
+    // getters
+    Data& get_data() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Bound&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    // undefined
+    Bound(const Bound&);
+    Bound& operator=(const Bound&);
+    Data& m_data;
+  };
+
+  /*
+   * Helper for print() methods.
+   */
+  struct Print {
+  private:
+    friend class Endian;
+    friend class Type;
+    friend class Spec;
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    friend class Bound;
+    Print(char* buf, Uint32 bufsz);
+    void print(const char* frm, ...);
+    char* m_buf;
+    Uint32 m_bufsz;
+    Uint32 m_sz;
+  };
+};
+
+// NdbPack
+
+inline const NdbSqlUtil::Type&
+NdbPack::getSqlType(Uint32 typeId)
+{
+  return NdbSqlUtil::m_typeList[typeId];
+}
+
+// NdbPack::Error
+
+inline
+NdbPack::Error::Error()
+{
+  m_error_code = 0;
+  m_error_line = 0;
+}
+
+// NdbPack::Endian
+
+inline NdbPack::Endian::Value
+NdbPack::Endian::get_endian()
+{
+#ifndef WORDS_BIGENDIAN
+  return Little;
+#else
+  return Big;
+#endif
+}
+
+// NdbPack::Type
+
+inline
+NdbPack::Type::Type()
+{
+  m_typeId = NDB_TYPE_UNDEFINED;
+  m_byteSize = 0;
+  m_nullable = true;
+  m_csNumber = 0;
+  m_arrayType = 0;
+  m_nullbitPos = 0;
+}
+
+inline
+NdbPack::Type::Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+  set(typeId, byteSize, nullable, csNumber);
+}
+
+inline void
+NdbPack::Type::set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+  m_typeId = typeId;
+  m_byteSize = byteSize;
+  m_nullable = nullable;
+  m_csNumber = csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_type_id() const
+{
+  return m_typeId;
+}
+
+inline Uint32
+NdbPack::Type::get_byte_size() const
+{
+  return m_byteSize;
+}
+
+inline bool
+NdbPack::Type::get_nullable() const
+{
+  return (bool)m_nullable;
+}
+
+inline Uint32
+NdbPack::Type::get_cs_number() const
+{
+  return m_csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_array_type() const
+{
+  return m_arrayType;
+}
+
+// NdbPack::Spec
+
+inline
+NdbPack::Spec::Spec()
+{
+  reset();
+  m_buf = 0;
+  m_bufMaxCnt = 0;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf, Uint32 bufMaxCnt)
+{
+  reset();
+  m_buf = buf;
+  m_bufMaxCnt = bufMaxCnt;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf)
+{
+  m_buf = buf;
+}
+
+inline void
+NdbPack::Spec::reset()
+{
+  m_cnt = 0;
+  m_nullableCnt = 0;
+  m_varsizeCnt = 0;
+  m_maxByteSize = 0;
+}
+
+inline const NdbPack::Type&
+NdbPack::Spec::get_type(Uint32 i) const
+{
+  assert(i < m_cnt);
+  return m_buf[i];
+}
+
+inline Uint32
+NdbPack::Spec::get_cnt() const
+{
+  return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullable_cnt(bool allNullable) const
+{
+  if (!allNullable)
+    return m_nullableCnt;
+  else
+    return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullmask_len(bool allNullable) const
+{
+  return (get_nullable_cnt(allNullable) + 7) / 8;
+}
+
+inline Uint32
+NdbPack::Spec::get_max_data_len(bool allNullable) const
+{
+  return get_nullmask_len(allNullable) + m_maxByteSize;
+}
+
+inline Uint32
+NdbPack::Spec::get_min_var_bytes(bool allNullable) const
+{
+  const Uint32 len = get_max_data_len(allNullable);
+  return (len < 256 ? 1 : 2);
+}
+
+// NdbPack::Iter
+
+inline
+NdbPack::Iter::Iter(const DataC& data) :
+  m_spec(data.m_spec),
+  m_allNullable(data.m_allNullable)
+{
+  reset();
+}
+
+inline void
+NdbPack::Iter::reset()
+{
+  m_itemPos = m_spec.get_nullmask_len(m_allNullable);
+  m_cnt = 0;
+  m_nullCnt = 0;
+  m_lenBytes = 0;
+  m_bareLen = 0;
+  m_itemLen = 0;
+}
+
+// NdbPack::DataC
+
+inline
+NdbPack::DataC::DataC(const Spec& spec, bool allNullable) :
+  m_spec(spec),
+  m_allNullable(allNullable)
+{
+  m_buf = 0;
+  m_bufMaxLen = 0;
+  m_cnt = 0;
+}
+
+inline void
+NdbPack::DataC::set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt)
+{
+  m_buf = static_cast<const Uint8*>(buf);
+  m_bufMaxLen = bufMaxLen;
+  m_cnt = cnt;
+}
+
+inline const NdbPack::Spec&
+NdbPack::DataC::get_spec() const
+{
+  return m_spec;
+}
+
+inline bool
+NdbPack::DataC::get_all_nullable() const
+{
+  return &m_allNullable;
+}
+
+inline const void*
+NdbPack::DataC::get_data_buf() const
+{
+  return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::DataC::get_cnt() const
+{
+  return m_cnt;
+}
+
+inline bool
+NdbPack::DataC::is_empty() const
+{
+  return m_cnt == 0;
+}
+
+inline bool
+NdbPack::DataC::is_full() const
+{
+  return m_cnt == m_spec.m_cnt;
+}
+
+// NdbPack::Data
+
+inline
+NdbPack::Data::Data(const Spec& spec, bool allNullable, Uint32 varBytes) :
+  DataC(spec, allNullable),
+  m_varBytes(varBytes),
+  m_iter(*this)
+{
+  m_buf = 0;
+  m_bufMaxLen = 0;
+  m_endian = Endian::Native;
+}
+
+inline void
+NdbPack::Data::set_buf(void* buf, Uint32 bufMaxLen)
+{
+  m_buf = static_cast<Uint8*>(buf);
+  m_bufMaxLen = bufMaxLen;
+  reset();
+  assert(bufMaxLen >= m_varBytes);
+  DataC::set_buf(&m_buf[m_varBytes], m_bufMaxLen - m_varBytes, 0);
+}
+
+inline void
+NdbPack::Data::reset()
+{
+  m_cnt = 0;  // in DataC
+  const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
+  memset(m_buf, 0, bytes);
+  m_endian = Endian::Native;
+  m_iter.reset();
+}
+
+inline int
+NdbPack::Data::finalize()
+{
+  if (m_varBytes == 0 ||
+      finalize_impl() == 0)
+  {
+    m_endian = Endian::get_endian();
+    return 0;
+  }
+  return -1;
+}
+
+inline int
+NdbPack::Data::convert(Endian::Value to_endian)
+{
+  if (unlikely(to_endian == Endian::Native))
+    to_endian = Endian::get_endian();
+  if (m_endian == to_endian)
+    return 0;
+  if (convert_impl(to_endian) == 0)
+  {
+    m_endian = to_endian;
+    return 0;
+  }
+  return -1;
+}
+
+inline Uint32
+NdbPack::Data::get_max_len() const
+{
+  return m_varBytes + m_spec.get_max_data_len(m_allNullable);
+}
+
+inline Uint32
+NdbPack::Data::get_max_len4() const
+{
+  Uint32 len4 = get_max_len();
+  len4 += 3;
+  len4 /= 4;
+  len4 *= 4;
+  return len4;
+}
+
+inline Uint32
+NdbPack::Data::get_var_bytes() const
+{
+  return m_varBytes;
+}
+
+inline const void*
+NdbPack::Data::get_full_buf() const
+{
+  return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::Data::get_full_len() const
+{
+  return m_varBytes + m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_data_len() const
+{
+  return m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_null_cnt() const
+{
+  return m_iter.m_nullCnt;
+}
+
+inline NdbPack::Endian::Value
+NdbPack::Data::get_endian() const
+{
+  return m_endian;
+}
+
+// NdbPack::BoundC
+
+inline
+NdbPack::BoundC::BoundC(DataC& data) :
+  m_data(data)
+{
+  m_side = 0;
+}
+
+inline int
+NdbPack::BoundC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+  const BoundC& b1 = *this;
+  const DataC& d1 = b1.m_data;
+  int res = d1.cmp(d2, cnt, num_eq);
+  if (res == 0 && d1.m_cnt <= d2.m_cnt)
+    res = b1.m_side;
+  return res;
+}
+
+inline NdbPack::DataC&
+NdbPack::BoundC::get_data() const
+{
+  return m_data;
+}
+
+inline int
+NdbPack::BoundC::get_side() const
+{
+  return m_side;
+}
+
+// NdbPack::Bound
+
+inline
+NdbPack::Bound::Bound(Data& data) :
+  BoundC(data),
+  m_data(data)
+{
+}
+
+inline void
+NdbPack::Bound::reset()
+{
+  m_data.reset();
+  m_side = 0;
+}
+
+inline int
+NdbPack::Bound::finalize(int side)
+{
+  if (m_data.finalize() == -1)
+  {
+    set_error(m_data);
+    return -1;
+  }
+  if (BoundC::finalize(side) == -1)
+    return -1;
+  return 0;
+}
+
+inline NdbPack::Data&
+NdbPack::Bound::get_data() const
+{
+  return m_data;
+}
+
+#endif // NDB_PACK_HPP

=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp	2011-02-19 03:13:04 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp	2011-05-04 09:44:18 +0000
@@ -177,6 +177,7 @@ public:
                                Uint32 dataByteSize);
 
 private:
+  friend class NdbPack;
   /**
    * List of all types.  Must match Type::Enum.
    */

=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt	2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt	2011-05-04 09:44:18 +0000
@@ -54,6 +54,7 @@ ADD_LIBRARY(ndbgeneral STATIC
 	    SparseBitmask.cpp
             require.c
             Vector.cpp
+            NdbPack.cpp
 )
 TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
 
@@ -82,3 +83,8 @@ SET_TARGET_PROPERTIES(ndb_version-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_VERSION")
 TARGET_LINK_LIBRARIES(ndb_version-t ndbgeneral)
 
+ADD_EXECUTABLE(NdbPack-t NdbPack.cpp)
+SET_TARGET_PROPERTIES(NdbPack-t
+                      PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
+TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral)
+

=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am	2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/Makefile.am	2011-05-04 09:44:18 +0000
@@ -27,14 +27,15 @@ libgeneral_la_SOURCES = \
             strdup.c \
             ConfigValues.cpp ndb_init.cpp basestring_vsnprintf.c \
             Bitmask.cpp SparseBitmask.cpp parse_mask.hpp \
-	    ndb_rand.c require.c Vector.cpp
+	    ndb_rand.c require.c Vector.cpp \
+	    NdbPack.cpp
 
 INCLUDES_LOC = @ZLIB_INCLUDES@
 
 libndbazio_la_SOURCES = ndbzio.c
 libndbazio_la_LIBADD = @ZLIB_LIBS@
 
-noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t
+noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t
 
 BaseString_t_SOURCES = BaseString.cpp
 BaseString_t_CXXFLAGS = -DTEST_BASE_STRING
@@ -73,5 +74,14 @@ ndb_version_t_SOURCES = version.cpp
 ndb_version_t_CXXFLAGS = -DTEST_VERSION
 ndb_version_t_LDADD = libgeneral.la
 
+NdbPack_t_SOURCES = NdbPack.cpp
+NdbPack_t_CXXFLAGS = -DTEST_NDB_PACK
+NdbPack_t_LDADD = \
+	libgeneral.la \
+	$(top_builddir)/storage/ndb/src/common/portlib/libportlib.la \
+	$(top_builddir)/mysys/libmysys.la \
+	$(top_builddir)/strings/libmystrings.la \
+	$(top_builddir)/dbug/libdbug.la
+
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_util.mk.am

=== added file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp	2011-05-04 09:44:18 +0000
@@ -0,0 +1,2052 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <ndb_global.h>
+#include <NdbPack.hpp>
+#include <NdbOut.hpp>
+#include <NdbEnv.h>
+
+// NdbPack::Error
+
+int
+NdbPack::Error::get_error_code() const
+{
+  return m_error_code;
+}
+
+int
+NdbPack::Error::get_error_line() const
+{
+  return m_error_line;
+}
+
+void
+NdbPack::Error::set_error(int code, int line) const
+{
+  m_error_code = code;
+  m_error_line = line;
+#ifdef VM_TRACE
+  const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
+  if (p != 0 && strchr("1Y", p[0]) != 0)
+    require(false);
+#endif
+}
+
+void
+NdbPack::Error::set_error(const Error& e2) const
+{
+  set_error(e2.m_error_code, e2.m_error_line);
+}
+
+// NdbPack::Endian
+
+void
+NdbPack::Endian::convert(void* ptr, Uint32 len)
+{
+  Uint8* p = (Uint8*)ptr;
+  for (Uint32 i = 0; i < len / 2; i++)
+  {
+    Uint32 j = len - i - 1;
+    Uint8 tmp = p[i];
+    p[i] = p[j];
+    p[j] = tmp;
+  }
+}
+
+// NdbPack::Type
+
+struct Ndb_pack_type_info {
+  bool m_supported;
+  Uint16 m_fixSize;     // if non-zero must have this exact size
+  Uint16 m_arrayType;   // 0,1,2 length bytes
+  bool m_charType;      // type with character set
+  bool m_convert;       // convert endian (reverse byte order)
+};
+
+static const Ndb_pack_type_info
+g_ndb_pack_type_info[] = {
+  { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
+  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
+  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
+  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
+  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
+  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
+  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
+  { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
+  { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
+  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
+  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
+  { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
+  { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
+  { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
+  { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
+  { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
+  { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
+  { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
+  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
+  { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
+  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
+  { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
+  { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
+  { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
+  { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
+  { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
+  { 1, 0, 0, 0, 0 }  // NDB_TYPE_DECIMALUNSIGNED
+};
+
+static const int g_ndb_pack_type_info_cnt =
+  sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
+
+int
+NdbPack::Type::complete()
+{
+  if (m_typeId == 0)
+  {
+    set_error(TypeNotSet, __LINE__);
+    return -1;
+  }
+  if (m_typeId >= g_ndb_pack_type_info_cnt)
+  {
+    set_error(TypeNotSet, __LINE__);
+    return -1;
+  }
+  const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
+  if (!info.m_supported)
+  {
+    set_error(TypeNotSupported, __LINE__);
+    return -1;
+  }
+  if (m_byteSize == 0)
+  {
+    set_error(TypeSizeZero, __LINE__);
+    return -1;
+  }
+  if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
+  {
+    set_error(TypeFixSizeInvalid, __LINE__);
+    return -1;
+  }
+  if (!(m_nullable <= 1))
+  {
+    set_error(TypeNullableNotBool, __LINE__);
+    return -1;
+  }
+  if (info.m_charType && m_csNumber == 0)
+  {
+    set_error(CharsetNotSpecified, __LINE__);
+    return -1;
+  }
+  if (info.m_charType && all_charsets[m_csNumber] == 0)
+  {
+    CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
+    if (cs == 0)
+    {
+      set_error(CharsetNotFound, __LINE__);
+      return -1;
+    }
+    all_charsets[m_csNumber] = cs; // yes caller must do this
+  }
+  if (!info.m_charType && m_csNumber != 0)
+  {
+    set_error(CharsetNotAllowed, __LINE__);
+    return -1;
+  }
+  m_arrayType = info.m_arrayType;
+  return 0;
+}
+
+// NdbPack::Spec
+
+int
+NdbPack::Spec::add(Type type)
+{
+  Uint32 cnt = m_cnt;
+  Uint32 nullable_cnt = m_nullableCnt;
+  Uint32 varsize_cnt = m_varsizeCnt;
+  Uint32 max_byte_size = m_maxByteSize;
+  if (type.complete() == -1)
+  {
+    set_error(type);
+    return -1;
+  }
+  type.m_nullbitPos = 0xFFFF;
+  if (type.m_nullable)
+  {
+    type.m_nullbitPos = nullable_cnt;
+    nullable_cnt++;
+  }
+  if (type.m_arrayType != 0)
+  {
+    varsize_cnt++;
+  }
+  max_byte_size += type.m_byteSize;
+  if (cnt >= m_bufMaxCnt)
+  {
+    set_error(SpecBufOverflow, __LINE__);
+    return -1;
+  }
+  m_buf[cnt] = type;
+  cnt++;
+  m_cnt = cnt;
+  m_nullableCnt = nullable_cnt;
+  m_varsizeCnt = varsize_cnt;
+  m_maxByteSize = max_byte_size;
+  return 0;
+}
+
+int
+NdbPack::Spec::add(Type type, Uint32 cnt)
+{
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    if (add(type) == -1)
+      return -1;
+  }
+  return 0;
+}
+
+void
+NdbPack::Spec::copy(const Spec& s2)
+{
+  assert(m_bufMaxCnt >= s2.m_cnt);
+  reset();
+  m_cnt = s2.m_cnt;
+  m_nullableCnt = s2.m_nullableCnt;
+  m_varsizeCnt = s2.m_varsizeCnt;
+  m_maxByteSize = s2.m_maxByteSize;
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    m_buf[i] = s2.m_buf[i];
+  }
+}
+
+// NdbPack::Iter
+
+int
+NdbPack::Iter::desc(const Uint8* item)
+{
+  const Uint32 i = m_cnt; // item index
+  assert(i < m_spec.m_cnt);
+  const Type& type = m_spec.m_buf[i];
+  const Uint32 lenBytes = type.m_arrayType;
+  Uint32 bareLen = 0;
+  switch (lenBytes) {
+  case 0:
+    bareLen = type.m_byteSize;
+    break;
+  case 1:
+    bareLen = item[0];
+    break;
+  case 2:
+    bareLen = item[0] + (item[1] << 8);
+    break;
+  default:
+    assert(false);
+    set_error(InternalError, __LINE__);
+    return -1;
+  }
+  const Uint32 itemLen = lenBytes + bareLen;
+  if (itemLen > type.m_byteSize)
+  {
+    set_error(DataValueOverflow, __LINE__);
+    return -1;
+  }
+  m_itemPos += m_itemLen; // skip previous item
+  m_cnt++;
+  m_lenBytes = lenBytes;
+  m_bareLen = bareLen;
+  m_itemLen = itemLen;
+  return 0;
+}
+
+int
+NdbPack::Iter::desc_null()
+{
+  assert(m_cnt < m_spec.m_cnt);
+  // caller checks if null allowed
+  m_itemPos += m_itemLen; // skip previous item
+  m_cnt++;
+  m_nullCnt++;
+  m_lenBytes = 0;
+  m_bareLen = 0;
+  m_itemLen = 0;
+  return 0;
+}
+
+int
+NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
+{
+  const Iter& r1 = *this;
+  assert(&r1.m_spec == &r2.m_spec);
+  assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
+  const Uint32 i = r1.m_cnt - 1; // item index
+  int res = 0;
+  const Uint32 n1 = r1.m_itemLen;
+  const Uint32 n2 = r2.m_itemLen;
+  if (n1 != 0)
+  {
+    if (n2 != 0)
+    {
+      const Type& type = r1.m_spec.m_buf[i];
+      const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
+      const Uint8* p1 = &buf1[r1.m_itemPos];
+      const Uint8* p2 = &buf2[r2.m_itemPos];
+      CHARSET_INFO* cs = all_charsets[type.m_csNumber];
+      res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2, true);
+    }
+    else
+    {
+      res = +1;
+    }
+  }
+  else
+  {
+    if (n2 != 0)
+      res = -1;
+  }
+  return res;
+}
+
+// NdbPack::DataC
+
+int
+NdbPack::DataC::desc(Iter& r) const
+{
+  const Uint32 i = r.m_cnt; // item index
+  assert(i < m_cnt);
+  const Type& type = m_spec.m_buf[i];
+  if (type.m_nullable || m_allNullable)
+  {
+    Uint32 nullbitPos = 0;
+    if (!m_allNullable)
+      nullbitPos = type.m_nullbitPos;
+    else
+      nullbitPos = i;
+    const Uint32 byte_pos = nullbitPos / 8;
+    const Uint32 bit_pos = nullbitPos % 8;
+    const Uint8 bit_mask = (1 << bit_pos);
+    const Uint8& the_byte = m_buf[byte_pos];
+    if ((the_byte & bit_mask) != 0)
+    {
+      if (r.desc_null() == -1)
+      {
+        set_error(r);
+        return -1;
+      }
+      return 0;
+    }
+  }
+  const Uint32 pos = r.m_itemPos + r.m_itemLen;
+  const Uint8* item = &m_buf[pos];
+  if (r.desc(item) == -1)
+  {
+    set_error(r);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+  const DataC& d1 = *this;
+  assert(cnt <= d1.m_cnt);
+  assert(cnt <= d2.m_cnt);
+  Iter r1(d1);
+  Iter r2(d2);
+  int res = 0;
+  Uint32 i; // remember last
+  for (i = 0; i < cnt; i++)
+  {
+    d1.desc(r1);
+    d2.desc(r2);
+    res = r1.cmp(r2, d1.m_buf, d2.m_buf);
+    if (res != 0)
+      break;
+  }
+  num_eq = i;
+  return res;
+}
+
+// NdbPack::Data
+
+int
+NdbPack::Data::add(const void* data, Uint32* len_out)
+{
+  assert(data != 0);
+  const Uint8* item = (const Uint8*)data;
+  const Uint32 i = m_cnt; // item index
+  if (i >= m_spec.m_cnt)
+  {
+    set_error(DataCntOverflow, __LINE__);
+    return -1;
+  }
+  Iter& r = m_iter;
+  assert(r.m_cnt == i);
+  const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
+  if (r.desc(item) == -1)
+  {
+    set_error(r);
+    return -1;
+  }
+  if (fullLen + r.m_itemLen > m_bufMaxLen)
+  {
+    set_error(DataBufOverflow, __LINE__);
+    return -1;
+  }
+  memcpy(&m_buf[fullLen], item, r.m_itemLen);
+  *len_out = r.m_itemLen;
+  m_cnt++;
+  return 0;
+}
+
+int
+NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
+{
+  const Uint8* data_ptr = (const Uint8*)data;
+  Uint32 len_tot = 0;
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    Uint32 len;
+    if (add(data_ptr, &len) == -1)
+      return -1;
+    if (data != 0)
+      data_ptr += len;
+    len_tot += len;
+  }
+  *len_out = len_tot;
+  return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32* len_out)
+{
+  const Uint32 i = m_cnt; // item index
+  if (i >= m_spec.m_cnt)
+  {
+    set_error(DataCntOverflow, __LINE__);
+    return -1;
+  }
+  Iter& r = m_iter;
+  assert(r.m_cnt == i);
+  if (r.desc_null() == -1)
+  {
+    set_error(r);
+    return -1;
+  }
+  Uint32 nullbitPos = 0;
+  if (!m_allNullable)
+  {
+    const Type& type = m_spec.m_buf[i];
+    if (!type.m_nullable)
+    {
+      set_error(DataNotNullable, __LINE__);
+      return -1;
+    }
+    nullbitPos = type.m_nullbitPos;
+  }
+  else
+  {
+    nullbitPos = i;
+  }
+  const Uint32 byte_pos = nullbitPos / 8;
+  const Uint32 bit_pos = nullbitPos % 8;
+  const Uint8 bit_mask = (1 << bit_pos);
+  Uint8& the_byte = m_buf[m_varBytes + byte_pos];
+  assert((the_byte & bit_mask) == 0);
+  the_byte |= bit_mask;
+  *len_out = r.m_itemLen;
+  m_cnt++;
+  return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
+{
+  Uint32 len_tot = 0;
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    Uint32 len;
+    if (add_null(&len) == -1)
+      return -1;
+    len_tot += len;
+  }
+  *len_out = len_tot;
+  return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
+{
+  const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
+  if (!ah.isNULL())
+  {
+    if (add(&poai[1], len_out) == -1)
+      return -1;
+  }
+  else
+  {
+    if (add_null(len_out) == -1)
+      return -1;
+  }
+  if (ah.getByteSize() != *len_out)
+  {
+    set_error(InvalidAttrInfo, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
+{
+  Uint32 len_tot = 0;
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    Uint32 len;
+    if (add_poai(poai, &len) == -1)
+      return -1;
+    len_tot += len;
+    poai += 1 + (len + 3) / 4;
+  }
+  *len_out = len_tot;
+  return 0;
+}
+
+int
+NdbPack::Data::finalize_impl()
+{
+  const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
+  switch (m_varBytes) {
+  // case 0: inlined
+  case 1:
+    if (dataLen <= 0xFF)
+    {
+      m_buf[0] = dataLen;
+      return 0;
+    }
+    break;
+  case 2:
+    if (dataLen <= 0xFFFF)
+    {
+      m_buf[0] = (dataLen & 0xFF);
+      m_buf[1] = (dataLen >> 8);
+      return 0;
+    }
+    break;
+  default:
+    break;
+  }
+  set_error(InternalError, __LINE__);
+  return -1;
+}
+
+int
+NdbPack::Data::desc_all(Uint32 cnt)
+{
+  assert(m_cnt == 0); // reset() would destroy nullmask
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    m_cnt++;
+    if (desc(m_iter) == -1)
+      return -1;
+  }
+  if (finalize() == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbPack::Data::copy(const DataC& d2)
+{
+  reset();
+  Iter r2(d2);
+  const Uint32 cnt2 = d2.m_cnt;
+  for (Uint32 i = 0; i < cnt2; i++)
+  {
+    if (d2.desc(r2) == -1)
+      return -1;
+    Uint32 len_out = ~(Uint32)0;
+    if (r2.m_itemLen != 0)
+    {
+      if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
+          return -1;
+      assert(len_out == r2.m_itemLen);
+    }
+    else
+    {
+      if (add_null(&len_out) == -1)
+        return -1;
+      assert(len_out ==0);
+    }
+  }
+  if (finalize() == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbPack::Data::convert_impl(Endian::Value to_endian)
+{
+  const Spec& spec = m_spec;
+  Iter r(*this);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    if (DataC::desc(r) == -1)
+    {
+      set_error(r);
+      return -1;
+    }
+    const Type& type = spec.m_buf[i];
+    const Uint32 typeId = type.m_typeId;
+    const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+    if (info.m_convert)
+    {
+      Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
+      Uint32 len = r.m_itemLen;
+      Endian::convert(ptr, len);
+    }
+  }
+  return 0;
+}
+
+// NdbPack::BoundC
+
+int
+NdbPack::BoundC::finalize(int side)
+{
+  if (m_data.m_cnt == 0 && side != 0)
+  {
+    set_error(BoundEmptySide, __LINE__);
+    return -1;
+  }
+  if (m_data.m_cnt != 0 && side != -1 && side != +1)
+  {
+    set_error(BoundNonemptySide, __LINE__);
+    return -1;
+  }
+  m_side = side;
+  return 0;
+}
+
+int
+NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
+{
+  const BoundC& b1 = *this;
+  const DataC& d1 = b1.m_data;
+  const DataC& d2 = b2.m_data;
+  int res = d1.cmp(d2, cnt, num_eq);
+  if (res == 0)
+  {
+    if (cnt < d1.m_cnt && cnt < d2.m_cnt)
+      ;
+    else if (d1.m_cnt < d2.m_cnt)
+      res = (+1) * b1.m_side;
+    else if (d1.m_cnt > d2.m_cnt)
+      res = (-1) * b2.m_side;
+    else if (b1.m_side < b2.m_side)
+      res = -1;
+    else if (b1.m_side > b2.m_side)
+      res = +1;
+  }
+  return res;
+}
+
+// NdbPack::Bound
+
+// print
+
+NdbPack::Print::Print(char* buf, Uint32 bufsz) :
+  m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
+
+void
+NdbPack::Print::print(const char* fmt, ...)
+{
+  va_list ap;
+  va_start(ap, fmt);
+  if (m_bufsz > m_sz)
+  {
+    vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
+    m_sz += strlen(&m_buf[m_sz]);
+  }
+  va_end(ap);
+}
+
+// print Type
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Type& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Type::print(NdbOut& out) const
+{
+  char buf[200];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Type::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  p.print("typeId:%u", m_typeId);
+  p.print(" byteSize:%u", m_byteSize);
+  p.print(" nullable:%u", m_nullable);
+  p.print(" csNumber:%u", m_csNumber);
+  return buf;
+}
+
+// print Spec
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Spec& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Spec::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Spec::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  p.print("cnt:%u", m_cnt);
+  p.print(" nullableCnt:%u", m_nullableCnt);
+  p.print(" varsizeCnt:%u", m_varsizeCnt);
+  p.print(" nullmaskLen:%u", get_nullmask_len(false));
+  p.print(" maxByteSize:%u", m_maxByteSize);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    const Type& type = m_buf[i];
+    p.print(" [%u", i);
+    p.print(" typeId:%u", type.m_typeId);
+    p.print(" nullable:%u", type.m_nullable);
+    p.print(" byteSize:%u", type.m_byteSize);
+    p.print(" csNumber:%u", type.m_csNumber);
+    p.print("]");
+  }
+  return buf;
+}
+
+// print DataC
+
+bool g_ndb_pack_print_hex_always = false;
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::DataC& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::DataC::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
+{
+  Print p(buf, bufsz);
+  const Spec& spec = m_spec;
+  const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
+  if (nullmask_len != 0)
+  {
+    p.print("nullmask:");
+    for (Uint32 i = 0; i < nullmask_len; i++)
+    {
+      int x = m_buf[i];
+      p.print("%02x", x);
+    }
+  }
+  Iter r(*this);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    desc(r);
+    const Uint8* value = &m_buf[r.m_itemPos];
+    p.print(" [%u", i);
+    p.print(" pos:%u", r.m_itemPos);
+    p.print(" len:%u", r.m_itemLen);
+    if (r.m_itemLen > 0)
+    {
+      p.print(" value:");
+      // some specific types for debugging
+      const Type& type = spec.m_buf[i];
+      bool ok = true;
+      switch (type.m_typeId) {
+      case NDB_TYPE_TINYINT:
+        {
+          Int8 x;
+          memcpy(&x, value, 1);
+          if (convert_flag)
+            Endian::convert(&x, 1);
+          p.print("%d", (int)x);
+        }
+        break;
+      case NDB_TYPE_TINYUNSIGNED:
+        {
+          Uint8 x;
+          memcpy(&x, value, 1);
+          if (convert_flag)
+            Endian::convert(&x, 1);
+          p.print("%u", (uint)x);
+        }
+        break;
+      case NDB_TYPE_SMALLINT:
+        {
+          Int16 x;
+          memcpy(&x, value, 2);
+          if (convert_flag)
+            Endian::convert(&x, 2);
+          p.print("%d", (int)x);
+        }
+        break;
+      case NDB_TYPE_SMALLUNSIGNED:
+        {
+          Uint16 x;
+          memcpy(&x, value, 2);
+          if (convert_flag)
+            Endian::convert(&x, 2);
+          p.print("%u", (uint)x);
+        }
+        break;
+      case NDB_TYPE_INT:
+        {
+          Int32 x;
+          memcpy(&x, value, 4);
+          if (convert_flag)
+            Endian::convert(&x, 4);
+          p.print("%d", (int)x);
+        }
+        break;
+      case NDB_TYPE_UNSIGNED:
+        {
+          Uint32 x;
+          memcpy(&x, value, 4);
+          if (convert_flag)
+            Endian::convert(&x, 4);
+          p.print("%u", (uint)x);
+        }
+        break;
+      case NDB_TYPE_FLOAT:
+        {
+          float x;
+          memcpy(&x, value, 4);
+          if (convert_flag)
+            Endian::convert(&x, 4);
+          p.print("%g", (double)x);
+        }
+        break;
+      case NDB_TYPE_DOUBLE:
+        {
+          double x;
+          memcpy(&x, value, 8);
+          if (convert_flag)
+            Endian::convert(&x, 8);
+          p.print("%g", x);
+        }
+        break;
+      case NDB_TYPE_CHAR:
+      case NDB_TYPE_VARCHAR:
+      case NDB_TYPE_LONGVARCHAR:
+        {
+          const Uint32 off = type.m_arrayType;
+          for (Uint32 j = 0; j < r.m_bareLen; j++)
+          {
+            Uint8 x = value[off + j];
+            p.print("%c", (int)x);
+          }
+        }
+        break;
+      default:
+        ok = false;
+        break;
+      }
+      if (!ok || g_ndb_pack_print_hex_always)
+      {
+        p.print("<");
+        for (Uint32 j = 0; j < r.m_itemLen; j++)
+        {
+          int x = value[j];
+          p.print("%02x", x);
+        }
+        p.print(">");
+      }
+    }
+    p.print("]");
+  }
+  return buf;
+}
+
+// print Data
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Data& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Data::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Data::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  char* ptr = buf;
+  if (m_varBytes != 0)
+  {
+    p.print("varBytes:");
+    for (Uint32 i = 0; i < m_varBytes; i++)
+    {
+      int r = m_buf[i];
+      p.print("%02x", r);
+    }
+    p.print(" ");
+  }
+  p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
+  p.print(" ");
+  const bool convert_flag =
+    m_endian != Endian::Native &&
+    m_endian != Endian::get_endian();
+  DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
+  return buf;
+}
+
+// print BoundC
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::BoundC& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::BoundC::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
+  m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
+  return buf;
+}
+
+// print Bound
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Bound& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Bound::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Bound::print(char* buf, Uint32 bufsz) const
+{
+  BoundC::print(buf, bufsz);
+  return buf;
+}
+
+// validate
+
+int
+NdbPack::Type::validate() const
+{
+  Type type2 = *this;
+  if (type2.complete() == -1)
+  {
+    set_error(type2);
+    return -1;
+  }
+  if (memcmp(this, &type2, sizeof(Type)) != 0)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Spec::validate() const
+{
+  Uint32 nullableCnt = 0;
+  Uint32 varsizeCnt = 0;
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    const Type& type = m_buf[i];
+    if (type.validate() == -1)
+    {
+      set_error(type);
+      return -1;
+    }
+    if (type.m_nullable)
+      nullableCnt++;
+    if (type.m_arrayType != 0)
+      varsizeCnt++;
+  }
+  if (m_nullableCnt != nullableCnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (m_varsizeCnt != varsizeCnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Data::validate() const
+{
+  if (DataC::validate() == -1)
+    return -1;
+  const Iter& r = m_iter;
+  if (r.m_cnt != m_cnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  Iter r2(*this);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    if (desc(r2) == -1)
+      return -1;
+  }
+  if (r.m_itemPos != r2.m_itemPos)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (r.m_cnt != r2.m_cnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (r.m_nullCnt != r2.m_nullCnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (r.m_itemLen != r2.m_itemLen)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::BoundC::validate() const
+{
+  if (m_data.validate() == -1)
+  {
+    set_error(m_data);
+    return -1;
+  }
+  if (m_data.m_cnt == 0 && m_side != 0)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Bound::validate() const
+{
+  if (BoundC::validate() == -1)
+    return -1;
+  if (m_data.validate() == -1)
+  {
+    set_error(m_data);
+    return -1;
+  }
+  return 0;
+}
+
+#ifdef TEST_NDB_PACK
+#include <util/NdbTap.hpp>
+
+#define chk1(x) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; require(false); } while (0)
+
+#define chk2(x, e) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; require(false); } while (0)
+
+#define ll0(x) do { if (verbose < 0) break; ndbout << "0- " << x << endl; } while (0)
+#define ll1(x) do { if (verbose < 1) break; ndbout << "1- " << x << endl; } while (0)
+#define ll2(x) do { if (verbose < 2) break; ndbout << "2- " << x << endl; } while (0)
+#define ll3(x) do { if (verbose < 3) break; ndbout << "3- " << x << endl; } while (0)
+
+#define xmin(a, b) ((a) < (b) ? (a) : (b))
+
+#include <random.h>
+
+static uint // random 0..n-1
+getrandom(uint n)
+{
+  if (n != 0) {
+    uint k = random();
+    return k % n;
+  }
+  return 0;
+}
+
+static uint // random 0..n-1 biased exponentially to smaller
+getrandom(uint n, uint bias)
+{
+  assert(bias != 0);
+  uint k = getrandom(n);
+  bias--;
+  while (bias != 0) {
+    k = getrandom(k + 1);
+    bias--;
+  }
+  return k;
+}
+
+static bool
+getrandompct(uint pct)
+{
+  return getrandom(100) < pct;
+}
+
+// change in TAPTEST
+static int seed = -1; // random
+static int loops = 0;
+static int spec_cnt = -1; // random
+static int fix_type = 0; // all types
+static int no_nullable = 0;
+static int data_cnt = -1; // Max
+static int bound_cnt = -1; // Max
+static int verbose = 0;
+
+struct Tspec {
+  enum { Max = 100 };
+  enum { MaxBuf = Max * 4000 };
+  NdbPack::Spec m_spec;
+  NdbPack::Type m_type[Max];
+  Tspec() {
+    m_spec.set_buf(m_type, Max);
+  }
+  void create();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tspec& tspec)
+{
+  out << tspec.m_spec;
+  return out;
+}
+
+void
+Tspec::create()
+{
+  m_spec.reset();
+  int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
+  int i = 0;
+  while (i < cnt) {
+    int typeId = fix_type;
+    if (typeId == 0)
+      typeId = getrandom(g_ndb_pack_type_info_cnt);
+    const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+    switch (typeId) {
+    case NDB_TYPE_INT:
+    case NDB_TYPE_UNSIGNED:
+    case NDB_TYPE_CHAR:
+    case NDB_TYPE_VARCHAR:
+    case NDB_TYPE_LONGVARCHAR:
+      break;
+    default:
+      continue;
+    }
+    require(info.m_supported);
+    int byteSize = 0;
+    if (info.m_fixSize != 0)
+      byteSize = info.m_fixSize;
+    else if (info.m_arrayType == 0)
+      byteSize = 1 + getrandom(128, 1);  // char(1-128)
+    else if (info.m_arrayType == 1)
+      byteSize = 1 + getrandom(256, 2);  // varchar(0-255)
+    else if (info.m_arrayType == 2)
+      byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
+    else
+      require(false);
+    bool nullable = no_nullable ? false : getrandompct(50);
+    int csNumber = 0;
+    if (info.m_charType) {
+      csNumber = 8; // should include ascii
+    }
+    NdbPack::Type type(typeId, byteSize, nullable, csNumber);
+    chk2(m_spec.add(type) == 0, m_spec);
+    i++;
+  }
+  chk2(m_spec.validate() == 0, m_spec);
+}
+
+struct Tdata {
+  const Tspec& m_tspec;
+  NdbPack::Data m_data;
+  const bool m_isBound;
+  int m_cnt;
+  Uint8* m_xbuf;        // unpacked
+  int m_xsize;
+  int m_xoff[Tspec::Max];
+  int m_xlen[Tspec::Max];
+  bool m_xnull[Tspec::Max];
+  int m_xnulls;
+  Uint32* m_poaiBuf;    // plain old attr info
+  int m_poaiSize;
+  Uint8* m_packBuf;     // packed
+  int m_packLen;
+  Tdata(Tspec& tspec, bool isBound, uint varBytes) :
+    m_tspec(tspec),
+    m_data(tspec.m_spec, isBound, varBytes),
+    m_isBound(isBound)
+  {
+    m_cnt = tspec.m_spec.get_cnt();
+    m_xbuf = 0;
+    m_poaiBuf = 0;
+    m_packBuf = 0;
+  }
+  ~Tdata() {
+    delete [] m_xbuf;
+    delete [] m_poaiBuf;
+    delete [] m_packBuf;
+  }
+  void create();
+  void add();
+  void finalize();
+  // compare using unpacked data
+  int xcmp(const Tdata& tdata2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdata& tdata)
+{
+  out << tdata.m_data;
+  return out;
+}
+
+void
+Tdata::create()
+{
+  union {
+    Uint8 xbuf[Tspec::MaxBuf];
+    Uint64 xbuf_align;
+  };
+  memset(xbuf, 0x3f, sizeof(xbuf));
+  m_xsize = 0;
+  m_xnulls = 0;
+  Uint32 poaiBuf[Tspec::MaxBuf / 4];
+  memset(poaiBuf, 0x5f, sizeof(poaiBuf));
+  m_poaiSize = 0;
+  m_packLen = m_data.get_var_bytes();
+  m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
+  int i = 0, j;
+  while (i < m_cnt) {
+    const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
+    const int typeId = type.get_type_id();
+    const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+    m_xnull[i] = type.get_nullable() && getrandompct(25);
+    m_xnull[i] = false;
+    if (type.get_nullable() || m_isBound)
+      m_xnull[i] = getrandompct(20);
+    int pad = 0; // null-char pad not counted in xlen
+    if (!m_xnull[i]) {
+      m_xoff[i] = m_xsize;
+      Uint8* xptr = &xbuf[m_xsize];
+      switch (typeId) {
+      case NDB_TYPE_INT:
+        {
+          Int32 x = getrandom(10);
+          if (getrandompct(50))
+            x = (-1) * x;
+          memcpy(xptr, &x, 4);
+          m_xlen[i] = info.m_fixSize;
+        }
+        break;
+      case NDB_TYPE_UNSIGNED:
+        {
+          Uint32 x = getrandom(10);
+          memcpy(xptr, &x, 4);
+          m_xlen[i] = info.m_fixSize;
+        }
+        break;
+      case NDB_TYPE_CHAR:
+        {
+          require(type.get_byte_size() >= 1);
+          uint max_len = type.get_byte_size();
+          uint len = getrandom(max_len + 1, 1);
+          for (j = 0; j < len; j++)
+          {
+            xptr[j] = 'a' + getrandom(3);
+          }
+          for (j = len; j < max_len; j++)
+          {
+            xptr[j] = 0x20;
+          }
+          m_xlen[i] = max_len;
+          xptr[max_len] = 0;
+          pad = 1;
+        }
+        break;
+      case NDB_TYPE_VARCHAR:
+        {
+          require(type.get_byte_size() >= 1);
+          uint max_len = type.get_byte_size() - 1;
+          uint len = getrandom(max_len, 2);
+          require(len < 256);
+          xptr[0] = len;
+          for (j = 0; j < len; j++)
+          {
+            xptr[1 + j] = 'a' + getrandom(3);
+          }
+          m_xlen[i] = 1 + len;
+          xptr[1 + len] = 0;
+          pad = 1;
+        }
+        break;
+      case NDB_TYPE_LONGVARCHAR:
+        {
+          require(type.get_byte_size() >= 2);
+          uint max_len = type.get_byte_size() - 2;
+          uint len = getrandom(max_len, 3);
+          require(len < 256 * 256);
+          xptr[0] = (len & 0xFF);
+          xptr[1] = (len >> 8);
+          for (j = 0; j < len; j++)
+          {
+            xptr[2 + j] = 'a' + getrandom(3);
+          }
+          m_xlen[i] = 2 + len;
+          xptr[2 + len] = 0;
+          pad = 1;
+        }
+        break;
+      default:
+        require(false);
+        break;
+      }
+      m_xsize += m_xlen[i] + pad;
+      while (m_xsize % 8 != 0)
+        m_xsize++;
+      m_packLen += m_xlen[i];
+    } else {
+      m_xoff[i] = -1;
+      m_xlen[i] = 0;
+      m_xnulls++;
+    }
+    require(m_xnull[i] == (m_xoff[i] == -1));
+    require(m_xnull[i] == (m_xlen[i] == 0));
+    AttributeHeader* ah = (AttributeHeader*)&poaiBuf[m_poaiSize];
+    ah->setAttributeId(i); // not used
+    ah->setByteSize(m_xlen[i]);
+    m_poaiSize++;
+    if (!m_xnull[i]) {
+      memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
+      m_poaiSize += (m_xlen[i] + 3) / 4;
+    }
+    i++;
+  }
+  require(m_xsize % 8 == 0);
+  m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
+  memcpy(m_xbuf, xbuf, m_xsize);
+  m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
+  memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
+}
+
+void
+Tdata::add()
+{
+  m_packBuf = new Uint8 [m_packLen];
+  m_data.set_buf(m_packBuf, m_packLen);
+  int i, j;
+  j = 0;
+  while (j <= 1) {
+    if (j == 1)
+      m_data.reset();
+    i = 0;
+    while (i < m_cnt) {
+      Uint32 xlen = ~(Uint32)0;
+      if (!m_xnull[i]) {
+        int xoff = m_xoff[i];
+        const Uint8* xptr = &m_xbuf[xoff];
+        chk2(m_data.add(xptr, &xlen) == 0, m_data);
+        chk1(xlen == m_xlen[i]);
+      } else {
+        chk2(m_data.add_null(&xlen) == 0, m_data);
+        chk1(xlen == 0);
+      }
+      i++;
+    }
+    chk2(m_data.validate() == 0, m_data);
+    chk1(m_data.get_null_cnt() == m_xnulls);
+    j++;
+  }
+}
+
+void
+Tdata::finalize()
+{
+  chk2(m_data.finalize() == 0, m_data);
+  ll3("create: " << m_data);
+  chk1(m_data.get_full_len() == m_packLen);
+  {
+    const Uint8* p = (const Uint8*)m_data.get_full_buf();
+    chk1(p[0] + (p[1] << 8) == m_packLen - 2);
+  }
+}
+
+int
+Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+  const Tdata& tdata1 = *this;
+  require(&tdata1.m_tspec == &tdata2.m_tspec);
+  const Tspec& tspec = tdata1.m_tspec;
+  int res = 0;
+  int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
+  int i;
+  for (i = 0; i < cnt; i++) {
+    if (!tdata1.m_xnull[i]) {
+      if (!tdata2.m_xnull[i]) {
+        // the pointers are Uint64-aligned
+        const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
+        const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
+        const int xlen1 = tdata1.m_xlen[i];
+        const int xlen2 = tdata2.m_xlen[i];
+        const NdbPack::Type& type = tspec.m_spec.get_type(i);
+        const int typeId = type.get_type_id();
+        const int csNumber = type.get_cs_number();
+        CHARSET_INFO* cs = all_charsets[csNumber];
+        switch (typeId) {
+        case NDB_TYPE_INT:
+          {
+            require(cs == 0);
+            Int32 x1 = *(const Int32*)xptr1;
+            Int32 x2 = *(const Int32*)xptr2;
+            if (x1 < x2)
+              res = -1;
+            else if (x1 > x2)
+              res = +1;
+            ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+          }
+          break;
+        case NDB_TYPE_UNSIGNED:
+          {
+            require(cs == 0);
+            Uint32 x1 = *(const Uint32*)xptr1;
+            Uint32 x2 = *(const Uint32*)xptr2;
+            if (x1 < x2)
+              res = -1;
+            else if (x1 > x2)
+              res = +1;
+            ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+          }
+          break;
+        case NDB_TYPE_CHAR:
+          {
+            require(cs != 0 && cs->coll != 0);
+            const uint n1 = xlen1;
+            const uint n2 = xlen2;
+            const uchar* t1 = &xptr1[0];
+            const uchar* t2 = &xptr2[0];
+            const char* s1 = (const char*)t1;
+            const char* s2 = (const char*)t2;
+            chk1(n1 == strlen(s1));
+            chk1(n2 == strlen(s2));
+            res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+            ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+          }
+          break;
+        case NDB_TYPE_VARCHAR:
+          {
+            require(cs != 0 && cs->coll != 0);
+            const uint n1 = xptr1[0];
+            const uint n2 = xptr2[0];
+            const uchar* t1 = &xptr1[1];
+            const uchar* t2 = &xptr2[1];
+            const char* s1 = (const char*)t1;
+            const char* s2 = (const char*)t2;
+            chk1(n1 == strlen(s1));
+            chk1(n2 == strlen(s2));
+            res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+            ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+          }
+          break;
+        case NDB_TYPE_LONGVARCHAR:
+          {
+            require(cs != 0 && cs->coll != 0);
+            const uint n1 = xptr1[0] | (xptr1[1] << 8);
+            const uint n2 = xptr2[0] | (xptr2[1] << 8);
+            const uchar* t1 = &xptr1[2];
+            const uchar* t2 = &xptr2[2];
+            const char* s1 = (const char*)t1;
+            const char* s2 = (const char*)t2;
+            chk1(n1 == strlen(s1));
+            chk1(n2 == strlen(s2));
+            res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+            ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+          }
+          break;
+        default:
+          require(false);
+          break;
+        }
+      } else
+        res = +1;
+    } else if (!tdata2.m_xnull[i])
+      res = -1;
+    if (res != 0)
+      break;
+  }
+  *num_eq = i;
+  ll3("xcmp res:" << res << " num_eq:" << *num_eq);
+  return res;
+}
+
+struct Tbound {
+  Tdata& m_tdata;
+  NdbPack::Bound m_bound;
+  Tbound(Tdata& tdata) :
+    m_tdata(tdata),
+    m_bound(tdata.m_data)
+  {
+    m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
+  }
+  void create();
+  void add();
+  void finalize();
+  int xcmp(const Tdata& tdata2, int* num_eq) const;
+  int xcmp(const Tbound& tbound2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tbound& tbound)
+{
+  out << tbound.m_bound;
+  return out;
+}
+
+void
+Tbound::create()
+{
+  m_tdata.create();
+}
+
+void
+Tbound::add()
+{
+  m_tdata.add();
+}
+
+void
+Tbound::finalize()
+{
+  int side = getrandompct(50) ? -1 : +1;
+  chk2(m_bound.finalize(side) == 0, m_bound);
+  chk2(m_bound.validate() == 0, m_bound);
+  chk1(m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
+}
+
+int
+Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+  const Tbound& tbound1 = *this;
+  const Tdata& tdata1 = tbound1.m_tdata;
+  require(tdata1.m_cnt <= tdata2.m_cnt);
+  *num_eq = -1;
+  int res = tdata1.xcmp(tdata2, num_eq);
+  if (res == 0) {
+    chk1(*num_eq == tdata1.m_cnt);
+    res = m_bound.get_side();
+  }
+  return res;
+}
+
+int
+Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
+{
+  const Tbound& tbound1 = *this;
+  const Tdata& tdata1 = tbound1.m_tdata;
+  const Tdata& tdata2 = tbound2.m_tdata;
+  *num_eq = -1;
+  int res = tdata1.xcmp(tdata2, num_eq);
+  chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
+  if (res == 0) {
+    chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
+    if (tdata1.m_cnt < tdata2.m_cnt)
+      res = (+1) * tbound1.m_bound.get_side();
+    else if (tdata1.m_cnt > tdata2.m_cnt)
+      res = (-1) * tbound2.m_bound.get_side();
+    else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
+      res = -1;
+    else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
+      res = +1;
+  }
+  return res;
+}
+
+struct Tdatalist {
+  enum { Max = 1000 };
+  Tdata* m_tdata[Max];
+  int m_cnt;
+  Tdatalist(Tspec& tspec) {
+    m_cnt = data_cnt == -1 ? Max : data_cnt;
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      m_tdata[i] = new Tdata(tspec, false, 2);
+    }
+  }
+  ~Tdatalist() {
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      delete m_tdata[i];
+    }
+  }
+  void create();
+  void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdatalist& tdatalist)
+{
+  int i;
+  for (i = 0; i < tdatalist.m_cnt; i++) {
+    out << "data " << i << ": " << *tdatalist.m_tdata[i];
+    if (i + 1 < tdatalist.m_cnt)
+      out << endl;
+  }
+  return out;
+}
+
+void
+Tdatalist::create()
+{
+  int i;
+  for (i = 0; i < m_cnt; i++) {
+    Tdata& tdata = *m_tdata[i];
+    tdata.create();
+    tdata.add();
+    tdata.finalize();
+  }
+}
+
+static int
+data_cmp(const void* a1, const void* a2)
+{
+  const Tdata& tdata1 = **(const Tdata**)a1;
+  const Tdata& tdata2 = **(const Tdata**)a2;
+  require(tdata1.m_cnt == tdata2.m_cnt);
+  const Uint32 cnt = tdata1.m_cnt;
+  Uint32 num_eq = ~(Uint32)0;
+  int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
+  require(num_eq <= (Uint32)tdata1.m_cnt);
+  require(num_eq <= (Uint32)tdata2.m_cnt);
+  return res;
+}
+
+void
+Tdatalist::sort()
+{
+  ll1("data sort: in");
+  ll3(endl << *this);
+  qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
+  ll1("data sort: out");
+  ll3(endl << *this);
+  int i;
+  for (i = 0; i + 1 < m_cnt; i++) {
+    const Tdata& tdata1 = *m_tdata[i];
+    const Tdata& tdata2 = *m_tdata[i + 1];
+    require(tdata1.m_cnt == tdata2.m_cnt);
+    const Uint32 cnt = tdata1.m_cnt;
+    Uint32 num_eq1 = ~(Uint32)0;
+    int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
+    chk1(res <= 0);
+    // also via unpacked data
+    int num_eq2 = -1;
+    int res2 = tdata1.xcmp(tdata2, &num_eq2);
+    if (res < 0)
+      chk1(res2 < 0);
+    else if (res == 0)
+      chk1(res2 == 0);
+    else
+      chk1(res2 > 0);
+    chk1(num_eq1 == (Uint32)num_eq2);
+  }
+}
+
+struct Tboundlist {
+  enum { Max = 1000 };
+  Tbound* m_tbound[Max];
+  int m_cnt;
+  Tboundlist(Tspec& tspec) {
+    m_cnt = bound_cnt == -1 ? Max : bound_cnt;
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      Tdata* tdata = new Tdata(tspec, true, 0);
+      m_tbound[i] = new Tbound(*tdata);
+    }
+  }
+  ~Tboundlist() {
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      Tdata* tdata = &m_tbound[i]->m_tdata;
+      delete m_tbound[i];
+      delete tdata;
+    }
+  }
+  void create();
+  void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tboundlist& tboundlist)
+{
+  int i;
+  for (i = 0; i < tboundlist.m_cnt; i++) {
+    out << "bound " << i << ": " << *tboundlist.m_tbound[i];
+    if (i + 1 < tboundlist.m_cnt)
+      out << endl;
+  }
+  return out;
+}
+
+void
+Tboundlist::create()
+{
+  int i;
+  for (i = 0; i < m_cnt; i++) {
+    Tbound& tbound = *m_tbound[i];
+    tbound.create();
+    tbound.add();
+    tbound.finalize();
+  }
+}
+
+static int
+bound_cmp(const void* a1, const void* a2)
+{
+  const Tbound& tbound1 = **(const Tbound**)a1;
+  const Tbound& tbound2 = **(const Tbound**)a2;
+  const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+  Uint32 num_eq = ~(Uint32)0;
+  int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
+  require(num_eq <= cnt);
+  require(num_eq <= cnt);
+  return res;
+}
+
+void
+Tboundlist::sort()
+{
+  ll1("bound sort: in");
+  ll3(endl << *this);
+  qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
+  ll1("bound sort: out");
+  ll3(endl << *this);
+  int i;
+  for (i = 0; i + 1 < m_cnt; i++) {
+    const Tbound& tbound1 = *m_tbound[i];
+    const Tbound& tbound2 = *m_tbound[i + 1];
+    const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+    Uint32 num_eq1 = ~(Uint32)0;
+    int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
+    chk1(res <= 0);
+    // also via unpacked data
+    int num_eq2 = -1;
+    int res2 = tbound1.xcmp(tbound2, &num_eq2);
+    if (res < 0)
+      chk1(res2 < 0);
+    else if (res == 0)
+      chk1(res2 == 0);
+    else
+      chk1(res2 > 0);
+    chk1(num_eq1 == (Uint32)num_eq2);
+  }
+}
+
+static void
+testdesc(const Tdata& tdata)
+{
+  ll3("testdesc: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  const Uint8* buf_old = (const Uint8*)data.get_full_buf();
+  const Uint32 varBytes = data.get_var_bytes();
+  const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
+  const Uint32 dataLen = data.get_data_len();
+  const Uint32 fullLen = data.get_full_len();
+  const Uint32 cnt = data.get_cnt();
+  chk1(fullLen == varBytes + dataLen);
+  NdbPack::Data data_new(tspec.m_spec, false, varBytes);
+  Uint8 buf_new[Tspec::MaxBuf];
+  data_new.set_buf(buf_new, sizeof(buf_new));
+  memcpy(buf_new, buf_old, fullLen);
+  chk2(data_new.desc_all(cnt) == 0, data_new);
+  chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
+  chk1(data_new.get_data_len() == data.get_data_len());
+  chk1(data_new.get_cnt() == data.get_cnt());
+  chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testcopy(const Tdata& tdata)
+{
+  ll3("testcopy: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  int n = getrandom(tdata.m_cnt + 1);
+  do {
+    ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
+    NdbPack::DataC data_old(tspec.m_spec, false);
+    data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
+    chk1(data_old.get_cnt() == n);
+    NdbPack::Data data_new(tspec.m_spec, false, 0);
+    Uint8 buf_new[Tspec::MaxBuf];
+    data_new.set_buf(buf_new, sizeof(buf_new));
+    chk2(data_new.copy(data_old) == 0, data_new);
+    chk1(data_new.get_cnt() == n);
+    Uint32 num_eq1 = ~(Uint32)0;
+    chk1(data_new.cmp(data_old, n, num_eq1) == 0);
+    chk1(num_eq1 == n);
+    Uint32 num_eq2 = ~(Uint32)0;
+    chk1(data_old.cmp(data_new, n, num_eq2) == 0);
+    chk1(num_eq2 == n);
+    n = getrandom(n);
+  } while (n != 0);
+}
+
+static void
+testpoai(const Tdata& tdata)
+{
+  ll3("testpoai: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
+  Uint8 buf_new[Tspec::MaxBuf];
+  data_new.set_buf(buf_new, sizeof(buf_new));
+  Uint32 poaiLen = ~(Uint32)0;
+  chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
+  chk2(data_new.finalize() == 0, data_new);
+  chk2(data_new.validate() == 0, data_new);
+  chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
+  chk1(data_new.get_full_len() == data.get_full_len());
+  chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
+  chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testconvert(const Tdata& tdata)
+{
+  ll3("testconvert: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  NdbPack::Data data_new(tspec.m_spec, false, 2);
+  Uint8 buf_new[Tspec::MaxBuf];
+  data_new.set_buf(buf_new, sizeof(buf_new));
+  chk2(data_new.copy(data) == 0, data_new);
+  require(tdata.m_cnt == data.get_cnt());
+  require(data.get_cnt() == data_new.get_cnt());
+  const Uint32 cnt = tdata.m_cnt;
+  Uint32 num_eq;
+  switch (NdbPack::Endian::get_endian()) {
+  case NdbPack::Endian::Little:
+    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    break;
+  case NdbPack::Endian::Big:
+    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    break;
+  default:
+    require(false);
+    break;
+  }
+}
+
+static void
+testdata(const Tdatalist& tdatalist)
+{
+  int i;
+  for (i = 0; i < tdatalist.m_cnt; i++) {
+    const Tdata& tdata = *tdatalist.m_tdata[i];
+    testdesc(tdata);
+    testcopy(tdata);
+    testpoai(tdata);
+    testconvert(tdata);
+  }
+}
+
+static void
+testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
+{
+  ll3("testcmp: " << tbound);
+  int oldres = 0;
+  int n1 = 0;
+  int n2 = 0;
+  int i;
+  for (i = 0; i < tdatalist.m_cnt; i++) {
+    const Tdata& tdata = *tdatalist.m_tdata[i];
+    require(tbound.m_tdata.m_cnt == tbound.m_bound.get_data().get_cnt());
+    const Uint32 cnt = tbound.m_tdata.m_cnt;
+    Uint32 num_eq1 = ~(Uint32)0;
+    // reverse result for key vs bound
+    int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
+    chk1(res != 0);
+    res = (res < 0 ? (n1++, -1) : (n2++, +1));
+    if (i > 0) {
+      // at some point flips from -1 to +1
+      chk1(oldres <= res);
+    }
+    oldres = res;
+    // also via unpacked data
+    int num_eq2 = -1;
+    int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
+    if (res < 0)
+      chk1(res2 < 0);
+    else
+      chk1(res2 > 0);
+    chk1(num_eq1 == (Uint32)num_eq2);
+  }
+  require(n1 + n2 == tdatalist.m_cnt);
+  ll2("keys before:" << n1 << " after:" << n2);
+  *kb = n1;
+}
+
+static void
+testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
+{
+  int i;
+  int oldkb = 0;
+  for (i = 0; i < tboundlist.m_cnt; i++) {
+    const Tbound& tbound = *tboundlist.m_tbound[i];
+    int kb = 0;
+    testcmp(tbound, tdatalist, &kb);
+    if (i > 0) {
+      chk1(oldkb <= kb);
+    }
+    oldkb = kb;
+  }
+}
+
+static void
+testrun()
+{
+  Tspec tspec;
+  tspec.create();
+  ll1("spec: " << tspec);
+  Tdatalist tdatalist(tspec);
+  tdatalist.create();
+  tdatalist.sort();
+  testdata(tdatalist);
+  if (bound_cnt != 0) {
+    Tboundlist tboundlist(tspec);
+    tboundlist.create();
+    tboundlist.sort();
+    testcmp(tboundlist, tdatalist);
+  }
+}
+
+extern void NdbOut_Init();
+
+static int
+testmain()
+{
+  my_init();
+  NdbOut_Init();
+  signal(SIGABRT, SIG_DFL);
+  { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
+    if (p != 0)
+      verbose = atoi(p);
+  }
+  if (seed == 0)
+    ll0("random seed: loop number");
+  else {
+    if (seed < 0)
+      seed = getpid();
+    ll0("random seed: " << seed);
+    srandom(seed);
+  }
+  loops = 100;
+  int i;
+  for (i = 0; loops == 0 || i < loops; i++) {
+    ll0("loop:" << i << "/" << loops);
+    if (seed == 0)
+      srandom(i);
+    testrun();
+  }
+  ndbout << "ok" << endl;
+  return 0;
+}
+
+TAPTEST(NdbPack)
+{
+  int ret = testmain();
+  return (ret == 0);
+}
+
+#endif


Attachment: [text/bzr-bundle] bzr/pekka@mysql.com-20110504094418-0ntwcu4728sv7n2l.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4351) WL#4163Pekka Nousiainen4 May