List: Commits
From: magnus.blaudd  Date: May 23 2011 2:13pm
Subject: bzr commit into mysql-5.1-telco-7.1 branch (magnus.blaudd:4212)
#At file:///home/msvensson/mysql/tmp/mLgrDbtgG8/7.1/ based on revid:craig.russell@stripped

 4212 magnus.blaudd@stripped	2011-05-23 [merge]
      Merge 7.0 -> 7.1

    added:
      mysql-test/suite/ndb/r/ndb_dd_bug12581213.result
      mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf
      mysql-test/suite/ndb/t/ndb_dd_bug12581213.test
      storage/ndb/include/util/NdbPack.hpp
      storage/ndb/src/common/util/NdbPack.cpp
    modified:
      storage/ndb/CMakeLists.txt
      storage/ndb/include/util/NdbSqlUtil.hpp
      storage/ndb/src/common/util/CMakeLists.txt
      storage/ndb/src/common/util/Makefile.am
      storage/ndb/src/common/util/NdbSqlUtil.cpp
      storage/ndb/src/kernel/blocks/ERROR_codes.txt
      storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
      storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp
      storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp
      storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp
      storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
      storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
      storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp
      storage/ndb/src/ndbapi/NdbIndexStat.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/test/run-test/conf-blade08.cnf
      storage/ndb/test/run-test/conf-dl145a.cnf
      storage/ndb/test/run-test/conf-fimafeng08.cnf
      storage/ndb/test/run-test/conf-fimafeng09.cnf*
      storage/ndb/test/run-test/conf-loki27.cnf*
      storage/ndb/test/run-test/conf-ndb07.cnf
      storage/ndb/test/run-test/conf-ndbmaster.cnf
      storage/ndb/test/run-test/conf-repl.cnf
      storage/ndb/test/run-test/conf-techra29.cnf*
      storage/ndb/test/run-test/conf-test.cnf
      storage/ndb/test/run-test/conf-tyr64.cnf*
      storage/ndb/test/run-test/conf-upgrade.cnf
=== added file 'mysql-test/suite/ndb/r/ndb_dd_bug12581213.result'
--- a/mysql-test/suite/ndb/r/ndb_dd_bug12581213.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_dd_bug12581213.result	2011-05-23 10:38:41 +0000
@@ -0,0 +1,17 @@
+CREATE LOGFILE GROUP lg1
+ADD UNDOFILE 'undofile.dat'
+INITIAL_SIZE 16M
+UNDO_BUFFER_SIZE = 1M
+ENGINE NDB;
+CREATE TABLESPACE ts1
+ADD DATAFILE 'datafile.dat'
+USE LOGFILE GROUP lg1
+INITIAL_SIZE 12M
+ENGINE NDB;
+alter tablespace ts1
+drop datafile 'datafile.dat'
+engine ndb;
+drop tablespace ts1
+engine ndb;
+drop logfile group lg1
+engine ndb;

=== added file 'mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf'
--- a/mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_dd_bug12581213.cnf	2011-05-23 10:38:41 +0000
@@ -0,0 +1,7 @@
+!include suite/ndb/my.cnf
+
+[cluster_config.1]
+ndbd=
+NoOfReplicas=1
+MaxNoOfOpenFiles=27
+InitialNoOfOpenFiles=26

=== added file 'mysql-test/suite/ndb/t/ndb_dd_bug12581213.test'
--- a/mysql-test/suite/ndb/t/ndb_dd_bug12581213.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_dd_bug12581213.test	2011-05-23 10:38:41 +0000
@@ -0,0 +1,23 @@
+-- source include/have_ndb.inc
+
+CREATE LOGFILE GROUP lg1
+ADD UNDOFILE 'undofile.dat'
+INITIAL_SIZE 16M
+UNDO_BUFFER_SIZE = 1M
+ENGINE NDB;
+
+CREATE TABLESPACE ts1
+ADD DATAFILE 'datafile.dat'
+USE LOGFILE GROUP lg1
+INITIAL_SIZE 12M
+ENGINE NDB;
+
+alter tablespace ts1
+drop datafile 'datafile.dat'
+engine ndb;
+
+drop tablespace ts1
+engine ndb;
+
+drop logfile group lg1
+engine ndb;

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-05-09 14:01:14 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-05-23 14:13:35 +0000
@@ -17,6 +17,8 @@
 SET(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
     ${CMAKE_SOURCE_DIR}/cmake
     ${CMAKE_SOURCE_DIR}/storage/ndb/cmake)
+
+MESSAGE(STATUS "Using cmake version ${CMAKE_VERSION}")
     
 # Check if this is MySQL Cluster build i.e the MySQL Server
 # version string ends in -ndb-Y.Y.Y[-status]    

=== added file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp	2011-05-04 09:44:18 +0000
@@ -0,0 +1,857 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_PACK_HPP
+#define NDB_PACK_HPP
+
+#include <ndb_global.h>
+#include <my_sys.h>
+#include <kernel/AttributeHeader.hpp>
+#include <NdbSqlUtil.hpp>
+#include <NdbEnv.h>
+class NdbOut;
+
+/*
+ * Pack an array of NDB data values.  The types are specified by an
+ * array of data types.  There are no associated table or attribute ids.
+ * All or an initial sequence of the specified values are present.
+ *
+ * Currently used for ordered index keys and bounds in kernel (DBTUX)
+ * and in index statistics (mysqld).  The comparison methods use the
+ * primitive type comparisons from NdbSqlUtil.
+ *
+ * Keys and bounds use the same spec.  However a value in an index bound can
+ * be NULL even if the key attribute is not nullable.  Therefore bounds
+ * set the "allNullable" property and have a longer null mask.
+ *
+ * There are two distinct use cases: 1) construction of data or a
+ * bound, 2) operating on previously constructed data or a bound.  There
+ * are classes Data/DataC and Bound/BoundC for these uses.  The latter
+ * often can return a result without interpreting the full value.
+ *
+ * Methods return -1 on error and 0 on success.  Comparison methods
+ * assume well-formed data and return negative, zero, positive for less,
+ * equal, greater.
+ */
+
+class NdbPack {
+public:
+  class Endian;
+  class Type;
+  class Spec;
+  class Iter;
+  class DataC;
+  class Data;
+  class BoundC;
+  class Bound;
+
+  /*
+   * Get SQL type.
+   */
+  static const NdbSqlUtil::Type& getSqlType(Uint32 typeId);
+
+  /*
+   * Error codes for core dumps.
+   */
+  class Error {
+  public:
+    enum {
+      TypeNotSet = -101,          // type id was not set
+      TypeOutOfRange = -102,      // type id is out of range
+      TypeNotSupported = -103,    // blob (and for now bit) types
+      TypeSizeZero = -104,        // max size was set to zero
+      TypeFixSizeInvalid = -105,  // fixed size specified wrong
+      TypeNullableNotBool = -106, // nullable must be 0 or 1
+      CharsetNotSpecified = -107, // char type with no charset number
+      CharsetNotFound = -108,     // cannot install in all_charsets[]
+      CharsetNotAllowed = -109,   // non-char type with charset
+      SpecBufOverflow = -201,     // more spec items than allocated
+      DataCntOverflow = -301,     // more data items than in spec
+      DataBufOverflow = -302,     // more data bytes than allocated
+      DataValueOverflow = -303,   // var length exceeds max size
+      DataNotNullable = -304,     // NULL value to not-nullable type
+      InvalidAttrInfo = -305,     // invalid plain old attr info
+      BoundEmptySide = -401,      // side not 0 for empty bound
+      BoundNonemptySide = -402,   // side not -1,+1 for non-empty bound
+      InternalError = -901,
+      ValidationError = -902,
+      NoError = 0
+    };
+    Error();
+    ~Error() {}
+    int get_error_code() const;
+    int get_error_line() const;
+
+  private:
+    friend class Endian;
+    friend class Type;
+    friend class Spec;
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    friend class Bound;
+    void set_error(int code, int line) const;
+    void set_error(const Error& e2) const;
+    mutable int m_error_code;
+    mutable int m_error_line;
+  };
+
+  /*
+   * Endian definitions.
+   */
+  class Endian {
+  public:
+    enum Value {
+      Native = 0,
+      Little = 1,
+      Big = 2
+    };
+    static Value get_endian();
+    static void convert(void* ptr, Uint32 len);
+  };
+
+  /*
+   * Data type.
+   */
+  class Type : public Error {
+  public:
+    Type();
+    Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+    ~Type() {}
+    /*
+     * Define the type.  Size is fixed or max size.  Values of variable
+     * length have length bytes.  The definition is verified when the
+     * type is added to the specification.  This also installs missing
+     * CHARSET_INFO* into all_charsets[].
+     */
+    void set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber);
+    // getters
+    Uint32 get_type_id() const;
+    Uint32 get_byte_size() const;
+    bool get_nullable() const;
+    Uint32 get_cs_number() const;
+    Uint32 get_array_type() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Type&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Spec;
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    // verify and complete when added to specification
+    int complete();
+    Uint16 m_typeId;
+    Uint16 m_byteSize;    // fixed or max size in bytes
+    Uint16 m_nullable;
+    Uint16 m_csNumber;
+    Uint16 m_arrayType;   // 0,1,2 length bytes
+    Uint16 m_nullbitPos;  // computed as part of Spec
+  };
+
+  /*
+   * Data specification i.e. array of types.  Usually constructed on the
+   * heap, so keep fairly small.  Used for both keys and bounds.
+   */
+  class Spec : public Error {
+  public:
+    Spec();
+    ~Spec() {}
+    // set initial buffer (calls reset)
+    void set_buf(Type* buf, Uint32 bufMaxCnt);
+    // use if buffer is relocated
+    void set_buf(Type* buf);
+    // reset but keep buffer
+    void reset();
+    // add type to specification once or number of times
+    int add(Type type);
+    int add(Type type, Uint32 cnt);
+    // copy from
+    void copy(const Spec& s2);
+    // getters (bounds set allNullable)
+    const Type& get_type(Uint32 i) const;
+    Uint32 get_cnt() const;
+    Uint32 get_nullable_cnt(bool allNullable) const;
+    Uint32 get_nullmask_len(bool allNullable) const;
+    // max data length including null mask
+    Uint32 get_max_data_len(bool allNullable) const;
+    // minimum var bytes (if used by Data instance)
+    Uint32 get_min_var_bytes(bool allNullable) const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Spec&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    // undefined
+    Spec(const Spec&);
+    Spec& operator=(const Spec&);
+    Type* m_buf;
+    Uint16 m_bufMaxCnt;
+    Uint16 m_cnt;
+    Uint16 m_nullableCnt;
+    Uint16 m_varsizeCnt;
+    Uint32 m_maxByteSize; // excludes null mask
+  };
+
+  /*
+   * Iterator over data items.  DataC uses external Iter instances in
+   * comparison methods etc.  Data contains an Iter instance which
+   * iterates on items added.
+   */
+  class Iter : public Error {
+  public:
+    // the data instance is only used to set metadata
+    Iter(const DataC& data);
+    ~Iter() {}
+    void reset();
+
+  private:
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    // undefined
+    Iter(const Iter&);
+    Iter& operator=(const Iter&);
+    // describe next non-null or null item and advance iterator
+    int desc(const Uint8* item);
+    int desc_null();
+    // compare current items (DataC buffers are passed)
+    int cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const;
+
+    const Spec& m_spec;
+    const bool m_allNullable;
+    // iterator
+    Uint32 m_itemPos;     // position of current item in DataC buffer
+    Uint32 m_cnt;         // number of items described so far
+    Uint32 m_nullCnt;
+    // current item
+    Uint32 m_lenBytes;    // 0-2
+    Uint32 m_bareLen;     // excludes length bytes
+    Uint32 m_itemLen;     // full length, value zero means null
+  };
+
+  /*
+   * Read-only superclass of Data.  Initialized from a previously
+   * constructed Data buffer (any var bytes skipped).  Methods interpret
+   * one data item at a time.  Values are native endian.
+   */
+  class DataC : public Error {
+  public:
+    DataC(const Spec& spec, bool allNullable);
+    // set buffer to previously constructed one with given item count
+    void set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt);
+    // interpret next data item
+    int desc(Iter& r) const;
+    // compare cnt attrs and also return number of initial equal attrs
+    int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+    // getters
+    const Spec& get_spec() const;
+    bool get_all_nullable() const;
+    const void* get_data_buf() const;
+    Uint32 get_cnt() const;
+    bool is_empty() const;
+    bool is_full() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const DataC&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz, bool convert_flag = false) const;
+    int validate() const { return 0; }
+
+  private:
+    friend class Iter;
+    friend class Data;
+    friend class BoundC;
+    // undefined
+    DataC(const Data&);
+    DataC& operator=(const DataC&);
+    const Spec& m_spec;
+    const bool m_allNullable;
+    const Uint8* m_buf;
+    Uint32 m_bufMaxLen;
+    // can be updated as part of Data instance
+    Uint32 m_cnt;
+  };
+
+  /*
+   * Instance of an array of data values.  The values are packed into
+   * a byte buffer.  The buffer is also maintained as a single varbinary
+   * value if non-zero var bytes (length bytes) is specified.
+   */
+  class Data : public DataC {
+  public:
+    Data(const Spec& spec, bool allNullable, Uint32 varBytes);
+    // set buffer (calls reset)
+    void set_buf(void* buf, Uint32 bufMaxLen);
+    // reset but keep buffer (header is zeroed)
+    void reset();
+    // add non-null data items and return length in bytes
+    int add(const void* data, Uint32* len_out);
+    int add(const void* data, Uint32 cnt, Uint32* len_out);
+    // add null data items and return length in bytes (always 0)
+    int add_null(Uint32* len_out);
+    int add_null(Uint32 cnt, Uint32* len_out);
+    // add from "plain old attr info"
+    int add_poai(const Uint32* poai, Uint32* len_out);
+    int add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out);
+    // call this before first use
+    int finalize();
+    // copy from
+    int copy(const DataC& d2);
+    // convert endian
+    int convert(Endian::Value to_endian);
+    // create complete instance from buffer contents
+    int desc_all(Uint32 cnt);
+    // getters
+    Uint32 get_max_len() const;
+    Uint32 get_max_len4() const;
+    Uint32 get_var_bytes() const;
+    const void* get_full_buf() const;
+    Uint32 get_full_len() const;
+    Uint32 get_data_len() const;
+    Uint32 get_null_cnt() const;
+    Endian::Value get_endian() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Data&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Iter;
+    friend class Bound;
+    // undefined
+    Data(const Data&);
+    Data& operator=(const Data&);
+    int finalize_impl();
+    int convert_impl(Endian::Value to_endian);
+    const Uint32 m_varBytes;
+    Uint8* m_buf;
+    Uint32 m_bufMaxLen;
+    Endian::Value m_endian; // Native until finalize()
+    // iterator on items added
+    Iter m_iter;
+  };
+
+  /*
+   * Read-only superclass of BoundC, analogous to DataC.  Initialized
+   * from a previously constructed Bound or DataC buffer.
+   */
+  class BoundC : public Error {
+  public:
+    BoundC(DataC& data);
+    ~BoundC() {}
+    // call this before first use
+    int finalize(int side);
+    // compare bound to key (may return 0 if bound is longer)
+    int cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const;
+    // compare bounds (may return 0 if cnt is less than min length)
+    int cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const;
+    // getters
+    DataC& get_data() const;
+    int get_side() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const BoundC&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    friend class Bound;
+    // undefined
+    BoundC(const BoundC&);
+    BoundC& operator=(const BoundC&);
+    DataC& m_data;
+    int m_side;
+  };
+
+  /*
+   * Ordered index range bound consists of a partial key and a "side".
+   * The partial key is a Data instance where some initial number of
+   * values are present.  It is defined separately by the caller and
+   * passed to Bound ctor by reference.
+   */
+  class Bound : public BoundC {
+  public:
+    Bound(Data& data);
+    ~Bound() {}
+    void reset();
+    // call this before first use
+    int finalize(int side);
+    // getters
+    Data& get_data() const;
+    // print
+    friend NdbOut& operator<<(NdbOut&, const Bound&);
+    void print(NdbOut& out) const;
+    const char* print(char* buf, Uint32 bufsz) const;
+    int validate() const;
+
+  private:
+    // undefined
+    Bound(const Bound&);
+    Bound& operator=(const Bound&);
+    Data& m_data;
+  };
+
+  /*
+   * Helper for print() methods.
+   */
+  struct Print {
+  private:
+    friend class Endian;
+    friend class Type;
+    friend class Spec;
+    friend class Iter;
+    friend class DataC;
+    friend class Data;
+    friend class BoundC;
+    friend class Bound;
+    Print(char* buf, Uint32 bufsz);
+    void print(const char* fmt, ...);
+    char* m_buf;
+    Uint32 m_bufsz;
+    Uint32 m_sz;
+  };
+};
+
+// NdbPack
+
+inline const NdbSqlUtil::Type&
+NdbPack::getSqlType(Uint32 typeId)
+{
+  return NdbSqlUtil::m_typeList[typeId];
+}
+
+// NdbPack::Error
+
+inline
+NdbPack::Error::Error()
+{
+  m_error_code = 0;
+  m_error_line = 0;
+}
+
+// NdbPack::Endian
+
+inline NdbPack::Endian::Value
+NdbPack::Endian::get_endian()
+{
+#ifndef WORDS_BIGENDIAN
+  return Little;
+#else
+  return Big;
+#endif
+}
+
+// NdbPack::Type
+
+inline
+NdbPack::Type::Type()
+{
+  m_typeId = NDB_TYPE_UNDEFINED;
+  m_byteSize = 0;
+  m_nullable = true;
+  m_csNumber = 0;
+  m_arrayType = 0;
+  m_nullbitPos = 0;
+}
+
+inline
+NdbPack::Type::Type(int typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+  set(typeId, byteSize, nullable, csNumber);
+}
+
+inline void
+NdbPack::Type::set(Uint32 typeId, Uint32 byteSize, bool nullable, Uint32 csNumber)
+{
+  m_typeId = typeId;
+  m_byteSize = byteSize;
+  m_nullable = nullable;
+  m_csNumber = csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_type_id() const
+{
+  return m_typeId;
+}
+
+inline Uint32
+NdbPack::Type::get_byte_size() const
+{
+  return m_byteSize;
+}
+
+inline bool
+NdbPack::Type::get_nullable() const
+{
+  return (bool)m_nullable;
+}
+
+inline Uint32
+NdbPack::Type::get_cs_number() const
+{
+  return m_csNumber;
+}
+
+inline Uint32
+NdbPack::Type::get_array_type() const
+{
+  return m_arrayType;
+}
+
+// NdbPack::Spec
+
+inline
+NdbPack::Spec::Spec()
+{
+  reset();
+  m_buf = 0;
+  m_bufMaxCnt = 0;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf, Uint32 bufMaxCnt)
+{
+  reset();
+  m_buf = buf;
+  m_bufMaxCnt = bufMaxCnt;
+}
+
+inline void
+NdbPack::Spec::set_buf(Type* buf)
+{
+  m_buf = buf;
+}
+
+inline void
+NdbPack::Spec::reset()
+{
+  m_cnt = 0;
+  m_nullableCnt = 0;
+  m_varsizeCnt = 0;
+  m_maxByteSize = 0;
+}
+
+inline const NdbPack::Type&
+NdbPack::Spec::get_type(Uint32 i) const
+{
+  assert(i < m_cnt);
+  return m_buf[i];
+}
+
+inline Uint32
+NdbPack::Spec::get_cnt() const
+{
+  return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullable_cnt(bool allNullable) const
+{
+  if (!allNullable)
+    return m_nullableCnt;
+  else
+    return m_cnt;
+}
+
+inline Uint32
+NdbPack::Spec::get_nullmask_len(bool allNullable) const
+{
+  return (get_nullable_cnt(allNullable) + 7) / 8;
+}
+
+inline Uint32
+NdbPack::Spec::get_max_data_len(bool allNullable) const
+{
+  return get_nullmask_len(allNullable) + m_maxByteSize;
+}
+
+inline Uint32
+NdbPack::Spec::get_min_var_bytes(bool allNullable) const
+{
+  const Uint32 len = get_max_data_len(allNullable);
+  return (len < 256 ? 1 : 2);
+}
+
+// NdbPack::Iter
+
+inline
+NdbPack::Iter::Iter(const DataC& data) :
+  m_spec(data.m_spec),
+  m_allNullable(data.m_allNullable)
+{
+  reset();
+}
+
+inline void
+NdbPack::Iter::reset()
+{
+  m_itemPos = m_spec.get_nullmask_len(m_allNullable);
+  m_cnt = 0;
+  m_nullCnt = 0;
+  m_lenBytes = 0;
+  m_bareLen = 0;
+  m_itemLen = 0;
+}
+
+// NdbPack::DataC
+
+inline
+NdbPack::DataC::DataC(const Spec& spec, bool allNullable) :
+  m_spec(spec),
+  m_allNullable(allNullable)
+{
+  m_buf = 0;
+  m_bufMaxLen = 0;
+  m_cnt = 0;
+}
+
+inline void
+NdbPack::DataC::set_buf(const void* buf, Uint32 bufMaxLen, Uint32 cnt)
+{
+  m_buf = static_cast<const Uint8*>(buf);
+  m_bufMaxLen = bufMaxLen;
+  m_cnt = cnt;
+}
+
+inline const NdbPack::Spec&
+NdbPack::DataC::get_spec() const
+{
+  return m_spec;
+}
+
+inline bool
+NdbPack::DataC::get_all_nullable() const
+{
+  return m_allNullable;
+}
+
+inline const void*
+NdbPack::DataC::get_data_buf() const
+{
+  return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::DataC::get_cnt() const
+{
+  return m_cnt;
+}
+
+inline bool
+NdbPack::DataC::is_empty() const
+{
+  return m_cnt == 0;
+}
+
+inline bool
+NdbPack::DataC::is_full() const
+{
+  return m_cnt == m_spec.m_cnt;
+}
+
+// NdbPack::Data
+
+inline
+NdbPack::Data::Data(const Spec& spec, bool allNullable, Uint32 varBytes) :
+  DataC(spec, allNullable),
+  m_varBytes(varBytes),
+  m_iter(*this)
+{
+  m_buf = 0;
+  m_bufMaxLen = 0;
+  m_endian = Endian::Native;
+}
+
+inline void
+NdbPack::Data::set_buf(void* buf, Uint32 bufMaxLen)
+{
+  m_buf = static_cast<Uint8*>(buf);
+  m_bufMaxLen = bufMaxLen;
+  reset();
+  assert(bufMaxLen >= m_varBytes);
+  DataC::set_buf(&m_buf[m_varBytes], m_bufMaxLen - m_varBytes, 0);
+}
+
+inline void
+NdbPack::Data::reset()
+{
+  m_cnt = 0;  // in DataC
+  const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
+  memset(m_buf, 0, bytes);
+  m_endian = Endian::Native;
+  m_iter.reset();
+}
+
+inline int
+NdbPack::Data::finalize()
+{
+  if (m_varBytes == 0 ||
+      finalize_impl() == 0)
+  {
+    m_endian = Endian::get_endian();
+    return 0;
+  }
+  return -1;
+}
+
+inline int
+NdbPack::Data::convert(Endian::Value to_endian)
+{
+  if (unlikely(to_endian == Endian::Native))
+    to_endian = Endian::get_endian();
+  if (m_endian == to_endian)
+    return 0;
+  if (convert_impl(to_endian) == 0)
+  {
+    m_endian = to_endian;
+    return 0;
+  }
+  return -1;
+}
+
+inline Uint32
+NdbPack::Data::get_max_len() const
+{
+  return m_varBytes + m_spec.get_max_data_len(m_allNullable);
+}
+
+inline Uint32
+NdbPack::Data::get_max_len4() const
+{
+  Uint32 len4 = get_max_len();
+  len4 += 3;
+  len4 /= 4;
+  len4 *= 4;
+  return len4;
+}
+
+inline Uint32
+NdbPack::Data::get_var_bytes() const
+{
+  return m_varBytes;
+}
+
+inline const void*
+NdbPack::Data::get_full_buf() const
+{
+  return &m_buf[0];
+}
+
+inline Uint32
+NdbPack::Data::get_full_len() const
+{
+  return m_varBytes + m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_data_len() const
+{
+  return m_iter.m_itemPos + m_iter.m_itemLen;
+}
+
+inline Uint32
+NdbPack::Data::get_null_cnt() const
+{
+  return m_iter.m_nullCnt;
+}
+
+inline NdbPack::Endian::Value
+NdbPack::Data::get_endian() const
+{
+  return m_endian;
+}
+
+// NdbPack::BoundC
+
+inline
+NdbPack::BoundC::BoundC(DataC& data) :
+  m_data(data)
+{
+  m_side = 0;
+}
+
+inline int
+NdbPack::BoundC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+  const BoundC& b1 = *this;
+  const DataC& d1 = b1.m_data;
+  int res = d1.cmp(d2, cnt, num_eq);
+  if (res == 0 && d1.m_cnt <= d2.m_cnt)
+    res = b1.m_side;
+  return res;
+}
+
+inline NdbPack::DataC&
+NdbPack::BoundC::get_data() const
+{
+  return m_data;
+}
+
+inline int
+NdbPack::BoundC::get_side() const
+{
+  return m_side;
+}
+
+// NdbPack::Bound
+
+inline
+NdbPack::Bound::Bound(Data& data) :
+  BoundC(data),
+  m_data(data)
+{
+}
+
+inline void
+NdbPack::Bound::reset()
+{
+  m_data.reset();
+  m_side = 0;
+}
+
+inline int
+NdbPack::Bound::finalize(int side)
+{
+  if (m_data.finalize() == -1)
+  {
+    set_error(m_data);
+    return -1;
+  }
+  if (BoundC::finalize(side) == -1)
+    return -1;
+  return 0;
+}
+
+inline NdbPack::Data&
+NdbPack::Bound::get_data() const
+{
+  return m_data;
+}
+
+#endif // NDB_PACK_HPP
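
For orientation, a minimal sketch of how the classes declared above fit together
(illustrative only, not part of the patch; the helper name, buffer size and values
are made up, and error handling is reduced to assert()):

#include <NdbPack.hpp>

static void pack_example_key()
{
  // specification of two key columns: Uint32 NOT NULL, Int32 NULL
  NdbPack::Type types[2];
  NdbPack::Spec spec;
  spec.set_buf(types, 2);
  int rc = spec.add(NdbPack::Type(NDB_TYPE_UNSIGNED, 4, false, 0));
  assert(rc == 0);
  rc = spec.add(NdbPack::Type(NDB_TYPE_INT, 4, true, 0));
  assert(rc == 0);

  // pack one key: the value 10 followed by a NULL
  Uint8 buf[64];                       // must hold spec.get_max_data_len(false)
  NdbPack::Data data(spec, false, 0);  // not all-nullable, zero var bytes
  data.set_buf(buf, sizeof(buf));
  Uint32 len;
  const Uint32 val = 10;
  rc = data.add(&val, &len);           // len comes back as 4
  assert(rc == 0);
  rc = data.add_null(&len);            // len comes back as 0
  assert(rc == 0);
  rc = data.finalize();                // required before the data is used
  assert(rc == 0);
}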

=== modified file 'storage/ndb/include/util/NdbSqlUtil.hpp'
--- a/storage/ndb/include/util/NdbSqlUtil.hpp	2011-02-19 03:13:04 +0000
+++ b/storage/ndb/include/util/NdbSqlUtil.hpp	2011-05-04 15:10:05 +0000
@@ -37,23 +37,18 @@ typedef struct charset_info_st CHARSET_I
 class NdbSqlUtil {
 public:
   /**
-   * Compare attribute values.  Returns -1, 0, +1 for less, equal,
-   * greater, respectively.  Parameters are pointers to values and their
-   * lengths in bytes.  The lengths can differ.
+   * Compare attribute values.  Returns negative, zero, positive for
+   * less, equal, greater.  We trust DBTUP to validate all data and
+   * mysql upgrade to not invalidate them.  Bad values (such as NaN)
+   * causing undefined results crash here always (require, not assert)
+   * since they are likely to cause a more obscure crash in DBTUX.
+   * wl4163_todo: API probably should not crash.
    *
-   * First value is a full value but second value can be partial.  If
-   * the partial value is not enough to determine the result, CmpUnknown
-   * will be returned.  A shorter second value is not necessarily
-   * partial.  Partial values are allowed only for types where prefix
-   * comparison is possible (basically, binary strings).
-   *
-   * First parameter is a pointer to type specific extra info.  Char
-   * types receive CHARSET_INFO in it.
-   *
-   * If a value cannot be parsed, it compares like NULL i.e. less than
-   * any valid value.
+   * Parameters are pointers to values (no alignment requirements) and
+   * their lengths in bytes.  First parameter is a pointer to type
+   * specific extra info.  Char types receive CHARSET_INFO in it.
    */
-  typedef int Cmp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full);
+  typedef int Cmp(const void* info, const void* p1, uint n1, const void* p2, uint n2);
 
   /**
    * Prototype for "like" comparison.  Defined for string types.  First
@@ -73,13 +68,6 @@ public:
    */
   typedef int AndMask(const void* data, unsigned dataLen, const void* mask, unsigned maskLen, bool cmpZero); 
 
-  enum CmpResult {
-    CmpLess = -1,
-    CmpEqual = 0,
-    CmpGreater = 1,
-    CmpUnknown = 2      // insufficient partial data
-  };
-
   struct Type {
     enum Enum {
       Undefined = NDB_TYPE_UNDEFINED,
@@ -126,13 +114,6 @@ public:
   static const Type& getType(Uint32 typeId);
 
   /**
-   * Get the normalized type used in hashing and key comparisons.
-   * Maps all string types to Binary.  This includes Var* strings
-   * because strxfrm result is padded to fixed (maximum) length.
-   */
-  static const Type& getTypeBinary(Uint32 typeId);
-
-  /**
    * Check character set.
    */
   static uint check_column_for_pk(Uint32 typeId, const void* info);
@@ -159,11 +140,6 @@ public:
                              const uchar *src, size_t srclen);
 
   /**
-   * Compare decimal numbers.
-   */
-  static int cmp_olddecimal(const uchar* s1, const uchar* s2, unsigned n);
-
-  /**
    * Convert attribute data to/from network byte order
    * This method converts the passed data of the passed type
    * between host and network byte order.
@@ -177,6 +153,7 @@ public:
                                Uint32 dataByteSize);
 
 private:
+  friend class NdbPack;
   /**
    * List of all types.  Must match Type::Enum.
    */
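
For reference, a sketch of calling a comparison function through the simplified
Cmp signature above (the helper is made up and not part of the patch; non-char
types get a null "info" pointer, mirroring how NdbPack::Iter::cmp() in
NdbPack.cpp below invokes m_cmp):

#include <NdbSqlUtil.hpp>

// compare two native-endian Uint32 values via the type's Cmp function
static int cmp_unsigned_example(Uint32 a, Uint32 b)
{
  const NdbSqlUtil::Type& t = NdbSqlUtil::getType(NDB_TYPE_UNSIGNED);
  // "info" carries CHARSET_INFO* only for char types; pass 0 here
  return (*t.m_cmp)(0, &a, sizeof(a), &b, sizeof(b));
}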

=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt	2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt	2011-05-09 15:35:25 +0000
@@ -54,6 +54,7 @@ ADD_LIBRARY(ndbgeneral STATIC
 	    SparseBitmask.cpp
             require.c
             Vector.cpp
+            NdbPack.cpp
 )
 TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
 
@@ -82,3 +83,8 @@ SET_TARGET_PROPERTIES(ndb_version-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_VERSION")
 TARGET_LINK_LIBRARIES(ndb_version-t ndbgeneral)
 
+ADD_EXECUTABLE(NdbPack-t NdbPack.cpp)
+SET_TARGET_PROPERTIES(NdbPack-t
+                      PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
+TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral ndbportlib)
+

=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am	2011-04-01 11:46:04 +0000
+++ b/storage/ndb/src/common/util/Makefile.am	2011-05-04 09:44:18 +0000
@@ -27,14 +27,15 @@ libgeneral_la_SOURCES = \
             strdup.c \
             ConfigValues.cpp ndb_init.cpp basestring_vsnprintf.c \
             Bitmask.cpp SparseBitmask.cpp parse_mask.hpp \
-	    ndb_rand.c require.c Vector.cpp
+	    ndb_rand.c require.c Vector.cpp \
+	    NdbPack.cpp
 
 INCLUDES_LOC = @ZLIB_INCLUDES@
 
 libndbazio_la_SOURCES = ndbzio.c
 libndbazio_la_LIBADD = @ZLIB_LIBS@
 
-noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t
+noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t
 
 BaseString_t_SOURCES = BaseString.cpp
 BaseString_t_CXXFLAGS = -DTEST_BASE_STRING
@@ -73,5 +74,14 @@ ndb_version_t_SOURCES = version.cpp
 ndb_version_t_CXXFLAGS = -DTEST_VERSION
 ndb_version_t_LDADD = libgeneral.la
 
+NdbPack_t_SOURCES = NdbPack.cpp
+NdbPack_t_CXXFLAGS = -DTEST_NDB_PACK
+NdbPack_t_LDADD = \
+	libgeneral.la \
+	$(top_builddir)/storage/ndb/src/common/portlib/libportlib.la \
+	$(top_builddir)/mysys/libmysys.la \
+	$(top_builddir)/strings/libmystrings.la \
+	$(top_builddir)/dbug/libdbug.la
+
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_util.mk.am

=== added file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp	2011-05-09 15:35:25 +0000
@@ -0,0 +1,2053 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <ndb_global.h>
+#include <NdbPack.hpp>
+#include <NdbOut.hpp>
+#include <NdbEnv.h>
+
+// NdbPack::Error
+
+int
+NdbPack::Error::get_error_code() const
+{
+  return m_error_code;
+}
+
+int
+NdbPack::Error::get_error_line() const
+{
+  return m_error_line;
+}
+
+void
+NdbPack::Error::set_error(int code, int line) const
+{
+  m_error_code = code;
+  m_error_line = line;
+#ifdef VM_TRACE
+  const char* p = NdbEnv_GetEnv("NDB_PACK_ABORT_ON_ERROR", (char*)0, 0);
+  if (p != 0 && strchr("1Y", p[0]) != 0)
+    require(false);
+#endif
+}
+
+void
+NdbPack::Error::set_error(const Error& e2) const
+{
+  set_error(e2.m_error_code, e2.m_error_line);
+}
+
+// NdbPack::Endian
+
+void
+NdbPack::Endian::convert(void* ptr, Uint32 len)
+{
+  Uint8* p = (Uint8*)ptr;
+  for (Uint32 i = 0; i < len / 2; i++)
+  {
+    Uint32 j = len - i - 1;
+    Uint8 tmp = p[i];
+    p[i] = p[j];
+    p[j] = tmp;
+  }
+}
+
+// NdbPack::Type
+
+struct Ndb_pack_type_info {
+  bool m_supported;
+  Uint16 m_fixSize;     // if non-zero must have this exact size
+  Uint16 m_arrayType;   // 0,1,2 length bytes
+  bool m_charType;      // type with character set
+  bool m_convert;       // convert endian (reverse byte order)
+};
+
+static const Ndb_pack_type_info
+g_ndb_pack_type_info[] = {
+  { 0, 0, 0, 0, 0 }, // NDB_TYPE_UNDEFINED
+  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYINT
+  { 1, 1, 0, 0, 1 }, // NDB_TYPE_TINYUNSIGNED
+  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLINT
+  { 1, 2, 0, 0, 1 }, // NDB_TYPE_SMALLUNSIGNED
+  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMINT
+  { 1, 3, 0, 0, 1 }, // NDB_TYPE_MEDIUMUNSIGNED
+  { 1, 4, 0, 0, 1 }, // NDB_TYPE_INT
+  { 1, 4, 0, 0, 1 }, // NDB_TYPE_UNSIGNED
+  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGINT
+  { 1, 8, 0, 0, 1 }, // NDB_TYPE_BIGUNSIGNED
+  { 1, 4, 0, 0, 1 }, // NDB_TYPE_FLOAT
+  { 1, 8, 0, 0, 1 }, // NDB_TYPE_DOUBLE
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMAL
+  { 1, 0, 0, 1, 0 }, // NDB_TYPE_CHAR
+  { 1, 0, 1, 1, 0 }, // NDB_TYPE_VARCHAR
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_BINARY
+  { 1, 0, 1, 0, 0 }, // NDB_TYPE_VARBINARY
+  { 1, 8, 0, 0, 0 }, // NDB_TYPE_DATETIME
+  { 1, 3, 0, 0, 0 }, // NDB_TYPE_DATE
+  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BLOB
+  { 0, 0, 0, 1, 0 }, // NDB_TYPE_TEXT
+  { 0, 0, 0, 0, 0 }, // NDB_TYPE_BIT
+  { 1, 0, 2, 1, 0 }, // NDB_TYPE_LONGVARCHAR
+  { 1, 0, 2, 0, 0 }, // NDB_TYPE_LONGVARBINARY
+  { 1, 3, 0, 0, 0 }, // NDB_TYPE_TIME
+  { 1, 1, 0, 0, 0 }, // NDB_TYPE_YEAR
+  { 1, 4, 0, 0, 0 }, // NDB_TYPE_TIMESTAMP
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_OLDDECIMALUNSIGNED
+  { 1, 0, 0, 0, 0 }, // NDB_TYPE_DECIMAL
+  { 1, 0, 0, 0, 0 }  // NDB_TYPE_DECIMALUNSIGNED
+};
+
+static const int g_ndb_pack_type_info_cnt =
+  sizeof(g_ndb_pack_type_info) / sizeof(g_ndb_pack_type_info[0]);
+
+int
+NdbPack::Type::complete()
+{
+  if (m_typeId == 0)
+  {
+    set_error(TypeNotSet, __LINE__);
+    return -1;
+  }
+  if (m_typeId >= g_ndb_pack_type_info_cnt)
+  {
+    set_error(TypeOutOfRange, __LINE__);
+    return -1;
+  }
+  const Ndb_pack_type_info& info = g_ndb_pack_type_info[m_typeId];
+  if (!info.m_supported)
+  {
+    set_error(TypeNotSupported, __LINE__);
+    return -1;
+  }
+  if (m_byteSize == 0)
+  {
+    set_error(TypeSizeZero, __LINE__);
+    return -1;
+  }
+  if (info.m_fixSize != 0 && m_byteSize != info.m_fixSize)
+  {
+    set_error(TypeFixSizeInvalid, __LINE__);
+    return -1;
+  }
+  if (!(m_nullable <= 1))
+  {
+    set_error(TypeNullableNotBool, __LINE__);
+    return -1;
+  }
+  if (info.m_charType && m_csNumber == 0)
+  {
+    set_error(CharsetNotSpecified, __LINE__);
+    return -1;
+  }
+  if (info.m_charType && all_charsets[m_csNumber] == 0)
+  {
+    CHARSET_INFO* cs = get_charset(m_csNumber, MYF(0));
+    if (cs == 0)
+    {
+      set_error(CharsetNotFound, __LINE__);
+      return -1;
+    }
+    all_charsets[m_csNumber] = cs; // yes caller must do this
+  }
+  if (!info.m_charType && m_csNumber != 0)
+  {
+    set_error(CharsetNotAllowed, __LINE__);
+    return -1;
+  }
+  m_arrayType = info.m_arrayType;
+  return 0;
+}
+
+// NdbPack::Spec
+
+int
+NdbPack::Spec::add(Type type)
+{
+  Uint32 cnt = m_cnt;
+  Uint32 nullable_cnt = m_nullableCnt;
+  Uint32 varsize_cnt = m_varsizeCnt;
+  Uint32 max_byte_size = m_maxByteSize;
+  if (type.complete() == -1)
+  {
+    set_error(type);
+    return -1;
+  }
+  type.m_nullbitPos = 0xFFFF;
+  if (type.m_nullable)
+  {
+    type.m_nullbitPos = nullable_cnt;
+    nullable_cnt++;
+  }
+  if (type.m_arrayType != 0)
+  {
+    varsize_cnt++;
+  }
+  max_byte_size += type.m_byteSize;
+  if (cnt >= m_bufMaxCnt)
+  {
+    set_error(SpecBufOverflow, __LINE__);
+    return -1;
+  }
+  m_buf[cnt] = type;
+  cnt++;
+  m_cnt = cnt;
+  m_nullableCnt = nullable_cnt;
+  m_varsizeCnt = varsize_cnt;
+  m_maxByteSize = max_byte_size;
+  return 0;
+}
+
+int
+NdbPack::Spec::add(Type type, Uint32 cnt)
+{
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    if (add(type) == -1)
+      return -1;
+  }
+  return 0;
+}
+
+void
+NdbPack::Spec::copy(const Spec& s2)
+{
+  assert(m_bufMaxCnt >= s2.m_cnt);
+  reset();
+  m_cnt = s2.m_cnt;
+  m_nullableCnt = s2.m_nullableCnt;
+  m_varsizeCnt = s2.m_varsizeCnt;
+  m_maxByteSize = s2.m_maxByteSize;
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    m_buf[i] = s2.m_buf[i];
+  }
+}
+
+// NdbPack::Iter
+
+int
+NdbPack::Iter::desc(const Uint8* item)
+{
+  const Uint32 i = m_cnt; // item index
+  assert(i < m_spec.m_cnt);
+  const Type& type = m_spec.m_buf[i];
+  const Uint32 lenBytes = type.m_arrayType;
+  Uint32 bareLen = 0;
+  switch (lenBytes) {
+  case 0:
+    bareLen = type.m_byteSize;
+    break;
+  case 1:
+    bareLen = item[0];
+    break;
+  case 2:
+    bareLen = item[0] + (item[1] << 8);
+    break;
+  default:
+    assert(false);
+    set_error(InternalError, __LINE__);
+    return -1;
+  }
+  const Uint32 itemLen = lenBytes + bareLen;
+  if (itemLen > type.m_byteSize)
+  {
+    set_error(DataValueOverflow, __LINE__);
+    return -1;
+  }
+  m_itemPos += m_itemLen; // skip previous item
+  m_cnt++;
+  m_lenBytes = lenBytes;
+  m_bareLen = bareLen;
+  m_itemLen = itemLen;
+  return 0;
+}
+
+int
+NdbPack::Iter::desc_null()
+{
+  assert(m_cnt < m_spec.m_cnt);
+  // caller checks if null allowed
+  m_itemPos += m_itemLen; // skip previous item
+  m_cnt++;
+  m_nullCnt++;
+  m_lenBytes = 0;
+  m_bareLen = 0;
+  m_itemLen = 0;
+  return 0;
+}
+
+int
+NdbPack::Iter::cmp(const Iter& r2, const Uint8* buf1, const Uint8* buf2) const
+{
+  const Iter& r1 = *this;
+  assert(&r1.m_spec == &r2.m_spec);
+  assert(r1.m_cnt == r2.m_cnt && r1.m_cnt > 0);
+  const Uint32 i = r1.m_cnt - 1; // item index
+  int res = 0;
+  const Uint32 n1 = r1.m_itemLen;
+  const Uint32 n2 = r2.m_itemLen;
+  if (n1 != 0)
+  {
+    if (n2 != 0)
+    {
+      const Type& type = r1.m_spec.m_buf[i];
+      const NdbSqlUtil::Type& sqlType = getSqlType(type.m_typeId);
+      const Uint8* p1 = &buf1[r1.m_itemPos];
+      const Uint8* p2 = &buf2[r2.m_itemPos];
+      CHARSET_INFO* cs = all_charsets[type.m_csNumber];
+      res = (*sqlType.m_cmp)(cs, p1, n1, p2, n2);
+    }
+    else
+    {
+      res = +1;
+    }
+  }
+  else
+  {
+    if (n2 != 0)
+      res = -1;
+  }
+  return res;
+}
+
+// NdbPack::DataC
+
+int
+NdbPack::DataC::desc(Iter& r) const
+{
+  const Uint32 i = r.m_cnt; // item index
+  assert(i < m_cnt);
+  const Type& type = m_spec.m_buf[i];
+  if (type.m_nullable || m_allNullable)
+  {
+    Uint32 nullbitPos = 0;
+    if (!m_allNullable)
+      nullbitPos = type.m_nullbitPos;
+    else
+      nullbitPos = i;
+    const Uint32 byte_pos = nullbitPos / 8;
+    const Uint32 bit_pos = nullbitPos % 8;
+    const Uint8 bit_mask = (1 << bit_pos);
+    const Uint8& the_byte = m_buf[byte_pos];
+    if ((the_byte & bit_mask) != 0)
+    {
+      if (r.desc_null() == -1)
+      {
+        set_error(r);
+        return -1;
+      }
+      return 0;
+    }
+  }
+  const Uint32 pos = r.m_itemPos + r.m_itemLen;
+  const Uint8* item = &m_buf[pos];
+  if (r.desc(item) == -1)
+  {
+    set_error(r);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::DataC::cmp(const DataC& d2, Uint32 cnt, Uint32& num_eq) const
+{
+  const DataC& d1 = *this;
+  assert(cnt <= d1.m_cnt);
+  assert(cnt <= d2.m_cnt);
+  Iter r1(d1);
+  Iter r2(d2);
+  int res = 0;
+  Uint32 i; // remember last
+  for (i = 0; i < cnt; i++)
+  {
+    d1.desc(r1);
+    d2.desc(r2);
+    res = r1.cmp(r2, d1.m_buf, d2.m_buf);
+    if (res != 0)
+      break;
+  }
+  num_eq = i;
+  return res;
+}
+
+// NdbPack::Data
+
+int
+NdbPack::Data::add(const void* data, Uint32* len_out)
+{
+  assert(data != 0);
+  const Uint8* item = (const Uint8*)data;
+  const Uint32 i = m_cnt; // item index
+  if (i >= m_spec.m_cnt)
+  {
+    set_error(DataCntOverflow, __LINE__);
+    return -1;
+  }
+  Iter& r = m_iter;
+  assert(r.m_cnt == i);
+  const Uint32 fullLen = m_varBytes + r.m_itemPos + r.m_itemLen;
+  if (r.desc(item) == -1)
+  {
+    set_error(r);
+    return -1;
+  }
+  if (fullLen + r.m_itemLen > m_bufMaxLen)
+  {
+    set_error(DataBufOverflow, __LINE__);
+    return -1;
+  }
+  memcpy(&m_buf[fullLen], item, r.m_itemLen);
+  *len_out = r.m_itemLen;
+  m_cnt++;
+  return 0;
+}
+
+int
+NdbPack::Data::add(const void* data, Uint32 cnt, Uint32* len_out)
+{
+  const Uint8* data_ptr = (const Uint8*)data;
+  Uint32 len_tot = 0;
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    Uint32 len;
+    if (add(data_ptr, &len) == -1)
+      return -1;
+    if (data != 0)
+      data_ptr += len;
+    len_tot += len;
+  }
+  *len_out = len_tot;
+  return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32* len_out)
+{
+  const Uint32 i = m_cnt; // item index
+  if (i >= m_spec.m_cnt)
+  {
+    set_error(DataCntOverflow, __LINE__);
+    return -1;
+  }
+  Iter& r = m_iter;
+  assert(r.m_cnt == i);
+  if (r.desc_null() == -1)
+  {
+    set_error(r);
+    return -1;
+  }
+  Uint32 nullbitPos = 0;
+  if (!m_allNullable)
+  {
+    const Type& type = m_spec.m_buf[i];
+    if (!type.m_nullable)
+    {
+      set_error(DataNotNullable, __LINE__);
+      return -1;
+    }
+    nullbitPos = type.m_nullbitPos;
+  }
+  else
+  {
+    nullbitPos = i;
+  }
+  const Uint32 byte_pos = nullbitPos / 8;
+  const Uint32 bit_pos = nullbitPos % 8;
+  const Uint8 bit_mask = (1 << bit_pos);
+  Uint8& the_byte = m_buf[m_varBytes + byte_pos];
+  assert((the_byte & bit_mask) == 0);
+  the_byte |= bit_mask;
+  *len_out = r.m_itemLen;
+  m_cnt++;
+  return 0;
+}
+
+int
+NdbPack::Data::add_null(Uint32 cnt, Uint32* len_out)
+{
+  Uint32 len_tot = 0;
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    Uint32 len;
+    if (add_null(&len) == -1)
+      return -1;
+    len_tot += len;
+  }
+  *len_out = len_tot;
+  return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32* len_out)
+{
+  const AttributeHeader ah = *(const AttributeHeader*)&poai[0];
+  if (!ah.isNULL())
+  {
+    if (add(&poai[1], len_out) == -1)
+      return -1;
+  }
+  else
+  {
+    if (add_null(len_out) == -1)
+      return -1;
+  }
+  if (ah.getByteSize() != *len_out)
+  {
+    set_error(InvalidAttrInfo, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Data::add_poai(const Uint32* poai, Uint32 cnt, Uint32* len_out)
+{
+  Uint32 len_tot = 0;
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    Uint32 len;
+    if (add_poai(poai, &len) == -1)
+      return -1;
+    len_tot += len;
+    poai += 1 + (len + 3) / 4;
+  }
+  *len_out = len_tot;
+  return 0;
+}
+
+int
+NdbPack::Data::finalize_impl()
+{
+  const Uint32 dataLen = m_iter.m_itemPos + m_iter.m_itemLen;
+  switch (m_varBytes) {
+  // case 0: inlined
+  case 1:
+    if (dataLen <= 0xFF)
+    {
+      m_buf[0] = dataLen;
+      return 0;
+    }
+    break;
+  case 2:
+    if (dataLen <= 0xFFFF)
+    {
+      m_buf[0] = (dataLen & 0xFF);
+      m_buf[1] = (dataLen >> 8);
+      return 0;
+    }
+    break;
+  default:
+    break;
+  }
+  set_error(InternalError, __LINE__);
+  return -1;
+}
+
+int
+NdbPack::Data::desc_all(Uint32 cnt)
+{
+  assert(m_cnt == 0); // reset() would destroy nullmask
+  for (Uint32 i = 0; i < cnt; i++)
+  {
+    m_cnt++;
+    if (desc(m_iter) == -1)
+      return -1;
+  }
+  if (finalize() == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbPack::Data::copy(const DataC& d2)
+{
+  reset();
+  Iter r2(d2);
+  const Uint32 cnt2 = d2.m_cnt;
+  for (Uint32 i = 0; i < cnt2; i++)
+  {
+    if (d2.desc(r2) == -1)
+      return -1;
+    Uint32 len_out = ~(Uint32)0;
+    if (r2.m_itemLen != 0)
+    {
+      if (add(&d2.m_buf[r2.m_itemPos], &len_out) == -1)
+          return -1;
+      assert(len_out == r2.m_itemLen);
+    }
+    else
+    {
+      if (add_null(&len_out) == -1)
+        return -1;
+      assert(len_out == 0);
+    }
+  }
+  if (finalize() == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbPack::Data::convert_impl(Endian::Value to_endian)
+{
+  const Spec& spec = m_spec;
+  Iter r(*this);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    if (DataC::desc(r) == -1)
+    {
+      set_error(r);
+      return -1;
+    }
+    const Type& type = spec.m_buf[i];
+    const Uint32 typeId = type.m_typeId;
+    const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+    if (info.m_convert)
+    {
+      Uint8* ptr = &m_buf[m_varBytes + r.m_itemPos];
+      Uint32 len = r.m_itemLen;
+      Endian::convert(ptr, len);
+    }
+  }
+  return 0;
+}
+
+// NdbPack::BoundC
+
+int
+NdbPack::BoundC::finalize(int side)
+{
+  if (m_data.m_cnt == 0 && side != 0)
+  {
+    set_error(BoundEmptySide, __LINE__);
+    return -1;
+  }
+  if (m_data.m_cnt != 0 && side != -1 && side != +1)
+  {
+    set_error(BoundNonemptySide, __LINE__);
+    return -1;
+  }
+  m_side = side;
+  return 0;
+}
+
+int
+NdbPack::BoundC::cmp(const BoundC& b2, Uint32 cnt, Uint32& num_eq) const
+{
+  const BoundC& b1 = *this;
+  const DataC& d1 = b1.m_data;
+  const DataC& d2 = b2.m_data;
+  int res = d1.cmp(d2, cnt, num_eq);
+  if (res == 0)
+  {
+    if (cnt < d1.m_cnt && cnt < d2.m_cnt)
+      ;
+    else if (d1.m_cnt < d2.m_cnt)
+      res = (+1) * b1.m_side;
+    else if (d1.m_cnt > d2.m_cnt)
+      res = (-1) * b2.m_side;
+    else if (b1.m_side < b2.m_side)
+      res = -1;
+    else if (b1.m_side > b2.m_side)
+      res = +1;
+  }
+  return res;
+}
+
+// NdbPack::Bound
+
+// print
+
+NdbPack::Print::Print(char* buf, Uint32 bufsz) :
+  m_buf(buf), m_bufsz(bufsz), m_sz(0) {}
+
+void
+NdbPack::Print::print(const char* fmt, ...)
+{
+  va_list ap;
+  va_start(ap, fmt);
+  if (m_bufsz > m_sz)
+  {
+    BaseString::vsnprintf(&m_buf[m_sz], m_bufsz - m_sz, fmt, ap);
+    m_sz += (Uint32)strlen(&m_buf[m_sz]);
+  }
+  va_end(ap);
+}
+
+// print Type
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Type& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Type::print(NdbOut& out) const
+{
+  char buf[200];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Type::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  p.print("typeId:%u", m_typeId);
+  p.print(" byteSize:%u", m_byteSize);
+  p.print(" nullable:%u", m_nullable);
+  p.print(" csNumber:%u", m_csNumber);
+  return buf;
+}
+
+// print Spec
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Spec& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Spec::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Spec::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  p.print("cnt:%u", m_cnt);
+  p.print(" nullableCnt:%u", m_nullableCnt);
+  p.print(" varsizeCnt:%u", m_varsizeCnt);
+  p.print(" nullmaskLen:%u", get_nullmask_len(false));
+  p.print(" maxByteSize:%u", m_maxByteSize);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    const Type& type = m_buf[i];
+    p.print(" [%u", i);
+    p.print(" typeId:%u", type.m_typeId);
+    p.print(" nullable:%u", type.m_nullable);
+    p.print(" byteSize:%u", type.m_byteSize);
+    p.print(" csNumber:%u", type.m_csNumber);
+    p.print("]");
+  }
+  return buf;
+}
+
+// print DataC
+
+bool g_ndb_pack_print_hex_always = false;
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::DataC& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::DataC::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::DataC::print(char* buf, Uint32 bufsz, bool convert_flag) const
+{
+  Print p(buf, bufsz);
+  const Spec& spec = m_spec;
+  const Uint32 nullmask_len = spec.get_nullmask_len(m_allNullable);
+  if (nullmask_len != 0)
+  {
+    p.print("nullmask:");
+    for (Uint32 i = 0; i < nullmask_len; i++)
+    {
+      int x = m_buf[i];
+      p.print("%02x", x);
+    }
+  }
+  Iter r(*this);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    desc(r);
+    const Uint8* value = &m_buf[r.m_itemPos];
+    p.print(" [%u", i);
+    p.print(" pos:%u", r.m_itemPos);
+    p.print(" len:%u", r.m_itemLen);
+    if (r.m_itemLen > 0)
+    {
+      p.print(" value:");
+      // some specific types for debugging
+      const Type& type = spec.m_buf[i];
+      bool ok = true;
+      switch (type.m_typeId) {
+      case NDB_TYPE_TINYINT:
+        {
+          Int8 x;
+          memcpy(&x, value, 1);
+          if (convert_flag)
+            Endian::convert(&x, 1);
+          p.print("%d", (int)x);
+        }
+        break;
+      case NDB_TYPE_TINYUNSIGNED:
+        {
+          Uint8 x;
+          memcpy(&x, value, 1);
+          if (convert_flag)
+            Endian::convert(&x, 1);
+          p.print("%u", (uint)x);
+        }
+        break;
+      case NDB_TYPE_SMALLINT:
+        {
+          Int16 x;
+          memcpy(&x, value, 2);
+          if (convert_flag)
+            Endian::convert(&x, 2);
+          p.print("%d", (int)x);
+        }
+        break;
+      case NDB_TYPE_SMALLUNSIGNED:
+        {
+          Uint16 x;
+          memcpy(&x, value, 2);
+          if (convert_flag)
+            Endian::convert(&x, 2);
+          p.print("%u", (uint)x);
+        }
+        break;
+      case NDB_TYPE_INT:
+        {
+          Int32 x;
+          memcpy(&x, value, 4);
+          if (convert_flag)
+            Endian::convert(&x, 4);
+          p.print("%d", (int)x);
+        }
+        break;
+      case NDB_TYPE_UNSIGNED:
+        {
+          Uint32 x;
+          memcpy(&x, value, 4);
+          if (convert_flag)
+            Endian::convert(&x, 4);
+          p.print("%u", (uint)x);
+        }
+        break;
+      case NDB_TYPE_FLOAT:
+        {
+          float x;
+          memcpy(&x, value, 4);
+          if (convert_flag)
+            Endian::convert(&x, 4);
+          p.print("%g", (double)x);
+        }
+        break;
+      case NDB_TYPE_DOUBLE:
+        {
+          double x;
+          memcpy(&x, value, 8);
+          if (convert_flag)
+            Endian::convert(&x, 8);
+          p.print("%g", x);
+        }
+        break;
+      case NDB_TYPE_CHAR:
+      case NDB_TYPE_VARCHAR:
+      case NDB_TYPE_LONGVARCHAR:
+        {
+          const Uint32 off = type.m_arrayType;
+          for (Uint32 j = 0; j < r.m_bareLen; j++)
+          {
+            Uint8 x = value[off + j];
+            p.print("%c", (int)x);
+          }
+        }
+        break;
+      default:
+        ok = false;
+        break;
+      }
+      if (!ok || g_ndb_pack_print_hex_always)
+      {
+        p.print("<");
+        for (Uint32 j = 0; j < r.m_itemLen; j++)
+        {
+          int x = value[j];
+          p.print("%02x", x);
+        }
+        p.print(">");
+      }
+    }
+    p.print("]");
+  }
+  return buf;
+}
+
+// print Data
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Data& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Data::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Data::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  char* ptr = buf;
+  if (m_varBytes != 0)
+  {
+    p.print("varBytes:");
+    for (Uint32 i = 0; i < m_varBytes; i++)
+    {
+      int r = m_buf[i];
+      p.print("%02x", r);
+    }
+    p.print(" ");
+  }
+  p.print("dataLen:%u", m_iter.m_itemPos + m_iter.m_itemLen);
+  p.print(" ");
+  const bool convert_flag =
+    m_endian != Endian::Native &&
+    m_endian != Endian::get_endian();
+  DataC::print(&buf[p.m_sz], bufsz - p.m_sz, convert_flag);
+  return buf;
+}
+
+// print BoundC
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::BoundC& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::BoundC::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::BoundC::print(char* buf, Uint32 bufsz) const
+{
+  Print p(buf, bufsz);
+  p.print("side:%s ", m_side < 0 ? "-" : m_side > 0 ? "+" : "0");
+  m_data.print(&buf[p.m_sz], bufsz - p.m_sz);
+  return buf;
+}
+
+// print Bound
+
+NdbOut&
+operator<<(NdbOut& out, const NdbPack::Bound& a)
+{
+  a.print(out);
+  return out;
+}
+
+void
+NdbPack::Bound::print(NdbOut& out) const
+{
+  char buf[8000];
+  out << print(buf, sizeof(buf));
+}
+
+const char*
+NdbPack::Bound::print(char* buf, Uint32 bufsz) const
+{
+  BoundC::print(buf, bufsz);
+  return buf;
+}
+
+// validate
+
+int
+NdbPack::Type::validate() const
+{
+  Type type2 = *this;
+  if (type2.complete() == -1)
+  {
+    set_error(type2);
+    return -1;
+  }
+  if (memcmp(this, &type2, sizeof(Type)) != 0)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Spec::validate() const
+{
+  Uint32 nullableCnt = 0;
+  Uint32 varsizeCnt = 0;
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    const Type& type = m_buf[i];
+    if (type.validate() == -1)
+    {
+      set_error(type);
+      return -1;
+    }
+    if (type.m_nullable)
+      nullableCnt++;
+    if (type.m_arrayType != 0)
+      varsizeCnt++;
+  }
+  if (m_nullableCnt != nullableCnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (m_varsizeCnt != varsizeCnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Data::validate() const
+{
+  if (DataC::validate() == -1)
+    return -1;
+  const Iter& r = m_iter;
+  if (r.m_cnt != m_cnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  Iter r2(*this);
+  for (Uint32 i = 0; i < m_cnt; i++)
+  {
+    if (desc(r2) == -1)
+      return -1;
+  }
+  if (r.m_itemPos != r2.m_itemPos)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (r.m_cnt != r2.m_cnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (r.m_nullCnt != r2.m_nullCnt)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (r.m_itemLen != r2.m_itemLen)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::BoundC::validate() const
+{
+  if (m_data.validate() == -1)
+  {
+    set_error(m_data);
+    return -1;
+  }
+  if (m_data.m_cnt == 0 && m_side != 0)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  if (m_data.m_cnt != 0 && m_side != -1 && m_side != +1)
+  {
+    set_error(ValidationError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbPack::Bound::validate() const
+{
+  if (BoundC::validate() == -1)
+    return -1;
+  if (m_data.validate() == -1)
+  {
+    set_error(m_data);
+    return -1;
+  }
+  return 0;
+}
+
+#ifdef TEST_NDB_PACK
+#include <util/NdbTap.hpp>
+
+#define chk1(x) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; require(false); } while (0)
+
+#define chk2(x, e) do { if (x) break; ndbout << "line " << __LINE__ << ": " << #x << endl; ndbout << "NdbPack code: " << (e).get_error_code() << " line: " << (e).get_error_line() << endl; require(false); } while (0)
+
+#define ll0(x) do { if (verbose < 0) break; ndbout << "0- " << x << endl; } while (0)
+#define ll1(x) do { if (verbose < 1) break; ndbout << "1- " << x << endl; } while (0)
+#define ll2(x) do { if (verbose < 2) break; ndbout << "2- " << x << endl; } while (0)
+#define ll3(x) do { if (verbose < 3) break; ndbout << "3- " << x << endl; } while (0)
+
+#define xmin(a, b) ((a) < (b) ? (a) : (b))
+
+#include <ndb_rand.h>
+
+static uint // random 0..n-1
+getrandom(uint n)
+{
+  if (n != 0) {
+    uint k = ndb_rand();
+    return k % n;
+  }
+  return 0;
+}
+
+static uint // random 0..n-1 biased exponentially to smaller
+getrandom(uint n, uint bias)
+{
+  assert(bias != 0);
+  uint k = getrandom(n);
+  bias--;
+  while (bias != 0) {
+    k = getrandom(k + 1);
+    bias--;
+  }
+  return k;
+}
+
+static bool
+getrandompct(uint pct)
+{
+  return getrandom(100) < pct;
+}
+
+// test parameters; change in TAPTEST
+static int seed = -1; // random
+static int loops = 0;
+static int spec_cnt = -1; // random
+static int fix_type = 0; // all types
+static int no_nullable = 0;
+static int data_cnt = -1; // Max
+static int bound_cnt = -1; // Max
+static int verbose = 0;
+
+struct Tspec {
+  enum { Max = 100 };
+  enum { MaxBuf = Max * 4000 };
+  NdbPack::Spec m_spec;
+  NdbPack::Type m_type[Max];
+  Tspec() {
+    m_spec.set_buf(m_type, Max);
+  }
+  void create();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tspec& tspec)
+{
+  out << tspec.m_spec;
+  return out;
+}
+
+void
+Tspec::create()
+{
+  m_spec.reset();
+  int cnt = spec_cnt == -1 ? 1 + getrandom(Tspec::Max, 3) : spec_cnt;
+  int i = 0;
+  while (i < cnt) {
+    int typeId = fix_type;
+    if (typeId == 0)
+      typeId = getrandom(g_ndb_pack_type_info_cnt);
+    const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+    switch (typeId) {
+    case NDB_TYPE_INT:
+    case NDB_TYPE_UNSIGNED:
+    case NDB_TYPE_CHAR:
+    case NDB_TYPE_VARCHAR:
+    case NDB_TYPE_LONGVARCHAR:
+      break;
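+    // other type ids are not exercised by this test; draw again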
+    default:
+      continue;
+    }
+    require(info.m_supported);
+    int byteSize = 0;
+    if (info.m_fixSize != 0)
+      byteSize = info.m_fixSize;
+    else if (info.m_arrayType == 0)
+      byteSize = 1 + getrandom(128, 1);  // char(1-128)
+    else if (info.m_arrayType == 1)
+      byteSize = 1 + getrandom(256, 2);  // varchar(0-255)
+    else if (info.m_arrayType == 2)
+      byteSize = 2 + getrandom(1024, 3); // longvarchar(0-1023)
+    else
+      require(false);
+    bool nullable = no_nullable ? false : getrandompct(50);
+    int csNumber = 0;
+    if (info.m_charType) {
+      csNumber = 8; // should include ascii
+    }
+    NdbPack::Type type(typeId, byteSize, nullable, csNumber);
+    chk2(m_spec.add(type) == 0, m_spec);
+    i++;
+  }
+  chk2(m_spec.validate() == 0, m_spec);
+}
+
+struct Tdata {
+  const Tspec& m_tspec;
+  NdbPack::Data m_data;
+  const bool m_isBound;
+  int m_cnt;
+  Uint8* m_xbuf;        // unpacked
+  int m_xsize;
+  int m_xoff[Tspec::Max];
+  int m_xlen[Tspec::Max];
+  bool m_xnull[Tspec::Max];
+  int m_xnulls;
+  Uint32* m_poaiBuf;    // plain old attr info
+  int m_poaiSize;
+  Uint8* m_packBuf;     // packed
+  int m_packLen;
+  Tdata(Tspec& tspec, bool isBound, uint varBytes) :
+    m_tspec(tspec),
+    m_data(tspec.m_spec, isBound, varBytes),
+    m_isBound(isBound)
+  {
+    m_cnt = tspec.m_spec.get_cnt();
+    m_xbuf = 0;
+    m_poaiBuf = 0;
+    m_packBuf = 0;
+  }
+  ~Tdata() {
+    delete [] m_xbuf;
+    delete [] m_poaiBuf;
+    delete [] m_packBuf;
+  }
+  void create();
+  void add();
+  void finalize();
+  // compare using unpacked data
+  int xcmp(const Tdata& tdata2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdata& tdata)
+{
+  out << tdata.m_data;
+  return out;
+}
+
+void
+Tdata::create()
+{
+  union {
+    Uint8 xbuf[Tspec::MaxBuf];
+    Uint64 xbuf_align;
+  };
+  memset(xbuf, 0x3f, sizeof(xbuf));
+  m_xsize = 0;
+  m_xnulls = 0;
+  Uint32 poaiBuf[Tspec::MaxBuf / 4];
+  memset(poaiBuf, 0x5f, sizeof(poaiBuf));
+  m_poaiSize = 0;
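+  // packed length = var length bytes + null mask bytes + item data (added per item below)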
+  m_packLen = m_data.get_var_bytes();
+  m_packLen += (m_tspec.m_spec.get_nullable_cnt(m_isBound) + 7) / 8;
+  int i = 0, j;
+  while (i < m_cnt) {
+    const NdbPack::Type& type = m_tspec.m_spec.get_type(i);
+    const int typeId = type.get_type_id();
+    const Ndb_pack_type_info& info = g_ndb_pack_type_info[typeId];
+    m_xnull[i] = false;
+    if (type.get_nullable() || m_isBound)
+      m_xnull[i] = getrandompct(20);
+    int pad = 0; // null-char pad not counted in xlen
+    if (!m_xnull[i]) {
+      m_xoff[i] = m_xsize;
+      Uint8* xptr = &xbuf[m_xsize];
+      switch (typeId) {
+      case NDB_TYPE_INT:
+        {
+          Int32 x = getrandom(10);
+          if (getrandompct(50))
+            x = (-1) * x;
+          memcpy(xptr, &x, 4);
+          m_xlen[i] = info.m_fixSize;
+        }
+        break;
+      case NDB_TYPE_UNSIGNED:
+        {
+          Uint32 x = getrandom(10);
+          memcpy(xptr, &x, 4);
+          m_xlen[i] = info.m_fixSize;
+        }
+        break;
+      case NDB_TYPE_CHAR:
+        {
+          require(type.get_byte_size() >= 1);
+          int max_len = type.get_byte_size();
+          int len = getrandom(max_len + 1, 1);
+          for (j = 0; j < len; j++)
+          {
+            xptr[j] = 'a' + getrandom(3);
+          }
+          for (j = len; j < max_len; j++)
+          {
+            xptr[j] = 0x20;
+          }
+          m_xlen[i] = max_len;
+          xptr[max_len] = 0;
+          pad = 1;
+        }
+        break;
+      case NDB_TYPE_VARCHAR:
+        {
+          require(type.get_byte_size() >= 1);
+          int max_len = type.get_byte_size() - 1;
+          int len = getrandom(max_len, 2);
+          require(len < 256);
+          xptr[0] = len;
+          for (j = 0; j < len; j++)
+          {
+            xptr[1 + j] = 'a' + getrandom(3);
+          }
+          m_xlen[i] = 1 + len;
+          xptr[1 + len] = 0;
+          pad = 1;
+        }
+        break;
+      case NDB_TYPE_LONGVARCHAR:
+        {
+          require(type.get_byte_size() >= 2);
+          int max_len = type.get_byte_size() - 2;
+          int len = getrandom(max_len, 3);
+          require(len < 256 * 256);
+          xptr[0] = (len & 0xFF);
+          xptr[1] = (len >> 8);
+          for (j = 0; j < len; j++)
+          {
+            xptr[2 + j] = 'a' + getrandom(3);
+          }
+          m_xlen[i] = 2 + len;
+          xptr[2 + len] = 0;
+          pad = 1;
+        }
+        break;
+      default:
+        require(false);
+        break;
+      }
+      m_xsize += m_xlen[i] + pad;
+      while (m_xsize % 8 != 0)
+        m_xsize++;
+      m_packLen += m_xlen[i];
+    } else {
+      m_xoff[i] = -1;
+      m_xlen[i] = 0;
+      m_xnulls++;
+    }
+    require(m_xnull[i] == (m_xoff[i] == -1));
+    require(m_xnull[i] == (m_xlen[i] == 0));
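+    // build the "plain old attr info" image: attribute header word + value rounded up to words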
+    AttributeHeader* ah = (AttributeHeader*)&poaiBuf[m_poaiSize];
+    ah->setAttributeId(i); // not used
+    ah->setByteSize(m_xlen[i]);
+    m_poaiSize++;
+    if (!m_xnull[i]) {
+      memcpy(&poaiBuf[m_poaiSize], &xbuf[m_xoff[i]], m_xlen[i]);
+      m_poaiSize += (m_xlen[i] + 3) / 4;
+    }
+    i++;
+  }
+  require(m_xsize % 8 == 0);
+  m_xbuf = (Uint8*) new Uint64 [m_xsize / 8];
+  memcpy(m_xbuf, xbuf, m_xsize);
+  m_poaiBuf = (Uint32*) new Uint32 [m_poaiSize];
+  memcpy(m_poaiBuf, poaiBuf, m_poaiSize << 2);
+}
+
+void
+Tdata::add()
+{
+  m_packBuf = new Uint8 [m_packLen];
+  m_data.set_buf(m_packBuf, m_packLen);
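+  // add all items twice: the second pass after reset() must behave identically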
+  int i, j;
+  j = 0;
+  while (j <= 1) {
+    if (j == 1)
+      m_data.reset();
+    i = 0;
+    while (i < m_cnt) {
+      Uint32 xlen = ~(Uint32)0;
+      if (!m_xnull[i]) {
+        int xoff = m_xoff[i];
+        const Uint8* xptr = &m_xbuf[xoff];
+        chk2(m_data.add(xptr, &xlen) == 0, m_data);
+        chk1((int)xlen == m_xlen[i]);
+      } else {
+        chk2(m_data.add_null(&xlen) == 0, m_data);
+        chk1(xlen == 0);
+      }
+      i++;
+    }
+    chk2(m_data.validate() == 0, m_data);
+    chk1((int)m_data.get_null_cnt() == m_xnulls);
+    j++;
+  }
+}
+
+void
+Tdata::finalize()
+{
+  chk2(m_data.finalize() == 0, m_data);
+  ll3("create: " << m_data);
+  chk1((int)m_data.get_full_len() == m_packLen);
+  {
+    const Uint8* p = (const Uint8*)m_data.get_full_buf();
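+    // the 2 var length bytes hold the full length minus the bytes themselves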
+    chk1(p[0] + (p[1] << 8) == m_packLen - 2);
+  }
+}
+
+int
+Tdata::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+  const Tdata& tdata1 = *this;
+  require(&tdata1.m_tspec == &tdata2.m_tspec);
+  const Tspec& tspec = tdata1.m_tspec;
+  int res = 0;
+  int cnt = xmin(tdata1.m_cnt, tdata2.m_cnt);
+  int i;
+  for (i = 0; i < cnt; i++) {
+    if (!tdata1.m_xnull[i]) {
+      if (!tdata2.m_xnull[i]) {
+        // the pointers are Uint64-aligned
+        const Uint8* xptr1 = &tdata1.m_xbuf[tdata1.m_xoff[i]];
+        const Uint8* xptr2 = &tdata2.m_xbuf[tdata2.m_xoff[i]];
+        const int xlen1 = tdata1.m_xlen[i];
+        const int xlen2 = tdata2.m_xlen[i];
+        const NdbPack::Type& type = tspec.m_spec.get_type(i);
+        const int typeId = type.get_type_id();
+        const int csNumber = type.get_cs_number();
+        CHARSET_INFO* cs = all_charsets[csNumber];
+        switch (typeId) {
+        case NDB_TYPE_INT:
+          {
+            require(cs == 0);
+            Int32 x1 = *(const Int32*)xptr1;
+            Int32 x2 = *(const Int32*)xptr2;
+            if (x1 < x2)
+              res = -1;
+            else if (x1 > x2)
+              res = +1;
+            ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+          }
+          break;
+        case NDB_TYPE_UNSIGNED:
+          {
+            require(cs == 0);
+            Uint32 x1 = *(const Uint32*)xptr1;
+            Uint32 x2 = *(const Uint32*)xptr2;
+            if (x1 < x2)
+              res = -1;
+            else if (x1 > x2)
+              res = +1;
+            ll3("cmp res:" << res <<" x1:" << x1 << " x2:" << x2);
+          }
+          break;
+        case NDB_TYPE_CHAR:
+          {
+            require(cs != 0 && cs->coll != 0);
+            const uint n1 = xlen1;
+            const uint n2 = xlen2;
+            const uchar* t1 = &xptr1[0];
+            const uchar* t2 = &xptr2[0];
+            const char* s1 = (const char*)t1;
+            const char* s2 = (const char*)t2;
+            chk1(n1 == strlen(s1));
+            chk1(n2 == strlen(s2));
+            res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+            ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+          }
+          break;
+        case NDB_TYPE_VARCHAR:
+          {
+            require(cs != 0 && cs->coll != 0);
+            const uint n1 = xptr1[0];
+            const uint n2 = xptr2[0];
+            const uchar* t1 = &xptr1[1];
+            const uchar* t2 = &xptr2[1];
+            const char* s1 = (const char*)t1;
+            const char* s2 = (const char*)t2;
+            chk1(n1 == strlen(s1));
+            chk1(n2 == strlen(s2));
+            res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+            ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+          }
+          break;
+        case NDB_TYPE_LONGVARCHAR:
+          {
+            require(cs != 0 && cs->coll != 0);
+            const uint n1 = xptr1[0] | (xptr1[1] << 8);
+            const uint n2 = xptr2[0] | (xptr2[1] << 8);
+            const uchar* t1 = &xptr1[2];
+            const uchar* t2 = &xptr2[2];
+            const char* s1 = (const char*)t1;
+            const char* s2 = (const char*)t2;
+            chk1(n1 == strlen(s1));
+            chk1(n2 == strlen(s2));
+            res = (*cs->coll->strnncollsp)(cs, t1, n1, t2, n2, false);
+            ll3("cmp res:" << res <<" s1:" << s1 << " s2:" << s2);
+          }
+          break;
+        default:
+          require(false);
+          break;
+        }
+      } else
+        res = +1;
+    } else if (!tdata2.m_xnull[i])
+      res = -1;
+    if (res != 0)
+      break;
+  }
+  *num_eq = i;
+  ll3("xcmp res:" << res << " num_eq:" << *num_eq);
+  return res;
+}
+
+struct Tbound {
+  Tdata& m_tdata;
+  NdbPack::Bound m_bound;
+  Tbound(Tdata& tdata) :
+    m_tdata(tdata),
+    m_bound(tdata.m_data)
+  {
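+    // a bound covers a random non-empty prefix of the key columns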
+    m_tdata.m_cnt = 1 + getrandom(m_tdata.m_cnt);
+  }
+  void create();
+  void add();
+  void finalize();
+  int xcmp(const Tdata& tdata2, int* num_eq) const;
+  int xcmp(const Tbound& tbound2, int* num_eq) const;
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tbound& tbound)
+{
+  out << tbound.m_bound;
+  return out;
+}
+
+void
+Tbound::create()
+{
+  m_tdata.create();
+}
+
+void
+Tbound::add()
+{
+  m_tdata.add();
+}
+
+void
+Tbound::finalize()
+{
+  int side = getrandompct(50) ? -1 : +1;
+  chk2(m_bound.finalize(side) == 0, m_bound);
+  chk2(m_bound.validate() == 0, m_bound);
+  chk1((int)m_tdata.m_data.get_full_len() == m_tdata.m_packLen);
+}
+
+int
+Tbound::xcmp(const Tdata& tdata2, int* num_eq) const
+{
+  const Tbound& tbound1 = *this;
+  const Tdata& tdata1 = tbound1.m_tdata;
+  require(tdata1.m_cnt <= tdata2.m_cnt);
+  *num_eq = -1;
+  int res = tdata1.xcmp(tdata2, num_eq);
+  if (res == 0) {
+    chk1(*num_eq == tdata1.m_cnt);
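+    // equal on all bound columns: the bound side decides the order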
+    res = m_bound.get_side();
+  }
+  return res;
+}
+
+int
+Tbound::xcmp(const Tbound& tbound2, int* num_eq) const
+{
+  const Tbound& tbound1 = *this;
+  const Tdata& tdata1 = tbound1.m_tdata;
+  const Tdata& tdata2 = tbound2.m_tdata;
+  *num_eq = -1;
+  int res = tdata1.xcmp(tdata2, num_eq);
+  chk1(0 <= *num_eq && *num_eq <= xmin(tdata1.m_cnt, tdata2.m_cnt));
+  if (res == 0) {
+    chk1(*num_eq == xmin(tdata1.m_cnt, tdata2.m_cnt));
+    if (tdata1.m_cnt < tdata2.m_cnt)
+      res = (+1) * tbound1.m_bound.get_side();
+    else if (tdata1.m_cnt > tdata2.m_cnt)
+      res = (-1) * tbound2.m_bound.get_side();
+    else if (tbound1.m_bound.get_side() < tbound2.m_bound.get_side())
+      res = -1;
+    else if (tbound1.m_bound.get_side() > tbound2.m_bound.get_side())
+      res = +1;
+  }
+  return res;
+}
+
+struct Tdatalist {
+  enum { Max = 1000 };
+  Tdata* m_tdata[Max];
+  int m_cnt;
+  Tdatalist(Tspec& tspec) {
+    m_cnt = data_cnt == -1 ? Max : data_cnt;
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      m_tdata[i] = new Tdata(tspec, false, 2);
+    }
+  }
+  ~Tdatalist() {
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      delete m_tdata[i];
+    }
+  }
+  void create();
+  void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tdatalist& tdatalist)
+{
+  int i;
+  for (i = 0; i < tdatalist.m_cnt; i++) {
+    out << "data " << i << ": " << *tdatalist.m_tdata[i];
+    if (i + 1 < tdatalist.m_cnt)
+      out << endl;
+  }
+  return out;
+}
+
+void
+Tdatalist::create()
+{
+  int i;
+  for (i = 0; i < m_cnt; i++) {
+    Tdata& tdata = *m_tdata[i];
+    tdata.create();
+    tdata.add();
+    tdata.finalize();
+  }
+}
+
+static int
+data_cmp(const void* a1, const void* a2)
+{
+  const Tdata& tdata1 = **(const Tdata**)a1;
+  const Tdata& tdata2 = **(const Tdata**)a2;
+  require(tdata1.m_cnt == tdata2.m_cnt);
+  const Uint32 cnt = tdata1.m_cnt;
+  Uint32 num_eq = ~(Uint32)0;
+  int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq);
+  require(num_eq <= (Uint32)tdata1.m_cnt);
+  require(num_eq <= (Uint32)tdata2.m_cnt);
+  return res;
+}
+
+void
+Tdatalist::sort()
+{
+  ll1("data sort: in");
+  ll3(endl << *this);
+  qsort(m_tdata, m_cnt, sizeof(Tdata*), data_cmp);
+  ll1("data sort: out");
+  ll3(endl << *this);
+  int i;
+  for (i = 0; i + 1 < m_cnt; i++) {
+    const Tdata& tdata1 = *m_tdata[i];
+    const Tdata& tdata2 = *m_tdata[i + 1];
+    require(tdata1.m_cnt == tdata2.m_cnt);
+    const Uint32 cnt = tdata1.m_cnt;
+    Uint32 num_eq1 = ~(Uint32)0;
+    int res = tdata1.m_data.cmp(tdata2.m_data, cnt, num_eq1);
+    chk1(res <= 0);
+    // also via unpacked data
+    int num_eq2 = -1;
+    int res2 = tdata1.xcmp(tdata2, &num_eq2);
+    if (res < 0)
+      chk1(res2 < 0);
+    else if (res == 0)
+      chk1(res2 == 0);
+    else
+      chk1(res2 > 0);
+    chk1(num_eq1 == (Uint32)num_eq2);
+  }
+}
+
+struct Tboundlist {
+  enum { Max = 1000 };
+  Tbound* m_tbound[Max];
+  int m_cnt;
+  Tboundlist(Tspec& tspec) {
+    m_cnt = bound_cnt == -1 ? Max : bound_cnt;
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      Tdata* tdata = new Tdata(tspec, true, 0);
+      m_tbound[i] = new Tbound(*tdata);
+    }
+  }
+  ~Tboundlist() {
+    int i;
+    for (i = 0; i < m_cnt; i++) {
+      Tdata* tdata = &m_tbound[i]->m_tdata;
+      delete m_tbound[i];
+      delete tdata;
+    }
+  }
+  void create();
+  void sort();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Tboundlist& tboundlist)
+{
+  int i;
+  for (i = 0; i < tboundlist.m_cnt; i++) {
+    out << "bound " << i << ": " << *tboundlist.m_tbound[i];
+    if (i + 1 < tboundlist.m_cnt)
+      out << endl;
+  }
+  return out;
+}
+
+void
+Tboundlist::create()
+{
+  int i;
+  for (i = 0; i < m_cnt; i++) {
+    Tbound& tbound = *m_tbound[i];
+    tbound.create();
+    tbound.add();
+    tbound.finalize();
+  }
+}
+
+static int
+bound_cmp(const void* a1, const void* a2)
+{
+  const Tbound& tbound1 = **(const Tbound**)a1;
+  const Tbound& tbound2 = **(const Tbound**)a2;
+  const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+  Uint32 num_eq = ~(Uint32)0;
+  int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq);
+  require(num_eq <= cnt);
+  return res;
+}
+
+void
+Tboundlist::sort()
+{
+  ll1("bound sort: in");
+  ll3(endl << *this);
+  qsort(m_tbound, m_cnt, sizeof(Tbound*), bound_cmp);
+  ll1("bound sort: out");
+  ll3(endl << *this);
+  int i;
+  for (i = 0; i + 1 < m_cnt; i++) {
+    const Tbound& tbound1 = *m_tbound[i];
+    const Tbound& tbound2 = *m_tbound[i + 1];
+    const Uint32 cnt = xmin(tbound1.m_tdata.m_cnt, tbound2.m_tdata.m_cnt);
+    Uint32 num_eq1 = ~(Uint32)0;
+    int res = tbound1.m_bound.cmp(tbound2.m_bound, cnt, num_eq1);
+    chk1(res <= 0);
+    // also via unpacked data
+    int num_eq2 = -1;
+    int res2 = tbound1.xcmp(tbound2, &num_eq2);
+    if (res < 0)
+      chk1(res2 < 0);
+    else if (res == 0)
+      chk1(res2 == 0);
+    else
+      chk1(res2 > 0);
+    chk1(num_eq1 == (Uint32)num_eq2);
+  }
+}
+
+static void
+testdesc(const Tdata& tdata)
+{
+  ll3("testdesc: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  const Uint8* buf_old = (const Uint8*)data.get_full_buf();
+  const Uint32 varBytes = data.get_var_bytes();
+  const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
+  const Uint32 dataLen = data.get_data_len();
+  const Uint32 fullLen = data.get_full_len();
+  const Uint32 cnt = data.get_cnt();
+  chk1(fullLen == varBytes + dataLen);
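+  // re-create the descriptor info from a copy of the packed image and verify it matches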
+  NdbPack::Data data_new(tspec.m_spec, false, varBytes);
+  Uint8 buf_new[Tspec::MaxBuf];
+  data_new.set_buf(buf_new, sizeof(buf_new));
+  memcpy(buf_new, buf_old, fullLen);
+  chk2(data_new.desc_all(cnt) == 0, data_new);
+  chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
+  chk1(data_new.get_data_len() == data.get_data_len());
+  chk1(data_new.get_cnt() == data.get_cnt());
+  chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testcopy(const Tdata& tdata)
+{
+  ll3("testcopy: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  uint n = getrandom(tdata.m_cnt + 1);
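+  // copy successively shorter prefixes and verify they compare equal in both directions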
+  do {
+    ll3("testcopy: cnt:" << tdata.m_cnt << " n:" << n);
+    NdbPack::DataC data_old(tspec.m_spec, false);
+    data_old.set_buf(data.get_data_buf(), data.get_data_len(), n);
+    chk1(data_old.get_cnt() == n);
+    NdbPack::Data data_new(tspec.m_spec, false, 0);
+    Uint8 buf_new[Tspec::MaxBuf];
+    data_new.set_buf(buf_new, sizeof(buf_new));
+    chk2(data_new.copy(data_old) == 0, data_new);
+    chk1(data_new.get_cnt() == n);
+    Uint32 num_eq1 = ~(Uint32)0;
+    chk1(data_new.cmp(data_old, n, num_eq1) == 0);
+    chk1(num_eq1 == n);
+    Uint32 num_eq2 = ~(Uint32)0;
+    chk1(data_old.cmp(data_new, n, num_eq2) == 0);
+    chk1(num_eq2 == n);
+    n = getrandom(n);
+  } while (n != 0);
+}
+
+static void
+testpoai(const Tdata& tdata)
+{
+  ll3("testpoai: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
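+  // rebuild the same data from the "plain old attr info" format and verify an identical packed image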
+  NdbPack::Data data_new(tspec.m_spec, false, data.get_var_bytes());
+  Uint8 buf_new[Tspec::MaxBuf];
+  data_new.set_buf(buf_new, sizeof(buf_new));
+  Uint32 poaiLen = ~(Uint32)0;
+  chk2(data_new.add_poai(tdata.m_poaiBuf, tdata.m_cnt, &poaiLen) == 0, data);
+  chk2(data_new.finalize() == 0, data_new);
+  chk2(data_new.validate() == 0, data_new);
+  chk1(tspec.m_spec.get_nullmask_len(false) + poaiLen == data.get_data_len());
+  chk1(data_new.get_full_len() == data.get_full_len());
+  chk1(memcmp(data_new.get_full_buf(), data.get_full_buf(), data.get_full_len()) == 0);
+  chk1(data_new.get_null_cnt() == data.get_null_cnt());
+}
+
+static void
+testconvert(const Tdata& tdata)
+{
+  ll3("testconvert: " << tdata);
+  const Tspec& tspec = tdata.m_tspec;
+  const NdbPack::Data& data = tdata.m_data;
+  NdbPack::Data data_new(tspec.m_spec, false, 2);
+  Uint8 buf_new[Tspec::MaxBuf];
+  data_new.set_buf(buf_new, sizeof(buf_new));
+  chk2(data_new.copy(data) == 0, data_new);
+  require(tdata.m_cnt == (int)data.get_cnt());
+  require(data.get_cnt() == data_new.get_cnt());
+  const Uint32 cnt = tdata.m_cnt;
+  Uint32 num_eq;
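+  // round-trip endian conversion and verify the data still compares equal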
+  switch (NdbPack::Endian::get_endian()) {
+  case NdbPack::Endian::Little:
+    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    break;
+  case NdbPack::Endian::Big:
+    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
+    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
+    num_eq = ~(Uint32)0;
+    chk1(data.cmp(data_new, cnt, num_eq) == 0);
+    break;
+  default:
+    require(false);
+    break;
+  }
+}
+
+static void
+testdata(const Tdatalist& tdatalist)
+{
+  int i;
+  for (i = 0; i < tdatalist.m_cnt; i++) {
+    const Tdata& tdata = *tdatalist.m_tdata[i];
+    testdesc(tdata);
+    testcopy(tdata);
+    testpoai(tdata);
+    testconvert(tdata);
+  }
+}
+
+static void
+testcmp(const Tbound& tbound, const Tdatalist& tdatalist, int* kb)
+{
+  ll3("testcmp: " << tbound);
+  int oldres = 0;
+  int n1 = 0;
+  int n2 = 0;
+  int i;
+  for (i = 0; i < tdatalist.m_cnt; i++) {
+    const Tdata& tdata = *tdatalist.m_tdata[i];
+    require(tbound.m_tdata.m_cnt == (int)tbound.m_bound.get_data().get_cnt());
+    const Uint32 cnt = tbound.m_tdata.m_cnt;
+    Uint32 num_eq1 = ~(Uint32)0;
+    // reverse result for key vs bound
+    int res = (-1) * tbound.m_bound.cmp(tdata.m_data, cnt, num_eq1);
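+    // a finalized bound never compares equal to a full key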
+    chk1(res != 0);
+    res = (res < 0 ? (n1++, -1) : (n2++, +1));
+    if (i > 0) {
+      // at some point flips from -1 to +1
+      chk1(oldres <= res);
+    }
+    oldres = res;
+    // also via unpacked data
+    int num_eq2 = -1;
+    int res2 = (-1) * tbound.xcmp(tdata, &num_eq2);
+    if (res < 0)
+      chk1(res2 < 0);
+    else
+      chk1(res2 > 0);
+    chk1(num_eq1 == (Uint32)num_eq2);
+  }
+  require(n1 + n2 == tdatalist.m_cnt);
+  ll2("keys before:" << n1 << " after:" << n2);
+  *kb = n1;
+}
+
+static void
+testcmp(const Tboundlist& tboundlist, const Tdatalist& tdatalist)
+{
+  int i;
+  int oldkb = 0;
+  for (i = 0; i < tboundlist.m_cnt; i++) {
+    const Tbound& tbound = *tboundlist.m_tbound[i];
+    int kb = 0;
+    testcmp(tbound, tdatalist, &kb);
+    if (i > 0) {
+      chk1(oldkb <= kb);
+    }
+    oldkb = kb;
+  }
+}
+
+static void
+testrun()
+{
+  Tspec tspec;
+  tspec.create();
+  ll1("spec: " << tspec);
+  Tdatalist tdatalist(tspec);
+  tdatalist.create();
+  tdatalist.sort();
+  testdata(tdatalist);
+  if (bound_cnt != 0) {
+    Tboundlist tboundlist(tspec);
+    tboundlist.create();
+    tboundlist.sort();
+    testcmp(tboundlist, tdatalist);
+  }
+}
+
+extern void NdbOut_Init();
+
+static int
+testmain()
+{
+  my_init();
+  NdbOut_Init();
+  signal(SIGABRT, SIG_DFL);
+  { const char* p = NdbEnv_GetEnv("TEST_NDB_PACK_VERBOSE", (char*)0, 0);
+    if (p != 0)
+      verbose = atoi(p);
+  }
+  if (seed == 0)
+    ll0("random seed: loop number");
+  else {
+    if (seed < 0)
+      seed = getpid();
+    ll0("random seed: " << seed);
+    ndb_srand(seed);
+  }
+  loops = 100;
+  int i;
+  for (i = 0; loops == 0 || i < loops; i++) {
+    ll0("loop:" << i << "/" << loops);
+    if (seed == 0)
+      ndb_srand(i);
+    testrun();
+  }
+  // TAPTEST prints the final "ok" line based on the return value
+  ndbout << "passed" << endl;
+  return 0;
+}
+
+TAPTEST(NdbPack)
+{
+  int ret = testmain();
+  return (ret == 0);
+}
+
+#endif

=== modified file 'storage/ndb/src/common/util/NdbSqlUtil.cpp'
--- a/storage/ndb/src/common/util/NdbSqlUtil.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/common/util/NdbSqlUtil.cpp	2011-05-04 16:04:33 +0000
@@ -17,6 +17,7 @@
 
 #include <NdbSqlUtil.hpp>
 #include <ndb_version.h>
+#include <math.h>
 
 /*
  * Data types.  The entries must be in the numerical order.
@@ -222,243 +223,185 @@ NdbSqlUtil::getType(Uint32 typeId)
   return m_typeList[Type::Undefined];
 }
 
-const NdbSqlUtil::Type&
-NdbSqlUtil::getTypeBinary(Uint32 typeId)
-{
-  switch (typeId) {
-  case Type::Char:
-  case Type::Varchar:
-  case Type::Binary:
-  case Type::Varbinary:
-  case Type::Longvarchar:
-  case Type::Longvarbinary:
-    typeId = Type::Binary;
-    break;
-  case Type::Text:
-    typeId = Type::Blob;
-    break;
-  default:
-    break;
-  }
-  return getType(typeId);
-}
-
 /*
  * Comparison functions.
  */
 
 int
-NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTinyint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Int8)) {
-    Int8 v1, v2;
-    memcpy(&v1, p1, sizeof(Int8));
-    memcpy(&v2, p2, sizeof(Int8));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 1 && n2 == 1);
+  Int8 v1, v2;
+  memcpy(&v1, p1, 1);
+  memcpy(&v2, p2, 1);
+  int w1 = (int)v1;
+  int w2 = (int)v2;
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTinyunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Uint8)) {
-    Uint8 v1, v2;
-    memcpy(&v1, p1, sizeof(Uint8));
-    memcpy(&v2, p2, sizeof(Uint8));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 1 && n2 == 1);
+  Uint8 v1, v2;
+  memcpy(&v1, p1, 1);
+  memcpy(&v2, p2, 1);
+  int w1 = (int)v1;
+  int w2 = (int)v2;
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpSmallint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Int16)) {
-    Int16 v1, v2;
-    memcpy(&v1, p1, sizeof(Int16));
-    memcpy(&v2, p2, sizeof(Int16));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 2 && n2 == 2);
+  Int16 v1, v2;
+  memcpy(&v1, p1, 2);
+  memcpy(&v2, p2, 2);
+  int w1 = (int)v1;
+  int w2 = (int)v2;
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpSmallunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Uint16)) {
-    Uint16 v1, v2;
-    memcpy(&v1, p1, sizeof(Uint16));
-    memcpy(&v2, p2, sizeof(Uint16));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 2 && n2 == 2);
+  Uint16 v1, v2;
+  memcpy(&v1, p1, 2);
+  memcpy(&v2, p2, 2);
+  int w1 = (int)v1;
+  int w2 = (int)v2;
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpMediumint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= 3) {
-    Int32 v1, v2;
-    v1 = sint3korr((const uchar*)p1);
-    v2 = sint3korr((const uchar*)p2);
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 3 && n2 == 3);
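+  // copy into zero-padded 4-byte buffers since the *korr macros may read 4 bytes on some platforms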
+  uchar b1[4];
+  uchar b2[4];
+  memcpy(b1, p1, 3);
+  b1[3] = 0;
+  memcpy(b2, p2, 3);
+  b2[3] = 0;
+  int w1 = (int)sint3korr(b1);
+  int w2 = (int)sint3korr(b2);
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpMediumunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= 3) {
-    Uint32 v1, v2;
-    v1 = uint3korr((const uchar*)p1);
-    v2 = uint3korr((const uchar*)p2);
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 3 && n2 == 3);
+  uchar b1[4];
+  uchar b2[4];
+  memcpy(b1, p1, 3);
+  b1[3] = 0;
+  memcpy(b2, p2, 3);
+  b2[3] = 0;
+  int w1 = (int)uint3korr(b1);
+  int w2 = (int)uint3korr(b2);
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpInt(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Int32)) {
-    Int32 v1, v2;
-    memcpy(&v1, p1, sizeof(Int32));
-    memcpy(&v2, p2, sizeof(Int32));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 4 && n2 == 4);
+  Int32 v1, v2;
+  memcpy(&v1, p1, 4);
+  memcpy(&v2, p2, 4);
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpUnsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Uint32)) {
-    Uint32 v1, v2;
-    memcpy(&v1, p1, sizeof(Uint32));
-    memcpy(&v2, p2, sizeof(Uint32));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 4 && n2 == 4);
+  Uint32 v1, v2;
+  memcpy(&v1, p1, 4);
+  memcpy(&v2, p2, 4);
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBigint(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Int64)) {
-    Int64 v1, v2;
-    memcpy(&v1, p1, sizeof(Int64));
-    memcpy(&v2, p2, sizeof(Int64));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 8 && n2 == 8);
+  Int64 v1, v2;
+  memcpy(&v1, p1, 8);
+  memcpy(&v2, p2, 8);
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBigunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Uint64)) {
-    Uint64 v1, v2;
-    memcpy(&v1, p1, sizeof(Uint64));
-    memcpy(&v2, p2, sizeof(Uint64));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 8 && n2 == 8);
+  Uint64 v1, v2;
+  memcpy(&v1, p1, 8);
+  memcpy(&v2, p2, 8);
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpFloat(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(float)) {
-    float v1, v2;
-    memcpy(&v1, p1, sizeof(float));
-    memcpy(&v2, p2, sizeof(float));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 4 && n2 == 4);
+  float v1, v2;
+  memcpy(&v1, p1, 4);
+  memcpy(&v2, p2, 4);
+  require(!isnan(v1) && !isnan(v2));
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDouble(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(double)) {
-    double v1, v2;
-    memcpy(&v1, p1, sizeof(double));
-    memcpy(&v2, p2, sizeof(double));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 8 && n2 == 8);
+  double v1, v2;
+  memcpy(&v1, p1, 8);
+  memcpy(&v2, p2, 8);
+  require(!isnan(v1) && !isnan(v2));
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmp_olddecimal(const uchar* s1, const uchar* s2, unsigned n)
+NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
+  assert(info == 0 && n1 == n2);
+  const uchar* v1 = (const uchar*)p1;
+  const uchar* v2 = (const uchar*)p2;
   int sgn = +1;
   unsigned i = 0;
-  while (i < n) {
-    int c1 = s1[i];
-    int c2 = s2[i];
+  while (i < n1) {
+    int c1 = v1[i];
+    int c2 = v2[i];
     if (c1 == c2) {
       if (c1 == '-')
         sgn = -1;
@@ -477,225 +420,121 @@ NdbSqlUtil::cmp_olddecimal(const uchar*
 }
 
 int
-NdbSqlUtil::cmpOlddecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (full) {
-    assert(n1 == n2);
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    return cmp_olddecimal(v1, v2, n1);
-  }
-  return CmpUnknown;
+  return cmpOlddecimal(info, p1, n1, p2, n2);
 }
 
 int
-NdbSqlUtil::cmpOlddecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (full) {
-    assert(n1 == n2);
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    return cmp_olddecimal(v1, v2, n1);
-  }
-  return CmpUnknown;
+  return cmpBinary(info, p1, n1, p2, n2);
 }
 
 int
-NdbSqlUtil::cmpDecimal(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  const uchar* v1 = (const uchar*)p1;
-  const uchar* v2 = (const uchar*)p2;
-  // compare as binary strings
-  unsigned n = (n1 <= n2 ? n1 : n2);
-  int k = memcmp(v1, v2, n);
-  if (k == 0) {
-    k = (full ? n1 : n) - n2;
-  }
-  return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+  return cmpBinary(info, p1, n1, p2, n2);
 }
 
 int
-NdbSqlUtil::cmpDecimalunsigned(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
+  // allow different lengths
+  assert(info != 0);
   const uchar* v1 = (const uchar*)p1;
   const uchar* v2 = (const uchar*)p2;
-  // compare as binary strings
-  unsigned n = (n1 <= n2 ? n1 : n2);
-  int k = memcmp(v1, v2, n);
-  if (k == 0) {
-    k = (full ? n1 : n) - n2;
-  }
-  return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+  CHARSET_INFO* cs = (CHARSET_INFO*)info;
+  // compare with space padding
+  int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
+  return k;
 }
 
 int
-NdbSqlUtil::cmpChar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  // collation does not work on prefix for some charsets
-  assert(full);
+  assert(info != 0);
+  const uint lb = 1;
   const uchar* v1 = (const uchar*)p1;
   const uchar* v2 = (const uchar*)p2;
-  // not const in MySQL
-  CHARSET_INFO* cs = (CHARSET_INFO*)(info);
+  uint m1 = v1[0];
+  uint m2 = v2[0];
+  require(lb + m1 <= n1 && lb + m2 <= n2);
+  CHARSET_INFO* cs = (CHARSET_INFO*)info;
   // compare with space padding
-  int k = (*cs->coll->strnncollsp)(cs, v1, n1, v2, n2, false);
-  return k < 0 ? -1 : k > 0 ? +1 : 0;
+  int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
+  return k;
 }
 
 int
-NdbSqlUtil::cmpVarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  const unsigned lb = 1;
-  // collation does not work on prefix for some charsets
-  assert(full && n1 >= lb && n2 >= lb);
+  // allow different lengths
+  assert(info == 0);
   const uchar* v1 = (const uchar*)p1;
   const uchar* v2 = (const uchar*)p2;
-  unsigned m1 = *v1;
-  unsigned m2 = *v2;
-  if (m1 <= n1 - lb && m2 <= n2 - lb) {
-    CHARSET_INFO* cs = (CHARSET_INFO*)(info);
-    // compare with space padding
-    int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
-    return k < 0 ? -1 : k > 0 ? +1 : 0;
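+  // compare the common prefix; if equal, the shorter value sorts first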
+  int k = 0;
+  if (n1 < n2) {
+    k = memcmp(v1, v2, n1);
+    if (k == 0)
+      k = -1;
+  } else if (n1 > n2) {
+    k = memcmp(v1, v2, n2);
+    if (k == 0)
+      k = +1;
+  } else {
+    k = memcmp(v1, v2, n1);
   }
-  // treat bad data as NULL
-  if (m1 > n1 - lb && m2 <= n2 - lb)
-    return -1;
-  if (m1 <= n1 - lb && m2 > n2 - lb)
-    return +1;
-  return 0;
+  return k;
 }
 
 int
-NdbSqlUtil::cmpBinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
+  assert(info == 0);
+  const uint lb = 1;
   const uchar* v1 = (const uchar*)p1;
   const uchar* v2 = (const uchar*)p2;
-  // compare as binary strings
-  unsigned n = (n1 <= n2 ? n1 : n2);
-  int k = memcmp(v1, v2, n);
-  if (k == 0) {
-    k = (full ? n1 : n) - n2;
-  }
-  return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
+  uint m1 = v1[0];
+  uint m2 = v2[0];
+  require(lb + m1 <= n1 && lb + m2 <= n2);
+  int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
+  return k;
 }
 
 int
-NdbSqlUtil::cmpVarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  const unsigned lb = 1;
-  if (n2 >= lb) {
-    assert(n1 >= lb);
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    unsigned m1 = *v1;
-    unsigned m2 = *v2;
-    if (m1 <= n1 - lb && m2 <= n2 - lb) {
-      // compare as binary strings
-      unsigned m = (m1 <= m2 ? m1 : m2);
-      int k = memcmp(v1 + lb, v2 + lb, m);
-      if (k == 0) {
-        k = (full ? m1 : m) - m2;
-      }
-      return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
-    }
-    // treat bad data as NULL
-    if (m1 > n1 - lb && m2 <= n2 - lb)
-      return -1;
-    if (m1 <= n1 - lb && m2 > n2 - lb)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 8 && n2 == 8);
+  Int64 v1, v2;
+  memcpy(&v1, p1, sizeof(Int64));
+  memcpy(&v2, p2, sizeof(Int64));
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 int
-NdbSqlUtil::cmpDatetime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Int64)) {
-    Int64 v1, v2;
-    memcpy(&v1, p1, sizeof(Int64));
-    memcpy(&v2, p2, sizeof(Int64));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
-}
-
-int
-NdbSqlUtil::cmpDate(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
-{
-#ifdef ndb_date_is_4_byte_native_int
-  if (n2 >= sizeof(Int32)) {
-    Int32 v1, v2;
-    memcpy(&v1, p1, sizeof(Int32));
-    memcpy(&v2, p2, sizeof(Int32));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-#else
-#ifdef ndb_date_sol9x86_cc_xO3_madness
-  if (n2 >= 3) {
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    // from Field_newdate::val_int
-    Uint64 j1 = uint3korr(v1);
-    Uint64 j2 = uint3korr(v2);
-    j1 = (j1 % 32L)+(j1 / 32L % 16L)*100L + (j1/(16L*32L))*10000L;
-    j2 = (j2 % 32L)+(j2 / 32L % 16L)*100L + (j2/(16L*32L))*10000L;
-    if (j1 < j2)
-      return -1;
-    if (j1 > j2)
-      return +1;
-    return 0;
-  }
-#else
-  if (n2 >= 3) {
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    uint j1 = uint3korr(v1);
-    uint j2 = uint3korr(v2);
-    uint d1 = (j1 & 31);
-    uint d2 = (j2 & 31);
-    j1 = (j1 >> 5);
-    j2 = (j2 >> 5);
-    uint m1 = (j1 & 15);
-    uint m2 = (j2 & 15);
-    j1 = (j1 >> 4);
-    j2 = (j2 >> 4);
-    uint y1 = j1;
-    uint y2 = j2;
-    if (y1 < y2)
-      return -1;
-    if (y1 > y2)
-      return +1;
-    if (m1 < m2)
-      return -1;
-    if (m1 > m2)
-      return +1;
-    if (d1 < d2)
-      return -1;
-    if (d1 > d2)
-      return +1;
-    return 0;
-  }
-#endif
-#endif
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 3 && n2 == 3);
+  uchar b1[4];
+  uchar b2[4];
+  memcpy(b1, p1, 3);
+  b1[3] = 0;
+  memcpy(b2, p2, 3);
+  b2[3] = 0;
+  // from Field_newdate::val_int
+  int w1 = (int)uint3korr(b1);
+  int w2 = (int)uint3korr(b2);
+  return w1 - w2;
 }
 
 // not supported
 int
-NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBlob(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
   assert(false);
   return 0;
@@ -703,14 +542,14 @@ NdbSqlUtil::cmpBlob(const void* info, co
 
 // not supported
 int
-NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
   assert(false);
   return 0;
 }
 
 int
-NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 { 
   /* Bitfields are stored as 32-bit words
    * This means that a byte-by-byte comparison will not work on all platforms
@@ -733,7 +572,7 @@ NdbSqlUtil::cmpBit(const void* info, con
     memcpy(copyP1, p1, words << 2);
     memcpy(copyP2, p2, words << 2);
 
-    return cmpBit(info, copyP1, bytes, copyP2, bytes, full);
+    return cmpBit(info, copyP1, bytes, copyP2, bytes);
   }
 
   const Uint32* wp1= (const Uint32*) p1;
@@ -764,112 +603,81 @@ NdbSqlUtil::cmpBit(const void* info, con
 
 
 int
-NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= 3) {
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    // from Field_time::val_int
-    Int32 j1 = sint3korr(v1);
-    Int32 j2 = sint3korr(v2);
-    if (j1 < j2)
-      return -1;
-    if (j1 > j2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 3 && n2 == 3);
+  uchar b1[4];
+  uchar b2[4];
+  memcpy(b1, p1, 3);
+  b1[3] = 0;
+  memcpy(b2, p2, 3);
+  b2[3] = 0;
+  // from Field_time::val_int
+  int j1 = (int)sint3korr(b1);
+  int j2 = (int)sint3korr(b2);
+  if (j1 < j2)
+    return -1;
+  if (j1 > j2)
+    return +1;
+  return 0;
 }
 
 // not yet
 
 int
-NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  const unsigned lb = 2;
-  // collation does not work on prefix for some charsets
-  assert(full && n1 >= lb && n2 >= lb);
+  assert(info != 0);
+  const uint lb = 2;
   const uchar* v1 = (const uchar*)p1;
   const uchar* v2 = (const uchar*)p2;
-  unsigned m1 = uint2korr(v1);
-  unsigned m2 = uint2korr(v2);
-  if (m1 <= n1 - lb && m2 <= n2 - lb) {
-    CHARSET_INFO* cs = (CHARSET_INFO*)(info);
-    // compare with space padding
-    int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
-    return k < 0 ? -1 : k > 0 ? +1 : 0;
-  }
-  // treat bad data as NULL
-  if (m1 > n1 - lb && m2 <= n2 - lb)
-    return -1;
-  if (m1 <= n1 - lb && m2 > n2 - lb)
-    return +1;
-  return 0;
+  uint m1 = v1[0] | (v1[1] << 8);
+  uint m2 = v2[0] | (v2[1] << 8);
+  require(lb + m1 <= n1 && lb + m2 <= n2);
+  CHARSET_INFO* cs = (CHARSET_INFO*)info;
+  // compare with space padding
+  int k = (*cs->coll->strnncollsp)(cs, v1 + lb, m1, v2 + lb, m2, false);
+  return k;
 }
 
 int
-NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpLongvarbinary(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  const unsigned lb = 2;
-  if (n2 >= lb) {
-    assert(n1 >= lb);
-    const uchar* v1 = (const uchar*)p1;
-    const uchar* v2 = (const uchar*)p2;
-    unsigned m1 = uint2korr(v1);
-    unsigned m2 = uint2korr(v2);
-    if (m1 <= n1 - lb && m2 <= n2 - lb) {
-      // compare as binary strings
-      unsigned m = (m1 <= m2 ? m1 : m2);
-      int k = memcmp(v1 + lb, v2 + lb, m);
-      if (k == 0) {
-        k = (full ? m1 : m) - m2;
-      }
-      return k < 0 ? -1 : k > 0 ? +1 : full ? 0 : CmpUnknown;
-    }
-    // treat bad data as NULL
-    if (m1 > n1 - lb && m2 <= n2 - lb)
-      return -1;
-    if (m1 <= n1 - lb && m2 > n2 - lb)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0);
+  const uint lb = 2;
+  const uchar* v1 = (const uchar*)p1;
+  const uchar* v2 = (const uchar*)p2;
+  uint m1 = v1[0] | (v1[1] << 8);
+  uint m2 = v2[0] | (v2[1] << 8);
+  require(lb + m1 <= n1 && lb + m2 <= n2);
+  int k = cmpBinary(info, v1 + lb, m1, v2 + lb, m2);
+  return k;
 }
 
 int
-NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpYear(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Uint8)) {
-    Uint8 v1, v2;
-    memcpy(&v1, p1, sizeof(Uint8));
-    memcpy(&v2, p2, sizeof(Uint8));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 1 && n2 == 1);
+  Uint8 v1, v2;
+  memcpy(&v1, p1, 1);
+  memcpy(&v2, p2, 1);
+  int w1 = (int)v1;
+  int w2 = (int)v2;
+  return w1 - w2;
 }
 
 int
-NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+NdbSqlUtil::cmpTimestamp(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2)
 {
-  if (n2 >= sizeof(Uint32)) {
-    Uint32 v1, v2;
-    memcpy(&v1, p1, sizeof(Uint32));
-    memcpy(&v2, p2, sizeof(Uint32));
-    if (v1 < v2)
-      return -1;
-    if (v1 > v2)
-      return +1;
-    return 0;
-  }
-  assert(! full);
-  return CmpUnknown;
+  assert(info == 0 && n1 == 4 && n2 == 4);
+  Uint32 v1, v2;
+  memcpy(&v1, p1, 4);
+  memcpy(&v2, p2, 4);
+  if (v1 < v2)
+    return -1;
+  if (v1 > v2)
+    return +1;
+  return 0;
 }
 
 // like

=== modified file 'storage/ndb/src/kernel/blocks/ERROR_codes.txt'
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt	2011-05-19 07:32:39 +0000
@@ -29,6 +29,7 @@ Next DBTUX 12010
 Next SUMA 13047
 Next LGMAN 15001
 Next TSMAN 16001
+Next DBSPJ 17000
 
 TESTING NODE FAILURE, ARBITRATION
 ---------------------------------

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2011-05-02 13:36:19 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2011-05-19 07:32:39 +0000
@@ -1431,6 +1431,11 @@ void Cmvmi::execTAMPER_ORD(Signal* signa
     jam();
     tuserblockref = TSMAN_REF;
   }
+  else if (errNo < 18000)
+  {
+    jam();
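+    // 17xxx error codes go to DBSPJ (see ERROR_codes.txt)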
+    tuserblockref = DBSPJ_REF;
+  }
   else if (errNo < 30000)
   {
     /*--------------------------------------------------------------------*/

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-05-17 11:41:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-05-18 09:07:07 +0000
@@ -1268,7 +1268,6 @@ private:
 
   // Variables to support record structures and their free lists
 
-  ApiConnectRecord *apiConnectRecord;
   Uint32 capiConnectFileSize;
 
   ConnectRecord *connectRecord;
@@ -1313,9 +1312,25 @@ private:
     2.4  C O M M O N    S T O R E D    V A R I A B L E S
     ----------------------------------------------------
   */
-  Uint32 cfirstVerifyQueue;
-  Uint32 clastVerifyQueue;
-  Uint32 cverifyQueueCounter;
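+  /* Queue of DIVERIFYREQ requests waiting while commit is blocked */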
+  struct DIVERIFY_queue
+  {
+    DIVERIFY_queue() {
+      cfirstVerifyQueue = clastVerifyQueue = RNIL;
+      cverifyQueueCounter = 0;
+      apiConnectRecord = 0;
+    }
+    Uint32 cfirstVerifyQueue;
+    Uint32 clastVerifyQueue;
+    Uint32 cverifyQueueCounter;
+    ApiConnectRecord *apiConnectRecord;
+  };
+
+  bool isEmpty(const DIVERIFY_queue&);
+  void enqueue(DIVERIFY_queue&, Ptr<ApiConnectRecord>);
+  void dequeue(DIVERIFY_queue&, Ptr<ApiConnectRecord> &);
+
+  DIVERIFY_queue c_diverify_queue[1];
+  Uint32 c_diverify_queue_cnt;
 
   /*------------------------------------------------------------------------*/
   /*       THIS VARIABLE KEEPS THE REFERENCES TO FILE RECORDS THAT DESCRIBE */

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2011-02-15 11:41:27 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2011-05-18 09:07:07 +0000
@@ -73,13 +73,16 @@ void Dbdih::initData()
   c_2pass_inr = false;
 }//Dbdih::initData()
 
-void Dbdih::initRecords() 
+void Dbdih::initRecords()
 {
   // Records with dynamic sizes
-  apiConnectRecord = (ApiConnectRecord*)
-    allocRecord("ApiConnectRecord", 
-                sizeof(ApiConnectRecord),
-                capiConnectFileSize);
+  for (Uint32 i = 0; i < c_diverify_queue_cnt; i++)
+  {
+    c_diverify_queue[i].apiConnectRecord = (ApiConnectRecord*)
+      allocRecord("ApiConnectRecord",
+                  sizeof(ApiConnectRecord),
+                  capiConnectFileSize);
+  }
 
   connectRecord = (ConnectRecord*)allocRecord("ConnectRecord",
                                               sizeof(ConnectRecord), 
@@ -306,7 +309,6 @@ Dbdih::Dbdih(Block_context& ctx):
                &Dbdih::execDIH_GET_TABINFO_CONF);
 #endif
 
-  apiConnectRecord = 0;
   connectRecord = 0;
   fileRecord = 0;
   fragmentstore = 0;
@@ -319,15 +321,20 @@ Dbdih::Dbdih(Block_context& ctx):
   c_nextNodeGroup = 0;
   c_fragments_per_node = 1;
   bzero(c_node_groups, sizeof(c_node_groups));
+  c_diverify_queue_cnt = 1;
 
 }//Dbdih::Dbdih()
 
-Dbdih::~Dbdih() 
+Dbdih::~Dbdih()
 {
-  deallocRecord((void **)&apiConnectRecord, "ApiConnectRecord", 
-                sizeof(ApiConnectRecord),
-                capiConnectFileSize);
-  
+  for (Uint32 i = 0; i<c_diverify_queue_cnt; i++)
+  {
+    deallocRecord((void **)&c_diverify_queue[i].apiConnectRecord,
+                  "ApiConnectRecord",
+                  sizeof(ApiConnectRecord),
+                  capiConnectFileSize);
+  }
+
   deallocRecord((void **)&connectRecord, "ConnectRecord",
                 sizeof(ConnectRecord), 
                 cconnectFileSize);

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-05-17 12:14:32 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-05-23 14:13:35 +0000
@@ -9004,12 +9004,13 @@ void Dbdih::execDIGETNODESREQ(Signal* si
   Uint32 fragId, newFragId = RNIL;
   DiGetNodesConf * const conf = (DiGetNodesConf *)&signal->theData[0];
   TabRecord* regTabDesc = tabRecord;
-  jamEntry();
+  EmulatedJamBuffer * jambuf = jamBuffer();
+  thrjamEntry(jambuf);
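+  // use the caller's jam buffer; this signal may be executed directly from another thread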
   ptrCheckGuard(tabPtr, ttabFileSize, regTabDesc);
 
   if (DictTabInfo::isOrderedIndex(tabPtr.p->tableType))
   {
-    jam();
+    thrjam(jambuf);
     tabPtr.i = tabPtr.p->primaryTableId;
     ptrCheckGuard(tabPtr, ctabFileSize, tabRecord);
   }
@@ -9027,7 +9028,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si
     
     if (unlikely(fragId >= tabPtr.p->totalfragments))
     {
-      jam();
+      thrjam(jambuf);
       conf->zero= 1; //Indicate error;
       signal->theData[1]= ZUNDEFINED_FRAGMENT_ERROR;
       return;
@@ -9035,40 +9036,40 @@ void Dbdih::execDIGETNODESREQ(Signal* si
   }
   else if (tabPtr.p->method == TabRecord::HASH_MAP)
   {
-    jam();
+    thrjam(jambuf);
     Ptr<Hash2FragmentMap> ptr;
     g_hash_map.getPtr(ptr, map_ptr_i);
     fragId = ptr.p->m_map[hashValue % ptr.p->m_cnt];
 
     if (unlikely(new_map_ptr_i != RNIL))
     {
-      jam();
+      thrjam(jambuf);
       g_hash_map.getPtr(ptr, new_map_ptr_i);
       newFragId = ptr.p->m_map[hashValue % ptr.p->m_cnt];
       if (newFragId == fragId)
       {
-        jam();
+        thrjam(jambuf);
         newFragId = RNIL;
       }
     }
   }
   else if (tabPtr.p->method == TabRecord::LINEAR_HASH)
   {
-    jam();
+    thrjam(jambuf);
     fragId = hashValue & tabPtr.p->mask;
     if (fragId < tabPtr.p->hashpointer) {
-      jam();
+      thrjam(jambuf);
       fragId = hashValue & ((tabPtr.p->mask << 1) + 1);
     }//if
   }
   else if (tabPtr.p->method == TabRecord::NORMAL_HASH)
   {
-    jam();
+    thrjam(jambuf);
     fragId= hashValue % tabPtr.p->totalfragments;
   }
   else
   {
-    jam();
+    thrjam(jambuf);
     ndbassert(tabPtr.p->method == TabRecord::USER_DEFINED);
 
     /* User defined partitioning, but no distribution key passed */
@@ -9087,7 +9088,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si
 
   if (unlikely(newFragId != RNIL))
   {
-    jam();
+    thrjam(jambuf);
     conf->reqinfo |= DiGetNodesConf::REORG_MOVING;
     getFragstore(tabPtr.p, newFragId, fragPtr);
     nodeCount = extractNodeInfo(fragPtr.p, conf->nodes + 2 + MAX_REPLICAS);
@@ -9201,6 +9202,64 @@ void Dbdih::initialiseFragstore()
   }//for    
 }//Dbdih::initialiseFragstore()
 
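+/*
+ * DIVERIFY queue helpers.  The queue is a singly linked list of
+ * ApiConnectRecords (linked via nextApi) with head, tail and a counter.
+ */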
+inline
+bool
+Dbdih::isEmpty(const DIVERIFY_queue & q)
+{
+  return q.cverifyQueueCounter == 0;
+}
+
+inline
+void
+Dbdih::enqueue(DIVERIFY_queue & q, Ptr<ApiConnectRecord> conRecord)
+{
+  Uint32 first = q.cfirstVerifyQueue;
+  Uint32 last = q.clastVerifyQueue;
+  Uint32 count = q.cverifyQueueCounter;
+  ApiConnectRecord * apiConnectRecord = q.apiConnectRecord;
+
+  Ptr<ApiConnectRecord> tmp;
+  if (last != RNIL)
+  {
+    tmp.i = last;
+    ptrCheckGuard(tmp, capiConnectFileSize, apiConnectRecord);
+    tmp.p->nextApi = conRecord.i;
+  }
+  else
+  {
+    ndbassert(count == 0);
+    first = conRecord.i;
+  }
+  q.cfirstVerifyQueue = first;
+  q.clastVerifyQueue = conRecord.i;
+  q.cverifyQueueCounter = count + 1;
+}
+
+inline
+void
+Dbdih::dequeue(DIVERIFY_queue & q, Ptr<ApiConnectRecord> & conRecord)
+{
+  Uint32 first = q.cfirstVerifyQueue;
+  Uint32 last = q.clastVerifyQueue;
+  Uint32 count = q.cverifyQueueCounter;
+  ApiConnectRecord * apiConnectRecord = q.apiConnectRecord;
+
+  conRecord.i = first;
+  ptrCheckGuard(conRecord, capiConnectFileSize, apiConnectRecord);
+  Uint32 next = conRecord.p->nextApi;
+  if (first == last)
+  {
+    ndbrequire(next == RNIL);
+    ndbassert(count == 1);
+    last = RNIL;
+  }
+  ndbrequire(count > 0);
+  q.cfirstVerifyQueue = next;
+  q.clastVerifyQueue = last;
+  q.cverifyQueueCounter = count - 1;
+}
+
 /*
   3.9   V E R I F I C A T I O N
   ****************************=
@@ -9212,13 +9271,14 @@ void Dbdih::initialiseFragstore()
   3.9.1     R E C E I V I N G  O F  V E R I F I C A T I O N   R E Q U E S T
   *************************************************************************
   */
-void Dbdih::execDIVERIFYREQ(Signal* signal) 
+void Dbdih::execDIVERIFYREQ(Signal* signal)
 {
-
-  jamEntry();
+  EmulatedJamBuffer * jambuf = jamBuffer();
+  thrjamEntry(jambuf);
   if ((getBlockCommit() == false) &&
-      (cfirstVerifyQueue == RNIL)) {
-    jam();
+      isEmpty(c_diverify_queue[0]))
+  {
+    thrjam(jambuf);
     /*-----------------------------------------------------------------------*/
     // We are not blocked and the verify queue was empty currently so we can
     // simply reply back to TC immediately. The method was called with 
@@ -9235,24 +9295,15 @@ void Dbdih::execDIVERIFYREQ(Signal* sign
   // Since we are blocked we need to put this operation last in the verify
   // queue to ensure that operation starts up in the correct order.
   /*-------------------------------------------------------------------------*/
-  ApiConnectRecordPtr tmpApiConnectptr;
   ApiConnectRecordPtr localApiConnectptr;
+  DIVERIFY_queue & q = c_diverify_queue[0];
 
-  cverifyQueueCounter++;
   localApiConnectptr.i = signal->theData[0];
-  tmpApiConnectptr.i = clastVerifyQueue;
-  ptrCheckGuard(localApiConnectptr, capiConnectFileSize, apiConnectRecord);
+  ptrCheckGuard(localApiConnectptr, capiConnectFileSize, q.apiConnectRecord);
   localApiConnectptr.p->apiGci = m_micro_gcp.m_new_gci;
   localApiConnectptr.p->nextApi = RNIL;
-  clastVerifyQueue = localApiConnectptr.i;
-  if (tmpApiConnectptr.i == RNIL) {
-    jam();
-    cfirstVerifyQueue = localApiConnectptr.i;
-  } else {
-    jam();
-    ptrCheckGuard(tmpApiConnectptr, capiConnectFileSize, apiConnectRecord);
-    tmpApiConnectptr.p->nextApi = localApiConnectptr.i;
-  }//if
+
+  enqueue(q, localApiConnectptr);
   emptyverificbuffer(signal, false);
   signal->theData[3] = 1; // Indicate no immediate return
   return;
@@ -9440,7 +9491,7 @@ Dbdih::execUPGRADE_PROTOCOL_ORD(Signal*
 void
 Dbdih::startGcpLab(Signal* signal, Uint32 aWaitTime) 
 {
-  if (cfirstVerifyQueue != RNIL)
+  if (! isEmpty(c_diverify_queue[0]))
   {
     // Previous global checkpoint is not yet completed.
     jam();
@@ -14672,26 +14723,18 @@ void Dbdih::createFileRw(Signal* signal,
   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
 }//Dbdih::createFileRw()
 
-void Dbdih::emptyverificbuffer(Signal* signal, bool aContinueB) 
+void Dbdih::emptyverificbuffer(Signal* signal, bool aContinueB)
 {
-  if(cfirstVerifyQueue == RNIL){
+  if (isEmpty(c_diverify_queue[0]))
+  {
     jam();
     return;
   }//if
   ApiConnectRecordPtr localApiConnectptr;
   if(getBlockCommit() == false){
     jam();
-    ndbrequire(cverifyQueueCounter > 0);
-    cverifyQueueCounter--;
-    localApiConnectptr.i = cfirstVerifyQueue;
-    ptrCheckGuard(localApiConnectptr, capiConnectFileSize, apiConnectRecord);
+    dequeue(c_diverify_queue[0], localApiConnectptr);
     ndbrequire(localApiConnectptr.p->apiGci <= m_micro_gcp.m_current_gci);
-    cfirstVerifyQueue = localApiConnectptr.p->nextApi;
-    if (cfirstVerifyQueue == RNIL) {
-      jam();
-      ndbrequire(cverifyQueueCounter == 0);
-      clastVerifyQueue = RNIL;
-    }//if
     signal->theData[0] = localApiConnectptr.i;
     signal->theData[1] = (Uint32)(m_micro_gcp.m_current_gci >> 32);
     signal->theData[2] = (Uint32)(m_micro_gcp.m_current_gci & 0xFFFFFFFF);
@@ -15065,11 +15108,9 @@ void Dbdih::initCommonData()
   cfailurenr = 1;
   cfirstAliveNode = RNIL;
   cfirstDeadNode = RNIL;
-  cfirstVerifyQueue = RNIL;
   cgckptflag = false;
   cgcpOrderBlocked = 0;
 
-  clastVerifyQueue = RNIL;
   c_lcpMasterTakeOverState.set(LMTOS_IDLE, __LINE__);
 
   c_lcpState.clcpDelay = 0;
@@ -15103,7 +15144,6 @@ void Dbdih::initCommonData()
   cstarttype = (Uint32)-1;
   csystemnodes = 0;
   c_newest_restorable_gci = 0;
-  cverifyQueueCounter = 0;
   cwaitLcpSr = false;
   c_nodeStartMaster.blockGcp = 0;
 
@@ -15416,12 +15456,17 @@ void Dbdih::initialiseRecordsLab(Signal*
   case 1:{
     ApiConnectRecordPtr apiConnectptr;
     jam();
-    /******** INTIALIZING API CONNECT RECORDS ********/
-    for (apiConnectptr.i = 0; apiConnectptr.i < capiConnectFileSize; apiConnectptr.i++) {
-      refresh_watch_dog();
-      ptrAss(apiConnectptr, apiConnectRecord);
-      apiConnectptr.p->nextApi = RNIL;
-    }//for
+    for (Uint32 i = 0; i < c_diverify_queue_cnt; i++)
+    {
+      /******** INITIALIZING API CONNECT RECORDS ********/
+      for (apiConnectptr.i = 0;
+           apiConnectptr.i < capiConnectFileSize; apiConnectptr.i++)
+      {
+        refresh_watch_dog();
+        ptrAss(apiConnectptr, c_diverify_queue[i].apiConnectRecord);
+        apiConnectptr.p->nextApi = RNIL;
+      }//for
+    }
     jam();
     break;
   }
@@ -17208,11 +17253,16 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
   if (arg == DumpStateOrd::DihDumpNodeRestartInfo) {
     infoEvent("c_nodeStartMaster.blockLcp = %d, c_nodeStartMaster.blockGcp = %d, c_nodeStartMaster.wait = %d",
 	      c_nodeStartMaster.blockLcp, c_nodeStartMaster.blockGcp, c_nodeStartMaster.wait);
-    infoEvent("cfirstVerifyQueue = %d, cverifyQueueCounter = %d",
-              cfirstVerifyQueue, cverifyQueueCounter);
+    for (Uint32 i = 0; i < c_diverify_queue_cnt; i++)
+    {
+      infoEvent("[ %u : cfirstVerifyQueue = 0x%.8x, cverifyQueueCounter = %u ]",
+                i,
+                c_diverify_queue[i].cfirstVerifyQueue,
+                c_diverify_queue[i].cverifyQueueCounter);
+    }
     infoEvent("cgcpOrderBlocked = %d",
               cgcpOrderBlocked);
-  }//if  
+  }//if
   if (arg == DumpStateOrd::DihDumpNodeStatusInfo) {
     NodeRecordPtr localNodePtr;
     infoEvent("Printing nodeStatus of all nodes");

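The enqueue/dequeue helpers added above replace the open-coded cfirstVerifyQueue / clastVerifyQueue / cverifyQueueCounter handling with a small intrusive FIFO over the ApiConnectRecord array, addressed by record index rather than pointers. A minimal standalone sketch of the same idea, with made-up names in place of the real NDB record and queue types:

    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    // Illustrative stand-ins for ApiConnectRecord / DIVERIFY_queue; not NDB code.
    static const uint32_t RNIL_ = 0xffffff00;   // "no record" sentinel

    struct Record {                 // one slot in a preallocated record array
      uint32_t next;                // index of the next queued record, RNIL_ if last
    };

    struct Queue {
      uint32_t first;               // head record index, RNIL_ when empty
      uint32_t last;                // tail record index, RNIL_ when empty
      uint32_t count;               // number of queued records
      Record* records;              // backing array (not owned)
    };

    static bool isEmpty(const Queue& q) { return q.count == 0; }

    static void enqueue(Queue& q, uint32_t i) {
      q.records[i].next = RNIL_;
      if (q.last != RNIL_)
        q.records[q.last].next = i; // append after the current tail
      else
        q.first = i;                // queue was empty
      q.last = i;
      q.count++;
    }

    static uint32_t dequeue(Queue& q) {
      assert(q.count > 0);
      const uint32_t i = q.first;
      q.first = q.records[i].next;
      if (q.first == RNIL_)
        q.last = RNIL_;             // removed the only element
      q.count--;
      return i;
    }

    int main() {
      Record pool[4];
      Queue q = { RNIL_, RNIL_, 0, pool };
      enqueue(q, 2); enqueue(q, 0); enqueue(q, 3);
      while (!isEmpty(q))
        printf("%u\n", (unsigned)dequeue(q));  // prints 2, 0, 3
      return 0;
    }

Keeping first, last and a counter together means isEmpty() and the consistency asserts stay O(1), without walking the list.
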
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2011-05-20 05:11:01 +0000
@@ -1740,11 +1740,12 @@ public:
    * specified by location of original tuple and version number.  Input
    * is attribute ids in AttributeHeader format.  Output is attribute
    * data with headers.  Uses readAttributes with xfrm option set.
+   * After wl4163, xfrm is not set.
    * Returns number of words or negative (-terrorCode) on error.
    */
   int tuxReadAttrs(EmulatedJamBuffer*,
                    Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion,
-                   const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut);
+                   const Uint32* attrIds, Uint32 numAttrs, Uint32* dataOut, bool xfrmFlag);
 
   /*
    * TUX reads primary key without headers into an array of words.  Used

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2011-05-20 05:11:01 +0000
@@ -2914,7 +2914,7 @@ int Dbtup::interpreterNextLab(Signal* si
 	    {
 	      return TUPKEY_abort(req_struct, 40);
 	    }
-            res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
+            res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
           }
 	} else {
           if ((cond == Interpreter::LIKE) ||

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2011-05-04 11:58:38 +0000
@@ -131,7 +131,8 @@ Dbtup::tuxReadAttrs(EmulatedJamBuffer *
                     Uint32 tupVersion,
                     const Uint32* attrIds,
                     Uint32 numAttrs,
-                    Uint32* dataOut)
+                    Uint32* dataOut,
+                    bool xfrmFlag)
 {
   thrjamEntry(jamBuf);
   // use own variables instead of globals
@@ -185,7 +186,7 @@ Dbtup::tuxReadAttrs(EmulatedJamBuffer *
                            numAttrs,
                            dataOut,
                            ZNIL,
-                           true);
+                           xfrmFlag);
 
   // done
   return ret;

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-04-25 16:46:59 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-05-17 12:19:20 +0000
@@ -30,6 +30,9 @@
 // big brother
 #include <dbtup/Dbtup.hpp>
 
+// packed index keys and bounds
+#include <NdbPack.hpp>
+
 // signal classes
 #include <signaldata/DictTabInfo.hpp>
 #include <signaldata/TuxContinueB.hpp>
@@ -118,44 +121,24 @@ private:
   // sizes are in words (Uint32)
   STATIC_CONST( MaxIndexFragments = MAX_FRAG_PER_NODE );
   STATIC_CONST( MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX );
-  /*
-   * Allow space for per-attribute overhead (at least bound type and
-   * attribute header) and key data xfrm-ed.  execTUX_BOUND_INFO unpacks
-   * all in same buffer so double the size.  The xfrm should disappear
-   * in 7.x wl#4163.
-   */
-  STATIC_CONST( MaxAttrDataSize =
-      (
-        4 * MAX_ATTRIBUTES_IN_INDEX +
-        MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY
-      ) * 2
-  );
-
+  STATIC_CONST( MaxAttrDataSize = 2 * MAX_ATTRIBUTES_IN_INDEX + MAX_KEY_SIZE_IN_WORDS );
+  STATIC_CONST( MaxXfrmDataSize = MaxAttrDataSize * MAX_XFRM_MULTIPLY);
 public:
-  STATIC_CONST( DescPageSize = 256 );
+  STATIC_CONST( DescPageSize = 512 );
 private:
   STATIC_CONST( MaxTreeNodeSize = MAX_TTREE_NODE_SIZE );
   STATIC_CONST( MaxPrefSize = MAX_TTREE_PREF_SIZE );
   STATIC_CONST( ScanBoundSegmentSize = 7 );
   STATIC_CONST( MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN );
   STATIC_CONST( MaxTreeDepth = 32 );    // strict
+#ifdef VM_TRACE
+  // for TuxCtx::c_debugBuffer
+  STATIC_CONST( DebugBufferBytes = (MaxAttrDataSize << 2) );
+#endif
   BLOCK_DEFINES(Dbtux);
 
   // forward declarations
   struct TuxCtx;
-  struct DescEnt;
-
-  // Pointer to array of Uint32 represents attribute data and bounds
-
-  typedef Uint32 *Data;
-  inline AttributeHeader& ah(Data data) {
-    return *reinterpret_cast<AttributeHeader*>(data);
-  }
-
-  typedef const Uint32* ConstData;
-  inline const AttributeHeader& ah(ConstData data) {
-    return *reinterpret_cast<const AttributeHeader*>(data);
-  }
 
   // AttributeHeader size is assumed to be 1 word
   STATIC_CONST( AttributeHeaderSize = 1 );
@@ -257,7 +240,7 @@ private:
     TupLoc m_root;              // root node
     TreeHead();
     // methods
-    Data getPref(TreeNode* node) const;
+    Uint32* getPref(TreeNode* node) const;
     TreeEnt* getEntList(TreeNode* node) const;
   };
 
@@ -281,7 +264,10 @@ private:
 
   /*
    * Descriptor page.  The "hot" metadata for an index is stored as
-   * a contiguous array of words on some page.
+   * a contiguous array of words on some page.  It has 3 parts:
+   * 1) DescHead
+   * 2) array of NdbPack::Type used by NdbPack::Spec of index key
+   * 3) array of attr headers for reading index key values from TUP
    */
   struct DescPage;
   friend struct DescPage;
@@ -298,48 +284,36 @@ private:
   ArrayPool<DescPage> c_descPagePool;
   Uint32 c_descPageList;
 
-  /*
-   * Header for index metadata.  Size must be multiple of word size.
-   */
   struct DescHead {
-    unsigned m_indexId : 24;
-    unsigned pad1 : 8;
+    Uint32 m_indexId;
+    Uint16 m_numAttrs;
+    Uint16 m_magic;
+    enum { Magic = 0xDE5C };
   };
   STATIC_CONST( DescHeadSize = sizeof(DescHead) >> 2 );
 
-  /*
-   * Attribute metadata.  Size must be multiple of word size.
-   *
-   * Prefix comparison of char data must use strxfrm and binary
-   * comparison.  The charset is currently unused.
-   */
-  struct DescAttr {
-    Uint32 m_attrDesc;          // standard AttributeDescriptor
-    Uint16 m_primaryAttrId;
-    unsigned m_typeId : 6;
-    unsigned m_charset : 10;
-  };
-  STATIC_CONST( DescAttrSize = sizeof(DescAttr) >> 2 );
-
-  /*
-   * Complete metadata for one index. The array of attributes has
-   * variable size.
-   */
-  friend struct DescEnt;
-  struct DescEnt {
-    DescHead m_descHead;
-    DescAttr m_descAttr[1];     // variable size data
-  };
+  typedef NdbPack::Type KeyType;
+  typedef NdbPack::Spec KeySpec;
+  STATIC_CONST( KeyTypeSize = sizeof(KeyType) >> 2 );
+
+  typedef NdbPack::DataC KeyDataC;
+  typedef NdbPack::Data KeyData;
+  typedef NdbPack::BoundC KeyBoundC;
+  typedef NdbPack::Bound KeyBound;
 
   // range scan
- 
+
   /*
-   * Scan bounds are stored in linked list of segments.
+   * ScanBound instances are members of ScanOp.  Bound data is stored in
+   * a separate segmented buffer pool.
    */
-  typedef DataBuffer<ScanBoundSegmentSize> ScanBound;
-  typedef DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator ScanBoundIterator;
-  typedef DataBuffer<ScanBoundSegmentSize>::DataBufferPool ScanBoundPool;
-  ScanBoundPool c_scanBoundPool;
+  struct ScanBound {
+    DataBuffer<ScanBoundSegmentSize>::Head m_head;
+    Uint16 m_cnt;       // number of attributes
+    Int16 m_side;
+    ScanBound();
+  };
+  DataBuffer<ScanBoundSegmentSize>::DataBufferPool c_scanBoundPool;
 
   // ScanLock
   struct ScanLock {
@@ -408,10 +382,7 @@ private:
     Uint8 m_readCommitted;      // no locking
     Uint8 m_lockMode;
     Uint8 m_descending;
-    ScanBound m_boundMin;
-    ScanBound m_boundMax;
-    ScanBound* m_bound[2];      // pointers to above 2
-    Uint16 m_boundCnt[2];       // number of bounds in each
+    ScanBound m_scanBound[2];
     TreePos m_scanPos;          // position
     TreeEnt m_scanEnt;          // latest entry found
     Uint32 m_nodeScan;          // next scan at node (single-linked)
@@ -420,7 +391,7 @@ private:
     Uint32 nextList;
     };
     Uint32 prevList;
-    ScanOp(ScanBoundPool& scanBoundPool);
+    ScanOp();
   };
   typedef Ptr<ScanOp> ScanOpPtr;
   ArrayPool<ScanOp> c_scanOpPool;
@@ -451,8 +422,11 @@ private:
     Uint32 m_descPage;          // descriptor page
     Uint16 m_descOff;           // offset within the page
     Uint16 m_numAttrs;
-    bool m_storeNullKey;
+    Uint16 m_prefAttrs;         // attributes in min prefix
+    Uint16 m_prefBytes;         // max bytes in min prefix
+    KeySpec m_keySpec;
     union {
+    bool m_storeNullKey;
     Uint32 nextPool;
     };
     Index();
@@ -473,10 +447,6 @@ private:
     Uint32 m_indexId;
     Uint16 unused;
     Uint16 m_fragId;
-    Uint32 m_descPage;          // copy from index level
-    Uint16 m_descOff;
-    Uint16 m_numAttrs;
-    bool m_storeNullKey;
     TreeHead m_tree;
     TupLoc m_freeLoc;           // one free node for next op
     DLList<ScanOp> m_scanList;  // current scans on this fragment
@@ -543,7 +513,7 @@ private:
     void setBalance(int b);
     void setNodeScan(Uint32 scanPtrI);
     // access other parts of the node
-    Data getPref();
+    Uint32* getPref();
     TreeEnt getEnt(unsigned pos);
     // for ndbrequire and ndbassert
     void progError(int line, int cause, const char* file);
@@ -560,11 +530,9 @@ private:
   void execNODE_STATE_REP(Signal* signal);
 
   // utils
-  void setKeyAttrs(TuxCtx&, const Frag& frag);
-  void readKeyAttrs(TuxCtx&, const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
-  void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
-  void copyAttrs(TuxCtx&, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
-  void unpackBound(const ScanBound& bound, Data data);
+  void readKeyAttrs(TuxCtx&, const Frag& frag, TreeEnt ent, KeyData& keyData, Uint32 count);
+  void readTablePk(const Frag& frag, TreeEnt ent, Uint32* pkData, unsigned& pkSize);
+  void unpackBound(TuxCtx&, const ScanBound& bound, KeyBoundC& searchBound);
   void findFrag(const Index& index, Uint32 fragId, FragPtr& fragPtr);
 
   /*
@@ -657,20 +625,20 @@ private:
   /*
    * DbtuxSearch.cpp
    */
-  void findNodeToUpdate(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode);
-  bool findPosToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
-  bool findPosToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
-  bool searchToAdd(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
-  bool searchToRemove(TuxCtx&, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
-  void findNodeToScan(Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode);
-  void findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos);
-  void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
+  void findNodeToUpdate(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode);
+  bool findPosToAdd(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
+  bool findPosToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos);
+  bool searchToAdd(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
+  bool searchToRemove(TuxCtx&, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos);
+  void findNodeToScan(Frag& frag, unsigned dir, const KeyBoundC& searchBound, NodeHandle& currNode);
+  void findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos);
+  void searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos);
 
   /*
    * DbtuxCmp.cpp
    */
-  int cmpSearchKey(TuxCtx&, const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
-  int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
+  int cmpSearchKey(TuxCtx&, const KeyDataC& searchKey, const KeyDataC& entryKey, Uint32 cnt);
+  int cmpSearchBound(TuxCtx&, const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt);
 
   /*
    * DbtuxStat.cpp
@@ -702,7 +670,7 @@ private:
   friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
   friend class NdbOut& operator<<(NdbOut&, const TreeHead&);
   friend class NdbOut& operator<<(NdbOut&, const TreePos&);
-  friend class NdbOut& operator<<(NdbOut&, const DescAttr&);
+  friend class NdbOut& operator<<(NdbOut&, const KeyType&);
   friend class NdbOut& operator<<(NdbOut&, const ScanOp&);
   friend class NdbOut& operator<<(NdbOut&, const Index&);
   friend class NdbOut& operator<<(NdbOut&, const Frag&);
@@ -738,30 +706,30 @@ private:
   {
     EmulatedJamBuffer * jamBuffer;
 
-    // index key attr ids with sizes in AttributeHeader format
-    Data c_keyAttrs;
-
-    // pointers to index key comparison functions
-    NdbSqlUtil::Cmp** c_sqlCmp;
+    // buffer for scan bound and search key data
+    Uint32* c_searchKey;
 
-    /*
-     * Other buffers used during the operation.
-     */
+    // buffer for current entry key data
+    Uint32* c_entryKey;
 
-    // buffer for search key data with headers
-    Data c_searchKey;
+    // buffer for xfrm-ed PK and for temporary use
+    Uint32* c_dataBuffer;
 
-    // buffer for current entry key data with headers
-    Data c_entryKey;
+#ifdef VM_TRACE
+    char* c_debugBuffer;
+#endif
   };
 
   struct TuxCtx c_ctx; // Global Tux context, for everything but MT-index build
 
-  // buffer for scan bounds and keyinfo (primary key)
-  Data c_dataBuffer;
-
   // inlined utils
-  DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff);
+  Uint32 getDescSize(const Index& index);
+  DescHead& getDescHead(const Index& index);
+  KeyType* getKeyTypes(DescHead& descHead);
+  const KeyType* getKeyTypes(const DescHead& descHead);
+  AttributeHeader* getKeyAttrs(DescHead& descHead);
+  const AttributeHeader* getKeyAttrs(const DescHead& descHead);
+  //
   void getTupAddr(const Frag& frag, TreeEnt ent, Uint32& lkey1, Uint32& lkey2);
   static unsigned min(unsigned x, unsigned y);
   static unsigned max(unsigned x, unsigned y);
@@ -915,7 +883,7 @@ Dbtux::TreeHead::TreeHead() :
 {
 }
 
-inline Dbtux::Data
+inline Uint32*
 Dbtux::TreeHead::getPref(TreeNode* node) const
 {
   Uint32* ptr = (Uint32*)node + NodeHeadSize;
@@ -955,10 +923,20 @@ Dbtux::DescPage::DescPage() :
   }
 }
 
+// Dbtux::ScanBound
+
+inline
+Dbtux::ScanBound::ScanBound() :
+  m_head(),
+  m_cnt(0),
+  m_side(0)
+{
+}
+
 // Dbtux::ScanOp
 
 inline
-Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
+Dbtux::ScanOp::ScanOp() :
   m_state(Undef),
   m_lockwait(false),
   m_errorCode(0),
@@ -975,16 +953,11 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& sca
   m_readCommitted(0),
   m_lockMode(0),
   m_descending(0),
-  m_boundMin(scanBoundPool),
-  m_boundMax(scanBoundPool),
+  m_scanBound(),
   m_scanPos(),
   m_scanEnt(),
   m_nodeScan(RNIL)
 {
-  m_bound[0] = &m_boundMin;
-  m_bound[1] = &m_boundMax;
-  m_boundCnt[0] = 0;
-  m_boundCnt[1] = 0;
 }
 
 // Dbtux::Index
@@ -998,6 +971,9 @@ Dbtux::Index::Index() :
   m_descPage(RNIL),
   m_descOff(0),
   m_numAttrs(0),
+  m_prefAttrs(0),
+  m_prefBytes(0),
+  m_keySpec(),
   m_storeNullKey(false)
 {
   for (unsigned i = 0; i < MaxIndexFragments; i++) {
@@ -1013,17 +989,13 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& sca
   m_tableId(RNIL),
   m_indexId(RNIL),
   m_fragId(ZNIL),
-  m_descPage(RNIL),
-  m_descOff(0),
-  m_numAttrs(ZNIL),
-  m_storeNullKey(false),
   m_tree(),
   m_freeLoc(),
   m_scanList(scanOpPool),
-  m_tupIndexFragPtrI(RNIL)
+  m_tupIndexFragPtrI(RNIL),
+  m_tupTableFragPtrI(RNIL),
+  m_accTableFragPtrI(RNIL)
 {
-  m_tupTableFragPtrI = RNIL;
-  m_accTableFragPtrI = RNIL;
 }
 
 // Dbtux::FragOp
@@ -1157,7 +1129,7 @@ Dbtux::NodeHandle::setNodeScan(Uint32 sc
   m_node->m_nodeScan = scanPtrI;
 }
 
-inline Dbtux::Data
+inline Uint32*
 Dbtux::NodeHandle::getPref()
 {
   TreeHead& tree = m_frag.m_tree;
@@ -1193,15 +1165,60 @@ Dbtux::PrintPar::PrintPar() :
 
 // utils
 
-inline Dbtux::DescEnt&
-Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff)
+inline Uint32
+Dbtux::getDescSize(const Index& index)
+{
+  return
+    DescHeadSize +
+    index.m_numAttrs * KeyTypeSize +
+    index.m_numAttrs * AttributeHeaderSize;
+}
+
+inline Dbtux::DescHead&
+Dbtux::getDescHead(const Index& index)
 {
   DescPagePtr pagePtr;
-  pagePtr.i = descPage;
+  pagePtr.i = index.m_descPage;
   c_descPagePool.getPtr(pagePtr);
-  ndbrequire(descOff < DescPageSize);
-  DescEnt* descEnt = (DescEnt*)&pagePtr.p->m_data[descOff];
-  return *descEnt;
+  ndbrequire(index.m_descOff < DescPageSize);
+  Uint32* ptr = &pagePtr.p->m_data[index.m_descOff];
+  DescHead* descHead = reinterpret_cast<DescHead*>(ptr);
+  ndbrequire(descHead->m_magic == DescHead::Magic);
+  return *descHead;
+}
+
+inline Dbtux::KeyType*
+Dbtux::getKeyTypes(DescHead& descHead)
+{
+  Uint32* ptr = reinterpret_cast<Uint32*>(&descHead);
+  ptr += DescHeadSize;
+  return reinterpret_cast<KeyType*>(ptr);
+}
+
+inline const Dbtux::KeyType*
+Dbtux::getKeyTypes(const DescHead& descHead)
+{
+  const Uint32* ptr = reinterpret_cast<const Uint32*>(&descHead);
+  ptr += DescHeadSize;
+  return reinterpret_cast<const KeyType*>(ptr);
+}
+
+inline AttributeHeader*
+Dbtux::getKeyAttrs(DescHead& descHead)
+{
+  Uint32* ptr = reinterpret_cast<Uint32*>(&descHead);
+  ptr += DescHeadSize;
+  ptr += descHead.m_numAttrs * KeyTypeSize;
+  return reinterpret_cast<AttributeHeader*>(ptr);
+}
+
+inline const AttributeHeader*
+Dbtux::getKeyAttrs(const DescHead& descHead)
+{
+  const Uint32* ptr = reinterpret_cast<const Uint32*>(&descHead);
+  ptr += DescHeadSize;
+  ptr += descHead.m_numAttrs * KeyTypeSize;
+  return reinterpret_cast<const AttributeHeader*>(ptr);
 }
 
 inline
@@ -1227,4 +1244,40 @@ Dbtux::max(unsigned x, unsigned y)
   return x > y ? x : y;
 }
 
+// DbtuxCmp.cpp
+
+inline int
+Dbtux::cmpSearchKey(TuxCtx& ctx, const KeyDataC& searchKey, const KeyDataC& entryKey, Uint32 cnt)
+{
+  // compare cnt attributes from each
+  Uint32 num_eq;
+  int ret = searchKey.cmp(entryKey, cnt, num_eq);
+#ifdef VM_TRACE
+  if (debugFlags & DebugMaint) {
+    debugOut << "cmpSearchKey: ret:" << ret;
+    debugOut << " search:" << searchKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+    debugOut << " entry:" << entryKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+    debugOut << endl;
+  }
+#endif
+  return ret;
+}
+
+inline int
+Dbtux::cmpSearchBound(TuxCtx& ctx, const KeyBoundC& searchBound, const KeyDataC& entryKey, Uint32 cnt)
+{
+  // compare cnt attributes from each
+  Uint32 num_eq;
+  int ret = searchBound.cmp(entryKey, cnt, num_eq);
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "cmpSearchBound: res:" << ret;
+    debugOut << " search:" << searchBound.print(ctx.c_debugBuffer, DebugBufferBytes);
+    debugOut << " entry:" << entryKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+    debugOut << endl;
+  }
+#endif
+  return ret;
+}
+
 #endif

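The new descriptor layout described above packs, per index, a DescHead followed by one NdbPack::Type per key attribute and one AttributeHeader per key attribute into a flat word array on a descriptor page; getDescSize/getKeyTypes/getKeyAttrs are just offset arithmetic over that layout. A rough self-contained sketch of that arithmetic (struct names and sizes here are invented stand-ins, not the real NdbPack or AttributeHeader definitions):

    #include <cstdint>
    #include <cstdio>

    // Invented stand-ins; the real code uses DescHead, NdbPack::Type and
    // AttributeHeader, each a whole number of 32-bit words.
    struct Head { uint32_t indexId; uint16_t numAttrs; uint16_t magic; };
    struct Type { uint32_t packed; };
    struct Attr { uint32_t header; };

    static const uint32_t HeadWords = sizeof(Head) / 4;   // cf. DescHeadSize
    static const uint32_t TypeWords = sizeof(Type) / 4;   // cf. KeyTypeSize
    static const uint32_t AttrWords = sizeof(Attr) / 4;   // cf. AttributeHeaderSize

    // words needed for one descriptor: head + key types + attr headers
    static uint32_t descSize(uint32_t numAttrs) {
      return HeadWords + numAttrs * TypeWords + numAttrs * AttrWords;
    }

    static Type* getKeyTypes(uint32_t* desc) {
      return reinterpret_cast<Type*>(desc + HeadWords);
    }

    static Attr* getKeyAttrs(uint32_t* desc, uint32_t numAttrs) {
      return reinterpret_cast<Attr*>(desc + HeadWords + numAttrs * TypeWords);
    }

    int main() {
      uint32_t page[512] = { 0 };                 // cf. DescPageSize words
      const uint32_t numAttrs = 3;
      Head* head = reinterpret_cast<Head*>(page); // descriptor at offset 0
      head->numAttrs = (uint16_t)numAttrs;
      getKeyTypes(page)[0].packed = 1;            // first key type slot
      getKeyAttrs(page, numAttrs)[0].header = 2;  // first attr header slot
      printf("descriptor for %u attrs occupies %u of 512 words\n",
             (unsigned)numAttrs, (unsigned)descSize(numAttrs));
      return 0;
    }
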
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp	2011-04-24 13:10:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxBuild.cpp	2011-05-17 12:19:20 +0000
@@ -48,16 +48,16 @@ Dbtux::mt_buildIndexFragment_wrapper(voi
     tux_ctx->jamBuffer = (EmulatedJamBuffer*)ptr;
     tux_ctx->jamBuffer->theEmulatedJamIndex = 0;
     ptr += (sizeof(EmulatedJamBuffer) + 3) / 4;
-    tux_ctx->c_keyAttrs = ptr;
-    ptr += MaxIndexAttributes;
-    while (UintPtr(ptr) & 7)
-      ptr++;
-    tux_ctx->c_sqlCmp = (NdbSqlUtil::Cmp**)ptr;
-    ptr += (sizeof(void*) *  MaxIndexAttributes) / sizeof(Uint32);
     tux_ctx->c_searchKey = ptr;
     ptr += MaxAttrDataSize;
     tux_ctx->c_entryKey = ptr;
     ptr += MaxAttrDataSize;
+    tux_ctx->c_dataBuffer = ptr;
+    ptr += MaxAttrDataSize;
+#ifdef VM_TRACE
+    tux_ctx->c_debugBuffer = (char*)ptr;
+    ptr += (DebugBufferBytes + 3) / 4;
+#endif
     if (!(UintPtr(ptr) - UintPtr(req->mem_buffer) <= req->buffer_size))
       abort();
   }
@@ -88,8 +88,6 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
   Frag& frag = *fragPtr.p;
 
   TuxCtx & ctx = * (TuxCtx*)req->tux_ctx_ptr;
-  // set up index keys for this operation
-  setKeyAttrs(ctx, frag);
 
   Local_key pos;
   Uint32 fragPtrI;
@@ -108,31 +106,19 @@ Dbtux::mt_buildIndexFragment(mt_BuildInd
     ent.m_tupLoc = TupLoc(pos.m_page_no, pos.m_page_idx);
     ent.m_tupVersion = pos.m_file_no; // used for version
 
-    // read search key
-    readKeyAttrs(ctx, frag, ent, 0, ctx.c_searchKey);
-    if (! frag.m_storeNullKey)
-    {
-      // check if all keys are null
-      const unsigned numAttrs = frag.m_numAttrs;
-      bool allNull = true;
-      for (unsigned i = 0; i < numAttrs; i++)
-      {
-        if (ctx.c_searchKey[i] != 0)
-        {
-          jam();
-          allNull = false;
-          break;
-        }
-      }
-      if (allNull)
-      {
-        jam();
-        continue;
-      }
+    // set up and read search key
+    KeyData searchKey(indexPtr.p->m_keySpec, false, 0);
+    searchKey.set_buf(ctx.c_searchKey, MaxAttrDataSize << 2);
+    readKeyAttrs(ctx, frag, ent, searchKey, indexPtr.p->m_numAttrs);
+
+    if (unlikely(! indexPtr.p->m_storeNullKey) &&
+        searchKey.get_null_cnt() == indexPtr.p->m_numAttrs) {
+      jam();
+      continue;
     }
 
     TreePos treePos;
-    bool ok = searchToAdd(ctx, frag, ctx.c_searchKey, ent, treePos);
+    bool ok = searchToAdd(ctx, frag, searchKey, ent, treePos);
     ndbrequire(ok);
 
     /*

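mt_buildIndexFragment_wrapper above now carves the caller-supplied word buffer into the per-thread TuxCtx areas (search key, entry key, data buffer, and the debug buffer under VM_TRACE) by plain pointer bumping followed by an overflow check. The pattern, reduced to a sketch with invented names and sizes:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    struct Ctx {                   // invented stand-in for TuxCtx
      uint32_t* searchKey;
      uint32_t* entryKey;
      uint32_t* dataBuffer;
    };

    // Carve one flat Uint32 buffer into fixed-size areas by pointer bumping,
    // ending with the same kind of overflow check as the original wrapper.
    static void carve(Ctx& ctx, uint32_t* buf, size_t bufWords, size_t areaWords) {
      uint32_t* ptr = buf;
      ctx.searchKey  = ptr;  ptr += areaWords;
      ctx.entryKey   = ptr;  ptr += areaWords;
      ctx.dataBuffer = ptr;  ptr += areaWords;
      assert((size_t)(ptr - buf) <= bufWords);   // did we overrun the buffer?
      (void)bufWords;
    }

    int main() {
      uint32_t buf[3 * 64];
      Ctx ctx;
      carve(ctx, buf, sizeof(buf) / sizeof(buf[0]), 64);
      ctx.searchKey[0]  = 1;       // each area is now independently usable
      ctx.entryKey[0]   = 2;
      ctx.dataBuffer[0] = 3;
      return 0;
    }
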
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp	2011-05-04 12:29:26 +0000
@@ -18,163 +18,3 @@
 
 #define DBTUX_CMP_CPP
 #include "Dbtux.hpp"
-
-/*
- * Search key vs node prefix or entry.
- *
- * The comparison starts at given attribute position.  The position is
- * updated by number of equal initial attributes found.  The entry data
- * may be partial in which case CmpUnknown may be returned.
- *
- * The attributes are normalized and have variable size given in words.
- */
-int
-Dbtux::cmpSearchKey(TuxCtx& ctx,
-                    const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
-{
-  const unsigned numAttrs = frag.m_numAttrs;
-  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
-  // skip to right position in search key only
-  for (unsigned i = 0; i < start; i++) {
-    thrjam(ctx.jamBuffer);
-    searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
-  }
-  // number of words of entry data left
-  unsigned len2 = maxlen;
-  int ret = 0;
-  while (start < numAttrs) {
-    if (len2 <= AttributeHeaderSize) {
-      thrjam(ctx.jamBuffer);
-      ret = NdbSqlUtil::CmpUnknown;
-      break;
-    }
-    len2 -= AttributeHeaderSize;
-    if (! ah(searchKey).isNULL()) {
-      if (! ah(entryData).isNULL()) {
-        thrjam(ctx.jamBuffer);
-        // verify attribute id
-        const DescAttr& descAttr = descEnt.m_descAttr[start];
-        ndbrequire(ah(searchKey).getAttributeId() == descAttr.m_primaryAttrId);
-        ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
-        // sizes
-        const unsigned bytes1 = ah(searchKey).getByteSize();
-        const unsigned bytes2 = min(ah(entryData).getByteSize(), len2 << 2);
-        const unsigned size2 = min(ah(entryData).getDataSize(), len2);
-        len2 -= size2;
-        // compare
-        NdbSqlUtil::Cmp* const cmp = ctx.c_sqlCmp[start];
-        const Uint32* const p1 = &searchKey[AttributeHeaderSize];
-        const Uint32* const p2 = &entryData[AttributeHeaderSize];
-        const bool full = (maxlen == MaxAttrDataSize);
-        ret = (*cmp)(0, p1, bytes1, p2, bytes2, full);
-        if (ret != 0) {
-          thrjam(ctx.jamBuffer);
-          break;
-        }
-      } else {
-        thrjam(ctx.jamBuffer);
-        // not NULL > NULL
-        ret = +1;
-        break;
-      }
-    } else {
-      if (! ah(entryData).isNULL()) {
-        thrjam(ctx.jamBuffer);
-        // NULL < not NULL
-        ret = -1;
-        break;
-      }
-    }
-    searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
-    entryData += AttributeHeaderSize + ah(entryData).getDataSize();
-    start++;
-  }
-  return ret;
-}
-
-/*
- * Scan bound vs node prefix or entry.
- *
- * Compare lower or upper bound and index entry data.  The entry data
- * may be partial in which case CmpUnknown may be returned.  Otherwise
- * returns -1 if the bound is to the left of the entry and +1 if the
- * bound is to the right of the entry.
- *
- * The routine is similar to cmpSearchKey, but 0 is never returned.
- * Suppose all attributes compare equal.  Recall that all bounds except
- * possibly the last one are non-strict.  Use the given bound direction
- * (0-lower 1-upper) and strictness of last bound to return -1 or +1.
- *
- * Following example illustrates this.  We are at (a=2, b=3).
- *
- * idir bounds                  strict          return
- * 0    a >= 2 and b >= 3       no              -1
- * 0    a >= 2 and b >  3       yes             +1
- * 1    a <= 2 and b <= 3       no              +1
- * 1    a <= 2 and b <  3       yes             -1
- *
- * The attributes are normalized and have variable size given in words.
- */
-int
-Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
-{
-  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
-  // direction 0-lower 1-upper
-  ndbrequire(idir <= 1);
-  // number of words of data left
-  unsigned len2 = maxlen;
-  // in case of no bounds, init last type to something non-strict
-  unsigned type = 4;
-  while (boundCount != 0) {
-    if (len2 <= AttributeHeaderSize) {
-      jam();
-      return NdbSqlUtil::CmpUnknown;
-    }
-    len2 -= AttributeHeaderSize;
-    // get and skip bound type (it is used after the loop)
-    type = boundInfo[0];
-    boundInfo += 1;
-    if (! ah(boundInfo).isNULL()) {
-      if (! ah(entryData).isNULL()) {
-        jam();
-        // verify attribute id
-        const Uint32 index = ah(boundInfo).getAttributeId();
-        ndbrequire(index < frag.m_numAttrs);
-        const DescAttr& descAttr = descEnt.m_descAttr[index];
-        ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
-        // sizes
-        const unsigned bytes1 = ah(boundInfo).getByteSize();
-        const unsigned bytes2 = min(ah(entryData).getByteSize(), len2 << 2);
-        const unsigned size2 = min(ah(entryData).getDataSize(), len2);
-        len2 -= size2;
-        // compare
-        NdbSqlUtil::Cmp* const cmp = c_ctx.c_sqlCmp[index];
-        const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
-        const Uint32* const p2 = &entryData[AttributeHeaderSize];
-        const bool full = (maxlen == MaxAttrDataSize);
-        int ret = (*cmp)(0, p1, bytes1, p2, bytes2, full);
-        if (ret != 0) {
-          jam();
-          return ret;
-        }
-      } else {
-        jam();
-        // not NULL > NULL
-        return +1;
-      }
-    } else {
-      jam();
-      if (! ah(entryData).isNULL()) {
-        jam();
-        // NULL < not NULL
-        return -1;
-      }
-    }
-    boundInfo += AttributeHeaderSize + ah(boundInfo).getDataSize();
-    entryData += AttributeHeaderSize + ah(entryData).getDataSize();
-    boundCount -= 1;
-  }
-  // all attributes were equal
-  const int strict = (type & 0x1);
-  return (idir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
-}

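The removed cmpScanBound above carried the rule for the all-attributes-equal case: the bound direction (0 = lower, 1 = upper) and the strictness of the last bound decide whether the bound sorts before or after the entry. That tie-break reduces to a one-liner; a small sketch of just that rule (in the new code the whole comparison, this case included, is delegated to NdbPack via searchBound.cmp in cmpSearchBound):

    #include <cstdio>

    // Tie-break when every bound attribute compares equal to the entry.
    // idir: 0 = lower bound, 1 = upper bound.
    // strict: last bound was strict ('>' or '<' rather than '>=' or '<=').
    static int boundTieBreak(unsigned idir, bool strict) {
      if (idir == 0)               // lower bound
        return strict ? +1 : -1;   // a >  x lies right of x, a >= x lies left
      else                         // upper bound
        return strict ? -1 : +1;   // a <  x lies left of x,  a <= x lies right
    }

    int main() {
      printf("%d %d %d %d\n",
             boundTieBreak(0, false), boundTieBreak(0, true),
             boundTieBreak(1, false), boundTieBreak(1, true));
      // prints -1 1 1 -1, matching the table in the removed comment
      return 0;
    }
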
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2011-04-24 16:20:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2011-05-17 12:19:20 +0000
@@ -211,6 +211,7 @@ Dbtux::printNode(TuxCtx & ctx,
     par.m_depth = 0;
     return;
   }
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
   TreeHead& tree = frag.m_tree;
   NodeHandle node(frag);
   selectNode(node, loc);
@@ -283,33 +284,34 @@ Dbtux::printNode(TuxCtx & ctx,
   }
 #endif
   // check inline prefix
-  { ConstData data1 = node.getPref();
+  {
+    KeyDataC keyData1(index.m_keySpec, false);
+    const Uint32* data1 = node.getPref();
+    keyData1.set_buf(data1, index.m_prefBytes, index.m_prefAttrs);
+    KeyData keyData2(index.m_keySpec, false, 0);
     Uint32 data2[MaxPrefSize];
-    memset(data2, DataFillByte, MaxPrefSize << 2);
-    readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_searchKey);
-    copyAttrs(ctx, frag, ctx.c_searchKey, data2, tree.m_prefSize);
-    for (unsigned n = 0; n < tree.m_prefSize; n++) {
-      if (data1[n] != data2[n]) {
-        par.m_ok = false;
-        out << par.m_path << sep;
-        out << "inline prefix mismatch word " << n;
-        out << " value " << hex << data1[n];
-        out << " should be " << hex << data2[n] << endl;
-        break;
-      }
+    keyData2.set_buf(data2, MaxPrefSize << 2);
+    readKeyAttrs(ctx, frag, node.getEnt(0), keyData2, index.m_prefAttrs);
+    if (cmpSearchKey(ctx, keyData1, keyData2, index.m_prefAttrs) != 0) {
+      par.m_ok = false;
+      out << par.m_path << sep;
+      out << "inline prefix mismatch" << endl;
     }
   }
   // check ordering within node
   for (unsigned j = 1; j < node.getOccup(); j++) {
     const TreeEnt ent1 = node.getEnt(j - 1);
     const TreeEnt ent2 = node.getEnt(j);
-    unsigned start = 0;
-    readKeyAttrs(ctx, frag, ent1, start, ctx.c_searchKey);
-    readKeyAttrs(ctx, frag, ent2, start, ctx.c_entryKey);
-    int ret = cmpSearchKey(ctx, frag, start, ctx.c_searchKey, ctx.c_entryKey);
+    KeyData entryKey1(index.m_keySpec, false, 0);
+    KeyData entryKey2(index.m_keySpec, false, 0);
+    entryKey1.set_buf(ctx.c_searchKey, MaxAttrDataSize << 2);
+    entryKey2.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
+    readKeyAttrs(ctx, frag, ent1, entryKey1, index.m_numAttrs);
+    readKeyAttrs(ctx, frag, ent2, entryKey2, index.m_numAttrs);
+    int ret = cmpSearchKey(ctx, entryKey1, entryKey2, index.m_numAttrs);
     if (ret == 0)
       ret = ent1.cmp(ent2);
-    if (ret != -1) {
+    if (! (ret < 0)) {
       par.m_ok = false;
       out << par.m_path << sep;
       out << " disorder within node at pos " << j << endl;
@@ -322,13 +324,17 @@ Dbtux::printNode(TuxCtx & ctx,
     const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
     const unsigned pos = (i == 0 ? 0 : node.getOccup() - 1);
     const TreeEnt ent2 = node.getEnt(pos);
-    unsigned start = 0;
-    readKeyAttrs(ctx, frag, ent1, start, ctx.c_searchKey);
-    readKeyAttrs(ctx, frag, ent2, start, ctx.c_entryKey);
-    int ret = cmpSearchKey(ctx, frag, start, ctx.c_searchKey, ctx.c_entryKey);
+    KeyData entryKey1(index.m_keySpec, false, 0);
+    KeyData entryKey2(index.m_keySpec, false, 0);
+    entryKey1.set_buf(ctx.c_searchKey, MaxAttrDataSize << 2);
+    entryKey2.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
+    readKeyAttrs(ctx, frag, ent1, entryKey1, index.m_numAttrs);
+    readKeyAttrs(ctx, frag, ent2, entryKey2, index.m_numAttrs);
+    int ret = cmpSearchKey(ctx, entryKey1, entryKey2, index.m_numAttrs);
     if (ret == 0)
       ret = ent1.cmp(ent2);
-    if (ret != (i == 0 ? -1 : +1)) {
+    if (i == 0 && ! (ret < 0) ||
+        i == 1 && ! (ret > 0)) {
       par.m_ok = false;
       out << par.m_path << sep;
       out << " disorder wrt subtree " << i << endl;
@@ -406,17 +412,6 @@ operator<<(NdbOut& out, const Dbtux::Tre
 }
 
 NdbOut&
-operator<<(NdbOut& out, const Dbtux::DescAttr& descAttr)
-{
-  out << "[DescAttr " << hex << &descAttr;
-  out << " [attrDesc " << hex << descAttr.m_attrDesc;
-  out << " [primaryAttrId " << dec << descAttr.m_primaryAttrId << "]";
-  out << " [typeId " << dec << descAttr.m_typeId << "]";
-  out << "]";
-  return out;
-}
-
-NdbOut&
 operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
 {
   Dbtux* tux = (Dbtux*)globalData.getBlock(DBTUX);
@@ -447,17 +442,15 @@ operator<<(NdbOut& out, const Dbtux::Sca
   out << " [pos " << scan.m_scanPos << "]";
   out << " [ent " << scan.m_scanEnt << "]";
   for (unsigned i = 0; i <= 1; i++) {
-    out << " [bound " << dec << i;
-    Dbtux::ScanBound& bound = *scan.m_bound[i];
-    Dbtux::ScanBoundIterator iter;
-    bound.first(iter);
-    for (unsigned j = 0; j < bound.getSize(); j++) {
-      out << " " << hex << *iter.data;
-      bound.next(iter);
-    }
+    const Dbtux::ScanBound scanBound = scan.m_scanBound[i];
+    const Dbtux::Index& index = *tux->c_indexPool.getPtr(scan.m_indexId);
+    Dbtux::KeyDataC keyBoundData(index.m_keySpec, true);
+    Dbtux::KeyBoundC keyBound(keyBoundData);
+    tux->unpackBound(tux->c_ctx, scanBound, keyBound);
+    out << " [scanBound " << dec << i;
+    out << " " << keyBound.print(tux->c_ctx.c_debugBuffer, Dbtux::DebugBufferBytes);
     out << "]";
   }
-  out << "]";
   return out;
 }
 
@@ -477,6 +470,8 @@ operator<<(NdbOut& out, const Dbtux::Ind
   out << " [descPage " << hex << index.m_descPage << "]";
   out << " [descOff " << dec << index.m_descOff << "]";
   out << " [numAttrs " << dec << index.m_numAttrs << "]";
+  out << " [prefAttrs " << dec << index.m_prefAttrs << "]";
+  out << " [prefBytes " << dec << index.m_prefBytes << "]";
   out << "]";
   return out;
 }
@@ -488,9 +483,6 @@ operator<<(NdbOut& out, const Dbtux::Fra
   out << " [tableId " << dec << frag.m_tableId << "]";
   out << " [indexId " << dec << frag.m_indexId << "]";
   out << " [fragId " << dec << frag.m_fragId << "]";
-  out << " [descPage " << hex << frag.m_descPage << "]";
-  out << " [descOff " << dec << frag.m_descOff << "]";
-  out << " [numAttrs " << dec << frag.m_numAttrs << "]";
   out << " [tree " << frag.m_tree << "]";
   out << "]";
   return out;

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2011-04-24 13:10:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp	2011-05-17 12:19:20 +0000
@@ -39,7 +39,7 @@ Dbtux::Dbtux(Block_context& ctx, Uint32
       (sizeof(TreeEnt) & 0x3) == 0 &&
       (sizeof(TreeNode) & 0x3) == 0 &&
       (sizeof(DescHead) & 0x3) == 0 &&
-      (sizeof(DescAttr) & 0x3) == 0
+      (sizeof(KeyType) & 0x3) == 0
   );
   /*
    * DbtuxGen.cpp
@@ -201,7 +201,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signa
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_BATCH_SIZE, &nScanBatch));
 
-  const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize;
+  const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * KeyTypeSize + nAttribute * AttributeHeaderSize + DescPageSize - 1) / DescPageSize;
   const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4;
   const Uint32 nScanLock = nScanOp * nScanBatch;
   
@@ -229,12 +229,14 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signa
   }
   // allocate buffers
   c_ctx.jamBuffer = jamBuffer();
-  c_ctx.c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
-  c_ctx.c_sqlCmp = (NdbSqlUtil::Cmp**)allocRecord("c_sqlCmp", sizeof(NdbSqlUtil::Cmp*), MaxIndexAttributes);
   c_ctx.c_searchKey = (Uint32*)allocRecord("c_searchKey", sizeof(Uint32), MaxAttrDataSize);
   c_ctx.c_entryKey = (Uint32*)allocRecord("c_entryKey", sizeof(Uint32), MaxAttrDataSize);
 
-  c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
+  c_ctx.c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxXfrmDataSize + 1) >> 1);
+
+#ifdef VM_TRACE
+  c_ctx.c_debugBuffer = (char*)allocRecord("c_debugBuffer", sizeof(char), DebugBufferBytes);
+#endif
 
   // ack
   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
@@ -247,123 +249,79 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signa
 // utils
 
 void
-Dbtux::setKeyAttrs(TuxCtx& ctx, const Frag& frag)
+Dbtux::readKeyAttrs(TuxCtx& ctx, const Frag& frag, TreeEnt ent, KeyData& keyData, Uint32 count)
 {
-  Data keyAttrs = ctx.c_keyAttrs;
-  NdbSqlUtil::Cmp** sqlCmp = ctx.c_sqlCmp; // global
-  const unsigned numAttrs = frag.m_numAttrs;
-  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
-  for (unsigned i = 0; i < numAttrs; i++) {
-    thrjam(ctx.jamBuffer);
-    const DescAttr& descAttr = descEnt.m_descAttr[i];
-    Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
-    // set attr id and fixed size
-    ah(keyAttrs) = AttributeHeader(descAttr.m_primaryAttrId, size);
-    keyAttrs += 1;
-    // set comparison method pointer
-    const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
-    ndbrequire(sqlType.m_cmp != 0);
-    *(sqlCmp++) = sqlType.m_cmp;
-  }
-}
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  const DescHead& descHead = getDescHead(index);
+  const AttributeHeader* keyAttrs = getKeyAttrs(descHead);
+  Uint32* const outputBuffer = ctx.c_dataBuffer;
+
+#ifdef VM_TRACE
+  ndbrequire(&keyData.get_spec() == &index.m_keySpec);
+  ndbrequire(keyData.get_spec().validate() == 0);
+  ndbrequire(count <= index.m_numAttrs);
+#endif
 
-void
-Dbtux::readKeyAttrs(TuxCtx& ctx, const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
-{
-  ConstData keyAttrs = ctx.c_keyAttrs;
-  const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
   const TupLoc tupLoc = ent.m_tupLoc;
+  const Uint32 pageId = tupLoc.getPageId();
+  const Uint32 pageOffset = tupLoc.getPageOffset();
   const Uint32 tupVersion = ent.m_tupVersion;
-  ndbrequire(start < frag.m_numAttrs);
-  const Uint32 numAttrs = frag.m_numAttrs - start;
-  // skip to start position in keyAttrs only
-  keyAttrs += start;
-  int ret = c_tup->tuxReadAttrs(ctx.jamBuffer,
-                                tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(),
-                                tupVersion, keyAttrs, numAttrs, keyData);
-  thrjamEntry(ctx.jamBuffer);
-  // TODO handle error
+  const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
+  const Uint32* keyAttrs32 = (const Uint32*)&keyAttrs[0];
+
+  int ret;
+  ret = c_tup->tuxReadAttrs(ctx.jamBuffer, tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), tupVersion, keyAttrs32, count, outputBuffer, false);
+  jamEntry();
   ndbrequire(ret > 0);
+  keyData.reset();
+  Uint32 len;
+  ret = keyData.add_poai(outputBuffer, count, &len);
+  ndbrequire(ret == 0);
+  ret = keyData.finalize();
+  ndbrequire(ret == 0);
+
 #ifdef VM_TRACE
   if (debugFlags & (DebugMaint | DebugScan)) {
-    debugOut << "readKeyAttrs:" << endl;
-    ConstData data = keyData;
-    Uint32 totalSize = 0;
-    for (Uint32 i = start; i < frag.m_numAttrs; i++) {
-      Uint32 attrId = ah(data).getAttributeId();
-      Uint32 dataSize = ah(data).getDataSize();
-      debugOut << i << " attrId=" << attrId << " size=" << dataSize;
-      data += 1;
-      for (Uint32 j = 0; j < dataSize; j++) {
-        debugOut << " " << hex << data[0];
-        data += 1;
-      }
-      debugOut << endl;
-      totalSize += 1 + dataSize;
-    }
-    ndbassert((int)totalSize == ret);
+    debugOut << "readKeyAttrs: ";
+    debugOut << " ent:" << ent << " count:" << count;
+    debugOut << " data:" << keyData.print(ctx.c_debugBuffer, DebugBufferBytes);
+    debugOut << endl;
   }
 #endif
 }
 
 void
-Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
+Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Uint32* pkData, unsigned& pkSize)
 {
   const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
   const TupLoc tupLoc = ent.m_tupLoc;
   int ret = c_tup->tuxReadPk(tableFragPtrI, tupLoc.getPageId(), tupLoc.getPageOffset(), pkData, true);
   jamEntry();
-  // TODO handle error
   ndbrequire(ret > 0);
   pkSize = ret;
 }
 
-/*
- * Copy attribute data with headers.  Input is all index key data.
- * Copies whatever fits.
- */
-void
-Dbtux::copyAttrs(TuxCtx& ctx, const Frag& frag, ConstData data1, Data data2, unsigned maxlen2)
-{
-  unsigned n = frag.m_numAttrs;
-  unsigned len2 = maxlen2;
-  while (n != 0) {
-    thrjam(ctx.jamBuffer);
-    const unsigned dataSize = ah(data1).getDataSize();
-    // copy header
-    if (len2 == 0)
-      return;
-    data2[0] = data1[0];
-    data1 += 1;
-    data2 += 1;
-    len2 -= 1;
-    // copy data
-    for (unsigned i = 0; i < dataSize; i++) {
-      if (len2 == 0)
-        return;
-      data2[i] = data1[i];
-      len2 -= 1;
-    }
-    data1 += dataSize;
-    data2 += dataSize;
-    n -= 1;
-  }
-#ifdef VM_TRACE
-  memset(data2, DataFillByte, len2 << 2);
-#endif
-}
-
 void
-Dbtux::unpackBound(const ScanBound& bound, Data dest)
+Dbtux::unpackBound(TuxCtx& ctx, const ScanBound& scanBound, KeyBoundC& searchBound)
 {
-  ScanBoundIterator iter;
-  bound.first(iter);
-  const unsigned n = bound.getSize();
-  unsigned j;
-  for (j = 0; j < n; j++) {
-    dest[j] = *iter.data;
-    bound.next(iter);
+  // there is no const version of LocalDataBuffer
+  DataBuffer<ScanBoundSegmentSize>::Head head = scanBound.m_head;
+  LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+  DataBuffer<ScanBoundSegmentSize>::ConstDataBufferIterator iter;
+  // always use searchKey buffer
+  Uint32* const outputBuffer = ctx.c_searchKey;
+  b.first(iter);
+  const Uint32 n = b.getSize();
+  ndbrequire(n <= MaxAttrDataSize);
+  for (Uint32 i = 0; i < n; i++) {
+    outputBuffer[i] = *iter.data;
+    b.next(iter);
   }
+  // set bound to the unpacked data buffer
+  KeyDataC& searchBoundData = searchBound.get_data();
+  searchBoundData.set_buf(outputBuffer, MaxAttrDataSize << 2, scanBound.m_cnt);
+  int ret = searchBound.finalize(scanBound.m_side);
+  ndbrequire(ret == 0);
 }
 
 void

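The rewritten unpackBound above copies the stored scan bound out of a segmented DataBuffer into the flat c_searchKey area before handing it to the NdbPack bound object. The copy itself is just a chunked-to-flat walk; a self-contained sketch with a made-up segment type (the real pool and iterator API differ):

    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    // Invented segment type standing in for the segmented DataBuffer pool.
    struct Segment {
      enum { Size = 7 };           // cf. ScanBoundSegmentSize
      uint32_t data[Size];
      Segment* next;
    };

    // Copy n words from a chain of fixed-size segments into one flat array,
    // the way unpackBound copies a stored scan bound into c_searchKey.
    static void flatten(const Segment* seg, unsigned n,
                        uint32_t* out, unsigned outMax) {
      assert(n <= outMax);
      (void)outMax;
      for (unsigned i = 0; i < n; i++) {
        out[i] = seg->data[i % Segment::Size];
        if ((i + 1) % Segment::Size == 0)
          seg = seg->next;         // step to the next chunk in the chain
      }
    }

    int main() {
      Segment s2 = { {70, 80, 90}, 0 };
      Segment s1 = { {0, 10, 20, 30, 40, 50, 60}, &s2 };
      uint32_t flat[16];
      flatten(&s1, 10, flat, 16);
      for (unsigned i = 0; i < 10; i++)
        printf("%u ", (unsigned)flat[i]);      // 0 10 20 ... 90
      printf("\n");
      return 0;
    }
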
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2011-04-25 14:42:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp	2011-05-04 11:58:38 +0000
@@ -66,31 +66,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
   findFrag(*indexPtr.p, fragId, fragPtr);
   ndbrequire(fragPtr.i != RNIL);
   Frag& frag = *fragPtr.p;
-  // set up index keys for this operation
-  setKeyAttrs(c_ctx, frag);
   // set up search entry
   TreeEnt ent;
   ent.m_tupLoc = TupLoc(req->pageId, req->pageIndex);
   ent.m_tupVersion = req->tupVersion;
-  // read search key
-  readKeyAttrs(c_ctx, frag, ent, 0, c_ctx.c_searchKey);
-  if (! frag.m_storeNullKey) {
-    // check if all keys are null
-    const unsigned numAttrs = frag.m_numAttrs;
-    bool allNull = true;
-    for (unsigned i = 0; i < numAttrs; i++) {
-      if (c_ctx.c_searchKey[i] != 0) {
-        jam();
-        allNull = false;
-        break;
-      }
-    }
-    if (allNull) {
-      jam();
-      req->errorCode = 0;
-      *sig = *req;
-      return;
-    }
+  // set up and read search key
+  KeyData searchKey(indexPtr.p->m_keySpec, false, 0);
+  searchKey.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
+  readKeyAttrs(c_ctx, frag, ent, searchKey, indexPtr.p->m_numAttrs);
+  if (unlikely(! indexPtr.p->m_storeNullKey) &&
+      searchKey.get_null_cnt() == indexPtr.p->m_numAttrs) {
+    jam();
+    return;
   }
 #ifdef VM_TRACE
   if (debugFlags & DebugMaint) {
@@ -110,7 +97,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
   switch (opCode) {
   case TuxMaintReq::OpAdd:
     jam();
-    ok = searchToAdd(c_ctx, frag, c_ctx.c_searchKey, ent, treePos);
+    ok = searchToAdd(c_ctx, frag, searchKey, ent, treePos);
 #ifdef VM_TRACE
     if (debugFlags & DebugMaint) {
       debugOut << treePos << (! ok ? " - error" : "") << endl;
@@ -144,7 +131,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
     break;
   case TuxMaintReq::OpRemove:
     jam();
-    ok = searchToRemove(c_ctx, frag, c_ctx.c_searchKey, ent, treePos);
+    ok = searchToRemove(c_ctx, frag, searchKey, ent, treePos);
 #ifdef VM_TRACE
     if (debugFlags & DebugMaint) {
       debugOut << treePos << (! ok ? " - error" : "") << endl;

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp	2011-04-24 16:20:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp	2011-05-08 08:15:58 +0000
@@ -157,31 +157,36 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signa
         indexPtr.p->m_state == Index::Defining &&
         attrId < indexPtr.p->m_numAttrs &&
         attrId == req->attrId);
-    // define the attribute
-    DescEnt& descEnt = getDescEnt(indexPtr.p->m_descPage, indexPtr.p->m_descOff);
-    DescAttr& descAttr = descEnt.m_descAttr[attrId];
-    descAttr.m_attrDesc = req->attrDescriptor;
-    descAttr.m_primaryAttrId = req->primaryAttrId;
-    descAttr.m_typeId = AttributeDescriptor::getType(req->attrDescriptor);
-    descAttr.m_charset = (req->extTypeInfo >> 16);
-#ifdef VM_TRACE
-    if (debugFlags & DebugMeta) {
-      debugOut << "attr " << attrId << " " << descAttr << endl;
-    }
-#endif
-    // check that type is valid and has a binary comparison method
-    const NdbSqlUtil::Type& type = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
-    if (type.m_typeId == NdbSqlUtil::Type::Undefined ||
-        type.m_cmp == 0) {
+    const Uint32 ad = req->attrDescriptor;
+    const Uint32 typeId = AttributeDescriptor::getType(ad);
+    const Uint32 sizeInBytes = AttributeDescriptor::getSizeInBytes(ad);
+    const Uint32 nullable = AttributeDescriptor::getNullable(ad);
+    const Uint32 csNumber = req->extTypeInfo >> 16;
+    const Uint32 primaryAttrId = req->primaryAttrId;
+
+    DescHead& descHead = getDescHead(*indexPtr.p);
+    // add type to spec
+    KeySpec& keySpec = indexPtr.p->m_keySpec;
+    KeyType keyType(typeId, sizeInBytes, nullable, csNumber);
+    if (keySpec.add(keyType) == -1) {
       jam();
       errorCode = TuxAddAttrRef::InvalidAttributeType;
       break;
     }
-    if (descAttr.m_charset != 0) {
-      uint err;
-      CHARSET_INFO *cs = all_charsets[descAttr.m_charset];
+    // add primary attr to read keys array
+    AttributeHeader* keyAttrs = getKeyAttrs(descHead);
+    AttributeHeader& keyAttr = keyAttrs[attrId];
+    new (&keyAttr) AttributeHeader(primaryAttrId, sizeInBytes);
+#ifdef VM_TRACE
+    if (debugFlags & DebugMeta) {
+      debugOut << "attr " << attrId << " " << keyType << endl;
+    }
+#endif
+    if (csNumber != 0) {
+      unsigned err;
+      CHARSET_INFO *cs = all_charsets[csNumber];
       ndbrequire(cs != 0);
-      if ((err = NdbSqlUtil::check_column_for_ordered_index(descAttr.m_typeId, cs))) {
+      if ((err = NdbSqlUtil::check_column_for_ordered_index(typeId, cs))) {
         jam();
         errorCode = (TuxAddAttrRef::ErrorCode) err;
         break;
@@ -196,6 +201,30 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signa
       break;
     }
     if (lastAttr) {
+      // compute min prefix
+      const KeySpec& keySpec = indexPtr.p->m_keySpec;
+      unsigned attrs = 0;
+      unsigned bytes = keySpec.get_nullmask_len(false);
+      unsigned maxAttrs = indexPtr.p->m_numAttrs;
+#ifdef VM_TRACE
+      {
+        const char* p = NdbEnv_GetEnv("MAX_TTREE_PREF_ATTRS", (char*)0, 0);
+        if (p != 0 && p[0] != 0 && maxAttrs > (unsigned)atoi(p))
+          maxAttrs = atoi(p);
+      }
+#endif
+      while (attrs < maxAttrs) {
+        const KeyType& keyType = keySpec.get_type(attrs);
+        const unsigned newbytes = bytes + keyType.get_byte_size();
+        if (newbytes > (MAX_TTREE_PREF_SIZE << 2))
+          break;
+        attrs++;
+        bytes = newbytes;
+      }
+      if (attrs == 0)
+        bytes = 0;
+      indexPtr.p->m_prefAttrs = attrs;
+      indexPtr.p->m_prefBytes = bytes;
       // fragment is defined
 #ifdef VM_TRACE
       if (debugFlags & DebugMeta) {
@@ -278,8 +307,6 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
     fragPtr.p->m_tableId = req->primaryTableId;
     fragPtr.p->m_indexId = req->tableId;
     fragPtr.p->m_fragId = req->fragId;
-    fragPtr.p->m_numAttrs = indexPtr.p->m_numAttrs;
-    fragPtr.p->m_storeNullKey = true;  // not yet configurable
     fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI;
     fragPtr.p->m_tupTableFragPtrI = req->tupTableFragPtrI;
     fragPtr.p->m_accTableFragPtrI = req->accTableFragPtrI;
@@ -288,10 +315,6 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
     indexPtr.p->m_fragId[indexPtr.p->m_numFrags] = req->fragId;
     indexPtr.p->m_fragPtrI[indexPtr.p->m_numFrags] = fragPtr.i;
     indexPtr.p->m_numFrags++;
-
-    // copy metadata address to each fragment
-    fragPtr.p->m_descPage = indexPtr.p->m_descPage;
-    fragPtr.p->m_descOff = indexPtr.p->m_descOff;
 #ifdef VM_TRACE
     if (debugFlags & DebugMeta) {
       debugOut << "Add frag " << fragPtr.i << " " << *fragPtr.p << endl;
@@ -311,7 +334,7 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
     new (&tree) TreeHead();
     // make these configurable later
     tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
-    tree.m_prefSize = MAX_TTREE_PREF_SIZE;
+    tree.m_prefSize = (indexPtr.p->m_prefBytes + 3) / 4;
     const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
     // size of header and min prefix
     const unsigned fixedSize = NodeHeadSize + tree.m_prefSize;
@@ -542,7 +565,7 @@ bool
 Dbtux::allocDescEnt(IndexPtr indexPtr)
 {
   jam();
-  const unsigned size = DescHeadSize + indexPtr.p->m_numAttrs * DescAttrSize;
+  const Uint32 size = getDescSize(*indexPtr.p);
   DescPagePtr pagePtr;
   pagePtr.i = c_descPageList;
   while (pagePtr.i != RNIL) {
@@ -570,9 +593,13 @@ Dbtux::allocDescEnt(IndexPtr indexPtr)
   indexPtr.p->m_descPage = pagePtr.i;
   indexPtr.p->m_descOff = DescPageSize - pagePtr.p->m_numFree;
   pagePtr.p->m_numFree -= size;
-  DescEnt& descEnt = getDescEnt(indexPtr.p->m_descPage, indexPtr.p->m_descOff);
-  descEnt.m_descHead.m_indexId = indexPtr.i;
-  descEnt.m_descHead.pad1 = 0;
+  DescHead& descHead = *(DescHead*)&pagePtr.p->m_data[indexPtr.p->m_descOff];
+  descHead.m_indexId = indexPtr.i;
+  descHead.m_numAttrs = indexPtr.p->m_numAttrs;
+  descHead.m_magic = DescHead::Magic;
+  KeySpec& keySpec = indexPtr.p->m_keySpec;
+  KeyType* keyTypes = getKeyTypes(descHead);
+  keySpec.set_buf(keyTypes, indexPtr.p->m_numAttrs);
   return true;
 }
 
@@ -582,36 +609,36 @@ Dbtux::freeDescEnt(IndexPtr indexPtr)
   DescPagePtr pagePtr;
   c_descPagePool.getPtr(pagePtr, indexPtr.p->m_descPage);
   Uint32* const data = pagePtr.p->m_data;
-  const unsigned size = DescHeadSize + indexPtr.p->m_numAttrs * DescAttrSize;
-  unsigned off = indexPtr.p->m_descOff;
+  const Uint32 size = getDescSize(*indexPtr.p);
+  Uint32 off = indexPtr.p->m_descOff;
   // move the gap to the free area at the top
   while (off + size < DescPageSize - pagePtr.p->m_numFree) {
     jam();
     // next entry to move over the gap
-    DescEnt& descEnt2 = *(DescEnt*)&data[off + size];
-    Uint32 indexId2 = descEnt2.m_descHead.m_indexId;
+    DescHead& descHead2 = *(DescHead*)&data[off + size];
+    Uint32 indexId2 = descHead2.m_indexId;
     Index& index2 = *c_indexPool.getPtr(indexId2);
-    unsigned size2 = DescHeadSize + index2.m_numAttrs * DescAttrSize;
+    Uint32 size2 = getDescSize(index2);
     ndbrequire(
         index2.m_descPage == pagePtr.i &&
-        index2.m_descOff == off + size);
+        index2.m_descOff == off + size &&
+        index2.m_numAttrs == descHead2.m_numAttrs);
     // move the entry (overlapping copy if size < size2)
-    unsigned i;
+    Uint32 i;
     for (i = 0; i < size2; i++) {
       jam();
       data[off + i] = data[off + size + i];
     }
     off += size2;
-    // adjust page offset in index and all fragments
+    // adjust page offset in index
     index2.m_descOff -= size;
-    for (i = 0; i < index2.m_numFrags; i++) {
-      jam();
-      Frag& frag2 = *c_fragPool.getPtr(index2.m_fragPtrI[i]);
-      frag2.m_descOff -= size;
-      ndbrequire(
-          frag2.m_descPage == index2.m_descPage &&
-          frag2.m_descOff == index2.m_descOff);
-    }
+    {
+      // move KeySpec pointer
+      DescHead& descHead2 = getDescHead(index2);
+      KeyType* keyType2 = getKeyTypes(descHead2);
+      index2.m_keySpec.set_buf(keyType2);
+      ndbrequire(index2.m_keySpec.validate() == 0);
+    }
   }
   ndbrequire(off + size == DescPageSize - pagePtr.p->m_numFree);
   pagePtr.p->m_numFree += size;
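
The prefix sizing added above fits whole key attributes greedily into the
prefix budget: MAX_TTREE_PREF_SIZE is counted in 32-bit words, so the byte
budget is the constant shifted left by two, and tree.m_prefSize later rounds
the accumulated byte count back up to whole words.  A minimal standalone
sketch of that arithmetic (the word budget and attribute byte sizes below are
made up, not taken from the patch):

  #include <cstdio>

  int main() {
    const unsigned prefWords = 4;                // stand-in for MAX_TTREE_PREF_SIZE
    const unsigned byteBudget = prefWords << 2;  // 16 bytes
    const unsigned keyBytes[] = { 4, 9, 8 };     // hypothetical attribute byte sizes
    unsigned attrs = 0;
    unsigned bytes = 0;
    for (unsigned i = 0; i < 3; i++) {
      const unsigned newbytes = bytes + keyBytes[i];
      if (newbytes > byteBudget)
        break;                                   // next attribute does not fit
      attrs++;
      bytes = newbytes;
    }
    const unsigned prefSize = (bytes + 3) / 4;   // bytes rounded up to 32-bit words
    std::printf("prefAttrs=%u prefBytes=%u prefSize=%u\n", attrs, bytes, prefSize);
    return 0;
  }

With a 4-word budget the first two attributes (4 + 9 = 13 bytes) fit, the
third would overflow, and the 13 prefix bytes round up to a 4-word m_prefSize.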

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-04-24 16:20:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-05-17 12:19:20 +0000
@@ -148,16 +148,26 @@ Dbtux::freePreallocatedNode(Frag& frag)
 }
 
 /*
- * Set prefix.  Copies the number of words that fits.  Includes
- * attribute headers for now.  XXX use null mask instead
+ * Set prefix.  Copies the defined number of attributes.
  */
 void
 Dbtux::setNodePref(TuxCtx & ctx, NodeHandle& node)
 {
   const Frag& frag = node.m_frag;
-  const TreeHead& tree = frag.m_tree;
-  readKeyAttrs(ctx, frag, node.getEnt(0), 0, ctx.c_entryKey);
-  copyAttrs(ctx, frag, ctx.c_entryKey, node.getPref(), tree.m_prefSize);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  KeyData prefKey(index.m_keySpec, false, 0);
+  prefKey.set_buf(node.getPref(), index.m_prefBytes);
+  if (index.m_prefAttrs > 0) {
+    jam();
+    readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
+  }
+#ifdef VM_TRACE
+  if (debugFlags & DebugMaint) {
+    debugOut << "setNodePref: " << node;
+    debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+    debugOut << endl;
+  }
+#endif
 }
 
 // node operations

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-04-25 15:57:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-05-17 12:19:20 +0000
@@ -75,7 +75,7 @@ Dbtux::execACC_SCANREQ(Signal* signal)
       errorCode = AccScanRef::TuxNoFreeScanOp;
       break;
     }
-    new (scanPtr.p) ScanOp(c_scanBoundPool);
+    new (scanPtr.p) ScanOp;
     scanPtr.p->m_state = ScanOp::First;
     scanPtr.p->m_userPtr = req->senderData;
     scanPtr.p->m_userRef = req->senderRef;
@@ -131,9 +131,9 @@ Dbtux::execACC_SCANREQ(Signal* signal)
  * Check that sets of lower and upper bounds are on initial sequences of
  * keys and that all but possibly last bound is non-strict.
  *
- * Finally save the sets of lower and upper bounds (i.e. start key and
- * end key).  Full bound type is included but only the strict bit is
- * used since lower and upper have now been separated.
+ * Finally convert the sets of lower and upper bounds (i.e. start key
+ * and end key) to NdbPack format.  The data is saved in segmented
+ * memory.  The bound is reconstructed at use time via unpackBound().
  *
  * Error handling:  Error code is set in the scan and also returned in
  * EXECUTE_DIRECT (the old way).
@@ -143,174 +143,144 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal
 {
   jamEntry();
   // get records
-  TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
-  const TuxBoundInfo* const req = (const TuxBoundInfo*)sig;
+  TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
   ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
   const Index& index = *c_indexPool.getPtr(scan.m_indexId);
-  const DescEnt& descEnt = getDescEnt(index.m_descPage, index.m_descOff);
-  // collect normalized lower and upper bounds
-  struct BoundInfo {
-    int type2;     // with EQ -> LE/GE
-    Uint32 offset; // offset in xfrmData
-    Uint32 size;
-  };
-  BoundInfo boundInfo[2][MaxIndexAttributes];
-  const unsigned dstSize = MaxAttrDataSize;
-  // use some static buffer (they are only used within a timeslice)
-  Uint32* const xfrmData = c_dataBuffer;
-  Uint32 dstPos = 0;
-  // largest attrId seen plus one
-  Uint32 maxAttrId[2] = { 0, 0 };
-  // walk through entries
-  const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
-  Uint32 offset = 0;
-  while (offset + 2 <= req->boundAiLength) {
-    jam();
-    const unsigned type = data[offset];
-    const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
-    const Uint32 attrId = ah->getAttributeId();
-    const Uint32 byteSize = ah->getByteSize();
-    const Uint32 dataSize = ah->getDataSize();
-    if (type > 4 || attrId >= index.m_numAttrs || dstPos + 2 + dataSize > dstSize) {
-      jam();
-      scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
-      sig->errorCode = scan.m_errorCode;
-      return;
-    }
-    // copy header
-    xfrmData[dstPos + 0] = data[offset + 0];
-    xfrmData[dstPos + 1] = data[offset + 1];
-    // copy bound value
-    Uint32 dstBytes = 0;
-    Uint32 dstWords = 0;
-    if (! ah->isNULL()) {
-      jam();
-      const uchar* srcPtr = (const uchar*)&data[offset + 2];
-      const DescAttr& descAttr = descEnt.m_descAttr[attrId];
-      Uint32 typeId = descAttr.m_typeId;
-      Uint32 maxBytes = AttributeDescriptor::getSizeInBytes(descAttr.m_attrDesc);
-      Uint32 lb, len;
-      bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, maxBytes, lb, len);
-      if (! ok) {
-        jam();
-        scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
-        sig->errorCode = scan.m_errorCode;
-        return;
-      }
-      Uint32 srcBytes = lb + len;
-      Uint32 srcWords = (srcBytes + 3) / 4;
-      if (srcBytes != byteSize) {
+  const DescHead& descHead = getDescHead(index);
+  const KeyType* keyTypes = getKeyTypes(descHead);
+  // extract lower and upper bound in separate passes
+  for (unsigned idir = 0; idir <= 1; idir++) {
+    jam();
+    struct BoundInfo {
+      int type2;      // with EQ -> LE/GE
+      Uint32 offset;  // word offset in signal data
+      Uint32 bytes;
+    };
+    BoundInfo boundInfo[MaxIndexAttributes];
+    // largest attrId seen plus one
+    Uint32 maxAttrId = 0;
+    const Uint32* const data = &req->data[0];
+    Uint32 offset = 0;
+    while (offset + 2 <= req->boundAiLength) {
+      jam();
+      const Uint32 type = data[offset];
+      const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
+      const Uint32 attrId = ah->getAttributeId();
+      const Uint32 byteSize = ah->getByteSize();
+      const Uint32 dataSize = ah->getDataSize();
+      // check type
+      if (unlikely(type > 4)) {
         jam();
         scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
-        sig->errorCode = scan.m_errorCode;
+        req->errorCode = scan.m_errorCode;
         return;
       }
-      uchar* dstPtr = (uchar*)&xfrmData[dstPos + 2];
-      if (descAttr.m_charset == 0) {
-        memcpy(dstPtr, srcPtr, srcWords << 2);
-        dstBytes = srcBytes;
-        dstWords = srcWords;
-      } else {
+      Uint32 type2 = type;
+      if (type2 == 4) {
         jam();
-        CHARSET_INFO* cs = all_charsets[descAttr.m_charset];
-        Uint32 xmul = cs->strxfrm_multiply;
-        if (xmul == 0)
-          xmul = 1;
-        // see comment in DbtcMain.cpp
-        Uint32 dstLen = xmul * (maxBytes - lb);
-        if (dstLen > ((dstSize - dstPos) << 2)) {
+        type2 = (idir << 1); // LE=0 GE=2
+      }
+      // check if attribute belongs to this bound
+      if ((type2 & 0x2) == (idir << 1)) {
+        if (unlikely(attrId >= index.m_numAttrs)) {
           jam();
-          scan.m_errorCode = TuxBoundInfo::TooMuchAttrInfo;
-          sig->errorCode = scan.m_errorCode;
+          scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+          req->errorCode = scan.m_errorCode;
           return;
         }
-        int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
-        ndbrequire(n != -1);
-        dstBytes = n;
-        while ((n & 3) != 0) {
-          dstPtr[n++] = 0;
+        // mark entries in any gap as undefined
+        while (maxAttrId <= attrId) {
+          jam();
+          BoundInfo& b = boundInfo[maxAttrId];
+          b.type2 = -1;
+          maxAttrId++;
         }
-        dstWords = n / 4;
-      }
-    }
-    for (unsigned j = 0; j <= 1; j++) {
-      jam();
-      // check if lower/upper bit matches
-      const unsigned luBit = (j << 1);
-      if ((type & 0x2) != luBit && type != 4)
-        continue;
-      // EQ -> LE, GE
-      const unsigned type2 = (type & 0x1) | luBit;
-      // fill in any gap
-      while (maxAttrId[j] <= attrId) {
-        jam();
-        BoundInfo& b = boundInfo[j][maxAttrId[j]];
-        maxAttrId[j]++;
-        b.type2 = -1;
-      }
-      BoundInfo& b = boundInfo[j][attrId];
-      if (b.type2 != -1) {
-        // compare with previously defined bound
-        if (b.type2 != (int)type2 ||
-            b.size != 2 + dstWords ||
-            memcmp(&xfrmData[b.offset + 2], &xfrmData[dstPos + 2], dstWords << 2) != 0) {
+        BoundInfo& b = boundInfo[attrId];
+        // duplicate no longer allowed (wl#4163)
+        if (unlikely(b.type2 != -1)) {
           jam();
           scan.m_errorCode = TuxBoundInfo::InvalidBounds;
-          sig->errorCode = scan.m_errorCode;
+          req->errorCode = scan.m_errorCode;
           return;
         }
-      } else {
-        // fix length
-        AttributeHeader* ah = (AttributeHeader*)&xfrmData[dstPos + 1];
-        ah->setByteSize(dstBytes);
-        // enter new bound
-        jam();
-        b.type2 = type2;
-        b.offset = dstPos;
-        b.size = 2 + dstWords;
+        b.type2 = (int)type2;
+        b.offset = offset + 1; // poai
+        b.bytes = byteSize;
       }
+      // jump to next
+      offset += 2 + dataSize;
     }
-    // jump to next
-    offset += 2 + dataSize;
-    dstPos += 2 + dstWords;
-  }
-  if (offset != req->boundAiLength) {
-    jam();
-    scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
-    sig->errorCode = scan.m_errorCode;
-    return;
-  }
-  for (unsigned j = 0; j <= 1; j++) {
-    // save lower/upper bound in index attribute id order
-    for (unsigned i = 0; i < maxAttrId[j]; i++) {
-      jam();
-      const BoundInfo& b = boundInfo[j][i];
-      // check for gap or strict bound before last
-      if (b.type2 == -1 || (i + 1 < maxAttrId[j] && (b.type2 & 0x1))) {
-        jam();
-        scan.m_errorCode = TuxBoundInfo::InvalidBounds;
-        sig->errorCode = scan.m_errorCode;
-        return;
-      }
-      bool ok = scan.m_bound[j]->append(&xfrmData[b.offset], b.size);
-      if (! ok) {
+    if (unlikely(offset != req->boundAiLength)) {
+      jam();
+      scan.m_errorCode = TuxBoundInfo::InvalidAttrInfo;
+      req->errorCode = scan.m_errorCode;
+      return;
+    }
+    // check and pack the bound data
+    KeyData searchBoundData(index.m_keySpec, true, 0);
+    KeyBound searchBound(searchBoundData);
+    searchBoundData.set_buf(c_ctx.c_searchKey, MaxAttrDataSize << 2);
+    int strict = 0; // 0 or 1
+    Uint32 i;
+    for (i = 0; i < maxAttrId; i++) {
+      jam();
+      const BoundInfo& b = boundInfo[i];
+      // check for gap or strict bound before last
+      strict = (b.type2 & 0x1);
+      if (unlikely(b.type2 == -1 || (i + 1 < maxAttrId && strict))) {
+        jam();
+        scan.m_errorCode = TuxBoundInfo::InvalidBounds;
+        req->errorCode = scan.m_errorCode;
+        return;
+      }
+      Uint32 len;
+      if (unlikely(searchBoundData.add_poai(&data[b.offset], &len) == -1 ||
+          b.bytes != len)) {
+        jam();
+        scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+        req->errorCode = scan.m_errorCode;
+        return;
+      }
+    }
+    int side = 0;
+    if (maxAttrId != 0) {
+      // arithmetic is faster
+      // side = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
+      side = (-1) * (1 - 2 * strict) * (1 - 2 * int(idir));
+    }
+    if (unlikely(searchBound.finalize(side) == -1)) {
+      jam();
+      scan.m_errorCode = TuxBoundInfo::InvalidCharFormat;
+      req->errorCode = scan.m_errorCode;
+      return;
+    }
+    ScanBound& scanBound = scan.m_scanBound[idir];
+    scanBound.m_cnt = maxAttrId;
+    scanBound.m_side = side;
+    // save data words in segmented memory
+    {
+      DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+      LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+      const Uint32* data = (const Uint32*)searchBoundData.get_data_buf();
+      Uint32 size = (searchBoundData.get_data_len() + 3) / 4;
+      bool ok = b.append(data, size);
+      if (unlikely(!ok)) {
         jam();
         scan.m_errorCode = TuxBoundInfo::OutOfBuffers;
-        sig->errorCode = scan.m_errorCode;
+        req->errorCode = scan.m_errorCode;
         return;
       }
     }
-    scan.m_boundCnt[j] = maxAttrId[j];
   }
   if (ERROR_INSERTED(12009)) {
     jam();
     CLEAR_ERROR_INSERT_VALUE;
     scan.m_errorCode = TuxBoundInfo::InvalidBounds;
-    sig->errorCode = scan.m_errorCode;
+    req->errorCode = scan.m_errorCode;
     return;
   }
   // no error
-  sig->errorCode = 0;
+  req->errorCode = 0;
 }
 
 void
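
The side computation above replaces the commented ternary with branch-free
arithmetic; the two forms agree for every combination of direction (idir) and
strictness.  A standalone check, not part of the patch:

  #include <cassert>

  int main() {
    for (int idir = 0; idir <= 1; idir++) {
      for (int strict = 0; strict <= 1; strict++) {
        const int ternary = (idir == 0 ? (strict ? +1 : -1) : (strict ? -1 : +1));
        const int arith = (-1) * (1 - 2 * strict) * (1 - 2 * idir);
        assert(arith == ternary);
      }
    }
    return 0;
  }

So a non-strict lower bound and a strict upper bound get side -1, while a
strict lower bound and a non-strict upper bound get side +1.
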
@@ -478,7 +448,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal
     scanFind(scanPtr);
   }
   // for reading tuple key in Found or Locked state
-  Data pkData = c_dataBuffer;
+  Uint32* pkData = c_ctx.c_dataBuffer;
   unsigned pkSize = 0; // indicates not yet done
   if (scan.m_state == ScanOp::Found) {
     // found an entry to return
@@ -757,18 +727,21 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
 {
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
 #ifdef VM_TRACE
   if (debugFlags & DebugScan) {
     debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
   }
 #endif
-  // set up index keys for this operation
-  setKeyAttrs(c_ctx, frag);
   // scan direction 0, 1
   const unsigned idir = scan.m_descending;
-  unpackBound(*scan.m_bound[idir], c_dataBuffer);
+  // set up bound from segmented memory
+  const ScanBound& scanBound = scan.m_scanBound[idir];
+  KeyDataC searchBoundData(index.m_keySpec, true);
+  KeyBoundC searchBound(searchBoundData);
+  unpackBound(c_ctx, scanBound, searchBound);
   TreePos treePos;
-  searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
+  searchToScan(frag, idir, searchBound, treePos);
   if (treePos.m_loc != NullTupLoc) {
     scan.m_scanPos = treePos;
     // link the scan to node found
@@ -779,11 +752,15 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
       jam();
       // check upper bound
       TreeEnt ent = node.getEnt(treePos.m_pos);
-      if (scanCheck(scanPtr, ent))
+      if (scanCheck(scanPtr, ent)) {
+        jam();
         scan.m_state = ScanOp::Current;
-      else
+      } else {
+        jam();
         scan.m_state = ScanOp::Last;
+      }
     } else {
+      jam();
       scan.m_state = ScanOp::Next;
     }
   } else {
@@ -874,8 +851,6 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool
 #endif
   // cannot be moved away from tuple we have locked
   ndbrequire(scan.m_state != ScanOp::Locked);
-  // set up index keys for this operation
-  setKeyAttrs(c_ctx, frag);
   // scan direction
   const unsigned idir = scan.m_descending; // 0, 1
   const int jdir = 1 - 2 * (int)idir;      // 1, -1
@@ -1030,17 +1005,34 @@ Dbtux::scanCheck(ScanOpPtr scanPtr, Tree
     return false;
   }
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
   const unsigned idir = scan.m_descending;
   const int jdir = 1 - 2 * (int)idir;
-  unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
-  unsigned boundCnt = scan.m_boundCnt[1 - idir];
-  readKeyAttrs(c_ctx, frag, ent, 0, c_ctx.c_entryKey);
-  int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_ctx.c_entryKey);
-  ndbrequire(ret != NdbSqlUtil::CmpUnknown);
-  if (jdir * ret > 0)
-    return true;
-  // hit upper bound of single range scan
-  return false;
+  const ScanBound& scanBound = scan.m_scanBound[1 - idir];
+  int ret = 0;
+  if (scanBound.m_cnt != 0) {
+    jam();
+    // set up bound from segmented memory
+    KeyDataC searchBoundData(index.m_keySpec, true);
+    KeyBoundC searchBound(searchBoundData);
+    unpackBound(c_ctx, scanBound, searchBound);
+    // key data for the entry
+    KeyData entryKey(index.m_keySpec, true, 0);
+    entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
+    readKeyAttrs(c_ctx, frag, ent, entryKey, index.m_numAttrs);
+    // compare bound to key
+    const Uint32 boundCount = searchBound.get_data().get_cnt();
+    ret = cmpSearchBound(c_ctx, searchBound, entryKey, boundCount);
+    ndbrequire(ret != 0);
+    ret = (-1) * ret; // reverse for key vs bound
+    ret = jdir * ret; // reverse for descending scan
+  }
+#ifdef VM_TRACE
+  if (debugFlags & DebugScan) {
+    debugOut << "Check scan " << scanPtr.i << " " << scan << " ret:" << dec << ret << endl;
+  }
+#endif
+  return (ret <= 0);
 }
 
 /*
@@ -1207,8 +1199,12 @@ Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
   }
 #endif
   Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
-  scanPtr.p->m_boundMin.release();
-  scanPtr.p->m_boundMax.release();
+  for (unsigned i = 0; i <= 1; i++) {
+    ScanBound& scanBound = scanPtr.p->m_scanBound[i];
+    DataBuffer<ScanBoundSegmentSize>::Head& head = scanBound.m_head;
+    LocalDataBuffer<ScanBoundSegmentSize> b(c_scanBoundPool, head);
+    b.release();
+  }
   // unlink from per-fragment list and release from pool
   frag.m_scanList.release(scanPtr);
 }
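
As the updated comment before execTUX_BOUND_INFO says, the packed bound is now
saved in segmented memory (the per-scan DataBuffer fed from c_scanBoundPool)
and rebuilt by unpackBound() before searching.  A conceptual sketch of that
round trip in plain C++ (not the NDB DataBuffer API; the segment size and word
values below are invented):

  #include <cassert>
  #include <cstdint>
  #include <vector>

  static const unsigned SegWords = 4;             // assumed segment size in words

  struct SegmentedBuffer {
    std::vector<std::vector<uint32_t> > m_segs;

    void append(const uint32_t* data, unsigned words) {
      for (unsigned i = 0; i < words; i++) {
        if (m_segs.empty() || m_segs.back().size() == SegWords)
          m_segs.push_back(std::vector<uint32_t>());
        m_segs.back().push_back(data[i]);
      }
    }
    // like unpackBound(): copy the segments back into one contiguous buffer
    void unpack(std::vector<uint32_t>& out) const {
      out.clear();
      for (size_t s = 0; s < m_segs.size(); s++)
        out.insert(out.end(), m_segs[s].begin(), m_segs[s].end());
    }
  };

  int main() {
    const uint32_t packedBound[] = { 7, 42, 42, 42, 9, 1, 2 };  // made-up words
    SegmentedBuffer b;
    b.append(packedBound, 7);
    std::vector<uint32_t> rebuilt;
    b.unpack(rebuilt);
    assert(rebuilt.size() == 7 && rebuilt[4] == 9);
    return 0;
  }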

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2011-04-27 21:13:10 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp	2011-05-17 12:19:20 +0000
@@ -32,25 +32,30 @@
  * is within the node.
  */
 void
-Dbtux::findNodeToUpdate(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode)
+Dbtux::findNodeToUpdate(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode)
 {
-  const TreeHead& tree = frag.m_tree;
-  const Uint32 numAttrs = frag.m_numAttrs;
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  const Uint32 numAttrs = index.m_numAttrs;
+  const Uint32 prefAttrs = index.m_prefAttrs;
+  const Uint32 prefBytes = index.m_prefBytes;
+  KeyData entryKey(index.m_keySpec, false, 0);
+  entryKey.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
+  KeyDataC prefKey(index.m_keySpec, false);
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   while (true) {
     thrjam(ctx.jamBuffer);
     selectNode(currNode, currNode.m_loc);
-    int ret;
-    // compare prefix
-    unsigned start = 0;
-    ret = cmpSearchKey(ctx, frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
-    if (ret == NdbSqlUtil::CmpUnknown) {
+    prefKey.set_buf(currNode.getPref(), prefBytes, prefAttrs);
+    int ret = 0;
+    if (prefAttrs > 0) {
+      thrjam(ctx.jamBuffer);
+      ret = cmpSearchKey(ctx, searchKey, prefKey, prefAttrs);
+    }
+    if (ret == 0 && prefAttrs < numAttrs) {
       thrjam(ctx.jamBuffer);
-      // read and compare remaining attributes
-      ndbrequire(start < numAttrs);
-      readKeyAttrs(ctx, frag, currNode.getEnt(0), start, ctx.c_entryKey);
-      ret = cmpSearchKey(ctx, frag, start, searchKey, ctx.c_entryKey);
-      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+      // read and compare all attributes
+      readKeyAttrs(ctx, frag, currNode.getEnt(0), entryKey, numAttrs);
+      ret = cmpSearchKey(ctx, searchKey, entryKey, numAttrs);
     }
     if (ret == 0) {
       thrjam(ctx.jamBuffer);
@@ -97,20 +102,20 @@ Dbtux::findNodeToUpdate(TuxCtx& ctx, Fra
  * search.  Return true if ok i.e. entry to add is not a duplicate.
  */
 bool
-Dbtux::findPosToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+Dbtux::findPosToAdd(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
 {
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
   int lo = -1;
   int hi = (int)currNode.getOccup();
-  int ret;
+  KeyData entryKey(index.m_keySpec, false, 0);
+  entryKey.set_buf(ctx.c_entryKey, MaxAttrDataSize << 2);
   while (hi - lo > 1) {
     thrjam(ctx.jamBuffer);
     // hi - lo > 1 implies lo < j < hi
     int j = (hi + lo) / 2;
-    // read and compare attributes
-    unsigned start = 0;
-    readKeyAttrs(ctx, frag, currNode.getEnt(j), start, ctx.c_entryKey);
-    ret = cmpSearchKey(ctx, frag, start, searchKey, ctx.c_entryKey);
-    ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+    // read and compare all attributes
+    readKeyAttrs(ctx, frag, currNode.getEnt(j), entryKey, index.m_numAttrs);
+    int ret = cmpSearchKey(ctx, searchKey, entryKey, index.m_numAttrs);
     if (ret == 0) {
       thrjam(ctx.jamBuffer);
       // keys are equal, compare entry values
@@ -139,7 +144,7 @@ Dbtux::findPosToAdd(TuxCtx& ctx, Frag& f
  * search.  Return true if ok i.e. the entry was found.
  */
 bool
-Dbtux::findPosToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
+Dbtux::findPosToRemove(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, NodeHandle& currNode, TreePos& treePos)
 {
   const unsigned occup = currNode.getOccup();
   for (unsigned j = 0; j < occup; j++) {
@@ -160,7 +165,7 @@ Dbtux::findPosToRemove(TuxCtx& ctx, Frag
  * Search for entry to add.
  */
 bool
-Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToAdd(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   NodeHandle currNode(frag);
@@ -183,7 +188,7 @@ Dbtux::searchToAdd(TuxCtx& ctx, Frag& fr
  * Search for entry to remove.
  */
 bool
-Dbtux::searchToRemove(TuxCtx& ctx, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
+Dbtux::searchToRemove(TuxCtx& ctx, Frag& frag, const KeyDataC& searchKey, TreeEnt searchEnt, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   NodeHandle currNode(frag);
@@ -208,22 +213,38 @@ Dbtux::searchToRemove(TuxCtx& ctx, Frag&
  * Search within the found node is done by caller.
  */
 void
-Dbtux::findNodeToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode)
+Dbtux::findNodeToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode)
 {
-  const TreeHead& tree = frag.m_tree;
+  const int jdir = 1 - 2 * int(idir);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  const Uint32 numAttrs = searchBound.get_data().get_cnt();
+  const Uint32 prefAttrs = min(index.m_prefAttrs, numAttrs);
+  const Uint32 prefBytes = index.m_prefBytes;
+  KeyData entryKey(index.m_keySpec, false, 0);
+  entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
+  KeyDataC prefKey(index.m_keySpec, false);
   NodeHandle glbNode(frag);     // potential g.l.b of final node
   while (true) {
     jam();
     selectNode(currNode, currNode.m_loc);
-    int ret;
-    // compare prefix
-    ret = cmpScanBound(frag, idir, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
-    if (ret == NdbSqlUtil::CmpUnknown) {
+    prefKey.set_buf(currNode.getPref(), prefBytes, prefAttrs);
+    int ret = 0;
+    if (numAttrs > 0) {
+      if (prefAttrs > 0) {
+        jam();
+        // compare node prefix - result 0 implies bound is longer
+        ret = cmpSearchBound(c_ctx, searchBound, prefKey, prefAttrs);
+      }
+      if (ret == 0) {
+        jam();
+        // read and compare all attributes
+        readKeyAttrs(c_ctx, frag, currNode.getEnt(0), entryKey, numAttrs);
+        ret = cmpSearchBound(c_ctx, searchBound, entryKey, numAttrs);
+        ndbrequire(ret != 0);
+      }
+    } else {
       jam();
-      // read and compare all attributes
-      readKeyAttrs(c_ctx, frag, currNode.getEnt(0), 0, c_ctx.c_entryKey);
-      ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
-      ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+      ret = (-1) * jdir;
     }
     if (ret < 0) {
       // bound is left of this node
@@ -266,23 +287,26 @@ Dbtux::findNodeToScan(Frag& frag, unsign
  * search similar to findPosToAdd().
  */
 void
-Dbtux::findPosToScan(Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, NodeHandle& currNode, Uint16* pos)
+Dbtux::findPosToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, NodeHandle& currNode, Uint16* pos)
 {
   const int jdir = 1 - 2 * int(idir);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
+  const Uint32 numAttrs = searchBound.get_data().get_cnt();
   int lo = -1;
   int hi = (int)currNode.getOccup();
+  KeyData entryKey(index.m_keySpec, false, 0);
+  entryKey.set_buf(c_ctx.c_entryKey, MaxAttrDataSize << 2);
   while (hi - lo > 1) {
     jam();
     // hi - lo > 1 implies lo < j < hi
     int j = (hi + lo) / 2;
     int ret = (-1) * jdir;
-    if (boundCount != 0) {
-      // read and compare attributes
-      const TreeEnt currEnt = currNode.getEnt(j);
-      readKeyAttrs(c_ctx, frag, currEnt, 0, c_ctx.c_entryKey);
-      ret = cmpScanBound(frag, idir, boundInfo, boundCount, c_ctx.c_entryKey);
+    if (numAttrs != 0) {
+      // read and compare all attributes
+      readKeyAttrs(c_ctx, frag, currNode.getEnt(j), entryKey, numAttrs);
+      ret = cmpSearchBound(c_ctx, searchBound, entryKey, numAttrs);
+      ndbrequire(ret != 0);
     }
-    ndbrequire(ret != 0);
     if (ret < 0) {
       jam();
       hi = j;
@@ -302,7 +326,7 @@ Dbtux::findPosToScan(Frag& frag, unsigne
  * Search for scan start position.
  */
 void
-Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
+Dbtux::searchToScan(Frag& frag, unsigned idir, const KeyBoundC& searchBound, TreePos& treePos)
 {
   const TreeHead& tree = frag.m_tree;
   NodeHandle currNode(frag);
@@ -312,11 +336,10 @@ Dbtux::searchToScan(Frag& frag, ConstDat
     jam();
     return;
   }
-  const unsigned idir = unsigned(descending);
-  findNodeToScan(frag, idir, boundInfo, boundCount, currNode);
+  findNodeToScan(frag, idir, searchBound, currNode);
   treePos.m_loc = currNode.m_loc;
   Uint16 pos;
-  findPosToScan(frag, idir, boundInfo, boundCount, currNode, &pos);
+  findPosToScan(frag, idir, searchBound, currNode, &pos);
   const unsigned occup = currNode.getOccup();
   if (idir == 0) {
     if (pos < occup) {

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp	2011-02-01 21:05:11 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp	2011-05-04 12:08:38 +0000
@@ -49,22 +49,18 @@ Dbtux::statRecordsInRange(ScanOpPtr scan
 {
   ScanOp& scan = *scanPtr.p;
   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+  const Index& index = *c_indexPool.getPtr(frag.m_indexId);
   TreeHead& tree = frag.m_tree;
   // get first and last position
   TreePos pos1 = scan.m_scanPos;
   TreePos pos2;
   { // as in scanFirst()
-    setKeyAttrs(c_ctx, frag);
     const unsigned idir = 1;
-    const ScanBound& bound = *scan.m_bound[idir];
-    ScanBoundIterator iter;
-    bound.first(iter);
-    for (unsigned j = 0; j < bound.getSize(); j++) {
-      jam();
-      c_dataBuffer[j] = *iter.data;
-      bound.next(iter);
-    }
-    searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], true, pos2);
+    const ScanBound& scanBound = scan.m_scanBound[idir];
+    KeyDataC searchBoundData(index.m_keySpec, true);
+    KeyBoundC searchBound(searchBoundData);
+    unpackBound(c_ctx, scanBound, searchBound);
+    searchToScan(frag, idir, searchBound, pos2);
     // committed read (same timeslice) and range not empty
     ndbrequire(pos2.m_loc != NullTupLoc);
   }

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.cpp	2011-05-23 10:38:41 +0000
@@ -30,17 +30,17 @@
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
 
-AsyncIoThread::AsyncIoThread(class Ndbfs& fs, AsyncFile* file)
+AsyncIoThread::AsyncIoThread(class Ndbfs& fs, bool bound)
   : m_fs(fs)
 {
-  m_current_file = file;
-  if (file)
+  m_current_file = 0;
+  if (bound)
   {
-    theMemoryChannelPtr = &theMemoryChannel;
+    theMemoryChannelPtr = &m_fs.theToBoundThreads;
   }
   else
   {
-    theMemoryChannelPtr = &m_fs.theToThreads;
+    theMemoryChannelPtr = &m_fs.theToUnboundThreads;
   }
   theReportTo = &m_fs.theFromThreads;
 }
@@ -149,13 +149,17 @@ AsyncIoThread::run()
     switch (request->action) {
     case Request::open:
       file->openReq(request);
+      if (request->error == 0 && request->m_do_bind)
+        attach(file);
       break;
     case Request::close:
       file->closeReq(request);
+      detach(file);
       break;
     case Request::closeRemove:
       file->closeReq(request);
       file->removeReq(request);
+      detach(file);
       break;
     case Request::readPartial:
     case Request::read:
@@ -265,3 +269,32 @@ AsyncIoThread::buildIndxReq(Request* req
   req.buffer_size = request->file->m_page_cnt * sizeof(GlobalPage);
   request->error = (* req.func_ptr)(&req);
 }
+
+void
+AsyncIoThread::attach(AsyncFile* file)
+{
+  assert(m_current_file == 0);
+  assert(theMemoryChannelPtr == &m_fs.theToBoundThreads);
+  m_current_file = file;
+  theMemoryChannelPtr = &theMemoryChannel;
+  file->attach(this);
+  m_fs.cnt_active_bound(1);
+}
+
+void
+AsyncIoThread::detach(AsyncFile* file)
+{
+  if (m_current_file == 0)
+  {
+    assert(file->getThread() == 0);
+  }
+  else
+  {
+    assert(m_current_file == file);
+    assert(theMemoryChannelPtr == &theMemoryChannel);
+    m_current_file = 0;
+    theMemoryChannelPtr = &m_fs.theToBoundThreads;
+    file->detach(this);
+    m_fs.cnt_active_bound(-1);
+  }
+}
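
attach() and detach() above move a bound I/O thread between the shared
bound-request channel and its own private channel while a file is attached to
it.  A conceptual sketch of that switching, using plain C++ containers as
stand-ins for the NDB MemoryChannel classes (all names below are invented):

  #include <cassert>
  #include <queue>
  #include <string>

  struct Request { std::string op; };

  struct Worker {
    std::queue<Request> sharedBound;     // stands in for m_fs.theToBoundThreads
    std::queue<Request> privateChannel;  // stands in for theMemoryChannel
    std::queue<Request>* current;        // stands in for theMemoryChannelPtr
    const void* boundFile;               // stands in for m_current_file

    Worker() : current(&sharedBound), boundFile(0) {}

    void attach(const void* file) {
      assert(boundFile == 0 && current == &sharedBound);
      boundFile = file;
      current = &privateChannel;         // serve only this file's requests now
    }
    void detach(const void* file) {
      if (boundFile == 0)
        return;                          // file was never attached to a thread
      assert(boundFile == file && current == &privateChannel);
      boundFile = 0;
      current = &sharedBound;            // back to serving any bound request
    }
  };

  int main() {
    Worker w;
    int file = 0;
    w.attach(&file);
    w.privateChannel.push(Request());    // requests for the attached file only
    w.detach(&file);
    return (w.current == &w.sharedBound) ? 0 : 1;
  }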

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp	2011-04-21 09:21:18 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncIoThread.hpp	2011-05-23 10:38:41 +0000
@@ -43,6 +43,8 @@ class Request
 public:
   Request() {}
 
+  void atGet() { m_do_bind = false; }
+
   enum Action {
     open,
     close,
@@ -113,6 +115,7 @@ public:
    // Information for open, needed if the first open action fails.
   AsyncFile* file;
   Uint32 theTrace;
+  bool m_do_bind;
 
   MemoryChannel<Request>::ListMember m_mem_channel;
 };
@@ -134,7 +137,7 @@ class AsyncIoThread
   friend class Ndbfs;
   friend class AsyncFile;
 public:
-  AsyncIoThread(class Ndbfs&, AsyncFile* file);
+  AsyncIoThread(class Ndbfs&, bool bound);
   virtual ~AsyncIoThread() {};
 
   struct NdbThread* doStart();
@@ -174,6 +177,8 @@ private:
    */
   void buildIndxReq(Request*);
 
+  void attach(AsyncFile*);
+  void detach(AsyncFile*);
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2011-04-21 09:21:18 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2011-05-23 10:38:41 +0000
@@ -45,6 +45,8 @@
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
 
+NdbMutex g_active_bound_threads_mutex;
+
 inline
 int pageSize( const NewVARIABLE* baseAddrRef )
 {
@@ -62,10 +64,15 @@ Ndbfs::Ndbfs(Block_context& ctx) :
   scanningInProgress(false),
   theLastId(0),
   theRequestPool(0),
-  m_maxOpenedFiles(0)
+  m_maxOpenedFiles(0),
+  m_bound_threads_cnt(0),
+  m_unbounds_threads_cnt(0),
+  m_active_bound_threads_cnt(0)
 {
   BLOCK_CONSTRUCTOR(Ndbfs);
 
+  NdbMutex_Init(&g_active_bound_threads_mutex);
+
   // Set received signals
   addRecSignal(GSN_READ_CONFIG_REQ, &Ndbfs::execREAD_CONFIG_REQ);
   addRecSignal(GSN_DUMP_STATE_ORD,  &Ndbfs::execDUMP_STATE_ORD);
@@ -100,7 +107,8 @@ Ndbfs::~Ndbfs()
   request.action = Request::end;
   for (unsigned i = 0; i < theThreads.size(); i++)
   {
-    theToThreads.writeChannel(&request);
+    theToBoundThreads.writeChannel(&request);
+    theToUnboundThreads.writeChannel(&request);
   }
 
   for (unsigned i = 0; i < theThreads.size(); i++)
@@ -274,7 +282,12 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
   // Create idle AsyncFiles
   for (Uint32 i = 0; i < noIdleFiles; i++)
   {
-    theIdleBoundFiles.push_back(createAsyncFile(true /* bound */));
+    theIdleFiles.push_back(createAsyncFile());
+    AsyncIoThread * thr = createIoThread(/* bound */ true);
+    if (thr)
+    {
+      theThreads.push_back(thr);
+    }
   }
 
   Uint32 threadpool = 2;
@@ -283,7 +296,7 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
   // Create IoThreads
   for (Uint32 i = 0; i < threadpool; i++)
   {
-    AsyncIoThread * thr = createIoThread(0);
+    AsyncIoThread * thr = createIoThread(/* bound */ false);
     if (thr)
     {
       jam();
@@ -339,7 +352,7 @@ Ndbfs::execSTTOR(Signal* signal)
   ndbrequire(0);
 }
 
-int 
+int
 Ndbfs::forward( AsyncFile * file, Request* request)
 {
   jam();
@@ -348,9 +361,13 @@ Ndbfs::forward( AsyncFile * file, Reques
   {
     thr->dispatch(request);
   }
+  else if (request->m_do_bind)
+  {
+    theToBoundThreads.writeChannel(request);
+  }
   else
   {
-    theToThreads.writeChannel(request);
+    theToUnboundThreads.writeChannel(request);
   }
   return 1;
 }
@@ -444,7 +461,8 @@ Ndbfs::execFSOPENREQ(Signal* signal)
   request->par.open.file_size <<= 32;
   request->par.open.file_size |= fsOpenReq->file_size_lo;
   request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
-  
+  request->m_do_bind = bound;
+
   ndbrequire(forward(file, request));
 }
 
@@ -454,7 +472,8 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
   jamEntry();
   const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
   const BlockReference userRef = req->userReference;
-  AsyncFile* file = getIdleFile(true);
+  bool bound = true;
+  AsyncFile* file = getIdleFile(bound);
   ndbrequire(file != NULL);
 
   SectionHandle handle(this, signal);
@@ -479,7 +498,8 @@ Ndbfs::execFSREMOVEREQ(Signal* signal)
   request->set(userRef, req->userPointer, newId() );
   request->file = file;
   request->theTrace = signal->getTrace();
-  
+  request->m_do_bind = bound;
+
   if (version == 6)
   {
     ndbrequire(bp < NDB_ARRAY_SIZE(m_base_path));
@@ -541,6 +561,7 @@ Ndbfs::execFSCLOSEREQ(Signal * signal)
   request->file = openFile;
   request->error = 0;
   request->theTrace = signal->getTrace();
+  request->m_do_bind = false;
 
   ndbrequire(forward(openFile, request));
 }
@@ -584,6 +605,7 @@ Ndbfs::readWriteRequest(int action, Sign
   request->file = openFile;
   request->action = (Request::Action) action;
   request->theTrace = signal->getTrace();
+  request->m_do_bind = false;
 
   Uint32 format = fsRWReq->getFormatFlag(fsRWReq->operationFlag);
 
@@ -804,7 +826,8 @@ Ndbfs::execFSSYNCREQ(Signal * signal)
   request->set(userRef, userPointer, filePointer);
   request->file = openFile;
   request->theTrace = signal->getTrace();
-  
+  request->m_do_bind = false;
+
   ndbrequire(forward(openFile,request));
 }
 
@@ -832,6 +855,7 @@ Ndbfs::execFSSUSPENDORD(Signal * signal)
   request->file = openFile;
   request->theTrace = signal->getTrace();
   request->par.suspend.milliseconds = millis;
+  request->m_do_bind = false;
 
   ndbrequire(forward(openFile,request));
 }
@@ -895,6 +919,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
     request->action = Request::append;
   else
     request->action = Request::append_synch;
+  request->m_do_bind = false;
   ndbrequire(forward(openFile, request));
   return;
   
@@ -918,7 +943,8 @@ Ndbfs::execALLOC_MEM_REQ(Signal* signal)
 
   AllocMemReq* req = (AllocMemReq*)signal->getDataPtr();
 
-  AsyncFile* file = getIdleFile(true);
+  bool bound = true;
+  AsyncFile* file = getIdleFile(bound);
   ndbrequire(file != NULL);
 
   Request *request = theRequestPool->get();
@@ -932,6 +958,7 @@ Ndbfs::execALLOC_MEM_REQ(Signal* signal)
   request->par.alloc.requestInfo = req->requestInfo;
   request->par.alloc.bytes = (Uint64(req->bytes_hi) << 32) + req->bytes_lo;
   request->action = Request::allocmem;
+  request->m_do_bind = bound;
   ndbrequire(forward(file, request));
 }
 
@@ -943,7 +970,8 @@ Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* s
   jamEntry();
   mt_BuildIndxReq * req = (mt_BuildIndxReq*)signal->getDataPtr();
 
-  AsyncFile* file = getIdleFile(true);
+  bool bound = true;
+  AsyncFile* file = getIdleFile(bound);
   ndbrequire(file != NULL);
 
   Request *request = theRequestPool->get();
@@ -972,6 +1000,7 @@ Ndbfs::execBUILD_INDX_IMPL_REQ(Signal* s
 
   memcpy(&request->par.build.m_req, req, sizeof(* req));
   request->action = Request::buildindx;
+  request->m_do_bind = bound;
   ndbrequire(forward(file, request));
 }
 
@@ -1000,8 +1029,8 @@ Ndbfs::newId()
 }
 
 AsyncFile*
-Ndbfs::createAsyncFile(bool bound){
-
+Ndbfs::createAsyncFile()
+{
   // Check limit of open files
   if (m_maxFiles !=0 && theFiles.size() ==  m_maxFiles)
   {
@@ -1024,42 +1053,35 @@ Ndbfs::createAsyncFile(bool bound){
     ERROR_SET(fatal, NDBD_EXIT_AFS_MAXOPEN,""," Ndbfs::createAsyncFile");
   }
 
-  if (bound)
-  {
-    AsyncIoThread * thr = createIoThread(file);
-    theThreads.push_back(thr);
-    file->attach(thr);
-
-#ifdef VM_TRACE
-    ndbout_c("NDBFS: Created new file thread %d", theFiles.size());
-#endif
-  }
-
   theFiles.push_back(file);
-  
   return file;
 }
 
 void
 Ndbfs::pushIdleFile(AsyncFile* file)
 {
-  if (file->getThread())
-  {
-    theIdleBoundFiles.push_back(file);
-  }
-  else
-  {
-    theIdleUnboundFiles.push_back(file);
-  }
+  assert(file->getThread() == 0);
+  theIdleFiles.push_back(file);
 }
 
 AsyncIoThread*
-Ndbfs::createIoThread(AsyncFile* file)
+Ndbfs::createIoThread(bool bound)
 {
-  AsyncIoThread* thr = new AsyncIoThread(*this, file);
+  AsyncIoThread* thr = new AsyncIoThread(*this, bound);
+  if (thr)
+  {
+#ifdef VM_TRACE
+    ndbout_c("NDBFS: Created new file thread %d", theThreads.size());
+#endif
 
-  struct NdbThread* thrptr = thr->doStart();
-  globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+    struct NdbThread* thrptr = thr->doStart();
+    globalEmulatorData.theConfiguration->addThread(thrptr, NdbfsThread);
+
+    if (bound)
+      m_bound_threads_cnt++;
+    else
+      m_unbounds_threads_cnt++;
+  }
 
   return thr;
 }
@@ -1067,31 +1089,50 @@ Ndbfs::createIoThread(AsyncFile* file)
 AsyncFile*
 Ndbfs::getIdleFile(bool bound)
 {
-  if (bound)
+  AsyncFile* file = 0;
+  Uint32 sz = theIdleFiles.size();
+  if (sz)
   {
-    Uint32 sz = theIdleBoundFiles.size();
-    if (sz)
-    {
-      AsyncFile* file = theIdleBoundFiles[sz - 1];
-      theIdleBoundFiles.erase(sz - 1);
-      return file;
-    }
+    file = theIdleFiles[sz - 1];
+    theIdleFiles.erase(sz - 1);
   }
   else
   {
-    Uint32 sz = theIdleUnboundFiles.size();
-    if (sz)
+    file = createAsyncFile();
+  }
+
+  if (bound)
+  {
+    /**
+     * Check if we should create thread
+     */
+    if (m_active_bound_threads_cnt == m_bound_threads_cnt)
     {
-      AsyncFile* file = theIdleUnboundFiles[sz - 1];
-      theIdleUnboundFiles.erase(sz - 1);
-      return file;
+      AsyncIoThread * thr = createIoThread(true);
+      if (thr)
+      {
+        theThreads.push_back(thr);
+      }
     }
   }
-
-  return createAsyncFile(bound);
+  return file;
 }
 
-
+void
+Ndbfs::cnt_active_bound(int val)
+{
+  Guard g(&g_active_bound_threads_mutex);
+  if (val < 0)
+  {
+    val = -val;
+    assert(m_active_bound_threads_cnt >= (Uint32)val);
+    m_active_bound_threads_cnt -= val;
+  }
+  else
+  {
+    m_active_bound_threads_cnt += val;
+  }
+}
 
 void
 Ndbfs::report(Request * request, Signal* signal)
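
getIdleFile() together with cnt_active_bound() above grows the pool of bound
I/O threads on demand: when every bound thread is currently attached to a
file, one more thread is created before the request is forwarded.  A
simplified standalone sketch of that policy (counters only, no real threads;
the method names are invented):

  #include <cstdio>

  struct BoundPool {
    unsigned boundThreads;   // m_bound_threads_cnt
    unsigned activeBound;    // m_active_bound_threads_cnt

    BoundPool() : boundThreads(0), activeBound(0) {}

    void openBoundFile() {
      if (activeBound == boundThreads) {
        boundThreads++;      // createIoThread(true) in the real code
        std::printf("created bound io thread #%u\n", boundThreads);
      }
      activeBound++;         // cnt_active_bound(1), done when the thread attaches
    }
    void closeBoundFile() {
      activeBound--;         // cnt_active_bound(-1), done when the thread detaches
    }
  };

  int main() {
    BoundPool p;
    p.openBoundFile();       // all (zero) bound threads busy: create thread 1
    p.openBoundFile();       // thread 1 is attached: create thread 2
    p.closeBoundFile();      // a bound thread becomes free
    p.openBoundFile();       // an idle bound thread exists: no new thread
    return (p.boundThreads == 2) ? 0 : 1;
  }
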
@@ -1506,10 +1547,13 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     infoEvent("NDBFS: Files: %d Open files: %d",
 	      theFiles.size(),
 	      theOpenFiles.size());
-    infoEvent(" Idle files: (bound: %u unbound: %u) Max opened files: %d",
-              theIdleBoundFiles.size(),
-              theIdleUnboundFiles.size(),
+    infoEvent(" Idle files: %u Max opened files: %d",
+              theIdleFiles.size(),
               m_maxOpenedFiles);
+    infoEvent(" Bound Threads: %u (active %u) Unbound threads: %u",
+              m_bound_threads_cnt,
+              m_active_bound_threads_cnt,
+              m_unbounds_threads_cnt);
     infoEvent(" Max files: %d",
 	      m_maxFiles);
     infoEvent(" Requests: %d",
@@ -1522,7 +1566,10 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     
     for (unsigned i = 0; i < theOpenFiles.size(); i++){
       AsyncFile* file = theOpenFiles.getFile(i);
-      infoEvent("%2d (0x%lx): %s", i, (long)file, file->theFileName.c_str());
+      infoEvent("%2d (0x%lx): %s thr: %lx", i,
+                (long)file,
+                file->theFileName.c_str(),
+                (long)file->getThread());
     }
     return;
   }
@@ -1536,18 +1583,14 @@ Ndbfs::execDUMP_STATE_ORD(Signal* signal
     return;
   }
   if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
-    infoEvent("NDBFS: Dump idle files: %d %u",
-              theIdleBoundFiles.size(), theIdleUnboundFiles.size());
-    
-    for (unsigned i = 0; i < theIdleBoundFiles.size(); i++){
-      AsyncFile* file = theIdleBoundFiles[i];
-      infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
-    }
+    infoEvent("NDBFS: Dump idle files: %u",
+              theIdleFiles.size());
 
-    for (unsigned i = 0; i < theIdleUnboundFiles.size(); i++){
-      AsyncFile* file = theIdleUnboundFiles[i];
+    for (unsigned i = 0; i < theIdleFiles.size(); i++){
+      AsyncFile* file = theIdleFiles[i];
       infoEvent("%2d (0x%lx): %s", i, (long)file, file->isOpen()?"OPEN":"CLOSED");
     }
+
     return;
   }
 

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp	2011-05-23 10:38:41 +0000
@@ -79,19 +79,19 @@ private:
 
   // Communication from/to files
   MemoryChannel<Request> theFromThreads;
-  MemoryChannel<Request> theToThreads;
+  MemoryChannel<Request> theToBoundThreads;
+  MemoryChannel<Request> theToUnboundThreads;
 
   Pool<Request>* theRequestPool;
 
-  AsyncIoThread* createIoThread(AsyncFile* file);
-  AsyncFile* createAsyncFile(bool bound);
+  AsyncIoThread* createIoThread(bool bound);
+  AsyncFile* createAsyncFile();
   AsyncFile* getIdleFile(bool bound);
   void pushIdleFile(AsyncFile*);
 
   Vector<AsyncIoThread*> theThreads;// List of all created threads
   Vector<AsyncFile*> theFiles;      // List all created AsyncFiles
-  Vector<AsyncFile*> theIdleBoundFiles;   // List of idle AsyncFiles
-  Vector<AsyncFile*> theIdleUnboundFiles; // List of idle AsyncFiles
+  Vector<AsyncFile*> theIdleFiles;  // List of idle AsyncFiles
   OpenFiles theOpenFiles;           // List of open AsyncFiles
 
   BaseString m_base_path[FsOpenReq::BP_MAX];
@@ -105,6 +105,11 @@ private:
   void readWriteRequest(  int action, Signal * signal );
 
   static Uint32 translateErrno(int aErrno);
+
+  Uint32 m_bound_threads_cnt;
+  Uint32 m_unbounds_threads_cnt;
+  Uint32 m_active_bound_threads_cnt;
+  void cnt_active_bound(int val);
 public:
   const BaseString& get_base_path(Uint32 no) const;
 };

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Pool.hpp	2011-05-23 10:38:41 +0000
@@ -249,6 +249,7 @@ template <class T> inline T* Pool<T>::ge
    }
    --theTop;
    tmp = theList[theTop];
+   tmp->atGet();
    return tmp;
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-05-04 14:45:46 +0000
@@ -125,7 +125,7 @@ NdbIndexStat::stat_verify()
       const Uint32* entrykey1 = (const Uint32*)&e1 + EntrySize;
       const Uint32* entrykey2 = (const Uint32*)&e2 + EntrySize;
       int ret = stat_cmpkey(a, entrykey1, e1.m_keylen, entrykey2, e2.m_keylen);
-      assert(ret == -1);
+      assert(ret < 0);
     }
   }
 }
@@ -164,7 +164,7 @@ NdbIndexStat::stat_cmpkey(const Area& a,
         if (! ah1.isNULL()) {
           if (! ah2.isNULL()) {
             const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(c->m_type);
-            ret = (*sqlType.m_cmp)(c->m_cs, &key1[i1], n, &key2[i2], n, true);
+            ret = (*sqlType.m_cmp)(c->m_cs, &key1[i1], n, &key2[i2], n);
             if (ret != 0)
               break;
           } else {

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2011-05-05 11:06:08 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2011-05-17 12:47:21 +0000
@@ -1749,11 +1749,9 @@ NdbQueryIndexScanOperationDefImpl::check
           const int res=
             (*recAttr.compare_function)(recAttr.charset_info,
                                         keyPart1.ptr, keyPart1.len,
-                                        highKeyPart.ptr, highKeyPart.len, 
-                                        true);
+                                        highKeyPart.ptr, highKeyPart.len);
           if (res!=0)
           {  // Not equal
-            assert(res != NdbSqlUtil::CmpUnknown);
             return 0;
           }
         } // if (keyPos == keyEnd ||

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-05-11 13:31:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-05-17 12:47:21 +0000
@@ -717,10 +717,9 @@ compare_index_row_prefix(const NdbRecord
 
       void *info= col->charset_info;
       int res=
-        (*col->compare_function)(info, ptr1, maxSize, ptr2, maxSize, true);
+        (*col->compare_function)(info, ptr1, maxSize, ptr2, maxSize);
       if (res)
       {
-        assert(res != NdbSqlUtil::CmpUnknown);
         return res;
       }
     }
@@ -3593,10 +3592,9 @@ int compare_ndbrecord(const NdbReceiver
       void *info= result_col->charset_info;
       int res=
         (*result_col->compare_function)
-            (info, a_ptr, maxSize, b_ptr, maxSize, true);
+            (info, a_ptr, maxSize, b_ptr, maxSize);
       if (res)
       {
-        assert(res != NdbSqlUtil::CmpUnknown);
         return res * jdir;
       }
     }

=== modified file 'storage/ndb/test/run-test/conf-blade08.cnf'
--- a/storage/ndb/test/run-test/conf-blade08.cnf	2011-04-20 09:18:09 +0000
+++ b/storage/ndb/test/run-test/conf-blade08.cnf	2011-05-19 17:47:28 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 NoOfFragmentLogFiles = 4
 FragmentLogFileSize = 64M

=== modified file 'storage/ndb/test/run-test/conf-dl145a.cnf'
--- a/storage/ndb/test/run-test/conf-dl145a.cnf	2011-02-19 10:31:42 +0000
+++ b/storage/ndb/test/run-test/conf-dl145a.cnf	2011-05-19 18:19:47 +0000
@@ -20,7 +20,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 NoOfFragmentLogFiles = 4
 FragmentLogFileSize = 64M

=== modified file 'storage/ndb/test/run-test/conf-fimafeng08.cnf'
--- a/storage/ndb/test/run-test/conf-fimafeng08.cnf	2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-fimafeng08.cnf	2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 
 RedoBuffer = 32M

=== modified file 'storage/ndb/test/run-test/conf-fimafeng09.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-fimafeng09.cnf	2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-fimafeng09.cnf	2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 RedoBuffer = 32M
 

=== modified file 'storage/ndb/test/run-test/conf-loki27.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-loki27.cnf	2010-06-15 15:02:16 +0000
+++ b/storage/ndb/test/run-test/conf-loki27.cnf	2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 NoOfFragmentLogFiles = 4
 FragmentLogFileSize = 64M

=== modified file 'storage/ndb/test/run-test/conf-ndb07.cnf'
--- a/storage/ndb/test/run-test/conf-ndb07.cnf	2011-02-19 10:31:42 +0000
+++ b/storage/ndb/test/run-test/conf-ndb07.cnf	2011-05-19 18:19:47 +0000
@@ -27,7 +27,7 @@ IndexMemory = 100M
 DataMemory = 500M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 NoOfFragmentLogFiles = 8
 FragmentLogFileSize = 64M
 ODirect=1

=== modified file 'storage/ndb/test/run-test/conf-ndbmaster.cnf'
--- a/storage/ndb/test/run-test/conf-ndbmaster.cnf	2009-02-17 09:26:44 +0000
+++ b/storage/ndb/test/run-test/conf-ndbmaster.cnf	2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 
 SharedGlobalMemory=256M

=== modified file 'storage/ndb/test/run-test/conf-repl.cnf'
--- a/storage/ndb/test/run-test/conf-repl.cnf	2007-02-13 01:38:54 +0000
+++ b/storage/ndb/test/run-test/conf-repl.cnf	2011-05-19 17:47:28 +0000
@@ -11,7 +11,7 @@ skip-innodb
 skip-bdb
 
 [cluster_config]
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 DataMemory = 100M
 
 [cluster_config.master]

=== modified file 'storage/ndb/test/run-test/conf-techra29.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-techra29.cnf	2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-techra29.cnf	2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 RedoBuffer = 32M
 

=== modified file 'storage/ndb/test/run-test/conf-test.cnf'
--- a/storage/ndb/test/run-test/conf-test.cnf	2007-11-15 07:57:00 +0000
+++ b/storage/ndb/test/run-test/conf-test.cnf	2011-05-19 17:47:28 +0000
@@ -19,7 +19,7 @@ IndexMemory = 25M
 DataMemory = 100M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 NoOfFragmentLogFiles = 4
 FragmentLogFileSize = 64M

=== modified file 'storage/ndb/test/run-test/conf-tyr64.cnf' (properties changed: +x to -x)
--- a/storage/ndb/test/run-test/conf-tyr64.cnf	2010-02-02 13:44:41 +0000
+++ b/storage/ndb/test/run-test/conf-tyr64.cnf	2011-05-19 18:19:47 +0000
@@ -19,7 +19,7 @@ IndexMemory = 100M
 DataMemory = 300M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 
 RedoBuffer = 32M

=== modified file 'storage/ndb/test/run-test/conf-upgrade.cnf'
--- a/storage/ndb/test/run-test/conf-upgrade.cnf	2009-06-23 18:40:35 +0000
+++ b/storage/ndb/test/run-test/conf-upgrade.cnf	2011-05-19 17:47:28 +0000
@@ -26,7 +26,7 @@ IndexMemory = 50M
 DataMemory = 100M
 BackupMemory = 64M
 MaxNoOfConcurrentScans = 100
-MaxNoOfSavedMessages= 1000
+MaxNoOfSavedMessages= 5
 SendBufferMemory = 2M
 NoOfFragmentLogFiles = 4
 FragmentLogFileSize = 64M

No bundle (reason: revision is a merge (you can force generation of a bundle with env var BZR_FORCE_BUNDLE=1)).