#At file:///export/space/pekka/ms/ms-wl4163-70/ based on revid:pekka@stripped
4351 Pekka Nousiainen 2011-05-04
wl#4163 e01_pack.diff
pack ndb data according to spec
added:
storage/ndb/include/util/NdbPack.hpp
storage/ndb/src/common/util/NdbPack.cpp
modified:
storage/ndb/include/util/NdbSqlUtil.hpp
storage/ndb/src/common/util/CMakeLists.txt
storage/ndb/src/common/util/Makefile.am
=== added file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp 2011-05-04 09:44:18 +0000
@@ -0,0 +1,857 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_PACK_HPP
+#define NDB_PACK_HPP
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <kernel/AttributeHeader.hpp>
+#include <NdbSqlUtil.hpp>
+#include <NdbEnv.h>
+class NdbOut;
+
+/*
+ * Pack an array of NDB data values. The types are specified by an
+ * array of data types. There is no associated table or attribute ids.
+ * All or an initial sequence of the specified values are present.
+ *
+ * Currently used for ordered index keys and bounds in kernel (DBTUX)
+ * and in index statistics (mysqld). The comparison methods use the
+ * primitive type comparisons from NdbSqlUtil.
+ *
+ * Keys and bounds use same spec. However a value in an index bound can
+ * be NULL even if the key attribute is not nullable. Therefore bounds
+ * set the "allNullable" property and have a longer null mask.
+ *
+ * There are two distinct use occasions: 1) construction of data or
+ * bound 2) operating on previously constructed data or bound. There
+ * are classes Data/DataC and Bound/BoundC for these uses. The latter
+ * often can return a result without interpreting the full value.
+ *
+ * Methods return -1 on error and 0 on success. Comparison methods
+ * assume well-formed data and return negative, zero, positive for less,
+ * equal, greater.
+ */
+
+class NdbPack {
+public:
+ class Endian;
+ class Type;
+ class Spec;
+ class Iter;
+ class DataC;
+ class Data;
+ class BoundC;
+ class Bound;
+
+ /*
+ * Get SQL type.
+ */
+ static const NdbSqlUtil::Type& getSqlType(Uint32 typeId);
+
+ /*
+ * Error codes for core dumps.
+ */
+ class Error {
+ public:
+ enum {
+ TypeNotSet = -101, // type id was not set
+ TypeOutOfRange = -102, // type id is out of range
+ TypeNotSupported = -103, // blob (and for now bit) types
+ TypeSizeZero = -104, // max size was set to zero
+ TypeFixSizeInvalid = -105, // fixed size specified wrong
+ TypeNullableNotBool = -106, // nullable must be 0 or 1
+ CharsetNotSpecified = -107, // char type with no charset number
+ CharsetNotFound = -108, // cannot install in all_charsets[]
+ CharsetNotAllowed = -109, // non-char type with charset
+ SpecBufOverflow = -201, // more spec items than allocated
+ DataCntOverflow = -301, // more data items than in spec
+ DataBufOverflow = -302, // more data bytes than allocated
+ DataValueOverflow = -303, // var length exceeds max size
+ DataNotNullable = -304, // NULL value to not-nullable type
+ InvalidAttrInfo = -305, // invalid plain old attr info
+ BoundEmptySide = -401, // side not 0 for empty bound
+ BoundNonemptySide = -402, // side not -1,+1 for non-empty bound
+ InternalError = -901,
+ ValidationError = -902,
+ NoError = 0
+ };
+ Error();
+ ~Error() {}
+ int get_error_code() const;
+ int get_error_line() const;
+
+ private:
+ friend class Endian;
+ friend class Type;
+ friend class Spec;
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ friend class Bound;
+ void set_error(int code, int line) const;
+ void set_error(const Error& e2) const;
+ mutable int m_error_code;
+ mutable int m_error_line;
+ };
+
+ /*
+ * Endian definitions.
+ */
+ class Endian {
+ public:
+ enum Value {
+ Native = 0,
+ Little = 1,
+ Big = 2
+ };
+ static Value get_endian();
+ static void convert(void* ptr, Uint32 len);
+ };
+
+ /*
+ * Data type.
+ */
+ class Type : public Error {
+ public:
+ Type();
+ Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+ ~Type() {}
+ /*
+ * Define the type. Size is fixed or max size. Values of variable
+ * length have length bytes. The definition is verified when the
+ * type is added to the specification. This also installs missing
+ * CHARSET_INFO* into all_charsets[].
+ */
+ void set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+ // getters
+ Uint32 get_type_id() const;
+ Uint32 get_byte_size() const;
+ bool get_nullable() const;
+ Uint32 get_cs_number() const;
+ Uint32 get_array_type() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Type&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Spec;
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ // verify and complete when added to specification
+ int complete();
+ Uint16 m_typeId;
+ Uint16 m_byteSize; // fixed or max size in bytes
+ Uint16 m_nullable;
+ Uint16 m_csNumber;
+ Uint16 m_arrayType; // 0,1,2 length bytes
+ Uint16 m_nullbitPos; // computed as part of Spec
+ };
+
+ /*
+ * Data specification i.e. array of types. Usually constructed on the
+ * heap, so keep fairly small. Used for boths keys and bounds.
+ */
+ class Spec : public Error {
+ public:
+ Spec();
+ ~Spec() {}
+ // set initial buffer (calls reset)
+ void set_buf(Type* buf, Uint32 bufMaxCnt);
+ // use if buffer is relocated
+ void set_buf(Type* buf);
+ // reset but keep buffer
+ void reset();
+ // add type to specification once or number of times
+ int add(Type type);
+ int add(Type type, Uint32 cnt);
+ // copy from
+ void copy(const Spec& s2);
+ // getters (bounds set allNullable)
+ const Type& get_type(Uint32 i) const;
+ Uint32 get_cnt() const;
+ Uint32 get_nullable_cnt(bool allNullable) const;
+ Uint32 get_nullmask_len(bool allNullable) const;
+ // max data length including null mask
+ Uint32 get_max_data_len(bool allNullable) const;
+ // minimum var bytes (if used by Data instance)
+ Uint32 get_min_var_bytes(bool allNullable) const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Spec&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ // undefined
+ Spec(const Spec&);
+ Spec& operator=(const Spec&);
+ Type* m_buf;
+ Uint16 m_bufMaxCnt;
+ Uint16 m_cnt;
+ Uint16 m_nullableCnt;
+ Uint16 m_varsizeCnt;
+ Uint32 m_maxByteSize; // excludes null mask
+ };
+
+ /*
+ * Iterator over data items. DataC uses external Iter instances in
+ * comparison methods etc. Data contains an Iter instance which
+ * iterates on items added.
+ */
+ class Iter : public Error {
+ public:
+ // the data instance is only used to set metadata
+ Iter(const DataC& data);
+ ~Iter() {}
+ void reset();
+
+ private:
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ // undefined
+ Iter(const Iter&);
+ Iter& operator=(const Iter&);
+ // describe next non-null or null item and advance iterator
+ int desc(const Uint8* item);
+ int desc_null();
+ // compare current items (DataC buffers are passed)
+ int cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const;
+
+ const Spec& m_spec;
+ const bool m_allNullable;
+ // iterator
+ Uint32 m_itemPos; // position of current item in DataC buffer
+ Uint32 m_cnt; // number of items described so far
+ Uint32 m_nullCnt;
+ // current item
+ Uint32 m_lenBytes; // 0-2
+ Uint32 m_bareLen; // excludes length bytes
+ Uint32 m_itemLen; // full length, value zero means null
+ };
+
+ /*
+ * Read-only superclass of Data. Initialized from a previously
+ * constructed Data buffer (any var bytes skipped). Methods interpret
+ * one data item at a time. Values are native endian.
+ */
+ class DataC : public Error {
+ public:
+ DataC(const Spec& spec, bool allNullable);
+ // set buffer to previously constructed one with given item count
+ void set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt);
+ // interpret next data item
+ int desc(Iter& r) const;
+ // compare cnt attrs and also return number of initial equal attrs
+ int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+ // getters
+ const Spec& get_spec() const;
+ bool get_all_nullable() const;
+ const void* get_data_buf() const;
+ Uint32 get_cnt() const;
+ bool is_empty() const;
+ bool is_full() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const DataC&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz, bool convert_flag = false) const;
+ int validate() const { return 0; }
+
+ private:
+ friend class Iter;
+ friend class Data;
+ friend class BoundC;
+ // undefined
+ DataC(const Data&);
+ DataC& operator=(const DataC&);
+ const Spec& m_spec;
+ const bool m_allNullable;
+ const Uint8* m_buf;
+ Uint32 m_bufMaxLen;
+ // can be updated as part of Data instance
+ Uint32 m_cnt;
+ };
+
+ /*
+ * Instance of an array of data values. The values are packed into
+ * a byte buffer. The buffer is also maintained as a single varbinary
+ * value if non-zero var bytes (length bytes) is specified.
+ */
+ class Data : public DataC {
+ public:
+ Data(const Spec& spec, bool allNullable, Uint32 varBytes);
+ // set buffer (calls reset)
+ void set_buf(void* buf, Uint32 bufMaxLen);
+ // reset but keep buffer (header is zeroed)
+ void reset();
+ // add non-null data items and return length in bytes
+ int add(const void* data, Uint32* len_out);
+ int add(const void* data, Uint32 cnt, Uint32* len_out);
+ // add null data items and return length 0 bytes
+ int add_null(Uint32* len_out);
+ int add_null(Uint32 cnt, Uint32* len_out);
+ // add from "plain old attr info"
+ int add_poai(const Uint32* poai, Uint32* len_out);
+ int add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out);
+ // call this before first use
+ int finalize();
+ // copy from
+ int copy(const DataC& d2);
+ // convert endian
+ int convert(Endian::Value to_endian);
+ // create complete instance from buffer contents
+ int desc_all(Uint32 cnt);
+ // getters
+ Uint32 get_max_len() const;
+ Uint32 get_max_len4() const;
+ Uint32 get_var_bytes() const;
+ const void* get_full_buf() const;
+ Uint32 get_full_len() const;
+ Uint32 get_data_len() const;
+ Uint32 get_null_cnt() const;
+ Endian::Value get_endian() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Data&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Iter;
+ friend class Bound;
+ // undefined
+ Data(const Data&);
+ Data& operator=(const Data&);
+ int finalize_impl();
+ int convert_impl(Endian::Value to_endian);
+ const Uint32 m_varBytes;
+ Uint8* m_buf;
+ Uint32 m_bufMaxLen;
+ Endian::Value m_endian; // Native until finalize()
+ // iterator on items added
+ Iter m_iter;
+ };
+
+ /*
+ * Read-only superclass of BoundC, analogous to DataC. Initialized
+ * from a previously constructed Bound or DataC buffer.
+ */
+ class BoundC : public Error {
+ public:
+ BoundC(DataC& data);
+ ~BoundC() {}
+ // call this before first use
+ int finalize(int side);
+ // compare bound to key (may return 0 if bound is longer)
+ int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+ // compare bounds (may return 0 if cnt is less than min length)
+ int cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const;
+ // getters
+ DataC& get_data() const;
+ int get_side() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const BoundC&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Bound;
+ // undefined
+ BoundC(const BoundC&);
+ BoundC& operator=(const BoundC&);
+ DataC& m_data;
+ int m_side;
+ };
+
+ /*
+ * Ordered index range bound consists of a partial key and a "side".
+ * The partial key is a Data instance where some initial number of
+ * values are present. It is defined separately by the caller and
+ * passed to Bound ctor by reference.
+ */
+ class Bound : public BoundC {
+ public:
+ Bound(Data& data);
+ ~Bound() {}
+ void reset();
+ // call this before first use
+ int finalize(int side);
+ // getters
+ Data& get_data() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Bound&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ // undefined
+ Bound(const Bound&);
+ Bound& operator=(const Bound&);
+ Data& m_data;
+ };
+
+ /*
+ * Helper for print() methods.
+ */
+ struct Print {
+ private:
+ friend class Endian;
+ friend class Type;
+ friend class Spec;
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ friend class Bound;
+ Print(char* buf, Uint32 bufsz);
+ void print(const char* frm, ...);
+ char* m_buf;
+ Uint32 m_bufsz;
+ Uint32 m_sz;
+ };
+};
+
+// NdbPack
+
+inline const NdbSqlUtil::Type&
+NdbPack::getSqlType(Uint32 typeId)
+{
+ return NdbSqlUtil::m_typeList[typeId];
+}
+
+// NdbPack::Error
+
+inline
+NdbPack::Error::Error()
+{
+ m_error_code = 0;
+ m_error_line = 0;
+}
+
+// NdbPack::Endian
+
+inline NdbPack::Endian::Value
+NdbPack::Endian::get_endian()
+{
+#ifndef WORDS_BIGENDIAN
+ return Little;
+#else
+ return Big;
+#endif
+}
+
+// NdbPack::Type
+
+inline
+NdbPack::Type::Type()
+{
+ m_typeId = NDB_TYPE_UNDEFINED;
+ m_byteSize = 0;
+ m_nullable = true;
+ m_csNumber = 0;
+ m_arrayType = 0;
+ m_nullbitPos = 0;
+}
+
+inline
+NdbPack::Type::Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+ set(typeId, byteSize, nullable, csNumber);
+}
+
+inline void
+NdbPack::Type::set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+ m_typeId = typeId;
+ m_byteSize = byteSize;
+ m_nullable = nullable;
+ m_csNumber = csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_type_id() const
+{
+ return m_typeId;
+}
+
+inline Uint32
+NdbPack::Type::get_byte_size() const
+{
+ return m_byteSize;
+}
+
+inline bool
+NdbPack::Type::get_nullable() const
+{
+ return (bool)m_nullable;
+}
+
+inline Uint32
+NdbPack::Type::get_cs_number() const
+{
+ return m_csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_array_type() const
+{
+ return m_arrayType;
+}
+
+// NdbPack::Spec
+
+inline
+NdbPack::Spec::Spec()
+{
+ reset();
+ m_buf = 0;
+ m_bufMaxCnt = 0;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf, Uint32 bufMaxCnt)
+{
+ reset();
+ m_buf = buf;
+ m_bufMaxCnt = bufMaxCnt;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf)
+{
+ m_buf = buf;
+}
+
+inline void
+NdbPack::Spec::reset()
+{
+ m_cnt = 0;
+ m_nullableCnt = 0;
+ m_varsizeCnt = 0;
+ m_maxByteSize = 0;
+}
+
+inline const NdbPack::Type&
+NdbPack::Spec::get_type(Uint32 i) const
+{
+ assert(i < m_cnt);
+ return m_buf[i];
+}
+
+inline Uint32
+NdbPack::Spec::get_cnt() const
+{
+ return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullable_cnt(bool allNullable) const
+{
+ if (!allNullable)
+ return m_nullableCnt;
+ else
+ return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullmask_len(bool allNullable) const
+{
+ return (get_nullable_cnt(allNullable) + 7) / 8;
+}
+
+inline Uint32
+NdbPack::Spec::get_max_data_len(bool allNullable) const
+{
+ return get_nullmask_len(allNullable) + m_maxByteSize;
+}
+
+inline Uint32
+NdbPack::Spec::get_min_var_bytes(bool allNullable) const
+{
+ const Uint32 len = get_max_data_len(allNullable);
+ return (len < 256 ? 1 : 2);
+}
+
+// NdbPack::Iter
+
+inline
+NdbPack::Iter::Iter(const DataC& data) :
+ m_spec(data.m_spec),
+ m_allNullable(data.m_allNullable)
+{
+ reset();
+}
+
+inline void
+NdbPack::Iter::reset()
+{
+ m_itemPos = m_spec.get_nullmask_len(m_allNullable);
+ m_cnt = 0;
+ m_nullCnt = 0;
+ m_lenBytes = 0;
+ m_bareLen = 0;
+ m_itemLen = 0;
+}
+
+// NdbPack::DataC
+
+inline
+NdbPack::DataC::DataC(const Spec& spec, bool allNullable) :
+ m_spec(spec),
+ m_allNullable(allNullable)
+{
+ m_buf = 0;
+ m_bufMaxLen = 0;
+ m_cnt = 0;
+}
+
+inline void
+NdbPack::DataC::set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt)
+{
+ m_buf = static_cast<const Uint8*>(buf);
+ m_bufMaxLen = bufMaxLen;
+ m_cnt = cnt;
+}
+
+inline const NdbPack::Spec&
+NdbPack::DataC::get_spec() const
+{
+ return m_spec;
+}
+
+inline bool
+NdbPack::DataC::get_all_nullable() const
+{
+ return &m_allNullable;
+}
+
+inline const void*
+NdbPack::DataC::get_data_buf() const
+{
+ return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::DataC::get_cnt() const
+{
+ return m_cnt;
+}
+
+inline bool
+NdbPack::DataC::is_empty() const
+{
+ return m_cnt == 0;
+}
+
+inline bool
+NdbPack::DataC::is_full() const
+{
+ return m_cnt == m_spec.m_cnt;
+}
+
+// NdbPack::Data
+
+inline
+NdbPack::Data::Data(const Spec& spec, bool allNullable, Uint32 varBytes) :
+ DataC(spec, allNullable),
+ m_varBytes(varBytes),
+ m_iter(*this)
+{
+ m_buf = 0;
+ m_bufMaxLen = 0;
+ m_endian = Endian::Native;
+}
+
+inline void
+NdbPack::Data::set_buf(void* buf, Uint32 bufMaxLen)
+{
+ m_buf = static_cast<Uint8*>(buf);
+ m_bufMaxLen = bufMaxLen;
+ reset();
+ assert(bufMaxLen >= m_varBytes);
+ DataC::set_buf(&m_buf[m_varBytes], m_bufMaxLen - m_varBytes, 0);
+}
+
+inline void
+NdbPack::Data::reset()
+{
+ m_cnt = 0; // in DataC
+ const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
+ memset(m_buf, 0, bytes);
+ m_endian = Endian::Native;
+ m_iter.reset();
+}
+
+inline int
+NdbPack::Data::finalize()
+{
+ if (m_varBytes == 0 ||
+ finalize_impl() == 0)
+ {
+ m_endian = Endian::get_endian();
+ return 0;
+ }
+ return -1;
+}
+
+inline int
+NdbPack::Data::convert(Endian::Value to_endian)
+{
+ if (unlikely(to_endian == Endian::Native))
+ to_endian = Endian::get_endian();
+ if (m_endian == to_endian)
+ return 0;
+ if (convert_impl(to_endian) == 0)
+ {
+ m_endian = to_endian;
+ return 0;
+ }
+ return -1;
+}
+
+inline Uint32
+NdbPack::Data::get_max_len() const
+{
+ return m_varBytes + m_spec.get_max_data_len(m_allNullable);
+}
+
+inline Uint32
+NdbPack::Data::get_max_len4() const
+{
+ Uint32 len4 = get_max_len();
+ len4 += 3;
+ len4 /= 4;
+ len4 *= 4;
+ return len4;
+}
+
+inline Uint32
+NdbPack::Data::get_var_bytes() const
+{
+ return m_varBytes;
+}
+
+inline const void*
+NdbPack::Data::get_full_buf() const
+{
+ return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::Data::get_full_len() const
+{
+ return m_varBytes + m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_data_len() const
+{
+ return m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_null_cnt() const
+{
+ return m_iter.m_nullCnt;
+}
+
+inline NdbPack::Endian::Value
+NdbPack::Data::get_endian() const
+{
+ return m_endian;
+}
+
+// NdbPack::BoundC
+
+inline
+NdbPack::BoundC::BoundC(DataC& data) :
+ m_data(data)
+{
+ m_side = 0;
+}
+
+inline int
+NdbPack::BoundC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+ const BoundC& b1 = *this;
+ const DataC& d1 = b1.m_data;
+ int res = d1.cmp(d2, cnt, num_eq);
+ if (res == 0 && d1.m_cnt <= d2.m_cnt)
+ res = b1.m_side;
+ return res;
+}
+
+inline NdbPack::DataC&
+NdbPack::BoundC::get_data() const
+{
+ return m_data;
+}
+
+inline int
+NdbPack::BoundC::get_side() const
+{
+ return m_side;
+}
+
+// NdbPack::Bound
+
+inline
+NdbPack::Bound::Bound(Data& data) :
+ BoundC(data),
+ m_data(data)
+{
+}
+
+inline void
+NdbPack::Bound::reset()
+{
+ m_data.reset();
+ m_side = 0;
+}
+
+inline int
+NdbPack::Bound::finalize(int side)
+{
+ if (m_data.finalize() == -1)
+ {
+ set_error(m_data);
+ return -1;
+ }
+ if (BoundC::finalize(side) == -1)
+ return -1;
+ return 0;
+}
+
+inline NdbPack::Data&
+NdbPack::Bound::get_data() const
+{
+ return m_data;
+}
+
+#endif // NDB_PACK_HPP
=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp 2011-02-19 03:13:04 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp 2011-05-04 09:44:18 +0000
@@ -177,6 +177,7 @@ public:
Uint32 dataByteSize);
private:
+ friend class NdbPack;
/**
* List of all types. Must match Type::Enum.
*/
=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt 2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt 2011-05-04 09:44:18 +0000
@@ -54,6 +54,7 @@ ADD_LIBRARY(ndbgeneral STATIC
SparseBitmask.cpp
require.c
Vector.cpp
+ NdbPack.cpp
)
TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
@@ -82,3 +83,8 @@ SET_TARGET_PROPERTIES(ndb_version-t
PROPERTIES COMPILE_FLAGS "-DTEST_VERSION")
TARGET_LINK_LIBRARIES(ndb_version-t ndbgeneral)
+ADD_EXECUTABLE(NdbPack-t NdbPack.cpp)
+SET_TARGET_PROPERTIES(NdbPack-t
+ PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
+TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral)
+
=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am 2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/Makefile.am 2011-05-04 09:44:18 +0000
@@ -27,14 +27,15 @@ libgeneral_la_SOURCES = \
strdup.c \
ConfigValues.cpp ndb_init.cpp basestring_vsnprintf.c \
Bitmask.cpp SparseBitmask.cpp parse_mask.hpp \
- ndb_rand.c require.c Vector.cpp
+ ndb_rand.c require.c Vector.cpp \
+ NdbPack.cpp
INCLUDES_LOC = @ZLIB_INCLUDES@
libndbazio_la_SOURCES = ndbzio.c
libndbazio_la_LIBADD = @ZLIB_LIBS@
-noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t
+noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t
BaseString_t_SOURCES = BaseString.cpp
BaseString_t_CXXFLAGS = -DTEST_BASE_STRING
@@ -73,5 +74,14 @@ ndb_version_t_SOURCES = version.cpp
ndb_version_t_CXXFLAGS = -DTEST_VERSION
ndb_version_t_LDADD = libgeneral.la
+NdbPack_t_SOURCES = NdbPack.cpp
+NdbPack_t_CXXFLAGS = -DTEST_NDB_PACK
+NdbPack_t_LDADD = \
+ libgeneral.la \
+ $(top_builddir)/storage/ndb/src/common/portlib/libportlib.la \
+ $(top_builddir)/mysys/libmysys.la \
+ $(top_builddir)/strings/libmystrings.la \
+ $(top_builddir)/dbug/libdbug.la
+
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_util.mk.am
=== added file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp 2011-05-04 09:44:18 +0000
@@ -0,0 +1,2052 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include <ndb_global.h>
+#include <NdbPack.hpp>
+#include <NdbOut.hpp>
+#include <NdbEnv.h>
+
+// NdbPack::Error
+
+int
+NdbPack::Error::get_error_code() const
+{
+ return m_error_code;
+}
+
+int
+NdbPack::Error::get_error_line() const
+{
+ return m_error_line;
+}
+
+void
+NdbPack::Error::set_error(int code, int line) const
+{
+ m_error_code = code;
+ m_error_line = line;
+#ifdef VM_TRACE
+ const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
+ if (p != 0 && strchr("1Y", p[0]) != 0)
+ require(false);
+#endif
+}
+
+void
+NdbPack::Error::set_error(const Error& e2) const
+{
+ set_error(e2.m_error_code, e2.m_error_line);
+}
+
+// NdbPack::Endian
+
+void
+NdbPack::Endian::convert(void* ptr, Uint32 len)
+{
+ Uint8* p = (Uint8*)ptr;
+ for (Uint32 i = 0; i < len / 2; i++)
+ {
+ Uint32 j = len - i - 1;
+ Uint8 tmp = p[i];
+ p[i] = p[j];
+ p[j] = tmp;
+ }
+}
+
+// NdbPack::Type
+
+struct Ndb_pack_type_info {
+ bool m_supported;
+ Uint16 m_fixSize; // if non-zero must have this exact size
+ Uint16 m_arrayType; // 0,1,2 length bytes
+ bool m_charType; // type with character set
+ bool m_convert; // convert endian (reverse byte order)
+};
+
+static const Ndb_pack_type_info
+g_ndb_pack_type_info[] = {
+ { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
+ { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
+ { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
+ { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
+ { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
+ { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
+ { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
+ { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
+ { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
+ { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
+ { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
+ { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
+ { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
+ { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
+ { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
+ { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
+ { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
+ { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
+ { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
+ { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
+ { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
+ { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
+ { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
+ { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
+ { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
+ { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
+ { 1, 0, 0, 0, 0 } // NDB_TYPE_DECIMALUNSIGNED
+};
+
+static const int g_ndb_pack_type_info_cnt =
+ sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
+
+int
+NdbPack::Type::complete()
+{
+ if (m_typeId == 0)
+ {
+ set_error(TypeNotSet, __LINE__);
+ return -1;
+ }
+ if (m_typeId >= g_ndb_pack_type_info_cnt)
+ {
+ set_error(TypeNotSet, __LINE__);
+ return -1;
+ }
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
+ if (!info.m_supported)
+ {
+ set_error(TypeNotSupported, __LINE__);
+ return -1;
+ }
+ if (m_byteSize == 0)
+ {
+ set_error(TypeSizeZero, __LINE__);
+ return -1;
+ }
+ if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
+ {
+ set_error(TypeFixSizeInvalid, __LINE__);
+ return -1;
+ }
+ if (!(m_nullable <= 1))
+ {
+ set_error(TypeNullableNotBool, __LINE__);
+ return -1;
+ }
+ if (info.m_charType && m_csNumber == 0)
+ {
+ set_error(CharsetNotSpecified, __LINE__);
+ return -1;
+ }
+ if (info.m_charType && all_charsets[m_csNumber] == 0)
+ {
+ CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
+ if (cs == 0)
+ {
+ set_error(CharsetNotFound, __LINE__);
+ return -1;
+ }
+ all_charsets[m_csNumber] = cs; // yes caller must do this
+ }
+ if (!info.m_charType && m_csNumber != 0)
+ {
+ set_error(CharsetNotAllowed, __LINE__);
+ return -1;
+ }
+ m_arrayType = info.m_arrayType;
+ return 0;
+}
+
+// NdbPack::Spec
+
+int
+NdbPack::Spec::add(Type type)
+{
+ Uint32 cnt = m_cnt;
+ Uint32 nullable_cnt = m_nullableCnt;
+ Uint32 varsize_cnt = m_varsizeCnt;
+ Uint32 max_byte_size = m_maxByteSize;
+ if (type.complete() == -1)
+ {
+ set_error(type);
+ return -1;
+ }
+ type.m_nullbitPos = 0xFFFF;
+ if (type.m_nullable)
+ {
+ type.m_nullbitPos = nullable_cnt;
+ nullable_cnt++;
+ }
+ if (type.m_arrayType != 0)
+ {
+ varsize_cnt++;
+ }
+ max_byte_size += type.m_byteSize;
+ if (cnt >= m_bufMaxCnt)
+ {
+ set_error(SpecBufOverflow, __LINE__);
+ return -1;
+ }
+ m_buf[cnt] = type;
+ cnt++;
+ m_cnt = cnt;
+ m_nullableCnt = nullable_cnt;
+ m_varsizeCnt = varsize_cnt;
+ m_maxByteSize = max_byte_size;
+ return 0;
+}
+
+int
+NdbPack::Spec::add(Type type, Uint32 cnt)
+{
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ if (add(type) == -1)
+ return -1;
+ }
+ return 0;
+}
+
+void
+NdbPack::Spec::copy(const Spec& s2)
+{
+ assert(m_bufMaxCnt >= s2.m_cnt);
+ reset();
+ m_cnt = s2.m_cnt;
+ m_nullableCnt = s2.m_nullableCnt;
+ m_varsizeCnt = s2.m_varsizeCnt;
+ m_maxByteSize = s2.m_maxByteSize;
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ m_buf[i] = s2.m_buf[i];
+ }
+}
+
+// NdbPack::Iter
+
+int
+NdbPack::Iter::desc(const Uint8* item)
+{
+ const Uint32 i = m_cnt; // item index
+ assert(i < m_spec.m_cnt);
+ const Type& type = m_spec.m_buf[i];
+ const Uint32 lenBytes = type.m_arrayType;
+ Uint32 bareLen = 0;
+ switch (lenBytes) {
+ case 0:
+ bareLen = type.m_byteSize;
+ break;
+ case 1:
+ bareLen = item[0];
+ break;
+ case 2:
+ bareLen = item[0] + (item[1] << 8);
+ break;
+ default:
+ assert(false);
+ set_error(InternalError, __LINE__);
+ return -1;
+ }
+ const Uint32 itemLen = lenBytes + bareLen;
+ if (itemLen > type.m_byteSize)
+ {
+ set_error(DataValueOverflow, __LINE__);
+ return -1;
+ }
+ m_itemPos += m_itemLen; // skip previous item
+ m_cnt++;
+ m_lenBytes = lenBytes;
+ m_bareLen = bareLen;
+ m_itemLen = itemLen;
+ return 0;
+}
+
+int
+NdbPack::Iter::desc_null()
+{
+ assert(m_cnt < m_spec.m_cnt);
+ // caller checks if null allowed
+ m_itemPos += m_itemLen; // skip previous item
+ m_cnt++;
+ m_nullCnt++;
+ m_lenBytes = 0;
+ m_bareLen = 0;
+ m_itemLen = 0;
+ return 0;
+}
+
+int
+NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
+{
+ const Iter& r1 = *this;
+ assert(&r1.m_spec == &r2.m_spec);
+ assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
+ const Uint32 i = r1.m_cnt - 1; // item index
+ int res = 0;
+ const Uint32 n1 = r1.m_itemLen;
+ const Uint32 n2 = r2.m_itemLen;
+ if (n1 != 0)
+ {
+ if (n2 != 0)
+ {
+ const Type& type = r1.m_spec.m_buf[i];
+ const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
+ const Uint8* p1 = &buf1[r1.m_itemPos];
+ const Uint8* p2 = &buf2[r2.m_itemPos];
+ CHARSET_INFO* cs = all_charsets[type.m_csNumber];
+ res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2, true);
+ }
+ else
+ {
+ res = +1;
+ }
+ }
+ else
+ {
+ if (n2 != 0)
+ res = -1;
+ }
+ return res;
+}
+
+// NdbPack::DataC
+
+int
+NdbPack::DataC::desc(Iter& r) const
+{
+ const Uint32 i = r.m_cnt; // item index
+ assert(i < m_cnt);
+ const Type& type = m_spec.m_buf[i];
+ if (type.m_nullable || m_allNullable)
+ {
+ Uint32 nullbitPos = 0;
+ if (!m_allNullable)
+ nullbitPos = type.m_nullbitPos;
+ else
+ nullbitPos = i;
+ const Uint32 byte_pos = nullbitPos / 8;
+ const Uint32 bit_pos = nullbitPos % 8;
+ const Uint8 bit_mask = (1 << bit_pos);
+ const Uint8& the_byte = m_buf[byte_pos];
+ if ((the_byte & bit_mask) != 0)
+ {
+ if (r.desc_null() == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ return 0;
+ }
+ }
+ const Uint32 pos = r.m_itemPos + r.m_itemLen;
+ const Uint8* item = &m_buf[pos];
+ if (r.desc(item) == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+ const DataC& d1 = *this;
+ assert(cnt <= d1.m_cnt);
+ assert(cnt <= d2.m_cnt);
+ Iter r1(d1);
+ Iter r2(d2);
+ int res = 0;
+ Uint32 i; // remember last
+ for (i = 0; i < cnt; i++)
+ {
+ d1.desc(r1);
+ d2.desc(r2);
+ res = r1.cmp(r2, d1.m_buf, d2.m_buf);
+ if (res != 0)
+ break;
+ }
+ num_eq = i;
+ return res;
+}
+
+// NdbPack::Data
+
+int
+NdbPack::Data::add(const void* data, Uint32* len_out)
+{
+ assert(data != 0);
+ const Uint8* item = (const Uint8*)data;
+ const Uint32 i = m_cnt; // item index
+ if (i >= m_spec.m_cnt)
+ {
+ set_error(DataCntOverflow, __LINE__);
+ return -1;
+ }
+ Iter& r = m_iter;
+ assert(r.m_cnt == i);
+ const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
+ if (r.desc(item) == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ if (fullLen + r.m_itemLen > m_bufMaxLen)
+ {
+ set_error(DataBufOverflow, __LINE__);
+ return -1;
+ }
+ memcpy(&m_buf[fullLen], item, r.m_itemLen);
+ *len_out = r.m_itemLen;
+ m_cnt++;
+ return 0;
+}
+
+int
+NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
+{
+ const Uint8* data_ptr = (const Uint8*)data;
+ Uint32 len_tot = 0;
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ Uint32 len;
+ if (add(data_ptr, &len) == -1)
+ return -1;
+ if (data != 0)
+ data_ptr += len;
+ len_tot += len;
+ }
+ *len_out = len_tot;
+ return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32* len_out)
+{
+ const Uint32 i = m_cnt; // item index
+ if (i >= m_spec.m_cnt)
+ {
+ set_error(DataCntOverflow, __LINE__);
+ return -1;
+ }
+ Iter& r = m_iter;
+ assert(r.m_cnt == i);
+ if (r.desc_null() == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ Uint32 nullbitPos = 0;
+ if (!m_allNullable)
+ {
+ const Type& type = m_spec.m_buf[i];
+ if (!type.m_nullable)
+ {
+ set_error(DataNotNullable, __LINE__);
+ return -1;
+ }
+ nullbitPos = type.m_nullbitPos;
+ }
+ else
+ {
+ nullbitPos = i;
+ }
+ const Uint32 byte_pos = nullbitPos / 8;
+ const Uint32 bit_pos = nullbitPos % 8;
+ const Uint8 bit_mask = (1 << bit_pos);
+ Uint8& the_byte = m_buf[m_varBytes + byte_pos];
+ assert((the_byte & bit_mask) == 0);
+ the_byte |= bit_mask;
+ *len_out = r.m_itemLen;
+ m_cnt++;
+ return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
+{
+ Uint32 len_tot = 0;
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ Uint32 len;
+ if (add_null(&len) == -1)
+ return -1;
+ len_tot += len;
+ }
+ *len_out = len_tot;
+ return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
+{
+ const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
+ if (!ah.isNULL())
+ {
+ if (add(&poai[1], len_out) == -1)
+ return -1;
+ }
+ else
+ {
+ if (add_null(len_out) == -1)
+ return -1;
+ }
+ if (ah.getByteSize() != *len_out)
+ {
+ set_error(InvalidAttrInfo, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
+{
+ Uint32 len_tot = 0;
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ Uint32 len;
+ if (add_poai(poai, &len) == -1)
+ return -1;
+ len_tot += len;
+ poai += 1 + (len + 3) / 4;
+ }
+ *len_out = len_tot;
+ return 0;
+}
+
+int
+NdbPack::Data::finalize_impl()
+{
+ const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
+ switch (m_varBytes) {
+ // case 0: inlined
+ case 1:
+ if (dataLen <= 0xFF)
+ {
+ m_buf[0] = dataLen;
+ return 0;
+ }
+ break;
+ case 2:
+ if (dataLen <= 0xFFFF)
+ {
+ m_buf[0] = (dataLen & 0xFF);
+ m_buf[1] = (dataLen >> 8);
+ return 0;
+ }
+ break;
+ default:
+ break;
+ }
+ set_error(InternalError, __LINE__);
+ return -1;
+}
+
+int
+NdbPack::Data::desc_all(Uint32 cnt)
+{
+ assert(m_cnt == 0); // reset() would destroy nullmask
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ m_cnt++;
+ if (desc(m_iter) == -1)
+ return -1;
+ }
+ if (finalize() == -1)
+ return -1;
+ return 0;
+}
+
+int
+NdbPack::Data::copy(const DataC& d2)
+{
+ reset();
+ Iter r2(d2);
+ const Uint32 cnt2 = d2.m_cnt;
+ for (Uint32 i = 0; i < cnt2; i++)
+ {
+ if (d2.desc(r2) == -1)
+ return -1;
+ Uint32 len_out = ~(Uint32)0;
+ if (r2.m_itemLen != 0)
+ {
+ if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
+ return -1;
+ assert(len_out == r2.m_itemLen);
+ }
+ else
+ {
+ if (add_null(&len_out) == -1)
+ return -1;
+ assert(len_out ==0);
+ }
+ }
+ if (finalize() == -1)
+ return -1;
+ return 0;
+}
+
+int
+NdbPack::Data::convert_impl(Endian::Value to_endian)
+{
+ const Spec& spec = m_spec;
+ Iter r(*this);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ if (DataC::desc(r) == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ const Type& type = spec.m_buf[i];
+ const Uint32 typeId = type.m_typeId;
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+ if (info.m_convert)
+ {
+ Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
+ Uint32 len = r.m_itemLen;
+ Endian::convert(ptr, len);
+ }
+ }
+ return 0;
+}
+
+// NdbPack::BoundC
+
+int
+NdbPack::BoundC::finalize(int side)
+{
+ if (m_data.m_cnt == 0 && side != 0)
+ {
+ set_error(BoundEmptySide, __LINE__);
+ return -1;
+ }
+ if (m_data.m_cnt != 0 && side != -1 && side != +1)
+ {
+ set_error(BoundNonemptySide, __LINE__);
+ return -1;
+ }
+ m_side = side;
+ return 0;
+}
+
+int
+NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
+{
+ const BoundC& b1 = *this;
+ const DataC& d1 = b1.m_data;
+ const DataC& d2 = b2.m_data;
+ int res = d1.cmp(d2, cnt, num_eq);
+ if (res == 0)
+ {
+ if (cnt < d1.m_cnt && cnt < d2.m_cnt)
+ ;
+ else if (d1.m_cnt < d2.m_cnt)
+ res = (+1) * b1.m_side;
+ else if (d1.m_cnt > d2.m_cnt)
+ res = (-1) * b2.m_side;
+ else if (b1.m_side < b2.m_side)
+ res = -1;
+ else if (b1.m_side > b2.m_side)
+ res = +1;
+ }
+ return res;
+}
+
+// NdbPack::Bound
+
+// print
+
+NdbPack::Print::Print(char* buf, Uint32 bufsz) :
+ m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
+
+void
+NdbPack::Print::print(const char* fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ if (m_bufsz > m_sz)
+ {
+ vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
+ m_sz += strlen(&m_buf[m_sz]);
+ }
+ va_end(ap);
+}
+
+// print Type
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Type& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Type::print(NdbOut& out) const
+{
+ char buf[200];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Type::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ p.print("typeId:%u", m_typeId);
+ p.print(" byteSize:%u", m_byteSize);
+ p.print(" nullable:%u", m_nullable);
+ p.print(" csNumber:%u", m_csNumber);
+ return buf;
+}
+
+// print Spec
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Spec& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Spec::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Spec::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ p.print("cnt:%u", m_cnt);
+ p.print(" nullableCnt:%u", m_nullableCnt);
+ p.print(" varsizeCnt:%u", m_varsizeCnt);
+ p.print(" nullmaskLen:%u", get_nullmask_len(false));
+ p.print(" maxByteSize:%u", m_maxByteSize);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ const Type& type = m_buf[i];
+ p.print(" [%u", i);
+ p.print(" typeId:%u", type.m_typeId);
+ p.print(" nullable:%u", type.m_nullable);
+ p.print(" byteSize:%u", type.m_byteSize);
+ p.print(" csNumber:%u", type.m_csNumber);
+ p.print("]");
+ }
+ return buf;
+}
+
+// print DataC
+
+bool g_ndb_pack_print_hex_always = false;
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::DataC& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::DataC::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
+{
+ Print p(buf, bufsz);
+ const Spec& spec = m_spec;
+ const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
+ if (nullmask_len != 0)
+ {
+ p.print("nullmask:");
+ for (Uint32 i = 0; i < nullmask_len; i++)
+ {
+ int x = m_buf[i];
+ p.print("%02x", x);
+ }
+ }
+ Iter r(*this);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ desc(r);
+ const Uint8* value = &m_buf[r.m_itemPos];
+ p.print(" [%u", i);
+ p.print(" pos:%u", r.m_itemPos);
+ p.print(" len:%u", r.m_itemLen);
+ if (r.m_itemLen > 0)
+ {
+ p.print(" value:");
+ // some specific types for debugging
+ const Type& type = spec.m_buf[i];
+ bool ok = true;
+ switch (type.m_typeId) {
+ case NDB_TYPE_TINYINT:
+ {
+ Int8 x;
+ memcpy(&x, value, 1);
+ if (convert_flag)
+ Endian::convert(&x, 1);
+ p.print("%d", (int)x);
+ }
+ break;
+ case NDB_TYPE_TINYUNSIGNED:
+ {
+ Uint8 x;
+ memcpy(&x, value, 1);
+ if (convert_flag)
+ Endian::convert(&x, 1);
+ p.print("%u", (uint)x);
+ }
+ break;
+ case NDB_TYPE_SMALLINT:
+ {
+ Int16 x;
+ memcpy(&x, value, 2);
+ if (convert_flag)
+ Endian::convert(&x, 2);
+ p.print("%d", (int)x);
+ }
+ break;
+ case NDB_TYPE_SMALLUNSIGNED:
+ {
+ Uint16 x;
+ memcpy(&x, value, 2);
+ if (convert_flag)
+ Endian::convert(&x, 2);
+ p.print("%u", (uint)x);
+ }
+ break;
+ case NDB_TYPE_INT:
+ {
+ Int32 x;
+ memcpy(&x, value, 4);
+ if (convert_flag)
+ Endian::convert(&x, 4);
+ p.print("%d", (int)x);
+ }
+ break;
+ case NDB_TYPE_UNSIGNED:
+ {
+ Uint32 x;
+ memcpy(&x, value, 4);
+ if (convert_flag)
+ Endian::convert(&x, 4);
+ p.print("%u", (uint)x);
+ }
+ break;
+ case NDB_TYPE_FLOAT:
+ {
+ float x;
+ memcpy(&x, value, 4);
+ if (convert_flag)
+ Endian::convert(&x, 4);
+ p.print("%g", (double)x);
+ }
+ break;
+ case NDB_TYPE_DOUBLE:
+ {
+ double x;
+ memcpy(&x, value, 8);
+ if (convert_flag)
+ Endian::convert(&x, 8);
+ p.print("%g", x);
+ }
+ break;
+ case NDB_TYPE_CHAR:
+ case NDB_TYPE_VARCHAR:
+ case NDB_TYPE_LONGVARCHAR:
+ {
+ const Uint32 off = type.m_arrayType;
+ for (Uint32 j = 0; j < r.m_bareLen; j++)
+ {
+ Uint8 x = value[off + j];
+ p.print("%c", (int)x);
+ }
+ }
+ break;
+ default:
+ ok = false;
+ break;
+ }
+ if (!ok || g_ndb_pack_print_hex_always)
+ {
+ p.print("<");
+ for (Uint32 j = 0; j < r.m_itemLen; j++)
+ {
+ int x = value[j];
+ p.print("%02x", x);
+ }
+ p.print(">");
+ }
+ }
+ p.print("]");
+ }
+ return buf;
+}
+
+// print Data
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Data& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Data::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Data::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ char* ptr = buf;
+ if (m_varBytes != 0)
+ {
+ p.print("varBytes:");
+ for (Uint32 i = 0; i < m_varBytes; i++)
+ {
+ int r = m_buf[i];
+ p.print("%02x", r);
+ }
+ p.print(" ");
+ }
+ p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
+ p.print(" ");
+ const bool convert_flag =
+ m_endian != Endian::Native &&
+ m_endian != Endian::get_endian();
+ DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
+ return buf;
+}
+
+// print BoundC
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::BoundC& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::BoundC::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
+ m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
+ return buf;
+}
+
+// print Bound
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Bound& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Bound::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Bound::print(char* buf, Uint32 bufsz) const
+{
+ BoundC::print(buf, bufsz);
+ return buf;
+}
+
+// validate
+
+int
+NdbPack::Type::validate() const
+{
+ Type type2 = *this;
+ if (type2.complete() == -1)
+ {
+ set_error(type2);
+ return -1;
+ }
+ if (memcmp(this, &type2, sizeof(Type)) != 0)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Spec::validate() const
+{
+ Uint32 nullableCnt = 0;
+ Uint32 varsizeCnt = 0;
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ const Type& type = m_buf[i];
+ if (type.validate() == -1)
+ {
+ set_error(type);
+ return -1;
+ }
+ if (type.m_nullable)
+ nullableCnt++;
+ if (type.m_arrayType != 0)
+ varsizeCnt++;
+ }
+ if (m_nullableCnt != nullableCnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (m_varsizeCnt != varsizeCnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Data::validate() const
+{
+ if (DataC::validate() == -1)
+ return -1;
+ const Iter& r = m_iter;
+ if (r.m_cnt != m_cnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ Iter r2(*this);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ if (desc(r2) == -1)
+ return -1;
+ }
+ if (r.m_itemPos != r2.m_itemPos)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (r.m_cnt != r2.m_cnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (r.m_nullCnt != r2.m_nullCnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (r.m_itemLen != r2.m_itemLen)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::BoundC::validate() const
+{
+ if (m_data.validate() == -1)
+ {
+ set_error(m_data);
+ return -1;
+ }
+ if (m_data.m_cnt == 0 && m_side != 0)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Bound::validate() const
+{
+ if (BoundC::validate() == -1)
+ return -1;
+ if (m_data.validate() == -1)
+ {
+ set_error(m_data);
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef TEST_NDB_PACK
+#include <util/NdbTap.hpp>
+
+#define chk1(x) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; require(false); } while (0)
+
+#define chk2(x, e) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; require(false); } while (0)
+
+#define ll0(x) do { if (verbose < 0) break; ndbout << "0- " << x << endl; } while (0)
+#define ll1(x) do { if (verbose < 1) break; ndbout << "1- " << x << endl; } while (0)
+#define ll2(x) do { if (verbose < 2) break; ndbout << "2- " << x << endl; } while (0)
+#define ll3(x) do { if (verbose < 3) break; ndbout << "3- " << x << endl; } while (0)
+
+#define xmin(a, b) ((a) < (b) ? (a) : (b))
+
+#include <random.h>
+
+static uint // random 0..n-1
+getrandom(uint n)
+{
+ if (n != 0) {
+ uint k = random();
+ return k % n;
+ }
+ return 0;
+}
+
+static uint // random 0..n-1 biased exponentially to smaller
+getrandom(uint n, uint bias)
+{
+ assert(bias != 0);
+ uint k = getrandom(n);
+ bias--;
+ while (bias != 0) {
+ k = getrandom(k + 1);
+ bias--;
+ }
+ return k;
+}
+
+static bool
+getrandompct(uint pct)
+{
+ return getrandom(100) < pct;
+}
+
+// change in TAPTEST
+static int seed = -1; // random
+static int loops = 0;
+static int spec_cnt = -1; // random
+static int fix_type = 0; // all types
+static int no_nullable = 0;
+static int data_cnt = -1; // Max
+static int bound_cnt = -1; // Max
+static int verbose = 0;
+
+struct Tspec {
+ enum { Max = 100 };
+ enum { MaxBuf = Max * 4000 };
+ NdbPack::Spec m_spec;
+ NdbPack::Type m_type[Max];
+ Tspec() {
+ m_spec.set_buf(m_type, Max);
+ }
+ void create();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tspec& tspec)
+{
+ out << tspec.m_spec;
+ return out;
+}
+
+void
+Tspec::create()
+{
+ m_spec.reset();
+ int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
+ int i = 0;
+ while (i < cnt) {
+ int typeId = fix_type;
+ if (typeId == 0)
+ typeId = getrandom(g_ndb_pack_type_info_cnt);
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+ switch (typeId) {
+ case NDB_TYPE_INT:
+ case NDB_TYPE_UNSIGNED:
+ case NDB_TYPE_CHAR:
+ case NDB_TYPE_VARCHAR:
+ case NDB_TYPE_LONGVARCHAR:
+ break;
+ default:
+ continue;
+ }
+ require(info.m_supported);
+ int byteSize = 0;
+ if (info.m_fixSize != 0)
+ byteSize = info.m_fixSize;
+ else if (info.m_arrayType == 0)
+ byteSize = 1 + getrandom(128, 1); // char(1-128)
+ else if (info.m_arrayType == 1)
+ byteSize = 1 + getrandom(256, 2); // varchar(0-255)
+ else if (info.m_arrayType == 2)
+ byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
+ else
+ require(false);
+ bool nullable = no_nullable ? false : getrandompct(50);
+ int csNumber = 0;
+ if (info.m_charType) {
+ csNumber = 8; // should include ascii
+ }
+ NdbPack::Type type(typeId, byteSize, nullable, csNumber);
+ chk2(m_spec.add(type) == 0, m_spec);
+ i++;
+ }
+ chk2(m_spec.validate() == 0, m_spec);
+}
+
+struct Tdata {
+ const Tspec& m_tspec;
+ NdbPack::Data m_data;
+ const bool m_isBound;
+ int m_cnt;
+ Uint8* m_xbuf; // unpacked
+ int m_xsize;
+ int m_xoff[Tspec::Max];
+ int m_xlen[Tspec::Max];
+ bool m_xnull[Tspec::Max];
+ int m_xnulls;
+ Uint32* m_poaiBuf; // plain old attr info
+ int m_poaiSize;
+ Uint8* m_packBuf; // packed
+ int m_packLen;
+ Tdata(Tspec& tspec, bool isBound, uint varBytes) :
+ m_tspec(tspec),
+ m_data(tspec.m_spec, isBound, varBytes),
+ m_isBound(isBound)
+ {
+ m_cnt = tspec.m_spec.get_cnt();
+ m_xbuf = 0;
+ m_poaiBuf = 0;
+ m_packBuf = 0;
+ }
+ ~Tdata() {
+ delete [] m_xbuf;
+ delete [] m_poaiBuf;
+ delete [] m_packBuf;
+ }
+ void create();
+ void add();
+ void finalize();
+ // compare using unpacked data
+ int xcmp(const Tdata& tdata2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdata& tdata)
+{
+ out << tdata.m_data;
+ return out;
+}
+
+void
+Tdata::create()
+{
+ union {
+ Uint8 xbuf[Tspec::MaxBuf];
+ Uint64 xbuf_align;
+ };
+ memset(xbuf, 0x3f, sizeof(xbuf));
+ m_xsize = 0;
+ m_xnulls = 0;
+ Uint32 poaiBuf[Tspec::MaxBuf / 4];
+ memset(poaiBuf, 0x5f, sizeof(poaiBuf));
+ m_poaiSize = 0;
+ m_packLen = m_data.get_var_bytes();
+ m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
+ int i = 0, j;
+ while (i < m_cnt) {
+ const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
+ const int typeId = type.get_type_id();
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+ m_xnull[i] = type.get_nullable() && getrandompct(25);
+ m_xnull[i] = false;
+ if (type.get_nullable() || m_isBound)
+ m_xnull[i] = getrandompct(20);
+ int pad = 0; // null-char pad not counted in xlen
+ if (!m_xnull[i]) {
+ m_xoff[i] = m_xsize;
+ Uint8* xptr = &xbuf[m_xsize];
+ switch (typeId) {
+ case NDB_TYPE_INT:
+ {
+ Int32 x = getrandom(10);
+ if (getrandompct(50))
+ x = (-1) * x;
+ memcpy(xptr, &x, 4);
+ m_xlen[i] = info.m_fixSize;
+ }
+ break;
+ case NDB_TYPE_UNSIGNED:
+ {
+ Uint32 x = getrandom(10);
+ memcpy(xptr, &x, 4);
+ m_xlen[i] = info.m_fixSize;
+ }
+ break;
+ case NDB_TYPE_CHAR:
+ {
+ require(type.get_byte_size() >= 1);
+ uint max_len = type.get_byte_size();
+ uint len = getrandom(max_len + 1, 1);
+ for (j = 0; j < len; j++)
+ {
+ xptr[j] = 'a' + getrandom(3);
+ }
+ for (j = len; j < max_len; j++)
+ {
+ xptr[j] = 0x20;
+ }
+ m_xlen[i] = max_len;
+ xptr[max_len] = 0;
+ pad = 1;
+ }
+ break;
+ case NDB_TYPE_VARCHAR:
+ {
+ require(type.get_byte_size() >= 1);
+ uint max_len = type.get_byte_size() - 1;
+ uint len = getrandom(max_len, 2);
+ require(len < 256);
+ xptr[0] = len;
+ for (j = 0; j < len; j++)
+ {
+ xptr[1 + j] = 'a' + getrandom(3);
+ }
+ m_xlen[i] = 1 + len;
+ xptr[1 + len] = 0;
+ pad = 1;
+ }
+ break;
+ case NDB_TYPE_LONGVARCHAR:
+ {
+ require(type.get_byte_size() >= 2);
+ uint max_len = type.get_byte_size() - 2;
+ uint len = getrandom(max_len, 3);
+ require(len < 256 * 256);
+ xptr[0] = (len & 0xFF);
+ xptr[1] = (len >> 8);
+ for (j = 0; j < len; j++)
+ {
+ xptr[2 + j] = 'a' + getrandom(3);
+ }
+ m_xlen[i] = 2 + len;
+ xptr[2 + len] = 0;
+ pad = 1;
+ }
+ break;
+ default:
+ require(false);
+ break;
+ }
+ m_xsize += m_xlen[i] + pad;
+ while (m_xsize % 8 != 0)
+ m_xsize++;
+ m_packLen += m_xlen[i];
+ } else {
+ m_xoff[i] = -1;
+ m_xlen[i] = 0;
+ m_xnulls++;
+ }
+ require(m_xnull[i] == (m_xoff[i] == -1));
+ require(m_xnull[i] == (m_xlen[i] == 0));
+ AttributeHeader* ah = (AttributeHeader*)&poaiBuf[m_poaiSize];
+ ah->setAttributeId(i); // not used
+ ah->setByteSize(m_xlen[i]);
+ m_poaiSize++;
+ if (!m_xnull[i]) {
+ memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
+ m_poaiSize += (m_xlen[i] + 3) / 4;
+ }
+ i++;
+ }
+ require(m_xsize % 8 == 0);
+ m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
+ memcpy(m_xbuf, xbuf, m_xsize);
+ m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
+ memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
+}
+
+void
+Tdata::add()
+{
+ m_packBuf = new Uint8 [m_packLen];
+ m_data.set_buf(m_packBuf, m_packLen);
+ int i, j;
+ j = 0;
+ while (j <= 1) {
+ if (j == 1)
+ m_data.reset();
+ i = 0;
+ while (i < m_cnt) {
+ Uint32 xlen = ~(Uint32)0;
+ if (!m_xnull[i]) {
+ int xoff = m_xoff[i];
+ const Uint8* xptr = &m_xbuf[xoff];
+ chk2(m_data.add(xptr, &xlen) == 0, m_data);
+ chk1(xlen == m_xlen[i]);
+ } else {
+ chk2(m_data.add_null(&xlen) == 0, m_data);
+ chk1(xlen == 0);
+ }
+ i++;
+ }
+ chk2(m_data.validate() == 0, m_data);
+ chk1(m_data.get_null_cnt() == m_xnulls);
+ j++;
+ }
+}
+
+void
+Tdata::finalize()
+{
+ chk2(m_data.finalize() == 0, m_data);
+ ll3("create: " << m_data);
+ chk1(m_data.get_full_len() == m_packLen);
+ {
+ const Uint8* p = (const Uint8*)m_data.get_full_buf();
+ chk1(p[0] + (p[1] << 8) == m_packLen - 2);
+ }
+}
+
+int
+Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+ const Tdata& tdata1 = *this;
+ require(&tdata1.m_tspec == &tdata2.m_tspec);
+ const Tspec& tspec = tdata1.m_tspec;
+ int res = 0;
+ int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
+ int i;
+ for (i = 0; i < cnt; i++) {
+ if (!tdata1.m_xnull[i]) {
+ if (!tdata2.m_xnull[i]) {
+ // the pointers are Uint64-aligned
+ const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
+ const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
+ const int xlen1 = tdata1.m_xlen[i];
+ const int xlen2 = tdata2.m_xlen[i];
+ const NdbPack::Type& type = tspec.m_spec.get_type(i);
+ const int typeId = type.get_type_id();
+ const int csNumber = type.get_cs_number();
+ CHARSET_INFO* cs = all_charsets[csNumber];
+ switch (typeId) {
+ case NDB_TYPE_INT:
+ {
+ require(cs == 0);
+ Int32 x1 = *(const Int32*)xptr1;
+ Int32 x2 = *(const Int32*)xptr2;
+ if (x1 < x2)
+ res = -1;
+ else if (x1 > x2)
+ res = +1;
+ ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+ }
+ break;
+ case NDB_TYPE_UNSIGNED:
+ {
+ require(cs == 0);
+ Uint32 x1 = *(const Uint32*)xptr1;
+ Uint32 x2 = *(const Uint32*)xptr2;
+ if (x1 < x2)
+ res = -1;
+ else if (x1 > x2)
+ res = +1;
+ ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+ }
+ break;
+ case NDB_TYPE_CHAR:
+ {
+ require(cs != 0 && cs->coll != 0);
+ const uint n1 = xlen1;
+ const uint n2 = xlen2;
+ const uchar* t1 = &xptr1[0];
+ const uchar* t2 = &xptr2[0];
+ const char* s1 = (const char*)t1;
+ const char* s2 = (const char*)t2;
+ chk1(n1 == strlen(s1));
+ chk1(n2 == strlen(s2));
+ res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+ ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+ }
+ break;
+ case NDB_TYPE_VARCHAR:
+ {
+ require(cs != 0 && cs->coll != 0);
+ const uint n1 = xptr1[0];
+ const uint n2 = xptr2[0];
+ const uchar* t1 = &xptr1[1];
+ const uchar* t2 = &xptr2[1];
+ const char* s1 = (const char*)t1;
+ const char* s2 = (const char*)t2;
+ chk1(n1 == strlen(s1));
+ chk1(n2 == strlen(s2));
+ res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+ ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+ }
+ break;
+ case NDB_TYPE_LONGVARCHAR:
+ {
+ require(cs != 0 && cs->coll != 0);
+ const uint n1 = xptr1[0] | (xptr1[1] << 8);
+ const uint n2 = xptr2[0] | (xptr2[1] << 8);
+ const uchar* t1 = &xptr1[2];
+ const uchar* t2 = &xptr2[2];
+ const char* s1 = (const char*)t1;
+ const char* s2 = (const char*)t2;
+ chk1(n1 == strlen(s1));
+ chk1(n2 == strlen(s2));
+ res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+ ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+ }
+ break;
+ default:
+ require(false);
+ break;
+ }
+ } else
+ res = +1;
+ } else if (!tdata2.m_xnull[i])
+ res = -1;
+ if (res != 0)
+ break;
+ }
+ *num_eq = i;
+ ll3("xcmp res:" << res << " num_eq:" << *num_eq);
+ return res;
+}
+
+struct Tbound {
+ Tdata& m_tdata;
+ NdbPack::Bound m_bound;
+ Tbound(Tdata& tdata) :
+ m_tdata(tdata),
+ m_bound(tdata.m_data)
+ {
+ m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
+ }
+ void create();
+ void add();
+ void finalize();
+ int xcmp(const Tdata& tdata2, int* num_eq) const;
+ int xcmp(const Tbound& tbound2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tbound& tbound)
+{
+ out << tbound.m_bound;
+ return out;
+}
+
+void
+Tbound::create()
+{
+ m_tdata.create();
+}
+
+void
+Tbound::add()
+{
+ m_tdata.add();
+}
+
+void
+Tbound::finalize()
+{
+ int side = getrandompct(50) ? -1 : +1;
+ chk2(m_bound.finalize(side) == 0, m_bound);
+ chk2(m_bound.validate() == 0, m_bound);
+ chk1(m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
+}
+
+int
+Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+ const Tbound& tbound1 = *this;
+ const Tdata& tdata1 = tbound1.m_tdata;
+ require(tdata1.m_cnt <= tdata2.m_cnt);
+ *num_eq = -1;
+ int res = tdata1.xcmp(tdata2, num_eq);
+ if (res == 0) {
+ chk1(*num_eq == tdata1.m_cnt);
+ res = m_bound.get_side();
+ }
+ return res;
+}
+
+int
+Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
+{
+ const Tbound& tbound1 = *this;
+ const Tdata& tdata1 = tbound1.m_tdata;
+ const Tdata& tdata2 = tbound2.m_tdata;
+ *num_eq = -1;
+ int res = tdata1.xcmp(tdata2, num_eq);
+ chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
+ if (res == 0) {
+ chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
+ if (tdata1.m_cnt < tdata2.m_cnt)
+ res = (+1) * tbound1.m_bound.get_side();
+ else if (tdata1.m_cnt > tdata2.m_cnt)
+ res = (-1) * tbound2.m_bound.get_side();
+ else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
+ res = -1;
+ else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
+ res = +1;
+ }
+ return res;
+}
+
+struct Tdatalist {
+ enum { Max = 1000 };
+ Tdata* m_tdata[Max];
+ int m_cnt;
+ Tdatalist(Tspec& tspec) {
+ m_cnt = data_cnt == -1 ? Max : data_cnt;
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ m_tdata[i] = new Tdata(tspec, false, 2);
+ }
+ }
+ ~Tdatalist() {
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ delete m_tdata[i];
+ }
+ }
+ void create();
+ void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdatalist& tdatalist)
+{
+ int i;
+ for (i = 0; i < tdatalist.m_cnt; i++) {
+ out << "data " << i << ": " << *tdatalist.m_tdata[i];
+ if (i + 1 < tdatalist.m_cnt)
+ out << endl;
+ }
+ return out;
+}
+
+void
+Tdatalist::create()
+{
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tdata& tdata = *m_tdata[i];
+ tdata.create();
+ tdata.add();
+ tdata.finalize();
+ }
+}
+
+static int
+data_cmp(const void* a1, const void* a2)
+{
+ const Tdata& tdata1 = **(const Tdata**)a1;
+ const Tdata& tdata2 = **(const Tdata**)a2;
+ require(tdata1.m_cnt == tdata2.m_cnt);
+ const Uint32 cnt = tdata1.m_cnt;
+ Uint32 num_eq = ~(Uint32)0;
+ int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
+ require(num_eq <= (Uint32)tdata1.m_cnt);
+ require(num_eq <= (Uint32)tdata2.m_cnt);
+ return res;
+}
+
+void
+Tdatalist::sort()
+{
+ ll1("data sort: in");
+ ll3(endl << *this);
+ qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
+ ll1("data sort: out");
+ ll3(endl << *this);
+ int i;
+ for (i = 0; i + 1 < m_cnt; i++) {
+ const Tdata& tdata1 = *m_tdata[i];
+ const Tdata& tdata2 = *m_tdata[i + 1];
+ require(tdata1.m_cnt == tdata2.m_cnt);
+ const Uint32 cnt = tdata1.m_cnt;
+ Uint32 num_eq1 = ~(Uint32)0;
+ int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
+ chk1(res <= 0);
+ // also via unpacked data
+ int num_eq2 = -1;
+ int res2 = tdata1.xcmp(tdata2, &num_eq2);
+ if (res < 0)
+ chk1(res2 < 0);
+ else if (res == 0)
+ chk1(res2 == 0);
+ else
+ chk1(res2 > 0);
+ chk1(num_eq1 == (Uint32)num_eq2);
+ }
+}
+
+struct Tboundlist {
+ enum { Max = 1000 };
+ Tbound* m_tbound[Max];
+ int m_cnt;
+ Tboundlist(Tspec& tspec) {
+ m_cnt = bound_cnt == -1 ? Max : bound_cnt;
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tdata* tdata = new Tdata(tspec, true, 0);
+ m_tbound[i] = new Tbound(*tdata);
+ }
+ }
+ ~Tboundlist() {
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tdata* tdata = &m_tbound[i]->m_tdata;
+ delete m_tbound[i];
+ delete tdata;
+ }
+ }
+ void create();
+ void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tboundlist& tboundlist)
+{
+ int i;
+ for (i = 0; i < tboundlist.m_cnt; i++) {
+ out << "bound " << i << ": " << *tboundlist.m_tbound[i];
+ if (i + 1 < tboundlist.m_cnt)
+ out << endl;
+ }
+ return out;
+}
+
+void
+Tboundlist::create()
+{
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tbound& tbound = *m_tbound[i];
+ tbound.create();
+ tbound.add();
+ tbound.finalize();
+ }
+}
+
+static int
+bound_cmp(const void* a1, const void* a2)
+{
+ const Tbound& tbound1 = **(const Tbound**)a1;
+ const Tbound& tbound2 = **(const Tbound**)a2;
+ const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+ Uint32 num_eq = ~(Uint32)0;
+ int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
+ require(num_eq <= cnt);
+ require(num_eq <= cnt);
+ return res;
+}
+
+void
+Tboundlist::sort()
+{
+ ll1("bound sort: in");
+ ll3(endl << *this);
+ qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
+ ll1("bound sort: out");
+ ll3(endl << *this);
+ int i;
+ for (i = 0; i + 1 < m_cnt; i++) {
+ const Tbound& tbound1 = *m_tbound[i];
+ const Tbound& tbound2 = *m_tbound[i + 1];
+ const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+ Uint32 num_eq1 = ~(Uint32)0;
+ int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
+ chk1(res <= 0);
+ // also via unpacked data
+ int num_eq2 = -1;
+ int res2 = tbound1.xcmp(tbound2, &num_eq2);
+ if (res < 0)
+ chk1(res2 < 0);
+ else if (res == 0)
+ chk1(res2 == 0);
+ else
+ chk1(res2 > 0);
+ chk1(num_eq1 == (Uint32)num_eq2);
+ }
+}
+
+static void
+testdesc(const Tdata& tdata)
+{
+ ll3("testdesc: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ const Uint8* buf_old = (const Uint8*)data.get_full_buf();
+ const Uint32 varBytes = data.get_var_bytes();
+ const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
+ const Uint32 dataLen = data.get_data_len();
+ const Uint32 fullLen = data.get_full_len();
+ const Uint32 cnt = data.get_cnt();
+ chk1(fullLen == varBytes + dataLen);
+ NdbPack::Data data_new(tspec.m_spec, false, varBytes);
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ memcpy(buf_new, buf_old, fullLen);
+ chk2(data_new.desc_all(cnt) == 0, data_new);
+ chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
+ chk1(data_new.get_data_len() == data.get_data_len());
+ chk1(data_new.get_cnt() == data.get_cnt());
+ chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testcopy(const Tdata& tdata)
+{
+ ll3("testcopy: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ int n = getrandom(tdata.m_cnt + 1);
+ do {
+ ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
+ NdbPack::DataC data_old(tspec.m_spec, false);
+ data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
+ chk1(data_old.get_cnt() == n);
+ NdbPack::Data data_new(tspec.m_spec, false, 0);
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ chk2(data_new.copy(data_old) == 0, data_new);
+ chk1(data_new.get_cnt() == n);
+ Uint32 num_eq1 = ~(Uint32)0;
+ chk1(data_new.cmp(data_old, n, num_eq1) == 0);
+ chk1(num_eq1 == n);
+ Uint32 num_eq2 = ~(Uint32)0;
+ chk1(data_old.cmp(data_new, n, num_eq2) == 0);
+ chk1(num_eq2 == n);
+ n = getrandom(n);
+ } while (n != 0);
+}
+
+static void
+testpoai(const Tdata& tdata)
+{
+ ll3("testpoai: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ Uint32 poaiLen = ~(Uint32)0;
+ chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
+ chk2(data_new.finalize() == 0, data_new);
+ chk2(data_new.validate() == 0, data_new);
+ chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
+ chk1(data_new.get_full_len() == data.get_full_len());
+ chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
+ chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testconvert(const Tdata& tdata)
+{
+ ll3("testconvert: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ NdbPack::Data data_new(tspec.m_spec, false, 2);
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ chk2(data_new.copy(data) == 0, data_new);
+ require(tdata.m_cnt == data.get_cnt());
+ require(data.get_cnt() == data_new.get_cnt());
+ const Uint32 cnt = tdata.m_cnt;
+ Uint32 num_eq;
+ switch (NdbPack::Endian::get_endian()) {
+ case NdbPack::Endian::Little:
+ chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+ chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ break;
+ case NdbPack::Endian::Big:
+ chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+ chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ break;
+ default:
+ require(false);
+ break;
+ }
+}
+
+static void
+testdata(const Tdatalist& tdatalist)
+{
+ int i;
+ for (i = 0; i < tdatalist.m_cnt; i++) {
+ const Tdata& tdata = *tdatalist.m_tdata[i];
+ testdesc(tdata);
+ testcopy(tdata);
+ testpoai(tdata);
+ testconvert(tdata);
+ }
+}
+
+static void
+testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
+{
+ ll3("testcmp: " << tbound);
+ int oldres = 0;
+ int n1 = 0;
+ int n2 = 0;
+ int i;
+ for (i = 0; i < tdatalist.m_cnt; i++) {
+ const Tdata& tdata = *tdatalist.m_tdata[i];
+ require(tbound.m_tdata.m_cnt == tbound.m_bound.get_data().get_cnt());
+ const Uint32 cnt = tbound.m_tdata.m_cnt;
+ Uint32 num_eq1 = ~(Uint32)0;
+ // reverse result for key vs bound
+ int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
+ chk1(res != 0);
+ res = (res < 0 ? (n1++, -1) : (n2++, +1));
+ if (i > 0) {
+ // at some point flips from -1 to +1
+ chk1(oldres <= res);
+ }
+ oldres = res;
+ // also via unpacked data
+ int num_eq2 = -1;
+ int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
+ if (res < 0)
+ chk1(res2 < 0);
+ else
+ chk1(res2 > 0);
+ chk1(num_eq1 == (Uint32)num_eq2);
+ }
+ require(n1 + n2 == tdatalist.m_cnt);
+ ll2("keys before:" << n1 << " after:" << n2);
+ *kb = n1;
+}
+
+static void
+testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
+{
+ int i;
+ int oldkb = 0;
+ for (i = 0; i < tboundlist.m_cnt; i++) {
+ const Tbound& tbound = *tboundlist.m_tbound[i];
+ int kb = 0;
+ testcmp(tbound, tdatalist, &kb);
+ if (i > 0) {
+ chk1(oldkb <= kb);
+ }
+ oldkb = kb;
+ }
+}
+
+static void
+testrun()
+{
+ Tspec tspec;
+ tspec.create();
+ ll1("spec: " << tspec);
+ Tdatalist tdatalist(tspec);
+ tdatalist.create();
+ tdatalist.sort();
+ testdata(tdatalist);
+ if (bound_cnt != 0) {
+ Tboundlist tboundlist(tspec);
+ tboundlist.create();
+ tboundlist.sort();
+ testcmp(tboundlist, tdatalist);
+ }
+}
+
+extern void NdbOut_Init();
+
+static int
+testmain()
+{
+ my_init();
+ NdbOut_Init();
+ signal(SIGABRT, SIG_DFL);
+ { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
+ if (p != 0)
+ verbose = atoi(p);
+ }
+ if (seed == 0)
+ ll0("random seed: loop number");
+ else {
+ if (seed < 0)
+ seed = getpid();
+ ll0("random seed: " << seed);
+ srandom(seed);
+ }
+ loops = 100;
+ int i;
+ for (i = 0; loops == 0 || i < loops; i++) {
+ ll0("loop:" << i << "/" << loops);
+ if (seed == 0)
+ srandom(i);
+ testrun();
+ }
+ ndbout << "ok" << endl;
+ return 0;
+}
+
+TAPTEST(NdbPack)
+{
+ int ret = testmain();
+ return (ret == 0);
+}
+
+#endif
Attachment: [text/bzr-bundle] bzr/pekka@mysql.com-20110504094418-0ntwcu4728sv7n2l.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0-wl4163 branch (pekka:4351) WL#4163 | Pekka Nousiainen | 4 May |