List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:June 6 2011 12:18pm
Subject:bzr commit into mysql-5.1-telco-7.0-wl4124-new0 branch
(pekka.nousiainen:4390) WL#4124
View as plain text  
#At file:///export/space/pekka/ms/ms-wl4124-70/ based on revid:pekka.nousiainen@stripped

 4390 Pekka Nousiainen	2011-06-06
      wl#4124 b05_api.diff
      ndbapi methods, utils, test

    added:
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
      storage/ndb/tools/ndb_index_stat.cpp
    modified:
      sql/ha_ndbcluster.cc
      storage/ndb/include/ndbapi/NdbApi.hpp
      storage/ndb/include/ndbapi/NdbDictionary.hpp
      storage/ndb/include/ndbapi/NdbIndexStat.hpp
      storage/ndb/src/kernel/blocks/trix/Trix.cpp
      storage/ndb/src/ndbapi/API.hpp
      storage/ndb/src/ndbapi/CMakeLists.txt
      storage/ndb/src/ndbapi/Makefile.am
      storage/ndb/src/ndbapi/NdbDictionary.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
      storage/ndb/src/ndbapi/NdbIndexStat.cpp
      storage/ndb/src/ndbapi/Ndberr.cpp
      storage/ndb/src/ndbapi/Ndbif.cpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/test/ndbapi/testIndexStat.cpp
      storage/ndb/tools/Makefile.am
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-04-29 11:45:56 +0000
+++ b/sql/ha_ndbcluster.cc	2011-06-06 12:18:27 +0000
@@ -2248,7 +2248,7 @@ int ha_ndbcluster::add_index_handle(THD
     d.index_stat=NULL;
     if (THDVAR(thd, index_stat_enable))
     {
-      d.index_stat=new NdbIndexStat(index);
+      d.index_stat=new NdbIndexStat();
       d.index_stat_cache_entries= THDVAR(thd, index_stat_cache_entries);
       d.index_stat_update_freq= THDVAR(thd, index_stat_update_freq);
       d.index_stat_query_count=0;

=== modified file 'storage/ndb/include/ndbapi/NdbApi.hpp'
--- a/storage/ndb/include/ndbapi/NdbApi.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbApi.hpp	2011-06-06 12:18:27 +0000
@@ -28,6 +28,7 @@
 #include "NdbScanOperation.hpp"
 #include "NdbIndexOperation.hpp"
 #include "NdbIndexScanOperation.hpp"
+#include "NdbIndexStat.hpp"
 #include "NdbInterpretedCode.hpp"
 #include "NdbScanFilter.hpp"
 #include "NdbRecAttr.hpp"

=== modified file 'storage/ndb/include/ndbapi/NdbDictionary.hpp'
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp	2011-05-31 08:28:58 +0000
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp	2011-06-06 12:18:27 +0000
@@ -2348,6 +2348,30 @@ public:
      */
     int dropIndex(const char * indexName,
 		  const char * tableName);
+
+    /*
+     * Force update of ordered index stats.  Scans an assigned fragment
+     * in the kernel and updates result in stats tables.  This one-time
+     * update is independent of IndexStatAuto settings.  Common use case
+     * is mysql "analyze table".
+     */
+    int updateIndexStat(const Index&, const Table&);
+
+    /*
+     * Force update of ordered index stats where index is given by id.
+     */
+    int updateIndexStat(Uint32 indexId, Uint32 indexVersion, Uint32 tableId);
+
+    /*
+     * Delete ordered index stats.  If IndexStatAutoUpdate is set, also
+     * stops automatic updates, until another forced update is done.
+     */
+    int deleteIndexStat(const Index&, const Table&);
+
+    /*
+     * Delete ordered index stats where index is given by id.
+     */
+    int deleteIndexStat(Uint32 indexId, Uint32 indexVersion, Uint32 tableId);
     
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     void removeCachedTable(const Table *table);

=== modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp	2011-02-01 21:05:11 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp	2011-06-06 12:18:27 +0000
@@ -23,143 +23,352 @@
 #include "NdbDictionary.hpp"
 #include "NdbError.hpp"
 #include "NdbIndexScanOperation.hpp"
-class NdbIndexImpl;
+class NdbIndexStatImpl;
 
 /*
- * Statistics for an ordered index.
+ * Ordered index stats "v4".  Includes 1) the old records_in_range in
+ * simplified form 2) the new scanned and stored stats.  These are
+ * completely different.  1) makes a one-round-trip query directly to
+ * the index while 2) reads more extensive stats from sys tables where
+ * they were stored previously by NDB kernel.
+ *
+ * Methods in general return 0 on success and -1 on error.  The error
+ * details are available via getNdbError().
  */
+
 class NdbIndexStat {
 public:
-  NdbIndexStat(const NdbDictionary::Index* index);
+  NdbIndexStat();
   ~NdbIndexStat();
+
+  // dummy defs to make handler compile at "ndb api" patch level
+  int alloc_cache(Uint32 entries) { return 0; }
+  enum { RR_UseDb = 1, RR_NoUpdate = 2 };
+
   /*
-   * Allocate memory for cache.  Argument is minimum number of stat
-   * entries and applies to lower and upper bounds separately.  More
-   * entries may fit (keys have variable size).  If not used, db is
-   * contacted always.
-   */
-  int alloc_cache(Uint32 entries);
-  /*
-   * Flags for records_in_range.
+   * Get latest error.  Can be printed like any NdbError instance and
+   * includes some extras.
    */
-  enum {
-    RR_UseDb = 1,       // contact db
-    RR_NoUpdate = 2     // but do not update cache
+  struct Error : public NdbError {
+    int line;  // source code line number
+    int extra; // extra error code
+    Error();
   };
+  const Error& getNdbError() const;
+
   /*
-   * Estimate how many index records need to be scanned.  The scan
-   * operation must be prepared with lock mode LM_CommittedRead and must
-   * have the desired bounds set.  The routine may use local cache or
-   * may contact db by executing the operation.
-   *
-   * If returned count is zero then db was contacted and the count is
-   * exact.  Otherwise the count is approximate.  If cache is used then
-   * caller must provide estimated number of table rows.  It will be
-   * multiplied by a percentage obtained from the cache (result zero is
-   * returned as 1).
+   * Estimate how many records exist in given range.  Does a single
+   * tree-dive on each index fragment, estimates the count from tree
+   * properties, and sums up the results.
+   *
+   * Caller provides index and scan transaction and range bounds.
+   * A scan operation is created and executed.  The result is returned
+   * in out-parameter "count".  The result is not transactional.  Value
+   * zero is exact (range was empty when checked).
+   *
+   * This is basically a static method.  The class instance is used only
+   * to return errors.
    */
   int records_in_range(const NdbDictionary::Index* index,
                        NdbTransaction* trans,
                        const NdbRecord* key_record,
                        const NdbRecord* result_record,
                        const NdbIndexScanOperation::IndexBound* ib,
-                       Uint64 table_rows,
+                       Uint64 table_rows, // not used
                        Uint64* count,
-                       int flags);
+                       int flags); // not used
 
   /*
-   * Get latest error.
+   * Methods for stored stats.
+   *
+   * There are two distinct users: 1) writer reads samples from sys
+   * tables and creates a new query cache 2) readers make concurrent
+   * stats queries on current query cache.
+   *
+   * Writer provides any Ndb object required.  Its database name must be
+   * "mysql".  No reference to it is kept.
+   *
+   * Readers provide structs such as Bound on stack or in TLS.  The
+   * structs are opaque.  With source code the structs can be cast to
+   * NdbIndexStatImpl structs.
    */
-  const NdbError& getNdbError() const;
 
-private:
+  enum {
+    NoSysTables = 4714,   // all sys tables missing
+    NoIndexStats = 4715,  // given index has no stored stats
+    UsageError = 4716,    // wrong state, invalid input
+    NoMemError = 4717,
+    InvalidCache = 4718,
+    InternalError = 4719,
+    BadSysTables = 4720,  // sys tables partly missing or invalid
+    HaveSysTables = 4244  // create error if all sys tables exist
+  };
+
   /*
-   * There are 2 areas: start keys and end keys.  An area has pointers
-   * at beginning and entries at end.  Pointers are sorted by key.
+   * Methods for sys tables.
    *
-   * A pointer contains entry offset and also entry timestamp.  An entry
-   * contains the key and percentage of rows _not_ satisfying the bound
-   * i.e. less than start key or greater than end key.
-   *
-   * A key is an array of index key bounds.  Each has type (0-4) in
-   * first word followed by data with AttributeHeader.
-   *
-   * Stat update comes as pair of start and end key and associated
-   * percentages.  Stat query takes best match of start and end key from
-   * each area separately.  Rows in range percentage is then computed by
-   * excluding the two i.e. as 100 - (start key pct + end key pct).
-   *
-   * TODO use more compact key format
-   */
-  struct Pointer;
-  friend struct Pointer;
-  struct Entry;
-  friend struct Entry;
-  struct Area;
-  friend struct Area;
-  struct Pointer {
-    Uint16 m_pos;
-    Uint16 m_seq;
-  };
-  struct Entry {
-    float m_pct;
-    Uint32 m_keylen;
+   * Create fails if any objects exist.  Specific errors are
+   * BadSysTables (drop required) and HaveSysTables.
+   *
+   * Drop always succeeds and drops any objects that exist.
+   *
+   * Check succeeds if all correct objects exist.  Specific errors are
+   * BadSysTables (drop required) and NoSysTables.
+   *
+   * Database of the Ndb object is used and must be "mysql" for kernel
+   * to see the tables.
+   */
+  int create_systables(Ndb* ndb);
+  int drop_systables(Ndb* ndb);
+  int check_systables(Ndb* ndb);
+
+  /*
+   * Set index operated on.  Allocates internal structs.  Makes no
+   * database access and keeps no references to the objects.
+   */
+  int set_index(const NdbDictionary::Index& index,
+                const NdbDictionary::Table& table);
+
+  /*
+   * Release index.  Required only if re-used for another index.
+   */
+  void reset_index();
+
+  /*
+   * Trivial invocation of NdbDictionary::Dictionary::updateIndexStat.
+   */
+  int update_stat(Ndb* ndb);
+
+  /*
+   * Trivial invocation of NdbDictionary::Dictionary::deleteIndexStat.
+   */
+  int delete_stat(Ndb* ndb);
+
+  /*
+   * Cache types.
+   */
+  enum CacheType {
+    CacheBuild = 1,     // new cache under construction
+    CacheQuery = 2,     // cache used to answer queries
+    CacheClean = 3      // old caches waiting to be deleted
+  };
+
+  /*
+   * Move CacheQuery (if any) to CacheClean and CacheBuild (if any) to
+   * CacheQuery.  The CacheQuery switch is atomic.
+   */
+  void move_cache();
+
+  /*
+   * Delete all CacheClean instances.  This can be safely done after old
+   * cache queries have finished.  Cache queries are fast since they do
+   * binary searches in memory.
+   */
+  void clean_cache();
+
+  /*
+   * Cache info.  CacheClean may have several instances and the values
+   * for them are summed up.
+   */
+  struct CacheInfo {
+    Uint32 m_count;       // number of instances
+    Uint32 m_valid;       // should be except for incomplete CacheBuild
+    Uint32 m_sampleCount; // number of samples
+    Uint32 m_totalBytes;  // total bytes memory used
+    Uint64 m_save_time;   // microseconds to read stats into cache
+    Uint64 m_sort_time;   // microseconds to sort the cache
+    // end v4 fields
   };
-  STATIC_CONST( EntrySize = sizeof(Entry) >> 2 );
-  STATIC_CONST( PointerSize = sizeof(Pointer) >> 2 );
+
+  /*
+   * Get info about a cache type.
+   */
+  void get_cache_info(CacheInfo& info, CacheType type) const;
+
+  /*
+   * Saved head record retrieved with get_head().  The database fields
+   * are updated by any method which reads stats tables.  Stats exist if
+   * sampleVersion is not zero.
+   */
+  struct Head {
+    Int32 m_found;        // -1 no read done, 0 = no record, 1 = exists
+    Uint32 m_indexId;
+    Uint32 m_indexVersion;
+    Uint32 m_tableId;
+    Uint32 m_fragCount;
+    Uint32 m_valueFormat;
+    Uint32 m_sampleVersion;
+    Uint32 m_loadTime;
+    Uint32 m_sampleCount;
+    Uint32 m_keyBytes;
+    // end v4 fields
+  };
+
+  /*
+   * Get latest saved head record.  Makes no database access.
+   */
+  void get_head(Head& head) const;
+
+  /*
+   * Read stats head record for the index.  Returns error and sets code
+   * to NoIndexStats if head record does not exist or sample version is
+   * zero.  Use get_head() to retrieve the results.
+   */
+  int read_head(Ndb* ndb);
+
+  /*
+   * Read current version of stats into CacheBuild.  A move_cache() is
+   * required before it is available for queries.
+   */
+  int read_stat(Ndb* ndb);
+
+  /*
+   * Reader provides bounds for cache query.  The struct must be
+   * initialized from a thread-local byte buffer of the given size.
+   * NdbIndexStat instance is used and must have index set.  Note that
+   * a bound becomes low or high only as part of Range.
+   */
+  enum { BoundBufferBytes = 8192 };
+  struct Bound {
+    Bound(const NdbIndexStat* is, void* buffer);
+    void* m_impl;
+  };
+
+  /*
+   * Add non-NULL attribute value to the bound.  May return error for
+   * invalid data.
+   */
+  int add_bound(Bound& bound, const void* value);
+
+  /*
+   * Add NULL attribute value to the bound.
+   */
+  int add_bound_null(Bound& bound);
+
+  /*
+   * A non-empty bound must be set strict (true) or non-strict (false).
+   * For empty bound this must remain unset (-1).
+   */
+  void set_bound_strict(Bound& bound, int strict);
+
+  /*
+   * To re-use same bound instance, a reset is required.
+   */
+  void reset_bound(Bound& bound);
+
+  /*
+   * Queries take a range consisting of low and high bound (start key
+   * and end key in mysql).
+   */
+  struct Range {
+    Range(Bound& bound1, Bound& bound2);
+    Bound& m_bound1;
+    Bound& m_bound2;
+  };
+
+  /*
+   * After defining bounds, the range must be finalized.  This updates
+   * internal info.  Usage error is possible.
+   */
+  int finalize_range(Range& range);
+
+  /*
+   * Reset the bounds.
+   */
+  void reset_range(Range& range);
+
+  /*
+   * Convert NdbRecord index bound to Range.  Invokes reset and finalize
+   * and cannot be mixed with the other methods.
+   */
+  int convert_range(Range& range,
+                    const NdbRecord* key_record,
+                    const NdbIndexScanOperation::IndexBound* ib);
+
+  /*
+   * Reader provides storage for stats values.  The struct must be
+   * initialized from a thread-local byte buffer of the given size.
+   */
+  enum { StatBufferBytes = 2048 };
+  struct Stat {
+    Stat(void* buffer);
+    void* m_impl;
+  };
+
+  /*
+   * Compute Stat for a Range from the query cache.  Returns error
+   * if there is no valid query cache.  The Stat is used to get
+   * stats values without further reference to the Range.
+   */
+  int query_stat(const Range& range, Stat& stat);
+
+  /*
+   * Check if range is empty i.e. bound1 >= bound2 (for bounds this
+   * means empty) or the query cache is empty.  The RIR and RPK return
+   * 1.0 if range is empty.
+   */
+  static void get_empty(const Stat& stat, bool* empty);
+
+  /*
+   * Get estimated RIR (records in range).  Value is always >= 1.0 since
+   * no exact 0 rows can be returned.
+   */
+  static void get_rir(const Stat& stat, double* rir);
+
+  /*
+   * Get estimated RPK (records per key) at given level k (from 0 to
+   * NK-1 where NK = number of index keys).  Value is >= 1.0.
+   */
+  static void get_rpk(const Stat& stat, Uint32 k, double* rpk);
+
+  /*
+   * Get a short string summarizing the rules used.
+   */
+  enum { RuleBufferBytes = 80 };
+  static void get_rule(const Stat& stat, char* buffer);
+
+  /*
+   * Memory allocator for the stats caches.  By default each instance
+   * uses its own malloc-based implementation.
+   */
+  struct Mem {
+    Mem();
+    virtual ~Mem();
+    virtual void* mem_alloc(size_t size) = 0;
+    virtual void mem_free(void* ptr) = 0;
+    virtual size_t mem_used() const = 0;
+  };
+
+  /*
+   * Set a non-default memory allocator.
+   */
+  void set_mem_handler(Mem* mem);
+
+  // get impl class for use in NDB API programs
+  NdbIndexStatImpl& getImpl();
+
+private:
   /* Need 2 words per column in a bound plus space for the
    * bound data.
    * Worst case is 32 cols in key and max key size used.
    */
   STATIC_CONST( BoundBufWords = (2 * NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY)
                 + NDB_MAX_KEYSIZE_IN_WORDS );
-  struct Area {
-    Uint32* m_data;
-    Uint32 m_offset;
-    Uint32 m_free;
-    Uint16 m_entries;
-    Uint8 m_idir;
-    Uint8 pad1;
-    Area() {}
-    Pointer& get_pointer(unsigned i) const {
-      return *(Pointer*)&m_data[i];
-    }
-    Entry& get_entry(unsigned i) const {
-      return *(Entry*)&m_data[get_pointer(i).m_pos];
-    }
-    Uint32 get_pos(const Entry& e) const {
-      return (const Uint32*)&e - m_data;
-    }
-    unsigned get_firstpos() const {
-      return PointerSize * m_entries + m_free;
-    }
-  };
-  const NdbIndexImpl& m_index;
-  Uint32 m_areasize;
-  Uint16 m_seq;
-  Area m_area[2];
-  Uint32* m_cache;
-  NdbError m_error;
-#ifdef VM_TRACE
-  void stat_verify();
-#endif
-  int stat_cmpkey(const Area& a, const Uint32* key1, Uint32 keylen1,
-                  const Uint32* key2, Uint32 keylen2);
-  int stat_search(const Area& a, const Uint32* key, Uint32 keylen,
-                  Uint32* idx, bool* match);
-  int stat_oldest(const Area& a);
-  int stat_delete(Area& a, Uint32 k);
-  int stat_update(const Uint32* key1, Uint32 keylen1,
-                  const Uint32* key2, Uint32 keylen2, const float pct[2]);
-  int stat_select(const Uint32* key1, Uint32 keylen1,
-                  const Uint32* key2, Uint32 keylen2, float pct[2]);
-  void set_error(int code);
   int addKeyPartInfo(const NdbRecord* record,
                      const char* keyRecordData,
                      Uint32 keyPartNum,
                      const NdbIndexScanOperation::BoundType boundType,
                      Uint32* keyStatData,
                      Uint32& keyLength);
+
+  // stored stats
+
+  friend class NdbIndexStatImpl;
+  NdbIndexStat(NdbIndexStatImpl& impl);
+  NdbIndexStatImpl& m_impl;
 };
 
+class NdbOut&
+operator<<(class NdbOut& out, const NdbIndexStat::Error&);
+
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2011-05-31 12:35:28 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2011-06-06 12:18:27 +0000
@@ -1760,7 +1760,8 @@ Trix::g_statMetaSample = {
 
 const Trix::SysIndex
 Trix::g_statMetaSampleX1 = {
-  NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
+  // indexes are always in "sys"
+  "sys" "/" NDB_INDEX_STAT_SCHEMA "/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
   ~(Uint32)0,
   ~(Uint32)0
 };

=== modified file 'storage/ndb/src/ndbapi/API.hpp'
--- a/storage/ndb/src/ndbapi/API.hpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/src/ndbapi/API.hpp	2011-06-06 12:18:27 +0000
@@ -35,6 +35,8 @@
 #include <NdbIndexOperation.hpp>
 #include <NdbScanOperation.hpp>
 #include <NdbIndexScanOperation.hpp>
+#include <NdbIndexStat.hpp>
+#include "NdbIndexStatImpl.hpp"
 #include <NdbRecAttr.hpp>
 #include <NdbReceiver.hpp>
 #include <NdbBlob.hpp>

=== modified file 'storage/ndb/src/ndbapi/CMakeLists.txt'
--- a/storage/ndb/src/ndbapi/CMakeLists.txt	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/src/ndbapi/CMakeLists.txt	2011-06-06 12:18:27 +0000
@@ -21,6 +21,7 @@ ADD_LIBRARY(ndbapi STATIC
             NdbEventOperation.cpp
             NdbEventOperationImpl.cpp
             NdbIndexStat.cpp
+	    NdbIndexStatImpl.cpp
             NdbInterpretedCode.cpp
             TransporterFacade.cpp
             ClusterMgr.cpp

=== modified file 'storage/ndb/src/ndbapi/Makefile.am'
--- a/storage/ndb/src/ndbapi/Makefile.am	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/src/ndbapi/Makefile.am	2011-06-06 12:18:27 +0000
@@ -58,6 +58,7 @@ libndbapi_la_SOURCES = \
         ndb_cluster_connection.cpp \
 	NdbBlob.cpp \
 	NdbIndexStat.cpp \
+	NdbIndexStatImpl.cpp \
         SignalSender.cpp \
         ObjectMap.cpp \
 	NdbInterpretedCode.cpp \

=== modified file 'storage/ndb/src/ndbapi/NdbDictionary.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionary.cpp	2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionary.cpp	2011-06-06 12:18:27 +0000
@@ -2588,6 +2588,62 @@ NdbDictionary::Dictionary::dropIndexGlob
   return ret;
 }
 
+int
+NdbDictionary::Dictionary::updateIndexStat(const Index& index,
+                                           const Table& table)
+{
+  int ret;
+  DO_TRANS(
+    ret,
+    m_impl.updateIndexStat(NdbIndexImpl::getImpl(index),
+                           NdbTableImpl::getImpl(table))
+  );
+  return ret;
+}
+
+int
+NdbDictionary::Dictionary::updateIndexStat(Uint32 indexId,
+                                           Uint32 indexVersion,
+                                           Uint32 tableId)
+{
+  int ret;
+  DO_TRANS(
+    ret,
+    m_impl.updateIndexStat(indexId,
+                           indexVersion,
+                           tableId)
+  );
+  return ret;
+}
+
+int
+NdbDictionary::Dictionary::deleteIndexStat(const Index& index,
+                                           const Table& table)
+{
+  int ret;
+  DO_TRANS(
+    ret,
+    m_impl.deleteIndexStat(NdbIndexImpl::getImpl(index),
+                           NdbTableImpl::getImpl(table))
+  );
+  return ret;
+}
+
+int
+NdbDictionary::Dictionary::deleteIndexStat(Uint32 indexId,
+                                           Uint32 indexVersion,
+                                           Uint32 tableId)
+{
+  int ret;
+  DO_TRANS(
+    ret,
+    m_impl.deleteIndexStat(indexId,
+                           indexVersion,
+                           tableId)
+  );
+  return ret;
+}
+
 const NdbDictionary::Index * 
 NdbDictionary::Dictionary::getIndex(const char * indexName,
 				    const char * tableName) const
@@ -2756,7 +2812,6 @@ NdbDictionary::Dictionary::listIndexes(L
   return m_impl.listIndexes(list, table.getTableId());
 }
 
-
 const struct NdbError & 
 NdbDictionary::Dictionary::getNdbError() const {
   return m_impl.getNdbError();

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2011-05-31 08:28:58 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2011-06-06 12:18:27 +0000
@@ -26,6 +26,7 @@
 #include <NdbMem.h>
 #include <util/version.h>
 #include <NdbSleep.h>
+#include <signaldata/IndexStatSignal.hpp>
 
 #include <signaldata/GetTabInfo.hpp>
 #include <signaldata/DictTabInfo.hpp>
@@ -2190,6 +2191,12 @@ NdbDictInterface::execSignal(void* dictI
   case GSN_DROP_INDX_CONF:
     tmp->execDROP_INDX_CONF(signal, ptr);
     break;
+  case GSN_INDEX_STAT_CONF:
+    tmp->execINDEX_STAT_CONF(signal, ptr);
+    break;
+  case GSN_INDEX_STAT_REF:
+    tmp->execINDEX_STAT_REF(signal, ptr);
+    break;
   case GSN_CREATE_EVNT_REF:
     tmp->execCREATE_EVNT_REF(signal, ptr);
     break;
@@ -4330,6 +4337,100 @@ NdbDictInterface::execCREATE_INDX_REF(co
   m_error.code = ref->errorCode;
   if (m_error.code == ref->NotMaster)
     m_masterNodeId = ref->masterNodeId;
+  m_impl->theWaiter.signal(NO_WAIT);
+}
+
+// INDEX_STAT
+
+int
+NdbDictionaryImpl::updateIndexStat(const NdbIndexImpl& index,
+                                   const NdbTableImpl& table)
+{
+  Uint32 rt = IndexStatReq::RT_UPDATE_STAT;
+  return m_receiver.doIndexStatReq(m_ndb, index, table, rt);
+}
+
+int
+NdbDictionaryImpl::updateIndexStat(Uint32 indexId,
+                                   Uint32 indexVersion,
+                                   Uint32 tableId)
+{
+  Uint32 rt = IndexStatReq::RT_UPDATE_STAT;
+  return m_receiver.doIndexStatReq(m_ndb, indexId, indexVersion, tableId, rt);
+}
+
+int
+NdbDictionaryImpl::deleteIndexStat(const NdbIndexImpl& index,
+                                   const NdbTableImpl& table)
+{
+  Uint32 rt = IndexStatReq::RT_DELETE_STAT;
+  return m_receiver.doIndexStatReq(m_ndb, index, table, rt);
+}
+
+int
+NdbDictionaryImpl::deleteIndexStat(Uint32 indexId,
+                                   Uint32 indexVersion,
+                                   Uint32 tableId)
+{
+  Uint32 rt = IndexStatReq::RT_DELETE_STAT;
+  return m_receiver.doIndexStatReq(m_ndb, indexId, indexVersion, tableId, rt);
+}
+
+int
+NdbDictInterface::doIndexStatReq(Ndb& ndb,
+                                 const NdbIndexImpl& index,
+                                 const NdbTableImpl& table,
+                                 Uint32 rt)
+{
+  return doIndexStatReq(ndb, index.m_id, index.m_version, table.m_id, rt);
+}
+
+int
+NdbDictInterface::doIndexStatReq(Ndb& ndb,
+                                 Uint32 indexId,
+                                 Uint32 indexVersion,
+                                 Uint32 tableId,
+                                 Uint32 requestType)
+{
+  NdbApiSignal tSignal(m_reference);
+  tSignal.theReceiversBlockNumber = DBDICT;
+  tSignal.theVerId_signalNumber = GSN_INDEX_STAT_REQ;
+  tSignal.theLength = IndexStatReq::SignalLength;
+
+  IndexStatReq* req = CAST_PTR(IndexStatReq, tSignal.getDataPtrSend());
+  req->clientRef = m_reference;
+  req->clientData = 0;
+  req->transId = m_tx.transId();
+  req->transKey = m_tx.transKey();
+  req->requestInfo = requestType;
+  req->requestFlag = 0;
+  req->indexId = indexId;
+  req->indexVersion = indexVersion;
+  req->tableId = tableId;
+
+  int errCodes[] = { IndexStatRef::Busy, IndexStatRef::NotMaster, 0 };
+  return dictSignal(&tSignal, 0, 0,
+                    0,
+                    WAIT_CREATE_INDX_REQ,
+                    DICT_WAITFOR_TIMEOUT, 100,
+                    errCodes);
+}
+
+void
+NdbDictInterface::execINDEX_STAT_CONF(const NdbApiSignal * signal,
+				      const LinearSectionPtr ptr[3])
+{
+  m_impl->theWaiter.signal(NO_WAIT);
+}
+
+void
+NdbDictInterface::execINDEX_STAT_REF(const NdbApiSignal * signal,
+				     const LinearSectionPtr ptr[3])
+{
+  const IndexStatRef* ref = CAST_CONSTPTR(IndexStatRef, signal->getDataPtr());
+  m_error.code = ref->errorCode;
+  if (m_error.code == ref->NotMaster)
+    m_masterNodeId = ref->masterNodeId;
   m_impl->theWaiter.signal(NO_WAIT);
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2011-06-06 12:18:27 +0000
@@ -641,6 +641,12 @@ public:
   int createIndex(class Ndb & ndb, const NdbIndexImpl &, const NdbTableImpl &,
                   bool offline);
   int dropIndex(const NdbIndexImpl &, const NdbTableImpl &);
+  int doIndexStatReq(class Ndb& ndb,
+                     const NdbIndexImpl&, const NdbTableImpl&,
+                     Uint32 requestType);
+  int doIndexStatReq(class Ndb& ndb,
+                     Uint32 indexId, Uint32 indexVersion, Uint32 tableId,
+                     Uint32 requestType);
   
   int createEvent(class Ndb & ndb, NdbEventImpl &, int getFlag);
   int dropEvent(const NdbEventImpl &);
@@ -736,6 +742,9 @@ private:
   void execDROP_INDX_REF(const NdbApiSignal *, const LinearSectionPtr ptr[3]);
   void execDROP_INDX_CONF(const NdbApiSignal *, const LinearSectionPtr ptr[3]);
 
+  void execINDEX_STAT_CONF(const NdbApiSignal *, const LinearSectionPtr ptr[3]);
+  void execINDEX_STAT_REF(const NdbApiSignal *, const LinearSectionPtr ptr[3]);
+
   void execCREATE_EVNT_REF(const NdbApiSignal *, const LinearSectionPtr pr[3]);
   void execCREATE_EVNT_CONF(const NdbApiSignal *, const LinearSectionPtr p[3]);
   void execSUB_START_CONF(const NdbApiSignal*, const LinearSectionPtr ptr[3]);
@@ -846,6 +855,11 @@ public:
   NdbTableImpl * getIndexTable(NdbIndexImpl * index, 
 			       NdbTableImpl * table);
 
+  int updateIndexStat(const NdbIndexImpl&, const NdbTableImpl&);
+  int updateIndexStat(Uint32 indexId, Uint32 indexVersion, Uint32 tableId);
+  int deleteIndexStat(const NdbIndexImpl&, const NdbTableImpl&);
+  int deleteIndexStat(Uint32 indexId, Uint32 indexVersion, Uint32 tableId);
+
   int createEvent(NdbEventImpl &);
   int createBlobEvents(NdbEventImpl &);
   int dropEvent(const char * eventName, int force);

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-05-04 14:45:46 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-06-06 12:18:27 +0000
@@ -23,357 +23,23 @@
 #include "NdbDictionaryImpl.hpp"
 #include <NdbInterpretedCode.hpp>
 #include <NdbRecord.hpp>
+#include "NdbIndexStatImpl.hpp"
 
-NdbIndexStat::NdbIndexStat(const NdbDictionary::Index* index) :
-  m_index(index->m_impl),
-  m_cache(NULL)
+NdbIndexStat::NdbIndexStat() :
+  m_impl(*new NdbIndexStatImpl(*this))
 {
 }
 
-NdbIndexStat::~NdbIndexStat()
-{
-  delete [] m_cache;
-  m_cache = NULL;
-}
-
-int
-NdbIndexStat::alloc_cache(Uint32 entries)
-{
-  delete [] m_cache;
-  m_cache = NULL;
-  if (entries == 0) {
-    return 0;
-  }
-  Uint32 i;
-  Uint32 keysize = 0;
-  for (i = 0; i < m_index.m_columns.size(); i++) {
-    NdbColumnImpl* c = m_index.m_columns[i];
-    keysize += 2;       // counting extra headers
-    keysize += (c->m_attrSize * c->m_arraySize + 3 ) / 4;
-  }
-  Uint32 areasize = entries * (PointerSize + EntrySize + keysize);
-  if (areasize > (1 << 16))
-    areasize = (1 << 16);
-  Uint32 cachesize = 2 * areasize;
-  m_cache = new Uint32 [cachesize];
-  if (m_cache == NULL) {
-    set_error(4000);
-    return -1;
-  }
-  m_areasize = areasize;
-  m_seq = 0;
-  Uint32 idir;
-  for (idir = 0; idir <= 1; idir++) {
-    Area& a = m_area[idir];
-    a.m_data = &m_cache[idir * areasize];
-    a.m_offset = Uint32(a.m_data - &m_cache[0]);
-    a.m_free = areasize;
-    a.m_entries = 0;
-    a.m_idir = idir;
-    a.pad1 = 0;
-  }
-#ifdef VM_TRACE
-  memset(&m_cache[0], 0x3f, cachesize << 2);
-#endif
-  return 0;
-}
-
-#ifndef VM_TRACE
-#define stat_verify()
-#else
-void
-NdbIndexStat::stat_verify()
-{
-  Uint32 idir;
-  for (idir = 0; idir <= 1; idir++) {
-    Uint32 i;
-    const Area& a = m_area[idir];
-    assert(a.m_offset == idir * m_areasize);
-    assert(a.m_data == &m_cache[a.m_offset]);
-    Uint32 pointerwords = PointerSize * a.m_entries;
-    Uint32 entrywords = 0;
-    for (i = 0; i < a.m_entries; i++) {
-      const Pointer& p = a.get_pointer(i);
-      const Entry& e = a.get_entry(i);
-      assert(a.get_pos(e) == p.m_pos);
-      entrywords += EntrySize + e.m_keylen;
-    }
-    assert(a.m_free <= m_areasize);
-    assert(pointerwords + a.m_free + entrywords == m_areasize);
-    Uint32 off = pointerwords + a.m_free;
-    for (i = 0; i < a.m_entries; i++) {
-      assert(off < m_areasize);
-      const Entry& e = *(const Entry*)&a.m_data[off];
-      off += EntrySize + e.m_keylen;
-    }
-    assert(off == m_areasize);
-    for (i = 0; i < a.m_entries; i++) {
-      const Entry& e = a.get_entry(i);
-      const Uint32* entrykey = (const Uint32*)&e + EntrySize;
-      Uint32 n = 0;
-      while (n + 2 <= e.m_keylen) {
-        Uint32 t = entrykey[n++];
-        assert(t == 2 * idir || t == 2 * idir + 1 || t == 4);
-        AttributeHeader ah = *(const AttributeHeader*)&entrykey[n++];
-        n += ah.getDataSize();
-      }
-      assert(n == e.m_keylen);
-    }
-    for (i = 0; i + 1 < a.m_entries; i++) {
-      const Entry& e1 = a.get_entry(i);
-      const Entry& e2 = a.get_entry(i + 1);
-      const Uint32* entrykey1 = (const Uint32*)&e1 + EntrySize;
-      const Uint32* entrykey2 = (const Uint32*)&e2 + EntrySize;
-      int ret = stat_cmpkey(a, entrykey1, e1.m_keylen, entrykey2, e2.m_keylen);
-      assert(ret < 0);
-    }
-  }
-}
-#endif
-
-// compare keys
-int
-NdbIndexStat::stat_cmpkey(const Area& a, const Uint32* key1, Uint32 keylen1, const Uint32* key2, Uint32 keylen2)
-{
-  const Uint32 idir = a.m_idir;
-  const int jdir = 1 - 2 * int(idir);
-  Uint32 i1 = 0, i2 = 0;
-  Uint32 t1 = 4, t2 = 4; //BoundEQ
-  int ret = 0;
-  Uint32 k = 0;
-  while (k < m_index.m_columns.size()) {
-    NdbColumnImpl* c = m_index.m_columns[k];
-    Uint32 n = c->m_attrSize * c->m_arraySize;
-    // absence of keypart is treated specially
-    bool havekp1 = (i1 + 2 <= keylen1);
-    bool havekp2 = (i2 + 2 <= keylen2);
-    AttributeHeader ah1;
-    AttributeHeader ah2;
-    if (havekp1) {
-      t1 = key1[i1++];
-      assert(t1 == 2 * idir || t1 == 2 * idir + 1 || t1 == 4);
-      ah1 = *(const AttributeHeader*)&key1[i1++];
-    }
-    if (havekp2) {
-      t2 = key2[i2++];
-      assert(t2 == 2 * idir || t2 == 2 * idir + 1 || t2 == 4);
-      ah2 = *(const AttributeHeader*)&key2[i2++];
-    }
-    if (havekp1) {
-      if (havekp2) {
-        if (! ah1.isNULL()) {
-          if (! ah2.isNULL()) {
-            const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(c->m_type);
-            ret = (*sqlType.m_cmp)(c->m_cs, &key1[i1], n, &key2[i2], n);
-            if (ret != 0)
-              break;
-          } else {
-            ret = +1;
-            break;
-          }
-        } else if (! ah2.isNULL()) {
-          ret = -1;
-          break;
-        }
-      } else {
-        ret = +jdir;
-        break;
-      }
-    } else {
-      if (havekp2) {
-        ret = -jdir;
-        break;
-      } else {
-        // no more keyparts on either side
-        break;
-      }
-    }
-    i1 += ah1.getDataSize();
-    i2 += ah2.getDataSize();
-    k++;
-  }
-  if (ret == 0) {
-    // strict bound is greater as start key and less as end key
-    int s1 = t1 & 1;
-    int s2 = t2 & 1;
-    ret = (s1 - s2) * jdir;
-  }
-  return ret;
-}
-
-// find first key >= given key
-int
-NdbIndexStat::stat_search(const Area& a, const Uint32* key, Uint32 keylen, Uint32* idx, bool* match)
-{
-  // points at minus/plus infinity
-  int lo = -1;
-  int hi = a.m_entries;
-  // loop invariant: key(lo) < key < key(hi)
-  while (hi - lo > 1) {
-    // observe lo < j < hi
-    int j = (hi + lo) / 2;
-    Entry& e = a.get_entry(j);
-    const Uint32* key2 = (Uint32*)&e + EntrySize;
-    Uint32 keylen2 = e.m_keylen;
-    int ret = stat_cmpkey(a, key, keylen, key2, keylen2);
-    // observe the loop invariant if ret != 0
-    if (ret < 0)
-      hi = j;
-    else if (ret > 0)
-      lo = j;
-    else {
-      *idx = j;
-      *match = true;
-      return 0;
-    }
-  }
-  // hi - lo == 1 and key(lo) < key < key(hi)
-  *idx = hi;
-  *match = false;
-  return 0;
-}
-
-// find oldest entry
-int
-NdbIndexStat::stat_oldest(const Area& a)
-{
-  Uint32 i, k= 0, m;
-  bool found = false;
-  m = ~(Uint32)0;     // shut up incorrect CC warning
-  for (i = 0; i < a.m_entries; i++) {
-    Pointer& p = a.get_pointer(i);
-    Uint32 m2 = m_seq >= p.m_seq ? m_seq - p.m_seq : p.m_seq - m_seq;
-    if (! found || m < m2) {
-      m = m2;
-      k = i;
-      found = true;
-    }
-  }
-  assert(found);
-  return k;
-}
-
-// delete entry
-int
-NdbIndexStat::stat_delete(Area& a, Uint32 k)
-{
-  Uint32 i;
-  NdbIndexStat::Entry& e = a.get_entry(k);
-  Uint32 entrylen = EntrySize + e.m_keylen;
-  Uint32 pos = a.get_pos(e);
-  // adjust pointers to entries after
-  for (i = 0; i < a.m_entries; i++) {
-    Pointer& p = a.get_pointer(i);
-    if (p.m_pos < pos) {
-      p.m_pos += entrylen;
-    }
-  }
-  // compact entry area
-  unsigned firstpos = a.get_firstpos();
-  for (i = pos; i > firstpos; i--) {
-    a.m_data[i + entrylen - 1] = a.m_data[i - 1];
-  }
-  // compact pointer area
-  for (i = k; i + 1 < a.m_entries; i++) {
-    NdbIndexStat::Pointer& p = a.get_pointer(i);
-    NdbIndexStat::Pointer& q = a.get_pointer(i + 1);
-    p = q;
-  }
-  a.m_free += PointerSize + entrylen;
-  a.m_entries--;
-  stat_verify();
-  return 0;
-}
-
-// update or insert stat values
-int
-NdbIndexStat::stat_update(const Uint32* key1, Uint32 keylen1, const Uint32* key2, Uint32 keylen2, const float pct[2])
+NdbIndexStat::NdbIndexStat(NdbIndexStatImpl& impl) :
+  m_impl(impl)
 {
-  const Uint32* const key[2] = { key1, key2 };
-  const Uint32 keylen[2] = { keylen1, keylen2 };
-  Uint32 idir;
-  for (idir = 0; idir <= 1; idir++) {
-    Area& a = m_area[idir];
-    Uint32 k;
-    bool match;
-    stat_search(a, key[idir], keylen[idir], &k, &match);
-    Uint16 seq = m_seq++;
-    if (match) {
-      // update old entry
-      NdbIndexStat::Pointer& p = a.get_pointer(k);
-      NdbIndexStat::Entry& e = a.get_entry(k);
-      e.m_pct = pct[idir];
-      p.m_seq = seq;
-    } else {
-      Uint32 entrylen = NdbIndexStat::EntrySize + keylen[idir];
-      Uint32 need = NdbIndexStat::PointerSize + entrylen;
-      while (need > a.m_free) {
-        Uint32 j = stat_oldest(a);
-        if (j < k)
-          k--;
-        stat_delete(a, j);
-      }
-      // insert pointer
-      Uint32 i;
-      for (i = a.m_entries; i > k; i--) {
-        NdbIndexStat::Pointer& p1 = a.get_pointer(i);
-        NdbIndexStat::Pointer& p2 = a.get_pointer(i - 1);
-        p1 = p2;
-      }
-      NdbIndexStat::Pointer& p = a.get_pointer(k);
-      // insert entry
-      Uint32 firstpos = a.get_firstpos();
-      p.m_pos = firstpos - entrylen;
-      NdbIndexStat::Entry& e = a.get_entry(k);
-      e.m_pct = pct[idir];
-      e.m_keylen = keylen[idir];
-      Uint32* entrykey = (Uint32*)&e + EntrySize;
-      for (i = 0; i < keylen[idir]; i++) {
-        entrykey[i] = key[idir][i];
-      }
-      p.m_seq = seq;
-      // total
-      a.m_free -= PointerSize + entrylen;
-      a.m_entries++;
-    }
-  }
-  stat_verify();
-  return 0;
 }
 
-int
-NdbIndexStat::stat_select(const Uint32* key1, Uint32 keylen1, const Uint32* key2, Uint32 keylen2, float pct[2])
+NdbIndexStat::~NdbIndexStat()
 {
-  const Uint32* const key[2] = { key1, key2 };
-  const Uint32 keylen[2] = { keylen1, keylen2 };
-  Uint32 idir;
-  for (idir = 0; idir <= 1; idir++) {
-    Area& a = m_area[idir];
-    Uint32 k;
-    bool match;
-    stat_search(a, key[idir], keylen[idir], &k, &match);
-    if (match) {
-      NdbIndexStat::Entry& e = a.get_entry(k);
-      pct[idir] = e.m_pct;
-    } else if (k == 0) {
-      NdbIndexStat::Entry& e = a.get_entry(k);
-      if (idir == 0)
-        pct[idir] = e.m_pct / 2;
-      else
-        pct[idir] = e.m_pct + (1 - e.m_pct) / 2;
-    } else if (k == a.m_entries) {
-      NdbIndexStat::Entry& e = a.get_entry(k - 1);
-      if (idir == 0)
-        pct[idir] = e.m_pct + (1 - e.m_pct) / 2;
-      else
-        pct[idir] = e.m_pct / 2;
-    } else {
-      NdbIndexStat::Entry& e1 = a.get_entry(k - 1);
-      NdbIndexStat::Entry& e2 = a.get_entry(k);
-      pct[idir] = (e1.m_pct + e2.m_pct) / 2;
-    }
-  }
-  return 0;
+  NdbIndexStatImpl* impl = &m_impl;
+  if (this != impl)
+    delete impl;
 }
 
 /** 
@@ -424,7 +90,7 @@ NdbIndexStat::addKeyPartInfo(const NdbRe
       len_ok= column->get_var_length(keyRecordData, len);
     }
     if (!len_ok) {
-      set_error(4209);
+      m_impl.setError(4209, __LINE__);
       return -1;
     }
   }
@@ -439,7 +105,7 @@ NdbIndexStat::addKeyPartInfo(const NdbRe
   {
     /* Something wrong, key data would be too big */
     /* Key size is limited to 4092 bytes */
-    set_error(4207);
+    m_impl.setError(4207, __LINE__);
     return -1;
   }
 
@@ -470,12 +136,8 @@ NdbIndexStat::records_in_range(const Ndb
   Uint32 key1[BoundBufWords], keylen1;
   Uint32 key2[BoundBufWords], keylen2;
 
-  if (m_cache == NULL)
-    flags |= RR_UseDb | RR_NoUpdate;
-  else if (m_area[0].m_entries == 0 || m_area[1].m_entries == 0)
-    flags |= RR_UseDb;
-
-  if ((flags & (RR_UseDb | RR_NoUpdate)) != (RR_UseDb | RR_NoUpdate)) {
+  if (true)
+  {
     // get start and end key from NdbIndexBound, using NdbRecord to 
     // get values into a standard format.
     Uint32 maxBoundParts= (ib->low_key_count > ib->high_key_count) ? 
@@ -527,7 +189,8 @@ NdbIndexStat::records_in_range(const Ndb
     }
   }
 
-  if (flags & RR_UseDb) {
+  if (true)
+  {
     Uint32 out[4] = { 0, 0, 0, 0 };  // rows, in, before, after
     float tot[4] = { 0, 0, 0, 0 };   // totals of above
     int cnt, ret;
@@ -540,8 +203,8 @@ NdbIndexStat::records_in_range(const Ndb
     if ((code.interpret_exit_last_row() != 0) ||
         (code.finalise() != 0))
     {
-      m_error= code.getNdbError();
-      DBUG_PRINT("error", ("code: %d", m_error.code));
+      m_impl.setError(code.getNdbError().code, __LINE__);
+      DBUG_PRINT("error", ("code: %d", code.getNdbError().code));
       DBUG_RETURN(-1);
     }
 
@@ -577,14 +240,14 @@ NdbIndexStat::records_in_range(const Ndb
                               &options,
                               sizeof(NdbScanOperation::ScanOptions))))
     {
-      m_error= trans->getNdbError();
-      DBUG_PRINT("error", ("scanIndex : %d", m_error.code));
+      m_impl.setError(trans->getNdbError().code, __LINE__);
+      DBUG_PRINT("error", ("scanIndex : %d", trans->getNdbError().code));
       DBUG_RETURN(-1);
     }
 
     if (trans->execute(NdbTransaction::NoCommit,
                        NdbOperation::AbortOnError, forceSend) == -1) {
-      m_error = trans->getNdbError();
+      m_impl.setError(trans->getNdbError().code, __LINE__);
       DBUG_PRINT("error", ("trans:%d op:%d", trans->getNdbError().code,
                            op->getNdbError().code));
       DBUG_RETURN(-1);
@@ -602,35 +265,13 @@ NdbIndexStat::records_in_range(const Ndb
       cnt++;
     }
     if (ret == -1) {
-      m_error = op->getNdbError();
+      m_impl.setError(op->getNdbError().code, __LINE__);
       DBUG_PRINT("error nextResult ", ("trans:%d op:%d", trans->getNdbError().code,
                            op->getNdbError().code));
       DBUG_RETURN(-1);
     }
     op->close(forceSend);
     rows = (Uint64)tot[1];
-    if (cnt != 0 && ! (flags & RR_NoUpdate)) {
-      float pct[2];
-      pct[0] = 100 * tot[2] / tot[0];
-      pct[1] = 100 * tot[3] / tot[0];
-      DBUG_PRINT("info", ("update stat pct"
-                          " before=%.2f after=%.2f",
-                          pct[0], pct[1]));
-      stat_update(key1, keylen1, key2, keylen2, pct);
-    }
-  } else {
-      float pct[2];
-      stat_select(key1, keylen1, key2, keylen2, pct);
-      float diff = (float)100.0 - (pct[0] + pct[1]);
-      float trows = (float)table_rows;
-      DBUG_PRINT("info", ("select stat pct"
-                          " before=%.2f after=%.2f in=%.2f table_rows=%.2f",
-                          pct[0], pct[1], diff, trows));
-      rows = 0;
-      if (diff >= 0)
-        rows = (Uint64)(diff * trows / 100);
-      if (rows == 0)
-        rows = 1;
   }
 
   *count = rows;
@@ -639,14 +280,424 @@ NdbIndexStat::records_in_range(const Ndb
   DBUG_RETURN(0);
 }
 
+// stored stats
+
+int
+NdbIndexStat::create_systables(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::create_systables");
+  if (m_impl.create_systables(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::drop_systables(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::drop_systables");
+  if (m_impl.drop_systables(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::check_systables(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::check_systables");
+  if (m_impl.check_systables(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::set_index(const NdbDictionary::Index& index,
+                        const NdbDictionary::Table& table)
+{
+  DBUG_ENTER("NdbIndexStat::set_index");
+  if (m_impl.set_index(index, table) == -1)
+    DBUG_RETURN(-1);
+  m_impl.m_facadeHead.m_indexId = index.getObjectId();
+  m_impl.m_facadeHead.m_indexVersion = index.getObjectVersion();
+  m_impl.m_facadeHead.m_tableId = table.getObjectId();
+  DBUG_RETURN(0);
+}
+
+void
+NdbIndexStat::reset_index()
+{
+  DBUG_ENTER("NdbIndexStat::reset_index");
+  m_impl.reset_index();
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbIndexStat::update_stat(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::update_stat");
+  if (m_impl.update_stat(ndb, m_impl.m_facadeHead) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::delete_stat(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::delete_stat");
+  if (m_impl.delete_stat(ndb, m_impl.m_facadeHead) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+// cache
+
 void
-NdbIndexStat::set_error(int code)
+NdbIndexStat::move_cache()
 {
-  m_error.code = code;
+  DBUG_ENTER("NdbIndexStat::move_cache");
+  m_impl.move_cache();
+  DBUG_VOID_RETURN;
 }
 
-const NdbError&
+void
+NdbIndexStat::clean_cache()
+{
+  DBUG_ENTER("NdbIndexStat::clean_cache");
+  m_impl.clean_cache();
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbIndexStat::get_cache_info(CacheInfo& info, CacheType type) const
+{
+  const NdbIndexStatImpl::Cache* c = 0;
+  switch (type) {
+  case CacheBuild:
+    c = m_impl.m_cacheBuild;
+    break;
+  case CacheQuery:
+    c = m_impl.m_cacheQuery;
+    break;
+  case CacheClean:
+    c = m_impl.m_cacheClean;
+    break;
+  }
+  info.m_count = 0;
+  info.m_valid = 0;
+  info.m_sampleCount = 0;
+  info.m_totalBytes = 0;
+  info.m_save_time = 0;
+  info.m_sort_time = 0;
+  while (c != 0)
+  {
+    info.m_count += 1;
+    info.m_valid += c->m_valid;
+    info.m_sampleCount += c->m_sampleCount;
+    info.m_totalBytes += c->m_keyBytes + c->m_valueBytes + c->m_addrBytes;
+    info.m_save_time += (Uint64)c->m_save_time;
+    info.m_sort_time += (Uint64)c->m_sort_time;
+    c = c->m_nextClean;
+  }
+  // build and query cache have at most one instance
+  require(type == CacheClean || info.m_count <= 1);
+}
+
+// read
+
+void
+NdbIndexStat::get_head(Head& head) const
+{
+  head = m_impl.m_facadeHead;
+}
+
+int
+NdbIndexStat::read_head(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::read_head");
+  if (m_impl.read_head(ndb, m_impl.m_facadeHead) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::read_stat(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::read_stat");
+  if (m_impl.read_stat(ndb, m_impl.m_facadeHead) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+// bound
+
+NdbIndexStat::Bound::Bound(const NdbIndexStat* is, void* buffer)
+{
+  DBUG_ENTER("NdbIndexStat::Bound::Bound");
+  require(is != 0 && is->m_impl.m_indexSet);
+  require(buffer != 0);
+  Uint8* buf = (Uint8*)buffer;
+  // bound impl
+  Uint8* buf1 = buf;
+  UintPtr ubuf1 = (UintPtr)buf1;
+  if (ubuf1 % 8 != 0)
+    buf1 += (8 - ubuf1 % 8);
+  new (buf1) NdbIndexStatImpl::Bound(is->m_impl.m_keySpec);
+  m_impl = (void*)buf1;
+  NdbIndexStatImpl::Bound& bound = *(NdbIndexStatImpl::Bound*)m_impl;
+  // bound data
+  Uint8* buf2 = buf1 + sizeof(NdbIndexStatImpl::Bound);
+  uint used = (uint)(buf2 - buf);
+  uint bytes = BoundBufferBytes - used;
+  bound.m_data.set_buf(buf2, bytes);
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbIndexStat::add_bound(Bound& bound_f, const void* value)
+{
+  DBUG_ENTER("NdbIndexStat::add_bound");
+  NdbIndexStatImpl::Bound& bound =
+    *(NdbIndexStatImpl::Bound*)bound_f.m_impl;
+  Uint32 len_out;
+  if (value == 0)
+  {
+    m_impl.setError(UsageError, __LINE__);
+    DBUG_RETURN(-1);
+  }
+  if (bound.m_data.add(value, &len_out) == -1)
+  {
+    m_impl.setError(UsageError, __LINE__);
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::add_bound_null(Bound& bound_f)
+{
+  DBUG_ENTER("NdbIndexStat::add_bound_null");
+  NdbIndexStatImpl::Bound& bound =
+    *(NdbIndexStatImpl::Bound*)bound_f.m_impl;
+  Uint32 len_out;
+  if (bound.m_data.add_null(&len_out) == -1)
+  {
+    m_impl.setError(UsageError, __LINE__);
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+void
+NdbIndexStat::set_bound_strict(Bound& bound_f, int strict)
+{
+  DBUG_ENTER("NdbIndexStat::set_bound_strict");
+  NdbIndexStatImpl::Bound& bound =
+    *(NdbIndexStatImpl::Bound*)bound_f.m_impl;
+  bound.m_strict = strict;
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbIndexStat::reset_bound(Bound& bound_f)
+{
+  DBUG_ENTER("NdbIndexStat::reset_bound");
+  NdbIndexStatImpl::Bound& bound =
+    *(NdbIndexStatImpl::Bound*)bound_f.m_impl;
+  bound.m_bound.reset();
+  bound.m_type = -1;
+  bound.m_strict = -1;
+  DBUG_VOID_RETURN;
+}
+
+// range
+
+NdbIndexStat::Range::Range(Bound& bound1, Bound& bound2) :
+  m_bound1(bound1),
+  m_bound2(bound2)
+{
+  DBUG_ENTER("NdbIndexStat::Range::Range");
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbIndexStat::finalize_range(Range& range_f)
+{
+  DBUG_ENTER("NdbIndexStat::finalize_range");
+  Bound& bound1_f = range_f.m_bound1;
+  Bound& bound2_f = range_f.m_bound2;
+  NdbIndexStatImpl::Bound& bound1 =
+    *(NdbIndexStatImpl::Bound*)bound1_f.m_impl;
+  NdbIndexStatImpl::Bound& bound2 =
+    *(NdbIndexStatImpl::Bound*)bound2_f.m_impl;
+  NdbIndexStatImpl::Range range(bound1, bound2);
+  if (m_impl.finalize_range(range) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+void
+NdbIndexStat::reset_range(Range& range)
+{
+  DBUG_ENTER("NdbIndexStat::reset_range");
+  reset_bound(range.m_bound1);
+  reset_bound(range.m_bound2);
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbIndexStat::convert_range(Range& range_f,
+                            const NdbRecord* key_record,
+                            const NdbIndexScanOperation::IndexBound* ib)
+{
+  DBUG_ENTER("NdbIndexStatImpl::convert_range");
+  Bound& bound1_f = range_f.m_bound1;
+  Bound& bound2_f = range_f.m_bound2;
+  NdbIndexStatImpl::Bound& bound1 =
+    *(NdbIndexStatImpl::Bound*)bound1_f.m_impl;
+  NdbIndexStatImpl::Bound& bound2 =
+    *(NdbIndexStatImpl::Bound*)bound2_f.m_impl;
+  NdbIndexStatImpl::Range range(bound1, bound2);
+  if (m_impl.convert_range(range, key_record, ib) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+// stat
+
+NdbIndexStat::Stat::Stat(void* buffer)
+{
+  DBUG_ENTER("NdbIndexStat::Stat::Stat");
+  require(buffer != 0);
+  Uint8* buf = (Uint8*)buffer;
+  // stat impl
+  Uint8* buf1 = buf;
+  UintPtr ubuf1 = (UintPtr)buf1;
+  if (ubuf1 % 8 != 0)
+    buf1 += (8 - ubuf1 % 8);
+  new (buf1) NdbIndexStatImpl::Stat;
+  m_impl = (void*)buf1;
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbIndexStat::query_stat(const Range& range_f, Stat& stat_f)
+{
+  DBUG_ENTER("NdbIndexStat::query_stat");
+  Bound& bound1_f = range_f.m_bound1;
+  Bound& bound2_f = range_f.m_bound2;
+  NdbIndexStatImpl::Bound& bound1 =
+    *(NdbIndexStatImpl::Bound*)bound1_f.m_impl;
+  NdbIndexStatImpl::Bound& bound2 =
+    *(NdbIndexStatImpl::Bound*)bound2_f.m_impl;
+  NdbIndexStatImpl::Range range(bound1, bound2);
+#ifndef DBUG_OFF
+  const uint sz = 8000;
+  char buf[sz];
+  DBUG_PRINT("index_stat", ("lo: %s", bound1.m_bound.print(buf, sz)));
+  DBUG_PRINT("index_stat", ("hi: %s", bound2.m_bound.print(buf, sz)));
+#endif
+  NdbIndexStatImpl::Stat& stat =
+    *(NdbIndexStatImpl::Stat*)stat_f.m_impl;
+  if (m_impl.query_stat(range, stat) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+void
+NdbIndexStat::get_empty(const Stat& stat_f, bool* empty)
+{
+  DBUG_ENTER("NdbIndexStat::get_empty");
+  const NdbIndexStatImpl::Stat& stat =
+    *(const NdbIndexStatImpl::Stat*)stat_f.m_impl;
+  require(empty != 0);
+  *empty = stat.m_value.m_empty;
+  DBUG_PRINT("index_stat", ("empty:%d", *empty));
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbIndexStat::get_rir(const Stat& stat_f, double* rir)
+{
+  DBUG_ENTER("NdbIndexStat::get_rir");
+  const NdbIndexStatImpl::Stat& stat =
+    *(const NdbIndexStatImpl::Stat*)stat_f.m_impl;
+  double x = stat.m_value.m_rir;
+  if (x < 1.0)
+    x = 1.0;
+  require(rir != 0);
+  *rir = x;
+  DBUG_PRINT("index_stat", ("rir:%.2f", *rir));
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbIndexStat::get_rpk(const Stat& stat_f, Uint32 k, double* rpk)
+{
+  DBUG_ENTER("NdbIndexStat::get_rpk");
+  const NdbIndexStatImpl::Stat& stat =
+    *(const NdbIndexStatImpl::Stat*)stat_f.m_impl;
+  double x = stat.m_value.m_rir / stat.m_value.m_unq[k];
+  if (x < 1.0)
+    x = 1.0;
+  require(rpk != 0);
+  *rpk = x;
+  DBUG_PRINT("index_stat", ("rpk[%u]:%.2f", k, *rpk));
+  DBUG_VOID_RETURN;
+}
+
+void
+NdbIndexStat::get_rule(const Stat& stat_f, char* buffer)
+{
+  DBUG_ENTER("NdbIndexStat::get_rule");
+  const NdbIndexStatImpl::Stat& stat =
+    *(const NdbIndexStatImpl::Stat*)stat_f.m_impl;
+  require(buffer != 0);
+  BaseString::snprintf(buffer, RuleBufferBytes, "%s/%s/%s",
+                       stat.m_rule[0], stat.m_rule[1], stat.m_rule[2]);
+  DBUG_VOID_RETURN;
+}
+
+// mem
+
+NdbIndexStat::Mem::Mem()
+{
+}
+
+NdbIndexStat::Mem::~Mem()
+{
+}
+
+void
+NdbIndexStat::set_mem_handler(Mem* mem)
+{
+  m_impl.m_mem_handler = mem;
+}
+
+// get impl
+
+NdbIndexStatImpl&
+NdbIndexStat::getImpl()
+{
+  return m_impl;
+}
+
+// error
+
+NdbIndexStat::Error::Error()
+{
+  line = 0;
+  extra = 0;
+}
+
+const NdbIndexStat::Error&
 NdbIndexStat::getNdbError() const
 {
-  return m_error;
+  return m_impl.getNdbError();
+}
+
+class NdbOut&
+operator<<(class NdbOut& out, const NdbIndexStat::Error& error)
+{
+  out << static_cast<const NdbError&>(error);
+  out << " (line " << error.line << ", extra " << error.extra << ")";
+  return out;
 }

=== added file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-06-06 12:18:27 +0000
@@ -0,0 +1,2284 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <ndb_global.h>
+#include <Ndb.hpp>
+#include <NdbTransaction.hpp>
+#include <NdbRecAttr.hpp>
+#include <NdbOut.hpp>
+#include <NdbEnv.h>
+#include <Bitmask.hpp>
+#include <NdbSqlUtil.hpp>
+#include <NdbRecord.hpp>
+#include "NdbIndexStatImpl.hpp"
+
+#undef min
+#undef max
+#define min(a, b) ((a) <= (b) ? (a) : (b))
+#define max(a, b) ((a) >= (b) ? (a) : (b))
+
+static const char* const g_headtable_name = "NDB$IS_HEAD";
+static const char* const g_sampletable_name = "NDB$IS_SAMPLE";
+static const char* const g_sampleindex1_name = "NDB$IS_SAMPLE_X1";
+
+const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
+const int ERR_TupleNotFound[] = { 626, 0 };
+
+NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
+  NdbIndexStat(*this),
+  m_facade(&facade),
+  m_keyData(m_keySpec, false, 2),
+  m_valueData(m_valueSpec, false, 2)
+{
+  init();
+  m_query_mutex = NdbMutex_Create();
+  assert(m_query_mutex != 0);
+  m_mem_handler = &g_mem_default_handler;
+}
+
+void
+NdbIndexStatImpl::init()
+{
+  m_indexSet = false;
+  m_indexId = 0;
+  m_indexVersion = 0;
+  m_tableId = 0;
+  m_keyAttrs = 0;
+  m_valueAttrs = 0;
+  // buffers
+  m_keySpecBuf = 0;
+  m_valueSpecBuf = 0;
+  m_keyDataBuf = 0;
+  m_valueDataBuf = 0;
+  // cache
+  m_cacheBuild = 0;
+  m_cacheQuery = 0;
+  m_cacheClean = 0;
+  // head
+  init_head(m_facadeHead);
+}
+
+NdbIndexStatImpl::~NdbIndexStatImpl()
+{
+  reset_index();
+  if (m_query_mutex != 0)
+  {
+    NdbMutex_Destroy(m_query_mutex);
+    m_query_mutex = 0;
+  }
+}
+ 
+// sys tables meta
+
+NdbIndexStatImpl::Sys::Sys(NdbIndexStatImpl* impl, Ndb* ndb) :
+  m_impl(impl),
+  m_ndb(ndb)
+{
+  m_dic = m_ndb->getDictionary();
+  m_headtable = 0;
+  m_sampletable = 0;
+  m_sampleindex1 = 0;
+  m_obj_cnt = 0;
+}
+
+NdbIndexStatImpl::Sys::~Sys()
+{
+  m_impl->sys_release(*this);
+}
+
+void
+NdbIndexStatImpl::sys_release(Sys& sys)
+{
+  if (sys.m_headtable != 0)
+  {
+    sys.m_dic->removeTableGlobal(*sys.m_headtable, false);
+    sys.m_headtable = 0;
+  }
+  if (sys.m_sampletable != 0)
+  {
+    sys.m_dic->removeTableGlobal(*sys.m_sampletable, false);
+    sys.m_sampletable = 0;
+  }
+  if (sys.m_sampleindex1 != 0)
+  {
+    sys.m_dic->removeIndexGlobal(*sys.m_sampleindex1, false);
+    sys.m_sampleindex1 = 0;
+  }
+}
+
+int
+NdbIndexStatImpl::make_headtable(NdbDictionary::Table& tab)
+{
+  tab.setName(g_headtable_name);
+  tab.setLogging(true);
+  // key must be first
+  {
+    NdbDictionary::Column col("INDEX_ID");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setPrimaryKey(true);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("INDEX_VERSION");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setPrimaryKey(true);
+    tab.addColumn(col);
+  }
+  // table
+  {
+    NdbDictionary::Column col("TABLE_ID");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("FRAG_COUNT");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  // current sample
+  {
+    NdbDictionary::Column col("VALUE_FORMAT");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("SAMPLE_VERSION");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("LOAD_TIME");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("SAMPLE_COUNT");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("KEY_BYTES");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(false);
+    tab.addColumn(col);
+  }
+  NdbError error;
+  if (tab.validate(error) == -1) {
+    setError(error.code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::make_sampletable(NdbDictionary::Table& tab)
+{
+  tab.setName(g_sampletable_name);
+  tab.setLogging(true);
+  // key must be first
+  {
+    NdbDictionary::Column col("INDEX_ID");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setPrimaryKey(true);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("INDEX_VERSION");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setPrimaryKey(true);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("SAMPLE_VERSION");
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setPrimaryKey(true);
+    tab.addColumn(col);
+  }
+  {
+    NdbDictionary::Column col("STAT_KEY");
+    col.setType(NdbDictionary::Column::Longvarbinary);
+    col.setPrimaryKey(true);
+    col.setLength(MaxKeyBytes);
+    tab.addColumn(col);
+  }
+  // value
+  {
+    NdbDictionary::Column col("STAT_VALUE");
+    col.setType(NdbDictionary::Column::Longvarbinary);
+    col.setNullable(false);
+    col.setLength(MaxValueCBytes);
+    tab.addColumn(col);
+  }
+  NdbError error;
+  if (tab.validate(error) == -1) {
+    setError(error.code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::make_sampleindex1(NdbDictionary::Index& ind)
+{
+  ind.setTable(g_sampletable_name);
+  ind.setName(g_sampleindex1_name);
+  ind.setType(NdbDictionary::Index::OrderedIndex);
+  ind.setLogging(false);
+  ind.addColumnName("INDEX_ID");
+  ind.addColumnName("INDEX_VERSION");
+  ind.addColumnName("SAMPLE_VERSION");
+  return 0;
+}
+
+int
+NdbIndexStatImpl::check_table(const NdbDictionary::Table& tab1,
+                              const NdbDictionary::Table& tab2)
+{
+  if (tab1.getNoOfColumns() != tab2.getNoOfColumns())
+    return -1;
+  const uint n = tab1.getNoOfColumns();
+  for (uint i = 0; i < n; i++)
+  {
+    const NdbDictionary::Column* col1 = tab1.getColumn(i);
+    const NdbDictionary::Column* col2 = tab2.getColumn(i);
+    require(col1 != 0 && col2 != 0);
+    if (!col1->equal(*col2))
+      return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::check_index(const NdbDictionary::Index& ind1,
+                              const NdbDictionary::Index& ind2)
+{
+  if (ind1.getNoOfColumns() != ind2.getNoOfColumns())
+    return -1;
+  const uint n = ind1.getNoOfColumns();
+  for (uint i = 0; i < n; i++)
+  {
+    const NdbDictionary::Column* col1 = ind1.getColumn(i);
+    const NdbDictionary::Column* col2 = ind2.getColumn(i);
+    require(col1 != 0 && col2 != 0);
+    // getColumnNo() does not work on non-retrieved
+    if (!col1->equal(*col2))
+      return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::get_systables(Sys& sys)
+{
+  Ndb* ndb = sys.m_ndb;
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+  const int NoSuchTable = 723;
+  const int NoSuchIndex = 4243;
+
+  sys.m_headtable = dic->getTableGlobal(g_headtable_name);
+  if (sys.m_headtable == 0)
+  {
+    int code = dic->getNdbError().code;
+    if (code != NoSuchTable) {
+      setError(code, __LINE__);
+      return -1;
+    }
+  }
+  else
+  {
+    NdbDictionary::Table tab;
+    make_headtable(tab);
+    if (check_table(*sys.m_headtable, tab) == -1)
+    {
+      setError(BadSysTables, __LINE__);
+      return -1;
+    }
+    sys.m_obj_cnt++;
+  }
+
+  sys.m_sampletable = dic->getTableGlobal(g_sampletable_name);
+  if (sys.m_sampletable == 0)
+  {
+    int code = dic->getNdbError().code;
+    if (code != NoSuchTable) {
+      setError(code, __LINE__);
+      return -1;
+    }
+  }
+  else
+  {
+    NdbDictionary::Table tab;
+    make_sampletable(tab);
+    if (check_table(*sys.m_sampletable, tab) == -1)
+    {
+      setError(BadSysTables, __LINE__);
+      return -1;
+    }
+    sys.m_obj_cnt++;
+  }
+
+  if (sys.m_sampletable != 0)
+  {
+    sys.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *sys.m_sampletable);
+    if (sys.m_sampleindex1 == 0)
+    {
+      int code = dic->getNdbError().code;
+      if (code != NoSuchIndex) {
+        setError(code, __LINE__);
+        return -1;
+      }
+    }
+    else
+    {
+      NdbDictionary::Index ind;
+      make_sampleindex1(ind);
+      if (check_index(*sys.m_sampleindex1, ind) == -1)
+      {
+        setError(BadSysTables, __LINE__);
+        return -1;
+      }
+      sys.m_obj_cnt++;
+    }
+  }
+
+  return 0;
+}
+
+int
+NdbIndexStatImpl::create_systables(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+
+  if (get_systables(sys) == -1)
+    return -1;
+
+  if (sys.m_obj_cnt == Sys::ObjCnt)
+  {
+    setError(HaveSysTables, __LINE__);
+    return -1;
+  }
+
+  if (sys.m_obj_cnt != 0)
+  {
+    setError(BadSysTables, __LINE__);
+    return -1;
+  }
+
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  {
+    NdbDictionary::Table tab;
+    if (make_headtable(tab) == -1)
+      return -1;
+    if (dic->createTable(tab) == -1)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+
+    sys.m_headtable = dic->getTableGlobal(tab.getName());
+    if (sys.m_headtable == 0)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+
+  {
+    NdbDictionary::Table tab;
+    if (make_sampletable(tab) == -1)
+      return -1;
+    if (dic->createTable(tab) == -1)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+
+    sys.m_sampletable = dic->getTableGlobal(tab.getName());
+    if (sys.m_sampletable == 0)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+
+  {
+    NdbDictionary::Index ind;
+    if (make_sampleindex1(ind) == -1)
+      return -1;
+    if (dic->createIndex(ind, *sys.m_sampletable) == -1)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+
+    sys.m_sampleindex1 = dic->getIndexGlobal(ind.getName(), sys.m_sampletable->getName());
+    if (sys.m_sampleindex1 == 0)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+
+  return 0;
+}
+
+int
+NdbIndexStatImpl::drop_systables(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+
+  if (get_systables(sys) == -1 &&
+      m_error.code != BadSysTables)
+    return -1;
+
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  if (sys.m_headtable != 0)
+  {
+    if (dic->dropTableGlobal(*sys.m_headtable) == -1)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+
+  if (sys.m_sampletable != 0)
+  {
+    if (dic->dropTableGlobal(*sys.m_sampletable) == -1)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+    
+  return 0;
+}
+
+int
+NdbIndexStatImpl::check_systables(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+
+  if (get_systables(sys) == -1)
+    return -1;
+
+  if (sys.m_obj_cnt == 0)
+  {
+    setError(NoSysTables, __LINE__);
+    return -1;
+  }
+
+  if (sys.m_obj_cnt != Sys::ObjCnt)
+  {
+    setError(BadSysTables, __LINE__);
+    return -1;
+  }
+
+  return 0;
+}
+
+// operation context
+
+NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
+  m_impl(impl),
+  m_head(head),
+  m_ndb(ndb)
+{
+  head.m_indexId = m_impl->m_indexId;
+  head.m_indexVersion = m_impl->m_indexVersion;
+  m_dic = m_ndb->getDictionary();
+  m_headtable = 0;
+  m_sampletable = 0;
+  m_sampleindex1 = 0;
+  m_tx = 0;
+  m_op = 0;
+  m_scanop = 0;
+  m_cacheBuild = 0;
+  m_cachePos = 0;
+  m_cacheKeyOffset = 0;
+  m_cacheValueOffset = 0;
+  m_start.seconds = 0;
+  m_start.micro_seconds = 0;
+}
+
+NdbIndexStatImpl::Con::~Con()
+{
+  if (m_cacheBuild != 0)
+  {
+    m_impl->free_cache(m_cacheBuild);
+    m_cacheBuild = 0;
+  }
+  if (m_tx != 0)
+  {
+    m_ndb->closeTransaction(m_tx);
+    m_tx = 0;
+  }
+  m_impl->sys_release(*this);
+}
+
+int
+NdbIndexStatImpl::Con::startTransaction()
+{
+  assert(m_headtable != 0 && m_ndb != 0 && m_tx == 0);
+  Uint32 key[2] = {
+    m_head.m_indexId,
+    m_head.m_indexVersion
+  };
+  m_tx = m_ndb->startTransaction(m_headtable, (const char*)key, sizeof(key));
+  if (m_tx == 0)
+    return -1;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::Con::execute(bool commit)
+{
+  assert(m_tx != 0);
+  if (commit)
+  {
+    if (m_tx->execute(NdbTransaction::Commit) == -1)
+      return -1;
+    m_ndb->closeTransaction(m_tx);
+    m_tx = 0;
+  }
+  else
+  {
+    if (m_tx->execute(NdbTransaction::NoCommit) == -1)
+      return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::Con::getNdbOperation()
+{
+  assert(m_headtable != 0);
+  assert(m_tx != 0 && m_op == 0);
+  m_op = m_tx->getNdbOperation(m_headtable);
+  if (m_op == 0)
+    return -1;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::Con::getNdbIndexScanOperation()
+{
+  assert(m_sampletable != 0 && m_sampleindex1 != 0);
+  assert( m_tx != 0 && m_scanop == 0);
+  m_scanop = m_tx->getNdbIndexScanOperation(m_sampleindex1, m_sampletable);
+  if (m_scanop == 0)
+    return -1;
+  return 0;
+}
+
+void
+NdbIndexStatImpl::Con::set_time()
+{
+  NdbTick_getMicroTimer(&m_start);
+}
+
+NDB_TICKS
+NdbIndexStatImpl::Con::get_time()
+{
+  MicroSecondTimer stop;
+  NdbTick_getMicroTimer(&stop);
+  NDB_TICKS us = NdbTick_getMicrosPassed(m_start, stop);
+  return us;
+}
+
+// index
+
+int
+NdbIndexStatImpl::set_index(const NdbDictionary::Index& index,
+                            const NdbDictionary::Table& table)
+{
+  if (m_indexSet)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  m_indexId = index.getObjectId();
+  m_indexVersion = index.getObjectVersion();
+  m_tableId = table.getObjectId();
+  m_keyAttrs = index.getNoOfColumns();
+  m_valueAttrs = 1 + m_keyAttrs;
+  if (m_keyAttrs == 0)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  if (m_keyAttrs > MaxKeyCount)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+
+  // spec buffers
+  m_keySpecBuf = new NdbPack::Type [m_keyAttrs];
+  m_valueSpecBuf = new NdbPack::Type [m_valueAttrs];
+  if (m_keySpecBuf == 0 || m_valueSpecBuf == 0)
+  {
+    setError(NoMemError, __LINE__);
+    return -1;
+  }
+  m_keySpec.set_buf(m_keySpecBuf, m_keyAttrs);
+  m_valueSpec.set_buf(m_valueSpecBuf, m_valueAttrs);
+
+  // index key spec
+  {
+    for (uint i = 0; i < m_keyAttrs; i++)
+    {
+      const NdbDictionary::Column* icol = index.getColumn(i);
+      if (icol == 0)
+      {
+        setError(UsageError, __LINE__);
+        return -1;
+      }
+      NdbPack::Type type (
+        icol->getType(),
+        icol->getArrayType() + icol->getSize() * icol->getLength(),
+        icol->getNullable(),
+        icol->getCharset() != 0 ? icol->getCharset()->number : 0
+      );
+      if (m_keySpec.add(type) == -1)
+      {
+        setError(UsageError, __LINE__, m_keySpec.get_error_code());
+        return -1;
+      }
+    }
+  }
+  // stat values spec
+  {
+    NdbPack::Type type(NDB_TYPE_UNSIGNED, 4, false, 0);
+    // rir + rpk
+    if (m_valueSpec.add(type, m_valueAttrs) == -1)
+    {
+      setError(InternalError, __LINE__);
+      return -1;
+    }
+  }
+
+  // data buffers (rounded to word)
+  m_keyDataBuf = new Uint8 [m_keyData.get_max_len4()];
+  m_valueDataBuf = new Uint8 [m_valueData.get_max_len4()];
+  if (m_keyDataBuf == 0 || m_valueDataBuf == 0)
+  {
+    setError(NoMemError, __LINE__);
+    return -1;
+  }
+  m_keyData.set_buf(m_keyDataBuf, m_keyData.get_max_len());
+  m_valueData.set_buf(m_valueDataBuf, m_valueData.get_max_len());
+
+  m_indexSet = true;
+  return 0;
+}
+
+void
+NdbIndexStatImpl::reset_index()
+{
+  free_cache();
+  m_keySpec.reset();
+  m_valueSpec.reset();
+  delete [] m_keySpecBuf;
+  delete [] m_valueSpecBuf;
+  delete [] m_keyDataBuf;
+  delete [] m_valueDataBuf;
+  init();
+}
+
+// head
+
+void
+NdbIndexStatImpl::init_head(Head& head)
+{
+  head.m_found = -1;
+  head.m_indexId = 0;
+  head.m_indexVersion = 0;
+  head.m_tableId = 0;
+  head.m_fragCount = 0;
+  head.m_valueFormat = 0;
+  head.m_sampleVersion = 0;
+  head.m_loadTime = 0;
+  head.m_sampleCount = 0;
+  head.m_keyBytes = 0;
+}
+
+// sys tables data
+
+int
+NdbIndexStatImpl::sys_init(Con& con)
+{
+  Ndb* ndb = con.m_ndb;
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+  sys_release(con);
+
+  con.m_headtable = dic->getTableGlobal(g_headtable_name);
+  if (con.m_headtable == 0)
+  {
+    setError(con, __LINE__);
+    mapError(ERR_NoSuchObject, NoSysTables);
+    return -1;
+  }
+  con.m_sampletable = dic->getTableGlobal(g_sampletable_name);
+  if (con.m_sampletable == 0)
+  {
+    setError(con, __LINE__);
+    mapError(ERR_NoSuchObject, NoSysTables);
+    return -1;
+  }
+  con.m_sampleindex1 = dic->getIndexGlobal(g_sampleindex1_name, *con.m_sampletable);
+  if (con.m_sampleindex1 == 0)
+  {
+    setError(con, __LINE__);
+    mapError(ERR_NoSuchObject, NoSysTables);
+    return -1;
+  }
+  return 0;
+}
+
+void
+NdbIndexStatImpl::sys_release(Con& con)
+{
+  if (con.m_headtable != 0)
+  {
+    con.m_dic->removeTableGlobal(*con.m_headtable, false);
+    con.m_headtable = 0;
+  }
+  if (con.m_sampletable != 0)
+  {
+    con.m_dic->removeTableGlobal(*con.m_sampletable, false);
+    con.m_sampletable = 0;
+  }
+  if (con.m_sampleindex1 != 0)
+  {
+    con.m_dic->removeIndexGlobal(*con.m_sampleindex1, false);
+    con.m_sampleindex1 = 0;
+  }
+}
+
+int
+NdbIndexStatImpl::sys_read_head(Con& con, bool commit)
+{
+  Head& head = con.m_head;
+  head.m_sampleVersion = 0;
+  head.m_found = false;
+
+  if (con.getNdbOperation() == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (con.m_op->readTuple(NdbOperation::LM_Read) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (sys_head_setkey(con) == -1)
+    return -1;
+  if (sys_head_getvalue(con) == -1)
+    return -1;
+  if (con.execute(commit) == -1)
+  {
+    setError(con, __LINE__);
+    mapError(ERR_TupleNotFound, NoIndexStats);
+    return -1;
+  }
+  head.m_found = true;
+  if (head.m_sampleVersion == 0)
+  {
+    setError(NoIndexStats, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::sys_head_setkey(Con& con)
+{
+  Head& head = con.m_head;
+  NdbOperation* op = con.m_op;
+  if (op->equal("INDEX_ID", (char*)&head.m_indexId) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->equal("INDEX_VERSION", (char*)&head.m_indexVersion) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::sys_head_getvalue(Con& con)
+{
+  Head& head = con.m_head;
+  NdbOperation* op = con.m_op;
+  if (op->getValue("TABLE_ID", (char*)&head.m_tableId) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("FRAG_COUNT", (char*)&head.m_fragCount) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("VALUE_FORMAT", (char*)&head.m_valueFormat) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("SAMPLE_VERSION", (char*)&head.m_sampleVersion) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("LOAD_TIME", (char*)&head.m_loadTime) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("SAMPLE_COUNT", (char*)&head.m_sampleCount) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("KEY_BYTES", (char*)&head.m_keyBytes) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::sys_sample_setkey(Con& con)
+{
+  Head& head = con.m_head;
+  NdbIndexScanOperation* op = con.m_scanop;
+  if (op->equal("INDEX_ID", (char*)&head.m_indexId) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->equal("INDEX_VERSION", (char*)&head.m_indexVersion) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->equal("SAMPLE_VERSION", (char*)&head.m_sampleVersion) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->equal("STAT_KEY", (char*)m_keyData.get_full_buf()) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::sys_sample_getvalue(Con& con)
+{
+  NdbIndexScanOperation* op = con.m_scanop;
+  if (op->getValue("STAT_KEY", (char*)m_keyData.get_full_buf()) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->getValue("STAT_VALUE", (char*)m_valueData.get_full_buf()) == 0)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::sys_sample_setbound(Con& con, int sv_bound)
+{
+  Head& head = con.m_head;
+  NdbIndexScanOperation* op = con.m_scanop;
+  const NdbIndexScanOperation::BoundType eq_bound =
+    NdbIndexScanOperation::BoundEQ;
+
+  if (op->setBound("INDEX_ID", eq_bound, &head.m_indexId) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (op->setBound("INDEX_VERSION", eq_bound, &head.m_indexVersion) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (sv_bound != -1)
+  {
+    if (op->setBound("SAMPLE_VERSION", sv_bound, &head.m_sampleVersion) == -1)
+    {
+      setError(con, __LINE__);
+      return -1;
+    }
+  }
+  return 0;
+}
+
+// update, delete
+
+int
+NdbIndexStatImpl::update_stat(Ndb* ndb, Head& head)
+{
+  Con con(this, head, ndb);
+  if (con.m_dic->updateIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::delete_stat(Ndb* ndb, Head& head)
+{
+  Con con(this, head, ndb);
+  if (con.m_dic->deleteIndexStat(m_indexId, m_indexVersion, m_tableId) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+// read
+
+int
+NdbIndexStatImpl::read_head(Ndb* ndb, Head& head)
+{
+  Con con(this, head, ndb);
+  if (!m_indexSet)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  if (sys_init(con) == -1)
+    return -1;
+  if (con.startTransaction() == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (sys_read_head(con, true) == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::read_stat(Ndb* ndb, Head& head)
+{
+  Con con(this, head, ndb);
+  con.set_time();
+
+  if (read_start(con) == -1)
+    return -1;
+  if (save_start(con) == -1)
+    return -1;
+  while (1)
+  {
+    int ret = read_next(con);
+    if (ret == -1)
+      return -1;
+    if (ret != 0)
+      break;
+    if (save_next(con) == -1)
+      return -1;
+  }
+  if (read_commit(con) == -1)
+    return -1;
+
+  NDB_TICKS save_time = con.get_time();
+  con.set_time();
+
+  if (save_commit(con) == -1)
+    return -1;
+  NDB_TICKS sort_time = con.get_time();
+
+  const Cache& c = *m_cacheBuild;
+  c.m_save_time = save_time;
+  c.m_sort_time = sort_time;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::read_start(Con& con)
+{
+  Head& head = con.m_head;
+  if (!m_indexSet)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  if (sys_init(con) == -1)
+    return -1;
+  if (con.startTransaction() == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (sys_read_head(con, false) == -1)
+    return -1;
+  if (con.getNdbIndexScanOperation() == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (con.m_scanop->readTuples(NdbOperation::LM_CommittedRead, 0) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  if (sys_sample_setbound(con, NdbIndexScanOperation::BoundEQ) == -1)
+    return -1;
+  if (sys_sample_getvalue(con) == -1)
+    return -1;
+  if (con.execute(false) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::read_next(Con& con)
+{
+  m_keyData.reset();
+  m_valueData.reset();
+  int ret = con.m_scanop->nextResult();
+  if (ret != 0)
+  {
+    if (ret == -1)
+      setError(con, __LINE__);
+    return ret;
+  }
+  // create consistent NdbPack::Data instances
+  if (m_keyData.desc_all(m_keyAttrs) == -1)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  if (m_valueData.desc_all(m_valueAttrs) == -1)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::read_commit(Con& con)
+{
+  if (con.execute(true) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+// save
+
+int
+NdbIndexStatImpl::save_start(Con& con)
+{
+  Mem* mem = m_mem_handler;
+  if (m_cacheBuild != 0)
+  {
+    free_cache(m_cacheBuild);
+    m_cacheBuild = 0;
+  }
+  con.m_cacheBuild = (Cache*)mem->mem_alloc(sizeof(Cache));
+  if (con.m_cacheBuild == 0)
+  {
+    setError(NoMemError, __LINE__);
+    return -1;
+  }
+  new (con.m_cacheBuild) Cache;
+  if (cache_init(con) == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::save_next(Con& con)
+{
+  if (cache_insert(con) == -1)
+    return -1;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::save_commit(Con& con)
+{
+  if (cache_commit(con) == -1)
+    return -1;
+  m_cacheBuild = con.m_cacheBuild;
+  con.m_cacheBuild = 0;
+  return 0;
+}
+
+// cache inline
+
+inline uint
+NdbIndexStatImpl::Cache::get_keyaddr(uint pos) const
+{
+  assert(pos < m_sampleCount);
+  const uint offset = pos * m_addrLen;
+  assert(offset + m_addrLen <= m_addrBytes);
+  const Uint8* src = &m_addrArray[offset];
+  uint addr = 0;
+  switch (m_addrLen) {
+  case 4:
+    addr += src[3] << 24;
+  case 3:
+    addr += src[2] << 16;
+  case 2:
+    addr += src[1] << 8;
+  case 1:
+    addr += src[0] << 0;
+    break;
+  default:
+    assert(false);
+  }
+  return addr;
+}
+
+inline void
+NdbIndexStatImpl::Cache::set_keyaddr(uint pos, uint addr)
+{
+  assert(pos < m_sampleCount);
+  const uint offset = pos * m_addrLen;
+  assert(offset + m_addrLen <= m_addrBytes);
+  Uint8* dst = &m_addrArray[offset];
+  switch (m_addrLen) {
+  case 4:
+    dst[3] = (addr >> 24) & 0xFF;
+  case 3:
+    dst[2] = (addr >> 16) & 0xFF;
+  case 2:
+    dst[1] = (addr >> 8) & 0xFF;
+  case 1:
+    dst[0] = (addr >> 0) & 0xFF;
+    break;
+  default:
+    assert(false);
+  }
+  assert(get_keyaddr(pos) == addr);
+}
+
+inline const Uint8*
+NdbIndexStatImpl::Cache::get_keyptr(uint addr) const
+{
+  assert(addr < m_keyBytes);
+  return &m_keyArray[addr];
+}
+
+inline Uint8*
+NdbIndexStatImpl::Cache::get_keyptr(uint addr)
+{
+  assert(addr < m_keyBytes);
+  return &m_keyArray[addr];
+}
+
+inline const Uint8*
+NdbIndexStatImpl::Cache::get_valueptr(uint pos) const
+{
+  assert(pos < m_sampleCount);
+  return &m_valueArray[pos * m_valueLen];
+}
+
+inline Uint8*
+NdbIndexStatImpl::Cache::get_valueptr(uint pos)
+{
+  assert(pos < m_sampleCount);
+  return &m_valueArray[pos * m_valueLen];
+}
+
+inline void
+NdbIndexStatImpl::Cache::swap_entry(uint pos1, uint pos2)
+{
+  uint hold_addr;
+  Uint8 hold_value[MaxValueBytes];
+
+  hold_addr = get_keyaddr(pos1);
+  memcpy(hold_value, get_valueptr(pos1), m_valueLen);
+  set_keyaddr(pos1, get_keyaddr(pos2));
+  memcpy(get_valueptr(pos1), get_valueptr(pos2), m_valueLen);
+  set_keyaddr(pos2, hold_addr);
+  memcpy(get_valueptr(pos2), hold_value, m_valueLen);
+}
+
+inline double
+NdbIndexStatImpl::Cache::get_rir(uint pos) const
+{
+  const Uint8* ptr = get_valueptr(pos);
+  Uint32 n;
+  memcpy(&n, &ptr[0], 4);
+  double x = (double)m_fragCount * (double)n;
+  return x;
+}
+
+inline double
+NdbIndexStatImpl::Cache::get_rir(uint pos1, uint pos2) const
+{
+  assert(pos2 > pos1);
+  return get_rir(pos2) - get_rir(pos1);
+}
+
+inline double
+NdbIndexStatImpl::Cache::get_unq(uint pos, uint k) const
+{
+  assert(k < m_keyAttrs);
+  const Uint8* ptr = get_valueptr(pos);
+  Uint32 n;
+  memcpy(&n, &ptr[4 + k * 4], 4);
+  double x = (double)m_fragCount * (double)n;
+  return x;
+}
+
+inline double
+NdbIndexStatImpl::Cache::get_unq(uint pos1, uint pos2, uint k) const
+{
+  assert(pos2 > pos1);
+  return get_unq(pos2, k) - get_unq(pos1, k);
+}
+
+inline double
+NdbIndexStatImpl::Cache::get_rpk(uint pos, uint k) const
+{
+  return get_rir(pos) / get_unq(pos, k);
+}
+
+inline double
+NdbIndexStatImpl::Cache::get_rpk(uint pos1, uint pos2, uint k) const
+{
+  assert(pos2 > pos1);
+  return get_rir(pos1, pos2) / get_unq(pos1, pos2, k);
+}
+
+// cache
+
+NdbIndexStatImpl::Cache::Cache()
+{
+  m_valid = false;
+  m_keyAttrs = 0;
+  m_valueAttrs = 0;
+  m_fragCount = 0;
+  m_sampleVersion = 0;
+  m_sampleCount = 0;
+  m_keyBytes = 0;
+  m_valueLen = 0;
+  m_valueBytes = 0;
+  m_addrLen = 0;
+  m_addrBytes = 0;
+  m_addrArray = 0;
+  m_keyArray = 0;
+  m_valueArray = 0;
+  m_nextClean = 0;
+  // performance
+  m_save_time = 0;
+  m_sort_time = 0;
+}
+
+int
+NdbIndexStatImpl::cache_init(Con& con)
+{
+  Cache& c = *con.m_cacheBuild;
+  Head& head = con.m_head;
+  Mem* mem = m_mem_handler;
+
+  if (m_keyAttrs == 0)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  c.m_keyAttrs = m_keyAttrs;
+  c.m_valueAttrs = m_valueAttrs;
+  c.m_fragCount = head.m_fragCount;
+  c.m_sampleCount = head.m_sampleCount;
+  c.m_keyBytes = head.m_keyBytes;
+  c.m_valueLen = 4 + c.m_keyAttrs * 4;
+  c.m_valueBytes = c.m_sampleCount * c.m_valueLen;
+  c.m_addrLen =
+    c.m_keyBytes < (1 << 8) ? 1 :
+    c.m_keyBytes < (1 << 16) ? 2 :
+    c.m_keyBytes < (1 << 24) ? 3 : 4;
+  c.m_addrBytes = c.m_sampleCount * c.m_addrLen;
+
+  // wl4124_todo omit addrArray if keys have fixed size
+  c.m_addrArray = (Uint8*)mem->mem_alloc(c.m_addrBytes);
+  if (c.m_addrArray == 0)
+  {
+    setError(NoMemError, __LINE__);
+    return -1;
+  }
+  c.m_keyArray = (Uint8*)mem->mem_alloc(c.m_keyBytes);
+  if (c.m_keyArray == 0)
+  {
+    setError(NoMemError, __LINE__);
+    return -1;
+  }
+  c.m_valueArray = (Uint8*)mem->mem_alloc(c.m_valueBytes);
+  if (c.m_valueArray == 0)
+  {
+    setError(NoMemError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::cache_insert(Con& con)
+{
+  Cache& c = *con.m_cacheBuild;
+
+  const uint nextPos = con.m_cachePos + 1;
+  if (nextPos > c.m_sampleCount)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  assert(m_keyData.is_full());
+  const uint keyLen = m_keyData.get_data_len();
+  const uint nextKeyOffset = con.m_cacheKeyOffset + keyLen;
+  if (nextKeyOffset > c.m_keyBytes)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  if (m_valueData.get_data_len() != c.m_valueLen)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  const uint nextValueOffset = con.m_cacheValueOffset + c.m_valueLen;
+  if (nextValueOffset > c.m_valueBytes)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+
+  c.set_keyaddr(con.m_cachePos, con.m_cacheKeyOffset);
+  con.m_cachePos = nextPos;
+
+  Uint8* cacheKeyPtr = &c.m_keyArray[con.m_cacheKeyOffset];
+  const Uint8* keyPtr = (const Uint8*)m_keyData.get_data_buf();
+  memcpy(cacheKeyPtr, keyPtr, keyLen);
+  con.m_cacheKeyOffset = nextKeyOffset;
+
+  Uint8* cacheValuePtr = &c.m_valueArray[con.m_cacheValueOffset];
+  const Uint8* valuePtr = (const Uint8*)m_valueData.get_data_buf();
+  memcpy(cacheValuePtr, valuePtr, c.m_valueLen);
+  con.m_cacheValueOffset = nextValueOffset;
+
+  // verify sanity
+  {
+    const Uint8* rir_ptr = &cacheValuePtr[0];
+    Uint32 rir;
+    memcpy(&rir, rir_ptr, 4);
+    assert(rir != 0);
+    Uint32 unq_prev = 0;
+    for (uint k = 0; k < c.m_keyAttrs; k++)
+    {
+      Uint8* unq_ptr = &cacheValuePtr[4 + k * 4];
+      Uint32 unq;
+      memcpy(&unq, unq_ptr, 4);
+      assert(unq != 0);
+      assert(rir >= unq);
+      assert(unq >= unq_prev);
+      unq_prev = unq;
+    }
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::cache_commit(Con& con)
+{
+  Cache& c = *con.m_cacheBuild;
+  Head& head = con.m_head;
+  if (con.m_cachePos != c.m_sampleCount)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  if (con.m_cacheKeyOffset != c.m_keyBytes)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  if (con.m_cacheValueOffset != c.m_valueBytes)
+  {
+    setError(InternalError, __LINE__);
+    return -1;
+  }
+  c.m_sampleVersion = head.m_sampleVersion;
+  if (cache_sort(c) == -1)
+    return -1;
+  if (cache_verify(c) == -1)
+    return -1;
+  c.m_valid = true;
+  return 0;
+}
+
+int
+NdbIndexStatImpl::cache_cmpaddr(const Cache& c, uint addr1, uint addr2) const
+{
+  const Uint8* key1 = c.get_keyptr(addr1);
+  const Uint8* key2 = c.get_keyptr(addr2);
+
+  NdbPack::DataC keyData1(m_keySpec, false);
+  NdbPack::DataC keyData2(m_keySpec, false);
+  keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
+  keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
+
+  Uint32 num_eq;
+  int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
+  assert(addr1 == addr2 || res != 0);
+  return res;
+}
+
+int
+NdbIndexStatImpl::cache_cmppos(const Cache& c, uint pos1, uint pos2) const
+{
+  uint addr1 = c.get_keyaddr(pos1);
+  uint addr2 = c.get_keyaddr(pos2);
+  return cache_cmpaddr(c, addr1, addr2);
+}
+
+/*
+ * Sort addr and value arrays via key values.  The samples were inserted
+ * in key order and were read back via index scan so they may be nearly
+ * ordered at first.  This is quicksort worst case so we do not use it.
+ */
+int
+NdbIndexStatImpl::cache_sort(Cache& c)
+{
+  if (c.m_sampleCount > 1)
+    cache_hsort(c);
+  return 0;
+}
+
+// insertion sort - expensive
+void
+NdbIndexStatImpl::cache_isort(Cache& c)
+{
+  int n = c.m_sampleCount;
+  for (int i = 1; i < n; i++)
+  {
+    for (int j = i - 1; j >= 0; j--)
+    {
+      int res = cache_cmppos(c, j, j + 1);
+      if (res < 0)
+        break;
+      c.swap_entry(j, j + 1);
+    }
+  }
+}
+
+// heapsort
+void
+NdbIndexStatImpl::cache_hsort(Cache& c)
+{
+  int count = c.m_sampleCount;
+  int i;
+
+  // highest entry which can have children
+  i = count / 2;
+
+  // make into heap (binary tree where child < parent)
+  while (i >= 0)
+  {
+    cache_hsort_sift(c, i, count);
+    i--;
+  }
+
+  // verify is too expensive to enable under VM_TRACE
+
+#ifdef ndb_index_stat_hsort_verify
+  cache_hsort_verify(c, count);
+#endif
+
+  // sort
+  i = count - 1;
+  while (i > 0)
+  {
+    // move current max to proper position
+    c.swap_entry(0, i);
+
+    // restore heap property for the rest
+    cache_hsort_sift(c, 0, i);
+#ifdef ndb_index_stat_hsort_verify
+    cache_hsort_verify(c, i);
+#endif
+    i--;
+  }
+}
+
+void
+NdbIndexStatImpl::cache_hsort_sift(Cache& c, int i, int count)
+{
+  int parent = i;
+
+  while (1)
+  {
+    // left child if any
+    int child = parent * 2 + 1;
+    if (! (child < count))
+      break;
+
+    // replace by right child if bigger
+    if (child + 1 < count && cache_cmppos(c, child, child + 1) < 0)
+      child = child + 1;
+
+    // done if both children are less than parent
+    if (cache_cmppos(c, child, parent) < 0)
+      break;
+
+    c.swap_entry(parent, child);
+    parent = child;
+  }
+}
+
+// verify heap property
+void
+NdbIndexStatImpl::cache_hsort_verify(Cache& c, int count)
+{
+  for (int i = 0; i < count; i++)
+  {
+    int parent = i;
+    int child1 = 2 * i + 1;
+    int child2 = 2 * i + 2;
+    if (child1 < count)
+    {
+      assert(cache_cmppos(c, child1, parent) < 0);
+    }
+    if (child2 < count)
+    {
+      assert(cache_cmppos(c, child2, parent) < 0);
+    }
+  }
+}
+
+int
+NdbIndexStatImpl::cache_verify(const Cache& c)
+{
+  for (uint pos1 = 0; pos1 < c.m_sampleCount; pos1++)
+  {
+    const uint addr1 = c.get_keyaddr(pos1);
+    const Uint8* key1 = c.get_keyptr(addr1);
+    NdbPack::DataC keyData1(m_keySpec, false);
+    keyData1.set_buf(key1, c.m_keyBytes - addr1, c.m_keyAttrs);
+    uint pos2 = pos1 + 1;
+    if (pos2 < c.m_sampleCount)
+    {
+      const uint addr2 = c.get_keyaddr(pos2);
+      const Uint8* key2 = c.get_keyptr(addr2);
+      NdbPack::DataC keyData2(m_keySpec, false);
+      keyData2.set_buf(key2, c.m_keyBytes - addr2, c.m_keyAttrs);
+      Uint32 num_eq;
+      int res = keyData1.cmp(keyData2, c.m_keyAttrs, num_eq);
+      if (!(res < 0))
+      {
+        setError(InvalidCache, __LINE__);
+        return -1;
+      }
+    }
+  }
+  return 0;
+}
+
+void
+NdbIndexStatImpl::move_cache()
+{
+  Cache* cacheTmp = m_cacheQuery;
+
+  NdbMutex_Lock(m_query_mutex);
+  m_cacheQuery = m_cacheBuild;
+  NdbMutex_Unlock(m_query_mutex);
+  m_cacheBuild = 0;
+
+  if (cacheTmp != 0)
+  {
+    cacheTmp->m_nextClean = m_cacheClean;
+    m_cacheClean = cacheTmp;
+  }
+}
+
+void
+NdbIndexStatImpl::clean_cache()
+{
+  while (m_cacheClean != 0)
+  {
+    NdbIndexStatImpl::Cache* tmp = m_cacheClean;
+    m_cacheClean = tmp->m_nextClean;
+    free_cache(tmp);
+  }
+}
+
+void
+NdbIndexStatImpl::free_cache(Cache* c)
+{
+  Mem* mem = m_mem_handler;
+  mem->mem_free(c->m_addrArray);
+  mem->mem_free(c->m_keyArray);
+  mem->mem_free(c->m_valueArray);
+  mem->mem_free(c);
+}
+
+void
+NdbIndexStatImpl::free_cache()
+{
+  // twice to move all to clean list
+  move_cache();
+  move_cache();
+  clean_cache();
+}
+
+// cache dump
+
+NdbIndexStatImpl::CacheIter::CacheIter(const NdbIndexStatImpl& impl) :
+  m_keyData(impl.m_keySpec, false),
+  m_valueData(impl.m_valueSpec, false)
+{
+  m_keyCount = impl.m_keyAttrs;
+  m_sampleCount = 0;
+  m_sampleIndex = 0;
+}
+
+int
+NdbIndexStatImpl::dump_cache_start(CacheIter& iter)
+{
+  if (m_cacheQuery == 0)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  const Cache& c = *m_cacheQuery;
+  new (&iter) CacheIter(*this);
+  iter.m_sampleCount = c.m_sampleCount;
+  iter.m_sampleIndex = ~(Uint32)0;
+  return 0;
+}
+
+bool
+NdbIndexStatImpl::dump_cache_next(CacheIter& iter)
+{
+  if (iter.m_sampleIndex == ~(Uint32)0)
+    iter.m_sampleIndex = 0;
+  else
+    iter.m_sampleIndex++;
+  if (iter.m_sampleIndex >= iter.m_sampleCount)
+    return false;
+  const Cache& c = *m_cacheQuery;
+  const uint pos = iter.m_sampleIndex;
+  const uint addr = c.get_keyaddr(pos);
+  const Uint8* key = c.get_keyptr(addr);
+  const Uint8* value = c.get_valueptr(pos);
+  iter.m_keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
+  iter.m_valueData.set_buf(value, c.m_valueLen, c.m_valueAttrs);
+  return true;
+}
+
+// bound
+
+int
+NdbIndexStatImpl::finalize_bound(Bound& bound)
+{
+  assert(bound.m_type == 0 || bound.m_type == 1);
+  int side = 0;
+  if (bound.m_data.get_cnt() == 0)
+  {
+    if (bound.m_strict != -1)
+    {
+      setError(UsageError, __LINE__);
+      return -1;
+    }
+  }
+  else
+  {
+    if (bound.m_strict == -1)
+    {
+      setError(UsageError, __LINE__);
+      return -1;
+    }
+    if (bound.m_type == 0)
+      side = bound.m_strict ? +1 : -1;
+    else
+      side = bound.m_strict ? -1 : +1;
+  }
+  if (bound.m_bound.finalize(side) == -1)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+// range
+
+int
+NdbIndexStatImpl::convert_range(Range& range,
+                                const NdbRecord* key_record,
+                                const NdbIndexScanOperation::IndexBound* ib)
+{
+  if (ib == 0)
+    return 0;
+  if (ib->low_key_count == 0 && ib->high_key_count == 0)
+    return 0;
+  for (uint j = 0; j <= 1; j++)
+  {
+    Bound& bound = j == 0 ? range.m_bound1 : range.m_bound2;
+    bound.m_bound.reset();
+    const char* key = j == 0 ? ib->low_key : ib->high_key;
+    const uint key_count = j == 0 ? ib->low_key_count : ib->high_key_count;
+    const bool inclusive = j == 0 ? ib->low_inclusive : ib->high_inclusive;
+    Uint32 len_out;
+    for (uint i = 0; i < key_count; i++)
+    {
+      const NdbRecord::Attr& attr = key_record->columns[i];
+      if (!attr.is_null(key))
+      {
+        const char* data = key + attr.offset;
+        char buf[256];
+        if (attr.flags & NdbRecord::IsMysqldShrinkVarchar)
+        {
+          Uint32 len;
+          if (!attr.shrink_varchar(key, len, buf))
+          {
+            setError(InternalError, __LINE__);
+            return -1;
+          }
+          data = buf;
+        }
+        if (bound.m_data.add(data, &len_out) == -1)
+        {
+          setError(InternalError, __LINE__);
+          return -1;
+        }
+      }
+      else
+      {
+        if (bound.m_data.add_null(&len_out) == -1)
+        {
+          setError(InternalError, __LINE__);
+          return -1;
+        }
+      }
+    }
+    if (key_count > 0)
+      bound.m_strict = !inclusive;
+    if (finalize_bound(bound) == -1)
+    {
+      setError(InternalError, __LINE__);
+      return -1;
+    }
+  }
+  return 0;
+}
+
+// query
+
+// normalize values to >= 1.0
+void
+NdbIndexStatImpl::query_normalize(const Cache& c, StatValue& value)
+{
+  if (!value.m_empty)
+  {
+    if (value.m_rir < 1.0)
+      value.m_rir = 1.0;
+    for (uint k = 0; k < c.m_keyAttrs; k++)
+    {
+      if (value.m_unq[k] < 1.0)
+        value.m_unq[k] = 1.0;
+    }
+  }
+  else
+  {
+    value.m_rir = 1.0;
+    for (uint k = 0; k < c.m_keyAttrs; k++)
+      value.m_unq[k] = 1.0;
+  }
+}
+
+int
+NdbIndexStatImpl::query_stat(const Range& range, Stat& stat)
+{
+  NdbMutex_Lock(m_query_mutex);
+  const Cache* cacheTmp = m_cacheQuery;
+  NdbMutex_Unlock(m_query_mutex);
+
+  if (unlikely(cacheTmp == 0))
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  const Cache& c = *cacheTmp;
+  if (unlikely(!c.m_valid))
+  {
+    setError(InvalidCache, __LINE__);
+    return -1;
+  }
+
+  query_interpolate(c, range, stat);
+  query_normalize(c, stat.m_value);
+  return 0;
+}
+
+void
+NdbIndexStatImpl::query_interpolate(const Cache& c,
+                                    const Range& range,
+                                    Stat& stat)
+{
+  const uint keyAttrs = c.m_keyAttrs;
+  StatValue& value = stat.m_value;
+  value.m_empty = false;
+  stat.m_rule[0] = "-";
+  stat.m_rule[1] = "-";
+  stat.m_rule[2] = "-";
+
+  if (c.m_sampleCount == 0)
+  {
+    stat.m_rule[0] = "r1.1";
+    value.m_empty = true;
+    return;
+  }
+  const uint posMIN = 0;
+  const uint posMAX = c.m_sampleCount - 1;
+
+  const Bound& bound1 = range.m_bound1;
+  const Bound& bound2 = range.m_bound2;
+  if (bound1.m_data.is_empty() && bound2.m_data.is_empty())
+  {
+    stat.m_rule[0] = "r1.2";
+    value.m_rir = c.get_rir(posMAX);
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = c.get_unq(posMAX, k);
+    return;
+  }
+
+  StatBound& stat1 = stat.m_stat1;
+  StatBound& stat2 = stat.m_stat2;
+  if (!bound1.m_data.is_empty())
+  {
+    query_interpolate(c, bound1, stat1);
+    query_normalize(c, stat1.m_value);
+    stat.m_rule[1] = stat1.m_rule;
+  }
+  if (!bound2.m_data.is_empty())
+  {
+    query_interpolate(c, bound2, stat2);
+    query_normalize(c, stat2.m_value);
+    stat.m_rule[2] = stat2.m_rule;
+  }
+
+  const StatValue& value1 = stat1.m_value;
+  const StatValue& value2 = stat2.m_value;
+  const uint posL1 = stat1.m_pos - 1; // invalid if posH1 == posMIN
+  const uint posH1 = stat1.m_pos;
+  const uint posL2 = stat2.m_pos - 1; // invalid if posH2 == posMIN
+  const uint posH2 = stat2.m_pos;
+  const uint cnt1 = bound1.m_data.get_cnt();
+  const uint cnt2 = bound2.m_data.get_cnt();
+  const int side1 = bound1.m_bound.get_side();
+  const int side2 = bound2.m_bound.get_side();
+  const uint mincnt = min(cnt1, cnt2);
+  Uint32 numEq = 0; // of bound1,bound2
+
+  if (bound1.m_data.is_empty())
+  {
+    stat.m_rule[0] = "r1.3";
+    value.m_rir = value2.m_rir;
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = value2.m_unq[k];
+    return;
+  }
+  if (bound2.m_data.is_empty())
+  {
+    stat.m_rule[0] = "r1.4";
+    value.m_rir = c.get_rir(posMAX) - value1.m_rir;
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = c.get_unq(posMAX, k) - value1.m_unq[k];
+    return;
+  }
+  if (posH1 > posH2)
+  {
+    stat.m_rule[0] = "r1.5";
+    value.m_empty = true;
+    return;
+  }
+  // also returns number of equal initial components
+  if (bound1.m_bound.cmp(bound2.m_bound, mincnt, numEq) >= 0)
+  {
+    stat.m_rule[0] = "r1.6";
+    value.m_empty = true;
+    return;
+  }
+  if (posH1 == posMIN)
+  {
+    stat.m_rule[0] = "r1.7";
+    value.m_rir = value2.m_rir - value1.m_rir;
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
+    return;
+  }
+  if (posH2 == posMAX + 1)
+  {
+    stat.m_rule[0] = "r1.8";
+    value.m_rir = value2.m_rir - value1.m_rir;
+    for (uint k = 0; k <= keyAttrs; k++)
+      value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
+    return;
+  }
+  if (posL1 == posL2)
+  {
+    assert(posH1 == posH2);
+    if (cnt1 == keyAttrs &&
+        cnt2 == keyAttrs &&
+        numEq == keyAttrs) {
+      stat.m_rule[0] = "r2.1";
+      assert(side1 == -1 && side2 == +1);
+      assert(stat1.m_numEqL < keyAttrs && stat2.m_numEqH < keyAttrs);
+      value.m_rir = c.get_rpk(posL1, posH1, keyAttrs - 1);
+      for (uint k = 0; k < keyAttrs; k++)
+        value.m_unq[k] = value.m_rir / c.get_rpk(posL1, posH1, k);
+      return;
+    }
+    if (numEq != 0)
+    {
+      stat.m_rule[0] = "r2.2";
+      // skip for now
+    }
+    if (true)
+    {
+      stat.m_rule[0] = "r2.3";
+      const double w = 0.5;
+      value.m_rir = w * c.get_rir(posL1, posH1);
+      for (uint k = 0; k < keyAttrs; k++)
+        value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
+      return;
+    }
+  }
+  if (posH1 == posL2)
+  {
+    if (cnt1 == keyAttrs &&
+        cnt2 == keyAttrs &&
+        numEq == keyAttrs) {
+      stat.m_rule[0] = "r3.1";
+      assert(side1 == -1 && side2 == +1);
+      assert(stat1.m_numEqH == keyAttrs && stat2.m_numEqL == keyAttrs);
+      value.m_rir = value2.m_rir - value1.m_rir;
+      for (uint k = 0; k < keyAttrs; k++)
+        value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
+      return;
+    }
+    if (numEq != 0)
+    {
+      stat.m_rule[0] = "r3.2";
+      // skip for now
+    }
+    if (true)
+    {
+      stat.m_rule[0] = "r3.3";
+      const double w = 0.5;
+      value.m_rir = w * c.get_rir(posL1, posH1);
+      for (uint k = 0; k < keyAttrs; k++)
+        value.m_unq[k] = w * c.get_unq(posL1, posH1, k);
+      return;
+    }
+  }
+  if (true)
+  {
+    stat.m_rule[0] = "r4";
+    value.m_rir = value2.m_rir - value1.m_rir;
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = value2.m_unq[k] - value1.m_unq[k];
+    return;
+  }
+}
+
+void
+NdbIndexStatImpl::query_interpolate(const Cache& c,
+                                    const Bound& bound,
+                                    StatBound& stat)
+{
+  const uint keyAttrs = c.m_keyAttrs;
+  StatValue& value = stat.m_value;
+  value.m_empty = false;
+  stat.m_rule = "-";
+
+  query_search(c, bound, stat);
+
+  const uint posMIN = 0;
+  const uint posMAX = c.m_sampleCount - 1;
+  const uint posL = stat.m_pos - 1; // invalid if posH == posMIN
+  const uint posH = stat.m_pos;
+  const uint cnt = bound.m_data.get_cnt();
+  const int side = bound.m_bound.get_side();
+
+  if (posH == posMIN)
+  {
+    if (cnt == keyAttrs &&
+        cnt == stat.m_numEqH) {
+      stat.m_rule = "b1.1";
+      assert(side == -1);
+      value.m_rir = c.get_rir(posMIN) - c.get_rpk(posMIN, keyAttrs - 1);
+      for (uint k = 0; k < keyAttrs; k++)
+        value.m_unq[k] = c.get_unq(posMIN, k) - 1;
+      return;
+    }
+    if (true)
+    {
+      stat.m_rule = "b1.2";
+      value.m_empty = true;
+      return;
+    }
+  }
+  if (posH == posMAX + 1)
+  {
+    stat.m_rule = "b2";
+    value.m_rir = c.get_rir(posMAX);
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = c.get_unq(posMAX, k);
+    return;
+  }
+  if (cnt == keyAttrs &&
+      cnt == stat.m_numEqL) {
+    stat.m_rule = "b3.1";
+    assert(side == +1);
+    value.m_rir = c.get_rir(posL);
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = c.get_unq(posL, k);
+    return;
+  }
+  if (cnt == keyAttrs &&
+      cnt == stat.m_numEqH &&
+      side == +1) {
+    stat.m_rule = "b3.2";
+    value.m_rir = c.get_rir(posH);
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = c.get_unq(posH, k);
+    return;
+  }
+  if (cnt == keyAttrs &&
+      cnt == stat.m_numEqH &&
+      side == -1) {
+    stat.m_rule = "b3.3";
+    const double u = 1.0 + c.get_unq(posL, posH, keyAttrs - 1);
+    const double wL = 1.0 / u;
+    const double wH = 1.0 - wL;
+    value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
+    return;
+  }
+  if (true)
+  {
+    stat.m_rule = "b4";
+    const double wL = 0.5;
+    const double wH = 0.5;
+    value.m_rir = wL * c.get_rir(posL) + wH * c.get_rir(posH);
+    for (uint k = 0; k < keyAttrs; k++)
+      value.m_unq[k] = wL * c.get_unq(posL, k) + wH * c.get_unq(posH, k);
+    return;
+  }
+}
+
+void
+NdbIndexStatImpl::query_search(const Cache& c,
+                               const Bound& bound,
+                               StatBound& stat)
+{
+  assert(c.m_sampleCount > 0);
+  assert(!bound.m_data.is_empty());
+  Uint32 numEq;
+
+  int lo = -1;
+  int hi = c.m_sampleCount;
+  while (hi - lo > 1)
+  {
+    int j = (hi + lo) / 2;
+    assert(lo < j && j < hi);
+    int res = query_keycmp(c, bound, j, numEq);
+    if (res < 0)
+      lo = j;
+    else if (res > 0)
+      hi = j;
+    else
+    {
+      assert(false);
+      return;
+    }
+  }
+  assert(hi - lo == 1);
+  stat.m_pos = hi;
+
+  if (stat.m_pos > 0)
+  {
+    (void)query_keycmp(c, bound, stat.m_pos - 1, stat.m_numEqL);
+  }
+  if (stat.m_pos < c.m_sampleCount)
+  {
+    (void)query_keycmp(c, bound, stat.m_pos, stat.m_numEqH);
+  }
+}
+
+// return <0/>0 for key before/after bound
+int
+NdbIndexStatImpl::query_keycmp(const Cache& c,
+                               const Bound& bound,
+                               uint pos, Uint32& numEq)
+{
+  const uint addr = c.get_keyaddr(pos);
+  const Uint8* key = c.get_keyptr(addr);
+  NdbPack::DataC keyData(m_keySpec, false);
+  keyData.set_buf(key, c.m_keyBytes - addr, c.m_keyAttrs);
+  // reverse result for key vs bound
+  Uint32 cnt = bound.m_bound.get_data().get_cnt();
+  int res = (-1) * bound.m_bound.cmp(keyData, cnt, numEq);
+  return res;
+}
+
+// mem alloc - default impl
+
+NdbIndexStatImpl::MemDefault
+NdbIndexStatImpl::g_mem_default_handler;
+
+NdbIndexStatImpl::MemDefault::MemDefault()
+{
+  m_used = 0;
+}
+
+NdbIndexStatImpl::MemDefault::~MemDefault()
+{
+  assert(m_used == 0);
+}
+
+void*
+NdbIndexStatImpl::MemDefault::mem_alloc(size_t size)
+{
+  if (size == 0 || size % 4 != 0)
+  {
+    size += 4 - size % 4;
+  }
+  Item* item = (Item*)my_malloc(sizeof(Item) + size, MYF(0));
+  if (item != 0)
+  {
+    item->m_magic = MemMagic;
+    item->m_size = size;
+    void* ptr = &item[1];
+    m_used += size;
+    return ptr;
+  }
+  return 0;
+}
+
+void
+NdbIndexStatImpl::MemDefault::mem_free(void* ptr)
+{
+  if (ptr != 0)
+  {
+    Item* item = (Item*)ptr - 1;
+    assert(item->m_magic == MemMagic);
+    Uint32 size = item->m_size;
+    item->m_magic = 0;
+    my_free(item, MYF(0));
+    assert(m_used >= size);
+    m_used -= size;
+  }
+}
+
+size_t
+NdbIndexStatImpl::MemDefault::mem_used() const
+{
+  return m_used;
+}
+
+// error
+
+void
+NdbIndexStatImpl::setError(int code, int line, int extra)
+{
+  if (code == 0)
+    code = InternalError;
+  m_error.code = code;
+  m_error.line = line;
+  m_error.extra = extra;
+#ifdef VM_TRACE
+  const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_ON_ERROR", (char*)0, 0);
+  if (p != 0 && strchr("1Y", p[0]) != 0)
+    abort();
+#endif
+}
+
+void
+NdbIndexStatImpl::setError(const Con& con, int line)
+{
+  int code = 0;
+  if (code == 0 && con.m_op != 0)
+  {
+    code = con.m_op->getNdbError().code;
+  }
+  if (code == 0 && con.m_scanop != 0)
+  {
+    code = con.m_scanop->getNdbError().code;
+  }
+  if (code == 0 && con.m_tx != 0)
+  {
+    code = con.m_tx->getNdbError().code;
+  }
+  if (code == 0 && con.m_dic != 0)
+  {
+    code = con.m_dic->getNdbError().code;
+  }
+  if (code == 0 && con.m_ndb != 0)
+  {
+    code = con.m_ndb->getNdbError().code;
+  }
+  setError(code, line);
+}
+
+void
+NdbIndexStatImpl::mapError(const int* map, int code)
+{
+  while (*map != 0)
+  {
+    if (m_error.code == *map) {
+      m_error.code = code;
+      break;
+    }
+    map++;
+  }
+}

=== added file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-06-06 12:18:27 +0000
@@ -0,0 +1,350 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef NDB_INDEX_STAT_IMPL_HPP
+#define NDB_INDEX_STAT_IMPL_HPP
+
+#include <ndb_global.h>
+#include <ndb_limits.h>
+#include <NdbDictionary.hpp>
+#include <NdbIndexStat.hpp>
+#include <util/NdbPack.hpp>
+#include <NdbError.hpp>
+#include <NdbMutex.h>
+#include <NdbTick.h>
+class Ndb;
+class NdbTransaction;
+class NdbIndexScanOperation;
+class NdbRecAttr;
+class NdbOperation;
+
+struct NdbIndexStatImpl : public NdbIndexStat {
+  friend class NdbIndexStat;
+  struct Con;
+  struct Cache;
+
+  enum { MaxKeyCount = MAX_INDEX_STAT_KEY_COUNT };
+  enum { MaxKeyBytes = MAX_INDEX_STAT_KEY_SIZE * 4 };
+  enum { MaxValueBytes = MAX_INDEX_STAT_VALUE_SIZE * 4 };
+  enum { MaxValueCBytes = MAX_INDEX_STAT_VALUE_CSIZE * 4 };
+
+  NdbIndexStatImpl(NdbIndexStat& facade);
+  ~NdbIndexStatImpl();
+  void init();
+
+  NdbIndexStat* const m_facade;
+  Head m_facadeHead; // owned by facade
+  bool m_indexSet;
+  Uint32 m_indexId;
+  Uint32 m_indexVersion;
+  Uint32 m_tableId;
+  uint m_keyAttrs;
+  uint m_valueAttrs;
+  NdbPack::Spec m_keySpec;
+  NdbPack::Spec m_valueSpec;
+  NdbPack::Type* m_keySpecBuf;
+  NdbPack::Type* m_valueSpecBuf;
+  NdbPack::Data m_keyData;
+  NdbPack::Data m_valueData;
+  Uint8* m_keyDataBuf;
+  Uint8* m_valueDataBuf;
+  Cache* m_cacheBuild;
+  Cache* m_cacheQuery;
+  Cache* m_cacheClean;
+  // mutex for query cache switch, memory barrier would do
+  NdbMutex* m_query_mutex;
+  Mem* m_mem_handler;
+  NdbIndexStat::Error m_error;
+
+  // sys tables meta
+  struct Sys {
+    NdbIndexStatImpl* const m_impl;
+    Ndb* const m_ndb;
+    NdbDictionary::Dictionary* m_dic;
+    const NdbDictionary::Table* m_headtable;
+    const NdbDictionary::Table* m_sampletable;
+    const NdbDictionary::Index* m_sampleindex1;
+    int m_obj_cnt;
+    enum { ObjCnt = 3 };
+    Sys(NdbIndexStatImpl* impl, Ndb* ndb);
+    ~Sys();
+  };
+  void sys_release(Sys& sys);
+  int make_headtable(NdbDictionary::Table& tab);
+  int make_sampletable(NdbDictionary::Table& tab);
+  int make_sampleindex1(NdbDictionary::Index& ind);
+  int check_table(const NdbDictionary::Table& tab1,
+                  const NdbDictionary::Table& tab2);
+  int check_index(const NdbDictionary::Index& ind1,
+                  const NdbDictionary::Index& ind2);
+  int get_systables(Sys& sys);
+  int create_systables(Ndb* ndb);
+  int drop_systables(Ndb* ndb);
+  int check_systables(Ndb* ndb);
+
+  // operation context
+  struct Con {
+    NdbIndexStatImpl* const m_impl;
+    Head& m_head;
+    Ndb* const m_ndb;
+    NdbDictionary::Dictionary* m_dic;
+    const NdbDictionary::Table* m_headtable;
+    const NdbDictionary::Table* m_sampletable;
+    const NdbDictionary::Index* m_sampleindex1;
+    NdbTransaction* m_tx;
+    NdbOperation* m_op;
+    NdbIndexScanOperation* m_scanop;
+    Cache* m_cacheBuild;
+    uint m_cachePos;
+    uint m_cacheKeyOffset;   // in bytes
+    uint m_cacheValueOffset; // in bytes
+    MicroSecondTimer m_start;
+    Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb);
+    ~Con();
+    int startTransaction();
+    int execute(bool commit);
+    int getNdbOperation();
+    int getNdbIndexScanOperation();
+    void set_time();
+    NDB_TICKS get_time();
+  };
+
+  // index
+  int set_index(const NdbDictionary::Index& index,
+                const NdbDictionary::Table& table);
+  void reset_index();
+
+  // init m_facadeHead here (keep API struct a POD)
+  void init_head(Head& head);
+
+  // sys tables data
+  int sys_init(Con& con);
+  void sys_release(Con& con);
+  int sys_read_head(Con& con, bool commit);
+  int sys_head_setkey(Con& con);
+  int sys_head_getvalue(Con& con);
+  int sys_sample_setkey(Con& con);
+  int sys_sample_getvalue(Con& con);
+  int sys_sample_setbound(Con& con, int sv_bound);
+
+  // update, delete (head may record elapsed time)
+  int update_stat(Ndb* ndb, Head& head);
+  int delete_stat(Ndb* ndb, Head& head);
+
+  // read
+  int read_head(Ndb* ndb, Head& head);
+  int read_stat(Ndb* ndb, Head& head);
+  int read_start(Con& con);
+  int read_next(Con& con);
+  int read_commit(Con& con);
+  int save_start(Con& con);
+  int save_next(Con& con);
+  int save_commit(Con& con);
+  int cache_init(Con& con);
+  int cache_insert(Con& con);
+  int cache_commit(Con& con);
+
+  // cache
+  struct Cache {
+    bool m_valid;
+    uint m_keyAttrs;      // number of attrs in index key
+    uint m_valueAttrs;    // number of values
+    uint m_fragCount;     // index fragments
+    uint m_sampleVersion;
+    uint m_sampleCount;   // sample count from head record
+    uint m_keyBytes;      // total key bytes from head record
+    uint m_valueLen;      // value bytes per entry i.e. valueAttrs * ValueSize
+    uint m_valueBytes;    // total value bytes i.e. sampleCount * valuelen
+    uint m_addrLen;       // 1-4 based on keyBytes
+    uint m_addrBytes;     // total address bytes
+    Uint8* m_addrArray;
+    Uint8* m_keyArray;
+    Uint8* m_valueArray;
+    Cache* m_nextClean;
+    // performance
+    mutable int m_save_time;
+    mutable int m_sort_time;
+    Cache();
+    // pos is index < sampleCount, addr is offset in keyArray
+    uint get_keyaddr(uint pos) const;
+    void set_keyaddr(uint pos, uint addr);
+    // get pointers to key and value arrays at pos
+    const Uint8* get_keyptr(uint addr) const;
+    Uint8* get_keyptr(uint addr);
+    const Uint8* get_valueptr(uint pos) const;
+    Uint8* get_valueptr(uint pos);
+    // for sort
+    void swap_entry(uint pos1, uint pos2);
+    // get stats values primitives
+    double get_rir(uint pos) const;
+    double get_rir(uint pos1, uint pos2) const;
+    double get_unq(uint pos, uint k) const;
+    double get_unq(uint pos1, uint pos2, uint k) const;
+    double get_rpk(uint pos, uint k) const;
+    double get_rpk(uint pos1, uint pos2, uint k) const;
+  };
+  int cache_cmpaddr(const Cache& c, uint addr1, uint addr2) const;
+  int cache_cmppos(const Cache& c, uint pos1, uint pos2) const;
+  int cache_sort(Cache& c);
+  void cache_isort(Cache& c);
+  void cache_hsort(Cache& c);
+  void cache_hsort_sift(Cache& c, int i, int count);
+  void cache_hsort_verify(Cache& c, int count);
+  int cache_verify(const Cache& c);
+  void move_cache();
+  void clean_cache();
+  void free_cache(Cache* c);
+  void free_cache();
+
+  // query cache dump (not available via facade)
+  struct CacheIter {
+    Uint32 m_keyCount;
+    Uint32 m_sampleCount;
+    Uint32 m_sampleIndex;
+    NdbPack::DataC m_keyData;
+    NdbPack::DataC m_valueData;
+    CacheIter(const NdbIndexStatImpl& impl);
+  };
+  int dump_cache_start(CacheIter& iter);
+  bool dump_cache_next(CacheIter& iter);
+
+  // bound
+  struct Bound {
+    NdbPack::Data m_data;
+    NdbPack::Bound m_bound;
+    int m_type;     // 0-lower 1-upper
+    int m_strict;
+    Bound(const NdbPack::Spec& spec);
+  };
+  int finalize_bound(Bound&);
+
+  // range
+  struct Range {
+    Range(Bound& bound1, Bound& bound2);
+    Bound& m_bound1;
+    Bound& m_bound2;
+  };
+  int finalize_range(Range& range);
+  int convert_range(Range& range,
+                    const NdbRecord* key_record,
+                    const NdbIndexScanOperation::IndexBound* ib);
+
+  // computed stats values
+  struct StatValue {
+    bool m_empty;
+    double m_rir;
+    double m_unq[MaxKeyCount];
+    StatValue();
+  };
+
+  // query
+  struct StatBound {
+    uint m_pos;       // non-empty bound is between pos-1,pos
+    uint m_numEqL;    // components matching key at pos-1
+    uint m_numEqH;    // components matching key at pos
+    StatValue m_value;
+    const char* m_rule;
+    StatBound();
+  };
+  struct Stat {
+    StatBound m_stat1;
+    StatBound m_stat2;
+    StatValue m_value;
+    const char* m_rule[3];
+    Stat();
+  };
+  void query_normalize(const Cache&, StatValue&);
+  void query_unq2rpk(const Cache&, StatValue&);
+  int query_stat(const Range&, Stat&);
+  void query_interpolate(const Cache&, const Range&, Stat&);
+  void query_interpolate(const Cache&, const Bound&, StatBound&);
+  void query_search(const Cache&, const Bound&, StatBound&);
+  int query_keycmp(const Cache&, const Bound&, uint pos, Uint32& numEq);
+
+  // default memory allocator
+  struct MemDefault : public Mem {
+    virtual void* mem_alloc(size_t bytes);
+    virtual void mem_free(void* p);
+    virtual size_t mem_used() const;
+    MemDefault();
+    virtual ~MemDefault();
+  private:
+    enum { MemMagic = 0xf1f2f3f4 };
+    struct Item {
+      Uint32 m_magic;
+      Uint32 m_size;
+    };
+    size_t m_used;
+  };
+  static MemDefault g_mem_default_handler;
+
+  // error
+  const NdbIndexStat::Error& getNdbError() const;
+  void setError(int code, int line, int extra = 0);
+  void setError(const Con& con, int line);
+  void mapError(const int* map, int code);
+};
+
+inline
+NdbIndexStatImpl::Bound::Bound(const NdbPack::Spec& spec) :
+  m_data(spec, true, 0),
+  m_bound(m_data)
+{
+  m_type = -1;
+  m_strict = -1;
+}
+
+inline
+NdbIndexStatImpl::Range::Range(Bound& bound1, Bound& bound2) :
+  m_bound1(bound1),
+  m_bound2(bound2)
+{
+  bound1.m_type = 0;
+  bound2.m_type = 1;
+}
+
+inline int
+NdbIndexStatImpl::finalize_range(Range& range)
+{
+  if (finalize_bound(range.m_bound1) == -1)
+    return -1;
+  if (finalize_bound(range.m_bound2) == -1)
+    return -1;
+  return 0;
+}
+
+inline
+NdbIndexStatImpl::StatValue::StatValue()
+{
+  m_empty = false;
+}
+
+inline
+NdbIndexStatImpl::StatBound::StatBound()
+{
+  m_pos = 0;
+  m_numEqL = 0;
+  m_numEqH = 0;
+}
+
+inline
+NdbIndexStatImpl::Stat::Stat()
+{
+  m_rule[0] = 0;
+  m_rule[1] = 0;
+  m_rule[2] = 0;
+}
+ 
+#endif

=== modified file 'storage/ndb/src/ndbapi/Ndberr.cpp'
--- a/storage/ndb/src/ndbapi/Ndberr.cpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/src/ndbapi/Ndberr.cpp	2011-06-06 12:18:27 +0000
@@ -95,3 +95,10 @@ NdbQueryImpl::getNdbError() const {
   update(m_error);
   return m_error;
 }
+
+const
+NdbIndexStat::Error &
+NdbIndexStatImpl::getNdbError() const {
+  update(m_error);
+  return m_error;
+}

=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp	2011-04-27 10:48:16 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp	2011-06-06 12:18:27 +0000
@@ -722,6 +722,8 @@ Ndb::handleReceivedSignal(const NdbApiSi
   case GSN_CREATE_INDX_REF:
   case GSN_DROP_INDX_CONF:
   case GSN_DROP_INDX_REF:
+  case GSN_INDEX_STAT_CONF:
+  case GSN_INDEX_STAT_REF:
   case GSN_CREATE_EVNT_CONF:
   case GSN_CREATE_EVNT_REF:
   case GSN_DROP_EVNT_CONF:

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	2011-05-31 08:28:58 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c	2011-06-06 12:18:27 +0000
@@ -540,12 +540,13 @@ ErrorBundle ErrorCodes[] = {
   /*
    * Index stats error codes
    */
-  { 4714, DMEC, AE, "Index stats sys tables " NDB_INDEX_STAT_PREFIX " are invalid" },
+  { 4714, DMEC, AE, "Index stats sys tables " NDB_INDEX_STAT_PREFIX " do not exist" },
   { 4715, DMEC, AE, "Index stats for specified index do not exist" },
   { 4716, DMEC, AE, "Index stats methods usage error" },
   { 4717, DMEC, AE, "Index stats cannot allocate memory" },
   { 4718, DMEC, IE, "Index stats memory cache is corrupted" },
   { 4719, DMEC, IE, "Index stats internal error" },
+  { 4720, DMEC, AE, "Index stats sys tables " NDB_INDEX_STAT_PREFIX " partly missing or invalid" },
   
   /**
    * Still uncategorized

=== modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp'
--- a/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-06-06 12:18:27 +0000
@@ -21,16 +21,9 @@
 #include <NdbIndexStat.hpp>
 #include <NdbTest.hpp>
 #include <ndb_version.h>
+#include <NDBT_Stats.hpp>
 #include <math.h>
 
-/*
- * Sample results:
- *
- * 0. err pct: count: 1000 min: -99.99 max: 99.92 avg: 6.88 stddev: 27.61
- *
- * 0. baseline with same options as handler
- */
-
 #undef min
 #undef max
 #define min(a, b) ((a) <= (b) ? (a) : (b))
@@ -48,93 +41,98 @@ NdbOut::operator<<(double x)
 struct Opts {
   int loglevel;
   uint seed;
-  uint loop;
+  uint attrs;
+  uint loops;
   uint rows;
   uint ops;
   uint nullkeys;
-  uint dupkeys;
+  uint rpk;
+  uint rpkvar;
   uint scanpct;
   uint eqscans;
-  uint dupscans;
   my_bool keeptable;
-  my_bool loaddata;
-  my_bool nochecks;
   my_bool abort;
-  // internal
-  uint tryhard;
+  const char* dump;
   Opts() :
     loglevel(0),
-    seed(-1),
-    loop(1),
-    rows(100000),
-    ops(1000),
+    seed(0),
+    attrs(3),
+    loops(1),
+    rows(10000),
+    ops(100),
     nullkeys(10),
-    dupkeys(1000),
-    scanpct(5),
-    eqscans(50),
-    dupscans(10),
+    rpk(10),
+    rpkvar(10),
+    scanpct(10),
+    eqscans(30),
     keeptable(false),
-    loaddata(true),
-    nochecks(false),
     abort(false),
-    // internal
-    tryhard(20)
+    dump(0)
   {}
 };
 
 static Opts g_opts;
-const char* g_progname = "testIndexStat";
 static uint g_loop = 0;
 
-static const char* g_tabname = "ts0";
-static const char* g_indname = "ts0x1";
-static const char g_numattrs = 3;
+static const char* g_tabname = "ts1";
+static const char* g_indname = "ts1x1";
+static const uint g_numattrs = 3;
 static const uint g_charlen = 10;
 static const char* g_csname = "latin1_swedish_ci";
 static CHARSET_INFO* g_cs;
 
-// value and bound ranges
-static uint g_val_b_max = 10;
-static uint g_bnd_b_max = 20;
-static const char* g_val_c_char = "bcd";
-static const char* g_bnd_c_char = "abcde";
-static uint g_val_d_max = 100;
-static uint g_bnd_d_max = 200;
+// keys nullability
+static const bool g_b_nullable = true;
+static const bool g_c_nullable = true;
+static const bool g_d_nullable = true;
+
+// value limits
+struct Lim {
+  bool all_nullable;
+  uint b_min;
+  uint b_max;
+  const char* c_char;
+  uint d_min;
+  uint d_max;
+};
+
+static Lim g_lim_val;
+static Lim g_lim_bnd;
 
 static Ndb_cluster_connection* g_ncc = 0;
 static Ndb* g_ndb = 0;
+static Ndb* g_ndb_sys = 0;
 static NdbDictionary::Dictionary* g_dic = 0;
 static const NdbDictionary::Table* g_tab = 0;
 static const NdbDictionary::Index* g_ind = 0;
 static const NdbRecord* g_tab_rec = 0;
 static const NdbRecord* g_ind_rec = 0;
 
-
 struct my_record
 {
   Uint32 m_null_bm;
   Uint32 m_a;
-  Uint16 m_b;
+  Uint32 m_b;
   char m_c[1+g_charlen];
-  Uint32 m_d;
+  Uint16 m_d;
 };
 
-
 static const Uint32 g_ndbrec_a_offset=offsetof(my_record, m_a);
 static const Uint32 g_ndbrec_b_offset=offsetof(my_record, m_b);
+static const Uint32 g_ndbrec_b_nb_offset=1;
 static const Uint32 g_ndbrec_c_offset=offsetof(my_record, m_c);
 static const Uint32 g_ndbrec_c_nb_offset=2;
 static const Uint32 g_ndbrec_d_offset=offsetof(my_record, m_d);
 static const Uint32 g_ndbrec_d_nb_offset=3;
 static const Uint32 g_ndbrecord_bytes=sizeof(my_record);
 
-static NdbIndexStat* g_stat = 0;
-
 static NdbTransaction* g_con = 0;
 static NdbOperation* g_op = 0;
 static NdbScanOperation* g_scan_op = 0;
 static NdbIndexScanOperation* g_rangescan_op = 0;
 
+static NdbIndexStat* g_is = 0;
+
 static uint
 urandom()
 {
@@ -157,12 +155,12 @@ static int& g_loglevel = g_opts.loglevel
 #define chkdb(x) \
   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; errdb(); if (g_opts.abort) abort(); return -1; } while (0)
 
+#define chker(x) \
+  do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; ndbout << "errno: " << errno; if (g_opts.abort) abort(); return -1; } while (0)
+
 #define chkrc(x) \
   do { if (likely(x)) break; ndbout << "line " << __LINE__ << " FAIL " << #x << endl; if (g_opts.abort) abort(); return -1; } while (0)
 
-#define reqrc(x) \
-  do { if (likely(x)) break; ndbout << "line " << __LINE__ << " ASSERT " << #x << endl; abort(); } while (0)
-
 #define llx(n, x) \
   do { if (likely(g_loglevel < n)) break; ndbout << x << endl; } while (0)
 
@@ -175,7 +173,13 @@ static void
 errdb()
 {
   uint any = 0;
-  // g_ncc return no error...
+  if (g_ncc != 0) {
+    NdbError e;
+    e.code = g_ncc->get_latest_error();
+    e.message = g_ncc->get_latest_error_msg();
+    if (e.code != 0)
+      ll0(++any << " ncc: error" << e);
+  }
   if (g_ndb != 0) {
     const NdbError& e = g_ndb->getNdbError();
     if (e.code != 0)
@@ -206,8 +210,8 @@ errdb()
     if (e.code != 0)
       ll0(++any << " rangescan_op: error " << e);
   }
-  if (g_stat != 0) {
-    const NdbError& e = g_stat->getNdbError();
+  if (g_is != 0) {
+    const NdbError& e = g_is->getNdbError();
     if (e.code != 0)
       ll0(++any << " stat: error " << e);
   }
@@ -219,37 +223,54 @@ errdb()
 static int
 createNdbRecords()
 {
+  ll1("createNdbRecords");
   const Uint32 numCols=4;
   const Uint32 numIndexCols=3;
   NdbDictionary::RecordSpecification recSpec[numCols];
 
   recSpec[0].column= g_tab->getColumn("a"); // 4 bytes
   recSpec[0].offset= g_ndbrec_a_offset;
-  recSpec[0].nullbit_byte_offset= 0;
-  recSpec[0].nullbit_bit_in_byte= 0;
+  recSpec[0].nullbit_byte_offset= ~(Uint32)0;
+  recSpec[0].nullbit_bit_in_byte= ~(Uint32)0;
  
-  recSpec[1].column= g_tab->getColumn("b"); // 2 bytes
+  recSpec[1].column= g_tab->getColumn("b"); // 4 bytes
   recSpec[1].offset= g_ndbrec_b_offset;
-  recSpec[1].nullbit_byte_offset= 0;
-  recSpec[1].nullbit_bit_in_byte= 0;
+  if (g_b_nullable) {
+    recSpec[1].nullbit_byte_offset= 0;
+    recSpec[1].nullbit_bit_in_byte= g_ndbrec_b_nb_offset;
+  } else {
+    recSpec[1].nullbit_byte_offset= ~(Uint32)0;
+    recSpec[1].nullbit_bit_in_byte= ~(Uint32)0;
+  }
  
   recSpec[2].column= g_tab->getColumn("c"); // Varchar(10) -> ~12 bytes
   recSpec[2].offset= g_ndbrec_c_offset;
-  recSpec[2].nullbit_byte_offset= 0;
-  recSpec[2].nullbit_bit_in_byte= g_ndbrec_c_nb_offset;
+  if (g_c_nullable) {
+    recSpec[2].nullbit_byte_offset= 0;
+    recSpec[2].nullbit_bit_in_byte= g_ndbrec_c_nb_offset;
+  } else {
+    recSpec[2].nullbit_byte_offset= ~(Uint32)0;
+    recSpec[2].nullbit_bit_in_byte= ~(Uint32)0;
+  }
 
-  recSpec[3].column= g_tab->getColumn("d"); // 4 bytes
+  recSpec[3].column= g_tab->getColumn("d"); // 2 bytes
   recSpec[3].offset= g_ndbrec_d_offset;
-  recSpec[3].nullbit_byte_offset= 0;
-  recSpec[3].nullbit_bit_in_byte= g_ndbrec_d_nb_offset;
+  if (g_d_nullable) {
+    recSpec[3].nullbit_byte_offset= 0;
+    recSpec[3].nullbit_bit_in_byte= g_ndbrec_d_nb_offset;
+  } else {
+    recSpec[3].nullbit_byte_offset= ~(Uint32)0;
+    recSpec[3].nullbit_bit_in_byte= ~(Uint32)0;
+  }
 
+  g_dic = g_ndb->getDictionary();
   g_tab_rec= g_dic->createRecord(g_tab,
                                  &recSpec[0],
                                  numCols,
                                  sizeof(NdbDictionary::RecordSpecification),
                                  0);
 
-  chkrc(g_tab_rec != NULL);
+  chkdb(g_tab_rec != NULL);
 
   g_ind_rec= g_dic->createRecord(g_ind,
                                  &recSpec[1],
@@ -257,18 +278,21 @@ createNdbRecords()
                                  sizeof(NdbDictionary::RecordSpecification),
                                  0);
   
-  chkrc(g_ind_rec != NULL);
+  chkdb(g_ind_rec != NULL);
+  g_dic = 0;
 
   return 0;
 }
 
 // create table ts0 (
-//   a int unsigned, b smallint not null, c varchar(10), d int unsigned,
+//   a int unsigned,
+//   b int unsigned, c varchar(10), d smallint unsigned,
 //   primary key using hash (a), index (b, c, d) )
 
 static int
 createtable()
 {
+  ll1("createtable");
   NdbDictionary::Table tab(g_tabname);
   tab.setLogging(false);
   {
@@ -279,8 +303,8 @@ createtable()
   }
   {
     NdbDictionary::Column col("b");
-    col.setType(NdbDictionary::Column::Smallint);
-    col.setNullable(false);
+    col.setType(NdbDictionary::Column::Unsigned);
+    col.setNullable(g_b_nullable);
     tab.addColumn(col);
   }
   {
@@ -288,15 +312,29 @@ createtable()
     col.setType(NdbDictionary::Column::Varchar);
     col.setLength(g_charlen);
     col.setCharset(g_cs);
-    col.setNullable(true);
+    col.setNullable(g_c_nullable);
     tab.addColumn(col);
   }
   {
     NdbDictionary::Column col("d");
-    col.setType(NdbDictionary::Column::Unsigned);
-    col.setNullable(true);
+    col.setType(NdbDictionary::Column::Smallunsigned);
+    col.setNullable(g_d_nullable);
     tab.addColumn(col);
   }
+
+  g_dic = g_ndb->getDictionary();
+  if (g_dic->getTable(g_tabname) != 0)
+    chkdb(g_dic->dropTable(g_tabname) == 0);
+  chkdb(g_dic->createTable(tab) == 0);
+  chkdb((g_tab = g_dic->getTable(g_tabname)) != 0);
+  g_dic = 0;
+  return 0;
+}
+
+static int
+createindex()
+{
+  ll1("createindex");
   NdbDictionary::Index ind(g_indname);
   ind.setTable(g_tabname);
   ind.setType(NdbDictionary::Index::OrderedIndex);
@@ -304,24 +342,10 @@ createtable()
   ind.addColumnName("b");
   ind.addColumnName("c");
   ind.addColumnName("d");
+
   g_dic = g_ndb->getDictionary();
-  if (! g_opts.keeptable) {
-    if (g_dic->getTable(g_tabname) != 0)
-      chkdb(g_dic->dropTable(g_tabname) == 0);
-    chkdb(g_dic->createTable(tab) == 0);
-    chkdb(g_dic->createIndex(ind) == 0);
-  } else {
-    if (g_dic->getTable(g_tabname) == 0) {
-      chkdb(g_dic->createTable(tab) == 0);
-      chkdb(g_dic->createIndex(ind) == 0);
-    } else
-      g_opts.loaddata = false;
-  }
-  chkdb((g_tab = g_dic->getTable(g_tabname)) != 0);
+  chkdb(g_dic->createIndex(ind) == 0);
   chkdb((g_ind = g_dic->getIndex(g_indname, g_tabname)) != 0);
-  
-  chkrc(createNdbRecords() == 0);
-
   g_dic = 0;
   return 0;
 }
@@ -329,33 +353,45 @@ createtable()
 static int
 droptable()
 {
+  ll1("droptable");
   g_dic = g_ndb->getDictionary();
-  if (! g_opts.keeptable)
-    chkdb(g_dic->dropTable(g_tabname) == 0);
+  chkdb(g_dic->dropTable(g_tabname) == 0);
   g_dic = 0;
   return 0;
 }
 
+// values for keys and bounds
+
 struct Val {
-  Int16 b;
-  bool c_null;
+  uint8 m_numattrs;
+  int8 b_null;
+  int8 c_null;
+  int8 d_null;
+  Uint32 b;
   uchar c[1 + g_charlen];
-  bool d_null;
-  Uint32 d;
-  // partial values for use in Bnd
-  uint numattrs;
-  void make(uint n = g_numattrs, bool is_val = true);
-  int cmp(const Val& val, uint n = g_numattrs) const;
+  Uint16 d;
+  Val();
+  void init();
+  void copy(const Val& val2);
+  void make(uint numattrs, const Lim& lim);
+  int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const;
+
+private:
+  Val& operator=(const Val&);
+  Val(const Val&);
 };
 
 static NdbOut&
 operator<<(NdbOut& out, const Val& val)
 {
   out << "[";
-  if (val.numattrs >= 1) {
-    out << val.b;
+  if (val.m_numattrs >= 1) {
+    if (val.b_null)
+      out << "NULL";
+    else
+      out << val.b;
   }
-  if (val.numattrs >= 2) {
+  if (val.m_numattrs >= 2) {
     out << " ";
     if (val.c_null)
       out << "NULL";
@@ -365,7 +401,7 @@ operator<<(NdbOut& out, const Val& val)
       out << "'" << buf << "'";
     }
   }
-  if (val.numattrs >= 3) {
+  if (val.m_numattrs >= 3) {
     out << " ";
     if (val.d_null)
       out <<" NULL";
@@ -376,237 +412,303 @@ operator<<(NdbOut& out, const Val& val)
   return out;
 }
 
+Val::Val()
+{
+  init();
+}
+
+void
+Val::init()
+{
+  m_numattrs = 0;
+  // junk rest
+  b_null = -1;
+  c_null = -1;
+  d_null = -1;
+  b = ~(Uint32)0;
+  memset(c, 0xff, sizeof(c));
+  d = ~(Uint16)0;
+}
+
+void
+Val::copy(const Val& val2)
+{
+  require(this != &val2);
+  init();
+  m_numattrs = val2.m_numattrs;
+  if (m_numattrs >= 1) {
+    require(val2.b_null == 0 || val2.b_null == 1);
+    b_null = val2.b_null;
+    if (!b_null)
+      b = val2.b;
+  }
+  if (m_numattrs >= 2) {
+    require(val2.c_null == 0 || val2.c_null == 1);
+    c_null = val2.c_null;
+    if (!c_null)
+      memcpy(c, val2.c, sizeof(c));
+  }
+  if (m_numattrs >= 3) {
+    require(val2.d_null == 0 || val2.d_null == 1);
+    d_null = val2.d_null;
+    if (!d_null)
+      d = val2.d;
+  }
+}
+
 void
-Val::make(uint n, bool is_val)
+Val::make(uint numattrs, const Lim& lim)
 {
-  if (n >= 1) {
-    uint b_max = is_val ? g_val_b_max : g_bnd_b_max;
-    b = (int)urandom(2 * b_max) - (int)b_max;
+  require(numattrs <= g_numattrs);
+  if (numattrs >= 1) {
+    const bool nullable = g_b_nullable || lim.all_nullable;
+    if (nullable && urandom(100) < g_opts.nullkeys)
+      b_null = 1;
+    else {
+      require(lim.b_min <= lim.b_max);
+      b = lim.b_min + urandom(lim.b_max - lim.b_min + 1);
+      b_null = 0;
+    }
   }
-  if (n >= 2) {
-    if (urandom(100) < g_opts.nullkeys)
+  if (numattrs >= 2) {
+    const bool nullable = g_c_nullable || lim.all_nullable;
+    if (nullable && urandom(100) < g_opts.nullkeys)
       c_null = 1;
     else {
-      const char* c_char = is_val ? g_val_c_char : g_bnd_c_char;
       // prefer shorter
-      uint len = urandom(urandom(g_charlen + 2));
+      const uint len = urandom(urandom(g_charlen + 1) + 1);
       c[0] = len;
-      uint j;
-      for (j = 0; j < len; j++) {
-        uint k = urandom(strlen(c_char));
-        c[1 + j] = c_char[k];
+      for (uint j = 0; j < len; j++) {
+        uint k = urandom(strlen(lim.c_char));
+        c[1 + j] = lim.c_char[k];
       }
       c_null = 0;
     }
   }
-  if (n >= 3) {
-    if (urandom(100) < g_opts.nullkeys)
+  if (numattrs >= 3) {
+    const bool nullable = g_d_nullable || lim.all_nullable;
+    if (nullable && urandom(100) < g_opts.nullkeys)
       d_null = 1;
     else {
-      uint d_max = is_val ? g_val_d_max : g_bnd_d_max;
-      d = urandom(d_max);
+      require(lim.d_min <= lim.d_max);
+      d = lim.d_min + urandom(lim.d_max - lim.d_min + 1);
       d_null = 0;
     }
   }
-  numattrs = n;
+  m_numattrs = numattrs;
 }
 
 int
-Val::cmp(const Val& val, uint n) const
+Val::cmp(const Val& val2, uint numattrs, uint* num_eq) const
 {
-  int k = 0;
-  if (k == 0 && n >= 1) {
-    if (b < val.b)
-      k = -1;
-    else if (b > val.b)
+  require(numattrs <= m_numattrs);
+  require(numattrs <= val2.m_numattrs);
+  uint n = 0; // attr index where differs
+  uint k = 0;
+  if (k == 0 && numattrs >= 1) {
+    if (! b_null && ! val2.b_null) {
+      if (b < val2.b)
+        k = -1;
+      else if (b > val2.b)
+        k = +1;
+    } else if (! b_null) {
       k = +1;
+    } else if (! val2.b_null) {
+      k = -1;
+    }
+    if (k == 0)
+      n++;
   }
-  if (k == 0 && n >= 2) {
-    if (! c_null && ! val.c_null) {
+  if (k == 0 && numattrs >= 2) {
+    if (! c_null && ! val2.c_null) {
       const uchar* s1 = &c[1];
-      const uchar* s2 = &val.c[1];
+      const uchar* s2 = &val2.c[1];
       const uint l1 = (uint)c[0];
-      const uint l2 = (uint)val.c[0];
+      const uint l2 = (uint)val2.c[0];
       assert(l1 <= g_charlen && l2 <= g_charlen);
       k = g_cs->coll->strnncollsp(g_cs, s1, l1, s2, l2, 0);
     } else if (! c_null) {
       k = +1;
-    } else if (! val.c_null) {
+    } else if (! val2.c_null) {
       k = -1;
     }
+    if (k == 0)
+      n++;
   }
-  if (k == 0 && n >= 3) {
-    if (! d_null && ! val.d_null) {
-      if (d < val.d)
+  if (k == 0 && numattrs >= 3) {
+    if (! d_null && ! val2.d_null) {
+      if (d < val2.d)
         k = -1;
-      else if (d > val.d)
+      else if (d > val2.d)
         k = +1;
     } else if (! d_null) {
       k = +1;
-    } else if (! val.d_null) {
+    } else if (! val2.d_null) {
       k = -1;
     }
+    if (k == 0)
+      n++;
   }
+  require(n <= numattrs);
+  if (num_eq != 0)
+    *num_eq = n;
   return k;
 }
 
+// index keys
+
 struct Key {
-  Val val;
-  union {
-    bool flag;
-    uint count;
-    uint rpk;
-  };
+  Val m_val;
+  int8 m_flag; // temp use
+  Key();
+
+private:
+  Key& operator=(const Key&);
+  Key(const Key&);
 };
 
 static NdbOut&
 operator<<(NdbOut& out, const Key& key)
 {
-  out << key.val << " info:" << key.count;
+  out << key.m_val;
+  if (key.m_flag != -1)
+    out << " flag: " << key.m_flag;
   return out;
 }
 
+Key::Key()
+{
+  m_flag = -1;
+}
+
 static Key* g_keys = 0;
-static Key* g_sortkeys = 0;
+static uint* g_sortkeys = 0;
 static uint g_sortcount = 0;
-static Key* g_minkey = 0;
-static Key* g_maxkey = 0;
 
 static void
 freekeys()
 {
-  if (g_keys != 0)
-    free(g_keys);
-  if (g_sortkeys != 0)
-    free(g_sortkeys);
+  delete [] g_keys;
+  delete [] g_sortkeys;
   g_keys = 0;
   g_sortkeys = 0;
 }
 
-static int
+static void
 allockeys()
 {
   freekeys();
-  size_t sz = sizeof(Key) * g_opts.rows;
-  g_keys = (Key*)malloc(sz);
-  g_sortkeys = (Key*)malloc(sz);
-  chkrc(g_keys != 0 && g_sortkeys != 0);
-  memset(g_keys, 0x1f, sz);
-  memset(g_sortkeys, 0x1f, sz);
-  return 0;
+  g_keys = new Key [g_opts.rows];
+  g_sortkeys = new uint [g_opts.rows];
+  require(g_keys != 0 && g_sortkeys != 0);
+  memset(g_sortkeys, 0xff, sizeof(uint) * g_opts.rows);
+}
+
+static int
+cmpkeys(const void* p1, const void* p2)
+{
+  const uint i1 = *(const uint*)p1;
+  const uint i2 = *(const uint*)p2;
+  require(i1 < g_opts.rows && i2 < g_opts.rows);
+  const Key& key1 = g_keys[i1];
+  const Key& key2 = g_keys[i2];
+  const int k = key1.m_val.cmp(key2.m_val, g_opts.attrs);
+  return k;
 }
 
 static void
-makekeys()
+sortkeys()
 {
+  ll2("sortkeys");
   uint i;
-  for (i = 0; i < g_opts.rows; i++) {
-    Key& key = g_keys[i];
-    key.val.make();
-    key.flag = false; // mark for dup generation done
-  }
-  for (i = 0; i < g_opts.rows; i++) {
-    Key& key = g_keys[i];
-    if (key.flag)
-      continue;
-    key.flag = true;
-    uint fudge = 9;
-    uint n = (urandom(fudge * (g_opts.dupkeys - 100)) + 99) / 100;
-    uint k;
-    for (k = 1; k < n; k++) {
-      uint j = urandom(g_opts.rows);
-      do {
-        Key& dst = g_keys[j];
-        if (! dst.flag) {
-          dst.val = key.val;
-          dst.flag = true;
-          break;
-        }
-      } while (urandom(g_opts.tryhard) != 0);
-    }
-  }
+
+  // sort
+  for (i = 0; i < g_opts.rows; i++)
+    g_sortkeys[i] = i;
+  qsort(g_sortkeys, g_opts.rows, sizeof(uint), cmpkeys);
+
+  // verify
+  uint unique = 1;
+  for (i = 1; i < g_opts.rows; i++) {
+    const uint i1 = g_sortkeys[i - 1];
+    const uint i2 = g_sortkeys[i];
+    require(i1 < g_opts.rows && i2 < g_opts.rows);
+    const Key& key1 = g_keys[i1];
+    const Key& key2 = g_keys[i2];
+    const int k = key1.m_val.cmp(key2.m_val, g_opts.attrs);
+    require(k <= 0);
+    if (k < 0)
+      unique++;
+  }
+
+  // show min max key
+  ll1("minkey:" << g_keys[g_sortkeys[0]]);
+  ll1("maxkey:" << g_keys[g_sortkeys[g_opts.rows - 1]]);
+  ll1("unique:" << unique);
 }
 
-static int
-insertdata()
+static void
+makekeys()
 {
-  const uint batch = 512;
-  chkdb((g_con = g_ndb->startTransaction()) != 0);
+  ll1("makekeys");
+
+  uint initrows = g_opts.rows / g_opts.rpk;
+  require(initrows != 0);
+
+  // distinct keys
   uint i = 0;
+  while (i < initrows) {
+    Key& key = g_keys[i];
+    key.m_val.make(g_numattrs, g_lim_val);
+    i++;
+  }
+
+  // remaining keys
   while (i < g_opts.rows) {
-    chkdb((g_op = g_con->getNdbOperation(g_tab)) != 0);
-    chkdb(g_op->insertTuple() == 0);
-    Uint32 a = i;
-    const Val& val = g_keys[i].val;
-    const char* a_addr = (const char*)&a;
-    const char* b_addr = (const char*)&val.b;
-    const char* c_addr = ! val.c_null ? (const char*)val.c : 0;
-    const char* d_addr = ! val.d_null ? (const char*)&val.d : 0;
-    Uint32 no = 0;
-    chkdb(g_op->equal(no++, a_addr) == 0);
-    chkdb(g_op->setValue(no++, b_addr) == 0);
-    chkdb(g_op->setValue(no++, c_addr) == 0);
-    chkdb(g_op->setValue(no++, d_addr) == 0);
-    if (i++ % batch == 0) {
-      chkdb(g_con->execute(NdbTransaction::Commit) == 0);
-      g_ndb->closeTransaction(g_con);
-      g_con = 0;
-      g_op = 0;
-      chkdb((g_con = g_ndb->startTransaction()) != 0);
+    // if rpkvar is 10, multiply rpk by number between 0.1 and 10.0
+    double a = (double)(1 + urandom(g_opts.rpkvar * g_opts.rpkvar));
+    double b = a / (double)g_opts.rpkvar;
+    double c = b * (double)g_opts.rpk;
+    const uint n = (uint)(c + 0.5);
+    // select random key to duplicate from initrows
+    const uint k = urandom(initrows);
+    uint j = 0;
+    while (i < g_opts.rows && j < n) {
+      g_keys[i].m_val.copy(g_keys[k].m_val);
+      j++;
+      i++;
     }
   }
-  chkdb(g_con->execute(NdbTransaction::Commit) == 0);
-  g_ndb->closeTransaction(g_con);
-  g_con = 0;
-  g_op = 0;
-  ll0(g_tabname << ": inserted " << g_opts.rows << " rows");
-  return 0;
-}
-
-static int
-countrows()
-{
-  Uint64 rows = 0;
-  Uint64 r;
-  char* r_addr = (char*)&r;
-  
-  const Uint32 codeWords= 1;
-  Uint32 codeSpace[ codeWords ];
-  NdbInterpretedCode code(NULL, // Table is irrelevant
-                          &codeSpace[0],
-                          codeWords);
-  chkrc(code.interpret_exit_last_row());
-  chkrc(code.finalise());
 
-  chkdb((g_con = g_ndb->startTransaction()) != 0);
-  chkdb((g_scan_op = g_con->getNdbScanOperation(g_tab)) != 0);
-  chkdb(g_scan_op->readTuples() == 0);
-  chkdb(g_scan_op->setInterpretedCode(&code) == 0);
-  chkdb(g_scan_op->getValue(NdbDictionary::Column::ROW_COUNT, r_addr) != 0);
-  chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
-  while (1) {
-    int ret;
-    r = ~(Uint64)0;
-    chkdb((ret = g_scan_op->nextResult()) == 0 || ret == 1);
-    if (ret == 1)
-      break;
-    rows += r;
+  // shuffle
+  i = 0;
+  while (i < g_opts.rows) {
+    uint j = urandom(g_opts.rows);
+    if (i != j) {
+      Key tmp;
+      tmp.m_val.copy(g_keys[i].m_val);
+      g_keys[i].m_val.copy(g_keys[j].m_val);
+      g_keys[j].m_val.copy(tmp.m_val);
+    }
+    i++;
   }
-  g_ndb->closeTransaction(g_con);
-  g_con = 0;
-  g_scan_op = 0;
-  g_opts.rows = rows;
-  return 0;
+
+  // sort
+  sortkeys();
 }
 
+// data loading
+
 static int
-scandata()
+verifydata()
 {
+  ll3("verifydata");
   chkdb((g_con = g_ndb->startTransaction()) != 0);
   chkdb((g_scan_op = g_con->getNdbScanOperation(g_tab)) != 0);
-  chkdb(g_scan_op->readTuples() == 0);
+  chkdb(g_scan_op->readTuples(NdbScanOperation::LM_CommittedRead) == 0);
   Uint32 a;
   Val val;
+  val.m_numattrs = g_numattrs;
   char* a_addr = (char*)&a;
   char* b_addr = (char*)&val.b;
   char* c_addr = (char*)val.c;
@@ -622,116 +724,93 @@ scandata()
   chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
   uint count = 0;
   uint i;
-  for (i = 0; i < g_opts.rows; i++)
-    g_keys[i].count = 0;
+  for (i = 0; i < g_opts.rows; i++) {
+    Key& key = g_keys[i];
+    key.m_flag = false; // not scanned
+  }
   while (1) {
     int ret;
     a = ~(Uint32)0;
     chkdb((ret = g_scan_op->nextResult()) == 0 || ret == 1);
     if (ret == 1)
       break;
-    assert(b_ra->isNULL() == 0 && c_ra->isNULL() != -1 && d_ra->isNULL() != -1);
+    val.b_null = b_ra->isNULL();
     val.c_null = c_ra->isNULL();
     val.d_null = d_ra->isNULL();
+    require(val.b_null == 0 || (g_b_nullable && val.b_null == 1));
+    require(val.c_null == 0 || (g_c_nullable && val.c_null == 1));
+    require(val.d_null == 0 || (g_d_nullable && val.d_null == 1));
     i = (uint)a;
     chkrc(i < g_opts.rows);
     Key& key = g_keys[i];
-    if (g_opts.loaddata)
-      chkrc(key.val.cmp(val) == 0);
-    else
-      key.val = val;
-    key.count++;
+    chkrc(key.m_val.cmp(val) == 0);
+    chkrc(key.m_flag == false);
+    key.m_flag = true;
     count++;
   }
   g_ndb->closeTransaction(g_con);
   g_con = 0;
   g_scan_op = 0;
-  for (i = 0; i < g_opts.rows; i++)
-    chkrc(g_keys[i].count == 1);
+  for (i = 0; i < g_opts.rows; i++) {
+    Key& key = g_keys[i];
+    chkrc(key.m_flag == true);
+    key.m_flag = -1; // forget
+  }
   assert(count == g_opts.rows);
-  int level = g_opts.loaddata ? 1 : 0;
-  llx(level, g_tabname << ": scanned " << g_opts.rows << " rows");
+  ll3("verifydata: " << g_opts.rows << " rows");
   return 0;
 }
 
 static int
-loaddata()
+loaddata(bool update)
 {
-  if (g_opts.loaddata) {
-    chkrc(allockeys() == 0);
-    makekeys();
-    chkrc(insertdata() == 0);
-  } else {
-    chkrc(countrows() == 0);
-    chkrc(g_opts.rows != 0);
-    ll0(g_tabname << ": using old table of " << g_opts.rows << " rows");
-    chkrc(allockeys() == 0);
-  }
-  chkrc(scandata() == 0);
-  uint i;
-  for (i = 0; i < g_opts.rows; i++)
-    ll3(i << ": " << g_keys[i]);
-  return 0;
-}
-
-// true = match, index = match or next higher
-static bool
-sortval(const Val& val, int& index)
-{
-  if (unlikely(g_sortcount == 0)) {
-    index = 0;
-    return false;
-  }
-  int lo = -1;
-  int hi = (int)g_sortcount;
-  int ret;
-  int j;
-  do {
-    j = (hi + lo) / 2;
-    ret = val.cmp(g_sortkeys[j].val);
-    if (ret < 0)
-      hi = j;
-    else if (ret > 0)
-      lo = j;
+  ll1("loaddata: update: " << update);
+  const uint batch = 512;
+  chkdb((g_con = g_ndb->startTransaction()) != 0);
+  uint i = 0;
+  while (i < g_opts.rows) {
+    chkdb((g_op = g_con->getNdbOperation(g_tab)) != 0);
+    if (!update)
+      chkdb(g_op->insertTuple() == 0);
     else
-      break;
-  } while (hi - lo > 1);
-  if (ret == 0) {
-    index = j;
-    return true;
-  }
-  index = hi;
-  return false;
-}
-
-static void
-sortkeys()
-{
-  // insert sort with binary search
-  g_sortcount = 0;
-  uint i;
-  for (i = 0; i < g_opts.rows; i++) {
-    const Val& val = g_keys[i].val;
-    int index;
-    bool match = sortval(val, index);
-    Key& dst = g_sortkeys[index];
-    if (match) {
-      dst.rpk++;
-    } else {
-      uint bytes = ((int)g_sortcount - index) * sizeof(Key);
-      memmove(&dst + 1, &dst, bytes);
-      dst.val = val;
-      dst.rpk = 1;
-      g_sortcount++;
+      chkdb(g_op->updateTuple() == 0);
+    Uint32 a = i;
+    const Val& val = g_keys[i].m_val;
+    const char* a_addr = (const char*)&a;
+    const char* b_addr = ! val.b_null ? (const char*)&val.b : 0;
+    const char* c_addr = ! val.c_null ? (const char*)val.c : 0;
+    const char* d_addr = ! val.d_null ? (const char*)&val.d : 0;
+    Uint32 no = 0;
+    chkdb(g_op->equal(no++, a_addr) == 0);
+    chkdb(g_op->setValue(no++, b_addr) == 0);
+    chkdb(g_op->setValue(no++, c_addr) == 0);
+    chkdb(g_op->setValue(no++, d_addr) == 0);
+    if (i++ % batch == 0) {
+      chkdb(g_con->execute(NdbTransaction::Commit) == 0);
+      g_ndb->closeTransaction(g_con);
+      g_con = 0;
+      g_op = 0;
+      chkdb((g_con = g_ndb->startTransaction()) != 0);
     }
   }
-  g_minkey = &g_sortkeys[0];
-  g_maxkey = &g_sortkeys[g_sortcount - 1];
-  ll1("counted " << g_sortcount << " distinct keys");
+  chkdb(g_con->execute(NdbTransaction::Commit) == 0);
+  g_ndb->closeTransaction(g_con);
+  g_con = 0;
+  g_op = 0;
+
+  // check data and cmp routines
+  chkrc(verifydata() == 0);
+
+  for (uint i = 0; i < g_opts.rows; i++)
+    ll3("load " << i << ": " << g_keys[i]);
+  ll0("loaddata: " << g_opts.rows << " rows");
+  return 0;
 }
 
+// bounds
+
 struct Bnd {
-  Val val;
+  Val m_val;
   /*
    * A bound is a partial key value (0 to g_numattrs attributes).
    * It is not equal to any key value.  Instead, it has a "side".
@@ -757,27 +836,67 @@ struct Bnd {
    * A non-empty bound divides keys into 2 disjoint subsets:
    * keys before (cmp() == -1) and keys after (cmp() == +1).
    */
-  int side;
+  int8 m_side;
+  int8 m_lohi; // 0-lo 1-hi as part of Rng
+  Bnd();
+  bool isempty() const;
+  void copy(const Bnd& bnd2); // does not copy m_lohi
   Bnd& make(uint minattrs);
   Bnd& make(uint minattrs, const Val& theval);
-  int cmp(const Val& val) const;
-  int type(uint lohi, uint colno) const; // for setBound
+  int cmp(const Key& key) const;
+  int type(uint colno) const; // for setBound
+
+private:
+  Bnd& operator=(const Bnd&);
+  Bnd(const Bnd&);
 };
 
 static NdbOut&
 operator<<(NdbOut& out, const Bnd& bnd)
 {
-  out << bnd.val;
-  out << " side: " << bnd.side;
+  if (bnd.m_lohi == 0)
+    out << "L";
+  else if (bnd.m_lohi == 1)
+    out << "H";
+  else
+    out << bnd.m_lohi << "?";
+  out << bnd.m_val;
+  if (bnd.m_side == 0)
+    ;
+  else if (bnd.m_side == -1)
+    out << "-";
+  else if (bnd.m_side == +1)
+    out << "+";
   return out;
 }
 
+Bnd::Bnd()
+{
+  m_side = 0;
+  m_lohi = -1;
+}
+
+bool
+Bnd::isempty() const
+{
+  return m_val.m_numattrs == 0;
+}
+
+void
+Bnd::copy(const Bnd& bnd2)
+{
+  m_val.copy(bnd2.m_val);
+  m_side = bnd2.m_side;
+}
+
 Bnd&
 Bnd::make(uint minattrs)
 {
-  uint numattrs = minattrs + urandom(g_numattrs - minattrs);
-  val.make(numattrs, false);
-  side = val.numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
+  require(minattrs <= g_opts.attrs);
+  require(m_lohi == 0 || m_lohi == 1);
+  uint numattrs = minattrs + urandom(g_numattrs - minattrs + 1);
+  m_val.make(numattrs, g_lim_bnd);
+  m_side = m_val.m_numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
   return *this;
 }
 
@@ -785,55 +904,55 @@ Bnd&
 Bnd::make(uint minattrs, const Val& theval)
 {
   uint numattrs = minattrs + urandom(g_numattrs - minattrs);
-  val = theval;
-  val.numattrs = numattrs;
-  side = val.numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
+  m_val.copy(theval);
+  m_val.m_numattrs = numattrs;
+  m_side = m_val.m_numattrs == 0 ? 0 : urandom(2) == 0 ? -1 : +1;
   return *this;
 }
 
 int
-Bnd::cmp(const Val& theval) const
+Bnd::cmp(const Key& key) const
 {
   int place; // debug
   int ret;
   do {
-    assert(theval.numattrs == (uint) g_numattrs);
-    int k = theval.cmp(val, val.numattrs);
+    int k = key.m_val.cmp(m_val, m_val.m_numattrs);
     if (k != 0) {
       place = 1;
       ret = k;
       break;
     }
-    if (side != 0) {
+    if (m_side != 0) {
       place = 2;
-      ret = -side;
+      ret = (-1) * m_side;
       break;
     }
     place = 3;
     ret = 0;
-    assert(val.numattrs == 0);
+    assert(m_val.m_numattrs == 0);
   } while (0);
-  ll3("cmp: val: " << theval << " bnd: " << *this <<
-      " return: " << ret << " at " << place);
+  ll3("bnd: " << *this << " cmp key: " << key
+      << " ret: " << ret << " place: " << place);
   return ret;
 }
 
 int
-Bnd::type(uint lohi, uint colno) const
+Bnd::type(uint colno) const
 {
   int t;
-  assert(lohi <= 1 && colno < val.numattrs && (side == -1 || side == +1));
-  if (lohi == 0) {
-    if (colno + 1 < val.numattrs)
+  require(colno < m_val.m_numattrs && (m_side == -1 || m_side == +1));
+  require(m_lohi == 0 || m_lohi == 1);
+  if (m_lohi == 0) {
+    if (colno + 1 < m_val.m_numattrs)
       t = 0; // LE
-    else if (side == -1)
+    else if (m_side == -1)
       t = 0; // LE
     else
       t = 1; // LT
   } else {
-    if (colno + 1 < val.numattrs)
+    if (colno + 1 < m_val.m_numattrs)
       t = 2; // GE
-    else if (side == +1)
+    else if (m_side == +1)
       t = 2; // GE
     else
       t = 3; // GT
@@ -841,53 +960,133 @@ Bnd::type(uint lohi, uint colno) const
   return t;
 }
 
-struct Range {
-  Bnd bnd[2];
+// stats values
+
+struct Stval {
+  Uint32 rir_v2;
+  double rir;
+  double rpk[g_numattrs];
+  bool empty;
+  char rule[NdbIndexStat::RuleBufferBytes];
+  Stval();
+};
+
+static NdbOut&
+operator<<(NdbOut& out, const Stval& st)
+{
+  out << "rir_v2: " << st.rir_v2;
+  out << " rir_v4: " << st.rir;
+  out << " rpk:[ ";
+  for (uint k = 0; k < g_opts.attrs; k++) {
+    if (k != 0)
+      out << " ";
+    out << st.rpk[k];
+  }
+  out << " ]";
+  out << " " << (st.empty ? "E" : "N");
+  out << " " << st.rule;
+  return out;
+}
+
+Stval::Stval()
+{
+  rir_v2 = 0;
+  rir = 0.0;
+  for (uint k = 0; k < g_numattrs; k++)
+    rpk[k] = 0.0;
+  empty = false;
+  strcpy(rule, "-");
+}
+
+// ranges
+
+struct Rng {
+  Bnd m_bnd[2];
+  Int32 m_rowcount;
+  // stats v2
+  double errpct;
+  // stats v4
+  Stval m_st_scan; // exact stats computed from keys in range
+  Stval m_st_stat; // interpolated kernel stats via g_is
+  Rng();
   uint minattrs() const;
   uint maxattrs() const;
-  int cmp(const Val& val) const; // -1,0,+1 = key is before,in,after range
-  uint rowcount() const;
   bool iseq() const;
-  // stats
-  bool flag;
-  uint statrows;
-  uint scanrows;
-  double errpct;
+  bool isempty() const;
+  void copy(const Rng& rng2);
+  int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range
+  uint rowcount() const;
+
+private:
+  Rng& operator=(const Rng&);
+  Rng(const Rng&);
 };
 
 static NdbOut&
-operator<<(NdbOut& out, const Range& range)
+operator<<(NdbOut& out, const Rng& rng)
 {
-  out << "bnd0: " << range.bnd[0] << " bnd1: " << range.bnd[1];
+  out << rng.m_bnd[0] << " " << rng.m_bnd[1];
+  if (rng.m_rowcount != -1)
+    out << " rows: " << rng.m_rowcount;
   return out;
 }
 
+Rng::Rng()
+{
+  m_bnd[0].m_lohi = 0;
+  m_bnd[1].m_lohi = 1;
+  m_rowcount = -1;
+}
+
 uint
-Range::minattrs() const
+Rng::minattrs() const
 {
-  return min(bnd[0].val.numattrs, bnd[1].val.numattrs);
+  return min(m_bnd[0].m_val.m_numattrs, m_bnd[1].m_val.m_numattrs);
 }
 
 uint
-Range::maxattrs() const
+Rng::maxattrs() const
+{
+  return max(m_bnd[0].m_val.m_numattrs, m_bnd[1].m_val.m_numattrs);
+}
+
+bool
+Rng::iseq() const
+{
+  return
+    minattrs() == maxattrs() &&
+    m_bnd[0].m_val.cmp(m_bnd[1].m_val, minattrs()) == 0 &&
+    m_bnd[0].m_side < m_bnd[1].m_side;
+}
+
+bool
+Rng::isempty() const
+{
+  return m_bnd[0].isempty() && m_bnd[1].isempty();
+}
+
+void
+Rng::copy(const Rng& rng2)
 {
-  return max(bnd[0].val.numattrs, bnd[1].val.numattrs);
+  m_bnd[0].copy(rng2.m_bnd[0]);
+  m_bnd[1].copy(rng2.m_bnd[1]);
+  m_rowcount = rng2.m_rowcount;
 }
 
 int
-Range::cmp(const Val& theval) const
+Rng::cmp(const Key& key) const
 {
   int place; // debug
   int ret;
   do  {
     int k;
-    k = bnd[0].cmp(theval);
+    k = m_bnd[0].cmp(key);
     if (k < 0) {
       place = 1;
       ret = -1;
       break;
     }
-    k = bnd[1].cmp(theval);
+    k = m_bnd[1].cmp(key);
     if (k > 0) {
       place = 2;
       ret = +1;
@@ -896,27 +1095,28 @@ Range::cmp(const Val& theval) const
     place = 3;
     ret = 0;
   } while (0);
-  ll3("cmp: val: " << theval << " range: " << *this <<
-      " return: " << ret << " at " << place);
+  ll3("rng: " << *this << " cmp key: " << key
+      << " ret: " << ret << " place: " << place);
   return ret;
 }
 
 uint
-Range::rowcount() const
+Rng::rowcount() const
 {
-  ll2("rowcount: " << *this);
+  ll3("rowcount: " << *this);
   int i;
   // binary search for first and last in range
   int lim[2];
   for (i = 0; i <= 1; i++) {
     ll3("search i=" << i);
     int lo = -1;
-    int hi = (int)g_sortcount;
+    int hi = (int)g_opts.rows;
     int ret;
     int j;
     do {
       j = (hi + lo) / 2;
-      ret = cmp(g_sortkeys[j].val);
+      require(lo < j && j < hi);
+      ret = cmp(g_keys[g_sortkeys[j]]);
       if (i == 0) {
         if (ret < 0)
           lo = j;
@@ -936,157 +1136,337 @@ Range::rowcount() const
     else
       lim[i] = lo;
   }
-  // the range
+
+  // verify is expensive due to makeranges() multiple tries
+  const bool verify = (urandom(10) == 0);
   const int lo = max(lim[0], 0);
-  const int hi = min(lim[1], (int)g_sortcount - 1);
-  if (! g_opts.nochecks) {
-    int curr = -1;
-    for (i = 0; i < (int)g_sortcount; i++) {
-      int k = cmp(g_sortkeys[i].val);
+  const int hi = min(lim[1], (int)g_opts.rows - 1);
+  if (verify) {
+    int pos = -1; // before, within, after
+    for (i = 0; i < (int)g_opts.rows; i++) {
+      int k = cmp(g_keys[g_sortkeys[i]]);
       if (k < 0)
-        assert(i < lo);
+        require(i < lo);
       else if (k == 0)
-        assert(lo <= i && i <= hi);
+        require(lo <= i && i <= hi);
       else
-        assert(i > hi);
-      assert(curr <= k);
-      if (curr < k)
-        curr = k;
+        require(i > hi);
+      require(pos <= k);
+      if (pos < k)
+        pos = k;
     }
   }
-  // sum them up
-  uint count = 0;
-  for (i = lo; i <= hi; i++)
-    count += g_sortkeys[i].count;
-  ll2("count: " << count << " index lim: " << lim[0] << " " << lim[1]);
-  return count;
-}
 
-bool
-Range::iseq() const
-{
-  return
-    minattrs() == maxattrs() &&
-    bnd[0].val.cmp(bnd[1].val, minattrs()) == 0 &&
-    bnd[0].side < bnd[1].side;
+  // result
+  require(hi - lo + 1 >= 0);
+  uint count = hi - lo + 1;
+  ll3("rowcount: " << count << " lim: " << lim[0] << " " << lim[1]);
+  return count;
 }
 
-static Range* g_ranges = 0;
+static Rng* g_rnglist = 0;
 
 static void
 freeranges()
 {
-  if (g_ranges != 0)
-    free(g_ranges);
-  g_ranges = 0;
+  delete [] g_rnglist;
+  g_rnglist = 0;
 }
 
-static int
+static void
 allocranges()
 {
   freeranges();
-  size_t sz = sizeof(Range) * g_opts.ops;
-  g_ranges = (Range*)malloc(sz);
-  chkrc(g_ranges != 0);
-  memset(g_ranges, 0x1f, sz);
-  return 0;
+  g_rnglist = new Rng [g_opts.ops];
+  require(g_rnglist != 0);
 }
 
 static void
 makeranges()
 {
-  uint i;
-  for (i = 0; i < g_opts.ops; i++) {
-    Range& range = g_ranges[i];
-    range.flag = false; // mark for dup generation done
-    bool fulleq = (urandom(100) < g_opts.eqscans);
-    bool eq = fulleq || (urandom(100) < g_opts.eqscans);
-    bool matcheq = eq && (urandom(10) != 0);
-    if (! eq) {
-      // random but prefer non-empty and no more than scanpct
-      do {
-        range.bnd[0].make(0);
-        range.bnd[1].make(0);
-        uint count = range.rowcount();
-        if (count != 0 && 100 * count <= g_opts.scanpct * g_opts.rows)
-          break;
-      } while (urandom(g_opts.tryhard) != 0);
-    } else {
-      uint minattrs = fulleq ? g_numattrs : 1;
-      if (! matcheq) {
-        range.bnd[0].make(minattrs);
+  ll1("makeranges");
+  const uint mintries = 20;
+  const uint maxtries = 80;
+  const uint fudgefac = 10;
+
+  for (uint i = 0; i < g_opts.ops; i++) {
+    const bool eqpart = (urandom(100) < g_opts.eqscans);
+    const bool eqfull = eqpart && (urandom(100) < g_opts.eqscans);
+    Rng rng; // candidate
+    uint j;
+    for (j = 0; j < maxtries; j++) {
+      Rng rng2;
+      if (!eqpart) {
+        rng2.m_bnd[0].make(0);
+        rng2.m_bnd[1].make(0);
       } else {
-        uint m = urandom(g_sortcount);
-        const Val& val = g_sortkeys[m].val;
-        range.bnd[0].make(minattrs, val);
+        const uint mincnt = eqfull ? g_opts.attrs : 1;
+        rng2.m_bnd[0].make(mincnt);
+        rng2.m_bnd[1].copy(rng2.m_bnd[0]);
+        rng2.m_bnd[0].m_side = -1;
+        rng2.m_bnd[1].m_side = +1;
+        require(rng2.iseq());
       }
-      range.bnd[1] = range.bnd[0];
-      range.bnd[0].side = -1;
-      range.bnd[1].side = +1;
-
-      assert(range.iseq());
-    }
-  }
-  for (i = 0; i < g_opts.ops; i++) {
-    Range& range = g_ranges[i];
-    if (range.flag)
-      continue;
-    range.flag = true;
-    if (urandom(100) < g_opts.dupscans) {
-      uint j = urandom(g_opts.ops);
+      rng2.m_rowcount = (Int32)rng2.rowcount();
+      // 0-discard 1-replace or accept 2-accept
+      int action = 0;
       do {
-        Range& dst = g_ranges[j];
-        if (! dst.flag) {
-          dst.bnd[0] = range.bnd[0];
-          dst.bnd[1] = range.bnd[1];
-          dst.flag = true;
+        // first candidate
+        if (rng.m_rowcount == -1) {
+          action = 1;
+          break;
+        }
+        require(rng.m_rowcount != -1);
+        // prefer some bounds
+        if (rng2.isempty()) {
+          if (urandom(fudgefac) != 0)
+            action = 0;
+          else
+            action = 1;
           break;
         }
-      } while (urandom(g_opts.tryhard) != 0);
+        // prefer some rows
+        if (rng2.m_rowcount == 0) {
+          action = 0;
+          break;
+        }
+        // accept if row count under given pct
+        require((uint)rng2.m_rowcount <= g_opts.rows);
+        if (100 * (uint)rng2.m_rowcount <= g_opts.scanpct * g_opts.rows) {
+          if (urandom(fudgefac) != 0) {
+            action = 2;
+            break;
+          }
+        }
+        // replace if less rows
+        if (rng2.m_rowcount < rng.m_rowcount) {
+          if (urandom(fudgefac) != 0) {
+            action = 1;
+            break;
+          }
+        }
+      } while (0);
+      if (action != 0) {
+        rng.copy(rng2);
+        if (action == 2 || j >= mintries)
+          break;
+      }
     }
+    g_rnglist[i].copy(rng);
+    ll2("rng " << i << ": " << rng << " tries: " << j);
   }
 }
 
+// verify ranges via range scans
+
 static int
-setbounds(const Range& range)
+setbounds(const Rng& rng)
 {
   // currently must do each attr in order
-  ll2("setbounds: " << range);
+  ll3("setbounds: " << rng);
   uint i;
-  const Bnd (&bnd)[2] = range.bnd;
-  for (i = 0; i < (uint)g_numattrs; i++) {
+  const Bnd (&bnd)[2] = rng.m_bnd;
+  for (i = 0; i < g_numattrs; i++) {
     const Uint32 no = i; // index attribute number
     uint j;
     int type[2] = { -1, -1 };
     // determine inclusivity (boundtype) of upper+lower bounds on this col.
     // -1 == no bound on the col.
     for (j = 0; j <= 1; j++) {
-      if (no < bnd[j].val.numattrs)
-        type[j] = bnd[j].type(j, no);
+      if (no < bnd[j].m_val.m_numattrs)
+        type[j] = bnd[j].type(no);
     }
     for (j = 0; j <= 1; j++) {
       int t = type[j];
       if (t == -1)
         continue;
-      if (no + 1 < bnd[j].val.numattrs)
+      if (no + 1 < bnd[j].m_val.m_numattrs)
         t &= ~(uint)1; // strict bit is set on last bound only
-      const Val& val = bnd[j].val;
+      const Val& val = bnd[j].m_val;
       const void* addr = 0;
       if (no == 0)
-        addr = (const void*)&val.b; // col a, not nullable
+        addr = ! val.b_null ? (const void*)&val.b : 0;
       else if (no == 1)
-        addr = ! val.c_null ? (const void*)val.c : 0; // col b, nullable
+        addr = ! val.c_null ? (const void*)val.c : 0;
       else if (no == 2)
-        addr = ! val.d_null ? (const void*)&val.d : 0; // col c, nullable
+        addr = ! val.d_null ? (const void*)&val.d : 0;
       else
         assert(false);
-      ll2("setBound attr:" << no << " type:" << t << " val: " << val);
+      ll3("setBound attr:" << no << " type:" << t << " val: " << val);
       chkdb(g_rangescan_op->setBound(no, t, addr) == 0);
     }
   }
   return 0;
 }
 
+static int
+scanrange(const Rng& rng)
+{
+  ll3("scanrange: " << rng);
+  chkdb((g_con = g_ndb->startTransaction()) != 0);
+  chkdb((g_rangescan_op = g_con->getNdbIndexScanOperation(g_ind, g_tab)) != 0);
+  chkdb(g_rangescan_op->readTuples() == 0);
+  chkrc(setbounds(rng) == 0);
+  Uint32 a;
+  char* a_addr = (char*)&a;
+  Uint32 no = 0;
+  chkdb(g_rangescan_op->getValue(no++, a_addr) != 0);
+  chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
+  uint count = 0;
+  uint i;
+  for (i = 0; i < g_opts.rows; i++) {
+    Key& key = g_keys[i];
+    key.m_flag = false; // not scanned
+  }
+  while (1) {
+    int ret;
+    a = ~(Uint32)0;
+    chkdb((ret = g_rangescan_op->nextResult()) == 0 || ret == 1);
+    if (ret == 1)
+      break;
+    i = (uint)a;
+    chkrc(i < g_opts.rows);
+    Key& key = g_keys[i];
+    ll3("scan: " << key);
+    int k = rng.cmp(key);
+    chkrc(k == 0);
+    chkrc(key.m_flag == false);
+    key.m_flag = true;
+    count++;
+  }
+  g_ndb->closeTransaction(g_con);
+  g_con = 0;
+  g_rangescan_op = 0;
+
+  for (i = 0; i < g_opts.rows; i++) {
+    Key& key = g_keys[i];
+    int k = rng.cmp(key);
+    if (k != 0) // not in range
+      chkrc(key.m_flag == false);
+    else
+      chkrc(key.m_flag == true);
+    key.m_flag = -1; // forget
+  }
+  require((uint)rng.m_rowcount == count);
+  return 0;
+}
+
+static int
+scanranges()
+{
+  ll1("scanranges");
+  for (uint i = 0; i < g_opts.ops; i++) {
+    const Rng& rng = g_rnglist[i];
+    chkrc(scanrange(rng) == 0);
+  }
+  return 0;
+}
+
+// stats v4 update
+
+static int
+definestat()
+{
+  ll1("definestat");
+  require(g_is != 0 && g_ind != 0 && g_tab != 0);
+  chkdb(g_is->set_index(*g_ind, *g_tab) == 0);
+  return 0;
+}
+
+static int
+updatestat()
+{
+  ll1("updatestat");
+  if (urandom(2) == 0) {
+    g_dic = g_ndb->getDictionary();
+    chkdb(g_dic->updateIndexStat(*g_ind, *g_tab) == 0);
+    g_dic = 0;
+  } else {
+    chkdb(g_is->update_stat(g_ndb_sys) == 0);
+  }
+  return 0;
+}
+
+static int
+readstat()
+{
+  ll1("readstat");
+
+  NdbIndexStat::Head head;
+  chkdb(g_is->read_head(g_ndb_sys) == 0);
+  g_is->get_head(head);
+  chkrc(head.m_found == true);
+  chkrc(head.m_sampleVersion != 0);
+  ll1("readstat:"
+      << " sampleVersion: " << head.m_sampleVersion
+      << " sampleCount: " << head.m_sampleCount);
+
+  NdbIndexStat::CacheInfo infoQuery;
+  chkdb(g_is->read_stat(g_ndb_sys) == 0);
+  g_is->move_cache();
+  g_is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
+  ll1("readstat: cache bytes: " << infoQuery.m_totalBytes);
+  return 0;
+}
+
+// stats queries
+
+// exact stats from scan results
+static void
+queryscan(Rng& rng)
+{
+  ll3("queryscan");
+
+  uint rir;
+  uint unq[g_numattrs];
+  rir = 0;
+  for (uint k = 0; k < g_opts.attrs; k++)
+    unq[0] = 0;
+  Key prevkey;
+  for (uint i = 0; i < g_opts.rows; i++) {
+    const Key& key = g_keys[g_sortkeys[i]];
+    int res = rng.cmp(key);
+    if (res != 0)
+      continue;
+    rir++;
+    if (rir == 1) {
+      for (uint k = 0; k < g_opts.attrs; k++)
+        unq[k] = 1;
+    } else {
+      uint num_eq = ~0;
+      int res = prevkey.m_val.cmp(key.m_val, g_opts.attrs, &num_eq);
+      if (res == 0)
+        require(num_eq == g_opts.attrs);
+      else {
+        require(res < 0);
+        require(num_eq < g_opts.attrs);
+        unq[num_eq]++;
+        // propagate down
+        for (uint k = num_eq + 1; k < g_opts.attrs; k++)
+          unq[k]++;
+      }
+    }
+    prevkey.m_val.copy(key.m_val);
+  }
+  require(rng.m_rowcount != -1);
+  require((uint)rng.m_rowcount == rir);
+
+  Stval& st = rng.m_st_scan;
+  st.rir_v2 = rir;
+  st.rir = rir == 0 ? 1.0 : (double)rir;
+  for (uint k = 0; k < g_opts.attrs; k++) {
+    if (rir == 0)
+      st.rpk[k] = 1.0;
+    else {
+      require(rir >= unq[k]);
+      require(unq[k] != 0);
+      st.rpk[k] = (double)rir / (double)unq[k];
+    }
+  }
+  st.empty = (rir == 0);
+  ll2("queryscan: " << st);
+}
+
 /* This method initialises the passed in IndexBound
  * to represent the range passed in.
  * It assumes that the storage pointed to by low_key
@@ -1094,12 +1474,12 @@ setbounds(const Range& range)
  * and is long enough to store the data
  */
 static int
-initialiseIndexBound(const Range& range, 
+initialiseIndexBound(const Rng& rng, 
                      NdbIndexScanOperation::IndexBound& ib)
 {
-  ll2("initialiseIndexBound: " << range);
+  ll3("initialiseIndexBound: " << rng);
   uint i;
-  const Bnd (&bnd)[2] = range.bnd;
+  const Bnd (&bnd)[2] = rng.m_bnd;
   Uint32 colsInBound[2]= {0, 0};
   bool boundInclusive[2]= {false, false};
 
@@ -1107,15 +1487,15 @@ initialiseIndexBound(const Range& range,
   *((char *)ib.low_key) = 
     *((char *)ib.high_key) = 0;
 
-  for (i = 0; i < (uint)g_numattrs; i++) {
+  for (i = 0; i < g_numattrs; i++) {
     const Uint32 no = i; // index attribute number
     uint j;
     int type[2] = { -1, -1 };
     // determine inclusivity (boundtype) of upper+lower bounds on this col.
     // -1 == no bound on the col.
     for (j = 0; j <= 1; j++) {
-      if (no < bnd[j].val.numattrs)
-        type[j] = bnd[j].type(j, no);
+      if (no < bnd[j].m_val.m_numattrs)
+        type[j] = bnd[j].type(no);
     }
     for (j = 0; j <= 1; j++) {
       /* Get ptr to key storage space for this bound */
@@ -1125,38 +1505,39 @@ initialiseIndexBound(const Range& range,
         continue;
       colsInBound[j]++;
 
-      if (no + 1 >= bnd[j].val.numattrs)
+      if (no + 1 >= bnd[j].m_val.m_numattrs)
         // Last column in bound, inclusive if GE or LE (or EQ)
         // i.e. bottom bit of boundtype is clear
         boundInclusive[j]= !(t & 1);
       
-      const Val& val = bnd[j].val;
+      const Val& val = bnd[j].m_val;
       if (no == 0)
-        // b, not nullable
-        keyBuf->m_b= val.b;
+      {
+        if (! val.b_null)
+          keyBuf->m_b= val.b;
+
+        if (g_b_nullable)
+          keyBuf->m_null_bm |= ((val.b_null?1:0) << g_ndbrec_b_nb_offset);
+      }
       else if (no == 1)
       {
-        // c, nullable
         if (! val.c_null)
-          memcpy(&keyBuf->m_c[0],
-                 (const void*)&val.c,
-                 1+ g_charlen);
+          memcpy(&keyBuf->m_c[0], (const void*)&val.c, 1+ g_charlen);
         
-        // Set null bit
-        keyBuf->m_null_bm |= ((val.c_null?1:0) << g_ndbrec_c_nb_offset);
+        if (g_c_nullable)
+          keyBuf->m_null_bm |= ((val.c_null?1:0) << g_ndbrec_c_nb_offset);
       } 
       else if (no == 2)
       {
-        // d, nullable
         if (! val.d_null)
           keyBuf->m_d= val.d;
 
-        // Set null bit
-        keyBuf->m_null_bm |= ((val.d_null?1:0) << g_ndbrec_d_nb_offset);
+        if (g_d_nullable)
+          keyBuf->m_null_bm |= ((val.d_null?1:0) << g_ndbrec_d_nb_offset);
       }
       else
-        assert(false);
-      ll2("initialiseIndexBound attr:" << no << " type:" << t << " val: " << val);
+        require(false);
+      ll3("initialiseIndexBound attr:" << no << " type:" << t << " val: " << val);
     }
   }
 
@@ -1167,32 +1548,24 @@ initialiseIndexBound(const Range& range,
   ib.high_inclusive= boundInclusive[1];
   ib.range_no= 0;
 
-  ll2(" indexBound low_key_count=" << ib.low_key_count << 
+  ll3(" indexBound low_key_count=" << ib.low_key_count << 
       " low_inc=" << ib.low_inclusive <<
       " high_key_count=" << ib.high_key_count <<
       " high_inc=" << ib.high_inclusive);
-  ll2(" low bound b=" << *((Uint16*) &ib.low_key[g_ndbrec_b_offset]) <<
-      " d=" << *((Uint32*) &ib.low_key[g_ndbrec_d_offset]) <<
+  ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) <<
+      " d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) <<
       " first byte=%xu" << ib.low_key[0]);
-  ll2(" high bound b=" << *((Uint16*) &ib.high_key[g_ndbrec_b_offset]) <<
-      " d=" << *((Uint32*) &ib.high_key[g_ndbrec_d_offset]) <<
+  ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) <<
+      " d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) <<
       " first byte=%xu" << ib.high_key[0]);  
 
   return 0;
 }
 
 static int
-allocstat()
+querystat_v2(Rng& rng)
 {
-  g_stat = new NdbIndexStat(g_ind);
-  chkdb(g_stat->alloc_cache(32) == 0);
-  return 0;
-}
-
-static int
-runstat(Range& range, int flags)
-{
-  ll2("runstat: " << range << " flags=" << flags);
+  ll3("querystat_v2");
   
   /* Create IndexBound and key storage space */
   char keySpace[2][g_ndbrecord_bytes];
@@ -1201,283 +1574,445 @@ runstat(Range& range, int flags)
   ib.high_key= keySpace[1];
 
   chkdb((g_con = g_ndb->startTransaction()) != 0);
-  chkrc(initialiseIndexBound(range, ib) == 0);
+  chkrc(initialiseIndexBound(rng, ib) == 0);
 
   Uint64 count = ~(Uint64)0;
-  chkdb(g_stat->records_in_range(g_ind, 
+  chkdb(g_is->records_in_range(g_ind, 
                                  g_con,
                                  g_ind_rec,
                                  g_tab_rec,
                                  &ib,
-                                 g_opts.rows, 
+                                 0,
                                  &count, 
-                                 flags) == 0);
+                                 0) == 0);
   g_ndb->closeTransaction(g_con);
   g_con = 0;
   g_rangescan_op = 0;
-  range.statrows = (uint)count;
-  chkrc((Uint64)range.statrows == count);
-  ll2("stat: " << range.statrows);
+
+  Stval& st = rng.m_st_stat;
+  chkrc(count < (1 << 30));
+  st.rir_v2 = count;
+  ll2("querystat_v2: " << st.rir_v2 << " rows");
   return 0;
 }
 
 static int
-runscan(Range& range)
+querystat(Rng& rng)
 {
-  ll2("runscan: " << range);
-  chkdb((g_con = g_ndb->startTransaction()) != 0);
-  chkdb((g_rangescan_op = g_con->getNdbIndexScanOperation(g_ind, g_tab)) != 0);
-  chkdb(g_rangescan_op->readTuples() == 0);
-  chkrc(setbounds(range) == 0);
-  Uint32 a;
-  char* a_addr = (char*)&a;
-  Uint32 no = 0;
-  chkdb(g_rangescan_op->getValue(no++, a_addr) != 0);
-  chkdb(g_con->execute(NdbTransaction::NoCommit) == 0);
-  uint count = 0;
-  uint i;
-  for (i = 0; i < g_opts.rows; i++)
-    g_keys[i].count = 0;
-  while (1) {
-    int ret;
-    a = ~(Uint32)0;
-    chkdb((ret = g_rangescan_op->nextResult()) == 0 || ret == 1);
-    if (ret == 1)
-      break;
-    i = (uint)a;
-    chkrc(i < g_opts.rows);
-    Key& key = g_keys[i];
-    ll3("scan: " << key);
-    int k = range.cmp(key.val);
-    chkrc(k == 0);
-    chkrc(key.count == 0);
-    key.count++;
-    count++;
-  }
-  g_ndb->closeTransaction(g_con);
-  g_con = 0;
-  g_rangescan_op = 0;
-  if (! g_opts.nochecks) {
-    for (i = 0; i < g_opts.rows; i++) {
-      const Key& key = g_keys[i];
-      int k = range.cmp(key.val);
-      assert((k != 0 && key.count == 0) || (k == 0 && key.count == 1));
-    }
-    assert(range.rowcount() == count);
+  ll3("querystat");
+
+  // set up range
+  Uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
+  Uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
+  NdbIndexStat::Bound bound_lo(g_is, bound_lo_buffer);
+  NdbIndexStat::Bound bound_hi(g_is, bound_hi_buffer);
+  NdbIndexStat::Range range(bound_lo, bound_hi);
+
+  // convert to IndexBound (like in mysqld)
+  NdbIndexScanOperation::IndexBound ib;
+  char keySpace[2][g_ndbrecord_bytes];
+  ib.low_key = keySpace[0];
+  ib.high_key = keySpace[1];
+  chkrc(initialiseIndexBound(rng, ib) == 0);
+  chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0);
+
+  // index stat query
+  Uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
+  NdbIndexStat::Stat stat(stat_buffer);
+  chkdb(g_is->query_stat(range, stat) == 0);
+
+  // save result
+  Stval& st = rng.m_st_stat;
+  g_is->get_rir(stat, &st.rir);
+  for (uint k = 0; k < g_opts.attrs; k++) {
+    g_is->get_rpk(stat, k, &st.rpk[k]);
   }
-  range.scanrows = count;
-  ll2("scan: " << range.scanrows);
+  g_is->get_empty(stat, &st.empty);
+  g_is->get_rule(stat, st.rule);
+
+  ll2("querystat: " << st);
   return 0;
 }
 
 static int
-runscans()
+queryranges()
 {
-  uint i;
-  for (i = 0; i < g_opts.ops; i++) {
-    Range& range = g_ranges[i];
-    ll1("range " << i << ": " << range);
-    // simulate old handler code
-    int flags = 0;
-    if (i < 32 || i % 20 == 0)
-      flags |= NdbIndexStat::RR_UseDb;
-    chkrc(runstat(range, flags) == 0);
-    chkrc(runscan(range) == 0);
-    // if stat is 0 then it is exact scan count
-    chkrc(range.statrows != 0 || range.scanrows == 0);
-    // measure error as fraction of total rows
-    double x = (double)range.statrows;
-    double y = (double)range.scanrows;
-    double z = (double)g_opts.rows;
-    double err = (x - y) / z;
-    // report in pct
-    range.errpct = 100.0 * err;
-    ll1("range " << i << ":" <<
-        " stat: " << range.statrows << " scan: " << range.scanrows <<
-        " errpct: " << range.errpct);
+  ll2("queryranges");
+  for (uint i = 0; i < g_opts.ops; i++) {
+    Rng& rng = g_rnglist[i];
+    ll1("rng " << i << ": " << rng);
+    // exact stats
+    queryscan(rng);
+    // interpolated stats
+    chkrc(querystat_v2(rng) == 0);
+    chkrc(querystat(rng) == 0);
+    const Stval& st1 = rng.m_st_scan;
+    const Stval& st2 = rng.m_st_stat;
+    // if rir v2 is zero then it is exact
+    chkrc(st2.rir_v2 != 0 || st1.rir_v2 == 0);
   }
   return 0;
 }
 
-struct Stat {
-  const char* name;
-  uint count;
-  double sum;
-  double minval;
-  double maxval;
-  double avg;
-  double varsum;
-  double var;
-  double stddev;
-  void init();
-  void add(const Stat& stat);
+// general statistics methods
+
+struct Stats : public NDBT_Stats {
+  Stats();
+  void add(double x2);
+  void add(const Stats& sum2);
 };
 
-void 
-Stat::init()
+static NdbOut&
+operator<<(NdbOut& out, const Stats& st)
+{
+  out << "count: " << st.getCount()
+      << " min: " << st.getMin()
+      << " max: " << st.getMax()
+      << " mean: " << st.getMean()
+      << " stddev: " << st.getStddev();
+  return out;
+}
+
+Stats::Stats()
 {
-  name = "stat";
-  count = 0;
-  sum = minval = maxval = avg = varsum = var = stddev = 0.0;
 }
 
 void
-Stat::add(const Stat& stat)
+Stats::add(double x2)
 {
-  if (count == 0) {
-    *this = stat;
-    return;
-  }
-  Stat tmp = *this;
-  tmp.count = count + stat.count;
-  tmp.sum = sum + stat.sum;
-  tmp.minval = minval <= stat.minval ? minval : stat.minval;
-  tmp.maxval = maxval >= stat.maxval ? maxval : stat.maxval;
-  tmp.avg = tmp.sum / double(tmp.count);
-  tmp.varsum = varsum + stat.varsum;
-  tmp.var = tmp.varsum / double(tmp.count);
-  tmp.stddev = sqrt(tmp.var);
-  *this = tmp;
+  addObservation(x2);
 }
 
+void
+Stats::add(const Stats& st2)
+{
+  *this += st2;
+}
+
+// error statistics scan vs stat
+
+struct Sterr {
+  Stats rir_v2;
+  Stats rir;
+  Stats rpk[g_numattrs];
+  Sterr();
+  void add(const Sterr& st2);
+};
+
 static NdbOut&
-operator<<(NdbOut& out, const Stat& stat)
+operator<<(NdbOut& out, const Sterr& st)
 {
-  out << stat.name << ": " << "count: " << stat.count
-      << " min: " << stat.minval << " max: " << stat.maxval
-      << " avg: " << stat.avg << " stddev: " << stat.stddev;
+  out << "rir_v2: " << st.rir_v2 << endl;
+  out << "rir_v4: " << st.rir;
+  for (uint k = 0; k < g_opts.attrs; k++) {
+    out << endl << "rpk[" << k << "]: " << st.rpk[k];
+  }
   return out;
 }
 
-template <class T, class V>
+Sterr::Sterr()
+{
+}
+
+void
+Sterr::add(const Sterr& st2)
+{
+  rir_v2.add(st2.rir_v2);
+  rir.add(st2.rir);
+  for (uint k = 0; k < g_opts.attrs; k++) {
+    rpk[k].add(st2.rpk[k]);
+  }
+}
+
 static void
-computestat(Stat& stat)
+sumrange(const Rng& rng, Sterr& st)
 {
-  stat.init();
-  stat.name = V::name();
-  const T* array = V::array();
-  stat.count = V::count();
-  assert(stat.count != 0);
-  uint i;
-  for (i = 0; i < stat.count; i++) {
-    const T& item = array[i];
-    double data = V::data(item);
-    stat.sum += data;
-    if (i == 0)
-      stat.minval = stat.maxval = data;
-    else {
-      if (stat.minval > data)
-        stat.minval = data;
-      if (stat.maxval < data)
-        stat.maxval = data;
-    }
-  }
-  stat.avg = stat.sum / double(stat.count);
-  stat.varsum = 0.0;
-  for (i = 0; i < stat.count; i++) {
-    const T& item = array[i];
-    double data = V::data(item);
-    double x = data - stat.avg;
-    stat.varsum += x * x;
-  }
-  stat.var = stat.varsum / double(stat.count);
-  stat.stddev = sqrt(stat.var);
-}
-
-struct V_rpk {
-  static const char* name() { return "rec per key"; }
-  static const Key* array() { return g_sortkeys; }
-  static uint count() { return g_sortcount; }
-  static double data(const Key& key) { return (double)key.rpk; }
-};
+  const Stval& st1 = rng.m_st_scan;
+  const Stval& st2 = rng.m_st_stat;
 
-struct V_rir {
-  static const char* name() { return "rir err pct"; }
-  static const Range* array() { return g_ranges; }
-  static uint count() { return g_opts.ops; }
-  static double data(const Range& range) { return (double)range.errpct; }
-};
+  // rir_v2 error as pct of total rows
+  {
+    double rows = (double)g_opts.rows;
+    double x1 = (double)st1.rir_v2;
+    double x2 = (double)st2.rir_v2;
+    double x3 = 100.0 * (x2 - x1) / rows;
+    st.rir_v2.add(x3);
+  }
 
-template void computestat<Key, V_rpk>(Stat& stat);
-template void computestat<Range, V_rir>(Stat& stat);
+  // rir error as pct of total rows
+  {
+    double rows = (double)g_opts.rows;
+    double x1 = st1.rir;
+    double x2 = st2.rir;
+    double x3 = 100.0 * (x2 - x1) / rows;
+    st.rir.add(x3);
+  }
 
-static Stat g_stat_rpk; // summaries over loops
-static Stat g_stat_rir;
+  // rpk errors as plain diff
+  for (uint k = 0; k < g_opts.attrs; k++) {
+    double x1 = st1.rpk[k];
+    double x2 = st2.rpk[k];
+    double x3 = (x2 - x1);
+    st.rpk[k].add(x3);
+  }
+}
+
+static void
+sumranges(Sterr& st)
+{
+  for (uint i = 0; i < g_opts.ops; i++) {
+    const Rng& rng = g_rnglist[i];
+    sumrange(rng, st);
+  }
+}
+
+// loop and final stats
+
+static Sterr g_sterr;
 
 static void
 loopstats()
 {
-  Stat stat_rpk; // records per key
-  Stat stat_rir; // record in range
-  if (g_loop == 0) {
-    g_stat_rpk.init();
-    g_stat_rir.init();
-  }
-  computestat<Key, V_rpk>(stat_rpk);
-  computestat<Range, V_rir>(stat_rir);
-  if (g_opts.loop != 1) {
+  Sterr st;
+  sumranges(st);
+  if (g_opts.loops != 1) {
     ll0("=== loop " << g_loop << " summary ===");
-    ll0(stat_rpk);
-    ll0(stat_rir);
+    ll0(st);
   }
   // accumulate
-  g_stat_rpk.add(stat_rpk);
-  g_stat_rir.add(stat_rir);
+  g_sterr.add(st);
 }
 
-static void
-finalstats()
+static int
+loopdumps()
 {
-  ll0("=== summary ===");
-  ll0(g_stat_rpk);
-  ll0(g_stat_rir);
+  char file[200];
+  if (g_opts.dump == 0)
+    return 0;
+  {
+    BaseString::snprintf(file, sizeof(file),
+                         "%s.key.%d", g_opts.dump, g_loop);
+    FILE* f = 0;
+    chker((f = fopen(file, "w")) != 0);
+    fprintf(f, "a");
+    for (uint k = 0; k < g_opts.attrs; k++) {
+      if (k == 0)
+        fprintf(f, ",b_null,b");
+      else if (k == 1)
+        fprintf(f, ",c_null,c");
+      else if (k == 2)
+        fprintf(f, ",d_null,d");
+      else
+        require(false);
+    }
+    fprintf(f, "\n");
+    for (uint i = 0; i < g_opts.rows; i++) {
+      const Key& key = g_keys[g_sortkeys[i]];
+      const Val& val = key.m_val;
+      fprintf(f, "%u", i);
+      for (uint k = 0; k < g_opts.attrs; k++) {
+        if (k == 0) {
+          fprintf(f, ",%d,", val.b_null);
+          if (!val.b_null)
+            fprintf(f, "%u", val.b);
+        } else if (k == 1) {
+          fprintf(f, ",%d,", val.c_null);
+          if (!val.c_null)
+            fprintf(f, "%.*s", val.c[0], &val.c[1]);
+        } else if (k == 2) {
+          fprintf(f, ",%d,", val.d_null);
+          if (!val.d_null)
+            fprintf(f, "%u", val.d);
+        } else {
+          require(false);
+        }
+      }
+      fprintf(f, "\n");
+    }
+    chker(fclose(f) == 0);
+  }
+  {
+    BaseString::snprintf(file, sizeof(file),
+                         "%s.range.%d", g_opts.dump, g_loop);
+    FILE* f = 0;
+    chker((f = fopen(file, "w")) != 0);
+    fprintf(f, "op");
+    for (uint j = 0; j <= 1; j++) {
+      const char* suf = (j == 0 ? "_lo" : "_hi");
+      fprintf(f, ",attrs%s", suf);
+      for (uint k = 0; k < g_opts.attrs; k++) {
+        if (k == 0)
+          fprintf(f, ",b_null%s,b%s", suf, suf);
+        else if (k == 1)
+          fprintf(f, ",c_null%s,c%s", suf, suf);
+        else if (k == 2)
+          fprintf(f, ",d_null%s,d%s", suf, suf);
+        else
+          require(false);
+      }
+      fprintf(f, ",side%s", suf);
+    }
+    fprintf(f, "\n");
+    for (uint i = 0; i < g_opts.ops; i++) {
+      const Rng& rng = g_rnglist[i];
+      fprintf(f, "%u", i);
+      for (uint j = 0; j <= 1; j++) {
+        const Bnd& bnd = rng.m_bnd[j];
+        const Val& val = bnd.m_val;
+        fprintf(f, ",%u", val.m_numattrs);
+        for (uint k = 0; k < g_opts.attrs; k++) {
+          if (k >= val.m_numattrs)
+            fprintf(f, ",,");
+          else if (k == 0) {
+            fprintf(f, ",%d,", val.b_null);
+            if (!val.b_null)
+              fprintf(f, "%u", val.b);
+          } else if (k == 1) {
+            fprintf(f, ",%d,", val.c_null);
+            if (!val.c_null)
+              fprintf(f, "%.*s", val.c[0], &val.c[1]);
+          } else if (k == 2) {
+            fprintf(f, ",%d,", val.d_null);
+            if (!val.d_null)
+              fprintf(f, "%u", val.d);
+          } else {
+            require(false);
+          }
+        }
+        fprintf(f, ",%d", bnd.m_side);
+      }
+      fprintf(f, "\n");
+    }
+    chker(fclose(f) == 0);
+  }
+  {
+    BaseString::snprintf(file, sizeof(file),
+                         "%s.stat.%d", g_opts.dump, g_loop);
+    FILE* f = 0;
+    chker((f = fopen(file, "w")) != 0);
+    fprintf(f, "op");
+    for (uint j = 0; j <= 1; j++) {
+      const char* suf = (j == 0 ? "_scan" : "_stat");
+      fprintf(f, ",rir_v2%s", suf);
+      fprintf(f, ",rir%s", suf);
+      for (uint k = 0; k < g_opts.attrs; k++) {
+        fprintf(f, ",rpk_%u%s", k, suf);
+      }
+      fprintf(f, ",empty%s", suf);
+      if (j == 1)
+        fprintf(f, ",rule%s", suf);
+    }
+    fprintf(f, "\n");
+    for (uint i = 0; i < g_opts.ops; i++) {
+      const Rng& rng = g_rnglist[i];
+      fprintf(f, "%u", i);
+      for (uint j = 0; j <= 1; j++) {
+        const Stval& st = (j == 0 ? rng.m_st_scan : rng.m_st_stat);
+        fprintf(f, ",%u", st.rir_v2);
+        fprintf(f, ",%.2f", st.rir);
+        for (uint k = 0; k < g_opts.attrs; k++) {
+          fprintf(f, ",%.2f", st.rpk[k]);
+        }
+        fprintf(f, ",%d", st.empty);
+        if (j == 1)
+          fprintf(f, ",%s", st.rule);
+      }
+      fprintf(f, "\n");
+    }
+    chker(fclose(f) == 0);
+  }
+  return 0;
 }
 
 static void
-setseed(int n)
+finalstats()
 {
-  uint seed;
-  if (n == -1) {
-    if (g_opts.seed == 0)
-      return;
-    if (g_opts.seed != (uint) -1)
-      seed = (uint)g_opts.seed;
-    else
-      seed = 1 + (ushort)getpid();
-  } else {
-    if (g_opts.seed != 0)
-      return;
-    seed = n;
-  }
-  ll0("seed=" << seed);
-  srandom(seed);
+  ll0("=== summary ===");
+  ll0(g_sterr);
 }
 
 static int
 runtest()
 {
-  setseed(-1);
+  ll1("sizeof Val: " << sizeof(Val));
+  ll1("sizeof Key: " << sizeof(Key));
+  ll1("sizeof Bnd: " << sizeof(Bnd));
+  ll1("sizeof Rng: " << sizeof(Rng));
+
+  uint seed = g_opts.seed;
+  if (seed != 1) { // not loop number
+    if (seed == 0) { // random
+      seed = 2 + (ushort)getpid();
+    }
+    ll0("random seed is " << seed);
+    srandom(seed);
+  } else {
+    ll0("random seed is " << "loop number");
+  }
   g_cs = get_charset_by_name(g_csname, MYF(0));
   if (g_cs == 0)
     g_cs = get_charset_by_csname(g_csname, MY_CS_PRIMARY, MYF(0));
   chkrc(g_cs != 0);
-  for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
+
+  allockeys();
+  allocranges();
+  chkrc(createtable() == 0);
+  chkrc(createindex() == 0);
+  chkrc(createNdbRecords() == 0);
+  chkrc(definestat() == 0);
+
+  for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) {
     ll0("=== loop " << g_loop << " ===");
-    setseed(g_loop);
-    chkrc(createtable() == 0);
-    chkrc(loaddata() == 0);
-    sortkeys();
-    chkrc(allocranges() == 0);
+    uint seed = g_opts.seed;
+    if (seed == 1) { // loop number
+      seed = g_loop;
+      srandom(seed);
+    }
+    makekeys();
+    chkrc(loaddata(g_loop != 0) == 0);
     makeranges();
-    chkrc(allocstat() == 0);
-    chkrc(runscans() == 0);
-    chkrc(droptable() == 0);
+    chkrc(scanranges() == 0);
+    chkrc(updatestat() == 0);
+    chkrc(readstat() == 0);
+    chkrc(queryranges() == 0);
     loopstats();
+    chkrc(loopdumps() == 0);
   }
   finalstats();
+
+  if (!g_opts.keeptable)
+    chkrc(droptable() == 0);
+  freeranges();
+  freekeys();
   return 0;
 }
 
+static int
+doconnect()
+{
+  g_ncc = new Ndb_cluster_connection();
+  require(g_ncc != 0);
+  chkdb(g_ncc->connect(30) == 0);
+  g_ndb = new Ndb(g_ncc, "TEST_DB");
+  require(g_ndb != 0);
+  chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0);
+  g_ndb_sys = new Ndb(g_ncc, "mysql");
+  require(g_ndb_sys != 0);
+  chkdb(g_ndb_sys->init() == 0 && g_ndb_sys->waitUntilReady(30) == 0);
+  g_is = new NdbIndexStat;
+  require(g_is != 0);
+  return 0;
+}
+
+static void
+dodisconnect()
+{
+  delete g_is;
+  delete g_ndb_sys;
+  delete g_ndb;
+  delete g_ncc;
+}
+
 static struct my_option
 my_long_options[] =
 {
@@ -1486,112 +2021,122 @@ my_long_options[] =
     "Logging level in this program 0-3 (default 0)",
     (uchar **)&g_opts.loglevel, (uchar **)&g_opts.loglevel, 0,
     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
-  { "seed", NDB_OPT_NOSHORT, "Random seed (0=loop number, default -1=random)",
+  { "seed", NDB_OPT_NOSHORT, "Random seed (default 0=random, 1=loop number)",
     (uchar **)&g_opts.seed, (uchar **)&g_opts.seed, 0,
-    GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
-  { "loop", NDB_OPT_NOSHORT, "Number of test loops (default 1, 0=forever)",
-    (uchar **)&g_opts.loop, (uchar **)&g_opts.loop, 0,
+    GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+  { "loops", NDB_OPT_NOSHORT, "Number of test loops (default 1, 0=forever)",
+    (uchar **)&g_opts.loops, (uchar **)&g_opts.loops, 0,
     GET_INT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0 },
-  { "rows", NDB_OPT_NOSHORT, "Number of rows (default 100000)",
+  { "rows", NDB_OPT_NOSHORT, "Number of rows (default 10000)",
     (uchar **)&g_opts.rows, (uchar **)&g_opts.rows, 0,
     GET_UINT, REQUIRED_ARG, 100000, 0, 0, 0, 0, 0 },
-  { "ops", NDB_OPT_NOSHORT,"Number of index scans per loop (default 1000)",
+  { "ops", NDB_OPT_NOSHORT,"Number of index scans per loop (default 100)",
     (uchar **)&g_opts.ops, (uchar **)&g_opts.ops, 0,
     GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
-  { "dupkeys", NDB_OPT_NOSHORT, "Pct records per key (min 100, default 1000)",
-    (uchar **)&g_opts.dupkeys, (uchar **)&g_opts.dupkeys, 0,
-    GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
-  { "scanpct", NDB_OPT_NOSHORT,
-    "Preferred max pct of total rows per scan (default 5)",
-    (uchar **)&g_opts.scanpct, (uchar **)&g_opts.scanpct, 0,
-    GET_UINT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
   { "nullkeys", NDB_OPT_NOSHORT, "Pct nulls in each key attribute (default 10)",
     (uchar **)&g_opts.nullkeys, (uchar **)&g_opts.nullkeys, 0,
     GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
+  { "rpk", NDB_OPT_NOSHORT, "Avg records per full key (default 10)",
+    (uchar **)&g_opts.rpk, (uchar **)&g_opts.rpk, 0,
+    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
+  { "rpkvar", NDB_OPT_NOSHORT, "Vary rpk by factor (default 10, none 1)",
+    (uchar **)&g_opts.rpkvar, (uchar **)&g_opts.rpkvar, 0,
+    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
+  { "scanpct", NDB_OPT_NOSHORT,
+    "Preferred max pct of total rows per scan (default 10)",
+    (uchar **)&g_opts.scanpct, (uchar **)&g_opts.scanpct, 0,
+    GET_UINT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
   { "eqscans", NDB_OPT_NOSHORT,
-    "Pct scans for partial/full equality (default 50)",
+    "Pct scans for partial/full equality (default 30)",
     (uchar **)&g_opts.eqscans, (uchar **)&g_opts.eqscans, 0,
     GET_UINT, REQUIRED_ARG, 50, 0, 0, 0, 0, 0 },
-  { "dupscans", NDB_OPT_NOSHORT, "Pct scans using same bounds (default 10)",
-    (uchar **)&g_opts.dupscans, (uchar **)&g_opts.dupscans, 0,
-    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
   { "keeptable", NDB_OPT_NOSHORT,
-    "Use existing table and data if any and do not drop",
+    "Do not drop table at exit",
     (uchar **)&g_opts.keeptable, (uchar **)&g_opts.keeptable, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "no-extra-checks", NDB_OPT_NOSHORT, "Omit expensive consistency checks",
-    (uchar **)&g_opts.nochecks, (uchar **)&g_opts.nochecks, 0,
-    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
-  { "abort-on-error", NDB_OPT_NOSHORT, "Dump core on any error",
+  { "abort", NDB_OPT_NOSHORT, "Dump core on any error",
     (uchar **)&g_opts.abort, (uchar **)&g_opts.abort, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "dump", NDB_OPT_NOSHORT, "Write CSV files name.* of keys,ranges,stats",
+    (uchar **)&g_opts.dump, (uchar **)&g_opts.dump, 0,
+    GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
   { 0, 0, 0,
     0, 0, 0,
     GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
 };
 
-#if 0
+const char*
+load_default_groups[] = { "mysql_cluster", 0 };
+
+static void
+short_usage_sub()
+{
+  ndb_short_usage_sub(NULL);
+}
+
 static void
 usage()
 {
-  ndbout
-    << g_progname
-    << ": measure records_in_range error as percentage of total rows" << endl;
-  my_print_help(my_long_options);
+  ndbout << my_progname << ": ordered index stats test" << endl;
+  ndb_usage(short_usage_sub, load_default_groups, my_long_options);
 }
-#endif
 
 static int
 checkoptions()
 {
   chkrc(g_opts.rows != 0);
   chkrc(g_opts.nullkeys <= 100);
-  chkrc(g_opts.dupkeys >= 100);
+  chkrc(g_opts.rpk != 0);
+  g_opts.rpk = min(g_opts.rpk, g_opts.rows);
+  chkrc(g_opts.rpkvar != 0);
   chkrc(g_opts.scanpct <= 100);
   chkrc(g_opts.eqscans <= 100);
-  chkrc(g_opts.dupscans <= 100);
+  // set value limits
+  g_lim_val.all_nullable = false;
+  g_lim_bnd.all_nullable = true;
+  g_lim_val.b_min = g_opts.rows;
+  g_lim_val.b_max = 2 * g_opts.rows;
+  g_lim_bnd.b_min = 90 * g_lim_val.b_min / 100;
+  g_lim_bnd.b_max = 110 * g_lim_val.b_max / 100;
+  g_lim_val.c_char = "bcd";
+  g_lim_bnd.c_char = "abcde";
+  g_lim_val.d_min = 100;
+  g_lim_val.d_max = 200;
+  g_lim_bnd.d_min = 0;
+  g_lim_bnd.d_max = 300;
   return 0;
 }
 
-static int
-doconnect()
-{
-  g_ncc = new Ndb_cluster_connection();
-  chkdb(g_ncc->connect(30) == 0);
-  g_ndb = new Ndb(g_ncc, "TEST_DB");
-  chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0);
-  return 0;
-}
-
-static void
-freeall()
-{
-  delete g_stat;
-  freekeys();
-  freeranges();
-  delete g_ndb;
-  delete g_ncc;
-}
-
 int
 main(int argc, char** argv)
 {
   ndb_init();
-  const char* g_progname =
-    strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0];
+  my_progname = strchr(argv[0], '/') ? strrchr(argv[0], '/') + 1 : argv[0];
   uint i;
-  ndbout << g_progname;
+  ndbout << my_progname;
   for (i = 1; i < (uint)argc; i++)
     ndbout << " " << argv[i];
   ndbout << endl;
   int ret;
+  ndb_opt_set_usage_funcs(short_usage_sub, usage);
   ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
-  if (ret != 0 || argc != 0)
+  if (ret != 0 || argc != 0) {
+    ll0("wrong args");
+    return NDBT_ProgramExit(NDBT_WRONGARGS);
+  }
+  if (checkoptions() == -1) {
+    ll0("invalid args");
     return NDBT_ProgramExit(NDBT_WRONGARGS);
-  if (checkoptions() == 0 && doconnect() == 0 && runtest() == 0) {
-    freeall();
-    return NDBT_ProgramExit(NDBT_OK);
   }
-  freeall();
-  return NDBT_ProgramExit(NDBT_FAILED);
+  if (doconnect() == -1) {
+    ll0("connect failed");
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+  if (runtest() == -1) {
+    ll0("test failed");
+    dodisconnect();
+    return NDBT_ProgramExit(NDBT_FAILED);
+  }
+  dodisconnect();
+  return NDBT_ProgramExit(NDBT_OK);
 }

=== modified file 'storage/ndb/tools/Makefile.am'
--- a/storage/ndb/tools/Makefile.am	2011-05-24 11:51:39 +0000
+++ b/storage/ndb/tools/Makefile.am	2011-06-06 12:18:27 +0000
@@ -31,7 +31,8 @@ ndbtools_PROGRAMS = \
   ndb_show_tables \
   ndb_select_all \
   ndb_select_count \
-  ndb_restore ndb_config
+  ndb_restore ndb_config \
+  ndb_index_stat
 
 tools_common_sources = ../test/src/NDBT_ReturnCodes.cpp \
                        ../test/src/NDBT_Table.cpp \
@@ -76,6 +77,8 @@ ndbinfo.sql: $(ndbinfo_sql_SOURCES)
 	./ndbinfo_sql$(EXEEXT) > $@-t
 	$(MV) $@-t $@
 
+ndb_index_stat_SOURCES = ndb_index_stat.cpp $(tools_common_sources)
+
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_ndbapitools.mk.am
 
@@ -91,4 +94,5 @@ ndb_select_count_LDFLAGS = @ndb_bin_am_l
 ndb_restore_LDFLAGS = @ndb_bin_am_ldflags@
 ndb_config_LDFLAGS = @ndb_bin_am_ldflags@
 ndbinfo_sql_LDFLAGS = @ndb_bin_am_ldflags@
+ndb_index_stat_LDFLAGS = @ndb_bin_am_ldflags@
 

=== added file 'storage/ndb/tools/ndb_index_stat.cpp'
--- a/storage/ndb/tools/ndb_index_stat.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/tools/ndb_index_stat.cpp	2011-06-06 12:18:27 +0000
@@ -0,0 +1,632 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <ndb_global.h>
+#include <ndb_opts.h>
+
+#include <NdbOut.hpp>
+#include <NdbApi.hpp>
+#include <NDBT.hpp>
+#include <NdbIndexStatImpl.hpp>
+
+// stats options
+static const char* _dbname = 0;
+static my_bool _delete = false;
+static my_bool _update = false;
+static my_bool _dump = false;
+static int _query = 0;
+static int _stats_any = 0;
+// sys options
+static my_bool _sys_drop = false;
+static my_bool _sys_create = false;
+static my_bool _sys_create_if_not_exist = false;
+static my_bool _sys_create_if_not_valid = false;
+static my_bool _sys_check = false;
+static int _sys_any = 0;
+// other
+static my_bool _verbose = false;
+static int _loops = 1;
+
+static Ndb_cluster_connection* g_ncc = 0;
+static Ndb* g_ndb = 0;
+static Ndb* g_ndb_sys = 0;
+static NdbDictionary::Dictionary* g_dic = 0;
+static NdbIndexStat* g_is = 0;
+
+static const char* g_tabname = 0;
+static const NdbDictionary::Table* g_tab = 0;
+static int g_indcount = 0;
+static const char** g_indnames = 0;
+static const NdbDictionary::Index** g_indlist = 0;
+// current index in loop
+static const char* g_indname = 0;
+static const NdbDictionary::Index* g_ind = 0;
+
+#define CHK1(b) \
+  if (!(b)) { \
+    ret = -1; \
+    break; \
+  }
+
+#define CHK2(b, e) \
+  if (!(b)) { \
+    g_err << "ERR: " << #b << " failed at line " << __LINE__ \
+          << ": " << e << endl; \
+    ret = -1; \
+    break; \
+  }
+
+static const NdbError&
+getNdbError(Ndb_cluster_connection* ncc)
+{
+  static NdbError err;
+  err.code = g_ncc->get_latest_error();
+  err.message = g_ncc->get_latest_error_msg();
+  return err;
+}
+
+static int
+doconnect()
+{
+  int ret = 0;
+  do
+  {
+    g_ncc = new Ndb_cluster_connection(opt_ndb_connectstring);
+    CHK2(g_ncc->connect(6, 5) == 0, getNdbError(g_ncc));
+    CHK2(g_ncc->wait_until_ready(30, 10) == 0, getNdbError(g_ncc));
+
+    g_ndb = new Ndb(g_ncc, _dbname);
+    CHK2(g_ndb->init() == 0, g_ndb->getNdbError());
+    CHK2(g_ndb->waitUntilReady(30) == 0, g_ndb->getNdbError());
+
+    g_dic = g_ndb->getDictionary();
+
+    g_ndb_sys = new Ndb(g_ncc, "mysql");
+    CHK2(g_ndb_sys->init() == 0, g_ndb_sys->getNdbError());
+    CHK2(g_ndb_sys->waitUntilReady(30) == 0, g_ndb_sys->getNdbError());
+
+    g_is = new NdbIndexStat;
+    g_info << "connected" << endl;
+  }
+  while (0);
+  return ret;
+}
+
+static void
+dodisconnect()
+{
+  delete g_is;
+  delete g_ndb_sys;
+  delete g_ndb;
+  delete g_ncc;
+    g_info << "disconnected" << endl;
+}
+
+static const char*
+format(Uint64 us64, char* buf)
+{
+  Uint32 ms = (Uint32)(us64 / (Uint64)1000);
+  Uint32 us = (Uint32)(us64 % (Uint64)1000);
+  sprintf(buf, "%u.%03u", ms, us);
+  return buf;
+}
+
+static const char*
+format(double x, char* buf)
+{
+  sprintf(buf, "%.02f", x);
+  return buf;
+}
+
+static void
+show_head(const NdbIndexStat::Head& head)
+{
+  setOutputLevel(2);
+  g_info << "table:" << g_tabname;
+  g_info << " index:" << g_indname;
+  g_info << " fragCount:" << head.m_fragCount;
+  g_info << endl;
+  g_info << "sampleVersion:" << head.m_sampleVersion;
+  g_info << " loadTime:" << head.m_loadTime;
+  g_info << " sampleCount:" << head.m_sampleCount;
+  g_info << " keyBytes:" << head.m_keyBytes;
+  g_info << endl;
+  setOutputLevel(_verbose ? 2 : 0);
+}
+
+static void
+show_cache_info(const char* name, const NdbIndexStat::CacheInfo& info)
+{
+  Uint64 us64;
+  char buf[100];
+  setOutputLevel(2);
+  g_info << name << ":";
+  g_info << " valid:" << info.m_valid;
+  g_info << " sampleCount:" << info.m_sampleCount;
+  g_info << " totalBytes:" << info.m_totalBytes;
+  g_info << endl;
+  g_info << "times in ms:";
+  g_info << " save: " << format(info.m_save_time, buf);
+  g_info << " sort: " << format(info.m_sort_time, buf);
+  if (info.m_sampleCount != 0)
+  {
+    us64 = info.m_sort_time / (Uint64)info.m_sampleCount;
+    g_info << " sort per sample: " << format(us64, buf);
+  }
+  g_info << endl;
+  setOutputLevel(_verbose ? 2 : 0);
+}
+
+static void
+show_cache_entry(const NdbIndexStatImpl::CacheIter& iter)
+{
+  setOutputLevel(2);
+  const NdbPack::DataC& key = iter.m_keyData;
+  const NdbPack::DataC& value = iter.m_valueData;
+  char buf[8000];
+  key.print(buf, sizeof(buf));
+  g_info << "key:" << buf << endl;
+  value.print(buf, sizeof(buf));
+  g_info << "value:" << buf << endl;
+  setOutputLevel(_verbose ? 2 : 0);
+}
+
+static int
+doquery()
+{
+  int ret = 0;
+  char buf[100];
+
+  Uint8 b_lo_buffer[NdbIndexStat::BoundBufferBytes];
+  Uint8 b_hi_buffer[NdbIndexStat::BoundBufferBytes];
+  NdbIndexStat::Bound b_lo(g_is, b_lo_buffer);
+  NdbIndexStat::Bound b_hi(g_is, b_hi_buffer);
+  do
+  {
+    NdbIndexStat::Range r(b_lo, b_hi);
+    Uint8 s_buffer[NdbIndexStat::StatBufferBytes];
+    NdbIndexStat::Stat s(s_buffer);
+
+    for (int n = 0; n < _query; n++)
+    {
+      g_is->reset_range(r);
+      for (int i = 0; i <= 1; i++)
+      {
+        NdbIndexStat::Bound& b = (i == 0 ? b_lo : b_hi);
+
+        bool strict = false;
+        if (random() % 3 != 0)
+        {
+          if (random() % 3 != 0)
+          {
+            Uint32 x = random();
+            CHK2(g_is->add_bound(b, &x) == 0, g_is->getNdbError());
+          }
+          else
+          {
+            CHK2(g_is->add_bound_null(b) == 0, g_is->getNdbError());
+          }
+          bool strict = (random() % 2 == 0);
+          g_is->set_bound_strict(b, strict);
+        }
+      }
+      CHK2(ret == 0, "failed");
+      CHK2(g_is->finalize_range(r) == 0, g_is->getNdbError());
+      CHK2(g_is->query_stat(r, s) == 0, g_is->getNdbError());
+      double rir = -1.0;
+      NdbIndexStat::get_rir(s, &rir);
+      g_info << "rir: " << format(rir, buf) << endl;
+    }
+    CHK2(ret == 0, "failed");
+  }
+  while (0);
+
+  return ret;
+}
+
+static int
+dostats(int i)
+{
+  int ret = 0;
+  do
+  {
+    g_indname = g_indnames[i];
+    g_ind = g_indlist[i];
+
+    g_is->reset_index();
+    CHK2(g_is->set_index(*g_ind, *g_tab) == 0, g_is->getNdbError());
+
+    if (_delete)
+    {
+      g_info << g_indname << ": delete stats" << endl;
+      if (random() % 2 == 0)
+      {
+        CHK2(g_dic->deleteIndexStat(*g_ind, *g_tab) == 0, g_dic->getNdbError());
+      }
+      else
+      {
+        CHK2(g_is->delete_stat(g_ndb_sys) == 0, g_is->getNdbError());
+      }
+    }
+
+    if (_update)
+    {
+      g_info << g_indname << ": update stats" << endl;
+      if (random() % 2 == 0)
+      {
+        CHK2(g_dic->updateIndexStat(*g_ind, *g_tab) == 0, g_dic->getNdbError());
+      }
+      else
+      {
+        CHK2(g_is->update_stat(g_ndb_sys) == 0, g_is->getNdbError());
+      }
+    }
+
+    NdbIndexStat::Head head;
+    g_is->read_head(g_ndb_sys);
+    g_is->get_head(head);
+    CHK2(head.m_found != -1, g_is->getNdbError());
+    if (head.m_found == false)
+    {
+      g_info << "no stats" << endl;
+      break;
+    }
+    show_head(head);
+
+    g_info << "read stats" << endl;
+    CHK2(g_is->read_stat(g_ndb_sys) == 0, g_is->getNdbError());
+    g_is->move_cache();
+    g_is->clean_cache();
+    g_info << "query cache created" << endl;
+
+    NdbIndexStat::CacheInfo infoQuery;
+    g_is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
+    show_cache_info("query cache", infoQuery);
+
+    if (_dump)
+    {
+      NdbIndexStatImpl& impl = g_is->getImpl();
+      NdbIndexStatImpl::CacheIter iter(impl);
+      CHK2(impl.dump_cache_start(iter) == 0, g_is->getNdbError());
+      while (impl.dump_cache_next(iter) == true)
+      {
+        show_cache_entry(iter);
+      }
+    }
+
+    if (_query > 0)
+    {
+      CHK2(doquery() == 0, "failed");
+    }
+  }
+  while (0);
+  return ret;
+}
+
+static int
+dostats()
+{
+  int ret = 0;
+  do
+  {
+    for (int i = 0; i < g_indcount; i++)
+    {
+      CHK1(dostats(i) == 0);
+    }
+    CHK1(ret == 0);
+  }
+  while (0);
+  return ret;
+}
+
+static int
+checkobjs()
+{
+  int ret = 0;
+  do
+  {
+    CHK2((g_tab = g_dic->getTable(g_tabname)) != 0,
+          g_tabname << ": " << g_dic->getNdbError());
+
+    if (g_indcount == 0)
+    {
+      NdbDictionary::Dictionary::List list;
+      CHK2(g_dic->listIndexes(list, g_tabname) == 0, g_dic->getNdbError());
+      const int count = list.count;
+      g_indnames = (const char**)my_malloc(sizeof(char*) * count, MYF(0));
+      CHK2(g_indnames != 0, "out of memory");
+      for (int i = 0; i < count; i++)
+      {
+        const NdbDictionary::Dictionary::List::Element& e = list.elements[i];
+        if (e.type == NdbDictionary::Object::OrderedIndex)
+        {
+          g_indcount++;
+          g_indnames[i] = my_strdup(e.name, MYF(0));
+          CHK2(g_indnames[i] != 0, "out of memory");
+        }
+      }
+      CHK1(ret == 0);
+    }
+    g_indlist = (const NdbDictionary::Index**)my_malloc(sizeof(NdbDictionary::Index*) * g_indcount, MYF(0));
+    CHK2(g_indlist != 0, "out of memory");
+    for (int i = 0; i < g_indcount; i++)
+    {
+      CHK2((g_indlist[i] = g_dic->getIndex(g_indnames[i], g_tabname)) != 0,
+            g_tabname << "." << g_indnames[i] << ": " << g_dic->getNdbError());
+    }
+  }
+  while (0);
+  return ret;
+}
+
+static int
+dosys()
+{
+  int ret = 0;
+  do
+  {
+    if (_sys_drop)
+    {
+      g_info << "dropping any sys tables" << endl;
+      CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+      CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+      CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+           "unexpected error: " << g_is->getNdbError());
+      g_info << "drop done" << endl;
+    }
+
+    if (_sys_create)
+    {
+      g_info << "creating all sys tables" << endl;
+      CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+      CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+      g_info << "create done" << endl;
+    }
+
+    if (_sys_create_if_not_exist)
+    {
+      if (g_is->check_systables(g_ndb_sys) == -1)
+      {
+        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+             g_is->getNdbError());
+        g_info << "creating all sys tables" << endl;
+        CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        g_info << "create done" << endl;
+      }
+      else
+      {
+        g_info << "using existing sys tables" << endl;
+      }
+    }
+
+    if (_sys_create_if_not_valid)
+    {
+      if (g_is->check_systables(g_ndb_sys) == -1)
+      {
+        if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+        {
+          CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
+               g_is->getNdbError());
+          g_info << "dropping invalid sys tables" << endl;
+          CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+               "unexpected error: " << g_is->getNdbError());
+          g_info << "drop done" << endl;
+        }
+        g_info << "creating all sys tables" << endl;
+        CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        g_info << "create done" << endl;
+      }
+      else
+      {
+        g_info << "using existing sys tables" << endl;
+      }
+    }
+
+    if (_sys_check)
+    {
+      CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+      g_info << "sys tables ok" << endl;
+    }
+  }
+  while (0);
+  return ret;
+}
+
+static int
+doall()
+{
+  int ret = 0;
+  do
+  {
+    CHK2(doconnect() == 0, "connect to NDB");
+
+    int loop = 0;
+    while (++loop <= _loops)
+    {
+      g_info << "loop " << loop << " of " << _loops << endl;
+      if (!_sys_any)
+      {
+        if (loop == 1)
+        {
+          CHK1(checkobjs() == 0);
+        }
+        CHK2(dostats() == 0, "at loop " << loop);
+      }
+      else
+      {
+        CHK2(dosys() == 0, "at loop " << loop);
+      }
+    }
+    CHK1(ret == 0);
+  }
+  while (0);
+
+  dodisconnect();
+  return ret;
+}
+
+static int oi = 1000;
+static struct my_option
+my_long_options[] =
+{
+  NDB_STD_OPTS("ndb_index_stat"),
+  // stats options
+  { "database", 'd',
+    "Name of database table is in",
+    (uchar**) &_dbname, (uchar**) &_dbname, 0,
+    GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+  { "delete", ++oi,
+    "Delete index stats of given table"
+     " and stop any configured auto update",
+    (uchar **)&_delete, (uchar **)&_delete, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "update", ++oi,
+    "Update index stats of given table"
+     " and restart any configured auto update",
+    (uchar **)&_update, (uchar **)&_update, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "dump", ++oi,
+    "Dump query cache",
+    (uchar **)&_dump, (uchar **)&_dump, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "query", NDB_OPT_NOSHORT,
+    "Perform random range queries on first key attr (must be int unsigned)",
+    (uchar **)&_query, (uchar **)&_query, 0,
+    GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+  // sys options
+  { "sys-drop", ++oi,
+    "Drop any stats tables in NDB kernel (all stats is lost)",
+    (uchar **)&_sys_drop, (uchar **)&_sys_drop, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-create", ++oi,
+    "Create stats tables in NDB kernel (must not exist)",
+    (uchar **)&_sys_create, (uchar **)&_sys_create, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-create-if-not-exist", ++oi,
+    "Like --sys-create but do nothing if correct stats tables exist",
+    (uchar **)&_sys_create_if_not_exist, (uchar **)&_sys_create_if_not_exist, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-create-if-not-valid", ++oi,
+    "Like --sys-create-if-not-exist but first drop any invalid tables",
+    (uchar **)&_sys_create_if_not_valid, (uchar **)&_sys_create_if_not_valid, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-check", ++oi,
+    "Check that correct stats tables exist in NDB kernel",
+    (uchar **)&_sys_check, (uchar **)&_sys_check, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  // other
+  { "verbose", 'v',
+    "Verbose messages",
+    (uchar **)&_verbose, (uchar **)&_verbose, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "loops", NDB_OPT_NOSHORT,
+    "Repeat same commands a number of times (for testing)",
+    (uchar **)&_loops, (uchar **)&_loops, 0,
+    GET_INT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0 },
+  { 0, 0,
+    0,
+    0, 0, 0,
+    GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
+};
+
+const char*
+load_default_groups[]= { "mysql_cluster", 0 };
+
+static void
+short_usage_sub(void)
+{
+  ndb_short_usage_sub("[table [index...]]");
+}
+
+static void
+usage()
+{
+  printf("%s: ordered index stats tool and test\n", my_progname);
+  ndb_usage(short_usage_sub, load_default_groups, my_long_options);
+}
+
+static int
+checkopts(int argc, char** argv)
+{
+  int ret = 0;
+  do
+  {
+    _stats_any =
+      (_dbname != 0) +
+      (_delete != 0) +
+      (_update != 0) +
+      (_dump != 0) +
+      (_query != 0);
+    _sys_any =
+      (_sys_create != 0) +
+      (_sys_create_if_not_exist != 0) +
+      (_sys_create_if_not_valid != 0) +
+      (_sys_drop != 0) +
+      ( _sys_check != 0);
+    if (!_sys_any)
+    {
+      if (_dbname == 0)
+        _dbname = "TEST_DB";
+      CHK2(argc >= 1, "stats options require table");
+      g_tabname = my_strdup(argv[0], MYF(0));
+      CHK2(g_tabname != 0, "out of memory");
+      g_indcount = argc - 1;
+      if (g_indcount != 0)
+      {
+        g_indnames = (const char**)my_malloc(sizeof(char*) * g_indcount, MYF(0));
+        CHK2(g_indnames != 0, "out of memory");
+        for (int i = 0; i < g_indcount; i++)
+        {
+          g_indnames[i] = my_strdup(argv[1 + i], MYF(0));
+          CHK2(g_indnames[i] != 0, "out of memory");
+        }
+        CHK1(ret == 0);
+      }
+    }
+    else
+    {
+      CHK2(_stats_any == 0, "cannot mix --sys options with stats options");
+      CHK2(argc == 0, "--sys options take no args");
+    }
+  }
+  while (0);
+  return ret;
+}
+
+int
+main(int argc, char** argv)
+{
+  my_progname = "ndb_index_stat";
+  int ret;
+
+  srandom((unsigned)time(0));
+
+  ndb_init();
+  ndb_opt_set_usage_funcs(short_usage_sub, usage);
+  ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
+  if (ret != 0 || checkopts(argc, argv) != 0)
+    return NDBT_ProgramExit(NDBT_WRONGARGS);
+
+  setOutputLevel(_verbose ? 2 : 0);
+
+  ret = doall();
+  if (ret == -1)
+    return NDBT_ProgramExit(NDBT_FAILED);
+  return NDBT_ProgramExit(NDBT_OK);
+}


Attachment: [text/bzr-bundle] bzr/pekka.nousiainen@oracle.com-20110606121827-r8sgvjr0trpvzpw9.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-wl4124-new0 branch(pekka.nousiainen:4390) WL#4124Pekka Nousiainen6 Jun