List:Commits« Previous MessageNext Message »
From:magnus.blaudd Date:June 22 2011 8:15am
Subject:bzr push into mysql-5.5-cluster branch (magnus.blaudd:3363 to 3364)
View as plain text  
 3364 magnus.blaudd@stripped	2011-06-22 [merge]
      Merge 7.1 -> 5.5-cluster

    added:
      sql/ha_ndb_index_stat.cc
      sql/ha_ndb_index_stat.h
      storage/ndb/src/ndbapi/NdbIndexStatFrmData.cpp
      storage/ndb/tools/ndb_dump_frm_data.cpp
    modified:
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb/r/ndb_index_ordered.result
      mysql-test/suite/ndb/t/ndb_basic.test
      mysql-test/suite/ndb/t/ndb_index_ordered.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ndb_share.h
      storage/ndb/CMakeLists.txt
      storage/ndb/include/kernel/ndb_limits.h
      storage/ndb/include/ndb_constants.h
      storage/ndb/src/kernel/blocks/trix/Trix.cpp
      storage/ndb/src/ndbapi/CMakeLists.txt
      storage/ndb/src/ndbapi/Makefile.am
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
      storage/ndb/tools/CMakeLists.txt
      storage/ndb/tools/Makefile.am
      storage/ndb/tools/ndb_index_stat.cpp
 3363 magnus.blaudd@stripped	2011-06-22
      ndb
       - fix windows compile problem, "first seen as class now using struct"

    modified:
      sql/ndb_share.h
=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2011-06-20 10:41:04 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2011-06-22 08:09:31 +0000
@@ -70,6 +70,8 @@ Ndb_conflict_fn_max_del_win	#
 Ndb_conflict_fn_old	#
 Ndb_connect_count	#
 Ndb_execute_count	#
+Ndb_index_stat_cache_clean	#
+Ndb_index_stat_cache_query	#
 Ndb_number_of_data_nodes	#
 Ndb_number_of_ready_data_nodes	#
 Ndb_pruned_scan_count	#
@@ -90,6 +92,7 @@ ndb_extra_logging	#
 ndb_force_send	#
 ndb_index_stat_cache_entries	#
 ndb_index_stat_enable	#
+ndb_index_stat_option	#
 ndb_index_stat_update_freq	#
 ndb_log_apply_status	#
 ndb_log_bin	#
@@ -900,8 +903,8 @@ drop table t10,t20;
 #
 # bug #39872 - explain causes segv
 # (ndb_index_stat_enable=1 must be set to trigger bug)
+# index stats v4: do not set, the v2 setting was local
 #
-set ndb_index_stat_enable=1;
 CREATE TABLE `t1` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 PRIMARY KEY (`id`)
@@ -915,8 +918,8 @@ KEY `obj_id` (`obj_id`)
 # here we used to segv
 explain SELECT t1.id FROM t1 INNER JOIN t2 ON t1.id = t2.id WHERE t2.obj_id=1;
 id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t2	ref	id,obj_id	obj_id	5	const	1	Using where with pushed condition
-1	SIMPLE	t1	eq_ref	PRIMARY	PRIMARY	4	test.t2.id	1	
+1	SIMPLE	t2	ref	id,obj_id	obj_id	5	const	#	Using where with pushed condition
+1	SIMPLE	t1	eq_ref	PRIMARY	PRIMARY	4	test.t2.id	#	
 drop table t1, t2;
 #
 # Bug #47054 Cluster only deletes first matched row in delete with left join

=== modified file 'mysql-test/suite/ndb/r/ndb_index_ordered.result'
--- a/mysql-test/suite/ndb/r/ndb_index_ordered.result	2010-10-27 11:32:32 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_ordered.result	2011-06-22 08:09:31 +0000
@@ -659,143 +659,6 @@ select count(*) from t1 where c<'bbb';
 count(*)
 1
 drop table t1;
