#At file:///home/msvensson/mysql/tmp/mLgrDbtgG8/7.1/ based on revid:craig.russell@stripped
4212 magnus.blaudd@stripped 2011-05-23 [merge]
Merge 7.0 -> 7.1
added:
mysql-test/suite/ndb/r/ndb_dd_bug12581213.result
mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf
mysql-test/suite/ndb/t/ndb_dd_bug12581213.test
storage/ndb/include/util/NdbPack.hpp
storage/ndb/src/common/util/NdbPack.cpp
modified:
storage/ndb/CMakeLists.txt
storage/ndb/include/util/NdbSqlUtil.hpp
storage/ndb/src/common/util/CMakeLists.txt
storage/ndb/src/common/util/Makefile.am
storage/ndb/src/common/util/NdbSqlUtil.cpp
storage/ndb/src/kernel/blocks/ERROR_codes.txt
storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp
storage/ndb/src/ndbapi/NdbIndexStat.cpp
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/test/run-test/conf-blade08.cnf
storage/ndb/test/run-test/conf-dl145a.cnf
storage/ndb/test/run-test/conf-fimafeng08.cnf
storage/ndb/test/run-test/conf-fimafeng09.cnf*
storage/ndb/test/run-test/conf-loki27.cnf*
storage/ndb/test/run-test/conf-ndb07.cnf
storage/ndb/test/run-test/conf-ndbmaster.cnf
storage/ndb/test/run-test/conf-repl.cnf
storage/ndb/test/run-test/conf-techra29.cnf*
storage/ndb/test/run-test/conf-test.cnf
storage/ndb/test/run-test/conf-tyr64.cnf*
storage/ndb/test/run-test/conf-upgrade.cnf
=== added file 'mysql-test/suite/ndb/r/ndb_dd_bug12581213.result'
--- a/mysql-test/suite/ndb/r/ndb_dd_bug12581213.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_dd_bug12581213.result 2011-05-23 10:38:41 +0000
@@ -0,0 +1,17 @@
+CREATE LOGFILE GROUP lg1
+ADD UNDOFILE 'undofile.dat'
+INITIAL_SIZE 16M
+UNDO_BUFFER_SIZE = 1M
+ENGINE NDB;
+CREATE TABLESPACE ts1
+ADD DATAFILE 'datafile.dat'
+USE LOGFILE GROUP lg1
+INITIAL_SIZE 12M
+ENGINE NDB;
+alter tablespace ts1
+drop datafile 'datafile.dat'
+engine ndb;
+drop tablespace ts1
+engine ndb;
+drop logfile group lg1
+engine ndb;
=== added file 'mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf'
--- a/mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf 2011-05-23 10:38:41 +0000
@@ -0,0 +1,7 @@
+!include suite/ndb/my.cnf
+
+[cluster_config.1]
+ndbd=
+NoOfReplicas=1
+MaxNoOfOpenFiles=27
+InitialNoOfOpenFiles=26
=== added file 'mysql-test/suite/ndb/t/ndb_dd_bug12581213.test'
--- a/mysql-test/suite/ndb/t/ndb_dd_bug12581213.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_dd_bug12581213.test 2011-05-23 10:38:41 +0000
@@ -0,0 +1,23 @@
+-- source include/have_ndb.inc
+
+CREATE LOGFILE GROUP lg1
+ADD UNDOFILE 'undofile.dat'
+INITIAL_SIZE 16M
+UNDO_BUFFER_SIZE = 1M
+ENGINE NDB;
+
+CREATE TABLESPACE ts1
+ADD DATAFILE 'datafile.dat'
+USE LOGFILE GROUP lg1
+INITIAL_SIZE 12M
+ENGINE NDB;
+
+alter tablespace ts1
+drop datafile 'datafile.dat'
+engine ndb;
+
+drop tablespace ts1
+engine ndb;
+
+drop logfile group lg1
+engine ndb;
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2011-05-09 14:01:14 +0000
+++ b/storage/ndb/CMakeLists.txt 2011-05-23 14:13:35 +0000
@@ -17,6 +17,8 @@
SET(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
${CMAKE_SOURCE_DIR}/cmake
${CMAKE_SOURCE_DIR}/storage/ndb/cmake)
+
+MESSAGE(STATUS "Using cmake version ${CMAKE_VERSION}")
# Check if this is MySQL Cluster build i.e the MySQL Server
# version string ends in -ndb-Y.Y.Y[-status]
=== added file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp 2011-05-04 09:44:18 +0000
@@ -0,0 +1,857 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_PACK_HPP
+#define NDB_PACK_HPP
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <kernel/AttributeHeader.hpp>
+#include <NdbSqlUtil.hpp>
+#include <NdbEnv.h>
+class NdbOut;
+
+/*
+ * Pack an array of NDB data values. The types are specified by an
+ * array of data types. There is no associated table or attribute ids.
+ * All or an initial sequence of the specified values are present.
+ *
+ * Currently used for ordered index keys and bounds in kernel (DBTUX)
+ * and in index statistics (mysqld). The comparison methods use the
+ * primitive type comparisons from NdbSqlUtil.
+ *
+ * Keys and bounds use same spec. However a value in an index bound can
+ * be NULL even if the key attribute is not nullable. Therefore bounds
+ * set the "allNullable" property and have a longer null mask.
+ *
+ * There are two distinct use occasions: 1) construction of data or
+ * bound 2) operating on previously constructed data or bound. There
+ * are classes Data/DataC and Bound/BoundC for these uses. The latter
+ * often can return a result without interpreting the full value.
+ *
+ * Methods return -1 on error and 0 on success. Comparison methods
+ * assume well-formed data and return negative, zero, positive for less,
+ * equal, greater.
+ */
+
+class NdbPack {
+public:
+ class Endian;
+ class Type;
+ class Spec;
+ class Iter;
+ class DataC;
+ class Data;
+ class BoundC;
+ class Bound;
+
+ /*
+ * Get SQL type.
+ */
+ static const NdbSqlUtil::Type& getSqlType(Uint32 typeId);
+
+ /*
+ * Error codes for core dumps.
+ */
+ class Error {
+ public:
+ enum {
+ TypeNotSet = -101, // type id was not set
+ TypeOutOfRange = -102, // type id is out of range
+ TypeNotSupported = -103, // blob (and for now bit) types
+ TypeSizeZero = -104, // max size was set to zero
+ TypeFixSizeInvalid = -105, // fixed size specified wrong
+ TypeNullableNotBool = -106, // nullable must be 0 or 1
+ CharsetNotSpecified = -107, // char type with no charset number
+ CharsetNotFound = -108, // cannot install in all_charsets[]
+ CharsetNotAllowed = -109, // non-char type with charset
+ SpecBufOverflow = -201, // more spec items than allocated
+ DataCntOverflow = -301, // more data items than in spec
+ DataBufOverflow = -302, // more data bytes than allocated
+ DataValueOverflow = -303, // var length exceeds max size
+ DataNotNullable = -304, // NULL value to not-nullable type
+ InvalidAttrInfo = -305, // invalid plain old attr info
+ BoundEmptySide = -401, // side not 0 for empty bound
+ BoundNonemptySide = -402, // side not -1,+1 for non-empty bound
+ InternalError = -901,
+ ValidationError = -902,
+ NoError = 0
+ };
+ Error();
+ ~Error() {}
+ int get_error_code() const;
+ int get_error_line() const;
+
+ private:
+ friend class Endian;
+ friend class Type;
+ friend class Spec;
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ friend class Bound;
+ void set_error(int code, int line) const;
+ void set_error(const Error& e2) const;
+ mutable int m_error_code;
+ mutable int m_error_line;
+ };
+
+ /*
+ * Endian definitions.
+ */
+ class Endian {
+ public:
+ enum Value {
+ Native = 0,
+ Little = 1,
+ Big = 2
+ };
+ static Value get_endian();
+ static void convert(void* ptr, Uint32 len);
+ };
+
+ /*
+ * Data type.
+ */
+ class Type : public Error {
+ public:
+ Type();
+ Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+ ~Type() {}
+ /*
+ * Define the type. Size is fixed or max size. Values of variable
+ * length have length bytes. The definition is verified when the
+ * type is added to the specification. This also installs missing
+ * CHARSET_INFO* into all_charsets[].
+ */
+ void set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+ // getters
+ Uint32 get_type_id() const;
+ Uint32 get_byte_size() const;
+ bool get_nullable() const;
+ Uint32 get_cs_number() const;
+ Uint32 get_array_type() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Type&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Spec;
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ // verify and complete when added to specification
+ int complete();
+ Uint16 m_typeId;
+ Uint16 m_byteSize; // fixed or max size in bytes
+ Uint16 m_nullable;
+ Uint16 m_csNumber;
+ Uint16 m_arrayType; // 0,1,2 length bytes
+ Uint16 m_nullbitPos; // computed as part of Spec
+ };
+
+ /*
+ * Data specification i.e. array of types. Usually constructed on the
+ * heap, so keep fairly small. Used for boths keys and bounds.
+ */
+ class Spec : public Error {
+ public:
+ Spec();
+ ~Spec() {}
+ // set initial buffer (calls reset)
+ void set_buf(Type* buf, Uint32 bufMaxCnt);
+ // use if buffer is relocated
+ void set_buf(Type* buf);
+ // reset but keep buffer
+ void reset();
+ // add type to specification once or number of times
+ int add(Type type);
+ int add(Type type, Uint32 cnt);
+ // copy from
+ void copy(const Spec& s2);
+ // getters (bounds set allNullable)
+ const Type& get_type(Uint32 i) const;
+ Uint32 get_cnt() const;
+ Uint32 get_nullable_cnt(bool allNullable) const;
+ Uint32 get_nullmask_len(bool allNullable) const;
+ // max data length including null mask
+ Uint32 get_max_data_len(bool allNullable) const;
+ // minimum var bytes (if used by Data instance)
+ Uint32 get_min_var_bytes(bool allNullable) const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Spec&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ // undefined
+ Spec(const Spec&);
+ Spec& operator=(const Spec&);
+ Type* m_buf;
+ Uint16 m_bufMaxCnt;
+ Uint16 m_cnt;
+ Uint16 m_nullableCnt;
+ Uint16 m_varsizeCnt;
+ Uint32 m_maxByteSize; // excludes null mask
+ };
+
+ /*
+ * Iterator over data items. DataC uses external Iter instances in
+ * comparison methods etc. Data contains an Iter instance which
+ * iterates on items added.
+ */
+ class Iter : public Error {
+ public:
+ // the data instance is only used to set metadata
+ Iter(const DataC& data);
+ ~Iter() {}
+ void reset();
+
+ private:
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ // undefined
+ Iter(const Iter&);
+ Iter& operator=(const Iter&);
+ // describe next non-null or null item and advance iterator
+ int desc(const Uint8* item);
+ int desc_null();
+ // compare current items (DataC buffers are passed)
+ int cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const;
+
+ const Spec& m_spec;
+ const bool m_allNullable;
+ // iterator
+ Uint32 m_itemPos; // position of current item in DataC buffer
+ Uint32 m_cnt; // number of items described so far
+ Uint32 m_nullCnt;
+ // current item
+ Uint32 m_lenBytes; // 0-2
+ Uint32 m_bareLen; // excludes length bytes
+ Uint32 m_itemLen; // full length, value zero means null
+ };
+
+ /*
+ * Read-only superclass of Data. Initialized from a previously
+ * constructed Data buffer (any var bytes skipped). Methods interpret
+ * one data item at a time. Values are native endian.
+ */
+ class DataC : public Error {
+ public:
+ DataC(const Spec& spec, bool allNullable);
+ // set buffer to previously constructed one with given item count
+ void set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt);
+ // interpret next data item
+ int desc(Iter& r) const;
+ // compare cnt attrs and also return number of initial equal attrs
+ int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+ // getters
+ const Spec& get_spec() const;
+ bool get_all_nullable() const;
+ const void* get_data_buf() const;
+ Uint32 get_cnt() const;
+ bool is_empty() const;
+ bool is_full() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const DataC&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz, bool convert_flag = false) const;
+ int validate() const { return 0; }
+
+ private:
+ friend class Iter;
+ friend class Data;
+ friend class BoundC;
+ // undefined
+ DataC(const Data&);
+ DataC& operator=(const DataC&);
+ const Spec& m_spec;
+ const bool m_allNullable;
+ const Uint8* m_buf;
+ Uint32 m_bufMaxLen;
+ // can be updated as part of Data instance
+ Uint32 m_cnt;
+ };
+
+ /*
+ * Instance of an array of data values. The values are packed into
+ * a byte buffer. The buffer is also maintained as a single varbinary
+ * value if non-zero var bytes (length bytes) is specified.
+ */
+ class Data : public DataC {
+ public:
+ Data(const Spec& spec, bool allNullable, Uint32 varBytes);
+ // set buffer (calls reset)
+ void set_buf(void* buf, Uint32 bufMaxLen);
+ // reset but keep buffer (header is zeroed)
+ void reset();
+ // add non-null data items and return length in bytes
+ int add(const void* data, Uint32* len_out);
+ int add(const void* data, Uint32 cnt, Uint32* len_out);
+ // add null data items and return length 0 bytes
+ int add_null(Uint32* len_out);
+ int add_null(Uint32 cnt, Uint32* len_out);
+ // add from "plain old attr info"
+ int add_poai(const Uint32* poai, Uint32* len_out);
+ int add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out);
+ // call this before first use
+ int finalize();
+ // copy from
+ int copy(const DataC& d2);
+ // convert endian
+ int convert(Endian::Value to_endian);
+ // create complete instance from buffer contents
+ int desc_all(Uint32 cnt);
+ // getters
+ Uint32 get_max_len() const;
+ Uint32 get_max_len4() const;
+ Uint32 get_var_bytes() const;
+ const void* get_full_buf() const;
+ Uint32 get_full_len() const;
+ Uint32 get_data_len() const;
+ Uint32 get_null_cnt() const;
+ Endian::Value get_endian() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Data&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Iter;
+ friend class Bound;
+ // undefined
+ Data(const Data&);
+ Data& operator=(const Data&);
+ int finalize_impl();
+ int convert_impl(Endian::Value to_endian);
+ const Uint32 m_varBytes;
+ Uint8* m_buf;
+ Uint32 m_bufMaxLen;
+ Endian::Value m_endian; // Native until finalize()
+ // iterator on items added
+ Iter m_iter;
+ };
+
+ /*
+ * Read-only superclass of BoundC, analogous to DataC. Initialized
+ * from a previously constructed Bound or DataC buffer.
+ */
+ class BoundC : public Error {
+ public:
+ BoundC(DataC& data);
+ ~BoundC() {}
+ // call this before first use
+ int finalize(int side);
+ // compare bound to key (may return 0 if bound is longer)
+ int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+ // compare bounds (may return 0 if cnt is less than min length)
+ int cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const;
+ // getters
+ DataC& get_data() const;
+ int get_side() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const BoundC&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ friend class Bound;
+ // undefined
+ BoundC(const BoundC&);
+ BoundC& operator=(const BoundC&);
+ DataC& m_data;
+ int m_side;
+ };
+
+ /*
+ * Ordered index range bound consists of a partial key and a "side".
+ * The partial key is a Data instance where some initial number of
+ * values are present. It is defined separately by the caller and
+ * passed to Bound ctor by reference.
+ */
+ class Bound : public BoundC {
+ public:
+ Bound(Data& data);
+ ~Bound() {}
+ void reset();
+ // call this before first use
+ int finalize(int side);
+ // getters
+ Data& get_data() const;
+ // print
+ friend NdbOut& operator<<(NdbOut&, const Bound&);
+ void print(NdbOut& out) const;
+ const char* print(char* buf, Uint32 bufsz) const;
+ int validate() const;
+
+ private:
+ // undefined
+ Bound(const Bound&);
+ Bound& operator=(const Bound&);
+ Data& m_data;
+ };
+
+ /*
+ * Helper for print() methods.
+ */
+ struct Print {
+ private:
+ friend class Endian;
+ friend class Type;
+ friend class Spec;
+ friend class Iter;
+ friend class DataC;
+ friend class Data;
+ friend class BoundC;
+ friend class Bound;
+ Print(char* buf, Uint32 bufsz);
+ void print(const char* frm, ...);
+ char* m_buf;
+ Uint32 m_bufsz;
+ Uint32 m_sz;
+ };
+};
+
+// NdbPack
+
+inline const NdbSqlUtil::Type&
+NdbPack::getSqlType(Uint32 typeId)
+{
+ return NdbSqlUtil::m_typeList[typeId];
+}
+
+// NdbPack::Error
+
+inline
+NdbPack::Error::Error()
+{
+ m_error_code = 0;
+ m_error_line = 0;
+}
+
+// NdbPack::Endian
+
+inline NdbPack::Endian::Value
+NdbPack::Endian::get_endian()
+{
+#ifndef WORDS_BIGENDIAN
+ return Little;
+#else
+ return Big;
+#endif
+}
+
+// NdbPack::Type
+
+inline
+NdbPack::Type::Type()
+{
+ m_typeId = NDB_TYPE_UNDEFINED;
+ m_byteSize = 0;
+ m_nullable = true;
+ m_csNumber = 0;
+ m_arrayType = 0;
+ m_nullbitPos = 0;
+}
+
+inline
+NdbPack::Type::Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+ set(typeId, byteSize, nullable, csNumber);
+}
+
+inline void
+NdbPack::Type::set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+ m_typeId = typeId;
+ m_byteSize = byteSize;
+ m_nullable = nullable;
+ m_csNumber = csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_type_id() const
+{
+ return m_typeId;
+}
+
+inline Uint32
+NdbPack::Type::get_byte_size() const
+{
+ return m_byteSize;
+}
+
+inline bool
+NdbPack::Type::get_nullable() const
+{
+ return (bool)m_nullable;
+}
+
+inline Uint32
+NdbPack::Type::get_cs_number() const
+{
+ return m_csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_array_type() const
+{
+ return m_arrayType;
+}
+
+// NdbPack::Spec
+
+inline
+NdbPack::Spec::Spec()
+{
+ reset();
+ m_buf = 0;
+ m_bufMaxCnt = 0;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf, Uint32 bufMaxCnt)
+{
+ reset();
+ m_buf = buf;
+ m_bufMaxCnt = bufMaxCnt;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf)
+{
+ m_buf = buf;
+}
+
+inline void
+NdbPack::Spec::reset()
+{
+ m_cnt = 0;
+ m_nullableCnt = 0;
+ m_varsizeCnt = 0;
+ m_maxByteSize = 0;
+}
+
+inline const NdbPack::Type&
+NdbPack::Spec::get_type(Uint32 i) const
+{
+ assert(i < m_cnt);
+ return m_buf[i];
+}
+
+inline Uint32
+NdbPack::Spec::get_cnt() const
+{
+ return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullable_cnt(bool allNullable) const
+{
+ if (!allNullable)
+ return m_nullableCnt;
+ else
+ return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullmask_len(bool allNullable) const
+{
+ return (get_nullable_cnt(allNullable) + 7) / 8;
+}
+
+inline Uint32
+NdbPack::Spec::get_max_data_len(bool allNullable) const
+{
+ return get_nullmask_len(allNullable) + m_maxByteSize;
+}
+
+inline Uint32
+NdbPack::Spec::get_min_var_bytes(bool allNullable) const
+{
+ const Uint32 len = get_max_data_len(allNullable);
+ return (len < 256 ? 1 : 2);
+}
+
+// NdbPack::Iter
+
+inline
+NdbPack::Iter::Iter(const DataC& data) :
+ m_spec(data.m_spec),
+ m_allNullable(data.m_allNullable)
+{
+ reset();
+}
+
+inline void
+NdbPack::Iter::reset()
+{
+ m_itemPos = m_spec.get_nullmask_len(m_allNullable);
+ m_cnt = 0;
+ m_nullCnt = 0;
+ m_lenBytes = 0;
+ m_bareLen = 0;
+ m_itemLen = 0;
+}
+
+// NdbPack::DataC
+
+inline
+NdbPack::DataC::DataC(const Spec& spec, bool allNullable) :
+ m_spec(spec),
+ m_allNullable(allNullable)
+{
+ m_buf = 0;
+ m_bufMaxLen = 0;
+ m_cnt = 0;
+}
+
+inline void
+NdbPack::DataC::set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt)
+{
+ m_buf = static_cast<const Uint8*>(buf);
+ m_bufMaxLen = bufMaxLen;
+ m_cnt = cnt;
+}
+
+inline const NdbPack::Spec&
+NdbPack::DataC::get_spec() const
+{
+ return m_spec;
+}
+
+inline bool
+NdbPack::DataC::get_all_nullable() const
+{
+ return &m_allNullable;
+}
+
+inline const void*
+NdbPack::DataC::get_data_buf() const
+{
+ return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::DataC::get_cnt() const
+{
+ return m_cnt;
+}
+
+inline bool
+NdbPack::DataC::is_empty() const
+{
+ return m_cnt == 0;
+}
+
+inline bool
+NdbPack::DataC::is_full() const
+{
+ return m_cnt == m_spec.m_cnt;
+}
+
+// NdbPack::Data
+
+inline
+NdbPack::Data::Data(const Spec& spec, bool allNullable, Uint32 varBytes) :
+ DataC(spec, allNullable),
+ m_varBytes(varBytes),
+ m_iter(*this)
+{
+ m_buf = 0;
+ m_bufMaxLen = 0;
+ m_endian = Endian::Native;
+}
+
+inline void
+NdbPack::Data::set_buf(void* buf, Uint32 bufMaxLen)
+{
+ m_buf = static_cast<Uint8*>(buf);
+ m_bufMaxLen = bufMaxLen;
+ reset();
+ assert(bufMaxLen >= m_varBytes);
+ DataC::set_buf(&m_buf[m_varBytes], m_bufMaxLen - m_varBytes, 0);
+}
+
+inline void
+NdbPack::Data::reset()
+{
+ m_cnt = 0; // in DataC
+ const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
+ memset(m_buf, 0, bytes);
+ m_endian = Endian::Native;
+ m_iter.reset();
+}
+
+inline int
+NdbPack::Data::finalize()
+{
+ if (m_varBytes == 0 ||
+ finalize_impl() == 0)
+ {
+ m_endian = Endian::get_endian();
+ return 0;
+ }
+ return -1;
+}
+
+inline int
+NdbPack::Data::convert(Endian::Value to_endian)
+{
+ if (unlikely(to_endian == Endian::Native))
+ to_endian = Endian::get_endian();
+ if (m_endian == to_endian)
+ return 0;
+ if (convert_impl(to_endian) == 0)
+ {
+ m_endian = to_endian;
+ return 0;
+ }
+ return -1;
+}
+
+inline Uint32
+NdbPack::Data::get_max_len() const
+{
+ return m_varBytes + m_spec.get_max_data_len(m_allNullable);
+}
+
+inline Uint32
+NdbPack::Data::get_max_len4() const
+{
+ Uint32 len4 = get_max_len();
+ len4 += 3;
+ len4 /= 4;
+ len4 *= 4;
+ return len4;
+}
+
+inline Uint32
+NdbPack::Data::get_var_bytes() const
+{
+ return m_varBytes;
+}
+
+inline const void*
+NdbPack::Data::get_full_buf() const
+{
+ return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::Data::get_full_len() const
+{
+ return m_varBytes + m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_data_len() const
+{
+ return m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_null_cnt() const
+{
+ return m_iter.m_nullCnt;
+}
+
+inline NdbPack::Endian::Value
+NdbPack::Data::get_endian() const
+{
+ return m_endian;
+}
+
+// NdbPack::BoundC
+
+inline
+NdbPack::BoundC::BoundC(DataC& data) :
+ m_data(data)
+{
+ m_side = 0;
+}
+
+inline int
+NdbPack::BoundC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+ const BoundC& b1 = *this;
+ const DataC& d1 = b1.m_data;
+ int res = d1.cmp(d2, cnt, num_eq);
+ if (res == 0 && d1.m_cnt <= d2.m_cnt)
+ res = b1.m_side;
+ return res;
+}
+
+inline NdbPack::DataC&
+NdbPack::BoundC::get_data() const
+{
+ return m_data;
+}
+
+inline int
+NdbPack::BoundC::get_side() const
+{
+ return m_side;
+}
+
+// NdbPack::Bound
+
+inline
+NdbPack::Bound::Bound(Data& data) :
+ BoundC(data),
+ m_data(data)
+{
+}
+
+inline void
+NdbPack::Bound::reset()
+{
+ m_data.reset();
+ m_side = 0;
+}
+
+inline int
+NdbPack::Bound::finalize(int side)
+{
+ if (m_data.finalize() == -1)
+ {
+ set_error(m_data);
+ return -1;
+ }
+ if (BoundC::finalize(side) == -1)
+ return -1;
+ return 0;
+}
+
+inline NdbPack::Data&
+NdbPack::Bound::get_data() const
+{
+ return m_data;
+}
+
+#endif // NDB_PACK_HPP
=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp 2011-02-19 03:13:04 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp 2011-05-04 15:10:05 +0000
@@ -37,23 +37,18 @@ typedef struct charset_info_st CHARSET_I
class NdbSqlUtil {
public:
/**
- * Compare attribute values. Returns -1, 0, +1 for less, equal,
- * greater, respectively. Parameters are pointers to values and their
- * lengths in bytes. The lengths can differ.
+ * Compare attribute values. Returns negative, zero, positive for
+ * less, equal, greater. We trust DBTUP to validate all data and
+ * mysql upgrade to not invalidate them. Bad values (such as NaN)
+ * causing undefined results crash here always (require, not assert)
+ * since they are likely to cause a more obscure crash in DBTUX.
+ * wl4163_todo: API probably should not crash.
*
- * First value is a full value but second value can be partial. If
- * the partial value is not enough to determine the result, CmpUnknown
- * will be returned. A shorter second value is not necessarily
- * partial. Partial values are allowed only for types where prefix
- * comparison is possible (basically, binary strings).
- *
- * First parameter is a pointer to type specific extra info. Char
- * types receive CHARSET_INFO in it.
- *
- * If a value cannot be parsed, it compares like NULL i.e. less than
- * any valid value.
+ * Parameters are pointers to values (no alignment requirements) and
+ * their lengths in bytes. First parameter is a pointer to type
+ * specific extra info. Char types receive CHARSET_INFO in it.
*/
- typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full);
+ typedef int Cmp(const void* info, const void* p1, uint n1, const void* p2, uint n2);
/**
* Prototype for "like" comparison. Defined for string types. First
@@ -73,13 +68,6 @@ public:
*/
typedef int AndMask(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero);
- enum CmpResult {
- CmpLess = -1,
- CmpEqual = 0,
- CmpGreater = 1,
- CmpUnknown = 2 // insufficient partial data
- };
-
struct Type {
enum Enum {
Undefined = NDB_TYPE_UNDEFINED,
@@ -126,13 +114,6 @@ public:
static const Type& getType(Uint32 typeId);
/**
- * Get the normalized type used in hashing and key comparisons.
- * Maps all string types to Binary. This includes Var* strings
- * because strxfrm result is padded to fixed (maximum) length.
- */
- static const Type& getTypeBinary(Uint32 typeId);
-
- /**
* Check character set.
*/
static uint check_column_for_pk(Uint32 typeId, const void* info);
@@ -159,11 +140,6 @@ public:
const uchar *src, size_t srclen);
/**
- * Compare decimal numbers.
- */
- static int cmp_olddecimal(const uchar* s1, const uchar* s2, unsigned n);
-
- /**
* Convert attribute data to/from network byte order
* This method converts the passed data of the passed type
* between host and network byte order.
@@ -177,6 +153,7 @@ public:
Uint32 dataByteSize);
private:
+ friend class NdbPack;
/**
* List of all types. Must match Type::Enum.
*/
=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt 2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt 2011-05-09 15:35:25 +0000
@@ -54,6 +54,7 @@ ADD_LIBRARY(ndbgeneral STATIC
SparseBitmask.cpp
require.c
Vector.cpp
+ NdbPack.cpp
)
TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
@@ -82,3 +83,8 @@ SET_TARGET_PROPERTIES(ndb_version-t
PROPERTIES COMPILE_FLAGS "-DTEST_VERSION")
TARGET_LINK_LIBRARIES(ndb_version-t ndbgeneral)
+ADD_EXECUTABLE(NdbPack-t NdbPack.cpp)
+SET_TARGET_PROPERTIES(NdbPack-t
+ PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
+TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral ndbportlib)
+
=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am 2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/Makefile.am 2011-05-04 09:44:18 +0000
@@ -27,14 +27,15 @@ libgeneral_la_SOURCES = \
strdup.c \
ConfigValues.cpp ndb_init.cpp basestring_vsnprintf.c \
Bitmask.cpp SparseBitmask.cpp parse_mask.hpp \
- ndb_rand.c require.c Vector.cpp
+ ndb_rand.c require.c Vector.cpp \
+ NdbPack.cpp
INCLUDES_LOC = @ZLIB_INCLUDES@
libndbazio_la_SOURCES = ndbzio.c
libndbazio_la_LIBADD = @ZLIB_LIBS@
-noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t
+noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t
BaseString_t_SOURCES = BaseString.cpp
BaseString_t_CXXFLAGS = -DTEST_BASE_STRING
@@ -73,5 +74,14 @@ ndb_version_t_SOURCES = version.cpp
ndb_version_t_CXXFLAGS = -DTEST_VERSION
ndb_version_t_LDADD = libgeneral.la
+NdbPack_t_SOURCES = NdbPack.cpp
+NdbPack_t_CXXFLAGS = -DTEST_NDB_PACK
+NdbPack_t_LDADD = \
+ libgeneral.la \
+ $(top_builddir)/storage/ndb/src/common/portlib/libportlib.la \
+ $(top_builddir)/mysys/libmysys.la \
+ $(top_builddir)/strings/libmystrings.la \
+ $(top_builddir)/dbug/libdbug.la
+
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_util.mk.am
=== added file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp 2011-05-09 15:35:25 +0000
@@ -0,0 +1,2053 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include <ndb_global.h>
+#include <NdbPack.hpp>
+#include <NdbOut.hpp>
+#include <NdbEnv.h>
+
+// NdbPack::Error
+
+int
+NdbPack::Error::get_error_code() const
+{
+ return m_error_code;
+}
+
+int
+NdbPack::Error::get_error_line() const
+{
+ return m_error_line;
+}
+
+void
+NdbPack::Error::set_error(int code, int line) const
+{
+ m_error_code = code;
+ m_error_line = line;
+#ifdef VM_TRACE
+ const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
+ if (p != 0 && strchr("1Y", p[0]) != 0)
+ require(false);
+#endif
+}
+
+void
+NdbPack::Error::set_error(const Error& e2) const
+{
+ set_error(e2.m_error_code, e2.m_error_line);
+}
+
+// NdbPack::Endian
+
+void
+NdbPack::Endian::convert(void* ptr, Uint32 len)
+{
+ Uint8* p = (Uint8*)ptr;
+ for (Uint32 i = 0; i < len / 2; i++)
+ {
+ Uint32 j = len - i - 1;
+ Uint8 tmp = p[i];
+ p[i] = p[j];
+ p[j] = tmp;
+ }
+}
+
+// NdbPack::Type
+
+struct Ndb_pack_type_info {
+ bool m_supported;
+ Uint16 m_fixSize; // if non-zero must have this exact size
+ Uint16 m_arrayType; // 0,1,2 length bytes
+ bool m_charType; // type with character set
+ bool m_convert; // convert endian (reverse byte order)
+};
+
+static const Ndb_pack_type_info
+g_ndb_pack_type_info[] = {
+ { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
+ { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
+ { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
+ { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
+ { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
+ { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
+ { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
+ { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
+ { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
+ { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
+ { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
+ { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
+ { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
+ { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
+ { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
+ { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
+ { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
+ { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
+ { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
+ { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
+ { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
+ { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
+ { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
+ { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
+ { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
+ { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
+ { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
+ { 1, 0, 0, 0, 0 } // NDB_TYPE_DECIMALUNSIGNED
+};
+
+static const int g_ndb_pack_type_info_cnt =
+ sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
+
+int
+NdbPack::Type::complete()
+{
+ if (m_typeId == 0)
+ {
+ set_error(TypeNotSet, __LINE__);
+ return -1;
+ }
+ if (m_typeId >= g_ndb_pack_type_info_cnt)
+ {
+ set_error(TypeNotSet, __LINE__);
+ return -1;
+ }
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
+ if (!info.m_supported)
+ {
+ set_error(TypeNotSupported, __LINE__);
+ return -1;
+ }
+ if (m_byteSize == 0)
+ {
+ set_error(TypeSizeZero, __LINE__);
+ return -1;
+ }
+ if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
+ {
+ set_error(TypeFixSizeInvalid, __LINE__);
+ return -1;
+ }
+ if (!(m_nullable <= 1))
+ {
+ set_error(TypeNullableNotBool, __LINE__);
+ return -1;
+ }
+ if (info.m_charType && m_csNumber == 0)
+ {
+ set_error(CharsetNotSpecified, __LINE__);
+ return -1;
+ }
+ if (info.m_charType && all_charsets[m_csNumber] == 0)
+ {
+ CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
+ if (cs == 0)
+ {
+ set_error(CharsetNotFound, __LINE__);
+ return -1;
+ }
+ all_charsets[m_csNumber] = cs; // yes caller must do this
+ }
+ if (!info.m_charType && m_csNumber != 0)
+ {
+ set_error(CharsetNotAllowed, __LINE__);
+ return -1;
+ }
+ m_arrayType = info.m_arrayType;
+ return 0;
+}
+
+// NdbPack::Spec
+
+int
+NdbPack::Spec::add(Type type)
+{
+ Uint32 cnt = m_cnt;
+ Uint32 nullable_cnt = m_nullableCnt;
+ Uint32 varsize_cnt = m_varsizeCnt;
+ Uint32 max_byte_size = m_maxByteSize;
+ if (type.complete() == -1)
+ {
+ set_error(type);
+ return -1;
+ }
+ type.m_nullbitPos = 0xFFFF;
+ if (type.m_nullable)
+ {
+ type.m_nullbitPos = nullable_cnt;
+ nullable_cnt++;
+ }
+ if (type.m_arrayType != 0)
+ {
+ varsize_cnt++;
+ }
+ max_byte_size += type.m_byteSize;
+ if (cnt >= m_bufMaxCnt)
+ {
+ set_error(SpecBufOverflow, __LINE__);
+ return -1;
+ }
+ m_buf[cnt] = type;
+ cnt++;
+ m_cnt = cnt;
+ m_nullableCnt = nullable_cnt;
+ m_varsizeCnt = varsize_cnt;
+ m_maxByteSize = max_byte_size;
+ return 0;
+}
+
+int
+NdbPack::Spec::add(Type type, Uint32 cnt)
+{
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ if (add(type) == -1)
+ return -1;
+ }
+ return 0;
+}
+
+void
+NdbPack::Spec::copy(const Spec& s2)
+{
+ assert(m_bufMaxCnt >= s2.m_cnt);
+ reset();
+ m_cnt = s2.m_cnt;
+ m_nullableCnt = s2.m_nullableCnt;
+ m_varsizeCnt = s2.m_varsizeCnt;
+ m_maxByteSize = s2.m_maxByteSize;
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ m_buf[i] = s2.m_buf[i];
+ }
+}
+
+// NdbPack::Iter
+
+int
+NdbPack::Iter::desc(const Uint8* item)
+{
+ const Uint32 i = m_cnt; // item index
+ assert(i < m_spec.m_cnt);
+ const Type& type = m_spec.m_buf[i];
+ const Uint32 lenBytes = type.m_arrayType;
+ Uint32 bareLen = 0;
+ switch (lenBytes) {
+ case 0:
+ bareLen = type.m_byteSize;
+ break;
+ case 1:
+ bareLen = item[0];
+ break;
+ case 2:
+ bareLen = item[0] + (item[1] << 8);
+ break;
+ default:
+ assert(false);
+ set_error(InternalError, __LINE__);
+ return -1;
+ }
+ const Uint32 itemLen = lenBytes + bareLen;
+ if (itemLen > type.m_byteSize)
+ {
+ set_error(DataValueOverflow, __LINE__);
+ return -1;
+ }
+ m_itemPos += m_itemLen; // skip previous item
+ m_cnt++;
+ m_lenBytes = lenBytes;
+ m_bareLen = bareLen;
+ m_itemLen = itemLen;
+ return 0;
+}
+
+int
+NdbPack::Iter::desc_null()
+{
+ assert(m_cnt < m_spec.m_cnt);
+ // caller checks if null allowed
+ m_itemPos += m_itemLen; // skip previous item
+ m_cnt++;
+ m_nullCnt++;
+ m_lenBytes = 0;
+ m_bareLen = 0;
+ m_itemLen = 0;
+ return 0;
+}
+
+int
+NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
+{
+ const Iter& r1 = *this;
+ assert(&r1.m_spec == &r2.m_spec);
+ assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
+ const Uint32 i = r1.m_cnt - 1; // item index
+ int res = 0;
+ const Uint32 n1 = r1.m_itemLen;
+ const Uint32 n2 = r2.m_itemLen;
+ if (n1 != 0)
+ {
+ if (n2 != 0)
+ {
+ const Type& type = r1.m_spec.m_buf[i];
+ const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
+ const Uint8* p1 = &buf1[r1.m_itemPos];
+ const Uint8* p2 = &buf2[r2.m_itemPos];
+ CHARSET_INFO* cs = all_charsets[type.m_csNumber];
+ res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2);
+ }
+ else
+ {
+ res = +1;
+ }
+ }
+ else
+ {
+ if (n2 != 0)
+ res = -1;
+ }
+ return res;
+}
+
+// NdbPack::DataC
+
+int
+NdbPack::DataC::desc(Iter& r) const
+{
+ const Uint32 i = r.m_cnt; // item index
+ assert(i < m_cnt);
+ const Type& type = m_spec.m_buf[i];
+ if (type.m_nullable || m_allNullable)
+ {
+ Uint32 nullbitPos = 0;
+ if (!m_allNullable)
+ nullbitPos = type.m_nullbitPos;
+ else
+ nullbitPos = i;
+ const Uint32 byte_pos = nullbitPos / 8;
+ const Uint32 bit_pos = nullbitPos % 8;
+ const Uint8 bit_mask = (1 << bit_pos);
+ const Uint8& the_byte = m_buf[byte_pos];
+ if ((the_byte & bit_mask) != 0)
+ {
+ if (r.desc_null() == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ return 0;
+ }
+ }
+ const Uint32 pos = r.m_itemPos + r.m_itemLen;
+ const Uint8* item = &m_buf[pos];
+ if (r.desc(item) == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+ const DataC& d1 = *this;
+ assert(cnt <= d1.m_cnt);
+ assert(cnt <= d2.m_cnt);
+ Iter r1(d1);
+ Iter r2(d2);
+ int res = 0;
+ Uint32 i; // remember last
+ for (i = 0; i < cnt; i++)
+ {
+ d1.desc(r1);
+ d2.desc(r2);
+ res = r1.cmp(r2, d1.m_buf, d2.m_buf);
+ if (res != 0)
+ break;
+ }
+ num_eq = i;
+ return res;
+}
+
+// NdbPack::Data
+
+int
+NdbPack::Data::add(const void* data, Uint32* len_out)
+{
+ assert(data != 0);
+ const Uint8* item = (const Uint8*)data;
+ const Uint32 i = m_cnt; // item index
+ if (i >= m_spec.m_cnt)
+ {
+ set_error(DataCntOverflow, __LINE__);
+ return -1;
+ }
+ Iter& r = m_iter;
+ assert(r.m_cnt == i);
+ const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
+ if (r.desc(item) == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ if (fullLen + r.m_itemLen > m_bufMaxLen)
+ {
+ set_error(DataBufOverflow, __LINE__);
+ return -1;
+ }
+ memcpy(&m_buf[fullLen], item, r.m_itemLen);
+ *len_out = r.m_itemLen;
+ m_cnt++;
+ return 0;
+}
+
+int
+NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
+{
+ const Uint8* data_ptr = (const Uint8*)data;
+ Uint32 len_tot = 0;
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ Uint32 len;
+ if (add(data_ptr, &len) == -1)
+ return -1;
+ if (data != 0)
+ data_ptr += len;
+ len_tot += len;
+ }
+ *len_out = len_tot;
+ return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32* len_out)
+{
+ const Uint32 i = m_cnt; // item index
+ if (i >= m_spec.m_cnt)
+ {
+ set_error(DataCntOverflow, __LINE__);
+ return -1;
+ }
+ Iter& r = m_iter;
+ assert(r.m_cnt == i);
+ if (r.desc_null() == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ Uint32 nullbitPos = 0;
+ if (!m_allNullable)
+ {
+ const Type& type = m_spec.m_buf[i];
+ if (!type.m_nullable)
+ {
+ set_error(DataNotNullable, __LINE__);
+ return -1;
+ }
+ nullbitPos = type.m_nullbitPos;
+ }
+ else
+ {
+ nullbitPos = i;
+ }
+ const Uint32 byte_pos = nullbitPos / 8;
+ const Uint32 bit_pos = nullbitPos % 8;
+ const Uint8 bit_mask = (1 << bit_pos);
+ Uint8& the_byte = m_buf[m_varBytes + byte_pos];
+ assert((the_byte & bit_mask) == 0);
+ the_byte |= bit_mask;
+ *len_out = r.m_itemLen;
+ m_cnt++;
+ return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
+{
+ Uint32 len_tot = 0;
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ Uint32 len;
+ if (add_null(&len) == -1)
+ return -1;
+ len_tot += len;
+ }
+ *len_out = len_tot;
+ return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
+{
+ const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
+ if (!ah.isNULL())
+ {
+ if (add(&poai[1], len_out) == -1)
+ return -1;
+ }
+ else
+ {
+ if (add_null(len_out) == -1)
+ return -1;
+ }
+ if (ah.getByteSize() != *len_out)
+ {
+ set_error(InvalidAttrInfo, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
+{
+ Uint32 len_tot = 0;
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ Uint32 len;
+ if (add_poai(poai, &len) == -1)
+ return -1;
+ len_tot += len;
+ poai += 1 + (len + 3) / 4;
+ }
+ *len_out = len_tot;
+ return 0;
+}
+
+int
+NdbPack::Data::finalize_impl()
+{
+ const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
+ switch (m_varBytes) {
+ // case 0: inlined
+ case 1:
+ if (dataLen <= 0xFF)
+ {
+ m_buf[0] = dataLen;
+ return 0;
+ }
+ break;
+ case 2:
+ if (dataLen <= 0xFFFF)
+ {
+ m_buf[0] = (dataLen & 0xFF);
+ m_buf[1] = (dataLen >> 8);
+ return 0;
+ }
+ break;
+ default:
+ break;
+ }
+ set_error(InternalError, __LINE__);
+ return -1;
+}
+
+int
+NdbPack::Data::desc_all(Uint32 cnt)
+{
+ assert(m_cnt == 0); // reset() would destroy nullmask
+ for (Uint32 i = 0; i < cnt; i++)
+ {
+ m_cnt++;
+ if (desc(m_iter) == -1)
+ return -1;
+ }
+ if (finalize() == -1)
+ return -1;
+ return 0;
+}
+
+int
+NdbPack::Data::copy(const DataC& d2)
+{
+ reset();
+ Iter r2(d2);
+ const Uint32 cnt2 = d2.m_cnt;
+ for (Uint32 i = 0; i < cnt2; i++)
+ {
+ if (d2.desc(r2) == -1)
+ return -1;
+ Uint32 len_out = ~(Uint32)0;
+ if (r2.m_itemLen != 0)
+ {
+ if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
+ return -1;
+ assert(len_out == r2.m_itemLen);
+ }
+ else
+ {
+ if (add_null(&len_out) == -1)
+ return -1;
+ assert(len_out ==0);
+ }
+ }
+ if (finalize() == -1)
+ return -1;
+ return 0;
+}
+
+int
+NdbPack::Data::convert_impl(Endian::Value to_endian)
+{
+ const Spec& spec = m_spec;
+ Iter r(*this);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ if (DataC::desc(r) == -1)
+ {
+ set_error(r);
+ return -1;
+ }
+ const Type& type = spec.m_buf[i];
+ const Uint32 typeId = type.m_typeId;
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+ if (info.m_convert)
+ {
+ Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
+ Uint32 len = r.m_itemLen;
+ Endian::convert(ptr, len);
+ }
+ }
+ return 0;
+}
+
+// NdbPack::BoundC
+
+int
+NdbPack::BoundC::finalize(int side)
+{
+ if (m_data.m_cnt == 0 && side != 0)
+ {
+ set_error(BoundEmptySide, __LINE__);
+ return -1;
+ }
+ if (m_data.m_cnt != 0 && side != -1 && side != +1)
+ {
+ set_error(BoundNonemptySide, __LINE__);
+ return -1;
+ }
+ m_side = side;
+ return 0;
+}
+
+int
+NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
+{
+ const BoundC& b1 = *this;
+ const DataC& d1 = b1.m_data;
+ const DataC& d2 = b2.m_data;
+ int res = d1.cmp(d2, cnt, num_eq);
+ if (res == 0)
+ {
+ if (cnt < d1.m_cnt && cnt < d2.m_cnt)
+ ;
+ else if (d1.m_cnt < d2.m_cnt)
+ res = (+1) * b1.m_side;
+ else if (d1.m_cnt > d2.m_cnt)
+ res = (-1) * b2.m_side;
+ else if (b1.m_side < b2.m_side)
+ res = -1;
+ else if (b1.m_side > b2.m_side)
+ res = +1;
+ }
+ return res;
+}
+
+// NdbPack::Bound
+
+// print
+
+NdbPack::Print::Print(char* buf, Uint32 bufsz) :
+ m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
+
+void
+NdbPack::Print::print(const char* fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ if (m_bufsz > m_sz)
+ {
+ BaseString::vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
+ m_sz += (Uint32)strlen(&m_buf[m_sz]);
+ }
+ va_end(ap);
+}
+
+// print Type
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Type& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Type::print(NdbOut& out) const
+{
+ char buf[200];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Type::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ p.print("typeId:%u", m_typeId);
+ p.print(" byteSize:%u", m_byteSize);
+ p.print(" nullable:%u", m_nullable);
+ p.print(" csNumber:%u", m_csNumber);
+ return buf;
+}
+
+// print Spec
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Spec& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Spec::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Spec::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ p.print("cnt:%u", m_cnt);
+ p.print(" nullableCnt:%u", m_nullableCnt);
+ p.print(" varsizeCnt:%u", m_varsizeCnt);
+ p.print(" nullmaskLen:%u", get_nullmask_len(false));
+ p.print(" maxByteSize:%u", m_maxByteSize);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ const Type& type = m_buf[i];
+ p.print(" [%u", i);
+ p.print(" typeId:%u", type.m_typeId);
+ p.print(" nullable:%u", type.m_nullable);
+ p.print(" byteSize:%u", type.m_byteSize);
+ p.print(" csNumber:%u", type.m_csNumber);
+ p.print("]");
+ }
+ return buf;
+}
+
+// print DataC
+
+bool g_ndb_pack_print_hex_always = false;
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::DataC& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::DataC::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
+{
+ Print p(buf, bufsz);
+ const Spec& spec = m_spec;
+ const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
+ if (nullmask_len != 0)
+ {
+ p.print("nullmask:");
+ for (Uint32 i = 0; i < nullmask_len; i++)
+ {
+ int x = m_buf[i];
+ p.print("%02x", x);
+ }
+ }
+ Iter r(*this);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ desc(r);
+ const Uint8* value = &m_buf[r.m_itemPos];
+ p.print(" [%u", i);
+ p.print(" pos:%u", r.m_itemPos);
+ p.print(" len:%u", r.m_itemLen);
+ if (r.m_itemLen > 0)
+ {
+ p.print(" value:");
+ // some specific types for debugging
+ const Type& type = spec.m_buf[i];
+ bool ok = true;
+ switch (type.m_typeId) {
+ case NDB_TYPE_TINYINT:
+ {
+ Int8 x;
+ memcpy(&x, value, 1);
+ if (convert_flag)
+ Endian::convert(&x, 1);
+ p.print("%d", (int)x);
+ }
+ break;
+ case NDB_TYPE_TINYUNSIGNED:
+ {
+ Uint8 x;
+ memcpy(&x, value, 1);
+ if (convert_flag)
+ Endian::convert(&x, 1);
+ p.print("%u", (uint)x);
+ }
+ break;
+ case NDB_TYPE_SMALLINT:
+ {
+ Int16 x;
+ memcpy(&x, value, 2);
+ if (convert_flag)
+ Endian::convert(&x, 2);
+ p.print("%d", (int)x);
+ }
+ break;
+ case NDB_TYPE_SMALLUNSIGNED:
+ {
+ Uint16 x;
+ memcpy(&x, value, 2);
+ if (convert_flag)
+ Endian::convert(&x, 2);
+ p.print("%u", (uint)x);
+ }
+ break;
+ case NDB_TYPE_INT:
+ {
+ Int32 x;
+ memcpy(&x, value, 4);
+ if (convert_flag)
+ Endian::convert(&x, 4);
+ p.print("%d", (int)x);
+ }
+ break;
+ case NDB_TYPE_UNSIGNED:
+ {
+ Uint32 x;
+ memcpy(&x, value, 4);
+ if (convert_flag)
+ Endian::convert(&x, 4);
+ p.print("%u", (uint)x);
+ }
+ break;
+ case NDB_TYPE_FLOAT:
+ {
+ float x;
+ memcpy(&x, value, 4);
+ if (convert_flag)
+ Endian::convert(&x, 4);
+ p.print("%g", (double)x);
+ }
+ break;
+ case NDB_TYPE_DOUBLE:
+ {
+ double x;
+ memcpy(&x, value, 8);
+ if (convert_flag)
+ Endian::convert(&x, 8);
+ p.print("%g", x);
+ }
+ break;
+ case NDB_TYPE_CHAR:
+ case NDB_TYPE_VARCHAR:
+ case NDB_TYPE_LONGVARCHAR:
+ {
+ const Uint32 off = type.m_arrayType;
+ for (Uint32 j = 0; j < r.m_bareLen; j++)
+ {
+ Uint8 x = value[off + j];
+ p.print("%c", (int)x);
+ }
+ }
+ break;
+ default:
+ ok = false;
+ break;
+ }
+ if (!ok || g_ndb_pack_print_hex_always)
+ {
+ p.print("<");
+ for (Uint32 j = 0; j < r.m_itemLen; j++)
+ {
+ int x = value[j];
+ p.print("%02x", x);
+ }
+ p.print(">");
+ }
+ }
+ p.print("]");
+ }
+ return buf;
+}
+
+// print Data
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Data& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Data::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Data::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ char* ptr = buf;
+ if (m_varBytes != 0)
+ {
+ p.print("varBytes:");
+ for (Uint32 i = 0; i < m_varBytes; i++)
+ {
+ int r = m_buf[i];
+ p.print("%02x", r);
+ }
+ p.print(" ");
+ }
+ p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
+ p.print(" ");
+ const bool convert_flag =
+ m_endian != Endian::Native &&
+ m_endian != Endian::get_endian();
+ DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
+ return buf;
+}
+
+// print BoundC
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::BoundC& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::BoundC::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
+{
+ Print p(buf, bufsz);
+ p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
+ m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
+ return buf;
+}
+
+// print Bound
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Bound& a)
+{
+ a.print(out);
+ return out;
+}
+
+void
+NdbPack::Bound::print(NdbOut& out) const
+{
+ char buf[8000];
+ out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Bound::print(char* buf, Uint32 bufsz) const
+{
+ BoundC::print(buf, bufsz);
+ return buf;
+}
+
+// validate
+
+int
+NdbPack::Type::validate() const
+{
+ Type type2 = *this;
+ if (type2.complete() == -1)
+ {
+ set_error(type2);
+ return -1;
+ }
+ if (memcmp(this, &type2, sizeof(Type)) != 0)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Spec::validate() const
+{
+ Uint32 nullableCnt = 0;
+ Uint32 varsizeCnt = 0;
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ const Type& type = m_buf[i];
+ if (type.validate() == -1)
+ {
+ set_error(type);
+ return -1;
+ }
+ if (type.m_nullable)
+ nullableCnt++;
+ if (type.m_arrayType != 0)
+ varsizeCnt++;
+ }
+ if (m_nullableCnt != nullableCnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (m_varsizeCnt != varsizeCnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Data::validate() const
+{
+ if (DataC::validate() == -1)
+ return -1;
+ const Iter& r = m_iter;
+ if (r.m_cnt != m_cnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ Iter r2(*this);
+ for (Uint32 i = 0; i < m_cnt; i++)
+ {
+ if (desc(r2) == -1)
+ return -1;
+ }
+ if (r.m_itemPos != r2.m_itemPos)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (r.m_cnt != r2.m_cnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (r.m_nullCnt != r2.m_nullCnt)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (r.m_itemLen != r2.m_itemLen)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::BoundC::validate() const
+{
+ if (m_data.validate() == -1)
+ {
+ set_error(m_data);
+ return -1;
+ }
+ if (m_data.m_cnt == 0 && m_side != 0)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
+ {
+ set_error(ValidationError, __LINE__);
+ return -1;
+ }
+ return 0;
+}
+
+int
+NdbPack::Bound::validate() const
+{
+ if (BoundC::validate() == -1)
+ return -1;
+ if (m_data.validate() == -1)
+ {
+ set_error(m_data);
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef TEST_NDB_PACK
+#include <util/NdbTap.hpp>
+
+#define chk1(x) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; require(false); } while (0)
+
+#define chk2(x, e) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; require(false); } while (0)
+
+#define ll0(x) do { if (verbose < 0) break; ndbout << "0- " << x << endl; } while (0)
+#define ll1(x) do { if (verbose < 1) break; ndbout << "1- " << x << endl; } while (0)
+#define ll2(x) do { if (verbose < 2) break; ndbout << "2- " << x << endl; } while (0)
+#define ll3(x) do { if (verbose < 3) break; ndbout << "3- " << x << endl; } while (0)
+
+#define xmin(a, b) ((a) < (b) ? (a) : (b))
+
+#include <ndb_rand.h>
+
+static uint // random 0..n-1
+getrandom(uint n)
+{
+ if (n != 0) {
+ uint k = ndb_rand();
+ return k % n;
+ }
+ return 0;
+}
+
+static uint // random 0..n-1 biased exponentially to smaller
+getrandom(uint n, uint bias)
+{
+ assert(bias != 0);
+ uint k = getrandom(n);
+ bias--;
+ while (bias != 0) {
+ k = getrandom(k + 1);
+ bias--;
+ }
+ return k;
+}
+
+static bool
+getrandompct(uint pct)
+{
+ return getrandom(100) < pct;
+}
+
+// change in TAPTEST
+static int seed = -1; // random
+static int loops = 0;
+static int spec_cnt = -1; // random
+static int fix_type = 0; // all types
+static int no_nullable = 0;
+static int data_cnt = -1; // Max
+static int bound_cnt = -1; // Max
+static int verbose = 0;
+
+struct Tspec {
+ enum { Max = 100 };
+ enum { MaxBuf = Max * 4000 };
+ NdbPack::Spec m_spec;
+ NdbPack::Type m_type[Max];
+ Tspec() {
+ m_spec.set_buf(m_type, Max);
+ }
+ void create();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tspec& tspec)
+{
+ out << tspec.m_spec;
+ return out;
+}
+
+void
+Tspec::create()
+{
+ m_spec.reset();
+ int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
+ int i = 0;
+ while (i < cnt) {
+ int typeId = fix_type;
+ if (typeId == 0)
+ typeId = getrandom(g_ndb_pack_type_info_cnt);
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+ switch (typeId) {
+ case NDB_TYPE_INT:
+ case NDB_TYPE_UNSIGNED:
+ case NDB_TYPE_CHAR:
+ case NDB_TYPE_VARCHAR:
+ case NDB_TYPE_LONGVARCHAR:
+ break;
+ default:
+ continue;
+ }
+ require(info.m_supported);
+ int byteSize = 0;
+ if (info.m_fixSize != 0)
+ byteSize = info.m_fixSize;
+ else if (info.m_arrayType == 0)
+ byteSize = 1 + getrandom(128, 1); // char(1-128)
+ else if (info.m_arrayType == 1)
+ byteSize = 1 + getrandom(256, 2); // varchar(0-255)
+ else if (info.m_arrayType == 2)
+ byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
+ else
+ require(false);
+ bool nullable = no_nullable ? false : getrandompct(50);
+ int csNumber = 0;
+ if (info.m_charType) {
+ csNumber = 8; // should include ascii
+ }
+ NdbPack::Type type(typeId, byteSize, nullable, csNumber);
+ chk2(m_spec.add(type) == 0, m_spec);
+ i++;
+ }
+ chk2(m_spec.validate() == 0, m_spec);
+}
+
+struct Tdata {
+ const Tspec& m_tspec;
+ NdbPack::Data m_data;
+ const bool m_isBound;
+ int m_cnt;
+ Uint8* m_xbuf; // unpacked
+ int m_xsize;
+ int m_xoff[Tspec::Max];
+ int m_xlen[Tspec::Max];
+ bool m_xnull[Tspec::Max];
+ int m_xnulls;
+ Uint32* m_poaiBuf; // plain old attr info
+ int m_poaiSize;
+ Uint8* m_packBuf; // packed
+ int m_packLen;
+ Tdata(Tspec& tspec, bool isBound, uint varBytes) :
+ m_tspec(tspec),
+ m_data(tspec.m_spec, isBound, varBytes),
+ m_isBound(isBound)
+ {
+ m_cnt = tspec.m_spec.get_cnt();
+ m_xbuf = 0;
+ m_poaiBuf = 0;
+ m_packBuf = 0;
+ }
+ ~Tdata() {
+ delete [] m_xbuf;
+ delete [] m_poaiBuf;
+ delete [] m_packBuf;
+ }
+ void create();
+ void add();
+ void finalize();
+ // compare using unpacked data
+ int xcmp(const Tdata& tdata2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdata& tdata)
+{
+ out << tdata.m_data;
+ return out;
+}
+
+void
+Tdata::create()
+{
+ union {
+ Uint8 xbuf[Tspec::MaxBuf];
+ Uint64 xbuf_align;
+ };
+ memset(xbuf, 0x3f, sizeof(xbuf));
+ m_xsize = 0;
+ m_xnulls = 0;
+ Uint32 poaiBuf[Tspec::MaxBuf / 4];
+ memset(poaiBuf, 0x5f, sizeof(poaiBuf));
+ m_poaiSize = 0;
+ m_packLen = m_data.get_var_bytes();
+ m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
+ int i = 0, j;
+ while (i < m_cnt) {
+ const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
+ const int typeId = type.get_type_id();
+ const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+ m_xnull[i] = type.get_nullable() && getrandompct(25);
+ m_xnull[i] = false;
+ if (type.get_nullable() || m_isBound)
+ m_xnull[i] = getrandompct(20);
+ int pad = 0; // null-char pad not counted in xlen
+ if (!m_xnull[i]) {
+ m_xoff[i] = m_xsize;
+ Uint8* xptr = &xbuf[m_xsize];
+ switch (typeId) {
+ case NDB_TYPE_INT:
+ {
+ Int32 x = getrandom(10);
+ if (getrandompct(50))
+ x = (-1) * x;
+ memcpy(xptr, &x, 4);
+ m_xlen[i] = info.m_fixSize;
+ }
+ break;
+ case NDB_TYPE_UNSIGNED:
+ {
+ Uint32 x = getrandom(10);
+ memcpy(xptr, &x, 4);
+ m_xlen[i] = info.m_fixSize;
+ }
+ break;
+ case NDB_TYPE_CHAR:
+ {
+ require(type.get_byte_size() >= 1);
+ int max_len = type.get_byte_size();
+ int len = getrandom(max_len + 1, 1);
+ for (j = 0; j < len; j++)
+ {
+ xptr[j] = 'a' + getrandom(3);
+ }
+ for (j = len; j < max_len; j++)
+ {
+ xptr[j] = 0x20;
+ }
+ m_xlen[i] = max_len;
+ xptr[max_len] = 0;
+ pad = 1;
+ }
+ break;
+ case NDB_TYPE_VARCHAR:
+ {
+ require(type.get_byte_size() >= 1);
+ int max_len = type.get_byte_size() - 1;
+ int len = getrandom(max_len, 2);
+ require(len < 256);
+ xptr[0] = len;
+ for (j = 0; j < len; j++)
+ {
+ xptr[1 + j] = 'a' + getrandom(3);
+ }
+ m_xlen[i] = 1 + len;
+ xptr[1 + len] = 0;
+ pad = 1;
+ }
+ break;
+ case NDB_TYPE_LONGVARCHAR:
+ {
+ require(type.get_byte_size() >= 2);
+ int max_len = type.get_byte_size() - 2;
+ int len = getrandom(max_len, 3);
+ require(len < 256 * 256);
+ xptr[0] = (len & 0xFF);
+ xptr[1] = (len >> 8);
+ for (j = 0; j < len; j++)
+ {
+ xptr[2 + j] = 'a' + getrandom(3);
+ }
+ m_xlen[i] = 2 + len;
+ xptr[2 + len] = 0;
+ pad = 1;
+ }
+ break;
+ default:
+ require(false);
+ break;
+ }
+ m_xsize += m_xlen[i] + pad;
+ while (m_xsize % 8 != 0)
+ m_xsize++;
+ m_packLen += m_xlen[i];
+ } else {
+ m_xoff[i] = -1;
+ m_xlen[i] = 0;
+ m_xnulls++;
+ }
+ require(m_xnull[i] == (m_xoff[i] == -1));
+ require(m_xnull[i] == (m_xlen[i] == 0));
+ AttributeHeader* ah = (AttributeHeader*)&poaiBuf[m_poaiSize];
+ ah->setAttributeId(i); // not used
+ ah->setByteSize(m_xlen[i]);
+ m_poaiSize++;
+ if (!m_xnull[i]) {
+ memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
+ m_poaiSize += (m_xlen[i] + 3) / 4;
+ }
+ i++;
+ }
+ require(m_xsize % 8 == 0);
+ m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
+ memcpy(m_xbuf, xbuf, m_xsize);
+ m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
+ memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
+}
+
+void
+Tdata::add()
+{
+ m_packBuf = new Uint8 [m_packLen];
+ m_data.set_buf(m_packBuf, m_packLen);
+ int i, j;
+ j = 0;
+ while (j <= 1) {
+ if (j == 1)
+ m_data.reset();
+ i = 0;
+ while (i < m_cnt) {
+ Uint32 xlen = ~(Uint32)0;
+ if (!m_xnull[i]) {
+ int xoff = m_xoff[i];
+ const Uint8* xptr = &m_xbuf[xoff];
+ chk2(m_data.add(xptr, &xlen) == 0, m_data);
+ chk1((int)xlen == m_xlen[i]);
+ } else {
+ chk2(m_data.add_null(&xlen) == 0, m_data);
+ chk1(xlen == 0);
+ }
+ i++;
+ }
+ chk2(m_data.validate() == 0, m_data);
+ chk1((int)m_data.get_null_cnt() == m_xnulls);
+ j++;
+ }
+}
+
+void
+Tdata::finalize()
+{
+ chk2(m_data.finalize() == 0, m_data);
+ ll3("create: " << m_data);
+ chk1((int)m_data.get_full_len() == m_packLen);
+ {
+ const Uint8* p = (const Uint8*)m_data.get_full_buf();
+ chk1(p[0] + (p[1] << 8) == m_packLen - 2);
+ }
+}
+
+int
+Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+ const Tdata& tdata1 = *this;
+ require(&tdata1.m_tspec == &tdata2.m_tspec);
+ const Tspec& tspec = tdata1.m_tspec;
+ int res = 0;
+ int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
+ int i;
+ for (i = 0; i < cnt; i++) {
+ if (!tdata1.m_xnull[i]) {
+ if (!tdata2.m_xnull[i]) {
+ // the pointers are Uint64-aligned
+ const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
+ const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
+ const int xlen1 = tdata1.m_xlen[i];
+ const int xlen2 = tdata2.m_xlen[i];
+ const NdbPack::Type& type = tspec.m_spec.get_type(i);
+ const int typeId = type.get_type_id();
+ const int csNumber = type.get_cs_number();
+ CHARSET_INFO* cs = all_charsets[csNumber];
+ switch (typeId) {
+ case NDB_TYPE_INT:
+ {
+ require(cs == 0);
+ Int32 x1 = *(const Int32*)xptr1;
+ Int32 x2 = *(const Int32*)xptr2;
+ if (x1 < x2)
+ res = -1;
+ else if (x1 > x2)
+ res = +1;
+ ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+ }
+ break;
+ case NDB_TYPE_UNSIGNED:
+ {
+ require(cs == 0);
+ Uint32 x1 = *(const Uint32*)xptr1;
+ Uint32 x2 = *(const Uint32*)xptr2;
+ if (x1 < x2)
+ res = -1;
+ else if (x1 > x2)
+ res = +1;
+ ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+ }
+ break;
+ case NDB_TYPE_CHAR:
+ {
+ require(cs != 0 && cs->coll != 0);
+ const uint n1 = xlen1;
+ const uint n2 = xlen2;
+ const uchar* t1 = &xptr1[0];
+ const uchar* t2 = &xptr2[0];
+ const char* s1 = (const char*)t1;
+ const char* s2 = (const char*)t2;
+ chk1(n1 == strlen(s1));
+ chk1(n2 == strlen(s2));
+ res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+ ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+ }
+ break;
+ case NDB_TYPE_VARCHAR:
+ {
+ require(cs != 0 && cs->coll != 0);
+ const uint n1 = xptr1[0];
+ const uint n2 = xptr2[0];
+ const uchar* t1 = &xptr1[1];
+ const uchar* t2 = &xptr2[1];
+ const char* s1 = (const char*)t1;
+ const char* s2 = (const char*)t2;
+ chk1(n1 == strlen(s1));
+ chk1(n2 == strlen(s2));
+ res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+ ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+ }
+ break;
+ case NDB_TYPE_LONGVARCHAR:
+ {
+ require(cs != 0 && cs->coll != 0);
+ const uint n1 = xptr1[0] | (xptr1[1] << 8);
+ const uint n2 = xptr2[0] | (xptr2[1] << 8);
+ const uchar* t1 = &xptr1[2];
+ const uchar* t2 = &xptr2[2];
+ const char* s1 = (const char*)t1;
+ const char* s2 = (const char*)t2;
+ chk1(n1 == strlen(s1));
+ chk1(n2 == strlen(s2));
+ res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+ ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+ }
+ break;
+ default:
+ require(false);
+ break;
+ }
+ } else
+ res = +1;
+ } else if (!tdata2.m_xnull[i])
+ res = -1;
+ if (res != 0)
+ break;
+ }
+ *num_eq = i;
+ ll3("xcmp res:" << res << " num_eq:" << *num_eq);
+ return res;
+}
+
+struct Tbound {
+ Tdata& m_tdata;
+ NdbPack::Bound m_bound;
+ Tbound(Tdata& tdata) :
+ m_tdata(tdata),
+ m_bound(tdata.m_data)
+ {
+ m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
+ }
+ void create();
+ void add();
+ void finalize();
+ int xcmp(const Tdata& tdata2, int* num_eq) const;
+ int xcmp(const Tbound& tbound2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tbound& tbound)
+{
+ out << tbound.m_bound;
+ return out;
+}
+
+void
+Tbound::create()
+{
+ m_tdata.create();
+}
+
+void
+Tbound::add()
+{
+ m_tdata.add();
+}
+
+void
+Tbound::finalize()
+{
+ int side = getrandompct(50) ? -1 : +1;
+ chk2(m_bound.finalize(side) == 0, m_bound);
+ chk2(m_bound.validate() == 0, m_bound);
+ chk1((int)m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
+}
+
+int
+Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+ const Tbound& tbound1 = *this;
+ const Tdata& tdata1 = tbound1.m_tdata;
+ require(tdata1.m_cnt <= tdata2.m_cnt);
+ *num_eq = -1;
+ int res = tdata1.xcmp(tdata2, num_eq);
+ if (res == 0) {
+ chk1(*num_eq == tdata1.m_cnt);
+ res = m_bound.get_side();
+ }
+ return res;
+}
+
+int
+Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
+{
+ const Tbound& tbound1 = *this;
+ const Tdata& tdata1 = tbound1.m_tdata;
+ const Tdata& tdata2 = tbound2.m_tdata;
+ *num_eq = -1;
+ int res = tdata1.xcmp(tdata2, num_eq);
+ chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
+ if (res == 0) {
+ chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
+ if (tdata1.m_cnt < tdata2.m_cnt)
+ res = (+1) * tbound1.m_bound.get_side();
+ else if (tdata1.m_cnt > tdata2.m_cnt)
+ res = (-1) * tbound2.m_bound.get_side();
+ else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
+ res = -1;
+ else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
+ res = +1;
+ }
+ return res;
+}
+
+struct Tdatalist {
+ enum { Max = 1000 };
+ Tdata* m_tdata[Max];
+ int m_cnt;
+ Tdatalist(Tspec& tspec) {
+ m_cnt = data_cnt == -1 ? Max : data_cnt;
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ m_tdata[i] = new Tdata(tspec, false, 2);
+ }
+ }
+ ~Tdatalist() {
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ delete m_tdata[i];
+ }
+ }
+ void create();
+ void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdatalist& tdatalist)
+{
+ int i;
+ for (i = 0; i < tdatalist.m_cnt; i++) {
+ out << "data " << i << ": " << *tdatalist.m_tdata[i];
+ if (i + 1 < tdatalist.m_cnt)
+ out << endl;
+ }
+ return out;
+}
+
+void
+Tdatalist::create()
+{
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tdata& tdata = *m_tdata[i];
+ tdata.create();
+ tdata.add();
+ tdata.finalize();
+ }
+}
+
+static int
+data_cmp(const void* a1, const void* a2)
+{
+ const Tdata& tdata1 = **(const Tdata**)a1;
+ const Tdata& tdata2 = **(const Tdata**)a2;
+ require(tdata1.m_cnt == tdata2.m_cnt);
+ const Uint32 cnt = tdata1.m_cnt;
+ Uint32 num_eq = ~(Uint32)0;
+ int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
+ require(num_eq <= (Uint32)tdata1.m_cnt);
+ require(num_eq <= (Uint32)tdata2.m_cnt);
+ return res;
+}
+
+void
+Tdatalist::sort()
+{
+ ll1("data sort: in");
+ ll3(endl << *this);
+ qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
+ ll1("data sort: out");
+ ll3(endl << *this);
+ int i;
+ for (i = 0; i + 1 < m_cnt; i++) {
+ const Tdata& tdata1 = *m_tdata[i];
+ const Tdata& tdata2 = *m_tdata[i + 1];
+ require(tdata1.m_cnt == tdata2.m_cnt);
+ const Uint32 cnt = tdata1.m_cnt;
+ Uint32 num_eq1 = ~(Uint32)0;
+ int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
+ chk1(res <= 0);
+ // also via unpacked data
+ int num_eq2 = -1;
+ int res2 = tdata1.xcmp(tdata2, &num_eq2);
+ if (res < 0)
+ chk1(res2 < 0);
+ else if (res == 0)
+ chk1(res2 == 0);
+ else
+ chk1(res2 > 0);
+ chk1(num_eq1 == (Uint32)num_eq2);
+ }
+}
+
+struct Tboundlist {
+ enum { Max = 1000 };
+ Tbound* m_tbound[Max];
+ int m_cnt;
+ Tboundlist(Tspec& tspec) {
+ m_cnt = bound_cnt == -1 ? Max : bound_cnt;
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tdata* tdata = new Tdata(tspec, true, 0);
+ m_tbound[i] = new Tbound(*tdata);
+ }
+ }
+ ~Tboundlist() {
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tdata* tdata = &m_tbound[i]->m_tdata;
+ delete m_tbound[i];
+ delete tdata;
+ }
+ }
+ void create();
+ void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tboundlist& tboundlist)
+{
+ int i;
+ for (i = 0; i < tboundlist.m_cnt; i++) {
+ out << "bound " << i << ": " << *tboundlist.m_tbound[i];
+ if (i + 1 < tboundlist.m_cnt)
+ out << endl;
+ }
+ return out;
+}
+
+void
+Tboundlist::create()
+{
+ int i;
+ for (i = 0; i < m_cnt; i++) {
+ Tbound& tbound = *m_tbound[i];
+ tbound.create();
+ tbound.add();
+ tbound.finalize();
+ }
+}
+
+static int
+bound_cmp(const void* a1, const void* a2)
+{
+ const Tbound& tbound1 = **(const Tbound**)a1;
+ const Tbound& tbound2 = **(const Tbound**)a2;
+ const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+ Uint32 num_eq = ~(Uint32)0;
+ int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
+ require(num_eq <= cnt);
+ require(num_eq <= cnt);
+ return res;
+}
+
+void
+Tboundlist::sort()
+{
+ ll1("bound sort: in");
+ ll3(endl << *this);
+ qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
+ ll1("bound sort: out");
+ ll3(endl << *this);
+ int i;
+ for (i = 0; i + 1 < m_cnt; i++) {
+ const Tbound& tbound1 = *m_tbound[i];
+ const Tbound& tbound2 = *m_tbound[i + 1];
+ const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+ Uint32 num_eq1 = ~(Uint32)0;
+ int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
+ chk1(res <= 0);
+ // also via unpacked data
+ int num_eq2 = -1;
+ int res2 = tbound1.xcmp(tbound2, &num_eq2);
+ if (res < 0)
+ chk1(res2 < 0);
+ else if (res == 0)
+ chk1(res2 == 0);
+ else
+ chk1(res2 > 0);
+ chk1(num_eq1 == (Uint32)num_eq2);
+ }
+}
+
+static void
+testdesc(const Tdata& tdata)
+{
+ ll3("testdesc: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ const Uint8* buf_old = (const Uint8*)data.get_full_buf();
+ const Uint32 varBytes = data.get_var_bytes();
+ const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
+ const Uint32 dataLen = data.get_data_len();
+ const Uint32 fullLen = data.get_full_len();
+ const Uint32 cnt = data.get_cnt();
+ chk1(fullLen == varBytes + dataLen);
+ NdbPack::Data data_new(tspec.m_spec, false, varBytes);
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ memcpy(buf_new, buf_old, fullLen);
+ chk2(data_new.desc_all(cnt) == 0, data_new);
+ chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
+ chk1(data_new.get_data_len() == data.get_data_len());
+ chk1(data_new.get_cnt() == data.get_cnt());
+ chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testcopy(const Tdata& tdata)
+{
+ ll3("testcopy: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ uint n = getrandom(tdata.m_cnt + 1);
+ do {
+ ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
+ NdbPack::DataC data_old(tspec.m_spec, false);
+ data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
+ chk1(data_old.get_cnt() == n);
+ NdbPack::Data data_new(tspec.m_spec, false, 0);
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ chk2(data_new.copy(data_old) == 0, data_new);
+ chk1(data_new.get_cnt() == n);
+ Uint32 num_eq1 = ~(Uint32)0;
+ chk1(data_new.cmp(data_old, n, num_eq1) == 0);
+ chk1(num_eq1 == n);
+ Uint32 num_eq2 = ~(Uint32)0;
+ chk1(data_old.cmp(data_new, n, num_eq2) == 0);
+ chk1(num_eq2 == n);
+ n = getrandom(n);
+ } while (n != 0);
+}
+
+static void
+testpoai(const Tdata& tdata)
+{
+ ll3("testpoai: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ Uint32 poaiLen = ~(Uint32)0;
+ chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
+ chk2(data_new.finalize() == 0, data_new);
+ chk2(data_new.validate() == 0, data_new);
+ chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
+ chk1(data_new.get_full_len() == data.get_full_len());
+ chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
+ chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testconvert(const Tdata& tdata)
+{
+ ll3("testconvert: " << tdata);
+ const Tspec& tspec = tdata.m_tspec;
+ const NdbPack::Data& data = tdata.m_data;
+ NdbPack::Data data_new(tspec.m_spec, false, 2);
+ Uint8 buf_new[Tspec::MaxBuf];
+ data_new.set_buf(buf_new, sizeof(buf_new));
+ chk2(data_new.copy(data) == 0, data_new);
+ require(tdata.m_cnt == (int)data.get_cnt());
+ require(data.get_cnt() == data_new.get_cnt());
+ const Uint32 cnt = tdata.m_cnt;
+ Uint32 num_eq;
+ switch (NdbPack::Endian::get_endian()) {
+ case NdbPack::Endian::Little:
+ chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+ chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ break;
+ case NdbPack::Endian::Big:
+ chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+ chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+ num_eq = ~(Uint32)0;
+ chk1(data.cmp(data_new, cnt, num_eq) == 0);
+ break;
+ default:
+ require(false);
+ break;
+ }
+}
+
+static void
+testdata(const Tdatalist& tdatalist)
+{
+ int i;
+ for (i = 0; i < tdatalist.m_cnt; i++) {
+ const Tdata& tdata = *tdatalist.m_tdata[i];
+ testdesc(tdata);
+ testcopy(tdata);
+ testpoai(tdata);
+ testconvert(tdata);
+ }
+}
+
+static void
+testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
+{
+ ll3("testcmp: " << tbound);
+ int oldres = 0;
+ int n1 = 0;
+ int n2 = 0;
+ int i;
+ for (i = 0; i < tdatalist.m_cnt; i++) {
+ const Tdata& tdata = *tdatalist.m_tdata[i];
+ require(tbound.m_tdata.m_cnt == (int)tbound.m_bound.get_data().get_cnt());
+ const Uint32 cnt = tbound.m_tdata.m_cnt;
+ Uint32 num_eq1 = ~(Uint32)0;
+ // reverse result for key vs bound
+ int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
+ chk1(res != 0);
+ res = (res < 0 ? (n1++, -1) : (n2++, +1));
+ if (i > 0) {
+ // at some point flips from -1 to +1
+ chk1(oldres <= res);
+ }
+ oldres = res;
+ // also via unpacked data
+ int num_eq2 = -1;
+ int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
+ if (res < 0)
+ chk1(res2 < 0);
+ else
+ chk1(res2 > 0);
+ chk1(num_eq1 == (Uint32)num_eq2);
+ }
+ require(n1 + n2 == tdatalist.m_cnt);
+ ll2("keys before:" << n1 << " after:" << n2);
+ *kb = n1;
+}
+
+static void
+testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
+{
+ int i;
+ int oldkb = 0;
+ for (i = 0; i < tboundlist.m_cnt; i++) {
+ const Tbound& tbound = *tboundlist.m_tbound[i];
+ int kb = 0;
+ testcmp(tbound, tdatalist, &kb);
+ if (i > 0) {
+ chk1(oldkb <= kb);
+ }
+ oldkb = kb;
+ }
+}
+
+static void
+testrun()
+{
+ Tspec tspec;
+ tspec.create();
+ ll1("spec: " << tspec);
+ Tdatalist tdatalist(tspec);
+ tdatalist.create();
+ tdatalist.sort();
+ testdata(tdatalist);
+ if (bound_cnt != 0) {
+ Tboundlist tboundlist(tspec);
+ tboundlist.create();
+ tboundlist.sort();
+ testcmp(tboundlist, tdatalist);
+ }
+}
+
+extern void NdbOut_Init();
+
+static int
+testmain()
+{
+ my_init();
+ NdbOut_Init();
+ signal(SIGABRT, SIG_DFL);
+ { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
+ if (p != 0)
+ verbose = atoi(p);
+ }
+ if (seed == 0)
+ ll0("random seed: loop number");
+ else {
+ if (seed < 0)
+ seed = getpid();
+ ll0("random seed: " << seed);
+ ndb_srand(seed);
+ }
+ loops = 100;
+ int i;
+ for (i = 0; loops == 0 || i < loops; i++) {
+ ll0("loop:" << i << "/" << loops);
+ if (seed == 0)
+ ndb_srand(i);
+ testrun();
+ }
+ // do not print "ok" in TAPTEST
+ ndbout << "passed" << endl;
+ return 0;
+}
+
+TAPTEST(NdbPack)
+{
+ int ret = testmain();
+ return (ret == 0);
+}
+
+#endif
=== modified file 'storage/ndb/src/common/util/NdbSqlUtil.cpp'
--- a/storage/ndb/src/common/util/NdbSqlUtil.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/common/util/NdbSqlUtil.cpp 2011-05-04 16:04:33 +0000
@@ -17,6 +17,7 @@
#include <NdbSqlUtil.hpp>
#include <ndb_version.h>
+#include <math.h>
/*
* Data types. The entries must be in the numerical order.
@@ -222,243 +223,185 @@ NdbSqlUtil::getType(Uint32 typeId)
return m_typeList[Type::Undefined];
}
-const NdbSqlUtil::Type&
-NdbSqlUtil::getTypeBinary(Uint32 typeId)
-{
- switch (typeId) {
- case Type::Char:
- case Type::Varchar:
- case Type::Binary:
- case Type::Varbinary:
- case Type::Longvarchar:
- case Type::Longvarbinary:
- typeId = Type::Binary;
- break;
- case Type::Text:
- typeId = Type::Blob;
- break;
- default:
- break;
- }
- return getType(typeId);
-}
-
/*
* Comparison functions.
*/
int
-NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Int8)) {
- Int8 v1, v2;
- memcpy(&v1, p1, sizeof(Int8));
- memcpy(&v2, p2, sizeof(Int8));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 1 && n2 == 1);
+ Int8 v1, v2;
+ memcpy(&v1, p1, 1);
+ memcpy(&v2, p2, 1);
+ int w1 = (int)v1;
+ int w2 = (int)v2;
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Uint8)) {
- Uint8 v1, v2;
- memcpy(&v1, p1, sizeof(Uint8));
- memcpy(&v2, p2, sizeof(Uint8));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 1 && n2 == 1);
+ Uint8 v1, v2;
+ memcpy(&v1, p1, 1);
+ memcpy(&v2, p2, 1);
+ int w1 = (int)v1;
+ int w2 = (int)v2;
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Int16)) {
- Int16 v1, v2;
- memcpy(&v1, p1, sizeof(Int16));
- memcpy(&v2, p2, sizeof(Int16));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 2 && n2 == 2);
+ Int16 v1, v2;
+ memcpy(&v1, p1, 2);
+ memcpy(&v2, p2, 2);
+ int w1 = (int)v1;
+ int w2 = (int)v2;
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Uint16)) {
- Uint16 v1, v2;
- memcpy(&v1, p1, sizeof(Uint16));
- memcpy(&v2, p2, sizeof(Uint16));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 2 && n2 == 2);
+ Uint16 v1, v2;
+ memcpy(&v1, p1, 2);
+ memcpy(&v2, p2, 2);
+ int w1 = (int)v1;
+ int w2 = (int)v2;
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= 3) {
- Int32 v1, v2;
- v1 = sint3korr((const uchar*)p1);
- v2 = sint3korr((const uchar*)p2);
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 3 && n2 == 3);
+ uchar b1[4];
+ uchar b2[4];
+ memcpy(b1, p1, 3);
+ b1[3] = 0;
+ memcpy(b2, p2, 3);
+ b2[3] = 0;
+ int w1 = (int)sint3korr(b1);
+ int w2 = (int)sint3korr(b2);
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= 3) {
- Uint32 v1, v2;
- v1 = uint3korr((const uchar*)p1);
- v2 = uint3korr((const uchar*)p2);
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 3 && n2 == 3);
+ uchar b1[4];
+ uchar b2[4];
+ memcpy(b1, p1, 3);
+ b1[3] = 0;
+ memcpy(b2, p2, 3);
+ b2[3] = 0;
+ int w1 = (int)uint3korr(b1);
+ int w2 = (int)uint3korr(b2);
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Int32)) {
- Int32 v1, v2;
- memcpy(&v1, p1, sizeof(Int32));
- memcpy(&v2, p2, sizeof(Int32));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 4 && n2 == 4);
+ Int32 v1, v2;
+ memcpy(&v1, p1, 4);
+ memcpy(&v2, p2, 4);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Uint32)) {
- Uint32 v1, v2;
- memcpy(&v1, p1, sizeof(Uint32));
- memcpy(&v2, p2, sizeof(Uint32));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 4 && n2 == 4);
+ Uint32 v1, v2;
+ memcpy(&v1, p1, 4);
+ memcpy(&v2, p2, 4);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Int64)) {
- Int64 v1, v2;
- memcpy(&v1, p1, sizeof(Int64));
- memcpy(&v2, p2, sizeof(Int64));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 8 && n2 == 8);
+ Int64 v1, v2;
+ memcpy(&v1, p1, 8);
+ memcpy(&v2, p2, 8);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Uint64)) {
- Uint64 v1, v2;
- memcpy(&v1, p1, sizeof(Uint64));
- memcpy(&v2, p2, sizeof(Uint64));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 8 && n2 == 8);
+ Uint64 v1, v2;
+ memcpy(&v1, p1, 8);
+ memcpy(&v2, p2, 8);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(float)) {
- float v1, v2;
- memcpy(&v1, p1, sizeof(float));
- memcpy(&v2, p2, sizeof(float));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 4 && n2 == 4);
+ float v1, v2;
+ memcpy(&v1, p1, 4);
+ memcpy(&v2, p2, 4);
+ require(!isnan(v1) && !isnan(v2));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(double)) {
- double v1, v2;
- memcpy(&v1, p1, sizeof(double));
- memcpy(&v2, p2, sizeof(double));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 8 && n2 == 8);
+ double v1, v2;
+ memcpy(&v1, p1, 8);
+ memcpy(&v2, p2, 8);
+ require(!isnan(v1) && !isnan(v2));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmp_olddecimal(const uchar* s1, const uchar* s2, unsigned n)
+NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
+ assert(info == 0 && n1 == n2);
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
int sgn = +1;
unsigned i = 0;
- while (i < n) {
- int c1 = s1[i];
- int c2 = s2[i];
+ while (i < n1) {
+ int c1 = v1[i];
+ int c2 = v2[i];
if (c1 == c2) {
if (c1 == '-')
sgn = -1;
@@ -477,225 +420,121 @@ NdbSqlUtil::cmp_olddecimal(const uchar*
}
int
-NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (full) {
- assert(n1 == n2);
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- return cmp_olddecimal(v1, v2, n1);
- }
- return CmpUnknown;
+ return cmpOlddecimal(info, p1, n1, p2, n2);
}
int
-NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (full) {
- assert(n1 == n2);
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- return cmp_olddecimal(v1, v2, n1);
- }
- return CmpUnknown;
+ return cmpBinary(info, p1, n1, p2, n2);
}
int
-NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- // compare as binary strings
- unsigned n = (n1 <= n2 ? n1 : n2);
- int k = memcmp(v1, v2, n);
- if (k == 0) {
- k = (full ? n1 : n) - n2;
- }
- return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+ return cmpBinary(info, p1, n1, p2, n2);
}
int
-NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
+ // allow different lengths
+ assert(info != 0);
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
- // compare as binary strings
- unsigned n = (n1 <= n2 ? n1 : n2);
- int k = memcmp(v1, v2, n);
- if (k == 0) {
- k = (full ? n1 : n) - n2;
- }
- return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+ CHARSET_INFO* cs = (CHARSET_INFO*)info;
+ // compare with space padding
+ int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
+ return k;
}
int
-NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- // collation does not work on prefix for some charsets
- assert(full);
+ assert(info != 0);
+ const uint lb = 1;
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
- // not const in MySQL
- CHARSET_INFO* cs = (CHARSET_INFO*)(info);
+ uint m1 = v1[0];
+ uint m2 = v2[0];
+ require(lb + m1 <= n1 && lb + m2 <= n2);
+ CHARSET_INFO* cs = (CHARSET_INFO*)info;
// compare with space padding
- int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
- return k < 0 ? -1 : k > 0 ? +1 : 0;
+ int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
+ return k;
}
int
-NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- const unsigned lb = 1;
- // collation does not work on prefix for some charsets
- assert(full && n1 >= lb && n2 >= lb);
+ // allow different lengths
+ assert(info == 0);
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
- unsigned m1 = *v1;
- unsigned m2 = *v2;
- if (m1 <= n1 - lb && m2 <= n2 - lb) {
- CHARSET_INFO* cs = (CHARSET_INFO*)(info);
- // compare with space padding
- int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
- return k < 0 ? -1 : k > 0 ? +1 : 0;
+ int k = 0;
+ if (n1 < n2) {
+ k = memcmp(v1, v2, n1);
+ if (k == 0)
+ k = -1;
+ } else if (n1 > n2) {
+ k = memcmp(v1, v2, n2);
+ if (k == 0)
+ k = +1;
+ } else {
+ k = memcmp(v1, v2, n1);
}
- // treat bad data as NULL
- if (m1 > n1 - lb && m2 <= n2 - lb)
- return -1;
- if (m1 <= n1 - lb && m2 > n2 - lb)
- return +1;
- return 0;
+ return k;
}
int
-NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
+ assert(info == 0);
+ const uint lb = 1;
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
- // compare as binary strings
- unsigned n = (n1 <= n2 ? n1 : n2);
- int k = memcmp(v1, v2, n);
- if (k == 0) {
- k = (full ? n1 : n) - n2;
- }
- return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+ uint m1 = v1[0];
+ uint m2 = v2[0];
+ require(lb + m1 <= n1 && lb + m2 <= n2);
+ int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
+ return k;
}
int
-NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- const unsigned lb = 1;
- if (n2 >= lb) {
- assert(n1 >= lb);
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- unsigned m1 = *v1;
- unsigned m2 = *v2;
- if (m1 <= n1 - lb && m2 <= n2 - lb) {
- // compare as binary strings
- unsigned m = (m1 <= m2 ? m1 : m2);
- int k = memcmp(v1 + lb, v2 + lb, m);
- if (k == 0) {
- k = (full ? m1 : m) - m2;
- }
- return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
- }
- // treat bad data as NULL
- if (m1 > n1 - lb && m2 <= n2 - lb)
- return -1;
- if (m1 <= n1 - lb && m2 > n2 - lb)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 8 && n2 == 8);
+ Int64 v1, v2;
+ memcpy(&v1, p1, sizeof(Int64));
+ memcpy(&v2, p2, sizeof(Int64));
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
int
-NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Int64)) {
- Int64 v1, v2;
- memcpy(&v1, p1, sizeof(Int64));
- memcpy(&v2, p2, sizeof(Int64));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
-}
-
-int
-NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
-{
-#ifdef ndb_date_is_4_byte_native_int
- if (n2 >= sizeof(Int32)) {
- Int32 v1, v2;
- memcpy(&v1, p1, sizeof(Int32));
- memcpy(&v2, p2, sizeof(Int32));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
-#else
-#ifdef ndb_date_sol9x86_cc_xO3_madness
- if (n2 >= 3) {
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- // from Field_newdate::val_int
- Uint64 j1 = uint3korr(v1);
- Uint64 j2 = uint3korr(v2);
- j1 = (j1 % 32L)+(j1 / 32L % 16L)*100L + (j1/(16L*32L))*10000L;
- j2 = (j2 % 32L)+(j2 / 32L % 16L)*100L + (j2/(16L*32L))*10000L;
- if (j1 < j2)
- return -1;
- if (j1 > j2)
- return +1;
- return 0;
- }
-#else
- if (n2 >= 3) {
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- uint j1 = uint3korr(v1);
- uint j2 = uint3korr(v2);
- uint d1 = (j1 & 31);
- uint d2 = (j2 & 31);
- j1 = (j1 >> 5);
- j2 = (j2 >> 5);
- uint m1 = (j1 & 15);
- uint m2 = (j2 & 15);
- j1 = (j1 >> 4);
- j2 = (j2 >> 4);
- uint y1 = j1;
- uint y2 = j2;
- if (y1 < y2)
- return -1;
- if (y1 > y2)
- return +1;
- if (m1 < m2)
- return -1;
- if (m1 > m2)
- return +1;
- if (d1 < d2)
- return -1;
- if (d1 > d2)
- return +1;
- return 0;
- }
-#endif
-#endif
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 3 && n2 == 3);
+ uchar b1[4];
+ uchar b2[4];
+ memcpy(b1, p1, 3);
+ b1[3] = 0;
+ memcpy(b2, p2, 3);
+ b2[3] = 0;
+ // from Field_newdate::val_int
+ int w1 = (int)uint3korr(b1);
+ int w2 = (int)uint3korr(b2);
+ return w1 - w2;
}
// not supported
int
-NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
assert(false);
return 0;
@@ -703,14 +542,14 @@ NdbSqlUtil::cmpBlob(const void* info, co
// not supported
int
-NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
assert(false);
return 0;
}
int
-NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
/* Bitfields are stored as 32-bit words
* This means that a byte-by-byte comparison will not work on all platforms
@@ -733,7 +572,7 @@ NdbSqlUtil::cmpBit(const void* info, con
memcpy(copyP1, p1, words << 2);
memcpy(copyP2, p2, words << 2);
- return cmpBit(info, copyP1, bytes, copyP2, bytes, full);
+ return cmpBit(info, copyP1, bytes, copyP2, bytes);
}
const Uint32* wp1= (const Uint32*) p1;
@@ -764,112 +603,81 @@ NdbSqlUtil::cmpBit(const void* info, con
int
-NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= 3) {
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- // from Field_time::val_int
- Int32 j1 = sint3korr(v1);
- Int32 j2 = sint3korr(v2);
- if (j1 < j2)
- return -1;
- if (j1 > j2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 3 && n2 == 3);
+ uchar b1[4];
+ uchar b2[4];
+ memcpy(b1, p1, 3);
+ b1[3] = 0;
+ memcpy(b2, p2, 3);
+ b2[3] = 0;
+ // from Field_time::val_int
+ int j1 = (int)sint3korr(b1);
+ int j2 = (int)sint3korr(b2);
+ if (j1 < j2)
+ return -1;
+ if (j1 > j2)
+ return +1;
+ return 0;
}
// not yet
int
-NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- const unsigned lb = 2;
- // collation does not work on prefix for some charsets
- assert(full && n1 >= lb && n2 >= lb);
+ assert(info != 0);
+ const uint lb = 2;
const uchar* v1 = (const uchar*)p1;
const uchar* v2 = (const uchar*)p2;
- unsigned m1 = uint2korr(v1);
- unsigned m2 = uint2korr(v2);
- if (m1 <= n1 - lb && m2 <= n2 - lb) {
- CHARSET_INFO* cs = (CHARSET_INFO*)(info);
- // compare with space padding
- int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
- return k < 0 ? -1 : k > 0 ? +1 : 0;
- }
- // treat bad data as NULL
- if (m1 > n1 - lb && m2 <= n2 - lb)
- return -1;
- if (m1 <= n1 - lb && m2 > n2 - lb)
- return +1;
- return 0;
+ uint m1 = v1[0] | (v1[1] << 8);
+ uint m2 = v2[0] | (v2[1] << 8);
+ require(lb + m1 <= n1 && lb + m2 <= n2);
+ CHARSET_INFO* cs = (CHARSET_INFO*)info;
+ // compare with space padding
+ int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
+ return k;
}
int
-NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- const unsigned lb = 2;
- if (n2 >= lb) {
- assert(n1 >= lb);
- const uchar* v1 = (const uchar*)p1;
- const uchar* v2 = (const uchar*)p2;
- unsigned m1 = uint2korr(v1);
- unsigned m2 = uint2korr(v2);
- if (m1 <= n1 - lb && m2 <= n2 - lb) {
- // compare as binary strings
- unsigned m = (m1 <= m2 ? m1 : m2);
- int k = memcmp(v1 + lb, v2 + lb, m);
- if (k == 0) {
- k = (full ? m1 : m) - m2;
- }
- return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
- }
- // treat bad data as NULL
- if (m1 > n1 - lb && m2 <= n2 - lb)
- return -1;
- if (m1 <= n1 - lb && m2 > n2 - lb)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0);
+ const uint lb = 2;
+ const uchar* v1 = (const uchar*)p1;
+ const uchar* v2 = (const uchar*)p2;
+ uint m1 = v1[0] | (v1[1] << 8);
+ uint m2 = v2[0] | (v2[1] << 8);
+ require(lb + m1 <= n1 && lb + m2 <= n2);
+ int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
+ return k;
}
int
-NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Uint8)) {
- Uint8 v1, v2;
- memcpy(&v1, p1, sizeof(Uint8));
- memcpy(&v2, p2, sizeof(Uint8));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 1 && n2 == 1);
+ Uint8 v1, v2;
+ memcpy(&v1, p1, 1);
+ memcpy(&v2, p2, 1);
+ int w1 = (int)v1;
+ int w2 = (int)v2;
+ return w1 - w2;
}
int
-NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
{
- if (n2 >= sizeof(Uint32)) {
- Uint32 v1, v2;
- memcpy(&v1, p1, sizeof(Uint32));
- memcpy(&v2, p2, sizeof(Uint32));
- if (v1 < v2)
- return -1;
- if (v1 > v2)
- return +1;
- return 0;
- }
- assert(! full);
- return CmpUnknown;
+ assert(info == 0 && n1 == 4 && n2 == 4);
+ Uint32 v1, v2;
+ memcpy(&v1, p1, 4);
+ memcpy(&v2, p2, 4);
+ if (v1 < v2)
+ return -1;
+ if (v1 > v2)
+ return +1;
+ return 0;
}
// like
=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt 2011-05-19 07:32:39 +0000
@@ -29,6 +29,7 @@ Next DBTUX 12010
Next SUMA 13047
Next LGMAN 15001
Next TSMAN 16001
+Next DBSPJ 17000
TESTING NODE FAILURE, ARBITRATION
---------------------------------
=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-05-02 13:36:19 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-05-19 07:32:39 +0000
@@ -1431,6 +1431,11 @@ void Cmvmi::execTAMPER_ORD(Signal* signa
jam();
tuserblockref = TSMAN_REF;
}
+ else if (errNo < 18000)
+ {
+ jam();
+ tuserblockref = DBSPJ_REF;
+ }
else if (errNo < 30000)
{
/*--------------------------------------------------------------------*/
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-05-17 11:41:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2011-05-18 09:07:07 +0000
@@ -1268,7 +1268,6 @@ private:
// Variables to support record structures and their free lists
- ApiConnectRecord *apiConnectRecord;
Uint32 capiConnectFileSize;
ConnectRecord *connectRecord;
@@ -1313,9 +1312,25 @@ private:
2.4 C O M M O N S T O R E D V A R I A B L E S
----------------------------------------------------
*/
- Uint32 cfirstVerifyQueue;
- Uint32 clastVerifyQueue;
- Uint32 cverifyQueueCounter;
+ struct DIVERIFY_queue
+ {
+ DIVERIFY_queue() {
+ cfirstVerifyQueue = clastVerifyQueue = RNIL;
+ cverifyQueueCounter = 0;
+ apiConnectRecord = 0;
+ }
+ Uint32 cfirstVerifyQueue;
+ Uint32 clastVerifyQueue;
+ Uint32 cverifyQueueCounter;
+ ApiConnectRecord *apiConnectRecord;
+ };
+
+ bool isEmpty(const DIVERIFY_queue&);
+ void enqueue(DIVERIFY_queue&, Ptr<ApiConnectRecord>);
+ void dequeue(DIVERIFY_queue&, Ptr<ApiConnectRecord> &);
+
+ DIVERIFY_queue c_diverify_queue[1];
+ Uint32 c_diverify_queue_cnt;
/*------------------------------------------------------------------------*/
/* THIS VARIABLE KEEPS THE REFERENCES TO FILE RECORDS THAT DESCRIBE */
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp 2011-02-15 11:41:27 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp 2011-05-18 09:07:07 +0000
@@ -73,13 +73,16 @@ void Dbdih::initData()
c_2pass_inr = false;
}//Dbdih::initData()
-void Dbdih::initRecords()
+void Dbdih::initRecords()
{
// Records with dynamic sizes
- apiConnectRecord = (ApiConnectRecord*)
- allocRecord("ApiConnectRecord",
- sizeof(ApiConnectRecord),
- capiConnectFileSize);
+ for (Uint32 i = 0; i < c_diverify_queue_cnt; i++)
+ {
+ c_diverify_queue[i].apiConnectRecord = (ApiConnectRecord*)
+ allocRecord("ApiConnectRecord",
+ sizeof(ApiConnectRecord),
+ capiConnectFileSize);
+ }
connectRecord = (ConnectRecord*)allocRecord("ConnectRecord",
sizeof(ConnectRecord),
@@ -306,7 +309,6 @@ Dbdih::Dbdih(Block_context& ctx):
&Dbdih::execDIH_GET_TABINFO_CONF);
#endif
- apiConnectRecord = 0;
connectRecord = 0;
fileRecord = 0;
fragmentstore = 0;
@@ -319,15 +321,20 @@ Dbdih::Dbdih(Block_context& ctx):
c_nextNodeGroup = 0;
c_fragments_per_node = 1;
bzero(c_node_groups, sizeof(c_node_groups));
+ c_diverify_queue_cnt = 1;
}//Dbdih::Dbdih()
-Dbdih::~Dbdih()
+Dbdih::~Dbdih()
{
- deallocRecord((void **)&apiConnectRecord, "ApiConnectRecord",
- sizeof(ApiConnectRecord),
- capiConnectFileSize);
-
+ for (Uint32 i = 0; i<c_diverify_queue_cnt; i++)
+ {
+ deallocRecord((void **)&c_diverify_queue[i].apiConnectRecord,
+ "ApiConnectRecord",
+ sizeof(ApiConnectRecord),
+ capiConnectFileSize);
+ }
+
deallocRecord((void **)&connectRecord, "ConnectRecord",
sizeof(ConnectRecord),
cconnectFileSize);
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-05-17 12:14:32 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2011-05-23 14:13:35 +0000
@@ -9004,12 +9004,13 @@ void Dbdih::execDIGETNODESREQ(Signal* si
Uint32 fragId, newFragId = RNIL;
DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0];
TabRecord* regTabDesc = tabRecord;
- jamEntry();
+ EmulatedJamBuffer * jambuf = jamBuffer();
+ thrjamEntry(jambuf);
ptrCheckGuard(tabPtr, ttabFileSize, regTabDesc);
if (DictTabInfo::isOrderedIndex(tabPtr.p->tableType))
{
- jam();
+ thrjam(jambuf);
tabPtr.i = tabPtr.p->primaryTableId;
ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
}
@@ -9027,7 +9028,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si
if (unlikely(fragId >= tabPtr.p->totalfragments))
{
- jam();
+ thrjam(jambuf);
conf->zero= 1; //Indicate error;
signal->theData[1]= ZUNDEFINED_FRAGMENT_ERROR;
return;
@@ -9035,40 +9036,40 @@ void Dbdih::execDIGETNODESREQ(Signal* si
}
else if (tabPtr.p->method == TabRecord::HASH_MAP)
{
- jam();
+ thrjam(jambuf);
Ptr<Hash2FragmentMap> ptr;
g_hash_map.getPtr(ptr, map_ptr_i);
fragId = ptr.p->m_map[hashValue % ptr.p->m_cnt];
if (unlikely(new_map_ptr_i != RNIL))
{
- jam();
+ thrjam(jambuf);
g_hash_map.getPtr(ptr, new_map_ptr_i);
newFragId = ptr.p->m_map[hashValue % ptr.p->m_cnt];
if (newFragId == fragId)
{
- jam();
+ thrjam(jambuf);
newFragId = RNIL;
}
}
}
else if (tabPtr.p->method == TabRecord::LINEAR_HASH)
{
- jam();
+ thrjam(jambuf);
fragId = hashValue & tabPtr.p->mask;
if (fragId < tabPtr.p->hashpointer) {
- jam();
+ thrjam(jambuf);
fragId = hashValue & ((tabPtr.p->mask << 1) + 1);
}//if
}
else if (tabPtr.p->method == TabRecord::NORMAL_HASH)
{
- jam();
+ thrjam(jambuf);
fragId= hashValue % tabPtr.p->totalfragments;
}
else
{
- jam();
+ thrjam(jambuf);
ndbassert(tabPtr.p->method == TabRecord::USER_DEFINED);
/* User defined partitioning, but no distribution key passed */
@@ -9087,7 +9088,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si
if (unlikely(newFragId != RNIL))
{
- jam();
+ thrjam(jambuf);
conf->reqinfo |= DiGetNodesConf::REORG_MOVING;
getFragstore(tabPtr.p, newFragId, fragPtr);
nodeCount = extractNodeInfo(fragPtr.p, conf->nodes + 2 + MAX_REPLICAS);
@@ -9201,6 +9202,64 @@ void Dbdih::initialiseFragstore()
}//for
}//Dbdih::initialiseFragstore()
+inline
+bool
+Dbdih::isEmpty(const DIVERIFY_queue & q)
+{
+ return q.cverifyQueueCounter == 0;
+}
+
+inline
+void
+Dbdih::enqueue(DIVERIFY_queue & q, Ptr<ApiConnectRecord> conRecord)
+{
+ Uint32 first = q.cfirstVerifyQueue;
+ Uint32 last = q.clastVerifyQueue;
+ Uint32 count = q.cverifyQueueCounter;
+ ApiConnectRecord * apiConnectRecord = q.apiConnectRecord;
+
+ Ptr<ApiConnectRecord> tmp;
+ tmp.i = last;
+ if (last != RNIL)
+ {
+ tmp.i = last;
+ ptrCheckGuard(tmp, capiConnectFileSize, apiConnectRecord);
+ tmp.p->nextApi = conRecord.i;
+ }
+ else
+ {
+ ndbassert(count == 0);
+ first = conRecord.i;
+ }
+ q.cfirstVerifyQueue = first;
+ q.clastVerifyQueue = conRecord.i;
+ q.cverifyQueueCounter = count + 1;
+}
+
+inline
+void
+Dbdih::dequeue(DIVERIFY_queue & q, Ptr<ApiConnectRecord> & conRecord)
+{
+ Uint32 first = q.cfirstVerifyQueue;
+ Uint32 last = q.clastVerifyQueue;
+ Uint32 count = q.cverifyQueueCounter;
+ ApiConnectRecord * apiConnectRecord = q.apiConnectRecord;
+
+ conRecord.i = first;
+ ptrCheckGuard(conRecord, capiConnectFileSize, apiConnectRecord);
+ Uint32 next = conRecord.p->nextApi;
+ if (first == last)
+ {
+ ndbrequire(next == RNIL);
+ ndbassert(count == 1);
+ last = RNIL;
+ }
+ ndbrequire(count > 0);
+ q.cfirstVerifyQueue = next;
+ q.clastVerifyQueue = last;
+ q.cverifyQueueCounter = count - 1;
+}
+
/*
3.9 V E R I F I C A T I O N
****************************=
@@ -9212,13 +9271,14 @@ void Dbdih::initialiseFragstore()
3.9.1 R E C E I V I N G O F V E R I F I C A T I O N R E Q U E S T
*************************************************************************
*/
-void Dbdih::execDIVERIFYREQ(Signal* signal)
+void Dbdih::execDIVERIFYREQ(Signal* signal)
{
-
- jamEntry();
+ EmulatedJamBuffer * jambuf = jamBuffer();
+ thrjamEntry(jambuf);
if ((getBlockCommit() == false) &&
- (cfirstVerifyQueue == RNIL)) {
- jam();
+ isEmpty(c_diverify_queue[0]))
+ {
+ thrjam(jambuf);
/*-----------------------------------------------------------------------*/
// We are not blocked and the verify queue was empty currently so we can
// simply reply back to TC immediately. The method was called with
@@ -9235,24 +9295,15 @@ void Dbdih::execDIVERIFYREQ(Signal* sign
// Since we are blocked we need to put this operation last in the verify
// queue to ensure that operation starts up in the correct order.
/*-------------------------------------------------------------------------*/
- ApiConnectRecordPtr tmpApiConnectptr;
ApiConnectRecordPtr localApiConnectptr;
+ DIVERIFY_queue & q = c_diverify_queue[0];
- cverifyQueueCounter++;
localApiConnectptr.i = signal->theData[0];
- tmpApiConnectptr.i = clastVerifyQueue;
- ptrCheckGuard(localApiConnectptr, capiConnectFileSize, apiConnectRecord);
+ ptrCheckGuard(localApiConnectptr, capiConnectFileSize, q.apiConnectRecord);
localApiConnectptr.p->apiGci = m_micro_gcp.m_new_gci;
localApiConnectptr.p->nextApi = RNIL;
- clastVerifyQueue = localApiConnectptr.i;
- if (tmpApiConnectptr.i == RNIL) {
- jam();
- cfirstVerifyQueue = localApiConnectptr.i;
- } else {
- jam();
- ptrCheckGuard(tmpApiConnectptr, capiConnectFileSize, apiConnectRecord);
- tmpApiConnectptr.p->nextApi = localApiConnectptr.i;
- }//if
+
+ enqueue(q, localApiConnectptr);
emptyverificbuffer(signal, false);
signal->theData[3] = 1; // Indicate no immediate return
return;
@@ -9440,7 +9491,7 @@ Dbdih::execUPGRADE_PROTOCOL_ORD(Signal*
void
Dbdih::startGcpLab(Signal* signal, Uint32 aWaitTime)
{
- if (cfirstVerifyQueue != RNIL)
+ if (! isEmpty(c_diverify_queue[0]))
{
// Previous global checkpoint is not yet completed.
jam();
@@ -14672,26 +14723,18 @@ void Dbdih::createFileRw(Signal* signal,
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
}//Dbdih::createFileRw()
-void Dbdih::emptyverificbuffer(Signal* signal, bool aContinueB)
+void Dbdih::emptyverificbuffer(Signal* signal, bool aContinueB)
{
- if(cfirstVerifyQueue == RNIL){
+ if (isEmpty(c_diverify_queue[0]))
+ {
jam();
return;
}//if
ApiConnectRecordPtr localApiConnectptr;
if(getBlockCommit() == false){
jam();
- ndbrequire(cverifyQueueCounter > 0);
- cverifyQueueCounter--;
- localApiConnectptr.i = cfirstVerifyQueue;
- ptrCheckGuard(localApiConnectptr, capiConnectFileSize, apiConnectRecord);
+ dequeue(c_diverify_queue[0], localApiConnectptr);
ndbrequire(localApiConnectptr.p->apiGci <= m_micro_gcp.m_current_gci);
- cfirstVerifyQueue = localApiConnectptr.p->nextApi;
- if (cfirstVerifyQueue == RNIL) {
- jam();
- ndbrequire(cverifyQueueCounter == 0);
- clastVerifyQueue = RNIL;
- }//if
signal->theData[0] = localApiConnectptr.i;
signal->theData[1] = (Uint32)(m_micro_gcp.m_current_gci >> 32);
signal->theData[2] = (Uint32)(m_micro_gcp.m_current_gci & 0xFFFFFFFF);
@@ -15065,11 +15108,9 @@ void Dbdih::initCommonData()
cfailurenr = 1;
cfirstAliveNode = RNIL;
cfirstDeadNode = RNIL;
- cfirstVerifyQueue = RNIL;
cgckptflag = false;
cgcpOrderBlocked = 0;
- clastVerifyQueue = RNIL;
c_lcpMasterTakeOverState.set(LMTOS_IDLE, __LINE__);
c_lcpState.clcpDelay = 0;
@@ -15103,7 +15144,6 @@ void Dbdih::initCommonData()
cstarttype = (Uint32)-1;
csystemnodes = 0;
c_newest_restorable_gci = 0;
- cverifyQueueCounter = 0;
cwaitLcpSr = false;
c_nodeStartMaster.blockGcp = 0;
@@ -15416,12 +15456,17 @@ void Dbdih::initialiseRecordsLab(Signal*
case 1:{
ApiConnectRecordPtr apiConnectptr;
jam();
- /******** INTIALIZING API CONNECT RECORDS ********/
- for (apiConnectptr.i = 0; apiConnectptr.i < capiConnectFileSize; apiConnectptr.i++) {
- refresh_watch_dog();
- ptrAss(apiConnectptr, apiConnectRecord);
- apiConnectptr.p->nextApi = RNIL;
- }//for
+ for (Uint32 i = 0; i < c_diverify_queue_cnt; i++)
+ {
+ /******** INTIALIZING API CONNECT RECORDS ********/
+ for (apiConnectptr.i = 0;
+ apiConnectptr.i < capiConnectFileSize; apiConnectptr.i++)
+ {
+ refresh_watch_dog();
+ ptrAss(apiConnectptr, c_diverify_queue[i].apiConnectRecord);
+ apiConnectptr.p->nextApi = RNIL;
+ }//for
+ }
jam();
break;
}
@@ -17208,11 +17253,16 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
if (arg == DumpStateOrd::DihDumpNodeRestartInfo) {
infoEvent("c_nodeStartMaster.blockLcp = %d, c_nodeStartMaster.blockGcp = %d, c_nodeStartMaster.wait = %d",
c_nodeStartMaster.blockLcp, c_nodeStartMaster.blockGcp, c_nodeStartMaster.wait);
- infoEvent("cfirstVerifyQueue = %d, cverifyQueueCounter = %d",
- cfirstVerifyQueue, cverifyQueueCounter);
+ for (Uint32 i = 0; i < c_diverify_queue_cnt; i++)
+ {
+ infoEvent("[ %u : cfirstVerifyQueue = 0x%.8x, cverifyQueueCounter = %u ]",
+ i,
+ c_diverify_queue[i].cfirstVerifyQueue,
+ c_diverify_queue[i].cverifyQueueCounter);
+ }
infoEvent("cgcpOrderBlocked = %d",
cgcpOrderBlocked);
- }//if
+ }//if
if (arg == DumpStateOrd::DihDumpNodeStatusInfo) {
NodeRecordPtr localNodePtr;
infoEvent("Printing nodeStatus of all nodes");
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-20 05:11:01 +0000
@@ -1740,11 +1740,12 @@ public:
* specified by location of original tuple and version number. Input
* is attribute ids in AttributeHeader format. Output is attribute
* data with headers. Uses readAttributes with xfrm option set.
+ * After wl4163, xfrm is not set.
* Returns number of words or negative (-terrorCode) on error.
*/
int tuxReadAttrs(EmulatedJamBuffer*,
Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion,
- const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut);
+ const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut, bool xfrmFlag);
/*
* TUX reads primary key without headers into an array of words. Used
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-20 05:11:01 +0000
@@ -2914,7 +2914,7 @@ int Dbtup::interpreterNextLab(Signal* si
{
return TUPKEY_abort(req_struct, 40);
}
- res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
+ res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
}
} else {
if ((cond == Interpreter::LIKE) ||
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp 2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp 2011-05-04 11:58:38 +0000
@@ -131,7 +131,8 @@ Dbtup::tuxReadAttrs(EmulatedJamBuffer *
Uint32 tupVersion,
const Uint32* attrIds,
Uint32 numAttrs,
- Uint32* dataOut)
+ Uint32* dataOut,
+ bool xfrmFlag)
{
thrjamEntry(jamBuf);
// use own variables instead of globals
@@ -185,7 +186,7 @@ Dbtup::tuxReadAttrs(EmulatedJamBuffer *
numAttrs,
dataOut,
ZNIL,
- true);
+ xfrmFlag);
// done
return ret;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2011-04-25 16:46:59 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp 2011-05-17 12:19:20 +0000
@@ -30,6 +30,9 @@
// big brother
#include <dbtup/Dbtup.hpp>
+// packed index keys and bounds
+#include <NdbPack.hpp>
+
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
@@ -118,44 +121,24 @@ private:
// sizes are in words (Uint32)
STATIC_CONST( MaxIndexFragments = MAX_FRAG_PER_NODE );
STATIC_CONST( MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX );
- /*
- * Allow space for per-attribute overhead (at least bound type and
- * attribute header) and key data xfrm-ed. execTUX_BOUND_INFO unpacks
- * all in same buffer so double the size. The xfrm should disappear
- * in 7.x wl#4163.
- */
- STATIC_CONST( MaxAttrDataSize =
- (
- 4 * MAX_ATTRIBUTES_IN_INDEX +
- MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY
- ) * 2
- );
-
+ STATIC_CONST( MaxAttrDataSize = 2 * MAX_ATTRIBUTES_IN_INDEX + MAX_KEY_SIZE_IN_WORDS );
+ STATIC_CONST( MaxXfrmDataSize = MaxAttrDataSize * MAX_XFRM_MULTIPLY);
public:
- STATIC_CONST( DescPageSize = 256 );
+ STATIC_CONST( DescPageSize = 512 );
private:
STATIC_CONST( MaxTreeNodeSize = MAX_TTREE_NODE_SIZE );
STATIC_CONST( MaxPrefSize = MAX_TTREE_PREF_SIZE );
STATIC_CONST( ScanBoundSegmentSize = 7 );
STATIC_CONST( MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN );
STATIC_CONST( MaxTreeDepth = 32 ); // strict
+#ifdef VM_TRACE
+ // for TuxCtx::c_debugBuffer
+ STATIC_CONST( DebugBufferBytes = (MaxAttrDataSize << 2) );
+#endif
BLOCK_DEFINES(Dbtux);
// forward declarations
struct TuxCtx;
- struct DescEnt;
-
- // Pointer to array of Uint32 represents attribute data and bounds
-
- typedef Uint32 *Data;
- inline AttributeHeader& ah(Data data) {
- return *reinterpret_cast<AttributeHeader*>(data);
- }
-
- typedef const Uint32* ConstData;
- inline const AttributeHeader& ah(ConstData data) {
- return *reinterpret_cast<const AttributeHeader*>(data);
- }
// AttributeHeader size is assumed to be 1 word
STATIC_CONST( AttributeHeaderSize = 1 );
@@ -257,7 +240,7 @@ private:
TupLoc m_root; // root node
TreeHead();
// methods
- Data getPref(TreeNode* node) const;
+ Uint32* getPref(TreeNode* node) const;
TreeEnt* getEntList(TreeNode* node) const;
};
@@ -281,7 +264,10 @@ private:
/*
* Descriptor page. The "hot" metadata for an index is stored as
- * a contiguous array of words on some page.
+ * contiguous array of words on some page. It has 3 parts:
+ * 1) DescHead
+ * 2) array of NdbPack::Type used by NdbPack::Spec of index key
+ * 3) array of attr headers for reading index key values from TUP
*/
struct DescPage;
friend struct DescPage;
@@ -298,48 +284,36 @@ private:
ArrayPool<DescPage> c_descPagePool;
Uint32 c_descPageList;
- /*
- * Header for index metadata. Size must be multiple of word size.
- */
struct DescHead {
- unsigned m_indexId : 24;
- unsigned pad1 : 8;
+ Uint32 m_indexId;
+ Uint16 m_numAttrs;
+ Uint16 m_magic;
+ enum { Magic = 0xDE5C };
};
STATIC_CONST( DescHeadSize = sizeof(DescHead) >> 2 );
- /*
- * Attribute metadata. Size must be multiple of word size.
- *
- * Prefix comparison of char data must use strxfrm and binary
- * comparison. The charset is currently unused.
- */
- struct DescAttr {
- Uint32 m_attrDesc; // standard AttributeDescriptor
- Uint16 m_primaryAttrId;
- unsigned m_typeId : 6;
- unsigned m_charset : 10;
- };
- STATIC_CONST( DescAttrSize = sizeof(DescAttr) >> 2 );
-
- /*
- * Complete metadata for one index. The array of attributes has
- * variable size.
- */
- friend struct DescEnt;
- struct DescEnt {
- DescHead m_descHead;
- DescAttr m_descAttr[1]; // variable size data
- };
+ typedef NdbPack::Type KeyType;
+ typedef NdbPack::Spec KeySpec;
+ STATIC_CONST( KeyTypeSize = sizeof(KeyType) >> 2 );
+
+ typedef NdbPack::DataC KeyDataC;
+ typedef NdbPack::Data KeyData;
+ typedef NdbPack::BoundC KeyBoundC;
+ typedef NdbPack::Bound KeyBound;
// range scan
-
+
/*
- * Scan bounds are stored in linked list of segments.
+ * ScanBound instances are members of ScanOp. Bound data is stored in
+ * a separate segmented buffer pool.
*/
- typedef DataBuffer<ScanBoundSegmentSize> ScanBound;
- typedef DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator ScanBoundIterator;
- typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool;
- ScanBoundPool c_scanBoundPool;
+ struct ScanBound {
+ DataBuffer<ScanBoundSegmentSize>::Head m_head;
+ Uint16 m_cnt; // number of attributes
+ Int16 m_side;
+ ScanBound();
+ };
+ DataBuffer<ScanBoundSegmentSize>::DataBufferPool c_scanBoundPool;
// ScanLock
struct ScanLock {
@@ -408,10 +382,7 @@ private:
Uint8 m_readCommitted; // no locking
Uint8 m_lockMode;
Uint8 m_descending;
- ScanBound m_boundMin;
- ScanBound m_boundMax;
- ScanBound* m_bound[2]; // pointers to above 2
- Uint16 m_boundCnt[2]; // number of bounds in each
+ ScanBound m_scanBound[2];
TreePos m_scanPos; // position
TreeEnt m_scanEnt; // latest entry found
Uint32 m_nodeScan; // next scan at node (single-linked)
@@ -420,7 +391,7 @@ private:
Uint32 nextList;
};
Uint32 prevList;
- ScanOp(ScanBoundPool& scanBoundPool);
+ ScanOp();
};
typedef Ptr<ScanOp> ScanOpPtr;
ArrayPool<ScanOp> c_scanOpPool;
@@ -451,8 +422,11 @@ private:
Uint32 m_descPage; // descriptor page
Uint16 m_descOff; // offset within the page
Uint16 m_numAttrs;
- bool m_storeNullKey;
+ Uint16 m_prefAttrs; // attributes in min prefix
+ Uint16 m_prefBytes; // max bytes in min prefix
+ KeySpec m_keySpec;
union {
+ bool m_storeNullKey;
Uint32 nextPool;
};
Index();
@@ -473,10 +447,6 @@ private:
Uint32 m_indexId;
Uint16 unused;
Uint16 m_fragId;
- Uint32 m_descPage; // copy from index level
- Uint16 m_descOff;
- Uint16 m_numAttrs;
- bool m_storeNullKey;
TreeHead m_tree;
TupLoc m_freeLoc; // one free node for next op
DLList<ScanOp> m_scanList; // current scans on this fragment
@@ -543,7 +513,7 @@ private:
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
- Data getPref();
+ Uint32* getPref();
TreeEnt getEnt(unsigned pos);
// for ndbrequire and ndbassert
void progError(int line, int cause, const char* file);
@@ -560,11 +530,9 @@ private:
void execNODE_STATE_REP(Signal* signal);
// utils
- void setKeyAttrs(TuxCtx&, const Frag& frag);
- void readKeyAttrs(TuxCtx&, const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
- void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
- void copyAttrs(TuxCtx&, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
- void unpackBound(const ScanBound& bound, Data data);
+ void readKeyAttrs(TuxCtx&, const Frag& frag, TreeEnt ent, KeyData& keyData, Uint32 count);
+ void readTablePk(const Frag& frag, TreeEnt ent, Uint32* pkData, unsigned& pkSize);
+ void unpackBound(TuxCtx&, const ScanBound& bound, KeyBoundC& searchBound);
void findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr);
/*
@@ -657,20 +625,20 @@ private:
/*
* DbtuxSearch.cpp
*/
- void findNodeToUpdate(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode);
- bool findPosToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
- bool findPosToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
- bool searchToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
- bool searchToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
- void findNodeToScan(Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode);
- void findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos);
- void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
+ void findNodeToUpdate(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode);
+ bool findPosToAdd(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
+ bool findPosToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
+ bool searchToAdd(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
+ bool searchToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
+ void findNodeToScan(Frag& frag, unsigned dir, const KeyBoundC& searchBound, NodeHandle& currNode);
+ void findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos);
+ void searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos);
/*
* DbtuxCmp.cpp
*/
- int cmpSearchKey(TuxCtx&, const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
- int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+ int cmpSearchKey(TuxCtx&, const KeyDataC& searchKey, const KeyDataC& entryKey, Uint32 cnt);
+ int cmpSearchBound(TuxCtx&, const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt);
/*
* DbtuxStat.cpp
@@ -702,7 +670,7 @@ private:
friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
friend class NdbOut& operator<<(NdbOut&, const TreeHead&);
friend class NdbOut& operator<<(NdbOut&, const TreePos&);
- friend class NdbOut& operator<<(NdbOut&, const DescAttr&);
+ friend class NdbOut& operator<<(NdbOut&, const KeyType&);
friend class NdbOut& operator<<(NdbOut&, const ScanOp&);
friend class NdbOut& operator<<(NdbOut&, const Index&);
friend class NdbOut& operator<<(NdbOut&, const Frag&);
@@ -738,30 +706,30 @@ private:
{
EmulatedJamBuffer * jamBuffer;
- // index key attr ids with sizes in AttributeHeader format
- Data c_keyAttrs;
-
- // pointers to index key comparison functions
- NdbSqlUtil::Cmp** c_sqlCmp;
+ // buffer for scan bound and search key data
+ Uint32* c_searchKey;
- /*
- * Other buffers used during the operation.
- */
+ // buffer for current entry key data
+ Uint32* c_entryKey;
- // buffer for search key data with headers
- Data c_searchKey;
+ // buffer for xfrm-ed PK and for temporary use
+ Uint32* c_dataBuffer;
- // buffer for current entry key data with headers
- Data c_entryKey;
+#ifdef VM_TRACE
+ char* c_debugBuffer;
+#endif
};
struct TuxCtx c_ctx; // Global Tux context, for everything build MT-index build
- // buffer for scan bounds and keyinfo (primary key)
- Data c_dataBuffer;
-
// inlined utils
- DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff);
+ Uint32 getDescSize(const Index& index);
+ DescHead& getDescHead(const Index& index);
+ KeyType* getKeyTypes(DescHead& descHead);
+ const KeyType* getKeyTypes(const DescHead& descHead);
+ AttributeHeader* getKeyAttrs(DescHead& descHead);
+ const AttributeHeader* getKeyAttrs(const DescHead& descHead);
+ //
void getTupAddr(const Frag& frag, TreeEnt ent, Uint32& lkey1, Uint32& lkey2);
static unsigned min(unsigned x, unsigned y);
static unsigned max(unsigned x, unsigned y);
@@ -915,7 +883,7 @@ Dbtux::TreeHead::TreeHead() :
{
}
-inline Dbtux::Data
+inline Uint32*
Dbtux::TreeHead::getPref(TreeNode* node) const
{
Uint32* ptr = (Uint32*)node + NodeHeadSize;
@@ -955,10 +923,20 @@ Dbtux::DescPage::DescPage() :
}
}
+// Dbtux::ScanBound
+
+inline
+Dbtux::ScanBound::ScanBound() :
+ m_head(),
+ m_cnt(0),
+ m_side(0)
+{
+}
+
// Dbtux::ScanOp
inline
-Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
+Dbtux::ScanOp::ScanOp() :
m_state(Undef),
m_lockwait(false),
m_errorCode(0),
@@ -975,16 +953,11 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& sca
m_readCommitted(0),
m_lockMode(0),
m_descending(0),
- m_boundMin(scanBoundPool),
- m_boundMax(scanBoundPool),
+ m_scanBound(),
m_scanPos(),
m_scanEnt(),
m_nodeScan(RNIL)
{
- m_bound[0] = &m_boundMin;
- m_bound[1] = &m_boundMax;
- m_boundCnt[0] = 0;
- m_boundCnt[1] = 0;
}
// Dbtux::Index
@@ -998,6 +971,9 @@ Dbtux::Index::Index() :
m_descPage(RNIL),
m_descOff(0),
m_numAttrs(0),
+ m_prefAttrs(0),
+ m_prefBytes(0),
+ m_keySpec(),
m_storeNullKey(false)
{
for (unsigned i = 0; i < MaxIndexFragments; i++) {
@@ -1013,17 +989,13 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& sca
m_tableId(RNIL),
m_indexId(RNIL),
m_fragId(ZNIL),
- m_descPage(RNIL),
- m_descOff(0),
- m_numAttrs(ZNIL),
- m_storeNullKey(false),
m_tree(),
m_freeLoc(),
m_scanList(scanOpPool),
- m_tupIndexFragPtrI(RNIL)
+ m_tupIndexFragPtrI(RNIL),
+ m_tupTableFragPtrI(RNIL),
+ m_accTableFragPtrI(RNIL)
{
- m_tupTableFragPtrI = RNIL;
- m_accTableFragPtrI = RNIL;
}
// Dbtux::FragOp
@@ -1157,7 +1129,7 @@ Dbtux::NodeHandle::setNodeScan(Uint32 sc
m_node->m_nodeScan = scanPtrI;
}
-inline Dbtux::Data
+inline Uint32*
Dbtux::NodeHandle::getPref()
{
TreeHead& tree = m_frag.m_tree;
@@ -1193,15 +1165,60 @@ Dbtux::PrintPar::PrintPar() :
// utils
-inline Dbtux::DescEnt&
-Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff)
+inline Uint32
+Dbtux::getDescSize(const Index& index)
+{
+ return
+ DescHeadSize +
+ index.m_numAttrs * KeyTypeSize +
+ index.m_numAttrs * AttributeHeaderSize;
+}
+
+inline Dbtux::DescHead&
+Dbtux::getDescHead(const Index& index)
{
DescPagePtr pagePtr;
- pagePtr.i = descPage;
+ pagePtr.i = index.m_descPage;
c_descPagePool.getPtr(pagePtr);
- ndbrequire(descOff < DescPageSize);
- DescEnt* descEnt = (DescEnt*)&pagePtr.p->m_data[descOff];
- return *descEnt;
+ ndbrequire(index.m_descOff < DescPageSize);
+ Uint32* ptr = &pagePtr.p->m_data[index.m_descOff];
+ DescHead* descHead = reinterpret_cast<DescHead*>(ptr);
+ ndbrequire(descHead->m_magic == DescHead::Magic);
+ return *descHead;
+}
+
+inline Dbtux::KeyType*
+Dbtux::getKeyTypes(DescHead& descHead)
+{
+ Uint32* ptr = reinterpret_cast<Uint32*>(&descHead);
+ ptr += DescHeadSize;
+ return reinterpret_cast<KeyType*>(ptr);
+}
+
+inline const Dbtux::KeyType*
+Dbtux::getKeyTypes(const DescHead& descHead)
+{
+ const Uint32* ptr = reinterpret_cast<const Uint32*>(&descHead);
+ ptr += DescHeadSize;
+ return reinterpret_cast<const KeyType*>(ptr);
+}
+
+inline AttributeHeader*
+Dbtux::getKeyAttrs(DescHead& descHead)
+{
+ Uint32* ptr = reinterpret_cast<Uint32*>(&descHead);
+ ptr += DescHeadSize;
+ ptr += descHead.m_numAttrs * KeyTypeSize;
+ return reinterpret_cast<AttributeHeader*>(ptr);
+}
+
+inline const AttributeHeader*
+Dbtux::getKeyAttrs(const DescHead& descHead)
+{
+ const Uint32* ptr = reinterpret_cast<const Uint32*>(&descHead);
+ ptr += DescHeadSize;
+ ptr += descHead.m_numAttrs * KeyTypeSize;
+ return reinterpret_cast<const AttributeHeader*>(ptr);
}
inline
@@ -1227,4 +1244,40 @@ Dbtux::max(unsigned x, unsigned y)
return x > y ? x : y;
}
+// DbtuxCmp.cpp
+
+inline int
+Dbtux::cmpSearchKey(TuxCtx& ctx, const KeyDataC& searchKey, const KeyDataC& entryKey, Uint32 cnt)
+{
+ // compare cnt attributes from each
+ Uint32 num_eq;
+ int ret = searchKey.cmp(entryKey, cnt, num_eq);
+#ifdef VM_TRACE
+ if (debugFlags & DebugMaint) {
+ debugOut << "cmpSearchKey: ret:" << ret;
+ debugOut << " search:" << searchKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << " entry:" << entryKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << endl;
+ }
+#endif
+ return ret;
+}
+
+inline int
+Dbtux::cmpSearchBound(TuxCtx& ctx, const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt)
+{
+ // compare cnt attributes from each
+ Uint32 num_eq;
+ int ret = searchBound.cmp(entryKey, cnt, num_eq);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "cmpSearchBound: res:" << ret;
+ debugOut << " search:" << searchBound.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << " entry:" << entryKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << endl;
+ }
+#endif
+ return ret;
+}
+
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp 2011-04-24 13:10:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp 2011-05-17 12:19:20 +0000
@@ -48,16 +48,16 @@ Dbtux::mt_buildIndexFragment_wrapper(voi
tux_ctx->jamBuffer = (EmulatedJamBuffer*)ptr;
tux_ctx->jamBuffer->theEmulatedJamIndex = 0;
ptr += (sizeof(EmulatedJamBuffer) + 3) / 4;
- tux_ctx->c_keyAttrs = ptr;
- ptr += MaxIndexAttributes;
- while (UintPtr(ptr) & 7)
- ptr++;
- tux_ctx->c_sqlCmp = (NdbSqlUtil::Cmp**)ptr;
- ptr += (sizeof(void*) * MaxIndexAttributes) / sizeof(Uint32);
tux_ctx->c_searchKey = ptr;
ptr += MaxAttrDataSize;
tux_ctx->c_entryKey = ptr;
ptr += MaxAttrDataSize;
+ tux_ctx->c_dataBuffer = ptr;
+ ptr += MaxAttrDataSize;
+#ifdef VM_TRACE
+ tux_ctx->c_debugBuffer = (char*)ptr;
+ ptr += (DebugBufferBytes + 3) / 4;
+#endif
if (!(UintPtr(ptr) - UintPtr(req->mem_buffer) <= req->buffer_size))
abort();
}
@@ -88,8 +88,6 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
Frag& frag = *fragPtr.p;
TuxCtx & ctx = * (TuxCtx*)req->tux_ctx_ptr;
- // set up index keys for this operation
- setKeyAttrs(ctx, frag);
Local_key pos;
Uint32 fragPtrI;
@@ -108,31 +106,19 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
ent.m_tupLoc = TupLoc(pos.m_page_no, pos.m_page_idx);
ent.m_tupVersion = pos.m_file_no; // used for version
- // read search key
- readKeyAttrs(ctx, frag, ent, 0, ctx.c_searchKey);
- if (! frag.m_storeNullKey)
- {
- // check if all keys are null
- const unsigned numAttrs = frag.m_numAttrs;
- bool allNull = true;
- for (unsigned i = 0; i < numAttrs; i++)
- {
- if (ctx.c_searchKey[i] != 0)
- {
- jam();
- allNull = false;
- break;
- }
- }
- if (allNull)
- {
- jam();
- continue;
- }
+ // set up and read search key
+ KeyData searchKey(indexPtr.p->m_keySpec, false, 0);
+ searchKey.set_buf(ctx.c_searchKey, MaxAttrDataSize << 2);
+ readKeyAttrs(ctx, frag, ent, searchKey, indexPtr.p->m_numAttrs);
+
+ if (unlikely(! indexPtr.p->m_storeNullKey) &&
+ searchKey.get_null_cnt() == indexPtr.p->m_numAttrs) {
+ jam();
+ continue;
}
TreePos treePos;
- bool ok = searchToAdd(ctx, frag, ctx.c_searchKey, ent, treePos);
+ bool ok = searchToAdd(ctx, frag, searchKey, ent, treePos);
ndbrequire(ok);
/*
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp 2011-05-04 12:29:26 +0000
@@ -18,163 +18,3 @@
#define DBTUX_CMP_CPP
#include "Dbtux.hpp"
-
-/*
- * Search key vs node prefix or entry.
- *
- * The comparison starts at given attribute position. The position is
- * updated by number of equal initial attributes found. The entry data
- * may be partial in which case CmpUnknown may be returned.
- *
- * The attributes are normalized and have variable size given in words.
- */
-int
-Dbtux::cmpSearchKey(TuxCtx& ctx,
- const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
-{
- const unsigned numAttrs = frag.m_numAttrs;
- const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- // skip to right position in search key only
- for (unsigned i = 0; i < start; i++) {
- thrjam(ctx.jamBuffer);
- searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
- }
- // number of words of entry data left
- unsigned len2 = maxlen;
- int ret = 0;
- while (start < numAttrs) {
- if (len2 <= AttributeHeaderSize) {
- thrjam(ctx.jamBuffer);
- ret = NdbSqlUtil::CmpUnknown;
- break;
- }
- len2 -= AttributeHeaderSize;
- if (! ah(searchKey).isNULL()) {
- if (! ah(entryData).isNULL()) {
- thrjam(ctx.jamBuffer);
- // verify attribute id
- const DescAttr& descAttr = descEnt.m_descAttr[start];
- ndbrequire(ah(searchKey).getAttributeId() == descAttr.m_primaryAttrId);
- ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
- // sizes
- const unsigned bytes1 = ah(searchKey).getByteSize();
- const unsigned bytes2 = min(ah(entryData).getByteSize(), len2 << 2);
- const unsigned size2 = min(ah(entryData).getDataSize(), len2);
- len2 -= size2;
- // compare
- NdbSqlUtil::Cmp* const cmp = ctx.c_sqlCmp[start];
- const Uint32* const p1 = &searchKey[AttributeHeaderSize];
- const Uint32* const p2 = &entryData[AttributeHeaderSize];
- const bool full = (maxlen == MaxAttrDataSize);
- ret = (*cmp)(0, p1, bytes1, p2, bytes2, full);
- if (ret != 0) {
- thrjam(ctx.jamBuffer);
- break;
- }
- } else {
- thrjam(ctx.jamBuffer);
- // not NULL > NULL
- ret = +1;
- break;
- }
- } else {
- if (! ah(entryData).isNULL()) {
- thrjam(ctx.jamBuffer);
- // NULL < not NULL
- ret = -1;
- break;
- }
- }
- searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
- entryData += AttributeHeaderSize + ah(entryData).getDataSize();
- start++;
- }
- return ret;
-}
-
-/*
- * Scan bound vs node prefix or entry.
- *
- * Compare lower or upper bound and index entry data. The entry data
- * may be partial in which case CmpUnknown may be returned. Otherwise
- * returns -1 if the bound is to the left of the entry and +1 if the
- * bound is to the right of the entry.
- *
- * The routine is similar to cmpSearchKey, but 0 is never returned.
- * Suppose all attributes compare equal. Recall that all bounds except
- * possibly the last one are non-strict. Use the given bound direction
- * (0-lower 1-upper) and strictness of last bound to return -1 or +1.
- *
- * Following example illustrates this. We are at (a=2, b=3).
- *
- * idir bounds strict return
- * 0 a >= 2 and b >= 3 no -1
- * 0 a >= 2 and b > 3 yes +1
- * 1 a <= 2 and b <= 3 no +1
- * 1 a <= 2 and b < 3 yes -1
- *
- * The attributes are normalized and have variable size given in words.
- */
-int
-Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
-{
- const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- // direction 0-lower 1-upper
- ndbrequire(idir <= 1);
- // number of words of data left
- unsigned len2 = maxlen;
- // in case of no bounds, init last type to something non-strict
- unsigned type = 4;
- while (boundCount != 0) {
- if (len2 <= AttributeHeaderSize) {
- jam();
- return NdbSqlUtil::CmpUnknown;
- }
- len2 -= AttributeHeaderSize;
- // get and skip bound type (it is used after the loop)
- type = boundInfo[0];
- boundInfo += 1;
- if (! ah(boundInfo).isNULL()) {
- if (! ah(entryData).isNULL()) {
- jam();
- // verify attribute id
- const Uint32 index = ah(boundInfo).getAttributeId();
- ndbrequire(index < frag.m_numAttrs);
- const DescAttr& descAttr = descEnt.m_descAttr[index];
- ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
- // sizes
- const unsigned bytes1 = ah(boundInfo).getByteSize();
- const unsigned bytes2 = min(ah(entryData).getByteSize(), len2 << 2);
- const unsigned size2 = min(ah(entryData).getDataSize(), len2);
- len2 -= size2;
- // compare
- NdbSqlUtil::Cmp* const cmp = c_ctx.c_sqlCmp[index];
- const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
- const Uint32* const p2 = &entryData[AttributeHeaderSize];
- const bool full = (maxlen == MaxAttrDataSize);
- int ret = (*cmp)(0, p1, bytes1, p2, bytes2, full);
- if (ret != 0) {
- jam();
- return ret;
- }
- } else {
- jam();
- // not NULL > NULL
- return +1;
- }
- } else {
- jam();
- if (! ah(entryData).isNULL()) {
- jam();
- // NULL < not NULL
- return -1;
- }
- }
- boundInfo += AttributeHeaderSize + ah(boundInfo).getDataSize();
- entryData += AttributeHeaderSize + ah(entryData).getDataSize();
- boundCount -= 1;
- }
- // all attributes were equal
- const int strict = (type & 0x1);
- return (idir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
-}
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2011-04-24 16:20:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp 2011-05-17 12:19:20 +0000
@@ -211,6 +211,7 @@ Dbtux::printNode(TuxCtx & ctx,
par.m_depth = 0;
return;
}
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
TreeHead& tree = frag.m_tree;
NodeHandle node(frag);
selectNode(node, loc);
@@ -283,33 +284,34 @@ Dbtux::printNode(TuxCtx & ctx,
}
#endif
// check inline prefix
- { ConstData data1 = node.getPref();
+ {
+ KeyDataC keyData1(index.m_keySpec, false);
+ const Uint32* data1 = node.getPref();
+ keyData1.set_buf(data1, index.m_prefBytes, index.m_prefAttrs);
+ KeyData keyData2(index.m_keySpec, false, 0);
Uint32 data2[MaxPrefSize];
- memset(data2, DataFillByte, MaxPrefSize << 2);
- readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_searchKey);
- copyAttrs(ctx, frag, ctx.c_searchKey, data2, tree.m_prefSize);
- for (unsigned n = 0; n < tree.m_prefSize; n++) {
- if (data1[n] != data2[n]) {
- par.m_ok = false;
- out << par.m_path << sep;
- out << "inline prefix mismatch word " << n;
- out << " value " << hex << data1[n];
- out << " should be " << hex << data2[n] << endl;
- break;
- }
+ keyData2.set_buf(data2, MaxPrefSize << 2);
+ readKeyAttrs(ctx, frag, node.getEnt(0), keyData2, index.m_prefAttrs);
+ if (cmpSearchKey(ctx, keyData1, keyData2, index.m_prefAttrs) != 0) {
+ par.m_ok = false;
+ out << par.m_path << sep;
+ out << "inline prefix mismatch" << endl;
}
}
// check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) {
const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j);
- unsigned start = 0;
- readKeyAttrs(ctx, frag, ent1, start, ctx.c_searchKey);
- readKeyAttrs(ctx, frag, ent2, start, ctx.c_entryKey);
- int ret = cmpSearchKey(ctx, frag, start, ctx.c_searchKey, ctx.c_entryKey);
+ KeyData entryKey1(index.m_keySpec, false, 0);
+ KeyData entryKey2(index.m_keySpec, false, 0);
+ entryKey1.set_buf(ctx.c_searchKey, MaxAttrDataSize << 2);
+ entryKey2.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
+ readKeyAttrs(ctx, frag, ent1, entryKey1, index.m_numAttrs);
+ readKeyAttrs(ctx, frag, ent2, entryKey2, index.m_numAttrs);
+ int ret = cmpSearchKey(ctx, entryKey1, entryKey2, index.m_numAttrs);
if (ret == 0)
ret = ent1.cmp(ent2);
- if (ret != -1) {
+ if (! (ret < 0)) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder within node at pos " << j << endl;
@@ -322,13 +324,17 @@ Dbtux::printNode(TuxCtx & ctx,
const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
const unsigned pos = (i == 0 ? 0 : node.getOccup() - 1);
const TreeEnt ent2 = node.getEnt(pos);
- unsigned start = 0;
- readKeyAttrs(ctx, frag, ent1, start, ctx.c_searchKey);
- readKeyAttrs(ctx, frag, ent2, start, ctx.c_entryKey);
- int ret = cmpSearchKey(ctx, frag, start, ctx.c_searchKey, ctx.c_entryKey);
+ KeyData entryKey1(index.m_keySpec, false, 0);
+ KeyData entryKey2(index.m_keySpec, false, 0);
+ entryKey1.set_buf(ctx.c_searchKey, MaxAttrDataSize << 2);
+ entryKey2.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
+ readKeyAttrs(ctx, frag, ent1, entryKey1, index.m_numAttrs);
+ readKeyAttrs(ctx, frag, ent2, entryKey2, index.m_numAttrs);
+ int ret = cmpSearchKey(ctx, entryKey1, entryKey2, index.m_numAttrs);
if (ret == 0)
ret = ent1.cmp(ent2);
- if (ret != (i == 0 ? -1 : +1)) {
+ if (i == 0 && ! (ret < 0) ||
+ i == 1 && ! (ret > 0)) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder wrt subtree " << i << endl;
@@ -406,17 +412,6 @@ operator<<(NdbOut& out, const Dbtux::Tre
}
NdbOut&
-operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr)
-{
- out << "[DescAttr " << hex << &descAttr;
- out << " [attrDesc " << hex << descAttr.m_attrDesc;
- out << " [primaryAttrId " << dec << descAttr.m_primaryAttrId << "]";
- out << " [typeId " << dec << descAttr.m_typeId << "]";
- out << "]";
- return out;
-}
-
-NdbOut&
operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
{
Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX);
@@ -447,17 +442,15 @@ operator<<(NdbOut& out, const Dbtux::Sca
out << " [pos " << scan.m_scanPos << "]";
out << " [ent " << scan.m_scanEnt << "]";
for (unsigned i = 0; i <= 1; i++) {
- out << " [bound " << dec << i;
- Dbtux::ScanBound& bound = *scan.m_bound[i];
- Dbtux::ScanBoundIterator iter;
- bound.first(iter);
- for (unsigned j = 0; j < bound.getSize(); j++) {
- out << " " << hex << *iter.data;
- bound.next(iter);
- }
+ const Dbtux::ScanBound scanBound = scan.m_scanBound[i];
+ const Dbtux::Index& index = *tux->c_indexPool.getPtr(scan.m_indexId);
+ Dbtux::KeyDataC keyBoundData(index.m_keySpec, true);
+ Dbtux::KeyBoundC keyBound(keyBoundData);
+ tux->unpackBound(tux->c_ctx, scanBound, keyBound);
+ out << " [scanBound " << dec << i;
+ out << " " << keyBound.print(tux->c_ctx.c_debugBuffer, Dbtux::DebugBufferBytes);
out << "]";
}
- out << "]";
return out;
}
@@ -477,6 +470,8 @@ operator<<(NdbOut& out, const Dbtux::Ind
out << " [descPage " << hex << index.m_descPage << "]";
out << " [descOff " << dec << index.m_descOff << "]";
out << " [numAttrs " << dec << index.m_numAttrs << "]";
+ out << " [prefAttrs " << dec << index.m_prefAttrs << "]";
+ out << " [prefBytes " << dec << index.m_prefBytes << "]";
out << "]";
return out;
}
@@ -488,9 +483,6 @@ operator<<(NdbOut& out, const Dbtux::Fra
out << " [tableId " << dec << frag.m_tableId << "]";
out << " [indexId " << dec << frag.m_indexId << "]";
out << " [fragId " << dec << frag.m_fragId << "]";
- out << " [descPage " << hex << frag.m_descPage << "]";
- out << " [descOff " << dec << frag.m_descOff << "]";
- out << " [numAttrs " << dec << frag.m_numAttrs << "]";
out << " [tree " << frag.m_tree << "]";
out << "]";
return out;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2011-04-24 13:10:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2011-05-17 12:19:20 +0000
@@ -39,7 +39,7 @@ Dbtux::Dbtux(Block_context& ctx, Uint32
(sizeof(TreeEnt) & 0x3) == 0 &&
(sizeof(TreeNode) & 0x3) == 0 &&
(sizeof(DescHead) & 0x3) == 0 &&
- (sizeof(DescAttr) & 0x3) == 0
+ (sizeof(KeyType) & 0x3) == 0
);
/*
* DbtuxGen.cpp
@@ -201,7 +201,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signa
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch));
- const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize;
+ const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * KeyTypeSize + nAttribute * AttributeHeaderSize + DescPageSize - 1) / DescPageSize;
const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4;
const Uint32 nScanLock = nScanOp * nScanBatch;
@@ -229,12 +229,14 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signa
}
// allocate buffers
c_ctx.jamBuffer = jamBuffer();
- c_ctx.c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
- c_ctx.c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes);
c_ctx.c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32), MaxAttrDataSize);
c_ctx.c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32), MaxAttrDataSize);
- c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
+ c_ctx.c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxXfrmDataSize + 1) >> 1);
+
+#ifdef VM_TRACE
+ c_ctx.c_debugBuffer = (char*)allocRecord("c_debugBuffer", sizeof(char), DebugBufferBytes);
+#endif
// ack
ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -247,123 +249,79 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signa
// utils
void
-Dbtux::setKeyAttrs(TuxCtx& ctx, const Frag& frag)
+Dbtux::readKeyAttrs(TuxCtx& ctx, const Frag& frag, TreeEnt ent, KeyData& keyData, Uint32 count)
{
- Data keyAttrs = ctx.c_keyAttrs;
- NdbSqlUtil::Cmp** sqlCmp = ctx.c_sqlCmp; // global
- const unsigned numAttrs = frag.m_numAttrs;
- const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
- for (unsigned i = 0; i < numAttrs; i++) {
- thrjam(ctx.jamBuffer);
- const DescAttr& descAttr = descEnt.m_descAttr[i];
- Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
- // set attr id and fixed size
- ah(keyAttrs) = AttributeHeader(descAttr.m_primaryAttrId, size);
- keyAttrs += 1;
- // set comparison method pointer
- const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
- ndbrequire(sqlType.m_cmp != 0);
- *(sqlCmp++) = sqlType.m_cmp;
- }
-}
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ const DescHead& descHead = getDescHead(index);
+ const AttributeHeader* keyAttrs = getKeyAttrs(descHead);
+ Uint32* const outputBuffer = ctx.c_dataBuffer;
+
+#ifdef VM_TRACE
+ ndbrequire(&keyData.get_spec() == &index.m_keySpec);
+ ndbrequire(keyData.get_spec().validate() == 0);
+ ndbrequire(count <= index.m_numAttrs);
+#endif
-void
-Dbtux::readKeyAttrs(TuxCtx& ctx, const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
-{
- ConstData keyAttrs = ctx.c_keyAttrs;
- const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
const TupLoc tupLoc = ent.m_tupLoc;
+ const Uint32 pageId = tupLoc.getPageId();
+ const Uint32 pageOffset = tupLoc.getPageOffset();
const Uint32 tupVersion = ent.m_tupVersion;
- ndbrequire(start < frag.m_numAttrs);
- const Uint32 numAttrs = frag.m_numAttrs - start;
- // skip to start position in keyAttrs only
- keyAttrs += start;
- int ret = c_tup->tuxReadAttrs(ctx.jamBuffer,
- tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(),
- tupVersion, keyAttrs, numAttrs, keyData);
- thrjamEntry(ctx.jamBuffer);
- // TODO handle error
+ const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
+ const Uint32* keyAttrs32 = (const Uint32*)&keyAttrs[0];
+
+ int ret;
+ ret = c_tup->tuxReadAttrs(ctx.jamBuffer, tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), tupVersion, keyAttrs32, count, outputBuffer, false);
+ jamEntry();
ndbrequire(ret > 0);
+ keyData.reset();
+ Uint32 len;
+ ret = keyData.add_poai(outputBuffer, count, &len);
+ ndbrequire(ret == 0);
+ ret = keyData.finalize();
+ ndbrequire(ret == 0);
+
#ifdef VM_TRACE
if (debugFlags & (DebugMaint | DebugScan)) {
- debugOut << "readKeyAttrs:" << endl;
- ConstData data = keyData;
- Uint32 totalSize = 0;
- for (Uint32 i = start; i < frag.m_numAttrs; i++) {
- Uint32 attrId = ah(data).getAttributeId();
- Uint32 dataSize = ah(data).getDataSize();
- debugOut << i << " attrId=" << attrId << " size=" << dataSize;
- data += 1;
- for (Uint32 j = 0; j < dataSize; j++) {
- debugOut << " " << hex << data[0];
- data += 1;
- }
- debugOut << endl;
- totalSize += 1 + dataSize;
- }
- ndbassert((int)totalSize == ret);
+ debugOut << "readKeyAttrs: ";
+ debugOut << " ent:" << ent << " count:" << count;
+ debugOut << " data:" << keyData.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << endl;
}
#endif
}
void
-Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
+Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Uint32* pkData, unsigned& pkSize)
{
const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
const TupLoc tupLoc = ent.m_tupLoc;
int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData, true);
jamEntry();
- // TODO handle error
ndbrequire(ret > 0);
pkSize = ret;
}
-/*
- * Copy attribute data with headers. Input is all index key data.
- * Copies whatever fits.
- */
-void
-Dbtux::copyAttrs(TuxCtx& ctx, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2)
-{
- unsigned n = frag.m_numAttrs;
- unsigned len2 = maxlen2;
- while (n != 0) {
- thrjam(ctx.jamBuffer);
- const unsigned dataSize = ah(data1).getDataSize();
- // copy header
- if (len2 == 0)
- return;
- data2[0] = data1[0];
- data1 += 1;
- data2 += 1;
- len2 -= 1;
- // copy data
- for (unsigned i = 0; i < dataSize; i++) {
- if (len2 == 0)
- return;
- data2[i] = data1[i];
- len2 -= 1;
- }
- data1 += dataSize;
- data2 += dataSize;
- n -= 1;
- }
-#ifdef VM_TRACE
- memset(data2, DataFillByte, len2 << 2);
-#endif
-}
-
void
-Dbtux::unpackBound(const ScanBound& bound, Data dest)
+Dbtux::unpackBound(TuxCtx& ctx, const ScanBound& scanBound, KeyBoundC& searchBound)
{
- ScanBoundIterator iter;
- bound.first(iter);
- const unsigned n = bound.getSize();
- unsigned j;
- for (j = 0; j < n; j++) {
- dest[j] = *iter.data;
- bound.next(iter);
+ // there is no const version of LocalDataBuffer
+ DataBuffer<ScanBoundSegmentSize>::Head head = scanBound.m_head;
+ LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+ DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator iter;
+ // always use searchKey buffer
+ Uint32* const outputBuffer = ctx.c_searchKey;
+ b.first(iter);
+ const Uint32 n = b.getSize();
+ ndbrequire(n <= MaxAttrDataSize);
+ for (Uint32 i = 0; i < n; i++) {
+ outputBuffer[i] = *iter.data;
+ b.next(iter);
}
+ // set bound to the unpacked data buffer
+ KeyDataC& searchBoundData = searchBound.get_data();
+ searchBoundData.set_buf(outputBuffer, MaxAttrDataSize << 2, scanBound.m_cnt);
+ int ret = searchBound.finalize(scanBound.m_side);
+ ndbrequire(ret == 0);
}
void
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp 2011-04-25 14:42:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp 2011-05-04 11:58:38 +0000
@@ -66,31 +66,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
findFrag(*indexPtr.p, fragId, fragPtr);
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
- // set up index keys for this operation
- setKeyAttrs(c_ctx, frag);
// set up search entry
TreeEnt ent;
ent.m_tupLoc = TupLoc(req->pageId, req->pageIndex);
ent.m_tupVersion = req->tupVersion;
- // read search key
- readKeyAttrs(c_ctx, frag, ent, 0, c_ctx.c_searchKey);
- if (! frag.m_storeNullKey) {
- // check if all keys are null
- const unsigned numAttrs = frag.m_numAttrs;
- bool allNull = true;
- for (unsigned i = 0; i < numAttrs; i++) {
- if (c_ctx.c_searchKey[i] != 0) {
- jam();
- allNull = false;
- break;
- }
- }
- if (allNull) {
- jam();
- req->errorCode = 0;
- *sig = *req;
- return;
- }
+ // set up and read search key
+ KeyData searchKey(indexPtr.p->m_keySpec, false, 0);
+ searchKey.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
+ readKeyAttrs(c_ctx, frag, ent, searchKey, indexPtr.p->m_numAttrs);
+ if (unlikely(! indexPtr.p->m_storeNullKey) &&
+ searchKey.get_null_cnt() == indexPtr.p->m_numAttrs) {
+ jam();
+ return;
}
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
@@ -110,7 +97,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
- ok = searchToAdd(c_ctx, frag, c_ctx.c_searchKey, ent, treePos);
+ ok = searchToAdd(c_ctx, frag, searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! ok ? " - error" : "") << endl;
@@ -144,7 +131,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
case TuxMaintReq::OpRemove:
jam();
- ok = searchToRemove(c_ctx, frag, c_ctx.c_searchKey, ent, treePos);
+ ok = searchToRemove(c_ctx, frag, searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! ok ? " - error" : "") << endl;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp 2011-04-24 16:20:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp 2011-05-08 08:15:58 +0000
@@ -157,31 +157,36 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signa
indexPtr.p->m_state == Index::Defining &&
attrId < indexPtr.p->m_numAttrs &&
attrId == req->attrId);
- // define the attribute
- DescEnt& descEnt = getDescEnt(indexPtr.p->m_descPage, indexPtr.p->m_descOff);
- DescAttr& descAttr = descEnt.m_descAttr[attrId];
- descAttr.m_attrDesc = req->attrDescriptor;
- descAttr.m_primaryAttrId = req->primaryAttrId;
- descAttr.m_typeId = AttributeDescriptor::getType(req->attrDescriptor);
- descAttr.m_charset = (req->extTypeInfo >> 16);
-#ifdef VM_TRACE
- if (debugFlags & DebugMeta) {
- debugOut << "attr " << attrId << " " << descAttr << endl;
- }
-#endif
- // check that type is valid and has a binary comparison method
- const NdbSqlUtil::Type& type = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
- if (type.m_typeId == NdbSqlUtil::Type::Undefined ||
- type.m_cmp == 0) {
+ const Uint32 ad = req->attrDescriptor;
+ const Uint32 typeId = AttributeDescriptor::getType(ad);
+ const Uint32 sizeInBytes = AttributeDescriptor::getSizeInBytes(ad);
+ const Uint32 nullable = AttributeDescriptor::getNullable(ad);
+ const Uint32 csNumber = req->extTypeInfo >> 16;
+ const Uint32 primaryAttrId = req->primaryAttrId;
+
+ DescHead& descHead = getDescHead(*indexPtr.p);
+ // add type to spec
+ KeySpec& keySpec = indexPtr.p->m_keySpec;
+ KeyType keyType(typeId, sizeInBytes, nullable, csNumber);
+ if (keySpec.add(keyType) == -1) {
jam();
errorCode = TuxAddAttrRef::InvalidAttributeType;
break;
}
- if (descAttr.m_charset != 0) {
- uint err;
- CHARSET_INFO *cs = all_charsets[descAttr.m_charset];
+ // add primary attr to read keys array
+ AttributeHeader* keyAttrs = getKeyAttrs(descHead);
+ AttributeHeader& keyAttr = keyAttrs[attrId];
+ new (&keyAttr) AttributeHeader(primaryAttrId, sizeInBytes);
+#ifdef VM_TRACE
+ if (debugFlags & DebugMeta) {
+ debugOut << "attr " << attrId << " " << keyType << endl;
+ }
+#endif
+ if (csNumber != 0) {
+ unsigned err;
+ CHARSET_INFO *cs = all_charsets[csNumber];
ndbrequire(cs != 0);
- if ((err = NdbSqlUtil::check_column_for_ordered_index(descAttr.m_typeId, cs))) {
+ if ((err = NdbSqlUtil::check_column_for_ordered_index(typeId, cs))) {
jam();
errorCode = (TuxAddAttrRef::ErrorCode) err;
break;
@@ -196,6 +201,30 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signa
break;
}
if (lastAttr) {
+ // compute min prefix
+ const KeySpec& keySpec = indexPtr.p->m_keySpec;
+ unsigned attrs = 0;
+ unsigned bytes = keySpec.get_nullmask_len(false);
+ unsigned maxAttrs = indexPtr.p->m_numAttrs;
+#ifdef VM_TRACE
+ {
+ const char* p = NdbEnv_GetEnv("MAX_TTREE_PREF_ATTRS", (char*)0, 0);
+ if (p != 0 && p[0] != 0 && maxAttrs > (unsigned)atoi(p))
+ maxAttrs = atoi(p);
+ }
+#endif
+ while (attrs < maxAttrs) {
+ const KeyType& keyType = keySpec.get_type(attrs);
+ const unsigned newbytes = bytes + keyType.get_byte_size();
+ if (newbytes > (MAX_TTREE_PREF_SIZE << 2))
+ break;
+ attrs++;
+ bytes = newbytes;
+ }
+ if (attrs == 0)
+ bytes = 0;
+ indexPtr.p->m_prefAttrs = attrs;
+ indexPtr.p->m_prefBytes = bytes;
// fragment is defined
#ifdef VM_TRACE
if (debugFlags & DebugMeta) {
@@ -278,8 +307,6 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
fragPtr.p->m_tableId = req->primaryTableId;
fragPtr.p->m_indexId = req->tableId;
fragPtr.p->m_fragId = req->fragId;
- fragPtr.p->m_numAttrs = indexPtr.p->m_numAttrs;
- fragPtr.p->m_storeNullKey = true; // not yet configurable
fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI;
fragPtr.p->m_tupTableFragPtrI = req->tupTableFragPtrI;
fragPtr.p->m_accTableFragPtrI = req->accTableFragPtrI;
@@ -288,10 +315,6 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
indexPtr.p->m_fragId[indexPtr.p->m_numFrags] = req->fragId;
indexPtr.p->m_fragPtrI[indexPtr.p->m_numFrags] = fragPtr.i;
indexPtr.p->m_numFrags++;
-
- // copy metadata address to each fragment
- fragPtr.p->m_descPage = indexPtr.p->m_descPage;
- fragPtr.p->m_descOff = indexPtr.p->m_descOff;
#ifdef VM_TRACE
if (debugFlags & DebugMeta) {
debugOut << "Add frag " << fragPtr.i << " " << *fragPtr.p << endl;
@@ -311,7 +334,7 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
new (&tree) TreeHead();
// make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
- tree.m_prefSize = MAX_TTREE_PREF_SIZE;
+ tree.m_prefSize = (indexPtr.p->m_prefBytes + 3) / 4;
const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
// size of header and min prefix
const unsigned fixedSize = NodeHeadSize + tree.m_prefSize;
@@ -542,7 +565,7 @@ bool
Dbtux::allocDescEnt(IndexPtr indexPtr)
{
jam();
- const unsigned size = DescHeadSize + indexPtr.p->m_numAttrs * DescAttrSize;
+ const Uint32 size = getDescSize(*indexPtr.p);
DescPagePtr pagePtr;
pagePtr.i = c_descPageList;
while (pagePtr.i != RNIL) {
@@ -570,9 +593,13 @@ Dbtux::allocDescEnt(IndexPtr indexPtr)
indexPtr.p->m_descPage = pagePtr.i;
indexPtr.p->m_descOff = DescPageSize - pagePtr.p->m_numFree;
pagePtr.p->m_numFree -= size;
- DescEnt& descEnt = getDescEnt(indexPtr.p->m_descPage, indexPtr.p->m_descOff);
- descEnt.m_descHead.m_indexId = indexPtr.i;
- descEnt.m_descHead.pad1 = 0;
+ DescHead& descHead = *(DescHead*)&pagePtr.p->m_data[indexPtr.p->m_descOff];
+ descHead.m_indexId = indexPtr.i;
+ descHead.m_numAttrs = indexPtr.p->m_numAttrs;
+ descHead.m_magic = DescHead::Magic;
+ KeySpec& keySpec = indexPtr.p->m_keySpec;
+ KeyType* keyTypes = getKeyTypes(descHead);
+ keySpec.set_buf(keyTypes, indexPtr.p->m_numAttrs);
return true;
}
@@ -582,36 +609,36 @@ Dbtux::freeDescEnt(IndexPtr indexPtr)
DescPagePtr pagePtr;
c_descPagePool.getPtr(pagePtr, indexPtr.p->m_descPage);
Uint32* const data = pagePtr.p->m_data;
- const unsigned size = DescHeadSize + indexPtr.p->m_numAttrs * DescAttrSize;
- unsigned off = indexPtr.p->m_descOff;
+ const Uint32 size = getDescSize(*indexPtr.p);
+ Uint32 off = indexPtr.p->m_descOff;
// move the gap to the free area at the top
while (off + size < DescPageSize - pagePtr.p->m_numFree) {
jam();
// next entry to move over the gap
- DescEnt& descEnt2 = *(DescEnt*)&data[off + size];
- Uint32 indexId2 = descEnt2.m_descHead.m_indexId;
+ DescHead& descHead2 = *(DescHead*)&data[off + size];
+ Uint32 indexId2 = descHead2.m_indexId;
Index& index2 = *c_indexPool.getPtr(indexId2);
- unsigned size2 = DescHeadSize + index2.m_numAttrs * DescAttrSize;
+ Uint32 size2 = getDescSize(index2);
ndbrequire(
index2.m_descPage == pagePtr.i &&
- index2.m_descOff == off + size);
+ index2.m_descOff == off + size &&
+ index2.m_numAttrs == descHead2.m_numAttrs);
// move the entry (overlapping copy if size < size2)
- unsigned i;
+ Uint32 i;
for (i = 0; i < size2; i++) {
jam();
data[off + i] = data[off + size + i];
}
off += size2;
- // adjust page offset in index and all fragments
+ // adjust page offset in index
index2.m_descOff -= size;
- for (i = 0; i < index2.m_numFrags; i++) {
- jam();
- Frag& frag2 = *c_fragPool.getPtr(index2.m_fragPtrI[i]);
- frag2.m_descOff -= size;
- ndbrequire(
- frag2.m_descPage == index2.m_descPage &&
- frag2.m_descOff == index2.m_descOff);
- }
+ {
+ // move KeySpec pointer
+ DescHead& descHead2 = getDescHead(index2);
+ KeyType* keyType2 = getKeyTypes(descHead2);
+ index2.m_keySpec.set_buf(keyType2);
+ ndbrequire(index2.m_keySpec.validate() == 0);
+ }
}
ndbrequire(off + size == DescPageSize - pagePtr.p->m_numFree);
pagePtr.p->m_numFree += size;
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-04-24 16:20:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-05-17 12:19:20 +0000
@@ -148,16 +148,26 @@ Dbtux::freePreallocatedNode(Frag& frag)
}
/*
- * Set prefix. Copies the number of words that fits. Includes
- * attribute headers for now. XXX use null mask instead
+ * Set prefix. Copies the defined number of attributes.
*/
void
Dbtux::setNodePref(TuxCtx & ctx, NodeHandle& node)
{
const Frag& frag = node.m_frag;
- const TreeHead& tree = frag.m_tree;
- readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_entryKey);
- copyAttrs(ctx, frag, ctx.c_entryKey, node.getPref(), tree.m_prefSize);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ KeyData prefKey(index.m_keySpec, false, 0);
+ prefKey.set_buf(node.getPref(), index.m_prefBytes);
+ if (index.m_prefAttrs > 0) {
+ jam();
+ readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
+ }
+#ifdef VM_TRACE
+ if (debugFlags & DebugMaint) {
+ debugOut << "setNodePref: " << node;
+ debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+ debugOut << endl;
+ }
+#endif
}
// node operations
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2011-04-25 15:57:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp 2011-05-17 12:19:20 +0000
@@ -75,7 +75,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
errorCode = AccScanRef::TuxNoFreeScanOp;
break;
}
- new (scanPtr.p) ScanOp(c_scanBoundPool);
+ new (scanPtr.p) ScanOp;
scanPtr.p->m_state = ScanOp::First;
scanPtr.p->m_userPtr = req->senderData;
scanPtr.p->m_userRef = req->senderRef;
@@ -131,9 +131,9 @@ Dbtux::execACC_SCANREQ(Signal* signal)
* Check that sets of lower and upper bounds are on initial sequences of
* keys and that all but possibly last bound is non-strict.
*
- * Finally save the sets of lower and upper bounds (i.e. start key and
- * end key). Full bound type is included but only the strict bit is
- * used since lower and upper have now been separated.
+ * Finally convert the sets of lower and upper bounds (i.e. start key
+ * and end key) to NdbPack format. The data is saved in segmented
+ * memory. The bound is reconstructed at use time via unpackBound().
*
* Error handling: Error code is set in the scan and also returned in
* EXECUTE_DIRECT (the old way).
@@ -143,174 +143,144 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal
{
jamEntry();
// get records
- TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
- const TuxBoundInfo* const req = (const TuxBoundInfo*)sig;
+ TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
const Index& index = *c_indexPool.getPtr(scan.m_indexId);
- const DescEnt& descEnt = getDescEnt(index.m_descPage, index.m_descOff);
- // collect normalized lower and upper bounds
- struct BoundInfo {
- int type2; // with EQ -> LE/GE
- Uint32 offset; // offset in xfrmData
- Uint32 size;
- };
- BoundInfo boundInfo[2][MaxIndexAttributes];
- const unsigned dstSize = MaxAttrDataSize;
- // use some static buffer (they are only used within a timeslice)
- Uint32* const xfrmData = c_dataBuffer;
- Uint32 dstPos = 0;
- // largest attrId seen plus one
- Uint32 maxAttrId[2] = { 0, 0 };
- // walk through entries
- const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
- Uint32 offset = 0;
- while (offset + 2 <= req->boundAiLength) {
- jam();
- const unsigned type = data[offset];
- const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
- const Uint32 attrId = ah->getAttributeId();
- const Uint32 byteSize = ah->getByteSize();
- const Uint32 dataSize = ah->getDataSize();
- if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- // copy header
- xfrmData[dstPos + 0] = data[offset + 0];
- xfrmData[dstPos + 1] = data[offset + 1];
- // copy bound value
- Uint32 dstBytes = 0;
- Uint32 dstWords = 0;
- if (! ah->isNULL()) {
- jam();
- const uchar* srcPtr = (const uchar*)&data[offset + 2];
- const DescAttr& descAttr = descEnt.m_descAttr[attrId];
- Uint32 typeId = descAttr.m_typeId;
- Uint32 maxBytes = AttributeDescriptor::getSizeInBytes(descAttr.m_attrDesc);
- Uint32 lb, len;
- bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, maxBytes, lb, len);
- if (! ok) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- Uint32 srcBytes = lb + len;
- Uint32 srcWords = (srcBytes + 3) / 4;
- if (srcBytes != byteSize) {
+ const DescHead& descHead = getDescHead(index);
+ const KeyType* keyTypes = getKeyTypes(descHead);
+ // extract lower and upper bound in separate passes
+ for (unsigned idir = 0; idir <= 1; idir++) {
+ jam();
+ struct BoundInfo {
+ int type2; // with EQ -> LE/GE
+ Uint32 offset; // word offset in signal data
+ Uint32 bytes;
+ };
+ BoundInfo boundInfo[MaxIndexAttributes];
+ // largest attrId seen plus one
+ Uint32 maxAttrId = 0;
+ const Uint32* const data = &req->data[0];
+ Uint32 offset = 0;
+ while (offset + 2 <= req->boundAiLength) {
+ jam();
+ const Uint32 type = data[offset];
+ const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
+ const Uint32 attrId = ah->getAttributeId();
+ const Uint32 byteSize = ah->getByteSize();
+ const Uint32 dataSize = ah->getDataSize();
+ // check type
+ if (unlikely(type > 4)) {
jam();
scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
- uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2];
- if (descAttr.m_charset == 0) {
- memcpy(dstPtr, srcPtr, srcWords << 2);
- dstBytes = srcBytes;
- dstWords = srcWords;
- } else {
+ Uint32 type2 = type;
+ if (type2 == 4) {
jam();
- CHARSET_INFO* cs = all_charsets[descAttr.m_charset];
- Uint32 xmul = cs->strxfrm_multiply;
- if (xmul == 0)
- xmul = 1;
- // see comment in DbtcMain.cpp
- Uint32 dstLen = xmul * (maxBytes - lb);
- if (dstLen > ((dstSize - dstPos) << 2)) {
+ type2 = (idir << 1); // LE=0 GE=2
+ }
+ // check if attribute belongs to this bound
+ if ((type2 & 0x2) == (idir << 1)) {
+ if (unlikely(attrId >= index.m_numAttrs)) {
jam();
- scan.m_errorCode = TuxBoundInfo::TooMuchAttrInfo;
- sig->errorCode = scan.m_errorCode;
+ scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+ req->errorCode = scan.m_errorCode;
return;
}
- int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
- ndbrequire(n != -1);
- dstBytes = n;
- while ((n & 3) != 0) {
- dstPtr[n++] = 0;
+ // mark entries in any gap as undefined
+ while (maxAttrId <= attrId) {
+ jam();
+ BoundInfo& b = boundInfo[maxAttrId];
+ b.type2 = -1;
+ maxAttrId++;
}
- dstWords = n / 4;
- }
- }
- for (unsigned j = 0; j <= 1; j++) {
- jam();
- // check if lower/upper bit matches
- const unsigned luBit = (j << 1);
- if ((type & 0x2) != luBit && type != 4)
- continue;
- // EQ -> LE, GE
- const unsigned type2 = (type & 0x1) | luBit;
- // fill in any gap
- while (maxAttrId[j] <= attrId) {
- jam();
- BoundInfo& b = boundInfo[j][maxAttrId[j]];
- maxAttrId[j]++;
- b.type2 = -1;
- }
- BoundInfo& b = boundInfo[j][attrId];
- if (b.type2 != -1) {
- // compare with previously defined bound
- if (b.type2 != (int)type2 ||
- b.size != 2 + dstWords ||
- memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) {
+ BoundInfo& b = boundInfo[attrId];
+ // duplicate no longer allowed (wl#4163)
+ if (unlikely(b.type2 != -1)) {
jam();
scan.m_errorCode = TuxBoundInfo::InvalidBounds;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
- } else {
- // fix length
- AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1];
- ah->setByteSize(dstBytes);
- // enter new bound
- jam();
- b.type2 = type2;
- b.offset = dstPos;
- b.size = 2 + dstWords;
+ b.type2 = (int)type2;
+ b.offset = offset + 1; // poai
+ b.bytes = byteSize;
}
+ // jump to next
+ offset += 2 + dataSize;
}
- // jump to next
- offset += 2 + dataSize;
- dstPos += 2 + dstWords;
- }
- if (offset != req->boundAiLength) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- for (unsigned j = 0; j <= 1; j++) {
- // save lower/upper bound in index attribute id order
- for (unsigned i = 0; i < maxAttrId[j]; i++) {
- jam();
- const BoundInfo& b = boundInfo[j][i];
- // check for gap or strict bound before last
- if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) {
- jam();
- scan.m_errorCode = TuxBoundInfo::InvalidBounds;
- sig->errorCode = scan.m_errorCode;
- return;
- }
- bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size);
- if (! ok) {
+ if (unlikely(offset != req->boundAiLength)) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ // check and pack the bound data
+ KeyData searchBoundData(index.m_keySpec, true, 0);
+ KeyBound searchBound(searchBoundData);
+ searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
+ int strict = 0; // 0 or 1
+ Uint32 i;
+ for (i = 0; i < maxAttrId; i++) {
+ jam();
+ const BoundInfo& b = boundInfo[i];
+ // check for gap or strict bound before last
+ strict = (b.type2 & 0x1);
+ if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidBounds;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ Uint32 len;
+ if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
+ b.bytes != len)) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ }
+ int side = 0;
+ if (maxAttrId != 0) {
+ // arithmetic is faster
+ // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
+ side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
+ }
+ if (unlikely(searchBound.finalize(side) == -1)) {
+ jam();
+ scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+ req->errorCode = scan.m_errorCode;
+ return;
+ }
+ ScanBound& scanBound = scan.m_scanBound[idir];
+ scanBound.m_cnt = maxAttrId;
+ scanBound.m_side = side;
+ // save data words in segmented memory
+ {
+ DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+ LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+ const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
+ Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
+ bool ok = b.append(data, size);
+ if (unlikely(!ok)) {
jam();
scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
}
- scan.m_boundCnt[j] = maxAttrId[j];
}
if (ERROR_INSERTED(12009)) {
jam();
CLEAR_ERROR_INSERT_VALUE;
scan.m_errorCode = TuxBoundInfo::InvalidBounds;
- sig->errorCode = scan.m_errorCode;
+ req->errorCode = scan.m_errorCode;
return;
}
// no error
- sig->errorCode = 0;
+ req->errorCode = 0;
}
void
@@ -478,7 +448,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal
scanFind(scanPtr);
}
// for reading tuple key in Found or Locked state
- Data pkData = c_dataBuffer;
+ Uint32* pkData = c_ctx.c_dataBuffer;
unsigned pkSize = 0; // indicates not yet done
if (scan.m_state == ScanOp::Found) {
// found an entry to return
@@ -757,18 +727,21 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
}
#endif
- // set up index keys for this operation
- setKeyAttrs(c_ctx, frag);
// scan direction 0, 1
const unsigned idir = scan.m_descending;
- unpackBound(*scan.m_bound[idir], c_dataBuffer);
+ // set up bound from segmented memory
+ const ScanBound& scanBound = scan.m_scanBound[idir];
+ KeyDataC searchBoundData(index.m_keySpec, true);
+ KeyBoundC searchBound(searchBoundData);
+ unpackBound(c_ctx, scanBound, searchBound);
TreePos treePos;
- searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
+ searchToScan(frag, idir, searchBound, treePos);
if (treePos.m_loc != NullTupLoc) {
scan.m_scanPos = treePos;
// link the scan to node found
@@ -779,11 +752,15 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
jam();
// check upper bound
TreeEnt ent = node.getEnt(treePos.m_pos);
- if (scanCheck(scanPtr, ent))
+ if (scanCheck(scanPtr, ent)) {
+ jam();
scan.m_state = ScanOp::Current;
- else
+ } else {
+ jam();
scan.m_state = ScanOp::Last;
+ }
} else {
+ jam();
scan.m_state = ScanOp::Next;
}
} else {
@@ -874,8 +851,6 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool
#endif
// cannot be moved away from tuple we have locked
ndbrequire(scan.m_state != ScanOp::Locked);
- // set up index keys for this operation
- setKeyAttrs(c_ctx, frag);
// scan direction
const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1
@@ -1030,17 +1005,34 @@ Dbtux::scanCheck(ScanOpPtr scanPtr, Tree
return false;
}
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
const unsigned idir = scan.m_descending;
const int jdir = 1 - 2 * (int)idir;
- unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
- unsigned boundCnt = scan.m_boundCnt[1 - idir];
- readKeyAttrs(c_ctx, frag, ent, 0, c_ctx.c_entryKey);
- int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- if (jdir * ret > 0)
- return true;
- // hit upper bound of single range scan
- return false;
+ const ScanBound& scanBound = scan.m_scanBound[1 - idir];
+ int ret = 0;
+ if (scanBound.m_cnt != 0) {
+ jam();
+ // set up bound from segmented memory
+ KeyDataC searchBoundData(index.m_keySpec, true);
+ KeyBoundC searchBound(searchBoundData);
+ unpackBound(c_ctx, scanBound, searchBound);
+ // key data for the entry
+ KeyData entryKey(index.m_keySpec, true, 0);
+ entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
+ readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
+ // compare bound to key
+ const Uint32 boundCount = searchBound.get_data().get_cnt();
+ ret = cmpSearchBound(c_ctx, searchBound, entryKey, boundCount);
+ ndbrequire(ret != 0);
+ ret = (-1) * ret; // reverse for key vs bound
+ ret = jdir * ret; // reverse for descending scan
+ }
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
+ }
+#endif
+ return (ret <= 0);
}
/*
@@ -1207,8 +1199,12 @@ Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
}
#endif
Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
- scanPtr.p->m_boundMin.release();
- scanPtr.p->m_boundMax.release();
+ for (unsigned i = 0; i <= 1; i++) {
+ ScanBound& scanBound = scanPtr.p->m_scanBound[i];
+ DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+ LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+ b.release();
+ }
// unlink from per-fragment list and release from pool
frag.m_scanList.release(scanPtr);
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 2011-04-27 21:13:10 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp 2011-05-17 12:19:20 +0000
@@ -32,25 +32,30 @@
* is within the node.
*/
void
-Dbtux::findNodeToUpdate(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode)
+Dbtux::findNodeToUpdate(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode)
{
- const TreeHead& tree = frag.m_tree;
- const Uint32 numAttrs = frag.m_numAttrs;
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ const Uint32 numAttrs = index.m_numAttrs;
+ const Uint32 prefAttrs = index.m_prefAttrs;
+ const Uint32 prefBytes = index.m_prefBytes;
+ KeyData entryKey(index.m_keySpec, false, 0);
+ entryKey.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
+ KeyDataC prefKey(index.m_keySpec, false);
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
thrjam(ctx.jamBuffer);
selectNode(currNode, currNode.m_loc);
- int ret;
- // compare prefix
- unsigned start = 0;
- ret = cmpSearchKey(ctx, frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
- if (ret == NdbSqlUtil::CmpUnknown) {
+ prefKey.set_buf(currNode.getPref(), prefBytes, prefAttrs);
+ int ret = 0;
+ if (prefAttrs > 0) {
+ thrjam(ctx.jamBuffer);
+ ret = cmpSearchKey(ctx, searchKey, prefKey, prefAttrs);
+ }
+ if (ret == 0 && prefAttrs < numAttrs) {
thrjam(ctx.jamBuffer);
- // read and compare remaining attributes
- ndbrequire(start < numAttrs);
- readKeyAttrs(ctx, frag, currNode.getEnt(0), start, ctx.c_entryKey);
- ret = cmpSearchKey(ctx, frag, start, searchKey, ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ // read and compare all attributes
+ readKeyAttrs(ctx, frag, currNode.getEnt(0), entryKey, numAttrs);
+ ret = cmpSearchKey(ctx, searchKey, entryKey, numAttrs);
}
if (ret == 0) {
thrjam(ctx.jamBuffer);
@@ -97,20 +102,20 @@ Dbtux::findNodeToUpdate(TuxCtx& ctx, Fra
* search. Return true if ok i.e. entry to add is not a duplicate.
*/
bool
-Dbtux::findPosToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+Dbtux::findPosToAdd(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
{
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
int lo = -1;
int hi = (int)currNode.getOccup();
- int ret;
+ KeyData entryKey(index.m_keySpec, false, 0);
+ entryKey.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
while (hi - lo > 1) {
thrjam(ctx.jamBuffer);
// hi - lo > 1 implies lo < j < hi
int j = (hi + lo) / 2;
- // read and compare attributes
- unsigned start = 0;
- readKeyAttrs(ctx, frag, currNode.getEnt(j), start, ctx.c_entryKey);
- ret = cmpSearchKey(ctx, frag, start, searchKey, ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ // read and compare all attributes
+ readKeyAttrs(ctx, frag, currNode.getEnt(j), entryKey, index.m_numAttrs);
+ int ret = cmpSearchKey(ctx, searchKey, entryKey, index.m_numAttrs);
if (ret == 0) {
thrjam(ctx.jamBuffer);
// keys are equal, compare entry values
@@ -139,7 +144,7 @@ Dbtux::findPosToAdd(TuxCtx& ctx, Frag& f
* search. Return true if ok i.e. the entry was found.
*/
bool
-Dbtux::findPosToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+Dbtux::findPosToRemove(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
{
const unsigned occup = currNode.getOccup();
for (unsigned j = 0; j < occup; j++) {
@@ -160,7 +165,7 @@ Dbtux::findPosToRemove(TuxCtx& ctx, Frag
* Search for entry to add.
*/
bool
-Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
@@ -183,7 +188,7 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
* Search for entry to remove.
*/
bool
-Dbtux::searchToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToRemove(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
@@ -208,22 +213,38 @@ Dbtux::searchToRemove(TuxCtx& ctx, Frag&
* Search within the found node is done by caller.
*/
void
-Dbtux::findNodeToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode)
+Dbtux::findNodeToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode)
{
- const TreeHead& tree = frag.m_tree;
+ const int jdir = 1 - 2 * int(idir);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ const Uint32 numAttrs = searchBound.get_data().get_cnt();
+ const Uint32 prefAttrs = min(index.m_prefAttrs, numAttrs);
+ const Uint32 prefBytes = index.m_prefBytes;
+ KeyData entryKey(index.m_keySpec, false, 0);
+ entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
+ KeyDataC prefKey(index.m_keySpec, false);
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
- int ret;
- // compare prefix
- ret = cmpScanBound(frag, idir, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
- if (ret == NdbSqlUtil::CmpUnknown) {
+ prefKey.set_buf(currNode.getPref(), prefBytes, prefAttrs);
+ int ret = 0;
+ if (numAttrs > 0) {
+ if (prefAttrs > 0) {
+ jam();
+ // compare node prefix - result 0 implies bound is longer
+ ret = cmpSearchBound(c_ctx, searchBound, prefKey, prefAttrs);
+ }
+ if (ret == 0) {
+ jam();
+ // read and compare all attributes
+ readKeyAttrs(c_ctx, frag, currNode.getEnt(0), entryKey, numAttrs);
+ ret = cmpSearchBound(c_ctx, searchBound, entryKey, numAttrs);
+ ndbrequire(ret != 0);
+ }
+ } else {
jam();
- // read and compare all attributes
- readKeyAttrs(c_ctx, frag, currNode.getEnt(0), 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ ret = (-1) * jdir;
}
if (ret < 0) {
// bound is left of this node
@@ -266,23 +287,26 @@ Dbtux::findNodeToScan(Frag& frag, unsign
* search similar to findPosToAdd().
*/
void
-Dbtux::findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos)
+Dbtux::findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos)
{
const int jdir = 1 - 2 * int(idir);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+ const Uint32 numAttrs = searchBound.get_data().get_cnt();
int lo = -1;
int hi = (int)currNode.getOccup();
+ KeyData entryKey(index.m_keySpec, false, 0);
+ entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
while (hi - lo > 1) {
jam();
// hi - lo > 1 implies lo < j < hi
int j = (hi + lo) / 2;
int ret = (-1) * jdir;
- if (boundCount != 0) {
- // read and compare attributes
- const TreeEnt currEnt = currNode.getEnt(j);
- readKeyAttrs(c_ctx, frag, currEnt, 0, c_ctx.c_entryKey);
- ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
+ if (numAttrs != 0) {
+ // read and compare all attributes
+ readKeyAttrs(c_ctx, frag, currNode.getEnt(j), entryKey, numAttrs);
+ ret = cmpSearchBound(c_ctx, searchBound, entryKey, numAttrs);
+ ndbrequire(ret != 0);
}
- ndbrequire(ret != 0);
if (ret < 0) {
jam();
hi = j;
@@ -302,7 +326,7 @@ Dbtux::findPosToScan(Frag& frag, unsigne
* Search for scan start position.
*/
void
-Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
@@ -312,11 +336,10 @@ Dbtux::searchToScan(Frag& frag, ConstDat
jam();
return;
}
- const unsigned idir = unsigned(descending);
- findNodeToScan(frag, idir, boundInfo, boundCount, currNode);
+ findNodeToScan(frag, idir, searchBound, currNode);
treePos.m_loc = currNode.m_loc;
Uint16 pos;
- findPosToScan(frag, idir, boundInfo, boundCount, currNode, &pos);
+ findPosToScan(frag, idir, searchBound, currNode, &pos);
const unsigned occup = currNode.getOccup();
if (idir == 0) {
if (pos < occup) {
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp 2011-02-01 21:05:11 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp 2011-05-04 12:08:38 +0000
@@ -49,22 +49,18 @@ Dbtux::statRecordsInRange(ScanOpPtr scan
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const Index& index = *c_indexPool.getPtr(frag.m_indexId);
TreeHead& tree = frag.m_tree;
// get first and last position
TreePos pos1 = scan.m_scanPos;
TreePos pos2;
{ // as in scanFirst()
- setKeyAttrs(c_ctx, frag);
const unsigned idir = 1;
- const ScanBound& bound = *scan.m_bound[idir];
- ScanBoundIterator iter;
- bound.first(iter);
- for (unsigned j = 0; j < bound.getSize(); j++) {
- jam();
- c_dataBuffer[j] = *iter.data;
- bound.next(iter);
- }
- searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], true, pos2);
+ const ScanBound& scanBound = scan.m_scanBound[idir];
+ KeyDataC searchBoundData(index.m_keySpec, true);
+ KeyBoundC searchBound(searchBoundData);
+ unpackBound(c_ctx, scanBound, searchBound);
+ searchToScan(frag, idir, searchBound, pos2);
// committed read (same timeslice) and range not empty
ndbrequire(pos2.m_loc != NullTupLoc);
}
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp 2011-05-23 10:38:41 +0000
@@ -30,17 +30,17 @@
#include <EventLogger.hpp>
extern EventLogger * g_eventLogger;
-AsyncIoThread::AsyncIoThread(class Ndbfs& fs, AsyncFile* file)
+AsyncIoThread::AsyncIoThread(class Ndbfs& fs, bool bound)
: m_fs(fs)
{
- m_current_file = file;
- if (file)
+ m_current_file = 0;
+ if (bound)
{
- theMemoryChannelPtr = &theMemoryChannel;
+ theMemoryChannelPtr = &m_fs.theToBoundThreads;
}
else
{
- theMemoryChannelPtr = &m_fs.theToThreads;
+ theMemoryChannelPtr = &m_fs.theToUnboundThreads;
}
theReportTo = &m_fs.theFromThreads;
}
@@ -149,13 +149,17 @@ AsyncIoThread::run()
switch (request->action) {
case Request::open:
file->openReq(request);
+ if (request->error == 0 && request->m_do_bind)
+ attach(file);
break;
case Request::close:
file->closeReq(request);
+ detach(file);
break;
case Request::closeRemove:
file->closeReq(request);
file->removeReq(request);
+ detach(file);
break;
case Request::readPartial:
case Request::read:
@@ -265,3 +269,32 @@ AsyncIoThread::buildIndxReq(Request* req
req.buffer_size = request->file->m_page_cnt * sizeof(GlobalPage);
request->error = (* req.func_ptr)(&req);
}
+
+void
+AsyncIoThread::attach(AsyncFile* file)
+{
+ assert(m_current_file == 0);
+ assert(theMemoryChannelPtr == &m_fs.theToBoundThreads);
+ m_current_file = file;
+ theMemoryChannelPtr = &theMemoryChannel;
+ file->attach(this);
+ m_fs.cnt_active_bound(1);
+}
+
+void
+AsyncIoThread::detach(AsyncFile* file)
+{
+ if (m_current_file == 0)
+ {
+ assert(file->getThread() == 0);
+ }
+ else
+ {
+ assert(m_current_file == file);
+ assert(theMemoryChannelPtr = &theMemoryChannel);
+ m_current_file = 0;
+ theMemoryChannelPtr = &m_fs.theToBoundThreads;
+ file->detach(this);
+ m_fs.cnt_active_bound(-1);
+ }
+}
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp 2011-04-21 09:21:18 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp 2011-05-23 10:38:41 +0000
@@ -43,6 +43,8 @@ class Request
public:
Request() {}
+ void atGet() { m_do_bind = false; }
+
enum Action {
open,
close,
@@ -113,6 +115,7 @@ public:
// Information for open, needed if the first open action fails.
AsyncFile* file;
Uint32 theTrace;
+ bool m_do_bind;
MemoryChannel<Request>::ListMember m_mem_channel;
};
@@ -134,7 +137,7 @@ class AsyncIoThread
friend class Ndbfs;
friend class AsyncFile;
public:
- AsyncIoThread(class Ndbfs&, AsyncFile* file);
+ AsyncIoThread(class Ndbfs&, bool bound);
virtual ~AsyncIoThread() {};
struct NdbThread* doStart();
@@ -174,6 +177,8 @@ private:
*/
void buildIndxReq(Request*);
+ void attach(AsyncFile*);
+ void detach(AsyncFile*);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2011-04-21 09:21:18 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2011-05-23 10:38:41 +0000
@@ -45,6 +45,8 @@
#include <EventLogger.hpp>
extern EventLogger * g_eventLogger;
+NdbMutex g_active_bound_threads_mutex;
+
inline
int pageSize( const NewVARIABLE* baseAddrRef )
{
@@ -62,10 +64,15 @@ Ndbfs::Ndbfs(Block_context& ctx) :
scanningInProgress(false),
theLastId(0),
theRequestPool(0),
- m_maxOpenedFiles(0)
+ m_maxOpenedFiles(0),
+ m_bound_threads_cnt(0),
+ m_unbounds_threads_cnt(0),
+ m_active_bound_threads_cnt(0)
{
BLOCK_CONSTRUCTOR(Ndbfs);
+ NdbMutex_Init(&g_active_bound_threads_mutex);
+
// Set received signals
addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
@@ -100,7 +107,8 @@ Ndbfs::~Ndbfs()
request.action = Request::end;
for (unsigned i = 0; i < theThreads.size(); i++)
{
- theToThreads.writeChannel(&request);
+ theToBoundThreads.writeChannel(&request);
+ theToUnboundThreads.writeChannel(&request);
}
for (unsigned i = 0; i < theThreads.size(); i++)
@@ -274,7 +282,12 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
// Create idle AsyncFiles
for (Uint32 i = 0; i < noIdleFiles; i++)
{
- theIdleBoundFiles.push_back(createAsyncFile(true /* bound */));
+ theIdleFiles.push_back(createAsyncFile());
+ AsyncIoThread * thr = createIoThread(/* bound */ true);
+ if (thr)
+ {
+ theThreads.push_back(thr);
+ }
}
Uint32 threadpool = 2;
@@ -283,7 +296,7 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
// Create IoThreads
for (Uint32 i = 0; i < threadpool; i++)
{
- AsyncIoThread * thr = createIoThread(0);
+ AsyncIoThread * thr = createIoThread(/* bound */ false);
if (thr)
{
jam();
@@ -339,7 +352,7 @@ Ndbfs::execSTTOR(Signal* signal)
ndbrequire(0);
}
-int
+int
Ndbfs::forward( AsyncFile * file, Request* request)
{
jam();
@@ -348,9 +361,13 @@ Ndbfs::forward( AsyncFile * file, Reques
{
thr->dispatch(request);
}
+ else if (request->m_do_bind)
+ {
+ theToBoundThreads.writeChannel(request);
+ }
else
{
- theToThreads.writeChannel(request);
+ theToUnboundThreads.writeChannel(request);
}
return 1;
}
@@ -444,7 +461,8 @@ Ndbfs::execFSOPENREQ(Signal* signal)
request->par.open.file_size <<= 32;
request->par.open.file_size |= fsOpenReq->file_size_lo;
request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
-
+ request->m_do_bind = bound;
+
ndbrequire(forward(file, request));
}
@@ -454,7 +472,8 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
jamEntry();
const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
const BlockReference userRef = req->userReference;
- AsyncFile* file = getIdleFile(true);
+ bool bound = true;
+ AsyncFile* file = getIdleFile(bound);
ndbrequire(file != NULL);
SectionHandle handle(this, signal);
@@ -479,7 +498,8 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
request->set(userRef, req->userPointer, newId() );
request->file = file;
request->theTrace = signal->getTrace();
-
+ request->m_do_bind = bound;
+
if (version == 6)
{
ndbrequire(bp < NDB_ARRAY_SIZE(m_base_path));
@@ -541,6 +561,7 @@ Ndbfs::execFSCLOSEREQ(Signal * signal)
request->file = openFile;
request->error = 0;
request->theTrace = signal->getTrace();
+ request->m_do_bind = false;
ndbrequire(forward(openFile, request));
}
@@ -584,6 +605,7 @@ Ndbfs::readWriteRequest(int action, Sign
request->file = openFile;
request->action = (Request::Action) action;
request->theTrace = signal->getTrace();
+ request->m_do_bind = false;
Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
@@ -804,7 +826,8 @@ Ndbfs::execFSSYNCREQ(Signal * signal)
request->set(userRef, userPointer, filePointer);
request->file = openFile;
request->theTrace = signal->getTrace();
-
+ request->m_do_bind = false;
+
ndbrequire(forward(openFile,request));
}
@@ -832,6 +855,7 @@ Ndbfs::execFSSUSPENDORD(Signal * signal)
request->file = openFile;
request->theTrace = signal->getTrace();
request->par.suspend.milliseconds = millis;
+ request->m_do_bind = false;
ndbrequire(forward(openFile,request));
}
@@ -895,6 +919,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
request->action = Request::append;
else
request->action = Request::append_synch;
+ request->m_do_bind = false;
ndbrequire(forward(openFile, request));
return;
@@ -918,7 +943,8 @@ Ndbfs::execALLOC_MEM_REQ(Signal* signal)
AllocMemReq* req = (AllocMemReq*)signal->getDataPtr();
- AsyncFile* file = getIdleFile(true);
+ bool bound = true;
+ AsyncFile* file = getIdleFile(bound);
ndbrequire(file != NULL);
Request *request = theRequestPool->get();
@@ -932,6 +958,7 @@ Ndbfs::execALLOC_MEM_REQ(Signal* signal)
request->par.alloc.requestInfo = req->requestInfo;
request->par.alloc.bytes = (Uint64(req->bytes_hi) << 32) + req->bytes_lo;
request->action = Request::allocmem;
+ request->m_do_bind = bound;
ndbrequire(forward(file, request));
}
@@ -943,7 +970,8 @@ Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* s
jamEntry();
mt_BuildIndxReq * req = (mt_BuildIndxReq*)signal->getDataPtr();
- AsyncFile* file = getIdleFile(true);
+ bool bound = true;
+ AsyncFile* file = getIdleFile(bound);
ndbrequire(file != NULL);
Request *request = theRequestPool->get();
@@ -972,6 +1000,7 @@ Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* s
memcpy(&request->par.build.m_req, req, sizeof(* req));
request->action = Request::buildindx;
+ request->m_do_bind = bound;
ndbrequire(forward(file, request));
}
@@ -1000,8 +1029,8 @@ Ndbfs::newId()
}
AsyncFile*
-Ndbfs::createAsyncFile(bool bound){
-
+Ndbfs::createAsyncFile()
+{
// Check limit of open files
if (m_maxFiles !=0 && theFiles.size() == m_maxFiles)
{
@@ -1024,42 +1053,35 @@ Ndbfs::createAsyncFile(bool bound){
ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
}
- if (bound)
- {
- AsyncIoThread * thr = createIoThread(file);
- theThreads.push_back(thr);
- file->attach(thr);
-
-#ifdef VM_TRACE
- ndbout_c("NDBFS: Created new file thread %d", theFiles.size());
-#endif
- }
-
theFiles.push_back(file);
-
return file;
}
void
Ndbfs::pushIdleFile(AsyncFile* file)
{
- if (file->getThread())
- {
- theIdleBoundFiles.push_back(file);
- }
- else
- {
- theIdleUnboundFiles.push_back(file);
- }
+ assert(file->getThread() == 0);
+ theIdleFiles.push_back(file);
}
AsyncIoThread*
-Ndbfs::createIoThread(AsyncFile* file)
+Ndbfs::createIoThread(bool bound)
{
- AsyncIoThread* thr = new AsyncIoThread(*this, file);
+ AsyncIoThread* thr = new AsyncIoThread(*this, bound);
+ if (thr)
+ {
+#ifdef VM_TRACE
+ ndbout_c("NDBFS: Created new file thread %d", theThreads.size());
+#endif
- struct NdbThread* thrptr = thr->doStart();
- globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+ struct NdbThread* thrptr = thr->doStart();
+ globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+
+ if (bound)
+ m_bound_threads_cnt++;
+ else
+ m_unbounds_threads_cnt++;
+ }
return thr;
}
@@ -1067,31 +1089,50 @@ Ndbfs::createIoThread(AsyncFile* file)
AsyncFile*
Ndbfs::getIdleFile(bool bound)
{
- if (bound)
+ AsyncFile* file = 0;
+ Uint32 sz = theIdleFiles.size();
+ if (sz)
{
- Uint32 sz = theIdleBoundFiles.size();
- if (sz)
- {
- AsyncFile* file = theIdleBoundFiles[sz - 1];
- theIdleBoundFiles.erase(sz - 1);
- return file;
- }
+ file = theIdleFiles[sz - 1];
+ theIdleFiles.erase(sz - 1);
}
else
{
- Uint32 sz = theIdleUnboundFiles.size();
- if (sz)
+ file = createAsyncFile();
+ }
+
+ if (bound)
+ {
+ /**
+ * Check if we should create thread
+ */
+ if (m_active_bound_threads_cnt == m_bound_threads_cnt)
{
- AsyncFile* file = theIdleUnboundFiles[sz - 1];
- theIdleUnboundFiles.erase(sz - 1);
- return file;
+ AsyncIoThread * thr = createIoThread(true);
+ if (thr)
+ {
+ theThreads.push_back(thr);
+ }
}
}
-
- return createAsyncFile(bound);
+ return file;
}
-
+void
+Ndbfs::cnt_active_bound(int val)
+{
+ Guard g(&g_active_bound_threads_mutex);
+ if (val < 0)
+ {
+ val = -val;
+ assert(m_active_bound_threads_cnt >= (Uint32)val);
+ m_active_bound_threads_cnt -= val;
+ }
+ else
+ {
+ m_active_bound_threads_cnt += val;
+ }
+}
void
Ndbfs::report(Request * request, Signal* signal)
@@ -1506,10 +1547,13 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
infoEvent("NDBFS: Files: %d Open files: %d",
theFiles.size(),
theOpenFiles.size());
- infoEvent(" Idle files: (bound: %u unbound: %u) Max opened files: %d",
- theIdleBoundFiles.size(),
- theIdleUnboundFiles.size(),
+ infoEvent(" Idle files: %u Max opened files: %d",
+ theIdleFiles.size(),
m_maxOpenedFiles);
+ infoEvent(" Bound Threads: %u (active %u) Unbound threads: %u",
+ m_bound_threads_cnt,
+ m_active_bound_threads_cnt,
+ m_unbounds_threads_cnt);
infoEvent(" Max files: %d",
m_maxFiles);
infoEvent(" Requests: %d",
@@ -1522,7 +1566,10 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
for (unsigned i = 0; i < theOpenFiles.size(); i++){
AsyncFile* file = theOpenFiles.getFile(i);
- infoEvent("%2d (0x%lx): %s", i, (long)file, file->theFileName.c_str());
+ infoEvent("%2d (0x%lx): %s thr: %lx", i,
+ (long)file,
+ file->theFileName.c_str(),
+ (long)file->getThread());
}
return;
}
@@ -1536,18 +1583,14 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
return;
}
if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
- infoEvent("NDBFS: Dump idle files: %d %u",
- theIdleBoundFiles.size(), theIdleUnboundFiles.size());
-
- for (unsigned i = 0; i < theIdleBoundFiles.size(); i++){
- AsyncFile* file = theIdleBoundFiles[i];
- infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
- }
+ infoEvent("NDBFS: Dump idle files: %u",
+ theIdleFiles.size());
- for (unsigned i = 0; i < theIdleUnboundFiles.size(); i++){
- AsyncFile* file = theIdleUnboundFiles[i];
+ for (unsigned i = 0; i < theIdleFiles.size(); i++){
+ AsyncFile* file = theIdleFiles[i];
infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
}
+
return;
}
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp 2011-05-23 10:38:41 +0000
@@ -79,19 +79,19 @@ private:
// Communication from/to files
MemoryChannel<Request> theFromThreads;
- MemoryChannel<Request> theToThreads;
+ MemoryChannel<Request> theToBoundThreads;
+ MemoryChannel<Request> theToUnboundThreads;
Pool<Request>* theRequestPool;
- AsyncIoThread* createIoThread(AsyncFile* file);
- AsyncFile* createAsyncFile(bool bound);
+ AsyncIoThread* createIoThread(bool bound);
+ AsyncFile* createAsyncFile();
AsyncFile* getIdleFile(bool bound);
void pushIdleFile(AsyncFile*);
Vector<AsyncIoThread*> theThreads;// List of all created threads
Vector<AsyncFile*> theFiles; // List all created AsyncFiles
- Vector<AsyncFile*> theIdleBoundFiles; // List of idle AsyncFiles
- Vector<AsyncFile*> theIdleUnboundFiles; // List of idle AsyncFiles
+ Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles
OpenFiles theOpenFiles; // List of open AsyncFiles
BaseString m_base_path[FsOpenReq::BP_MAX];
@@ -105,6 +105,11 @@ private:
void readWriteRequest( int action, Signal * signal );
static Uint32 translateErrno(int aErrno);
+
+ Uint32 m_bound_threads_cnt;
+ Uint32 m_unbounds_threads_cnt;
+ Uint32 m_active_bound_threads_cnt;
+ void cnt_active_bound(int val);
public:
const BaseString& get_base_path(Uint32 no) const;
};
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp 2011-05-23 10:38:41 +0000
@@ -249,6 +249,7 @@ template <class T> inline T* Pool<T>::ge
}
--theTop;
tmp = theList[theTop];
+ tmp->atGet();
return tmp;
}
=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp 2011-05-04 14:45:46 +0000
@@ -125,7 +125,7 @@ NdbIndexStat::stat_verify()
const Uint32* entrykey1 = (const Uint32*)&e1 + EntrySize;
const Uint32* entrykey2 = (const Uint32*)&e2 + EntrySize;
int ret = stat_cmpkey(a, entrykey1, e1.m_keylen, entrykey2, e2.m_keylen);
- assert(ret == -1);
+ assert(ret < 0);
}
}
}
@@ -164,7 +164,7 @@ NdbIndexStat::stat_cmpkey(const Area& a,
if (! ah1.isNULL()) {
if (! ah2.isNULL()) {
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(c->m_type);
- ret = (*sqlType.m_cmp)(c->m_cs, &key1[i1], n, &key2[i2], n, true);
+ ret = (*sqlType.m_cmp)(c->m_cs, &key1[i1], n, &key2[i2], n);
if (ret != 0)
break;
} else {
=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2011-05-05 11:06:08 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2011-05-17 12:47:21 +0000
@@ -1749,11 +1749,9 @@ NdbQueryIndexScanOperationDefImpl::check
const int res=
(*recAttr.compare_function)(recAttr.charset_info,
keyPart1.ptr, keyPart1.len,
- highKeyPart.ptr, highKeyPart.len,
- true);
+ highKeyPart.ptr, highKeyPart.len);
if (res!=0)
{ // Not equal
- assert(res != NdbSqlUtil::CmpUnknown);
return 0;
}
} // if (keyPos == keyEnd ||
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-05-11 13:31:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2011-05-17 12:47:21 +0000
@@ -717,10 +717,9 @@ compare_index_row_prefix(const NdbRecord
void *info= col->charset_info;
int res=
- (*col->compare_function)(info, ptr1, maxSize, ptr2, maxSize, true);
+ (*col->compare_function)(info, ptr1, maxSize, ptr2, maxSize);
if (res)
{
- assert(res != NdbSqlUtil::CmpUnknown);
return res;
}
}
@@ -3593,10 +3592,9 @@ int compare_ndbrecord(const NdbReceiver
void *info= result_col->charset_info;
int res=
(*result_col->compare_function)
- (info, a_ptr, maxSize, b_ptr, maxSize, true);
+ (info, a_ptr, maxSize, b_ptr, maxSize);
if (res)
{
- assert(res != NdbSqlUtil::CmpUnknown);
return res * jdir;
}
}
=== modified file 'storage/ndb/test/run-test/conf-blade08.cnf'
--- a/storage/ndb/test/run-test/conf-blade08.cnf 2011-04-20 09:18:09 +0000
+++ b/storage/ndb/test/run-test/conf-blade08.cnf 2011-05-19 17:47:28 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
NoOfFragmentLogFiles = 4
FragmentLogFileSize = 64M
=== modified file 'storage/ndb/test/run-test/conf-dl145a.cnf'
--- a/storage/ndb/test/run-test/conf-dl145a.cnf 2011-02-19 10:31:42 +0000
+++ b/storage/ndb/test/run-test/conf-dl145a.cnf 2011-05-19 18:19:47 +0000
@@ -20,7 +20,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
NoOfFragmentLogFiles = 4
FragmentLogFileSize = 64M
=== modified file 'storage/ndb/test/run-test/conf-fimafeng08.cnf'
--- a/storage/ndb/test/run-test/conf-fimafeng08.cnf 2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-fimafeng08.cnf 2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
RedoBuffer = 32M
=== modified file 'storage/ndb/test/run-test/conf-fimafeng09.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-fimafeng09.cnf 2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-fimafeng09.cnf 2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
RedoBuffer = 32M
=== modified file 'storage/ndb/test/run-test/conf-loki27.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-loki27.cnf 2010-06-15 15:02:16 +0000
+++ b/storage/ndb/test/run-test/conf-loki27.cnf 2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
NoOfFragmentLogFiles = 4
FragmentLogFileSize = 64M
=== modified file 'storage/ndb/test/run-test/conf-ndb07.cnf'
--- a/storage/ndb/test/run-test/conf-ndb07.cnf 2011-02-19 10:31:42 +0000
+++ b/storage/ndb/test/run-test/conf-ndb07.cnf 2011-05-19 18:19:47 +0000
@@ -27,7 +27,7 @@ IndexMemory = 100M
DataMemory = 500M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
NoOfFragmentLogFiles = 8
FragmentLogFileSize = 64M
ODirect=1
=== modified file 'storage/ndb/test/run-test/conf-ndbmaster.cnf'
--- a/storage/ndb/test/run-test/conf-ndbmaster.cnf 2009-02-17 09:26:44 +0000
+++ b/storage/ndb/test/run-test/conf-ndbmaster.cnf 2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
SharedGlobalMemory=256M
=== modified file 'storage/ndb/test/run-test/conf-repl.cnf'
--- a/storage/ndb/test/run-test/conf-repl.cnf 2007-02-13 01:38:54 +0000
+++ b/storage/ndb/test/run-test/conf-repl.cnf 2011-05-19 17:47:28 +0000
@@ -11,7 +11,7 @@ skip-innodb
skip-bdb
[cluster_config]
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
DataMemory = 100M
[cluster_config.master]
=== modified file 'storage/ndb/test/run-test/conf-techra29.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-techra29.cnf 2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-techra29.cnf 2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
RedoBuffer = 32M
=== modified file 'storage/ndb/test/run-test/conf-test.cnf'
--- a/storage/ndb/test/run-test/conf-test.cnf 2007-11-15 07:57:00 +0000
+++ b/storage/ndb/test/run-test/conf-test.cnf 2011-05-19 17:47:28 +0000
@@ -19,7 +19,7 @@ IndexMemory = 25M
DataMemory = 100M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
NoOfFragmentLogFiles = 4
FragmentLogFileSize = 64M
=== modified file 'storage/ndb/test/run-test/conf-tyr64.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-tyr64.cnf 2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-tyr64.cnf 2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
DataMemory = 300M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
RedoBuffer = 32M
=== modified file 'storage/ndb/test/run-test/conf-upgrade.cnf'
--- a/storage/ndb/test/run-test/conf-upgrade.cnf 2009-06-23 18:40:35 +0000
+++ b/storage/ndb/test/run-test/conf-upgrade.cnf 2011-05-19 17:47:28 +0000
@@ -26,7 +26,7 @@ IndexMemory = 50M
DataMemory = 100M
BackupMemory = 64M
MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
SendBufferMemory = 2M
NoOfFragmentLogFiles = 4
FragmentLogFileSize = 64M
No bundle (reason: revision is a merge (you can force generation of a bundle with env var BZR_FORCE_BUNDLE=1)).
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.1 branch (magnus.blaudd:4212) | magnus.blaudd | 23 May |