-set autocommit=1;
-show session variables like 'ndb_index_stat_%';
-Variable_name	Value
-ndb_index_stat_cache_entries	32
-ndb_index_stat_enable	OFF
-ndb_index_stat_update_freq	20
-set ndb_index_stat_enable = off;
-show session variables like 'ndb_index_stat_%';
-Variable_name	Value
-ndb_index_stat_cache_entries	32
-ndb_index_stat_enable	OFF
-ndb_index_stat_update_freq	20
-create table t1 (a int, b int, c varchar(10) not null,
-primary key using hash (a), index(b,c)) engine=ndb;
-insert into t1 values
-(1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
-(4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
-(7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
-select count(*) from t1 where b < 10;
-count(*)
-0
-select count(*) from t1 where b >= 10 and c >= 'bbb';
-count(*)
-6
-select count(*) from t1 where b > 10;
-count(*)
-6
-select count(*) from t1 where b <= 20 and c < 'ccc';
-count(*)
-4
-select count(*) from t1 where b = 20 and c = 'ccc';
-count(*)
-1
-select count(*) from t1 where b > 20;
-count(*)
-3
-select count(*) from t1 where b = 30 and c > 'aaa';
-count(*)
-2
-select count(*) from t1 where b <= 20;
-count(*)
-6
-select count(*) from t1 where b >= 20 and c > 'aaa';
-count(*)
-4
-drop table t1;
-set ndb_index_stat_enable = on;
-set ndb_index_stat_cache_entries = 0;
-show session variables like 'ndb_index_stat_%';
-Variable_name	Value
-ndb_index_stat_cache_entries	0
-ndb_index_stat_enable	ON
-ndb_index_stat_update_freq	20
-create table t1 (a int, b int, c varchar(10) not null,
-primary key using hash (a), index(b,c)) engine=ndb;
-insert into t1 values
-(1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
-(4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
-(7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
-select count(*) from t1 where b < 10;
-count(*)
-0
-select count(*) from t1 where b >= 10 and c >= 'bbb';
-count(*)
-6
-select count(*) from t1 where b > 10;
-count(*)
-6
-select count(*) from t1 where b <= 20 and c < 'ccc';
-count(*)
-4
-select count(*) from t1 where b = 20 and c = 'ccc';
-count(*)
-1
-select count(*) from t1 where b > 20;
-count(*)
-3
-select count(*) from t1 where b = 30 and c > 'aaa';
-count(*)
-2
-select count(*) from t1 where b <= 20;
-count(*)
-6
-select count(*) from t1 where b >= 20 and c > 'aaa';
-count(*)
-4
-drop table t1;
-set ndb_index_stat_enable = on;
-set ndb_index_stat_cache_entries = 4;
-set ndb_index_stat_update_freq = 2;
-show session variables like 'ndb_index_stat_%';
-Variable_name	Value
-ndb_index_stat_cache_entries	4
-ndb_index_stat_enable	ON
-ndb_index_stat_update_freq	2
-create table t1 (a int, b int, c varchar(10) not null,
-primary key using hash (a), index(b,c)) engine=ndb;
-insert into t1 values
-(1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
-(4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
-(7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
-select count(*) from t1 where b < 10;
-count(*)
-0
-select count(*) from t1 where b >= 10 and c >= 'bbb';
-count(*)
-6
-select count(*) from t1 where b > 10;
-count(*)
-6
-select count(*) from t1 where b <= 20 and c < 'ccc';
-count(*)
-4
-select count(*) from t1 where b = 20 and c = 'ccc';
-count(*)
-1
-select count(*) from t1 where b > 20;
-count(*)
-3
-select count(*) from t1 where b = 30 and c > 'aaa';
-count(*)
-2
-select count(*) from t1 where b <= 20;
-count(*)
-6
-select count(*) from t1 where b >= 20 and c > 'aaa';
-count(*)
-4
-drop table t1;
-set ndb_index_stat_enable = @@global.ndb_index_stat_enable;
-set ndb_index_stat_cache_entries = @@global.ndb_index_stat_cache_entries;
-set ndb_index_stat_update_freq = @@global.ndb_index_stat_update_freq;
-show session variables like 'ndb_index_stat_%';
-Variable_name	Value
-ndb_index_stat_cache_entries	32
-ndb_index_stat_enable	OFF
-ndb_index_stat_update_freq	20
 create table t1 (a int primary key) engine = ndb;
 insert into t1 values (1), (2), (3);
 begin;

=== modified file 'mysql-test/suite/ndb/t/ndb_basic.test'
--- a/mysql-test/suite/ndb/t/ndb_basic.test	2011-04-14 15:47:23 +0000
+++ b/mysql-test/suite/ndb/t/ndb_basic.test	2011-06-22 08:09:31 +0000
@@ -758,8 +758,9 @@ drop table t10,t20;
 --echo #
 --echo # bug #39872 - explain causes segv
 --echo # (ndb_index_stat_enable=1 must be set to trigger bug)
+--echo # index stats v4: do not set, the v2 setting was local
 --echo #
-set ndb_index_stat_enable=1;
+# set ndb_index_stat_enable=1;
 CREATE TABLE `t1` (
   `id` int(11) NOT NULL AUTO_INCREMENT,
   PRIMARY KEY (`id`)
@@ -771,6 +772,7 @@ CREATE TABLE `t2` (
   KEY `obj_id` (`obj_id`)
 ) ENGINE=ndbcluster DEFAULT CHARSET=utf8;
 --echo # here we used to segv
+--replace_column 9 #
 explain SELECT t1.id FROM t1 INNER JOIN t2 ON t1.id = t2.id WHERE t2.obj_id=1;
 drop table t1, t2;
 

=== modified file 'mysql-test/suite/ndb/t/ndb_index_ordered.test'
--- a/mysql-test/suite/ndb/t/ndb_index_ordered.test	2010-10-27 11:32:32 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_ordered.test	2011-06-22 08:09:31 +0000
@@ -355,78 +355,7 @@ insert into t1 (a, c) values (1,'aaa'),(
 select count(*) from t1 where c<'bbb';
 drop table t1;
 
-# -- index statistics --
-
-set autocommit=1;
-show session variables like 'ndb_index_stat_%';
-
-set ndb_index_stat_enable = off;
-show session variables like 'ndb_index_stat_%';
-
-create table t1 (a int, b int, c varchar(10) not null,
-  primary key using hash (a), index(b,c)) engine=ndb;
-insert into t1 values
-  (1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
-  (4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
-  (7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
-select count(*) from t1 where b < 10;
-select count(*) from t1 where b >= 10 and c >= 'bbb';
-select count(*) from t1 where b > 10;
-select count(*) from t1 where b <= 20 and c < 'ccc';
-select count(*) from t1 where b = 20 and c = 'ccc';
-select count(*) from t1 where b > 20;
-select count(*) from t1 where b = 30 and c > 'aaa';
-select count(*) from t1 where b <= 20;
-select count(*) from t1 where b >= 20 and c > 'aaa';
-drop table t1;
-
-set ndb_index_stat_enable = on;
-set ndb_index_stat_cache_entries = 0;
-show session variables like 'ndb_index_stat_%';
-
-create table t1 (a int, b int, c varchar(10) not null,
-  primary key using hash (a), index(b,c)) engine=ndb;
-insert into t1 values
-  (1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
-  (4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
-  (7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
-select count(*) from t1 where b < 10;
-select count(*) from t1 where b >= 10 and c >= 'bbb';
-select count(*) from t1 where b > 10;
-select count(*) from t1 where b <= 20 and c < 'ccc';
-select count(*) from t1 where b = 20 and c = 'ccc';
-select count(*) from t1 where b > 20;
-select count(*) from t1 where b = 30 and c > 'aaa';
-select count(*) from t1 where b <= 20;
-select count(*) from t1 where b >= 20 and c > 'aaa';
-drop table t1;
-
-set ndb_index_stat_enable = on;
-set ndb_index_stat_cache_entries = 4;
-set ndb_index_stat_update_freq = 2;
-show session variables like 'ndb_index_stat_%';
-
-create table t1 (a int, b int, c varchar(10) not null,
-  primary key using hash (a), index(b,c)) engine=ndb;
-insert into t1 values
-  (1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
-  (4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
-  (7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
-select count(*) from t1 where b < 10;
-select count(*) from t1 where b >= 10 and c >= 'bbb';
-select count(*) from t1 where b > 10;
-select count(*) from t1 where b <= 20 and c < 'ccc';
-select count(*) from t1 where b = 20 and c = 'ccc';
-select count(*) from t1 where b > 20;
-select count(*) from t1 where b = 30 and c > 'aaa';
-select count(*) from t1 where b <= 20;
-select count(*) from t1 where b >= 20 and c > 'aaa';
-drop table t1;
-
-set ndb_index_stat_enable = @@global.ndb_index_stat_enable;
-set ndb_index_stat_cache_entries = @@global.ndb_index_stat_cache_entries;
-set ndb_index_stat_update_freq = @@global.ndb_index_stat_update_freq;
-show session variables like 'ndb_index_stat_%';
+# index stats v4: old v2 tests are not meaningful and are removed
 
 # End of 4.1 tests
 

=== added file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndb_index_stat.cc	2011-06-22 08:09:31 +0000
@@ -0,0 +1,1962 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ha_ndbcluster_glue.h"
+
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
+
+#include "ha_ndbcluster.h"
+#include "ha_ndb_index_stat.h"
+#include <mysql/plugin.h>
+#include <ctype.h>
+
+// copied from ha_ndbcluster_binlog.h
+
+extern handlerton *ndbcluster_hton;
+
+inline
+void
+set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
+{ thd_set_ha_data(thd, ndbcluster_hton, thd_ndb); }
+
+// Typedefs for long names 
+typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Index NDBINDEX;
+
+struct Ndb_index_stat {
+  enum {
+    LT_Undef= 0,
+    LT_New= 1,          /* new entry added by a table handler */
+    LT_Update = 2,      /* force kernel update from analyze table */
+    LT_Read= 3,         /* read or reread stats into new query cache */
+    LT_Idle= 4,         /* stats exist */
+    LT_Check= 5,        /* check for new stats */
+    LT_Delete= 6,       /* delete the entry */
+    LT_Error= 7,        /* error, on hold for a while */
+    LT_Count= 8
+  };
+  NdbIndexStat* is;
+  int index_id;
+  int index_version;
+#ifndef DBUG_OFF
+  char id[32];
+#endif
+  time_t access_time;   /* by any table handler */
+  time_t load_time;     /* when stats were created by kernel */
+  time_t read_time;     /* when stats were read by us (>= load_time) */
+  uint sample_version;  /* goes with read_time */
+  time_t check_time;    /* when checked for updated stats (>= read_time) */
+  bool cache_clean;     /* old caches have been deleted */
+  uint force_update;    /* one-time force update from analyze table */
+  NdbIndexStat::Error error;
+  time_t error_time;
+  int error_count;
+  struct Ndb_index_stat *share_next; /* per-share list */
+  int lt;
+  int lt_old;     /* for info only */
+  struct Ndb_index_stat *list_next;
+  struct Ndb_index_stat *list_prev;
+  struct NDB_SHARE *share;
+  Ndb_index_stat();
+};
+
+struct Ndb_index_stat_list {
+  const char *name;
+  int lt;
+  struct Ndb_index_stat *head;
+  struct Ndb_index_stat *tail;
+  uint count;
+  Ndb_index_stat_list(int the_lt, const char* the_name);
+};
+
+extern Ndb_index_stat_list ndb_index_stat_list[];
+
+time_t ndb_index_stat_time_now= 0;
+
+time_t
+ndb_index_stat_time()
+{
+  time_t now= time(0);
+
+  if (unlikely(ndb_index_stat_time_now == 0))
+    ndb_index_stat_time_now= now;
+
+  if (unlikely(now < ndb_index_stat_time_now))
+  {
+    DBUG_PRINT("index_stat", ("time moved backwards %d seconds",
+                              int(ndb_index_stat_time_now - now)));
+    now= ndb_index_stat_time_now;
+  }
+
+  ndb_index_stat_time_now= now;
+  return now;
+}
+
+bool ndb_index_stat_allow_flag= false;
+
+bool
+ndb_index_stat_allow(int flag= -1)
+{
+  if (flag != -1) {
+    pthread_mutex_lock(&ndb_index_stat_list_mutex);
+    ndb_index_stat_allow_flag= (bool)flag;
+    pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  }
+  return ndb_index_stat_allow_flag;
+}
+
+/* Options */
+
+/* Options in string format buffer size */
+static const uint ndb_index_stat_option_sz= 512;
+void ndb_index_stat_opt2str(const struct Ndb_index_stat_opt&, char*);
+
+struct Ndb_index_stat_opt {
+  enum Unit {
+    Ubool = 1,
+    Usize = 2,
+    Utime = 3,
+    Umsec = 4
+  };
+  enum Flag {
+    Freadonly = (1 << 0)
+  };
+  struct Val {
+    const char* name;
+    uint val;
+    uint minval;
+    uint maxval;
+    Unit unit;
+    uint flag;
+  };
+  enum Idx {
+    Iloop_checkon = 0,
+    Iloop_idle = 1,
+    Iloop_busy = 2,
+    Iupdate_batch = 3,
+    Iread_batch = 4,
+    Iidle_batch = 5,
+    Icheck_batch = 6,
+    Icheck_delay = 7,
+    Idelete_batch = 8,
+    Iclean_delay = 9,
+    Ierror_batch = 10,
+    Ierror_delay = 11,
+    Ievict_batch = 12,
+    Ievict_delay = 13,
+    Icache_limit = 14,
+    Icache_lowpct = 15,
+    Imax = 16
+  };
+  Val val[Imax];
+  /* Options in string format (SYSVAR ndb_index_stat_option) */
+  char *option;
+  Ndb_index_stat_opt(char* buf);
+  uint get(Idx i) const {
+    assert(i < Imax);
+    return val[i].val;
+  }
+};
+
+Ndb_index_stat_opt::Ndb_index_stat_opt(char* buf) :
+  option(buf)
+{
+#define ival(aname, aval, aminval, amaxval, aunit, aflag) \
+  val[I##aname].name = #aname; \
+  val[I##aname].val = aval; \
+  val[I##aname].minval = aminval; \
+  val[I##aname].maxval = amaxval; \
+  val[I##aname].unit = aunit; \
+  val[I##aname].flag = aflag
+  ival(loop_checkon, 1000, 0, ~0, Umsec, 0);
+  ival(loop_idle, 1000, 0, ~0, Umsec, 0);
+  ival(loop_busy, 100, 0, ~0, Umsec, 0);
+  ival(update_batch, 1, 1, ~0, Usize, 0);
+  ival(read_batch, 4, 1, ~0, Usize, 0);
+  ival(idle_batch, 32, 1, ~0, Usize, 0);
+  ival(check_batch, 32, 1, ~0, Usize, 0);
+  ival(check_delay, 60, 0, ~0, Utime, 0);
+  ival(clean_delay, 0, 0, ~0, Utime, 0);
+  ival(delete_batch, 8, 1, ~0, Usize, 0);
+  ival(error_batch, 4, 1, ~0, Usize, 0);
+  ival(error_delay, 60, 0, ~0, Utime, 0);
+  ival(evict_batch, 8, 1, ~0, Usize, 0);
+  ival(evict_delay, 60, 0, ~0, Utime, 0);
+  ival(cache_limit, 32*1024*1024, 1024*1024, ~0, Usize, 0);
+  ival(cache_lowpct, 90, 0, 100, Usize, 0);
+#undef ival
+
+  ndb_index_stat_opt2str(*this, option);
+}
+
+/* Hard limits */
+static const uint ndb_index_stat_max_evict_batch = 32;
+
+char ndb_index_stat_option_buf[ndb_index_stat_option_sz];
+Ndb_index_stat_opt ndb_index_stat_opt(ndb_index_stat_option_buf);
+
+/* Copy option struct to string buffer */
+void
+ndb_index_stat_opt2str(const Ndb_index_stat_opt& opt, char* str)
+{
+  DBUG_ENTER("ndb_index_stat_opt2str");
+
+  char buf[ndb_index_stat_option_sz];
+  char *const end= &buf[sizeof(buf)];
+  char* ptr= buf;
+  *ptr= 0;
+
+  const uint imax= Ndb_index_stat_opt::Imax;
+  for (uint i= 0; i < imax; i++)
+  {
+    const Ndb_index_stat_opt::Val& v= opt.val[i];
+    ptr+= strlen(ptr);
+    const char* sep= (ptr == buf ? "" : ",");
+    const uint sz= ptr < end ? end - ptr : 0;
+
+    switch (v.unit) {
+    case Ndb_index_stat_opt::Ubool:
+      {
+        DBUG_ASSERT(v.val == 0 || v.val == 1);
+        if (v.val == 0)
+          my_snprintf(ptr, sz, "%s%s=OFF", sep, v.name);
+        else
+          my_snprintf(ptr, sz, "%s%s=ON", sep, v.name);
+      }
+      break;
+
+    case Ndb_index_stat_opt::Usize:
+      {
+        uint m;
+        if (v.val == 0)
+          my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
+        else if (v.val % (m= 1024*1024*1024) == 0)
+          my_snprintf(ptr, sz, "%s%s=%uG", sep, v.name, v.val / m);
+        else if (v.val % (m= 1024*1024) == 0)
+          my_snprintf(ptr, sz, "%s%s=%uM", sep, v.name, v.val / m);
+        else if (v.val % (m= 1024) == 0)
+          my_snprintf(ptr, sz, "%s%s=%uK", sep, v.name, v.val / m);
+        else
+          my_snprintf(ptr, sz, "%s%s=%u", sep, v.name, v.val);
+      }
+      break;
+
+    case Ndb_index_stat_opt::Utime:
+      {
+        uint m;
+        if (v.val == 0)
+          my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
+        else if (v.val % (m= 60*60*24) == 0)
+          my_snprintf(ptr, sz, "%s%s=%ud", sep, v.name, v.val / m);
+        else if (v.val % (m= 60*60) == 0)
+          my_snprintf(ptr, sz, "%s%s=%uh", sep, v.name, v.val / m);
+        else if (v.val % (m= 60) == 0)
+          my_snprintf(ptr, sz, "%s%s=%um", sep, v.name, v.val / m);
+        else
+          my_snprintf(ptr, sz, "%s%s=%us", sep, v.name, v.val);
+      }
+      break;
+
+    case Ndb_index_stat_opt::Umsec:
+      {
+        if (v.val == 0)
+          my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
+        else
+          my_snprintf(ptr, sz, "%s%s=%ums", sep, v.name, v.val);
+      }
+      break;
+
+    default:
+      DBUG_ASSERT(false);
+      break;
+    }
+  }
+
+  memset(str, 0, ndb_index_stat_option_sz);
+  strcpy(str, buf);
+  DBUG_PRINT("index_stat", ("str: \"%s\"", str));
+  DBUG_VOID_RETURN;
+}
+
+int
+ndb_index_stat_option_parse(char* p, Ndb_index_stat_opt& opt)
+{
+  DBUG_ENTER("ndb_index_stat_option_parse");
+
+  char *r= strchr(p, '=');
+  if (r == 0)
+    DBUG_RETURN(-1);
+  *r++= 0;
+
+  while (isspace(*r))
+    *r++= 0;
+  if (*r == 0)
+    DBUG_RETURN(-1);
+
+  const uint imax= Ndb_index_stat_opt::Imax;
+  for (uint i= 0; i < imax; i++)
+  {
+    Ndb_index_stat_opt::Val& v= opt.val[i];
+    if (strcmp(p, v.name) != 0)
+      continue;
+
+    char *s;
+    for (s= r; *s != 0; s++)
+      *s= tolower(*s);
+    ulonglong val= strtoull(r, &s, 10);
+
+    switch (v.unit) {
+    case Ndb_index_stat_opt::Ubool:
+      {
+        if ((s > r && *s == 0 && val == 0) ||
+            strcmp(r, "off") == 0 ||
+            strcmp(r, "false") == 0)
+          val= 0;
+        else if ((s > r && *s == 0 && val == 1) ||
+            strcmp(r, "on") == 0 ||
+            strcmp(r, "true") == 0)
+          val= 1;
+        else
+          DBUG_RETURN(-1);
+        v.val= (uint)val;
+      }
+      break;
+
+    case Ndb_index_stat_opt::Usize:
+      {
+        if (s == r)
+          DBUG_RETURN(-1);
+        if (strcmp(s, "") == 0)
+          ;
+        else if (strcmp(s, "k") == 0)
+          val*= 1024;
+        else if (strcmp(s, "m") == 0)
+          val*= 1024*1024;
+        else if (strcmp(s, "g") == 0)
+          val*= 1024*1024*1024;
+        else
+          DBUG_RETURN(-1);
+        if (val < v.minval || val > v.maxval)
+          DBUG_RETURN(-1);
+        v.val= (uint)val;
+      }
+      break;
+
+    case Ndb_index_stat_opt::Utime:
+      {
+        if (s == r)
+          DBUG_RETURN(-1);
+        if (strcmp(s, "") == 0)
+          ;
+        else if (strcmp(s, "s") == 0)
+          ;
+        else if (strcmp(s, "m") == 0)
+          val*= 60;
+        else if (strcmp(s, "h") == 0)
+          val*= 60*60;
+        else if (strcmp(s, "d") == 0)
+          val*= 24*60*60;
+        else
+          DBUG_RETURN(-1);
+        if (val < v.minval || val > v.maxval)
+          DBUG_RETURN(-1);
+        v.val= (uint)val;
+      }
+      break;
+
+    case Ndb_index_stat_opt::Umsec:
+      {
+        if (s == r)
+          DBUG_RETURN(-1);
+        if (strcmp(s, "") == 0)
+          ;
+        else if (strcmp(s, "ms") == 0)
+          ;
+        else
+          DBUG_RETURN(-1);
+        if (val < v.minval || val > v.maxval)
+          DBUG_RETURN(-1);
+        v.val= (uint)val;
+      }
+      break;
+
+    default:
+      DBUG_ASSERT(false);
+      break;
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+/* Copy option string to option struct */
+int
+ndb_index_stat_str2opt(const char *str, Ndb_index_stat_opt& opt)
+{
+  DBUG_ENTER("ndb_index_stat_str2opt");
+  DBUG_PRINT("index_stat", ("str: \"%s\"", str));
+
+  char buf[ndb_index_stat_option_sz];
+
+  assert(str != 0);
+  if (strlen(str) >= sizeof(buf))
+    DBUG_RETURN(-1);
+  strcpy(buf, str);
+
+  char *p= buf;
+  while (1)
+  {
+    while (isspace(*p))
+      p++;
+    if (*p == 0)
+      break;
+
+    char *q= strchr(p, ',');
+    if (q == p)
+      DBUG_RETURN(-1);
+    if (q != 0)
+      *q= 0;
+
+    DBUG_PRINT("index_stat", ("parse: %s", p));
+    if (ndb_index_stat_option_parse(p, opt) == -1)
+      DBUG_RETURN(-1);
+
+    if (q == 0)
+      break;
+    p= q + 1;
+  }
+
+  ndb_index_stat_opt2str(opt, opt.option);
+  DBUG_RETURN(0);
+}
+
+/* Thanks to ha_innodb.cc */
+
+/* Need storage between check and update (assume locked) */
+char ndb_index_stat_option_tmp[ndb_index_stat_option_sz];
+ 
+int
+ndb_index_stat_option_check(MYSQL_THD,
+                            struct st_mysql_sys_var *var,
+                            void *save,
+                            struct st_mysql_value *value)
+{
+  DBUG_ENTER("ndb_index_stat_option_check");
+  char buf[ndb_index_stat_option_sz];
+  int len= sizeof(buf);
+  const char *str= value->val_str(value, buf, &len);
+  if (str != 0)
+  {
+    /* Seems to be nothing in buf */
+    DBUG_PRINT("index_stat", ("str: %s len: %d", str, len));
+    char buf2[ndb_index_stat_option_sz];
+    Ndb_index_stat_opt opt(buf2);
+    if (ndb_index_stat_str2opt(str, opt) == 0)
+    {
+      /* Passed to update */
+      strcpy(ndb_index_stat_option_tmp, str);
+      *(const char**)save= ndb_index_stat_option_tmp;
+      DBUG_RETURN(0);
+    }
+  }
+  DBUG_RETURN(1);
+}
+
+void
+ndb_index_stat_option_update(MYSQL_THD,
+                             struct st_mysql_sys_var *var,
+                             void *var_ptr,
+                             const void *save)
+{
+  DBUG_ENTER("ndb_index_stat_option_update");
+  const char *str= *(const char**)save;
+  DBUG_PRINT("index_stat", ("str: %s", str));
+  Ndb_index_stat_opt& opt= ndb_index_stat_opt;
+  int ret= ndb_index_stat_str2opt(str, opt);
+  assert(ret == 0);
+  *(const char**)var_ptr= ndb_index_stat_opt.option;
+  DBUG_VOID_RETURN;
+}
+
+/* Global stuff */
+
+struct Ndb_index_stat_glob {
+  uint list_count[Ndb_index_stat::LT_Count]; /* Temporary use */
+  uint total_count;
+  uint force_update;
+  uint wait_update;
+  uint cache_query_bytes; /* In use */
+  uint cache_clean_bytes; /* Obsolete versions not yet removed */
+  bool is_locked;
+  Ndb_index_stat_glob() :
+    total_count(0),
+    force_update(0),
+    wait_update(0),
+    cache_query_bytes(0),
+    cache_clean_bytes(0),
+    is_locked(false)
+  {
+  }
+  void set_list_count()
+  {
+    int lt;
+    for (lt= 0; lt < Ndb_index_stat::LT_Count; lt++)
+    {
+      const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+      list_count[lt]= list.count;
+    }
+  }
+  void lock()
+  {
+    pthread_mutex_lock(&ndb_index_stat_glob_mutex);
+    assert(!is_locked);
+    is_locked= true;
+  }
+  void unlock()
+  {
+    assert(is_locked);
+    g_ndb_status_index_stat_cache_query= cache_query_bytes;
+    g_ndb_status_index_stat_cache_clean= cache_clean_bytes;
+    is_locked= false;
+    pthread_mutex_unlock(&ndb_index_stat_glob_mutex);
+  }
+};
+
+Ndb_index_stat_glob ndb_index_stat_glob;
+
+/* Shared index entries */
+
+Ndb_index_stat::Ndb_index_stat()
+{
+  is= 0;
+  index_id= 0;
+  index_version= 0;
+#ifndef DBUG_OFF
+  memset(id, 0, sizeof(id));
+#endif
+  access_time= 0;
+  load_time= 0;
+  read_time= 0;
+  sample_version= 0;
+  check_time= 0;
+  cache_clean= false;
+  force_update= 0;
+  error_time= 0;
+  error_count= 0;
+  share_next= 0;
+  lt= 0;
+  lt_old= 0;
+  list_next= 0;
+  list_prev= 0;
+  share= 0;
+}
+
+void
+ndb_index_stat_error(Ndb_index_stat *st, const char* place, int line)
+{
+  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  time_t now= ndb_index_stat_time();
+  NdbIndexStat::Error error= st->is->getNdbError();
+  if (error.code == 0)
+  {
+    // XXX why this if
+    NdbIndexStat::Error error2;
+    error= error2;
+    error.code= NdbIndexStat::InternalError;
+    error.status= NdbError::TemporaryError;
+  }
+  st->error= error;
+  st->error_time= now;
+  st->error_count++;
+  pthread_cond_broadcast(&ndb_index_stat_stat_cond);
+  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+
+  DBUG_PRINT("index_stat", ("%s line %d: error %d line %d extra %d",
+                            place, line, error.code, error.line, error.extra));
+}
+
+/* Lists across shares */
+
+Ndb_index_stat_list::Ndb_index_stat_list(int the_lt, const char* the_name)
+{
+  lt= the_lt;
+  name= the_name;
+  head= 0;
+  tail= 0;
+  count= 0;
+}
+
+Ndb_index_stat_list ndb_index_stat_list[Ndb_index_stat::LT_Count] = {
+  Ndb_index_stat_list(0, 0),
+  Ndb_index_stat_list(Ndb_index_stat::LT_New,    "New"),
+  Ndb_index_stat_list(Ndb_index_stat::LT_Update, "Update"),
+  Ndb_index_stat_list(Ndb_index_stat::LT_Read,   "Read"),
+  Ndb_index_stat_list(Ndb_index_stat::LT_Idle,   "Idle"),
+  Ndb_index_stat_list(Ndb_index_stat::LT_Check,  "Check"),
+  Ndb_index_stat_list(Ndb_index_stat::LT_Delete, "Delete"),
+  Ndb_index_stat_list(Ndb_index_stat::LT_Error,  "Error")
+};
+
+void
+ndb_index_stat_list_add(Ndb_index_stat* st, int lt, int place= +1)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  assert(st != 0 && st->lt == 0);
+  assert(st->list_next == 0 && st->list_prev == 0);
+  assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+
+  DBUG_PRINT("index_stat", ("st %s -> %s", st->id, list.name));
+
+  if (list.count == 0)
+  {
+    assert(list.head == 0 && list.tail == 0);
+    list.head= st;
+    list.tail= st;
+  }
+  else if (place < 0)
+  {
+    assert(list.head != 0 && list.head->list_prev == 0);
+    st->list_next= list.head;
+    list.head->list_prev= st;
+    list.head= st;
+  }
+  else
+  {
+    assert(list.tail != 0 && list.tail->list_next == 0);
+    st->list_prev= list.tail;
+    list.tail->list_next= st;
+    list.tail= st;
+  }
+  list.count++;
+  glob.lock();
+  glob.total_count++;
+  glob.unlock();
+
+  st->lt= lt;
+}
+
+void
+ndb_index_stat_list_remove(Ndb_index_stat* st)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  assert(st != 0);
+  int lt= st->lt;
+  assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+
+  DBUG_PRINT("index_stat", ("st %s <- %s", st->id, list.name));
+
+  Ndb_index_stat* next= st->list_next;
+  Ndb_index_stat* prev= st->list_prev;
+
+  if (list.head == st)
+    list.head= next;
+  if (list.tail == st)
+    list.tail= prev;
+  assert(list.count != 0);
+  list.count--;
+  glob.lock();
+  assert(glob.total_count != 0);
+  glob.total_count--;
+  glob.unlock();
+
+  if (next != 0)
+    next->list_prev= prev;
+  if (prev != 0)
+    prev->list_next= next;
+
+  st->lt= 0;
+  st->lt_old= 0;
+  st->list_next= 0;
+  st->list_prev= 0;
+}
+
+void
+ndb_index_stat_list_move(Ndb_index_stat *st, int lt, int place= +1)
+{
+  assert(st != 0);
+  ndb_index_stat_list_remove(st);
+  ndb_index_stat_list_add(st, lt, place);
+}
+
+/* Move entry in / out error list */
+
+void
+ndb_index_stat_list_to_error(Ndb_index_stat *st)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+
+  assert(st != 0);
+  const int lt= st->lt;
+  assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
+  assert(lt != Ndb_index_stat::LT_Error);
+
+  if (st->force_update != 0)
+  {
+    glob.lock();
+    assert(glob.force_update >= st->force_update);
+    glob.force_update-= st->force_update;
+    glob.unlock();
+    st->force_update= 0;
+  }
+
+  time_t now= ndb_index_stat_time();
+  st->error_time= now;
+  ndb_index_stat_list_move(st, Ndb_index_stat::LT_Error);
+}
+
+void
+ndb_index_stat_list_from_error(Ndb_index_stat *st)
+{
+  assert(st != 0);
+  assert(st->lt == Ndb_index_stat::LT_Error);
+  if (st->force_update)
+    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Update);
+  else
+    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Read);
+  st->error.code= 0;
+  st->error.status= NdbError::Success;
+}
+
+/* Find or add entry under the share */
+
+Ndb_index_stat*
+ndb_index_stat_alloc()
+{
+  Ndb_index_stat *st= new Ndb_index_stat;
+  NdbIndexStat *is= new NdbIndexStat;
+  if (st != 0 && is != 0)
+  {
+    st->is= is;
+    return st;
+  }
+  delete is;
+  delete st;
+  return 0;
+}
+
+/* Subroutine, have lock */
+Ndb_index_stat*
+ndb_index_stat_find_share(NDB_SHARE *share,
+                          const NDBINDEX *index,
+                          Ndb_index_stat *&st_last)
+{
+  struct Ndb_index_stat *st= share->index_stat_list;
+  st_last= 0;
+  while (st != 0)
+  {
+    assert(st->share == share);
+    assert(st->is != 0);
+    NdbIndexStat::Head head;
+    st->is->get_head(head);
+    if (head.m_indexId == (uint)index->getObjectId() &&
+        head.m_indexVersion == (uint)index->getObjectVersion())
+      break;
+    st_last= st;
+    st= st->share_next;
+  }
+  return st;
+}
+
+/* Subroutine, have lock */
+Ndb_index_stat*
+ndb_index_stat_add_share(NDB_SHARE *share,
+                         const NDBINDEX *index,
+                         const NDBTAB *table,
+                         Ndb_index_stat *st_last)
+{
+  struct Ndb_index_stat *st= ndb_index_stat_alloc();
+  if (st != 0)
+  {
+    st->share= share;
+    if (st_last == 0)
+      share->index_stat_list= st;
+    else
+      st_last->share_next= st;
+    st->index_id= index->getObjectId();
+    st->index_version= index->getObjectVersion();
+#ifndef DBUG_OFF
+    my_snprintf(st->id, sizeof(st->id), "%d.%d", st->index_id, st->index_version);
+#endif
+    if (st->is->set_index(*index, *table) == -1)
+    {
+      ndb_index_stat_error(st, "set_index", __LINE__);
+      /* Caller assigns list */
+    }
+  }
+  return st;
+}
+
+Ndb_index_stat*
+ndb_index_stat_get_share(NDB_SHARE *share,
+                         const NDBINDEX *index,
+                         const NDBTAB *table,
+                         bool allow_add,
+                         bool force_update)
+{
+  pthread_mutex_lock(&share->mutex);
+  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  time_t now= ndb_index_stat_time();
+
+  struct Ndb_index_stat *st= 0;
+  struct Ndb_index_stat *st_last= 0;
+  if (ndb_index_stat_allow())
+  {
+    st= ndb_index_stat_find_share(share, index, st_last);
+    if (st == 0 && allow_add)
+    {
+      st= ndb_index_stat_add_share(share, index, table, st_last);
+      if (st != 0)
+        ndb_index_stat_list_add(st, Ndb_index_stat::LT_New);
+    }
+    if (st != 0)
+    {
+      if (force_update != 0)
+      {
+        st->force_update++;
+        Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+        glob.lock();
+        glob.force_update++;
+        glob.unlock();
+      }
+      st->access_time= now;
+    }
+  }
+
+  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&share->mutex);
+  return st;
+}
+
+void
+ndb_index_stat_free(Ndb_index_stat *st)
+{
+  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  NDB_SHARE *share= st->share;
+  assert(share != 0);
+
+  Ndb_index_stat *st_head= 0;
+  Ndb_index_stat *st_tail= 0;
+  Ndb_index_stat *st_loop= share->index_stat_list;
+  bool found= false;
+  while (st_loop != 0) {
+    if (st == st_loop) {
+      st->share= 0;
+      assert(st->lt != 0);
+      assert(st->lt != Ndb_index_stat::LT_Delete);
+      ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+      st_loop= st_loop->share_next;
+      assert(!found);
+      found++;
+    } else {
+      if (st_head == 0)
+        st_head= st_loop;
+      else
+        st_tail->share_next= st_loop;
+      st_tail= st_loop;
+      st_loop= st_loop->share_next;
+      st_tail->share_next= 0;
+    }
+  }
+  assert(found);
+  share->index_stat_list= st_head;
+  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+}
+
+void
+ndb_index_stat_free(NDB_SHARE *share)
+{
+  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  Ndb_index_stat *st;
+  while ((st= share->index_stat_list) != 0)
+  {
+    share->index_stat_list= st->share_next;
+    st->share= 0;
+    assert(st->lt != 0);
+    assert(st->lt != Ndb_index_stat::LT_Delete);
+    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+  }
+  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+}
+
+/* Statistics thread sub-routines */
+
+void
+ndb_index_stat_cache_move(Ndb_index_stat *st)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  NdbIndexStat::CacheInfo infoBuild;
+  NdbIndexStat::CacheInfo infoQuery;
+
+  st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
+  st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
+  const uint new_query_bytes= infoBuild.m_totalBytes;
+  const uint old_query_bytes= infoQuery.m_totalBytes;
+  DBUG_PRINT("index_stat", ("st %s cache move: query:%u clean:%u",
+                            st->id, new_query_bytes, old_query_bytes));
+  st->is->move_cache();
+  glob.lock();
+  assert(glob.cache_query_bytes >= old_query_bytes);
+  glob.cache_query_bytes-= old_query_bytes;
+  glob.cache_query_bytes+= new_query_bytes;
+  glob.cache_clean_bytes+= old_query_bytes;
+  glob.unlock();
+}
+
+void
+ndb_index_stat_cache_clean(Ndb_index_stat *st)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  NdbIndexStat::CacheInfo infoClean;
+
+  st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
+  const uint old_clean_bytes= infoClean.m_totalBytes;
+  DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u",
+                            st->id, old_clean_bytes));
+  st->is->clean_cache();
+  glob.lock();
+  assert(glob.cache_clean_bytes >= old_clean_bytes);
+  glob.cache_clean_bytes-= old_clean_bytes;
+  glob.unlock();
+}
+
+/* Misc in/out parameters for process steps */
+struct Ndb_index_stat_proc {
+  Ndb *ndb;
+  time_t now;
+  int lt;
+  bool busy;
+  bool end;
+  Ndb_index_stat_proc() :
+    ndb(0),
+    now(0),
+    lt(0),
+    busy(false),
+    end(false)
+  {}
+};
+
+void
+ndb_index_stat_proc_new(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  if (st->error.code != 0)
+    pr.lt= Ndb_index_stat::LT_Error;
+  else if (st->force_update)
+    pr.lt= Ndb_index_stat::LT_Update;
+  else
+    pr.lt= Ndb_index_stat::LT_Read;
+}
+
+void
+ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
+{
+  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  const int lt= Ndb_index_stat::LT_New;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+
+  Ndb_index_stat *st_loop= list.head;
+  while (st_loop != 0)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_new(pr, st);
+    ndb_index_stat_list_move(st, pr.lt);
+  }
+  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+}
+
+void
+ndb_index_stat_proc_update(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  if (st->is->update_stat(pr.ndb) == -1)
+  {
+    ndb_index_stat_error(st, "update_stat", __LINE__);
+    pr.lt= Ndb_index_stat::LT_Error;
+    return;
+  }
+  pr.lt= Ndb_index_stat::LT_Read;
+}
+
+void
+ndb_index_stat_proc_update(Ndb_index_stat_proc &pr)
+{
+  const int lt= Ndb_index_stat::LT_Update;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint batch= opt.get(Ndb_index_stat_opt::Iupdate_batch);
+
+  Ndb_index_stat *st_loop= list.head;
+  uint cnt= 0;
+  while (st_loop != 0 && cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_update(pr, st);
+    ndb_index_stat_list_move(st, pr.lt);
+    cnt++;
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+void
+ndb_index_stat_proc_read(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  NdbIndexStat::Head head;
+  if (st->is->read_stat(pr.ndb) == -1)
+  {
+    ndb_index_stat_error(st, "read_stat", __LINE__);
+    pr.lt= Ndb_index_stat::LT_Error;
+    return;
+  }
+
+  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pr.now= ndb_index_stat_time();
+  st->is->get_head(head);
+  st->load_time= head.m_loadTime;
+  st->read_time= pr.now;
+  st->sample_version= head.m_sampleVersion;
+
+  if (st->force_update != 0)
+  {
+    Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+    glob.lock();
+    assert(glob.force_update >= st->force_update);
+    glob.force_update-= st->force_update;
+    glob.unlock();
+    st->force_update= 0;
+  }
+
+  ndb_index_stat_cache_move(st);
+  st->cache_clean= false;
+  pr.lt= Ndb_index_stat::LT_Idle;
+  pthread_cond_broadcast(&ndb_index_stat_stat_cond);
+  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+}
+
+void
+ndb_index_stat_proc_read(Ndb_index_stat_proc &pr)
+{
+  const int lt= Ndb_index_stat::LT_Read;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint batch= opt.get(Ndb_index_stat_opt::Iread_batch);
+
+  Ndb_index_stat *st_loop= list.head;
+  uint cnt= 0;
+  while (st_loop != 0 && cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_read(pr, st);
+    ndb_index_stat_list_move(st, pr.lt);
+    cnt++;
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+void
+ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const int clean_delay= opt.get(Ndb_index_stat_opt::Iclean_delay);
+  const int check_delay= opt.get(Ndb_index_stat_opt::Icheck_delay);
+  const time_t clean_wait=
+    st->cache_clean ? 0 : st->read_time + clean_delay - pr.now;
+  const time_t check_wait=
+    st->check_time == 0 ? 0 : st->check_time + check_delay - pr.now;
+
+  DBUG_PRINT("index_stat", ("st %s check wait:%ds force update:%u"
+                            " clean wait:%ds cache clean:%d",
+                            st->id, check_wait, st->force_update,
+                            clean_wait, st->cache_clean));
+
+  if (!st->cache_clean && clean_wait <= 0)
+  {
+    ndb_index_stat_cache_clean(st);
+    st->cache_clean= true;
+  }
+  if (st->force_update)
+  {
+    pr.lt= Ndb_index_stat::LT_Update;
+    return;
+  }
+  if (check_wait <= 0)
+  {
+    pr.lt= Ndb_index_stat::LT_Check;
+    return;
+  }
+  pr.lt= Ndb_index_stat::LT_Idle;
+}
+
+void
+ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr)
+{
+  const int lt= Ndb_index_stat::LT_Idle;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint batch= opt.get(Ndb_index_stat_opt::Iidle_batch);
+  pr.now= ndb_index_stat_time();
+
+  Ndb_index_stat *st_loop= list.head;
+  uint cnt= 0;
+  while (st_loop != 0 && cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_idle(pr, st);
+    if (pr.lt != lt)
+    {
+      ndb_index_stat_list_move(st, pr.lt);
+      cnt++;
+    }
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+void
+ndb_index_stat_proc_check(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  pr.now= ndb_index_stat_time();
+  st->check_time= pr.now;
+  NdbIndexStat::Head head;
+  if (st->is->read_head(pr.ndb) == -1)
+  {
+    ndb_index_stat_error(st, "read_head", __LINE__);
+    pr.lt= Ndb_index_stat::LT_Error;
+    return;
+  }
+  st->is->get_head(head);
+  const uint version_old= st->sample_version;
+  const uint version_new= head.m_sampleVersion;
+  if (version_old != version_new)
+  {
+    DBUG_PRINT("index_stat", ("st %s sample version old:%u new:%u",
+                              st->id, version_old, version_new));
+    pr.lt= Ndb_index_stat::LT_Read;
+    return;
+  }
+  pr.lt= Ndb_index_stat::LT_Idle;
+}
+
+void
+ndb_index_stat_proc_check(Ndb_index_stat_proc &pr)
+{
+  const int lt= Ndb_index_stat::LT_Check;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint batch= opt.get(Ndb_index_stat_opt::Icheck_batch);
+
+  Ndb_index_stat *st_loop= list.head;
+  uint cnt= 0;
+  while (st_loop != 0 && cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_check(pr, st);
+    ndb_index_stat_list_move(st, pr.lt);
+    cnt++;
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+void
+ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  NdbIndexStat::Head head;
+  NdbIndexStat::CacheInfo infoBuild;
+  NdbIndexStat::CacheInfo infoQuery;
+  NdbIndexStat::CacheInfo infoClean;
+  st->is->get_head(head);
+  st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
+  st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
+  st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
+
+  DBUG_PRINT("index_stat",
+             ("evict table: %u index: %u version: %u"
+              " sample version: %u"
+              " cache bytes build:%u query:%u clean:%u",
+              head.m_tableId, head.m_indexId, head.m_indexVersion,
+              head.m_sampleVersion,
+              infoBuild.m_totalBytes, infoQuery.m_totalBytes, infoClean.m_totalBytes));
+
+  /* Twice to move all caches to clean */
+  ndb_index_stat_cache_move(st);
+  ndb_index_stat_cache_move(st);
+  ndb_index_stat_cache_clean(st);
+}
+
+bool
+ndb_index_stat_proc_evict()
+{
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  glob.lock();
+  uint curr_size= glob.cache_query_bytes + glob.cache_clean_bytes;
+  glob.unlock();
+  const uint cache_lowpct= opt.get(Ndb_index_stat_opt::Icache_lowpct);
+  const uint cache_limit= opt.get(Ndb_index_stat_opt::Icache_limit);
+  if (100 * curr_size <= cache_lowpct * cache_limit)
+    return false;
+  return true;
+}
+
+void
+ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr, int lt)
+{
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint batch= opt.get(Ndb_index_stat_opt::Ievict_batch);
+  const int evict_delay= opt.get(Ndb_index_stat_opt::Ievict_delay);
+  pr.now= ndb_index_stat_time();
+
+  if (!ndb_index_stat_proc_evict())
+    return;
+
+  /* Create a LRU batch */
+  Ndb_index_stat* st_lru_arr[ndb_index_stat_max_evict_batch + 1];
+  uint st_lru_cnt= 0;
+  Ndb_index_stat *st_loop= list.head;
+  while (st_loop != 0 && st_lru_cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    if (st->read_time + evict_delay <= pr.now)
+    {
+      /* Insertion sort into the batch from the end */
+      if (st_lru_cnt == 0)
+        st_lru_arr[st_lru_cnt++]= st;
+      else
+      {
+        uint i= st_lru_cnt;
+        while (i != 0)
+        {
+          if (st_lru_arr[i-1]->access_time < st->access_time)
+            break;
+          i--;
+        }
+        if (i < st_lru_cnt)
+        {
+          uint j= st_lru_cnt; /* There is place for one more at end */
+          while (j > i)
+          {
+            st_lru_arr[j]= st_lru_arr[j-1];
+            j--;
+          }
+          st_lru_arr[i]= st;
+          if (st_lru_cnt < batch)
+            st_lru_cnt++;
+        }
+      }
+    }
+  }
+
+  /* Process the LRU batch */
+  uint cnt= 0;
+  while (cnt < st_lru_cnt)
+  {
+    if (!ndb_index_stat_proc_evict())
+      break;
+
+    Ndb_index_stat *st= st_lru_arr[cnt];
+    DBUG_PRINT("index_stat", ("st %s proc evict %s", st->id, list.name));
+    ndb_index_stat_proc_evict(pr, st);
+    ndb_index_stat_free(st);
+    cnt++;
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+void
+ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr)
+{
+  ndb_index_stat_proc_evict(pr, Ndb_index_stat::LT_Error);
+  ndb_index_stat_proc_evict(pr, Ndb_index_stat::LT_Idle);
+}
+
+void
+ndb_index_stat_proc_delete(Ndb_index_stat_proc &pr)
+{
+  const int lt= Ndb_index_stat::LT_Delete;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint delete_batch= opt.get(Ndb_index_stat_opt::Idelete_batch);
+  const uint batch= !pr.end ? delete_batch : 0xFFFFFFFF;
+
+  Ndb_index_stat *st_loop= list.head;
+  uint cnt= 0;
+  while (st_loop != 0 && cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_evict(pr, st);
+    ndb_index_stat_list_remove(st);
+    delete st->is;
+    delete st;
+    cnt++;
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+void
+ndb_index_stat_proc_error(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const int error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
+  const time_t error_wait= st->error_time + error_delay - pr.now;
+
+  if (error_wait <= 0)
+  {
+    ndb_index_stat_list_from_error(st);
+    DBUG_PRINT("index_stat", ("st %s error wait:%ds error count:%u",
+                              st->id, (int)error_wait, st->error_count));
+    if (st->force_update)
+      pr.lt= Ndb_index_stat::LT_Update;
+    else
+      pr.lt= Ndb_index_stat::LT_Read;
+    return;
+  }
+  pr.lt= Ndb_index_stat::LT_Error;
+}
+
+void
+ndb_index_stat_proc_error(Ndb_index_stat_proc &pr)
+{
+  const int lt= Ndb_index_stat::LT_Error;
+  Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+  const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+  const uint batch= opt.get(Ndb_index_stat_opt::Ierror_batch);
+  pr.now= ndb_index_stat_time();
+
+  Ndb_index_stat *st_loop= list.head;
+  uint cnt= 0;
+  while (st_loop != 0 && cnt < batch)
+  {
+    Ndb_index_stat *st= st_loop;
+    st_loop= st_loop->list_next;
+    DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+    ndb_index_stat_proc_error(pr, st);
+    if (pr.lt != lt)
+    {
+      ndb_index_stat_list_move(st, pr.lt);
+      cnt++;
+    }
+  }
+  if (cnt == batch)
+    pr.busy= true;
+}
+
+#ifndef DBUG_OFF
+void
+ndb_index_stat_report(const Ndb_index_stat_glob& old_glob)
+{
+  Ndb_index_stat_glob new_glob= ndb_index_stat_glob;
+  new_glob.set_list_count();
+
+  /* List counts */
+  {
+    const uint (&old_count)[Ndb_index_stat::LT_Count]= old_glob.list_count;
+    const uint (&new_count)[Ndb_index_stat::LT_Count]= new_glob.list_count;
+    bool any= false;
+    int lt;
+    for (lt=1; lt < Ndb_index_stat::LT_Count; lt++)
+    {
+      const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+      const char* name= list.name;
+      if (old_count[lt] != new_count[lt])
+      {
+        DBUG_PRINT("index_stat", ("%s: %u -> %u",
+                                  name, old_count[lt], new_count[lt]));
+        any= true;
+      }
+    }
+    if (any)
+    {
+      const uint bufsz= 20 * Ndb_index_stat::LT_Count;
+      char buf[bufsz];
+      char *ptr= buf;
+      for (lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
+      {
+        const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+        const char* name= list.name;
+        sprintf(ptr, " %s:%u", name, new_count[lt]);
+        ptr+= strlen(ptr);
+      }
+      DBUG_PRINT("index_stat", ("list:%s", buf));
+    }
+  }
+
+  /* Cache summary */
+  {
+    const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+    uint query_size= new_glob.cache_query_bytes;
+    uint clean_size= new_glob.cache_clean_bytes;
+    uint total_size= query_size + clean_size;
+    const uint limit= opt.get(Ndb_index_stat_opt::Icache_limit);
+    double pct= 100.0;
+    if (limit != 0)
+      pct= 100.0 * (double)total_size / (double)limit;
+    DBUG_PRINT("index_stat", ("cache query:%u clean:%u (%.2f pct)",
+                              query_size, clean_size, pct));
+  }
+
+  /* Updates waited for and forced updates */
+  {
+    pthread_mutex_lock(&ndb_index_stat_list_mutex);
+    uint wait_update= new_glob.wait_update;
+    uint force_update= new_glob.force_update;
+    pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+    if (wait_update != 0 || force_update != 0)
+    {
+      DBUG_PRINT("index_stat", ("wait update:%u force update:%u",
+                                wait_update, force_update));
+    }
+  }
+}
+#endif
+
+void
+ndb_index_stat_proc(Ndb_index_stat_proc &pr)
+{
+#ifndef DBUG_OFF
+  Ndb_index_stat_glob old_glob= ndb_index_stat_glob;
+  old_glob.set_list_count();
+#endif
+
+  DBUG_ENTER("ndb_index_stat_proc");
+
+  ndb_index_stat_proc_new(pr);
+  ndb_index_stat_proc_update(pr);
+  ndb_index_stat_proc_read(pr);
+  ndb_index_stat_proc_idle(pr);
+  ndb_index_stat_proc_check(pr);
+  ndb_index_stat_proc_evict(pr);
+  ndb_index_stat_proc_delete(pr);
+  ndb_index_stat_proc_error(pr);
+
+#ifndef DBUG_OFF
+  ndb_index_stat_report(old_glob);
+#endif
+  DBUG_VOID_RETURN;
+}
+
+void
+ndb_index_stat_end()
+{
+  DBUG_ENTER("ndb_index_stat_end");
+  Ndb_index_stat_proc pr;
+  pr.end= true;
+
+  /*
+   * Shares have been freed so any index stat entries left should be
+   * in LT_Delete.  The first two steps here should be unnecessary.
+   */
+
+  ndb_index_stat_allow(0);
+
+  int lt;
+  for (lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
+  {
+    if (lt == (int)Ndb_index_stat::LT_Delete)
+      continue;
+    Ndb_index_stat_list &list= ndb_index_stat_list[lt];
+    Ndb_index_stat *st_loop= list.head;
+    while (st_loop != 0)
+    {
+      Ndb_index_stat *st= st_loop;
+      st_loop= st_loop->list_next;
+      DBUG_PRINT("index_stat", ("st %s end %s", st->id, list.name));
+      pr.lt= Ndb_index_stat::LT_Delete;
+      ndb_index_stat_list_move(st, pr.lt);
+    }
+  }
+
+  /* Real free */
+  ndb_index_stat_proc_delete(pr);
+  DBUG_VOID_RETURN;
+}
+
+/* Index stats thread */
+
+static int
+ndb_index_stat_check_or_create_systables(NdbIndexStat* is, Ndb* ndb)
+{
+  DBUG_ENTER("ndb_index_stat_check_or_create_systables");
+
+  if (is->check_systables(ndb) == 0)
+  {
+    DBUG_PRINT("index_stat", ("using existing index stats tables"));
+    DBUG_RETURN(0);
+  }
+
+  if (is->create_systables(ndb) == 0)
+  {
+    DBUG_PRINT("index_stat", ("created index stats tables"));
+    DBUG_RETURN(0);
+  }
+
+  if (is->getNdbError().code == 721)
+  {
+    // race between mysqlds, maybe
+    DBUG_PRINT("index_stat", ("create index stats tables failed: error %d line %d",
+                              is->getNdbError().code, is->getNdbError().line));
+    DBUG_RETURN(-1);
+  }
+
+  sql_print_warning("create index stats tables failed: error %d line %d",
+                    is->getNdbError().code, is->getNdbError().line);
+  DBUG_RETURN(-1);
+}
+
+pthread_handler_t
+ndb_index_stat_thread_func(void *arg __attribute__((unused)))
+{
+  THD *thd; /* needs to be first for thread_stack */
+  struct timespec abstime;
+  Thd_ndb *thd_ndb= NULL;
+
+  my_thread_init();
+  DBUG_ENTER("ndb_index_stat_thread_func");
+
+  // wl4124_todo remove useless stuff copied from utility thread
+ 
+  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+
+  thd= new THD; /* note that contructor of THD uses DBUG_ */
+  if (thd == NULL)
+  {
+    my_errno= HA_ERR_OUT_OF_MEM;
+    DBUG_RETURN(NULL);
+  }
+  THD_CHECK_SENTRY(thd);
+  pthread_detach_this_thread();
+  ndb_index_stat_thread= pthread_self();
+
+  thd->thread_stack= (char*)&thd; /* remember where our stack is */
+  if (thd->store_globals())
+    goto ndb_index_stat_thread_fail;
+  lex_start(thd);
+  thd->init_for_queries();
+#ifndef NDB_THD_HAS_NO_VERSION
+  thd->version=refresh_version;
+#endif
+  thd->client_capabilities = 0;
+  thd->security_ctx->skip_grants();
+  my_net_init(&thd->net, 0);
+
+  CHARSET_INFO *charset_connection;
+  charset_connection= get_charset_by_csname("utf8",
+                                            MY_CS_PRIMARY, MYF(MY_WME));
+  thd->variables.character_set_client= charset_connection;
+  thd->variables.character_set_results= charset_connection;
+  thd->variables.collation_connection= charset_connection;
+  thd->update_charset();
+
+  /* Signal successful initialization */
+  ndb_index_stat_thread_running= 1;
+  pthread_cond_signal(&COND_ndb_index_stat_ready);
+  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+
+  /*
+    wait for mysql server to start
+  */
+  mysql_mutex_lock(&LOCK_server_started);
+  while (!mysqld_server_started)
+  {
+    set_timespec(abstime, 1);
+    mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
+                         &abstime);
+    if (ndbcluster_terminating)
+    {
+      mysql_mutex_unlock(&LOCK_server_started);
+      pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+      goto ndb_index_stat_thread_end;
+    }
+  }
+  mysql_mutex_unlock(&LOCK_server_started);
+
+  /*
+    Wait for cluster to start
+  */
+  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
+  {
+    /* ndb not connected yet */
+    pthread_cond_wait(&COND_ndb_index_stat_thread, &LOCK_ndb_index_stat_thread);
+    if (ndbcluster_terminating)
+      goto ndb_index_stat_thread_end;
+  }
+  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+
+  /* Get thd_ndb for this thread */
+  if (!(thd_ndb= Thd_ndb::seize(thd)))
+  {
+    sql_print_error("Could not allocate Thd_ndb object");
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    goto ndb_index_stat_thread_end;
+  }
+  set_thd_ndb(thd, thd_ndb);
+  thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+  if (thd_ndb->ndb->setDatabaseName(NDB_INDEX_STAT_DB) == -1)
+  {
+    sql_print_error("Could not change index stats thd_ndb database to %s",
+                    NDB_INDEX_STAT_DB);
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    goto ndb_index_stat_thread_end;
+  }
+
+  ndb_index_stat_allow(1);
+  bool enable_ok;
+  enable_ok= false;
+
+  set_timespec(abstime, 0);
+  for (;;)
+  {
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    if (!ndbcluster_terminating) {
+      int ret= pthread_cond_timedwait(&COND_ndb_index_stat_thread,
+                                      &LOCK_ndb_index_stat_thread,
+                                      &abstime);
+      const char* reason= ret == ETIMEDOUT ? "timed out" : "wake up";
+      (void)reason; // USED
+      DBUG_PRINT("index_stat", ("loop: %s", reason));
+    }
+    if (ndbcluster_terminating) /* Shutting down server */
+      goto ndb_index_stat_thread_end;
+    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+
+    mysql_mutex_lock(&LOCK_global_system_variables);
+    /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
+    const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
+    mysql_mutex_unlock(&LOCK_global_system_variables);
+
+    Ndb_index_stat_proc pr;
+    pr.ndb= thd_ndb->ndb;
+
+    do
+    {
+      if (enable_ok != enable_ok_new)
+      {
+        DBUG_PRINT("index_stat", ("global enable: %d -> %d",
+                                  enable_ok, enable_ok_new));
+
+        if (enable_ok_new)
+        {
+          // at enable check or create stats tables
+          NdbIndexStat is;
+          if (ndb_index_stat_check_or_create_systables(&is, thd_ndb->ndb) == -1)
+          {
+            // try again in next loop
+            break;
+          }
+        }
+        enable_ok= enable_ok_new;
+      }
+
+      if (!enable_ok)
+        break;
+
+      pr.busy= false;
+      ndb_index_stat_proc(pr);
+    } while (0);
+
+    /* Calculate new time to wake up */
+
+    const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+    uint msecs= 0;
+    if (!enable_ok)
+      msecs= opt.get(Ndb_index_stat_opt::Iloop_checkon);
+    else if (!pr.busy)
+      msecs= opt.get(Ndb_index_stat_opt::Iloop_idle);
+    else
+      msecs= opt.get(Ndb_index_stat_opt::Iloop_busy);
+    DBUG_PRINT("index_stat", ("sleep %dms", msecs));
+
+    set_timespec_nsec(abstime, msecs * 1000000ULL);
+  }
+
+ndb_index_stat_thread_end:
+  net_end(&thd->net);
+
+ndb_index_stat_thread_fail:
+  if (thd_ndb)
+  {
+    Thd_ndb::release(thd_ndb);
+    set_thd_ndb(thd, NULL);
+  }
+  thd->cleanup();
+  delete thd;
+  
+  /* signal termination */
+  ndb_index_stat_thread_running= 0;
+  pthread_cond_signal(&COND_ndb_index_stat_ready);
+  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  DBUG_PRINT("exit", ("ndb_index_stat_thread"));
+
+  DBUG_LEAVE;
+  my_thread_end();
+  pthread_exit(0);
+  return NULL;
+}
+
+/* Optimizer queries */
+
+static ulonglong
+ndb_index_stat_round(double x)
+{
+  char buf[100];
+  if (x < 0.0)
+    x= 0.0;
+  // my_snprintf has no float and windows has no snprintf
+  sprintf(buf, "%.0f", x);
+  /* mysql provides strtoull */
+  ulonglong n= strtoull(buf, 0, 10);
+  return n;
+}
+
+int
+ha_ndbcluster::ndb_index_stat_wait(Ndb_index_stat *st,
+                                   uint sample_version)
+{
+  DBUG_ENTER("ha_ndbcluster::ndb_index_stat_wait");
+
+  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  int err= 0;
+  uint count= 0;
+  (void)count; // USED
+  struct timespec abstime;
+  while (true) {
+    int ret= 0;
+    if (st->error.code != 0 &&
+        (st->error.code != NdbIndexStat::NoIndexStats ||
+         st->force_update == 0))
+    {
+      err= st->error.code;
+      break;
+    }
+    if (st->sample_version > sample_version)
+      break;
+    DBUG_PRINT("index_stat", ("st %s wait count:%u",
+                              st->id, ++count));
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_cond_signal(&COND_ndb_index_stat_thread);
+    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    set_timespec(abstime, 1);
+    ret= pthread_cond_timedwait(&ndb_index_stat_stat_cond,
+                                &ndb_index_stat_stat_mutex,
+                                &abstime);
+    if (ret != 0 && ret != ETIMEDOUT)
+    {
+      err= ret;
+      break;
+    }
+  }
+  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  if (err != 0) {
+    DBUG_PRINT("index_stat", ("st %s wait error: %d",
+                               st->id, err));
+    DBUG_RETURN(err);
+  }
+  DBUG_PRINT("index_stat", ("st %s wait ok: sample_version %u -> %u",
+                            st->id, sample_version, st->sample_version));
+  DBUG_RETURN(0);
+}
+
+int
+ha_ndbcluster::ndb_index_stat_query(uint inx,
+                                    const key_range *min_key,
+                                    const key_range *max_key,
+                                    NdbIndexStat::Stat& stat)
+{
+  DBUG_ENTER("ha_ndbcluster::ndb_index_stat_query");
+
+  const KEY *key_info= table->key_info + inx;
+  const NDB_INDEX_DATA &data= m_index[inx];
+  const NDBINDEX *index= data.index;
+  DBUG_PRINT("index_stat", ("index: %s", index->getName()));
+
+  int err= 0;
+
+  /* Create an IndexBound struct for the keys */
+  NdbIndexScanOperation::IndexBound ib;
+  compute_index_bounds(ib, key_info, min_key, max_key);
+  ib.range_no= 0;
+
+  Ndb_index_stat *st=
+    ndb_index_stat_get_share(m_share, index, m_table, true, false);
+  if (st == 0)
+  {
+    DBUG_PRINT("index_stat", ("failed to add index stat share"));
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  err= ndb_index_stat_wait(st, 0);
+  if (err != 0)
+    DBUG_RETURN(err);
+
+  if (st->read_time == 0)
+  {
+    DBUG_PRINT("index_stat", ("no index stats"));
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_cond_signal(&COND_ndb_index_stat_thread);
+    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    DBUG_RETURN(NdbIndexStat::NoIndexStats);
+  }
+
+  uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
+  uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
+  NdbIndexStat::Bound bound_lo(st->is, bound_lo_buffer);
+  NdbIndexStat::Bound bound_hi(st->is, bound_hi_buffer);
+  NdbIndexStat::Range range(bound_lo, bound_hi);
+
+  const NdbRecord* key_record= data.ndb_record_key;
+  if (st->is->convert_range(range, key_record, &ib) == -1)
+  {
+    ndb_index_stat_error(st, "convert_range", __LINE__);
+    DBUG_RETURN(st->error.code);
+  }
+  if (st->is->query_stat(range, stat) == -1)
+  {
+    /* Invalid cache - should remove the entry */
+    ndb_index_stat_error(st, "query_stat", __LINE__);
+    DBUG_RETURN(st->error.code);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ha_ndbcluster::ndb_index_stat_get_rir(uint inx,
+                                      key_range *min_key,
+                                      key_range *max_key,
+                                      ha_rows *rows_out)
+{
+  DBUG_ENTER("ha_ndbcluster::ndb_index_stat_get_rir");
+  uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
+  NdbIndexStat::Stat stat(stat_buffer);
+  int err= ndb_index_stat_query(inx, min_key, max_key, stat);
+  if (err == 0)
+  {
+    double rir= -1.0;
+    NdbIndexStat::get_rir(stat, &rir);
+    DBUG_PRINT("index_stat", ("stat rir: %.2f", rir));
+    ha_rows rows= ndb_index_stat_round(rir);
+    /* Estimate only so cannot return exact zero */
+    if (rows == 0)
+      rows= 1;
+    *rows_out= rows;
+    DBUG_PRINT("index_stat", ("rir: %u", (uint)rows));
+    DBUG_RETURN(0);
+  }
+  DBUG_RETURN(err);
+}
+
+int
+ha_ndbcluster::ndb_index_stat_set_rpk(uint inx)
+{
+  DBUG_ENTER("ha_ndbcluster::ndb_index_stat_set_rpk");
+
+  KEY *key_info= table->key_info + inx;
+  int err= 0;
+
+  uint8 stat_buffer[NdbIndexStat::StatBufferBytes];
+  NdbIndexStat::Stat stat(stat_buffer);
+  const key_range *min_key= 0;
+  const key_range *max_key= 0;
+  err= ndb_index_stat_query(inx, min_key, max_key, stat);
+  if (err == 0)
+  {
+    uint k;
+    for (k= 0; k < key_info->key_parts; k++)
+    {
+      double rpk= -1.0;
+      NdbIndexStat::get_rpk(stat, k, &rpk);
+      ulonglong recs= ndb_index_stat_round(rpk);
+      key_info->rec_per_key[k]= (ulong)recs;
+      DBUG_PRINT("index_stat", ("rpk[%u]: %u", k, (uint)recs));
+    }
+    DBUG_RETURN(0);
+  }
+  DBUG_RETURN(err);
+}
+
+int
+ha_ndbcluster::ndb_index_stat_analyze(Ndb *ndb,
+                                      uint *inx_list,
+                                      uint inx_count)
+{
+  DBUG_ENTER("ha_ndbcluster::ndb_index_stat_analyze");
+
+  struct {
+    uint sample_version;
+    uint error_count;
+  } old[MAX_INDEXES];
+
+  int err= 0;
+  uint i;
+
+  /* Force stats update on each index */
+  for (i= 0; i < inx_count; i++)
+  {
+    uint inx= inx_list[i];
+    const NDB_INDEX_DATA &data= m_index[inx];
+    const NDBINDEX *index= data.index;
+    DBUG_PRINT("index_stat", ("force update: %s", index->getName()));
+
+    Ndb_index_stat *st=
+      ndb_index_stat_get_share(m_share, index, m_table, true, true);
+
+    if (st == 0)
+      DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+    old[i].sample_version= st->sample_version;
+    old[i].error_count= st->error_count;
+  }
+
+  /* Wait for each update (or error) */
+  for (i = 0; i < inx_count; i++)
+  {
+    uint inx= inx_list[i];
+    const NDB_INDEX_DATA &data= m_index[inx];
+    const NDBINDEX *index= data.index;
+    DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
+
+    Ndb_index_stat *st=
+      ndb_index_stat_get_share(m_share, index, m_table, false, false);
+    if (st == 0)
+      DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
+
+    err= ndb_index_stat_wait(st, old[i].sample_version);
+    if (err != 0)
+      DBUG_RETURN(err);
+  }
+
+  DBUG_RETURN(0);
+}
+
+#endif

=== added file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h	1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndb_index_stat.h	2011-06-15 10:37:56 +0000
@@ -0,0 +1,38 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+/* provides declarations only to index_stat.cc */
+
+extern struct st_ndb_status g_ndb_status;
+
+extern pthread_t ndb_index_stat_thread;
+extern pthread_cond_t COND_ndb_index_stat_thread;
+extern pthread_mutex_t LOCK_ndb_index_stat_thread;
+extern pthread_mutex_t ndb_index_stat_glob_mutex;
+extern pthread_mutex_t ndb_index_stat_list_mutex;
+extern pthread_mutex_t ndb_index_stat_stat_mutex;
+extern pthread_cond_t ndb_index_stat_stat_cond;
+
+extern bool ndb_index_stat_get_enable(THD *thd);
+
+extern long g_ndb_status_index_stat_cache_query;
+extern long g_ndb_status_index_stat_cache_clean;
+
+void 
+compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
+                     const KEY *key_info,
+                     const key_range *start_key, const key_range *end_key);

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-06-20 10:41:04 +0000
+++ b/sql/ha_ndbcluster.cc	2011-06-22 08:09:31 +0000
@@ -56,6 +56,7 @@ static ulong opt_ndb_wait_connected;
 ulong opt_ndb_wait_setup;
 static ulong opt_ndb_cache_check_time;
 static uint opt_ndb_cluster_connection_pool;
+static char* opt_ndb_index_stat_option;
 static char* opt_ndb_connectstring;
 static uint opt_ndb_nodeid;
 
@@ -169,7 +170,7 @@ static MYSQL_THDVAR_BOOL(
 static MYSQL_THDVAR_ULONG(
   index_stat_cache_entries,          /* name */
   PLUGIN_VAR_NOCMDARG,
-  "",
+  "Obsolete (ignored and will be removed later).",
   NULL,                              /* check func. */
   NULL,                              /* update func. */
   32,                                /* default */
@@ -182,7 +183,7 @@ static MYSQL_THDVAR_ULONG(
 static MYSQL_THDVAR_ULONG(
   index_stat_update_freq,            /* name */
   PLUGIN_VAR_NOCMDARG,
-  "",
+  "Obsolete (ignored and will be removed later).",
   NULL,                              /* check func. */
   NULL,                              /* update func. */
   20,                                /* default */
@@ -249,6 +250,15 @@ static MYSQL_THDVAR_UINT(
   0                                  /* block */
 );
 
+/*
+  Required in index_stat.cc but available only from here
+  thanks to use of top level anonymous structs.
+*/
+bool ndb_index_stat_get_enable(THD *thd)
+{
+  return THDVAR(thd, index_stat_enable);
+}
+
 static int ndbcluster_end(handlerton *hton, ha_panic_function flag);
 static bool ndbcluster_show_status(handlerton *hton, THD*,
                                    stat_print_fn *,
@@ -402,27 +412,27 @@ pthread_cond_t COND_ndb_util_thread;
 pthread_cond_t COND_ndb_util_ready;
 pthread_handler_t ndb_util_thread_func(void *arg);
 
+// Index stats thread variables
+pthread_t ndb_index_stat_thread;
+int ndb_index_stat_thread_running= 0;
+pthread_mutex_t LOCK_ndb_index_stat_thread;
+pthread_cond_t COND_ndb_index_stat_thread;
+pthread_cond_t COND_ndb_index_stat_ready;
+pthread_mutex_t ndb_index_stat_glob_mutex;
+pthread_mutex_t ndb_index_stat_list_mutex;
+pthread_mutex_t ndb_index_stat_stat_mutex;
+pthread_cond_t ndb_index_stat_stat_cond;
+pthread_handler_t ndb_index_stat_thread_func(void *arg);
+
+extern void ndb_index_stat_free(NDB_SHARE *share);
+extern void ndb_index_stat_end();
+
 /* Status variables shown with 'show status like 'Ndb%' */
 
-struct st_ndb_status {
-  st_ndb_status() { bzero(this, sizeof(struct st_ndb_status)); }
-  long cluster_node_id;
-  const char * connected_host;
-  long connected_port;
-  long number_of_replicas;
-  long number_of_data_nodes;
-  long number_of_ready_data_nodes;
-  long connect_count;
-  long execute_count;
-  long scan_count;
-  long pruned_scan_count;
-  long schema_locks_count;
-  long transaction_no_hint_count[MAX_NDB_NODES];
-  long transaction_hint_count[MAX_NDB_NODES];
-  long long api_client_stats[Ndb::NumClientStatistics];
-};
+struct st_ndb_status g_ndb_status;
 
-static struct st_ndb_status g_ndb_status;
+long g_ndb_status_index_stat_cache_query = 0;
+long g_ndb_status_index_stat_cache_clean = 0;
 
 long long g_event_data_count = 0;
 long long g_event_nondata_count = 0;
@@ -635,6 +645,11 @@ static int show_ndb_server_api_stats(THD
   return 0;
 }
 
+SHOW_VAR ndb_status_index_stat_variables[]= {
+  {"cache_query",     (char*) &g_ndb_status_index_stat_cache_query, SHOW_LONG},
+  {"cache_clean",     (char*) &g_ndb_status_index_stat_cache_clean, SHOW_LONG},
+  {NullS, NullS, SHOW_LONG}
+};
 
 /*
   Error handling functions
@@ -1107,7 +1122,19 @@ void ha_ndbcluster::set_rec_per_key()
     case ORDERED_INDEX:
       // 'Records pr. key' are unknown for non-unique indexes.
       // (May change when we get better index statistics.)
+    {
+      THD *thd= current_thd;
+      const bool index_stat_enable= THDVAR(NULL, index_stat_enable) &&
+                                    THDVAR(thd, index_stat_enable);
+      if (index_stat_enable)
+      {
+        int err= ndb_index_stat_set_rpk(i);
+        if (err == 0)
+          break;
+      }
+      // no fallback method...
       break;
+    }
     default:
       DBUG_ASSERT(false);
     }
@@ -2124,10 +2151,6 @@ static void ndb_init_index(NDB_INDEX_DAT
   data.unique_index= NULL;
   data.index= NULL;
   data.unique_index_attrid_map= NULL;
-  data.index_stat=NULL;
-  data.index_stat_cache_entries=0;
-  data.index_stat_update_freq=0;
-  data.index_stat_query_count=0;
   data.ndb_record_key= NULL;
   data.ndb_unique_record_key= NULL;
   data.ndb_unique_record_row= NULL;
@@ -2139,10 +2162,6 @@ static void ndb_clear_index(NDBDICT *dic
   {
     my_free((char*)data.unique_index_attrid_map, MYF(0));
   }
-  if (data.index_stat)
-  {
-    delete data.index_stat;
-  }
   if (data.ndb_unique_record_key)
     dict->releaseRecord(data.ndb_unique_record_key);
   if (data.ndb_unique_record_row)
@@ -2214,25 +2233,6 @@ int ha_ndbcluster::add_index_handle(THD 
       break;
     } while (1);
     m_index[index_no].index= index;
-    // ordered index - add stats
-    NDB_INDEX_DATA& d=m_index[index_no];
-    delete d.index_stat;
-    d.index_stat=NULL;
-    if (THDVAR(thd, index_stat_enable))
-    {
-      d.index_stat=new NdbIndexStat();
-      d.index_stat_cache_entries= THDVAR(thd, index_stat_cache_entries);
-      d.index_stat_update_freq= THDVAR(thd, index_stat_update_freq);
-      d.index_stat_query_count=0;
-      d.index_stat->alloc_cache(d.index_stat_cache_entries);
-      DBUG_PRINT("info", ("index %s stat=on cache_entries=%u update_freq=%u",
-                          index->getName(),
-                          d.index_stat_cache_entries,
-                          d.index_stat_update_freq));
-    } else
-    {
-      DBUG_PRINT("info", ("index %s stat=off", index->getName()));
-    }
   }
   if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
   {
@@ -3450,7 +3450,7 @@ count_key_columns(const KEY *key_info, c
 }
 
 /* Helper method to compute NDB index bounds. Note: does not set range_no. */
-static void
+void
 compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
                      const KEY *key_info,
                      const key_range *start_key, const key_range *end_key)
@@ -3501,6 +3501,11 @@ compute_index_bounds(NdbIndexScanOperati
     bound.high_key= NULL;
     bound.high_key_count= 0;
   }
+  DBUG_PRINT("info", ("start_flag=0x%x end_flag=0x%x"
+                      " lo_keys=%d lo_incl=%d hi_keys=%d hi_incl=%d",
+                      start_key?start_key->flag:0, end_key?end_key->flag:0,
+                      bound.low_key_count, bound.low_inclusive,
+                      bound.high_key_count, bound.high_inclusive));
 }
 
 /**
@@ -6051,9 +6056,11 @@ int ha_ndbcluster::info(uint flag)
     result= update_stats(thd, 1);
     break;
   }
-  if (flag & HA_STATUS_CONST)
+  /* RPK moved to variable part */
+  if (flag & HA_STATUS_VARIABLE)
   {
-    DBUG_PRINT("info", ("HA_STATUS_CONST"));
+    /* No meaningful way to return error */
+    DBUG_PRINT("info", ("rec_per_key"));
     set_rec_per_key();
   }
   if (flag & HA_STATUS_ERRKEY)
@@ -9802,7 +9809,51 @@ int ha_ndbcluster::ndb_optimize_table(TH
 
 int ha_ndbcluster::analyze(THD* thd, HA_CHECK_OPT* check_opt)
 {
-  return update_stats(thd, 1);
+  int err;
+  if ((err= update_stats(thd, 1)) != 0)
+    return err;
+  const bool index_stat_enable= THDVAR(NULL, index_stat_enable) &&
+                                THDVAR(thd, index_stat_enable);
+  if (index_stat_enable)
+  {
+    if ((err= analyze_index(thd)) != 0)
+      return err;
+  }
+  return 0;
+}
+
+int
+ha_ndbcluster::analyze_index(THD *thd)
+{
+  DBUG_ENTER("ha_ndbcluster::analyze_index");
+
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Ndb *ndb= thd_ndb->ndb;
+
+  uint inx_list[MAX_INDEXES];
+  uint inx_count= 0;
+
+  uint inx;
+  for (inx= 0; inx < table_share->keys; inx++)
+  {
+    NDB_INDEX_TYPE idx_type= get_index_type(inx);  
+
+    if ((idx_type == PRIMARY_KEY_ORDERED_INDEX ||
+         idx_type == UNIQUE_ORDERED_INDEX ||
+         idx_type == ORDERED_INDEX))
+    {
+      if (inx_count < MAX_INDEXES)
+        inx_list[inx_count++]= inx;
+    }
+  }
+
+  if (inx_count != 0)
+  {
+    int err= ndb_index_stat_analyze(ndb, inx_list, inx_count);
+    if (err != 0)
+      DBUG_RETURN(err);
+  }
+  DBUG_RETURN(0);
 }
 
 /*
@@ -10685,6 +10736,14 @@ static int ndbcluster_init(void *p)
   pthread_cond_init(&COND_ndb_util_ready, NULL);
   pthread_cond_init(&COND_ndb_setup_complete, NULL);
   ndb_util_thread_running= -1;
+  pthread_mutex_init(&LOCK_ndb_index_stat_thread, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND_ndb_index_stat_thread, NULL);
+  pthread_cond_init(&COND_ndb_index_stat_ready, NULL);
+  pthread_mutex_init(&ndb_index_stat_glob_mutex, MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&ndb_index_stat_list_mutex, MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&ndb_index_stat_stat_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&ndb_index_stat_stat_cond, NULL);
+  ndb_index_stat_thread_running= -1;
   ndbcluster_terminating= 0;
   ndb_dictionary_is_mysqld= 1;
   ndb_setup_complete= 0;
@@ -10779,6 +10838,44 @@ static int ndbcluster_init(void *p)
     goto ndbcluster_init_error;
   }
 
+  // Create index statistics thread
+  pthread_t tmp2;
+  if (pthread_create(&tmp2, &connection_attrib, ndb_index_stat_thread_func, 0))
+  {
+    DBUG_PRINT("error", ("Could not create ndb index statistics thread"));
+    my_hash_free(&ndbcluster_open_tables);
+    pthread_mutex_destroy(&ndbcluster_mutex);
+    pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
+    pthread_cond_destroy(&COND_ndb_index_stat_thread);
+    pthread_cond_destroy(&COND_ndb_index_stat_ready);
+    pthread_mutex_destroy(&ndb_index_stat_glob_mutex);
+    pthread_mutex_destroy(&ndb_index_stat_list_mutex);
+    pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
+    pthread_cond_destroy(&ndb_index_stat_stat_cond);
+    goto ndbcluster_init_error;
+  }
+
+  /* Wait for the index statistics thread to start */
+  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  while (ndb_index_stat_thread_running < 0)
+    pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
+  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  
+  if (!ndb_index_stat_thread_running)
+  {
+    DBUG_PRINT("error", ("ndb index statistics thread exited prematurely"));
+    my_hash_free(&ndbcluster_open_tables);
+    pthread_mutex_destroy(&ndbcluster_mutex);
+    pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
+    pthread_cond_destroy(&COND_ndb_index_stat_thread);
+    pthread_cond_destroy(&COND_ndb_index_stat_ready);
+    pthread_mutex_destroy(&ndb_index_stat_glob_mutex);
+    pthread_mutex_destroy(&ndb_index_stat_list_mutex);
+    pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
+    pthread_cond_destroy(&ndb_index_stat_stat_cond);
+    goto ndbcluster_init_error;
+  }
+
 #ifndef NDB_NO_WAIT_SETUP
   ndb_wait_setup_func= ndb_wait_setup_func_impl;
 #endif
@@ -10808,6 +10905,15 @@ static int ndbcluster_end(handlerton *ht
     DBUG_RETURN(0);
   ndbcluster_inited= 0;
 
+  /* wait for index stat thread to finish */
+  sql_print_information("Stopping Cluster Index Statistics thread");
+  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  ndbcluster_terminating= 1;
+  pthread_cond_signal(&COND_ndb_index_stat_thread);
+  while (ndb_index_stat_thread_running > 0)
+    pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
+  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+
   /* wait for util and binlog thread to finish */
   ndbcluster_binlog_end(NULL);
 
@@ -10827,6 +10933,7 @@ static int ndbcluster_end(handlerton *ht
   }
   my_hash_free(&ndbcluster_open_tables);
 
+  ndb_index_stat_end();
   ndbcluster_disconnect();
 
   ndbcluster_global_schema_lock_deinit();
@@ -10839,6 +10946,10 @@ static int ndbcluster_end(handlerton *ht
   pthread_cond_destroy(&COND_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_ready);
   pthread_cond_destroy(&COND_ndb_setup_complete);
+  pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
+  pthread_cond_destroy(&COND_ndb_index_stat_thread);
+  pthread_cond_destroy(&COND_ndb_index_stat_ready);
+
   DBUG_RETURN(0);
 }
 
@@ -10947,6 +11058,12 @@ void ha_ndbcluster::set_tabname(const ch
 }
 
 
+/*
+  If there are no stored stats, should we do a tree-dive on all db
+  nodes.  The result is fairly good but does mean a round-trip.
+ */
+static const bool g_ndb_records_in_range_tree_dive= false;
+
 /* Determine roughly how many records are in the range specified */
 ha_rows 
 ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
@@ -10972,92 +11089,73 @@ ha_ndbcluster::records_in_range(uint inx
         memcmp(min_key->key, max_key->key, key_length)==0)))
     DBUG_RETURN(1);
   
+  // XXX why this if
   if ((idx_type == PRIMARY_KEY_ORDERED_INDEX ||
        idx_type == UNIQUE_ORDERED_INDEX ||
-       idx_type == ORDERED_INDEX) &&
-      m_index[inx].index_stat != NULL) // --ndb-index-stat-enable=1
+       idx_type == ORDERED_INDEX))
   {
     THD *thd= current_thd;
-    NDB_INDEX_DATA& d=m_index[inx];
-    const NDBINDEX* index= d.index;
-    Ndb *ndb= get_ndb(thd);
-    NdbTransaction* active_trans= m_thd_ndb ? m_thd_ndb->trans : 0;
-    NdbTransaction* trans=NULL;
-    int res=0;
-    Uint64 rows;
+    const bool index_stat_enable= THDVAR(NULL, index_stat_enable) &&
+                                  THDVAR(thd, index_stat_enable);
 
-    do
+    if (index_stat_enable)
     {
-      // We must provide approx table rows
-      Uint64 table_rows=0;
-      if (stats.records != ~(ha_rows)0 && stats.records != 0)
-      {
-        table_rows = stats.records;
-        DBUG_PRINT("info", ("use info->records: %lu", (ulong) table_rows));
-      }
-      else
-      {
-        if (update_stats(thd, 1))
-          break;
-        table_rows= stats.records;
-        DBUG_PRINT("info", ("use db row_count: %lu", (ulong) table_rows));
-        if (table_rows == 0) {
-          // Problem if autocommit=0
-#ifdef ndb_get_table_statistics_uses_active_trans
-          rows=0;
-          break;
-#endif
-        }
-      }
+      ha_rows rows= HA_POS_ERROR;
+      int err= ndb_index_stat_get_rir(inx, min_key, max_key, &rows);
+      if (err == 0)
+        DBUG_RETURN(rows);
+      /*fall through*/
+    }
 
-      /*
-        Query the index statistics for our range.
-      */
-      if ((trans=active_trans) == NULL || 
-	  trans->commitStatus() != NdbTransaction::Started)
+    if (g_ndb_records_in_range_tree_dive)
+    {
+      NDB_INDEX_DATA& d=m_index[inx];
+      const NDBINDEX* index= d.index;
+      Ndb *ndb= get_ndb(thd);
+      NdbTransaction* active_trans= m_thd_ndb ? m_thd_ndb->trans : 0;
+      NdbTransaction* trans=NULL;
+      int res=0;
+      Uint64 rows;
+
+      do
       {
-        DBUG_PRINT("info", ("no active trans"));
-        if (! (trans=ndb->startTransaction()))
-          ERR_BREAK(ndb->getNdbError(), res);
-      }
-      
-      /* Create an IndexBound struct for the keys */
-      NdbIndexScanOperation::IndexBound ib;
-      compute_index_bounds(ib,
-                           key_info,
-                           min_key, 
-                           max_key);
-
-      ib.range_no= 0;
-
-      // Decide if db should be contacted
-      int flags=0;
-      if (d.index_stat_query_count < d.index_stat_cache_entries ||
-          (d.index_stat_update_freq != 0 &&
-           d.index_stat_query_count % d.index_stat_update_freq == 0))
-      {
-        DBUG_PRINT("info", ("force stat from db"));
-        flags|=NdbIndexStat::RR_UseDb;
-      }
-      if (d.index_stat->records_in_range(index, 
-                                         trans, 
-                                         d.ndb_record_key,
-                                         m_ndb_record,
-                                         &ib, 
-                                         table_rows, 
-                                         &rows, 
-                                         flags) == -1)
-        ERR_BREAK(d.index_stat->getNdbError(), res);
-      d.index_stat_query_count++;
-    } while (0);
-
-    if (trans != active_trans && rows == 0)
-      rows = 1;
-    if (trans != active_trans && trans != NULL)
-      ndb->closeTransaction(trans);
-    if (res != 0)
-      DBUG_RETURN(HA_POS_ERROR);
-    DBUG_RETURN(rows);
+        if ((trans=active_trans) == NULL || 
+            trans->commitStatus() != NdbTransaction::Started)
+        {
+          DBUG_PRINT("info", ("no active trans"));
+          if (! (trans=ndb->startTransaction()))
+            ERR_BREAK(ndb->getNdbError(), res);
+        }
+        
+        /* Create an IndexBound struct for the keys */
+        NdbIndexScanOperation::IndexBound ib;
+        compute_index_bounds(ib,
+                             key_info,
+                             min_key, 
+                             max_key);
+
+        ib.range_no= 0;
+
+        NdbIndexStat is;
+        if (is.records_in_range(index, 
+                                trans, 
+                                d.ndb_record_key,
+                                m_ndb_record,
+                                &ib, 
+                                0, 
+                                &rows, 
+                                0) == -1)
+          ERR_BREAK(is.getNdbError(), res);
+      } while (0);
+
+      if (trans != active_trans && rows == 0)
+        rows = 1;
+      if (trans != active_trans && trans != NULL)
+        ndb->closeTransaction(trans);
+      if (res == 0)
+        DBUG_RETURN(rows);
+      /*fall through*/
+    }
   }
 
   /* Use simple heuristics to estimate fraction
@@ -11933,6 +12031,8 @@ void ndbcluster_real_free_share(NDB_SHAR
   if (opt_ndb_extra_logging > 9)
     sql_print_information ("ndbcluster_real_free_share: %s use_count: %u", (*share)->key, (*share)->use_count);
 
+  ndb_index_stat_free(*share);
+
   my_hash_delete(&ndbcluster_open_tables, (uchar*) *share);
   thr_lock_delete(&(*share)->lock);
   pthread_mutex_destroy(&(*share)->mutex);
@@ -15146,6 +15246,7 @@ SHOW_VAR ndb_status_variables_export[]= 
   {"Ndb",          (char*) &ndb_status_injector_variables, SHOW_ARRAY},
   {"Ndb",          (char*) &ndb_status_slave_variables,    SHOW_ARRAY},
   {"Ndb",          (char*) &show_ndb_server_api_stats,     SHOW_FUNC},
+  {"Ndb_index_stat", (char*) &ndb_status_index_stat_variables, SHOW_ARRAY},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -15224,6 +15325,31 @@ static MYSQL_SYSVAR_UINT(
   0                                  /* block */
 );
 
+/* should be in index_stat.h */
+
+extern int
+ndb_index_stat_option_check(MYSQL_THD,
+                            struct st_mysql_sys_var *var,
+                            void *save,
+                            struct st_mysql_value *value);
+extern void
+ndb_index_stat_option_update(MYSQL_THD,
+                             struct st_mysql_sys_var *var,
+                             void *var_ptr,
+                             const void *save);
+
+extern char ndb_index_stat_option_buf[];
+
+static MYSQL_SYSVAR_STR(
+  index_stat_option,                /* name */
+  opt_ndb_index_stat_option,        /* var */
+  PLUGIN_VAR_RQCMDARG,
+  "Comma-separated tunable options for ndb index statistics",
+  ndb_index_stat_option_check,      /* check func. */
+  ndb_index_stat_option_update,     /* update func. */
+  ndb_index_stat_option_buf
+);
+
 
 ulong opt_ndb_report_thresh_binlog_epoch_slip;
 static MYSQL_SYSVAR_ULONG(
@@ -15418,6 +15544,7 @@ static struct st_mysql_sys_var* system_v
   MYSQL_SYSVAR(batch_size),
   MYSQL_SYSVAR(optimization_delay),
   MYSQL_SYSVAR(index_stat_enable),
+  MYSQL_SYSVAR(index_stat_option),
   MYSQL_SYSVAR(index_stat_cache_entries),
   MYSQL_SYSVAR(index_stat_update_freq),
   MYSQL_SYSVAR(table_no_logging),

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-06-20 10:41:04 +0000
+++ b/sql/ha_ndbcluster.h	2011-06-22 08:09:31 +0000
@@ -64,12 +64,6 @@ typedef struct ndb_index_data {
   const NdbDictionary::Index *unique_index;
   unsigned char *unique_index_attrid_map;
   bool null_in_unique_index;
-  // In this version stats are not shared between threads
-  NdbIndexStat* index_stat;
-  uint index_stat_cache_entries;
-  // Simple counter mechanism to decide when to connect to db
-  uint index_stat_update_freq;
-  uint index_stat_query_count;
   /*
     In mysqld, keys and rows are stored differently (using KEY_PART_INFO for
     keys and Field for rows).
@@ -200,6 +194,24 @@ struct Ndb_local_table_statistics {
 
 #include "ndb_thd_ndb.h"
 
+struct st_ndb_status {
+  st_ndb_status() { bzero(this, sizeof(struct st_ndb_status)); }
+  long cluster_node_id;
+  const char * connected_host;
+  long connected_port;
+  long number_of_replicas;
+  long number_of_data_nodes;
+  long number_of_ready_data_nodes;
+  long connect_count;
+  long execute_count;
+  long scan_count;
+  long pruned_scan_count;
+  long schema_locks_count;
+  long transaction_no_hint_count[MAX_NDB_NODES];
+  long transaction_hint_count[MAX_NDB_NODES];
+  long long api_client_stats[Ndb::NumClientStatistics];
+};
+
 int ndbcluster_commit(handlerton *hton, THD *thd, bool all);
 class ha_ndbcluster: public handler
 {
@@ -213,6 +225,7 @@ class ha_ndbcluster: public handler
 
   int optimize(THD* thd, HA_CHECK_OPT* check_opt);
   int analyze(THD* thd, HA_CHECK_OPT* check_opt);
+  int analyze_index(THD* thd);
 
   int write_row(uchar *buf);
   int update_row(const uchar *old_data, uchar *new_data);
@@ -570,6 +583,21 @@ private:
   void no_uncommitted_rows_update(int);
   void no_uncommitted_rows_reset(THD *);
 
+  /* Ordered index statistics v4 */
+  int ndb_index_stat_get_rir(uint inx,
+                             key_range *min_key,
+                             key_range *max_key,
+                             ha_rows *rows_out);
+  int ndb_index_stat_set_rpk(uint inx);
+  int ndb_index_stat_wait(struct Ndb_index_stat *st,
+                          uint sample_version);
+  int ndb_index_stat_query(uint inx,
+                           const key_range *min_key,
+                           const key_range *max_key,
+                           NdbIndexStat::Stat& stat);
+  int ndb_index_stat_analyze(Ndb *ndb,
+                             uint *inx_list,
+                             uint inx_count);
 
   NdbTransaction *start_transaction_part_id(uint32 part_id, int &error);
   inline NdbTransaction *get_transaction_part_id(uint32 part_id, int &error)
@@ -714,3 +742,5 @@ static const int ndbcluster_hton_name_le
 extern int ndbcluster_terminating;
 extern int ndb_util_thread_running;
 extern pthread_cond_t COND_ndb_util_ready;
+extern int ndb_index_stat_thread_running;
+extern pthread_cond_t COND_ndb_index_stat_ready;

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-06-20 10:41:04 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-06-22 08:09:31 +0000
@@ -716,6 +716,24 @@ int ndbcluster_binlog_end(THD *thd)
     pthread_mutex_unlock(&LOCK_ndb_util_thread);
   }
 
+  if (ndb_index_stat_thread_running > 0)
+  {
+    /*
+      Index stats thread blindly imitates util thread.  Following actually
+      fixes some "[Warning] Plugin 'ndbcluster' will be forced to shutdown".
+    */
+    sql_print_information("Stopping Cluster Index Stats thread");
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
+    ndb_index_stat_thread_running++;
+    ndbcluster_terminating= 1;
+    pthread_cond_signal(&COND_ndb_index_stat_thread);
+    while (ndb_index_stat_thread_running > 1)
+      pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
+    ndb_index_stat_thread_running--;
+    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  }
+
   if (ndbcluster_binlog_inited)
   {
     ndbcluster_binlog_inited= 0;

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-06-20 10:41:04 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-06-22 08:09:31 +0000
@@ -121,6 +121,8 @@ void ndbcluster_global_schema_lock_deini
 extern unsigned char g_node_id_map[max_ndb_nodes];
 extern pthread_mutex_t LOCK_ndb_util_thread;
 extern pthread_cond_t COND_ndb_util_thread;
+extern pthread_mutex_t LOCK_ndb_index_stat_thread;
+extern pthread_cond_t COND_ndb_index_stat_thread;
 extern pthread_mutex_t ndbcluster_mutex;
 extern HASH ndbcluster_open_tables;
 

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2011-06-22 06:35:38 +0000
+++ b/sql/ndb_share.h	2011-06-22 08:09:31 +0000
@@ -159,6 +159,7 @@ struct NDB_SHARE {
   char *table_name;
   Ndb::TupleIdRange tuple_id_range;
   struct Ndb_statistics stat;
+  struct Ndb_index_stat* index_stat_list;
   bool util_thread; // if opened by util thread
   uint32 connect_count;
   uint32 flags;

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-06-16 09:39:47 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-06-22 08:09:31 +0000
@@ -175,6 +175,7 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ha_ndbcluster_cond.cc
   ../../sql/ha_ndbcluster_connection.cc
   ../../sql/ha_ndbcluster_binlog.cc
+  ../../sql/ha_ndb_index_stat.cc
   ../../sql/ha_ndbinfo.cc
   ../../sql/ndb_local_connection.cc
   ../../sql/ndb_share.cc

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2011-05-29 10:55:32 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2011-06-12 16:54:32 +0000
@@ -226,9 +226,13 @@
  * counted as 1 word.  Values currently contain RIR (one word) and RPK
  * (one word for each key level).  The SAMPLEs table STAT_VALUE column
  * is longer to allow future changes.
+ *
+ * Stats tables are "lifted" to mysql level so for max key size use
+ * MAX_KEY_LENGTH/4 instead of the bigger MAX_KEY_SIZE_IN_WORDS.  The
+ * definition is not available by default, use 3072 directly now.
  */
 #define MAX_INDEX_STAT_KEY_COUNT    MAX_ATTRIBUTES_IN_INDEX
-#define MAX_INDEX_STAT_KEY_SIZE     (MAX_KEY_SIZE_IN_WORDS - 3 - 1)
+#define MAX_INDEX_STAT_KEY_SIZE     ((3072/4) - 3 - 1)
 #define MAX_INDEX_STAT_VALUE_COUNT  (1 + MAX_INDEX_STAT_KEY_COUNT)
 #define MAX_INDEX_STAT_VALUE_SIZE   MAX_INDEX_STAT_VALUE_COUNT
 #define MAX_INDEX_STAT_VALUE_CSIZE  512 /* Longvarbinary(2048) */

=== modified file 'storage/ndb/include/ndb_constants.h'
--- a/storage/ndb/include/ndb_constants.h	2011-05-31 08:28:58 +0000
+++ b/storage/ndb/include/ndb_constants.h	2011-06-12 16:54:32 +0000
@@ -118,10 +118,10 @@
 #define NDB_INDEX_STAT_DB     "mysql"
 #define NDB_INDEX_STAT_SCHEMA "def"
 
-#define NDB_INDEX_STAT_HEAD_TABLE    "NDB$IS_HEAD"
-#define NDB_INDEX_STAT_SAMPLE_TABLE  "NDB$IS_SAMPLE"
-#define NDB_INDEX_STAT_SAMPLE_INDEX1 "NDB$IS_SAMPLE_X1"
+#define NDB_INDEX_STAT_HEAD_TABLE    "ndb_index_stat_head"
+#define NDB_INDEX_STAT_SAMPLE_TABLE  "ndb_index_stat_sample"
+#define NDB_INDEX_STAT_SAMPLE_INDEX1 "ndb_index_stat_sample_x1"
 
-#define NDB_INDEX_STAT_PREFIX        "NDB$IS"
+#define NDB_INDEX_STAT_PREFIX        "ndb_index_stat"
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2011-06-06 12:18:27 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2011-06-12 16:54:32 +0000
@@ -1694,50 +1694,50 @@ Trix::execINDEX_STAT_IMPL_REQ(Signal* si
 
 const Trix::SysColumn
 Trix::g_statMetaHead_column[] = {
-  { 0, "INDEX_ID",
+  { 0, "index_id",
     true
   },
-  { 1, "INDEX_VERSION",
+  { 1, "index_version",
     true
   },
-  { 2, "TABLE_ID",
+  { 2, "table_id",
     false
   },
-  { 3, "FRAG_COUNT",
+  { 3, "frag_count",
     false
   },
-  { 4, "VALUE_FORMAT",
+  { 4, "value_format",
     false
   },
-  { 5, "SAMPLE_VERSION",
+  { 5, "sample_version",
     false
   },
-  { 6, "LOAD_TIME",
+  { 6, "load_time",
     false
   },
-  { 7, "SAMPLE_COUNT",
+  { 7, "sample_count",
     false
   },
-  { 8, "KEY_BYTES",
+  { 8, "key_bytes",
     false
   }
 };
 
 const Trix::SysColumn
 Trix::g_statMetaSample_column[] = {
-  { 0, "INDEX_ID",
+  { 0, "index_id",
     true
   },
-  { 1, "INDEX_VERSION",
+  { 1, "index_version",
     true
   },
-  { 2, "SAMPLE_VERSION",
+  { 2, "sample_version",
     true
   },
-  { 3, "STAT_KEY",
+  { 3, "stat_key",
     true
   },
-  { 4, "STAT_VALUE",
+  { 4, "stat_value",
     false
   }
 };

=== modified file 'storage/ndb/src/ndbapi/CMakeLists.txt'
--- a/storage/ndb/src/ndbapi/CMakeLists.txt	2011-06-06 12:18:27 +0000
+++ b/storage/ndb/src/ndbapi/CMakeLists.txt	2011-06-14 10:42:04 +0000
@@ -21,7 +21,8 @@ ADD_LIBRARY(ndbapi STATIC
             NdbEventOperation.cpp
             NdbEventOperationImpl.cpp
             NdbIndexStat.cpp
-	    NdbIndexStatImpl.cpp
+            NdbIndexStatImpl.cpp
+            NdbIndexStatFrmData.cpp
             NdbInterpretedCode.cpp
             TransporterFacade.cpp
             ClusterMgr.cpp

=== modified file 'storage/ndb/src/ndbapi/Makefile.am'
--- a/storage/ndb/src/ndbapi/Makefile.am	2011-06-07 13:47:21 +0000
+++ b/storage/ndb/src/ndbapi/Makefile.am	2011-06-22 08:09:31 +0000
@@ -59,6 +59,7 @@ libndbapi_la_SOURCES = \
 	NdbBlob.cpp \
 	NdbIndexStat.cpp \
 	NdbIndexStatImpl.cpp \
+	NdbIndexStatFrmData.cpp \
         SignalSender.cpp \
         ObjectMap.cpp \
 	NdbInterpretedCode.cpp \

=== added file 'storage/ndb/src/ndbapi/NdbIndexStatFrmData.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatFrmData.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatFrmData.cpp	2011-06-12 16:54:32 +0000
@@ -0,0 +1,175 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <ndb_global.h>
+#include "NdbIndexStatImpl.hpp"
+
+#if ndb_index_stat_systables_sql
+
+use mysql;
+
+create table ndb_index_stat_head (
+  index_id int unsigned not null,
+  index_version int unsigned not null,
+  table_id int unsigned not null,
+  frag_count int unsigned not null,
+  value_format int unsigned not null,
+  sample_version int unsigned not null,
+  load_time int unsigned not null,
+  sample_count int unsigned not null,
+  key_bytes int unsigned not null,
+  primary key using hash (
+    index_id, index_version)
+) engine=ndb;
+
+create table ndb_index_stat_sample (
+  index_id int unsigned not null,
+  index_version int unsigned not null,
+  sample_version int unsigned not null,
+  stat_key varbinary(3056) not null, -- ((3072/4) - 3 - 1) * 4
+  stat_value varbinary(2048) not null, -- 512 * 4
+  primary key using hash (
+    index_id, index_version, sample_version, stat_key),
+  index ndb_index_stat_sample_x1 (
+    index_id, index_version, sample_version)
+) engine=ndb;
+
+#endif
+
+// rest dumped from *.frm by tools/ndb_dump_frm_data
+
+/*
+  name: ndb_index_stat_head
+  orig: 8918
+  pack: 402
+*/
+
+const uint g_ndb_index_stat_head_frm_len = 402;
+
+const uint8 g_ndb_index_stat_head_frm_data[402] =
+{
+  0x01,0x00,0x00,0x00,0xd6,0x22,0x00,0x00,
+  0x86,0x01,0x00,0x00,0x78,0x9c,0xed,0xda,
+  0xbf,0x4e,0xc2,0x40,0x1c,0x07,0xf0,0x6f,
+  0xcb,0xdf,0x1e,0x50,0x08,0x21,0x0c,0x0c,
+  0xa6,0x31,0x21,0x11,0x17,0x9d,0x9d,0xc4,
+  0xc4,0x81,0x18,0x95,0x10,0x16,0x5c,0x9a,
+  0x02,0x87,0xa9,0x02,0x35,0xb4,0x10,0xd9,
+  0x78,0x27,0x1e,0xc1,0x77,0xf0,0x29,0x7c,
+  0x06,0xcf,0x2b,0x50,0x3c,0x36,0x16,0x83,
+  0x9a,0xdf,0x67,0xba,0xfb,0xf6,0x2e,0xf9,
+  0xe5,0xbb,0xb5,0xe9,0xa7,0x66,0x98,0x31,
+  0xa0,0xa0,0x01,0xe7,0x80,0xab,0x55,0xb1,
+  0xa5,0x9f,0x22,0x0d,0x24,0xc2,0x65,0x3a,
+  0xca,0x5c,0x79,0xee,0xe3,0x0d,0xb8,0x58,
+  0xed,0x4c,0xe0,0xec,0x0c,0xb0,0x40,0x08,
+  0x21,0x84,0x10,0x42,0x08,0x21,0x84,0x90,
+  0xdf,0x4c,0xd3,0x01,0x86,0xf0,0x0d,0x5f,
+  0x8f,0xc9,0xdd,0x42,0x6e,0x2b,0x97,0x71,
+  0xe8,0x8b,0xe4,0x7a,0x21,0x9a,0xad,0xc6,
+  0x6d,0xbd,0xd5,0x11,0x87,0x1e,0xf4,0xaf,
+  0xdb,0xb3,0x40,0x86,0x71,0xbf,0xdb,0x1b,
+  0x4e,0xfd,0x80,0x4f,0x4a,0x72,0x6f,0x35,
+  0xeb,0xad,0x76,0xa3,0xdd,0xb8,0xbf,0xb3,
+  0xae,0x3a,0xd6,0xcd,0x75,0xc7,0x3a,0xa9,
+  0x41,0x2b,0xfe,0xe8,0xa8,0x84,0x10,0x42,
+  0x08,0x21,0x84,0x10,0x42,0xfe,0x9f,0x77,
+  0x1d,0x85,0x43,0xcf,0x70,0x48,0x1a,0x0c,
+  0x2c,0xf1,0x20,0x57,0x55,0x3c,0x6d,0xd3,
+  0x26,0xca,0x9b,0xd5,0x12,0xcc,0xd0,0x4b,
+  0x35,0x6b,0x4f,0x88,0xc3,0x70,0xc7,0x7d,
+  0xfe,0x6a,0xbb,0x7d,0x24,0x60,0xae,0xd7,
+  0x33,0x3e,0xf1,0x5d,0x6f,0x8c,0x24,0x8c,
+  0xc0,0xe9,0x0e,0x79,0xf8,0x30,0x85,0xcc,
+  0x60,0xe2,0x3c,0xda,0x3d,0x6f,0x3a,0x0e,
+  0x90,0x46,0x6e,0xe6,0x0c,0xa7,0xdc,0x1e,
+  0x78,0x93,0x91,0x13,0xc8,0xa1,0xf2,0xbe,
+  0x33,0x7a,0x91,0x47,0xa3,0xbb,0x0c,0x6c,
+  0xe8,0x39,0x7d,0x3b,0x70,0x47,0x1c,0x19,
+  0xe4,0x36,0x8f,0xd7,0xd7,0xb3,0x60,0xcf,
+  0x7c,0x6e,0x77,0xe7,0x01,0xf7,0x11,0x37,
+  0x18,0xc3,0xea,0x3b,0x8e,0x9c,0x3f,0x16,
+  0xfe,0xc2,0x61,0xca,0x20,0xa9,0x04,0xc9,
+  0xf0,0x04,0x53,0x82,0x54,0x46,0x06,0xa6,
+  0x12,0xa4,0x73,0x32,0x28,0x2a,0x81,0x91,
+  0x97,0x41,0x59,0x09,0x98,0x84,0x8a,0x12,
+  0x64,0xc2,0x2b,0x47,0x4a,0x90,0x0d,0x4f,
+  0x1c,0x2b,0x81,0x88,0x9a,0x11,0x3b,0xb5,
+  0x88,0xa8,0x13,0xf1,0x5d,0x88,0x50,0xdb,
+  0x10,0xbb,0x55,0x88,0x6d,0x0f,0x42,0x2d,
+  0x41,0x6c,0x1b,0x10,0xf8,0x02,0xe7,0x16,
+  0x7d,0xd4
+};
+
+/*
+  name: ndb_index_stat_sample
+  orig: 12842
+  pack: 371
+*/
+
+const uint g_ndb_index_stat_sample_frm_len = 371;
+
+const uint8 g_ndb_index_stat_sample_frm_data[371] =
+{
+  0x01,0x00,0x00,0x00,0x2a,0x32,0x00,0x00,
+  0x67,0x01,0x00,0x00,0x78,0x9c,0xed,0xda,
+  0x3b,0x4f,0xc2,0x60,0x18,0x05,0xe0,0xd3,
+  0x1b,0xf4,0xa2,0x85,0x81,0x38,0x38,0x35,
+  0x71,0x01,0x17,0x74,0x71,0x32,0x11,0x8c,
+  0x9a,0x10,0xa3,0x12,0xc2,0xc2,0xd4,0x80,
+  0xed,0x40,0xc4,0x6a,0xb8,0x05,0xb6,0xfe,
+  0x36,0xfd,0x3b,0x0e,0xfc,0x00,0xe3,0x67,
+  0x69,0xa9,0xc0,0x62,0xdc,0x8a,0xc9,0x79,
+  0xa6,0xb7,0xa7,0x5f,0x9a,0xf3,0x6d,0x1d,
+  0xde,0x2f,0xc9,0xb4,0x15,0xa0,0x28,0x01,
+  0x35,0xe0,0x4d,0x46,0x09,0x29,0x79,0x06,
+  0x03,0xd0,0x96,0xa3,0x9e,0x66,0xd1,0x01,
+  0x7c,0xbc,0x03,0x67,0xf1,0x93,0x0d,0x54,
+  0xab,0xc0,0x09,0x88,0x88,0x88,0x88,0x88,
+  0x88,0x88,0x68,0x97,0xc9,0x79,0xe0,0x08,
+  0x65,0x7c,0x5a,0xaa,0x02,0x48,0xa1,0x04,
+  0x1c,0xd6,0x54,0xc8,0xa1,0x96,0x0c,0x4a,
+  0x68,0x24,0x83,0x1a,0xee,0x47,0xef,0x6b,
+  0x0b,0x4b,0xc2,0x1e,0xa2,0xa3,0xbf,0x9d,
+  0x15,0xcd,0x56,0xe3,0xae,0xde,0xea,0x88,
+  0xc0,0xeb,0xb9,0xfd,0xc0,0xf3,0x67,0xee,
+  0x68,0xdc,0x1d,0xbb,0xa3,0xee,0xf3,0xeb,
+  0xc0,0x77,0x67,0xa7,0x22,0xeb,0x6b,0x13,
+  0x11,0x11,0x11,0x11,0x11,0x11,0x11,0x11,
+  0x11,0xd1,0x3f,0x60,0x22,0xf0,0x7a,0x8f,
+  0x83,0xc9,0x68,0xec,0x0f,0x97,0xeb,0x6b,
+  0x4e,0xb3,0xde,0x6a,0x37,0xda,0x8d,0x87,
+  0x7b,0xe7,0xb2,0xe3,0xdc,0x5e,0x77,0x9c,
+  0x72,0x05,0x92,0x9d,0x75,0x4d,0x22,0x22,
+  0x22,0xa2,0x9d,0x70,0x2c,0xa3,0x98,0x75,
+  0x87,0x2c,0x49,0xd0,0x30,0x87,0xbd,0xfc,
+  0x6d,0x2c,0x9d,0xff,0xa4,0x4d,0x1c,0xac,
+  0xa6,0x39,0x72,0x9a,0x5c,0xaa,0x38,0x7f,
+  0x04,0x15,0x46,0xb2,0xf0,0xd0,0xf7,0xa2,
+  0x2f,0xdb,0xc9,0x3c,0xf5,0x87,0xa3,0xfe,
+  0x4b,0x80,0x1c,0x0a,0xab,0x15,0x88,0x34,
+  0xc9,0xc3,0x88,0x37,0x23,0x9e,0xfc,0x39,
+  0x74,0x58,0xf1,0x3c,0xed,0x0e,0x26,0x3e,
+  0x54,0xc3,0x34,0x11,0x6f,0x58,0x44,0x2d,
+  0x14,0x1d,0xd0,0xec,0x28,0xd0,0x36,0x82,
+  0x5c,0x21,0x0a,0x8c,0x8d,0x20,0x6f,0xdc,
+  0x2c,0xac,0x78,0x4b,0x23,0x0a,0x0a,0x17,
+  0x80,0x6e,0x5d,0x41,0x17,0xd6,0x3a,0x10,
+  0x69,0x37,0xb1,0x55,0x4c,0x6c,0xb7,0x12,
+  0x69,0x25,0xb1,0xee,0x23,0xf0,0x0d,0x53,
+  0x4c,0x66,0xbc
+};

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-06-07 13:28:40 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-06-16 18:16:01 +0000
@@ -101,6 +101,10 @@ NdbIndexStatImpl::Sys::~Sys()
 void
 NdbIndexStatImpl::sys_release(Sys& sys)
 {
+  // close schema trans if any exists
+  NdbDictionary::Dictionary* const dic = sys.m_dic;
+  (void)dic->endSchemaTrans(NdbDictionary::Dictionary::SchemaTransAbort);
+
   if (sys.m_headtable != 0)
   {
     sys.m_dic->removeTableGlobal(*sys.m_headtable, false);
@@ -123,59 +127,67 @@ NdbIndexStatImpl::make_headtable(NdbDict
 {
   tab.setName(g_headtable_name);
   tab.setLogging(true);
+  int ret;
+  ret = tab.setFrm(g_ndb_index_stat_head_frm_data,
+                   g_ndb_index_stat_head_frm_len);
+  if (ret != 0)
+  {
+    setError(ret, __LINE__);
+    return -1;
+  }
   // key must be first
   {
-    NdbDictionary::Column col("INDEX_ID");
+    NdbDictionary::Column col("index_id");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setPrimaryKey(true);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("INDEX_VERSION");
+    NdbDictionary::Column col("index_version");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setPrimaryKey(true);
     tab.addColumn(col);
   }
   // table
   {
-    NdbDictionary::Column col("TABLE_ID");
+    NdbDictionary::Column col("table_id");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("FRAG_COUNT");
+    NdbDictionary::Column col("frag_count");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
   }
   // current sample
   {
-    NdbDictionary::Column col("VALUE_FORMAT");
+    NdbDictionary::Column col("value_format");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("SAMPLE_VERSION");
+    NdbDictionary::Column col("sample_version");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("LOAD_TIME");
+    NdbDictionary::Column col("load_time");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("SAMPLE_COUNT");
+    NdbDictionary::Column col("sample_count");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("KEY_BYTES");
+    NdbDictionary::Column col("key_bytes");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setNullable(false);
     tab.addColumn(col);
@@ -193,27 +205,35 @@ NdbIndexStatImpl::make_sampletable(NdbDi
 {
   tab.setName(g_sampletable_name);
   tab.setLogging(true);
+  int ret;
+  ret = tab.setFrm(g_ndb_index_stat_sample_frm_data,
+                   g_ndb_index_stat_sample_frm_len);
+  if (ret != 0)
+  {
+    setError(ret, __LINE__);
+    return -1;
+  }
   // key must be first
   {
-    NdbDictionary::Column col("INDEX_ID");
+    NdbDictionary::Column col("index_id");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setPrimaryKey(true);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("INDEX_VERSION");
+    NdbDictionary::Column col("index_version");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setPrimaryKey(true);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("SAMPLE_VERSION");
+    NdbDictionary::Column col("sample_version");
     col.setType(NdbDictionary::Column::Unsigned);
     col.setPrimaryKey(true);
     tab.addColumn(col);
   }
   {
-    NdbDictionary::Column col("STAT_KEY");
+    NdbDictionary::Column col("stat_key");
     col.setType(NdbDictionary::Column::Longvarbinary);
     col.setPrimaryKey(true);
     col.setLength(MaxKeyBytes);
@@ -221,7 +241,7 @@ NdbIndexStatImpl::make_sampletable(NdbDi
   }
   // value
   {
-    NdbDictionary::Column col("STAT_VALUE");
+    NdbDictionary::Column col("stat_value");
     col.setType(NdbDictionary::Column::Longvarbinary);
     col.setNullable(false);
     col.setLength(MaxValueCBytes);
@@ -242,9 +262,9 @@ NdbIndexStatImpl::make_sampleindex1(NdbD
   ind.setName(g_sampleindex1_name);
   ind.setType(NdbDictionary::Index::OrderedIndex);
   ind.setLogging(false);
-  ind.addColumnName("INDEX_ID");
-  ind.addColumnName("INDEX_VERSION");
-  ind.addColumnName("SAMPLE_VERSION");
+  ind.addColumnName("index_id");
+  ind.addColumnName("index_version");
+  ind.addColumnName("sample_version");
   return 0;
 }
 
@@ -382,7 +402,13 @@ NdbIndexStatImpl::create_systables(Ndb* 
     return -1;
   }
 
-  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+  NdbDictionary::Dictionary* const dic = sys.m_dic;
+
+  if (dic->beginSchemaTrans() == -1)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
 
   {
     NdbDictionary::Table tab;
@@ -406,6 +432,19 @@ NdbIndexStatImpl::create_systables(Ndb* 
     NdbDictionary::Table tab;
     if (make_sampletable(tab) == -1)
       return -1;
+
+#ifdef VM_TRACE
+    // test of schema trans
+    {
+      const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_CREATE", (char*)0, 0);
+      if (p != 0 && strchr("1Y", p[0]) != 0)
+      {
+        setError(9999, __LINE__);
+        return -1;
+      }
+    }
+#endif
+
     if (dic->createTable(tab) == -1)
     {
       setError(dic->getNdbError().code, __LINE__);
@@ -438,6 +477,12 @@ NdbIndexStatImpl::create_systables(Ndb* 
     }
   }
 
+  if (dic->endSchemaTrans() == -1)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
+
   return 0;
 }
 
@@ -450,7 +495,13 @@ NdbIndexStatImpl::drop_systables(Ndb* nd
       m_error.code != BadSysTables)
     return -1;
 
-  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+  NdbDictionary::Dictionary* const dic = sys.m_dic;
+
+  if (dic->beginSchemaTrans() == -1)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
 
   if (sys.m_headtable != 0)
   {
@@ -463,12 +514,31 @@ NdbIndexStatImpl::drop_systables(Ndb* nd
 
   if (sys.m_sampletable != 0)
   {
+
+#ifdef VM_TRACE
+    // test of schema trans
+    {
+      const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_ABORT_SYS_DROP", (char*)0, 0);
+      if (p != 0 && strchr("1Y", p[0]) != 0)
+      {
+        setError(9999, __LINE__);
+        return -1;
+      }
+    }
+#endif
+
     if (dic->dropTableGlobal(*sys.m_sampletable) == -1)
     {
       setError(dic->getNdbError().code, __LINE__);
       return -1;
     }
   }
+
+  if (dic->endSchemaTrans() == -1)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
     
   return 0;
 }
@@ -816,12 +886,12 @@ NdbIndexStatImpl::sys_head_setkey(Con& c
 {
   Head& head = con.m_head;
   NdbOperation* op = con.m_op;
-  if (op->equal("INDEX_ID", (char*)&head.m_indexId) == -1)
+  if (op->equal("index_id", (char*)&head.m_indexId) == -1)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->equal("INDEX_VERSION", (char*)&head.m_indexVersion) == -1)
+  if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
   {
     setError(con, __LINE__);
     return -1;
@@ -834,37 +904,37 @@ NdbIndexStatImpl::sys_head_getvalue(Con&
 {
   Head& head = con.m_head;
   NdbOperation* op = con.m_op;
-  if (op->getValue("TABLE_ID", (char*)&head.m_tableId) == 0)
+  if (op->getValue("table_id", (char*)&head.m_tableId) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("FRAG_COUNT", (char*)&head.m_fragCount) == 0)
+  if (op->getValue("frag_count", (char*)&head.m_fragCount) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("VALUE_FORMAT", (char*)&head.m_valueFormat) == 0)
+  if (op->getValue("value_format", (char*)&head.m_valueFormat) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("SAMPLE_VERSION", (char*)&head.m_sampleVersion) == 0)
+  if (op->getValue("sample_version", (char*)&head.m_sampleVersion) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("LOAD_TIME", (char*)&head.m_loadTime) == 0)
+  if (op->getValue("load_time", (char*)&head.m_loadTime) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("SAMPLE_COUNT", (char*)&head.m_sampleCount) == 0)
+  if (op->getValue("sample_count", (char*)&head.m_sampleCount) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("KEY_BYTES", (char*)&head.m_keyBytes) == 0)
+  if (op->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
   {
     setError(con, __LINE__);
     return -1;
@@ -877,22 +947,22 @@ NdbIndexStatImpl::sys_sample_setkey(Con&
 {
   Head& head = con.m_head;
   NdbIndexScanOperation* op = con.m_scanop;
-  if (op->equal("INDEX_ID", (char*)&head.m_indexId) == -1)
+  if (op->equal("index_id", (char*)&head.m_indexId) == -1)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->equal("INDEX_VERSION", (char*)&head.m_indexVersion) == -1)
+  if (op->equal("index_version", (char*)&head.m_indexVersion) == -1)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->equal("SAMPLE_VERSION", (char*)&head.m_sampleVersion) == -1)
+  if (op->equal("sample_version", (char*)&head.m_sampleVersion) == -1)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->equal("STAT_KEY", (char*)m_keyData.get_full_buf()) == -1)
+  if (op->equal("stat_key", (char*)m_keyData.get_full_buf()) == -1)
   {
     setError(con, __LINE__);
     return -1;
@@ -904,12 +974,12 @@ int
 NdbIndexStatImpl::sys_sample_getvalue(Con& con)
 {
   NdbIndexScanOperation* op = con.m_scanop;
-  if (op->getValue("STAT_KEY", (char*)m_keyData.get_full_buf()) == 0)
+  if (op->getValue("stat_key", (char*)m_keyData.get_full_buf()) == 0)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->getValue("STAT_VALUE", (char*)m_valueData.get_full_buf()) == 0)
+  if (op->getValue("stat_value", (char*)m_valueData.get_full_buf()) == 0)
   {
     setError(con, __LINE__);
     return -1;
@@ -925,19 +995,19 @@ NdbIndexStatImpl::sys_sample_setbound(Co
   const NdbIndexScanOperation::BoundType eq_bound =
     NdbIndexScanOperation::BoundEQ;
 
-  if (op->setBound("INDEX_ID", eq_bound, &head.m_indexId) == -1)
+  if (op->setBound("index_id", eq_bound, &head.m_indexId) == -1)
   {
     setError(con, __LINE__);
     return -1;
   }
-  if (op->setBound("INDEX_VERSION", eq_bound, &head.m_indexVersion) == -1)
+  if (op->setBound("index_version", eq_bound, &head.m_indexVersion) == -1)
   {
     setError(con, __LINE__);
     return -1;
   }
   if (sv_bound != -1)
   {
-    if (op->setBound("SAMPLE_VERSION", sv_bound, &head.m_sampleVersion) == -1)
+    if (op->setBound("sample_version", sv_bound, &head.m_sampleVersion) == -1)
     {
       setError(con, __LINE__);
       return -1;

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-06-07 10:03:02 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-06-12 16:54:32 +0000
@@ -30,6 +30,11 @@ class NdbIndexScanOperation;
 class NdbRecAttr;
 class NdbOperation;
 
+extern const uint g_ndb_index_stat_head_frm_len;
+extern const uint8 g_ndb_index_stat_head_frm_data[];
+extern const uint g_ndb_index_stat_sample_frm_len;
+extern const uint8 g_ndb_index_stat_sample_frm_data[];
+
 class NdbIndexStatImpl : public NdbIndexStat {
 public:
   friend class NdbIndexStat;

=== modified file 'storage/ndb/tools/CMakeLists.txt'
--- a/storage/ndb/tools/CMakeLists.txt	2011-05-24 12:16:31 +0000
+++ b/storage/ndb/tools/CMakeLists.txt	2011-06-17 07:14:20 +0000
@@ -82,6 +82,14 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PROJECT_SOUR
 ADD_CUSTOM_TARGET(ndbinfo_sql_run ALL
 				  DEPENDS ${PROJECT_SOURCE_DIR}/storage/ndb/tools/ndbinfo.sql)
 
+MYSQL_ADD_EXECUTABLE(ndb_index_stat
+  ndb_index_stat.cpp
+  COMPONENT ClusterTools)
+TARGET_LINK_LIBRARIES(ndb_index_stat ndbNDBT ndbgeneral)
+
+ADD_EXECUTABLE(ndb_dump_frm_data
+  ndb_dump_frm_data.cpp)
+TARGET_LINK_LIBRARIES(ndb_dump_frm_data ndbNDBT ndbgeneral)
 
 IF (MYSQL_VERSION_ID LESS "50501")
   # Don't build or install this program anymore in 5.5+

=== modified file 'storage/ndb/tools/Makefile.am'
--- a/storage/ndb/tools/Makefile.am	2011-06-07 13:47:21 +0000
+++ b/storage/ndb/tools/Makefile.am	2011-06-22 08:09:31 +0000
@@ -17,7 +17,7 @@ EXTRA_DIST = CMakeLists.txt ndbinfo.sql
 
 BUILT_SOURCES = ndbinfo.sql
 
-noinst_PROGRAMS = 	ndbinfo_sql
+noinst_PROGRAMS = 	ndbinfo_sql ndb_dump_frm_data
 dist_bin_SCRIPTS =	ndb_size.pl ndb_error_reporter
 dist_pkgdata_DATA = ndbinfo.sql
 
@@ -78,6 +78,7 @@ ndbinfo.sql: $(ndbinfo_sql_SOURCES)
 	$(MV) $@-t $@
 
 ndb_index_stat_SOURCES = ndb_index_stat.cpp $(tools_common_sources)
+ndb_dump_frm_data_SOURCES = ndb_dump_frm_data.cpp
 
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_ndbapitools.mk.am
@@ -95,4 +96,5 @@ ndb_restore_LDFLAGS = @ndb_bin_am_ldflag
 ndb_config_LDFLAGS = @ndb_bin_am_ldflags@
 ndbinfo_sql_LDFLAGS = @ndb_bin_am_ldflags@
 ndb_index_stat_LDFLAGS = @ndb_bin_am_ldflags@
+ndb_dump_frm_data_LDFLAGS = @ndb_bin_am_ldflags@
 

=== added file 'storage/ndb/tools/ndb_dump_frm_data.cpp'
--- a/storage/ndb/tools/ndb_dump_frm_data.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/tools/ndb_dump_frm_data.cpp	2011-06-22 08:09:31 +0000
@@ -0,0 +1,176 @@
+/* Copyright (C) 2003 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include <ndb_global.h>
+#include <ndb_opts.h>
+
+#include <NdbOut.hpp>
+#include <NdbApi.hpp>
+#include <NDBT.hpp>
+
+// UNUSED static int oi = 1000;
+static struct my_option
+my_long_options[] =
+{
+  { "help", '?',
+    "Display this help and exit.",
+    0, 0, 0,
+    GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { 0, 0,
+    0,
+    0, 0, 0,
+    GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
+};
+
+const char*
+load_default_groups[]= { 0 };
+
+static void
+short_usage_sub(void)
+{
+  ndb_short_usage_sub("*.frm ...");
+}
+
+static void
+usage()
+{
+  printf("%s: pack and dump *.frm as C arrays\n", my_progname);
+  ndb_usage(short_usage_sub, load_default_groups, my_long_options);
+}
+
+static void
+dodump(const char* name, const uchar* frm_data, uint frm_len)
+{
+  printf("const uint g_%s_frm_len = %u;\n\n", name, frm_len);
+  printf("const uint8 g_%s_frm_data[%u] =\n{\n", name, frm_len);
+  uint n = 0;
+  while (n < frm_len) {
+    if (n % 8 == 0) {
+      if (n != 0) {
+        printf("\n");
+      }
+      printf("  ");
+    }
+    printf("0x%02x", frm_data[n]);
+    if (n + 1 < frm_len) {
+            printf(",");
+    }
+    n++;
+  }
+  if (n % 8 != 0) {
+    printf("\n");
+  }
+  printf("};\n");
+}
+
+static int
+dofile(const char* file)
+{
+  struct stat st;
+  size_t size = 0;
+  uchar* data = 0;
+  int fd = -1;
+  uchar* pack_data = 0;
+  size_t pack_len = 0;
+  char* namebuf = 0;
+  int ret = -1;
+  do
+  {
+    if (stat(file, &st) == -1)
+    {
+      fprintf(stderr, "%s: stat: %s\n", file, strerror(errno));
+      break;
+    }
+    size = st.st_size;
+    if ((data = (uchar*)malloc(size)) == 0)
+    {
+      fprintf(stderr, "%s: malloc %u: %s\n", file, (uint)size, strerror(errno));
+      break;
+    }
+    if ((fd = open(file, O_RDONLY)) == -1)
+    {
+      fprintf(stderr, "%s: open: %s\n", file, strerror(errno));
+      break;
+    }
+    ssize_t size2;
+    if ((size2 = read(fd, data, size)) == -1)
+    {
+      fprintf(stderr, "%s: read: %s\n", file, strerror(errno));
+      break;
+    }
+    if ((size_t)size2 != size)
+    {
+      fprintf(stderr, "%s: short read: %u != %u\n", file, (uint)size2, (uint)size);
+      break;
+    }
+    int error;
+    if ((error = packfrm(data, size, &pack_data, &pack_len)) != 0)
+    {
+      fprintf(stderr, "%s: packfrm: error %d\n", file, error);
+      break;
+    }
+    namebuf = strdup(file);
+    if (namebuf == 0)
+    {
+      fprintf(stderr, "%s: strdup: %s\n", file, strerror(errno));
+      break;
+    }
+    char* name = namebuf;
+    if (strchr(name, '/') != 0)
+      name = strrchr(name, '/') + 1;
+    char* dot;
+    if ((dot = strchr(name, '.')) != 0)
+      *dot = 0;
+    printf("\n/*\n");
+    printf("  name: %s\n", name);
+    printf("  orig: %u\n", (uint)size);
+    printf("  pack: %u\n", (uint)pack_len);
+    printf("*/\n\n");
+    dodump(name, pack_data, pack_len);
+    ret = 0;
+  }
+  while (0);
+  if (namebuf != 0)
+    free(namebuf);
+  if (pack_data != 0)
+    my_free(pack_data); // Free data returned from packfrm with my_free
+  if (fd != -1)
+    (void)close(fd);
+  if (data != 0)
+    free(data);
+  return ret;
+}
+
+int
+main(int argc, char** argv)
+{
+  my_progname = "ndb_pack_frm";
+  int ret;
+
+  ndb_init();
+  ndb_opt_set_usage_funcs(short_usage_sub, usage);
+  ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
+ if (ret != 0)
+    return NDBT_WRONGARGS;
+
+  for (int i = 0; i < argc; i++)
+  {
+    ret = dofile(argv[i]);
+    if (ret != 0)
+      return NDBT_FAILED;
+  }
+
+  return NDBT_OK;
+}

=== modified file 'storage/ndb/tools/ndb_index_stat.cpp'
--- a/storage/ndb/tools/ndb_index_stat.cpp	2011-06-06 12:18:47 +0000
+++ b/storage/ndb/tools/ndb_index_stat.cpp	2011-06-14 10:42:04 +0000
@@ -20,6 +20,7 @@
 #include <NdbApi.hpp>
 #include <NDBT.hpp>
 #include <NdbIndexStatImpl.hpp>
+#include <ndb_rand.h>
 
 // stats options
 static const char* _dbname = 0;
@@ -93,7 +94,7 @@ doconnect()
 
     g_dic = g_ndb->getDictionary();
 
-    g_ndb_sys = new Ndb(g_ncc, "mysql");
+    g_ndb_sys = new Ndb(g_ncc, NDB_INDEX_STAT_DB);
     CHK2(g_ndb_sys->init() == 0, g_ndb_sys->getNdbError());
     CHK2(g_ndb_sys->waitUntilReady(30) == 0, g_ndb_sys->getNdbError());
 
@@ -207,18 +208,18 @@ doquery()
         NdbIndexStat::Bound& b = (i == 0 ? b_lo : b_hi);
 
         bool strict = false;
-        if (random() % 3 != 0)
+        if (ndb_rand() % 3 != 0)
         {
-          if (random() % 3 != 0)
+          if (ndb_rand() % 3 != 0)
           {
-            Uint32 x = random();
+            Uint32 x = ndb_rand();
             CHK2(g_is->add_bound(b, &x) == 0, g_is->getNdbError());
           }
           else
           {
             CHK2(g_is->add_bound_null(b) == 0, g_is->getNdbError());
           }
-          bool strict = (random() % 2 == 0);
+          bool strict = (ndb_rand() % 2 == 0);
           g_is->set_bound_strict(b, strict);
         }
       }
@@ -251,7 +252,7 @@ dostats(int i)
     if (_delete)
     {
       g_info << g_indname << ": delete stats" << endl;
-      if (random() % 2 == 0)
+      if (ndb_rand() % 2 == 0)
       {
         CHK2(g_dic->deleteIndexStat(*g_ind, *g_tab) == 0, g_dic->getNdbError());
       }
@@ -264,7 +265,7 @@ dostats(int i)
     if (_update)
     {
       g_info << g_indname << ": update stats" << endl;
-      if (random() % 2 == 0)
+      if (ndb_rand() % 2 == 0)
       {
         CHK2(g_dic->updateIndexStat(*g_ind, *g_tab) == 0, g_dic->getNdbError());
       }
@@ -615,8 +616,6 @@ main(int argc, char** argv)
   my_progname = "ndb_index_stat";
   int ret;
 
-  srandom((unsigned)time(0));
-
   ndb_init();
   ndb_opt_set_usage_funcs(short_usage_sub, usage);
   ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
@@ -625,6 +624,10 @@ main(int argc, char** argv)
 
   setOutputLevel(_verbose ? 2 : 0);
 
+  unsigned seed = (unsigned)time(0);
+  g_info << "random seed " << seed << endl;
+  ndb_srand(seed);
+
   ret = doall();
   if (ret == -1)
     return NDBT_ProgramExit(NDBT_FAILED);

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster branch (magnus.blaudd:3363 to 3364) magnus.blaudd22 Jun