List: Commits — « Previous Message | Next Message »
From: Mikael Ronstrom    Date: December 20, 2011, 1:20pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3660)
View as plain text
 3660 Mikael Ronstrom	2011-12-20 [merge]
      merge

=== modified file '.bzrignore'
--- a/.bzrignore	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/.bzrignore	revid:mikael.ronstrom@stripped
@@ -3046,7 +3046,7 @@ storage/ndb/**/target
 storage/ndb/**/*.class
 storage/ndb/src/ndbjtie/**/*.sh
 storage/ndb/src/ndbjtie/**/*.log
-storage/ndb/src/ndbjtie/jtie/test/myapi/myapi_test
+storage/ndb/src/ndbjtie/**/*_test
 storage/ndb/clusterj/**/*MANIFEST.MF
 storage/ndb/clusterj/**/*manifest.mf
 storage/ndb/test/crund/*.cnf

=== modified file 'mysql-test/CMakeLists.txt'
--- a/mysql-test/CMakeLists.txt	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/CMakeLists.txt	revid:mikael.ronstrom@stripped
@@ -13,13 +13,15 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 
+
+# MCP_BUG13511612 CHANGE PATTERN var/ to PATTERN var/* 
 IF(INSTALL_MYSQLTESTDIR)
 INSTALL(
  DIRECTORY .
  DESTINATION ${INSTALL_MYSQLTESTDIR}
  USE_SOURCE_PERMISSIONS
  COMPONENT Test
- PATTERN "var/" EXCLUDE
+ PATTERN "var/*" EXCLUDE
  PATTERN "lib/My/SafeProcess" EXCLUDE
  PATTERN "lib/t*" EXCLUDE
  PATTERN "CPack" EXCLUDE

=== modified file 'mysql-test/extra/rpl_tests/rpl_insert_ignore.test'
--- a/mysql-test/extra/rpl_tests/rpl_insert_ignore.test	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/extra/rpl_tests/rpl_insert_ignore.test	revid:mikael.ronstrom@stripped
@@ -58,6 +58,11 @@ INSERT IGNORE INTO t1 SELECT NULL, t2.b 
 --let $assert_text= Count of elements in t1 should be 6.
 --source include/assert.inc
 
+# MCP
+# force outstanding executed transactions to be flushed to binlog for ndb
+--save_master_pos
+# MCP
+
 if (`SELECT @@BINLOG_FORMAT != 'STATEMENT'`)
 {
   --let $binlog_position_cmp= =
@@ -91,6 +96,11 @@ eval CREATE TABLE t2 (
 INSERT INTO t1 VALUES (1);
 INSERT INTO t2 VALUES (1);
 
+# MCP
+# force outstanding executed transactions to be flushed to binlog for ndb
+--save_master_pos
+# MCP
+
 --let $binlog_file= query_get_value("SHOW MASTER STATUS", File, 1)
 --let $binlog_start= query_get_value("SHOW MASTER STATUS", Position, 1)
 --let $statement_file=INSERT INTO t1 SELECT t2.a FROM t2 ORDER BY t2.a ON DUPLICATE KEY UPDATE t1.a= t1.a
@@ -101,6 +111,11 @@ INSERT INTO t2 VALUES (1);
 --let $assert_text= Sum of elements in t1 should be 1.
 --source include/assert.inc
 
+# MCP
+# force outstanding executed transactions to be flushed to binlog for ndb
+--save_master_pos
+# MCP
+
 if (`SELECT @@BINLOG_FORMAT != 'STATEMENT'`)
 {
   --let $binlog_position_cmp= =

=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/mysql-test-run.pl	revid:mikael.ronstrom@stripped
@@ -3130,11 +3130,14 @@ sub memcached_start {
 
 
 sub memcached_load_metadata($) {
-  my $cluster = shift;
+  my $cluster= shift;
     
-  my $sql_script= my_find_file($basedir,
-                             ["share", "storage/ndb/memcache/scripts"],
-                             "ndb_memcache_metadata.sql", NOT_REQUIRED);
+  my $sql_script= my_find_file($bindir,
+                              ["share/mysql/memcache-api", # RPM install
+                               "share/memcache-api",       # Other installs
+                               "scripts"                   # Build tree
+                              ],
+                              "ndb_memcache_metadata.sql", NOT_REQUIRED);
 
   foreach my $mysqld (mysqlds()) {
     if(-d $mysqld->value('datadir') . "/" . "ndbmemcache") {

=== modified file 'mysql-test/suite/ndb/r/ndb_alter_table_online2.result'
--- a/mysql-test/suite/ndb/r/ndb_alter_table_online2.result	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/suite/ndb/r/ndb_alter_table_online2.result	revid:mikael.ronstrom@stripped
@@ -49,7 +49,7 @@ name
 ~ Starting mysqlslap using column b
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
 
-update t1 set b= 0;
+update t1 set b = 0 where pk = 1;
 
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ~ Alter table t1 add column c
@@ -74,10 +74,10 @@ name
 ~ Starting mysqlslap using column c
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-update t1 set c= 0;
+update t1 set c = 0 where pk = 1;
 select * from t1;
 pk	a	b	c
-1	5000	5000	5000
+1	2000	2000	2000
 
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ~ Alter table t1 and try to add partitions
@@ -105,7 +105,7 @@ name
 START TRANSACTION;
 SELECT * FROM t1;
 pk	a	b	c
-1	5000	5000	5000
+1	2000	2000	2000
 # Connection con1
 SET lock_wait_timeout=1;
 ALTER ONLINE TABLE t1 ADD d INT;
@@ -114,7 +114,7 @@ ALTER ONLINE TABLE t1 ADD d INT;
 ERROR HY000: Lock wait timeout exceeded; try restarting transaction
 SELECT * FROM t1;
 pk	a	b	c
-1	5000	5000	5000
+1	2000	2000	2000
 COMMIT;
 
 ndb_show_tables completed.....

=== modified file 'mysql-test/suite/ndb/t/ndb_alter_table_online2.test'
--- a/mysql-test/suite/ndb/t/ndb_alter_table_online2.test	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/mysql-test/suite/ndb/t/ndb_alter_table_online2.test	revid:mikael.ronstrom@stripped
@@ -60,8 +60,8 @@ set @t1_id = (select id from ndb_show_ta
 --echo ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 --echo
 
-let $end_mysqlslap= 5000;
---exec $MYSQL_SLAP --query="update test.t1 set a=a+1 where pk=1" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
+let $end_mysqlslap= 2000;
+--exec $MYSQL_SLAP --query="update test.t1 set a=a+1 where pk=1; select sleep(0.01);" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
 
 # wait for 100 updates
 --disable_result_log
@@ -101,8 +101,8 @@ select name from ndb_show_tables_results
 --echo ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
 --echo
 
-update t1 set b= 0;
---exec $MYSQL_SLAP --query="update test.t1 set b=b+1 where pk=1" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
+update t1 set b = 0 where pk = 1;
+--exec $MYSQL_SLAP --query="update test.t1 set b=b+1 where pk=1; select sleep(0.01);" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
 
 # wait for 100 updates
 --disable_result_log
@@ -142,8 +142,8 @@ select name from ndb_show_tables_results
 --echo ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 --echo
 
-update t1 set c= 0;
---exec $MYSQL_SLAP --query="update test.t1 set c=c+1 where pk=1" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
+update t1 set c = 0 where pk = 1;
+--exec $MYSQL_SLAP --query="update test.t1 set c=c+1 where pk=1; select sleep(0.01);" -i $end_mysqlslap >> $NDB_TOOLS_OUTPUT &
 
 # wait for mysqlslap to end
 --disable_result_log

=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/sql/ha_ndb_index_stat.cc	revid:mikael.ronstrom@stripped
@@ -50,7 +50,6 @@ Ndb_index_stat_thread::Ndb_index_stat_th
   pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND, NULL);
   pthread_cond_init(&COND_ready, NULL);
-  pthread_mutex_init(&list_mutex, MY_MUTEX_INIT_FAST);
   pthread_mutex_init(&stat_mutex, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&stat_cond, NULL);
 }
@@ -60,7 +59,6 @@ Ndb_index_stat_thread::~Ndb_index_stat_t
   pthread_mutex_destroy(&LOCK);
   pthread_cond_destroy(&COND);
   pthread_cond_destroy(&COND_ready);
-  pthread_mutex_destroy(&list_mutex);
   pthread_mutex_destroy(&stat_mutex);
   pthread_cond_destroy(&stat_cond);
 }
@@ -90,9 +88,12 @@ struct Ndb_index_stat {
   time_t check_time;    /* when checked for updated stats (>= read_time) */
   uint query_bytes;     /* cache query bytes in use */
   uint clean_bytes;     /* cache clean bytes waiting to be deleted */
+  uint drop_bytes;      /* cache bytes waiting for drop */
+  uint evict_bytes;     /* cache bytes waiting for evict */
   bool force_update;    /* one-time force update from analyze table */
   bool no_stats;        /* have detected that no stats exist */
   NdbIndexStat::Error error;
+  NdbIndexStat::Error client_error;
   time_t error_time;
   uint error_count;
   struct Ndb_index_stat *share_next; /* per-share list */
@@ -101,7 +102,9 @@ struct Ndb_index_stat {
   struct Ndb_index_stat *list_next;
   struct Ndb_index_stat *list_prev;
   struct NDB_SHARE *share;
+  uint ref_count;       /* from client requests */
   bool to_delete;       /* detached from share and marked for delete */
+  bool abort_request;   /* abort all requests and allow no more */
   Ndb_index_stat();
 };
 
@@ -556,12 +559,16 @@ struct Ndb_index_stat_glob {
   uint event_ok;          /* Events received for known index */
   uint event_miss;        /* Events received for unknown index */
   uint refresh_count;     /* Successful cache refreshes */
+  uint clean_count;       /* Times old caches (1 or more) cleaned */
+  uint pinned_count;      /* Times not cleaned due to old cache ref count */
   uint drop_count;        /* From index drop */
   uint evict_count;       /* From LRU cleanup */
   /* Cache */
   uint cache_query_bytes; /* In use */
   uint cache_clean_bytes; /* Obsolete versions not yet removed */
   uint cache_high_bytes;  /* Max ever of above */
+  uint cache_drop_bytes;  /* Part of above waiting to be evicted */
+  uint cache_evict_bytes; /* Part of above waiting to be evicted */
   char status[2][512];
   uint status_i;
 
@@ -588,11 +595,15 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
   event_ok= 0;
   event_miss= 0;
   refresh_count= 0;
+  clean_count= 0;
+  pinned_count= 0;
   drop_count= 0;
   evict_count= 0;
   cache_query_bytes= 0;
   cache_clean_bytes= 0;
   cache_high_bytes= 0;
+  cache_drop_bytes= 0;
+  cache_evict_bytes= 0;
   memset(status, 0, sizeof(status));
   status_i= 0;
 }
@@ -601,6 +612,8 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
 void
 Ndb_index_stat_glob::set_status()
 {
+  safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
   char* p= status[status_i];
 
@@ -635,11 +648,13 @@ Ndb_index_stat_glob::set_status()
   p+= strlen(p);
   sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
   p+= strlen(p);
-  sprintf(p, ",query:(all:%u,nostats:%u,error:%u)", query_count, query_no_stats, query_error);
+  sprintf(p, ",query:(all:%u,nostats:%u,error:%u)",
+             query_count, query_no_stats, query_error);
   p+= strlen(p);
   sprintf(p, ",event:(ok:%u,miss:%u)", event_ok, event_miss);
   p+= strlen(p);
-  sprintf(p, ",cache:(refresh:%u,drop:%u,evict:%u)", refresh_count, drop_count, evict_count);
+  sprintf(p, ",cache:(refresh:%u,clean:%u,pinned:%u,drop:%u,evict:%u)",
+             refresh_count, clean_count, pinned_count, drop_count, evict_count);
   p+= strlen(p);
   sprintf(p, ")");
   p+= strlen(p);
@@ -654,8 +669,12 @@ Ndb_index_stat_glob::set_status()
     cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
     cache_high_pct= (double)100.0 * (double)cache_high_bytes / (double)cache_limit;
   }
-  sprintf(p, ",cache:(query:%u,clean:%u,usedpct:%.2f,highpct:%.2f)",
-             cache_query_bytes, cache_clean_bytes, cache_pct, cache_high_pct);
+  sprintf(p, ",cache:(query:%u,clean:%u"
+             ",drop:%u,evict:%u"
+             ",usedpct:%.2f,highpct:%.2f)",
+             cache_query_bytes, cache_clean_bytes,
+             cache_drop_bytes, cache_evict_bytes,
+             cache_pct, cache_high_pct);
   p+= strlen(p);
 
   // alternating status buffers to keep this lock short
@@ -679,6 +698,8 @@ Ndb_index_stat_glob::zero_total()
   event_ok= 0;
   event_miss= 0;
   refresh_count= 0;
+  clean_count= 0;
+  pinned_count= 0;
   drop_count= 0;
   evict_count= 0;
   /* Reset highest use seen to current */
@@ -704,6 +725,8 @@ Ndb_index_stat::Ndb_index_stat()
   check_time= 0;
   query_bytes= 0;
   clean_bytes= 0;
+  drop_bytes= 0;
+  evict_bytes= 0;
   force_update= false;
   no_stats= false;
   error_time= 0;
@@ -714,24 +737,40 @@ Ndb_index_stat::Ndb_index_stat()
   list_next= 0;
   list_prev= 0;
   share= 0;
+  ref_count= 0;
   to_delete= false;
+  abort_request= false;
 }
 
+/*
+  Called by stats thread and (rarely) by client.  Caller must hold
+  stat_mutex.  Client errors currently have no effect on execution
+  since they are probably local e.g. bad range (internal error).
+  Argument "from" is 0=stats thread 1=client.
+*/
 void
-ndb_index_stat_error(Ndb_index_stat *st, const char* place, int line)
+ndb_index_stat_error(Ndb_index_stat *st,
+                     int from, const char* place, int line)
 {
+  safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
   time_t now= ndb_index_stat_time();
   NdbIndexStat::Error error= st->is->getNdbError();
   if (error.code == 0)
   {
-    // XXX why this if
+    /* Make sure code is not 0 */
     NdbIndexStat::Error error2;
     error= error2;
     error.code= NdbIndexStat::InternalError;
     error.status= NdbError::TemporaryError;
   }
-  st->error= error;
-  st->error_time= now;
+  if (from == 0)
+  {
+    st->error= error;
+    st->error_time= now; /* Controls proc_error */
+  }
+  else
+    st->client_error= error;
   st->error_count++;
 
   DBUG_PRINT("index_stat", ("%s line %d: error %d line %d extra %d",
@@ -839,6 +878,8 @@ ndb_index_stat_list_move(Ndb_index_stat 
 void
 ndb_index_stat_force_update(Ndb_index_stat *st, bool onoff)
 {
+  safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   if (onoff)
   {
@@ -864,6 +905,8 @@ ndb_index_stat_force_update(Ndb_index_st
 void
 ndb_index_stat_no_stats(Ndb_index_stat *st, bool flag)
 {
+  safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   if (st->no_stats != flag)
   {
@@ -882,8 +925,36 @@ ndb_index_stat_no_stats(Ndb_index_stat *
   }
 }
 
+void
+ndb_index_stat_ref_count(Ndb_index_stat *st, bool flag)
+{
+  safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
+  uint old_count= st->ref_count;
+  (void)old_count; // USED
+  if (flag)
+  {
+    st->ref_count++;
+  }
+  else
+  {
+    assert(st->ref_count != 0);
+    st->ref_count--;
+  }
+  DBUG_PRINT("index_stat", ("st %s ref_count:%u->%u",
+                            st->id, old_count, st->ref_count));
+}
+
 /* Find or add entry under the share */
 
+/* Saved in get_share() under stat_mutex */
+struct Ndb_index_stat_snap {
+  time_t load_time;
+  uint sample_version;
+  Ndb_index_stat_snap() { load_time= 0; sample_version= 0; }
+};
+
+/* Subroutine, have lock */
 Ndb_index_stat*
 ndb_index_stat_alloc(const NDBINDEX *index,
                      const NDBTAB *table,
@@ -902,8 +973,8 @@ ndb_index_stat_alloc(const NDBINDEX *ind
 #endif
     if (is->set_index(*index, *table) == 0)
       return st;
-    ndb_index_stat_error(st, "set_index", __LINE__);
-    err_out= st->error.code;
+    ndb_index_stat_error(st, 1, "set_index", __LINE__);
+    err_out= st->client_error.code;
   }
   else
   {
@@ -956,6 +1027,7 @@ Ndb_index_stat*
 ndb_index_stat_get_share(NDB_SHARE *share,
                          const NDBINDEX *index,
                          const NDBTAB *table,
+                         Ndb_index_stat_snap &snap,
                          int &err_out,
                          bool allow_add,
                          bool force_update)
@@ -963,7 +1035,6 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
 
   pthread_mutex_lock(&share->mutex);
-  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   time_t now= ndb_index_stat_time();
   err_out= 0;
@@ -974,7 +1045,7 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   {
     if (unlikely(!ndb_index_stat_allow()))
     {
-      err_out= Ndb_index_stat_error_NOT_ALLOW;
+      err_out= NdbIndexStat::MyNotAllow;
       break;
     }
     st= ndb_index_stat_find_share(share, index, st_last);
@@ -982,7 +1053,7 @@ ndb_index_stat_get_share(NDB_SHARE *shar
     {
       if (!allow_add)
       {
-        err_out= Ndb_index_stat_error_NOT_FOUND;
+        err_out= NdbIndexStat::MyNotFound;
         break;
       }
       st= ndb_index_stat_alloc(index, table, err_out);
@@ -995,14 +1066,28 @@ ndb_index_stat_get_share(NDB_SHARE *shar
       ndb_index_stat_list_add(st, Ndb_index_stat::LT_New);
       glob.set_status();
     }
+    else if (unlikely(st->abort_request))
+    {
+      err_out= NdbIndexStat::MyAbortReq;
+      break;
+    }
     if (force_update)
       ndb_index_stat_force_update(st, true);
+    snap.load_time= st->load_time;
+    snap.sample_version= st->sample_version;
     st->access_time= now;
   }
   while (0);
+ 
+  if (err_out == 0)
+  {
+    assert(st != 0);
+    ndb_index_stat_ref_count(st, true);
+  }
+  else
+    st= 0;
 
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   pthread_mutex_unlock(&share->mutex);
   return st;
 }
@@ -1012,10 +1097,12 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   list and set "to_delete" flag.  Stats thread does real delete.
 */
 
-/* caller must hold list_mutex */
+/* caller must hold stat_mutex */
 void
 ndb_index_stat_free(Ndb_index_stat *st)
 {
+  safe_mutex_assert_owner(&ndb_index_stat_thread.stat_mutex);
+
   DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   NDB_SHARE *share= st->share;
@@ -1037,6 +1124,7 @@ ndb_index_stat_free(Ndb_index_stat *st)
       assert(st->lt != Ndb_index_stat::LT_Delete);
       assert(!st->to_delete);
       st->to_delete= true;
+      st->abort_request= true;
       found++;
     }
     else
@@ -1053,9 +1141,7 @@ ndb_index_stat_free(Ndb_index_stat *st)
   assert(found == 1);
   share->index_stat_list= st_head;
 
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   DBUG_VOID_RETURN;
 }
 
@@ -1067,7 +1153,7 @@ ndb_index_stat_free(NDB_SHARE *share, in
   DBUG_PRINT("index_stat", ("(index_id:%d index_version:%d",
                             index_id, index_version));
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
 
   uint found= 0;
   Ndb_index_stat *st= share->index_stat_list;
@@ -1078,16 +1164,17 @@ ndb_index_stat_free(NDB_SHARE *share, in
     {
       ndb_index_stat_free(st);
       found++;
+      glob.drop_count++;
+      assert(st->drop_bytes == 0);
+      st->drop_bytes= st->query_bytes + st->clean_bytes;
+      glob.cache_drop_bytes+= st->drop_bytes;
       break;
     }
     st= st->share_next;
   }
 
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
-  glob.drop_count+= found;
   glob.set_status();
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   DBUG_VOID_RETURN;
 }
 
@@ -1096,7 +1183,8 @@ ndb_index_stat_free(NDB_SHARE *share)
 {
   DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+
   uint found= 0;
   Ndb_index_stat *st;
   while ((st= share->index_stat_list) != 0)
@@ -1109,13 +1197,16 @@ ndb_index_stat_free(NDB_SHARE *share)
     assert(st->lt != Ndb_index_stat::LT_Delete);
     assert(!st->to_delete);
     st->to_delete= true;
+    st->abort_request= true;
     found++;
+    glob.drop_count++;
+    assert(st->drop_bytes == 0);
+    st->drop_bytes+= st->query_bytes + st->clean_bytes;
+    glob.cache_drop_bytes+= st->drop_bytes;
   }
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
-  glob.drop_count+= found;
+
   glob.set_status();
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   DBUG_VOID_RETURN;
 }
 
@@ -1126,7 +1217,7 @@ ndb_index_stat_find_entry(int index_id, 
 {
   DBUG_ENTER("ndb_index_stat_find_entry");
   pthread_mutex_lock(&ndbcluster_mutex);
-  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
                             index_id, index_version, table_id));
 
@@ -1139,7 +1230,7 @@ ndb_index_stat_find_entry(int index_id, 
       if (st->index_id == index_id &&
           st->index_version == index_version)
       {
-        pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+        pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
         pthread_mutex_unlock(&ndbcluster_mutex);
         DBUG_RETURN(st);
       }
@@ -1147,7 +1238,7 @@ ndb_index_stat_find_entry(int index_id, 
     }
   }
 
-  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   pthread_mutex_unlock(&ndbcluster_mutex);
   DBUG_RETURN(0);
 }
@@ -1179,7 +1270,7 @@ ndb_index_stat_cache_move(Ndb_index_stat
     glob.cache_high_bytes= cache_total;
 }
 
-void
+bool
 ndb_index_stat_cache_clean(Ndb_index_stat *st)
 {
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
@@ -1187,12 +1278,46 @@ ndb_index_stat_cache_clean(Ndb_index_sta
 
   st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
   const uint old_clean_bytes= infoClean.m_totalBytes;
-  DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u",
-                            st->id, old_clean_bytes));
+  const uint ref_count= infoClean.m_ref_count;
+  DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u ref_count:%u",
+                            st->id, old_clean_bytes, ref_count));
+  if (ref_count != 0)
+    return false;
   st->is->clean_cache();
   st->clean_bytes= 0;
   assert(glob.cache_clean_bytes >= old_clean_bytes);
   glob.cache_clean_bytes-= old_clean_bytes;
+  return true;
+}
+
+void
+ndb_index_stat_cache_evict(Ndb_index_stat *st)
+{
+  NdbIndexStat::Head head;
+  NdbIndexStat::CacheInfo infoBuild;
+  NdbIndexStat::CacheInfo infoQuery;
+  NdbIndexStat::CacheInfo infoClean;
+  st->is->get_head(head);
+  st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
+  st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
+  st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
+
+  DBUG_PRINT("index_stat",
+             ("evict table: %u index: %u version: %u"
+              " sample version: %u"
+              " cache bytes build:%u query:%u clean:%u",
+              head.m_tableId, head.m_indexId, head.m_indexVersion,
+              head.m_sampleVersion,
+              infoBuild.m_totalBytes, infoQuery.m_totalBytes, infoClean.m_totalBytes));
+
+  /* Twice to move all caches to clean */
+  ndb_index_stat_cache_move(st);
+  ndb_index_stat_cache_move(st);
+  /* Unused variable release vs debug nonsense */
+  bool ok= false;
+  (void)ok; // USED
+  ok= ndb_index_stat_cache_clean(st);
+  assert(ok);
 }
 
 /* Misc in/out parameters for process steps */
@@ -1231,7 +1356,7 @@ void
 ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
 {
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   const int lt= Ndb_index_stat::LT_New;
   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
 
@@ -1245,10 +1370,8 @@ ndb_index_stat_proc_new(Ndb_index_stat_p
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
   }
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
 }
 
 void
@@ -1256,8 +1379,8 @@ ndb_index_stat_proc_update(Ndb_index_sta
 {
   if (st->is->update_stat(pr.ndb) == -1)
   {
-    ndb_index_stat_error(st, "update_stat", __LINE__);
     pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+    ndb_index_stat_error(st, 0, "update_stat", __LINE__);
 
     /*
       Turn off force update or else proc_error() thinks
@@ -1311,7 +1434,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   if (st->is->read_stat(pr.ndb) == -1)
   {
     pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
-    ndb_index_stat_error(st, "read_stat", __LINE__);
+    ndb_index_stat_error(st, 0, "read_stat", __LINE__);
     const bool force_update= st->force_update;
     ndb_index_stat_force_update(st, false);
 
@@ -1384,6 +1507,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
 void
 ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
 {
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
   const longlong clean_delay= opt.get(Ndb_index_stat_opt::Iclean_delay);
   const longlong check_delay= opt.get(Ndb_index_stat_opt::Icheck_delay);
@@ -1408,7 +1532,10 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
 
   if (st->clean_bytes != 0 && clean_wait <= 0)
   {
-    ndb_index_stat_cache_clean(st);
+    if (ndb_index_stat_cache_clean(st))
+      glob.clean_count++;
+    else
+      glob.pinned_count++;
   }
   if (st->force_update)
   {
@@ -1482,7 +1609,8 @@ ndb_index_stat_proc_check(Ndb_index_stat
   NdbIndexStat::Head head;
   if (st->is->read_head(pr.ndb) == -1)
   {
-    ndb_index_stat_error(st, "read_head", __LINE__);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+    ndb_index_stat_error(st, 0, "read_head", __LINE__);
     /* no stats is not unexpected error */
     if (st->is->getNdbError().code == NdbIndexStat::NoIndexStats)
     {
@@ -1493,6 +1621,8 @@ ndb_index_stat_proc_check(Ndb_index_stat
     {
       pr.lt= Ndb_index_stat::LT_Error;
     }
+    pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     return;
   }
   st->is->get_head(head);
@@ -1537,39 +1667,6 @@ ndb_index_stat_proc_check(Ndb_index_stat
     pr.busy= true;
 }
 
-/* Only evict the caches */
-void
-ndb_index_stat_proc_evict(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
-{
-  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-
-  NdbIndexStat::Head head;
-  NdbIndexStat::CacheInfo infoBuild;
-  NdbIndexStat::CacheInfo infoQuery;
-  NdbIndexStat::CacheInfo infoClean;
-  st->is->get_head(head);
-  st->is->get_cache_info(infoBuild, NdbIndexStat::CacheBuild);
-  st->is->get_cache_info(infoQuery, NdbIndexStat::CacheQuery);
-  st->is->get_cache_info(infoClean, NdbIndexStat::CacheClean);
-
-  DBUG_PRINT("index_stat",
-             ("evict table: %u index: %u version: %u"
-              " sample version: %u"
-              " cache bytes build:%u query:%u clean:%u",
-              head.m_tableId, head.m_indexId, head.m_indexVersion,
-              head.m_sampleVersion,
-              infoBuild.m_totalBytes, infoQuery.m_totalBytes, infoClean.m_totalBytes));
-
-  /* Twice to move all caches to clean */
-  ndb_index_stat_cache_move(st);
-  ndb_index_stat_cache_move(st);
-  ndb_index_stat_cache_clean(st);
-
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
-  glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-}
-
 /* Check if need to evict more */
 bool
 ndb_index_stat_proc_evict()
@@ -1577,6 +1674,11 @@ ndb_index_stat_proc_evict()
   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   uint curr_size= glob.cache_query_bytes + glob.cache_clean_bytes;
+
+  /* Subtract bytes already scheduled for evict */
+  assert(curr_size >= glob.cache_evict_bytes);
+  curr_size-= glob.cache_evict_bytes;
+
   const uint cache_lowpct= opt.get(Ndb_index_stat_opt::Icache_lowpct);
   const uint cache_limit= opt.get(Ndb_index_stat_opt::Icache_limit);
   if (100 * curr_size <= cache_lowpct * cache_limit)
@@ -1612,6 +1714,9 @@ ndb_index_stat_proc_evict(Ndb_index_stat
   if (!ndb_index_stat_proc_evict())
     return;
 
+  /* Mutex entire routine (protect access_time) */
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+
   /* Create a LRU batch */
   Ndb_index_stat* st_lru_arr[ndb_index_stat_max_evict_batch + 1];
   uint st_lru_cnt= 0;
@@ -1690,19 +1795,19 @@ ndb_index_stat_proc_evict(Ndb_index_stat
 
     Ndb_index_stat *st= st_lru_arr[cnt];
     DBUG_PRINT("index_stat", ("st %s proc evict %s", st->id, list.name));
-    ndb_index_stat_proc_evict(pr, st);
-    pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+
+    /* Entry may have requests.  Cache is evicted at delete. */
     ndb_index_stat_free(st);
-    pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+    assert(st->evict_bytes == 0);
+    st->evict_bytes= st->query_bytes + st->clean_bytes;
+    glob.cache_evict_bytes+= st->evict_bytes;
     cnt++;
   }
+  if (cnt == batch)
+    pr.busy= true;
 
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.evict_count+= cnt;
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
-
-  if (cnt == batch)
-    pr.busy= true;
 }
 
 void
@@ -1722,6 +1827,9 @@ ndb_index_stat_proc_delete(Ndb_index_sta
   const uint delete_batch= opt.get(Ndb_index_stat_opt::Idelete_batch);
   const uint batch= !pr.end ? delete_batch : ~(uint)0;
 
+  /* Mutex entire routine */
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+
   Ndb_index_stat *st_loop= list.head;
   uint cnt= 0;
   while (st_loop != 0 && cnt < batch)
@@ -1731,12 +1839,26 @@ ndb_index_stat_proc_delete(Ndb_index_sta
     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
 
     // adjust global counters at drop
-    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     ndb_index_stat_force_update(st, false);
     ndb_index_stat_no_stats(st, false);
-    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
-    ndb_index_stat_proc_evict(pr, st);
+    /*
+      Do not wait for requests to terminate since this could
+      risk stats thread hanging.  Instead try again next time.
+      Presumably clients will eventually notice abort_request.
+    */
+    if (st->ref_count != 0)
+    {
+      DBUG_PRINT("index_stat", ("st %s proc %s: ref_count:%u",
+                 st->id, list.name, st->ref_count));
+      continue;
+    }
+
+    ndb_index_stat_cache_evict(st);
+    assert(glob.cache_drop_bytes >= st->drop_bytes);
+    glob.cache_drop_bytes-= st->drop_bytes;
+    assert(glob.cache_evict_bytes >= st->evict_bytes);
+    glob.cache_evict_bytes-= st->evict_bytes;
     ndb_index_stat_list_remove(st);
     delete st->is;
     delete st;
@@ -1745,7 +1867,6 @@ ndb_index_stat_proc_delete(Ndb_index_sta
   if (cnt == batch)
     pr.busy= true;
 
-  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
@@ -2004,7 +2125,7 @@ void
 ndb_index_stat_list_verify(Ndb_index_stat_proc &pr)
 {
   const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   pr.cache_query_bytes= 0;
   pr.cache_clean_bytes= 0;
 
@@ -2013,7 +2134,7 @@ ndb_index_stat_list_verify(Ndb_index_sta
 
   assert(glob.cache_query_bytes == pr.cache_query_bytes);
   assert(glob.cache_clean_bytes == pr.cache_clean_bytes);
-  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -2498,12 +2619,16 @@ ndb_index_stat_round(double x)
   return n;
 }
 
+/*
+  Client waits for query or analyze.  The routines are
+  similar but separated for clarity.
+*/
+
 int
-ndb_index_stat_wait(Ndb_index_stat *st,
-                    uint sample_version,
-                    bool from_analyze)
+ndb_index_stat_wait_query(Ndb_index_stat *st,
+                          const Ndb_index_stat_snap &snap)
 {
-  DBUG_ENTER("ndb_index_stat_wait");
+  DBUG_ENTER("ndb_index_stat_wait_query");
 
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
@@ -2515,24 +2640,16 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     int ret= 0;
     if (count == 0)
     {
-      if (!from_analyze)
-      {
-        glob.wait_stats++;
-        glob.query_count++;
-      }
-      else
+      glob.wait_stats++;
+      glob.query_count++;
+      if (st->lt == Ndb_index_stat::LT_Error)
       {
-        glob.wait_update++;
-        glob.analyze_count++;
-      }
-      if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
-      {
-        err= Ndb_index_stat_error_HAS_ERROR;
+        err= NdbIndexStat::MyHasError;
         break;
       }
       ndb_index_stat_clear_error(st);
     }
-    if (st->no_stats && !from_analyze)
+    if (st->no_stats)
     {
       /* Have detected no stats now or before */
       err= NdbIndexStat::NoIndexStats;
@@ -2543,16 +2660,29 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     {
+      /* A new error has occurred */
       err= st->error.code;
-      if (!from_analyze)
-        glob.query_error++;
-      else
-        glob.analyze_error++;
+      glob.query_error++;
+      break;
+    }
+    /* Query waits for any samples */
+    if (st->sample_version > 0)
+      break;
+    /*
+      Try to detect changes behind our backs.  Should really not
+      happen but make sure.
+    */
+    if (st->load_time != snap.load_time ||
+        st->sample_version != snap.sample_version)
+    {
+      err= NdbIndexStat::NoIndexStats;
       break;
     }
-    if (st->sample_version > sample_version)
+    if (st->abort_request)
+    {
+      err= NdbIndexStat::MyAbortReq;
       break;
+    }
     count++;
-    DBUG_PRINT("index_stat", ("st %s wait count:%u",
+    DBUG_PRINT("index_stat", ("st %s wait_query count:%u",
                               st->id, count));
     pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     ndb_index_stat_waiter= true;
@@ -2568,25 +2698,93 @@ ndb_index_stat_wait(Ndb_index_stat *st,
       break;
     }
   }
-  if (!from_analyze)
+  assert(glob.wait_stats != 0);
+  glob.wait_stats--;
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  if (err != 0)
   {
-    assert(glob.wait_stats != 0);
-    glob.wait_stats--;
+    DBUG_PRINT("index_stat", ("st %s wait_query error: %d",
+                               st->id, err));
+    DBUG_RETURN(err);
   }
-  else
+  DBUG_PRINT("index_stat", ("st %s wait_query ok: sample_version %u -> %u",
+                            st->id, snap.sample_version, st->sample_version));
+  DBUG_RETURN(0);
+}
+
+int
+ndb_index_stat_wait_analyze(Ndb_index_stat *st,
+                            const Ndb_index_stat_snap &snap)
+{
+  DBUG_ENTER("ndb_index_stat_wait_analyze");
+
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+  int err= 0;
+  uint count= 0;
+  struct timespec abstime;
+  while (true)
   {
-    assert(glob.wait_update != 0);
-    glob.wait_update--;
+    int ret= 0;
+    if (count == 0)
+    {
+      glob.wait_update++;
+      glob.analyze_count++;
+      ndb_index_stat_clear_error(st);
+    }
+    if (st->error.code != 0)
+    {
+      /* A new error has occurred */
+      err= st->error.code;
+      glob.analyze_error++;
+      break;
+    }
+    /* Analyze waits for newer samples */
+    if (st->sample_version > snap.sample_version)
+      break;
+    /*
+      Try to detect changes behind our backs.  If another process
+      deleted stats, an analyze here could wait forever.
+    */
+    if (st->load_time != snap.load_time ||
+        st->sample_version != snap.sample_version)
+    {
+      err= NdbIndexStat::AlienUpdate;
+      break;
+    }
+    if (st->abort_request)
+    {
+      err= NdbIndexStat::MyAbortReq;
+      break;
+    }
+    count++;
+    DBUG_PRINT("index_stat", ("st %s wait_analyze count:%u",
+                              st->id, count));
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+    ndb_index_stat_waiter= true;
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
+    set_timespec(abstime, 1);
+    ret= pthread_cond_timedwait(&ndb_index_stat_thread.stat_cond,
+                                &ndb_index_stat_thread.stat_mutex,
+                                &abstime);
+    if (ret != 0 && ret != ETIMEDOUT)
+    {
+      err= ret;
+      break;
+    }
   }
+  assert(glob.wait_update != 0);
+  glob.wait_update--;
   pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   if (err != 0)
   {
-    DBUG_PRINT("index_stat", ("st %s wait error: %d",
+    DBUG_PRINT("index_stat", ("st %s wait_analyze error: %d",
                                st->id, err));
     DBUG_RETURN(err);
   }
-  DBUG_PRINT("index_stat", ("st %s wait ok: sample_version %u -> %u",
-                            st->id, sample_version, st->sample_version));
+  DBUG_PRINT("index_stat", ("st %s wait_analyze ok: sample_version %u -> %u",
+                            st->id, snap.sample_version, st->sample_version));
   DBUG_RETURN(0);
 }
 
@@ -2611,37 +2809,51 @@ ha_ndbcluster::ndb_index_stat_query(uint
   compute_index_bounds(ib, key_info, min_key, max_key, from);
   ib.range_no= 0;
 
+  Ndb_index_stat_snap snap;
   Ndb_index_stat *st=
-    ndb_index_stat_get_share(m_share, index, m_table, err, true, false);
+    ndb_index_stat_get_share(m_share, index, m_table, snap, err, true, false);
   if (st == 0)
     DBUG_RETURN(err);
+  /* Now holding reference to st */
 
-  /* Pass old version 0 so existing stats terminates wait at once */
-  err= ndb_index_stat_wait(st, 0, false);
-  if (err != 0)
-    DBUG_RETURN(err);
-  assert(st->sample_version != 0);
-
-  uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
-  uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
-  NdbIndexStat::Bound bound_lo(st->is, bound_lo_buffer);
-  NdbIndexStat::Bound bound_hi(st->is, bound_hi_buffer);
-  NdbIndexStat::Range range(bound_lo, bound_hi);
-
-  const NdbRecord* key_record= data.ndb_record_key;
-  if (st->is->convert_range(range, key_record, &ib) == -1)
-  {
-    ndb_index_stat_error(st, "convert_range", __LINE__);
-    DBUG_RETURN(st->error.code);
-  }
-  if (st->is->query_stat(range, stat) == -1)
+  do
   {
-    /* Invalid cache - should remove the entry */
-    ndb_index_stat_error(st, "query_stat", __LINE__);
-    DBUG_RETURN(st->error.code);
+    err= ndb_index_stat_wait_query(st, snap);
+    if (err != 0)
+      break;
+    assert(st->sample_version != 0);
+    uint8 bound_lo_buffer[NdbIndexStat::BoundBufferBytes];
+    uint8 bound_hi_buffer[NdbIndexStat::BoundBufferBytes];
+    NdbIndexStat::Bound bound_lo(st->is, bound_lo_buffer);
+    NdbIndexStat::Bound bound_hi(st->is, bound_hi_buffer);
+    NdbIndexStat::Range range(bound_lo, bound_hi);
+
+    const NdbRecord* key_record= data.ndb_record_key;
+    if (st->is->convert_range(range, key_record, &ib) == -1)
+    {
+      pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+      ndb_index_stat_error(st, 1, "convert_range", __LINE__);
+      err= st->client_error.code;
+      pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+      break;
+    }
+    if (st->is->query_stat(range, stat) == -1)
+    {
+      /* Invalid cache - should remove the entry */
+      pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+      ndb_index_stat_error(st, 1, "query_stat", __LINE__);
+      err= st->client_error.code;
+      pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+      break;
+    }
   }
+  while (0);
 
-  DBUG_RETURN(0);
+  /* Release reference to st */
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+  ndb_index_stat_ref_count(st, false);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  DBUG_RETURN(err);
 }
 
 int
@@ -2713,50 +2925,62 @@ ha_ndbcluster::ndb_index_stat_analyze(Nd
 {
   DBUG_ENTER("ha_ndbcluster::ndb_index_stat_analyze");
 
-  struct {
-    uint sample_version;
-    uint error_count;
-  } old[MAX_INDEXES];
-
-  int err= 0;
-  uint i;
+  struct Req {
+    Ndb_index_stat *st;
+    Ndb_index_stat_snap snap;
+    int err;
+    Req() { st= 0; err= 0; }
+  };
+  Req req[MAX_INDEXES];
 
   /* Force stats update on each index */
-  for (i= 0; i < inx_count; i++)
+  for (uint i= 0; i < inx_count; i++)
   {
+    Req &r= req[i];
     uint inx= inx_list[i];
     const NDB_INDEX_DATA &data= m_index[inx];
     const NDBINDEX *index= data.index;
     DBUG_PRINT("index_stat", ("force update: %s", index->getName()));
 
-    Ndb_index_stat *st=
-      ndb_index_stat_get_share(m_share, index, m_table, err, true, true);
-    if (st == 0)
-      DBUG_RETURN(err);
-
-    old[i].sample_version= st->sample_version;
-    old[i].error_count= st->error_count;
+    r.st=
+      ndb_index_stat_get_share(m_share, index, m_table, r.snap, r.err, true, true);
+    assert((r.st != 0) == (r.err == 0));
+    /* Now holding reference to r.st if r.err == 0 */
   }
 
-  /* Wait for each update (or error) */
-  for (i = 0; i < inx_count; i++)
+  /* Wait for each update */
+  for (uint i = 0; i < inx_count; i++)
   {
+    Req &r= req[i];
     uint inx= inx_list[i];
     const NDB_INDEX_DATA &data= m_index[inx];
     const NDBINDEX *index= data.index;
-    DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
+    (void)index; // USED
 
-    Ndb_index_stat *st=
-      ndb_index_stat_get_share(m_share, index, m_table, err, false, false);
-    if (st == 0)
-      DBUG_RETURN(err);
+    if (r.err == 0)
+    {
+      DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
+      r.err=ndb_index_stat_wait_analyze(r.st, r.snap);
+      /* Release reference to r.st */
+      pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+      ndb_index_stat_ref_count(r.st, false);
+      pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+    }
+  }
 
-    err= ndb_index_stat_wait(st, old[i].sample_version, true);
-    if (err != 0)
-      DBUG_RETURN(err);
+  /* Return first error if any */
+  int err= 0;
+  for (uint i= 0; i < inx_count; i++)
+  {
+    Req &r= req[i];
+    if (r.err != 0)
+    {
+      err= r.err;
+      break;
+    }
   }
 
-  DBUG_RETURN(0);
+  DBUG_RETURN(err);
 }
 
 #endif

=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/sql/ha_ndb_index_stat.h	revid:mikael.ronstrom@stripped
@@ -40,10 +40,10 @@ public:
   pthread_cond_t COND;
   pthread_cond_t COND_ready;
 
-  /* protect entry lists where needed */
-  pthread_mutex_t list_mutex;
-
-  /* protect and signal changes in stats entries */
+  /*
+    protect stats entry lists where needed
+    protect and signal changes in stats entries
+  */
   pthread_mutex_t stat_mutex;
   pthread_cond_t stat_cond;
 
@@ -82,5 +82,8 @@ compute_index_bounds(NdbIndexScanOperati
 
 /* request on stats entry with recent error was ignored */
 #define Ndb_index_stat_error_HAS_ERROR          9003
+ 
+/* stats thread aborted request on stats entry */
+#define Ndb_index_stat_error_ABORT_REQUEST      9004
 
 #endif

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
@@ -1286,7 +1286,9 @@ void ha_ndbcluster::set_rec_per_key()
             /* no stats is not unexpected error */
             err != NdbIndexStat::NoIndexStats &&
             /* warning was printed at first error */
-            err != Ndb_index_stat_error_HAS_ERROR)
+            err != NdbIndexStat::MyHasError &&
+            /* stats thread aborted request */
+            err != NdbIndexStat::MyAbortReq)
         {
           push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                               ER_CANT_GET_STAT, /* pun? */
@@ -12893,7 +12895,9 @@ ha_ndbcluster::records_in_range(uint inx
           /* no stats is not unexpected error */
           err != NdbIndexStat::NoIndexStats &&
           /* warning was printed at first error */
-          err != Ndb_index_stat_error_HAS_ERROR)
+          err != NdbIndexStat::MyHasError &&
+          /* stats thread aborted request */
+          err != NdbIndexStat::MyAbortReq)
       {
         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                             ER_CANT_GET_STAT, /* pun? */

=== modified file 'storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java'
--- a/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java	revid:mikael.ronstrom@stripped
@@ -35,6 +35,14 @@ import java.util.Map;
  */
 public class ClusterJHelper {
 
+    /** Return a new Dbug instance.
+     * 
+     * @return a new Dbug instance
+     */
+    public static Dbug newDbug() {
+        return getServiceInstance(Dbug.class);
+    }
+
     /** Locate a SessionFactory implementation by services lookup. The class loader
      * used is the thread's context class loader.
      *

=== added file 'storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/Dbug.java'
--- a/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/Dbug.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/Dbug.java	revid:mikael.ronstrom@stripped
@@ -0,0 +1,114 @@
+/*
+ *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj;
+
+/** Dbug allows clusterj applications to enable the DBUG functionality in the
+ * cluster ndbapi library.
+ * The dbug state is a control string that consists of flags separated by colons. Flags are:
+ * <ul><li>d set the debug flag
+ * </li><li>a[,filename] append debug output to the file
+ * </li><li>A[,filename] like a[,filename] but flush the output after each operation
+ * </li><li>d[,keyword[,keyword...]] enable output from macros with specified keywords
+ * </li><li>D[,tenths] delay for specified tenths of a second after each operation
+ * </li><li>f[,function[,function...]] limit output to the specified list of functions
+ * </li><li>F mark each output with the file name of the source file
+ * </li><li>i mark each output with the process id of the current process
+ * </li><li>g[,function[,function...]] profile specified list of functions
+ * </li><li>L mark each output with the line number of the source file
+ * </li><li>n mark each output with the current function nesting depth
+ * </li><li>N mark each output with a sequential number
+ * </li><li>o[,filename] overwrite debug output to the file
+ * </li><li>O[,filename] like o[,filename] but flush the output after each operation
+ * </li><li>p[,pid[,pid...]] limit output to specified list of process ids
+ * </li><li>P mark each output with the process name
+ * </li><li>r reset the indentation level to zero
+ * </li><li>t[,depth] limit function nesting to the specified depth
+ * </li><li>T mark each output with the current timestamp
+ * </li></ul>
+ * For example, the control string to trace calls and output debug information only for
+ * "jointx" and overwrite the contents of file "/tmp/dbug/jointx", use "t:d,jointx:o,/tmp/dbug/jointx".
+ * The above can be written as ClusterJHelper.newDbug().trace().debug("jointx").output("/tmp/dbug/jointx").set();
+ */
+public interface Dbug {
+    /** Push the current state and set the parameter as the new state.
+     * @param state the new state
+     */
+    void push(String state);
+
+    /** Pop the current state. The new state will be the previously pushed state.
+     */
+    void pop();
+
+    /** Set the current state from the parameter.
+     * @param state the new state
+     */
+    void set(String state);
+
+    /** Return the current state.
+     * @return the current state
+     */
+    String get();
+
+    /** Print a debug message.
+     * @param keyword the debug keyword that controls whether the message is printed
+     * @param message the message to print
+     */
+    void print(String keyword, String message);
+
+    /** Set the list of debug keywords.
+     * @param strings the debug keywords
+     * @return this
+     */
+    Dbug debug(String[] strings);
+
+    /** Set the list of debug keywords.
+     * @param string the comma separated debug keywords
+     * @return this
+     */
+    Dbug debug(String string);
+
+    /** Push the current state as defined by the methods.
+     */
+    void push();
+
+    /** Set the current state as defined by the methods.
+     */
+    void set();
+
+    /** Specify the file name for debug output (append).
+     * @param fileName the name of the file
+     * @return this
+     */
+    Dbug append(String fileName);
+
+    /** Specify the file name for debug output (overwrite).
+     * @param fileName the name of the file
+     * @return this
+     */
+    Dbug output(String fileName);
+
+    /** Force flush after each output operation.
+     * @return this
+     */
+    Dbug flush();
+
+    /** Set the trace flag.
+     * @return this
+     */
+    Dbug trace();
+
+}

=== added file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java	revid:mikael.ronstrom@stripped
@@ -0,0 +1,77 @@
+/*
+ *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package testsuite.clusterj;
+
+import com.mysql.clusterj.ClusterJHelper;
+import com.mysql.clusterj.Dbug;
+
+/**
+ * Tests dbug methods.
+ */
+public class DbugTest extends AbstractClusterJTest{
+
+    static String tmpFileName = System.getProperty("MYSQL_TMP_DIR", "/tmp") + "/clusterj-test-dbug";
+
+    public boolean getDebug() {
+        return false;
+    }
+
+    public void test() {
+        Dbug dbug = ClusterJHelper.newDbug();
+        if (dbug == null) {
+            // nothing else can be tested
+            fail("Failed to get new Dbug");
+        }
+        if (dbug.get() == null) {
+            // ndbclient is compiled without DBUG; just make sure nothing blows up
+            dbug.set("nothing");
+            dbug.push("nada");
+            dbug.pop();
+            dbug.print("keyword", "message");
+            return;
+        }
+        String originalState = "t";
+        String newState = "d,jointx:o," + tmpFileName;
+        dbug.set(originalState);
+        String actualState = dbug.get();
+        errorIfNotEqual("Failed to set original state", originalState, actualState);
+        dbug.push(newState);
+        actualState = dbug.get();
+        errorIfNotEqual("Failed to push new state", newState, actualState);
+        dbug.pop();
+        actualState = dbug.get();
+        errorIfNotEqual("Failed to pop original state", originalState, actualState);
+
+        dbug = ClusterJHelper.newDbug();
+        dbug.output(tmpFileName).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
+        actualState = dbug.get();
+        // keywords are stored LIFO
+        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + tmpFileName, actualState);
+        dbug.pop();
+
+        dbug = ClusterJHelper.newDbug();
+        dbug.append(tmpFileName).trace().debug("a,b,c,d,e,f").set();
+        actualState = dbug.get();
+        // keywords are stored LIFO
+        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + tmpFileName + ":t", actualState);
+        dbug.pop();
+
+        failOnError();
+    }
+
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	revid:mikael.ronstrom@stripped
@@ -45,13 +45,6 @@ public class ClusterConnectionImpl
     static final Logger logger = LoggerFactoryService.getFactory()
             .getInstance(com.mysql.clusterj.core.store.ClusterConnection.class);
 
-    /** Load the ndbjtie system library */
-    static {
-        loadSystemLibrary("ndbclient");
-        // initialize the charset map
-        Utility.getCharsetMap();
-    }
-
     /** Ndb_cluster_connection is wrapped by ClusterConnection */
     protected Ndb_cluster_connection clusterConnection;
 
@@ -77,40 +70,6 @@ public class ClusterConnectionImpl
         logger.info(local.message("INFO_Create_Cluster_Connection", connectString, nodeId));
     }
 
-    static protected void loadSystemLibrary(String name) {
-        String message;
-        String path;
-        try {
-            System.loadLibrary(name);
-        } catch (UnsatisfiedLinkError e) {
-            path = getLoadLibraryPath();
-            message = local.message("ERR_Failed_Loading_Library",
-                    name, path, "UnsatisfiedLinkError", e.getLocalizedMessage());
-            logger.fatal(message);
-            throw e;
-        } catch (SecurityException e) {
-            path = getLoadLibraryPath();
-            message = local.message("ERR_Failed_Loading_Library",
-                    name, path, "SecurityException", e.getLocalizedMessage());
-            logger.fatal(message);
-            throw e;
-        }
-    }
-
-    /**
-     * @return the load library path or the Exception string
-     */
-    private static String getLoadLibraryPath() {
-        String path;
-        try {
-            path = System.getProperty("java.library.path");
-        } catch (Exception ex) {
-            path = "<Exception: " + ex.getMessage() + ">";
-        }
-        return path;
-    }
-
-
     public void connect(int connectRetries, int connectDelay, boolean verbose) {
         checkConnection();
         int returnCode = clusterConnection.connect(connectRetries, connectDelay, verbose?1:0);

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java	revid:mikael.ronstrom@stripped
@@ -40,7 +40,30 @@ public class ClusterConnectionServiceImp
         LoggerFactoryService.getFactory().registerLogger("com.mysql.clusterj.tie");
     }
 
+    /** Load the ndbclient system library only once */
+    static boolean ndbclientLoaded = false;
+
+    static protected void loadSystemLibrary(String name) {
+        // not thread-safe, so this may execute more than once; reloading is harmless
+        if (ndbclientLoaded) {
+            return;
+        }
+        try {
+            System.loadLibrary(name);
+            // initialize the charset map
+            Utility.getCharsetMap();
+            ndbclientLoaded = true;
+        } catch (Throwable e) {
+            String path = getLoadLibraryPath();
+            String message = local.message("ERR_Failed_Loading_Library",
+                    name, path, e.getClass(), e.getLocalizedMessage());
+            logger.fatal(message);
+            throw new ClusterJFatalUserException(message, e);
+        }
+    }
+
     public ClusterConnection create(String connectString, int nodeId) {
+        loadSystemLibrary("ndbclient");
         try {
             return new ClusterConnectionImpl(connectString, nodeId);
         } catch (ClusterJFatalUserException cjex) {
@@ -52,4 +75,17 @@ public class ClusterConnectionServiceImp
         }
     }
 
+    /**
+     * @return the load library path or the Exception string
+     */
+    private static String getLoadLibraryPath() {
+        String path;
+        try {
+            path = System.getProperty("java.library.path");
+        } catch (Exception ex) {
+            path = "<Exception: " + ex.getMessage() + ">";
+        }
+        return path;
+    }
+
 }

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbugImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbugImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbugImpl.java	revid:mikael.ronstrom@stripped
@@ -0,0 +1,151 @@
+/*
+ *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.nio.ByteBuffer;
+
+import com.mysql.clusterj.Dbug;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.mysql.Utils;
+
+/**
+ * This class encapsulates Utils dbug methods to manage dbug settings. 
+ */
+public class DbugImpl implements Dbug {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper
+            .getInstance(DbugImpl.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(DbugImpl.class);
+
+    private static final int DBUG_SIZE = 256;
+
+    boolean propertyTrace = false;
+    String fileName = "";
+    Character fileStrategy = 'o';
+    String debugList;
+    
+    public DbugImpl() {
+        // Load the native library so we can set up debugging before anything else
+        ClusterConnectionServiceImpl.loadSystemLibrary("ndbclient");
+    }
+
+    public String get() {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(DBUG_SIZE);
+        String result = Utils.dbugExplain(buffer, DBUG_SIZE);
+        return result;
+    }
+
+    public void pop() {
+        Utils.dbugPop();
+    }
+
+    public void push(String state) {
+        Utils.dbugPush(state);
+    }
+
+    public void set(String state) {
+        Utils.dbugSet(state);
+    }
+
+    public void set() {
+        set(toState());
+    }
+
+    public void push() {
+        push(toState());
+    }
+
+    public void print(String keyword, String message) {
+        Utils.dbugPrint(keyword, message);
+    }
+
+    public Dbug trace(boolean trace) {
+        this.propertyTrace = trace;
+        return this;
+    }
+
+    public Dbug trace() {
+        return trace(true);
+    }
+
+    public Dbug output(String fileName) {
+        this.fileName = fileName;
+        this.fileStrategy = 'o';
+        return this;
+    }
+
+    public Dbug append(String fileName) {
+        this.fileName = fileName;
+        this.fileStrategy = 'a';
+        return this;
+    }
+
+    public Dbug flush() {
+        this.fileStrategy = Character.toUpperCase(this.fileStrategy);
+        return this;
+    }
+
+    public Dbug debug(String debugList) {
+        this.debugList = debugList;
+        return this;
+    }
+
+    public Dbug debug(String[] debugList) {
+        StringBuilder builder = new StringBuilder();
+        String sep = "";
+        for (String debug: debugList) {
+            builder.append(sep);
+            builder.append(debug);
+            sep = ",";
+        }
+        this.debugList = builder.toString();
+        return this;
+    }
+
+    private String toState() {
+        String separator = "";
+        StringBuilder builder = new StringBuilder();
+        if (propertyTrace) {
+            builder.append("t");
+            separator = ":";
+        }
+        if (fileName != null) {
+            builder.append(separator);
+            builder.append(fileStrategy);
+            builder.append(',');
+            builder.append(fileName);
+            separator = ":";
+        }
+        if (debugList != null) {
+            builder.append(separator);
+            builder.append("d,");
+            builder.append(debugList);
+            separator = ":";
+        }
+        return builder.toString();
+    }
+
+}

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/resources/META-INF/services/com.mysql.clusterj.Dbug'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/resources/META-INF/services/com.mysql.clusterj.Dbug	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/resources/META-INF/services/com.mysql.clusterj.Dbug	revid:mikael.ronstrom@stripped
@@ -0,0 +1 @@
+com.mysql.clusterj.tie.DbugImpl

=== added file 'storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/DbugTest.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/DbugTest.java	revid:mikael.ronstrom@stripped
@@ -0,0 +1,22 @@
+/*
+ *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+package testsuite.clusterj.tie;
+
+public class DbugTest extends testsuite.clusterj.DbugTest {
+
+}

=== modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp	revid:mikael.ronstrom@stripped
@@ -104,7 +104,16 @@ public:
     HaveSysTables = 4244, // create error if all sys tables exist
     NoSysEvents = 4710,
     BadSysEvents = BadSysTables,
-    HaveSysEvents = 746
+    HaveSysEvents = 746,
+    /*
+     * Following are for mysqld.  Most are consumed by mysqld itself
+     * and should therefore not be seen by clients.
+     */
+    MyNotAllow = 4721,    // stats thread not open for requests
+    MyNotFound = 4722,    // stats entry unexpectedly not found
+    MyHasError = 4723,    // request ignored due to recent error
+    MyAbortReq = 4724,    // request aborted by stats thread
+    AlienUpdate = 4725    // somebody else messed with stats
   };
 
   /*
@@ -180,6 +189,7 @@ public:
     Uint32 m_totalBytes;  // total bytes memory used
     Uint64 m_save_time;   // microseconds to read stats into cache
     Uint64 m_sort_time;   // microseconds to sort the cache
+    Uint32 m_ref_count;   // in use by query_stat
     // end v4 fields
   };
 

=== added file 'storage/ndb/include/util/dbug_utils.hpp'
--- a/storage/ndb/include/util/dbug_utils.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/dbug_utils.hpp	revid:mikael.ronstrom@stripped
@@ -0,0 +1,105 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+/*
+ * dbug_utils.hpp
+ */
+
+#ifndef dbug_utils_hpp
+#define dbug_utils_hpp
+
+#include "my_global.h"
+#include "my_dbug.h"
+
+/*
+ * These macros are more robust versions of the ones in MySQL's DBUG package.
+ * 
+ * As the DBUG macros/functions don't check arguments, the caller (or JVM!)
+ * crashes in case, for instance, of NULL args.  Also, macros returning a
+ * value (like DBUG_EXPLAIN) ought to do so even if DBUG_OFF was defined.
+ */
+
+#ifndef DBUG_OFF
+
+#define MY_DBUG_PUSH(a1)                                                \
+    do { if ((a1)) DBUG_PUSH(a1); } while (0)
+#define MY_DBUG_POP()                                                   \
+    DBUG_POP()
+#define MY_DBUG_SET(a1)                                                 \
+    do { if ((a1)) DBUG_SET(a1); } while (0)
+#define MY_DBUG_EXPLAIN(buf, len)                                       \
+    ((!(buf) || (len) <= 0) ? 1 : DBUG_EXPLAIN(buf, len))
+#define MY_DBUG_PRINT(keyword, arglist)                                 \
+    do { if ((keyword)) DBUG_PRINT(keyword, arglist); } while (0)
+
+#else // DBUG_OFF
+
+#define MY_DBUG_PUSH(a1)
+#define MY_DBUG_POP()
+#define MY_DBUG_SET(a1)
+#define MY_DBUG_EXPLAIN(buf,len) 1
+#define MY_DBUG_PRINT(keyword, arglist)
+
+#endif // DBUG_OFF
+
+/*
+ * These DBUG functions provide suitable mapping targets for use from Java.
+ */
+
+/** Push the state of the DBUG package */
+inline
+void
+dbugPush(const char * state)
+{
+    MY_DBUG_PUSH(state);
+}
+
+/** Pop the state of the DBUG package */
+inline
+void
+dbugPop()
+{
+    MY_DBUG_POP();
+}
+
+/** Set the state of the DBUG package */
+inline
+void
+dbugSet(const char * state)
+{
+    MY_DBUG_SET(state);
+}
+
+/** Return the state of the DBUG package */
+inline
+const char *
+dbugExplain(char * buffer, int length)
+{
+    if (!MY_DBUG_EXPLAIN(buffer, length)) {
+        return buffer;
+    }
+    return NULL;
+}
+
+/** Print a message */
+inline
+void
+dbugPrint(const char * keyword, const char * message)
+{
+    MY_DBUG_PRINT(keyword, (message));
+}
+
+#endif // dbug_utils_hpp

=== modified file 'storage/ndb/include/util/decimal_utils.hpp'
--- a/storage/ndb/include/util/decimal_utils.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/include/util/decimal_utils.hpp	revid:mikael.ronstrom@stripped
@@ -19,8 +19,8 @@
  * decimal_utils.hpp
  */
 
-#ifndef decimal_utils_h
-#define decimal_utils_h
+#ifndef decimal_utils_hpp
+#define decimal_utils_hpp
 
 /* return values (redeclared here if to be mapped to Java) */
 #define E_DEC_OK                0
@@ -36,11 +36,11 @@
 /* 
  decimal_str2bin: Convert string directly to on-disk binary format. 
  str  - string to convert 
- len - length of string
+ str_len - length of string
  prec - precision of column
  scale - scale of column
- dest - buffer for binary representation 
- len - length of buffer 
+ bin - buffer for binary representation 
+ bin_len - length of buffer 
 
  NOTES
    Added so that NDB API programs can convert directly between  the stored
@@ -59,8 +59,8 @@ int decimal_str2bin(const char *str, int
  bin_len - length to convert 
  prec - precision of column
  scale - scale of column
- dest - buffer for string representation   
- len - length of buffer 
+ str - buffer for string representation   
+ str_len - length of buffer 
 
  NOTES
    Added so that NDB API programs can convert directly between  the stored
@@ -74,4 +74,4 @@ int decimal_bin2str(const void *bin, int
                     int prec, int scale, 
                     char *str, int str_len);
 
-#endif // decimal_utils_h
+#endif // decimal_utils_hpp

=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/CMakeLists.txt	revid:mikael.ronstrom@stripped
@@ -93,6 +93,8 @@ set(NDB_MEMCACHE_SOURCE_FILES
     src/hash_item_util.c
     src/ndb_configuration.cc
     src/ndb_engine_private.h
+    src/ndb_error_logger.cc
+    src/ndb_flush.cc
     src/ndb_pipeline.cc
     src/ndb_worker.cc
     src/schedulers
@@ -167,6 +169,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DI
 # Build the perl include file used by mtr
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/memcached_path.pl.in
                ${CMAKE_CURRENT_SOURCE_DIR}/memcached_path.pl)
+
+# Copy the SQL script into /scripts/ in the build directory.
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/scripts/ndb_memcache_metadata.sql
+               ${CMAKE_BINARY_DIR}/scripts/ndb_memcache_metadata.sql COPYONLY)
                
 ######### TARGETS ############
 # Build the convenience library
@@ -201,19 +207,23 @@ set_target_properties(ndb_engine PROPERT
                       LINK_FLAGS "${FINAL_LINK_FLAGS}")
 
 ############ INSTALLER RULES #########
-# Install the ndb_engine.so module
+### Install the ndb_engine.so module
+###
 install(TARGETS ndb_engine DESTINATION ${INSTALL_LIBDIR})
 
-# Install the metadata.sql script
-install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/scripts/ndb_memcache_metadata.sql 
-        DESTINATION ${INSTALL_MYSQLSHAREDIR})
 
 ### Install the memcache-api directory  ################
-install(DIRECTORY DESTINATION memcache-api)
-install(PROGRAMS sandbox.sh DESTINATION memcache-api)
-install(FILES README DESTINATION memcache-api)
+###
+SET(MEMCACHE_API_DIR "${INSTALL_MYSQLSHAREDIR}/memcache-api")
+install(DIRECTORY DESTINATION ${MEMCACHE_API_DIR} )
+install(PROGRAMS sandbox.sh DESTINATION  ${MEMCACHE_API_DIR})
+install(FILES README DESTINATION  ${MEMCACHE_API_DIR})
 install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/scripts/ndb_memcache_metadata.sql 
-        DESTINATION memcache-api)
+        DESTINATION  ${MEMCACHE_API_DIR})
+# Upgrader scripts:
+install(DIRECTORY DESTINATION ${MEMCACHE_API_DIR}/upgrade)
+install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/scripts/update_to_1.2.sql
+        DESTINATION ${MEMCACHE_API_DIR}/upgrade)
         
 # memcached_path.pl is also installed, for use by installed mtr
 install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/memcached_path.pl

=== modified file 'storage/ndb/memcache/include/ExternalValue.h'
--- a/storage/ndb/memcache/include/ExternalValue.h	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/ExternalValue.h	revid:mikael.ronstrom@stripped
@@ -54,6 +54,7 @@ public:
   static TableSpec * createContainerRecord(const char *);
   static op_status_t do_write(workitem *);
   static op_status_t do_delete(workitem *);
+  static int do_delete(memory_pool *, NdbTransaction *, QueryPlan *, Operation &);
   static op_status_t do_read_header(workitem *, ndb_async_callback *, worker_step *);
   static void append_after_read(NdbTransaction *, workitem *);
   static bool setupKey(workitem *, Operation &);

=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/Operation.h	revid:mikael.ronstrom@stripped
@@ -107,6 +107,8 @@ public: 
   // delete
   const NdbOperation *deleteTuple(NdbTransaction *tx,
                                   NdbOperation::OperationOptions *options = 0);
+  const NdbOperation *deleteCurrentTuple(NdbScanOperation *, NdbTransaction *,
+                                         NdbOperation::OperationOptions *opts = 0);
 
   // write
   const NdbOperation *writeTuple(NdbTransaction *tx);
@@ -116,10 +118,11 @@ public: 
                                   NdbOperation::OperationOptions *options = 0);
 
   // scan
-  NdbScanOperation *scanTable(NdbTransaction *tx);
+  NdbScanOperation *scanTable(NdbTransaction *tx,
+                              NdbOperation::LockMode lmod = NdbOperation::LM_Read,
+                              NdbScanOperation::ScanOptions *options = 0);
   NdbIndexScanOperation *scanIndex(NdbTransaction *tx,
                                    NdbIndexScanOperation::IndexBound *bound);
-
  };
   
 
@@ -268,8 +271,20 @@ inline const NdbOperation * 
                          row_mask, options);
 }
 
-inline NdbScanOperation * Operation::scanTable(NdbTransaction *tx) {
-  return tx->scanTable(record->ndb_record);
+inline NdbScanOperation * 
+  Operation::scanTable(NdbTransaction *tx, NdbOperation::LockMode lmode,
+                       NdbScanOperation::ScanOptions *opts) {
+    return tx->scanTable(record->ndb_record, lmode,
+                         read_mask_ptr, opts, 0);
 }
 
+inline const NdbOperation * 
+  Operation::deleteCurrentTuple(NdbScanOperation *scanop,
+                                NdbTransaction *tx,
+                                NdbOperation::OperationOptions *opts) {
+    return scanop->deleteCurrentTuple(tx, record->ndb_record, buffer, 
+                                      read_mask_ptr, opts);
+}
+
+
 #endif

=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/ndb_engine.h	revid:mikael.ronstrom@stripped
@@ -64,6 +64,7 @@ struct ndb_engine {
   struct {
     size_t nthreads;
     bool cas_enabled;  
+    size_t verbose;
   } server_options;
   
   union {
@@ -82,5 +83,14 @@ struct ndb_engine {
   ndbmc_atomic32_t cas_lo;
 };
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern size_t global_max_item_size;
+
+#ifdef __cplusplus
+};
+#endif
 
 #endif

=== added file 'storage/ndb/memcache/include/ndb_error_logger.h'
--- a/storage/ndb/memcache/include/ndb_error_logger.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ndb_error_logger.h	revid:mikael.ronstrom@stripped
@@ -0,0 +1,53 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+#ifndef NDBMEMCACHE_NDB_ERROR_LOGGER_H
+#define NDBMEMCACHE_NDB_ERROR_LOGGER_H
+
+/* Log NDB error messages, 
+   but take care to prevent flooding the log file with repeated errors.
+*/
+
+
+#include "ndbmemcache_global.h"
+#include "workitem.h"
+
+#ifdef __cplusplus
+#include <NdbApi.hpp>
+
+enum {
+  ERR_SUCCESS = ndberror_st_success,  /* == 0 */
+  ERR_TEMP = ndberror_st_temporary,
+  ERR_PERM = ndberror_st_permanent,
+  ERR_UR   = ndberror_st_unknown
+};
+
+int log_ndb_error(const NdbError &);
+#endif
+
+
+DECLARE_FUNCTIONS_WITH_C_LINKAGE
+
+void ndb_error_logger_init(SERVER_CORE_API *, size_t log_level);
+
+END_FUNCTIONS_WITH_C_LINKAGE
+
+
+#endif
+

=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/include/ndb_pipeline.h	revid:mikael.ronstrom@stripped
@@ -100,7 +100,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
 
 /** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline();
+ndb_pipeline * get_request_pipeline(int thd_id);
 
 /** call into a pipeline for its own statistics */
 void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);

=== modified file 'storage/ndb/memcache/scripts/ndb_memcache_metadata.sql'
--- a/storage/ndb/memcache/scripts/ndb_memcache_metadata.sql	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/scripts/ndb_memcache_metadata.sql	revid:mikael.ronstrom@stripped
@@ -252,6 +252,7 @@ INSERT INTO memcache_server_roles (role_
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("db-only", 1);
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("mc-only", 2);
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("ndb-caching", 3);
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("large", 4);
 
 -- ndb_clusters table 
 -- Create an entry for the primary cluster.
@@ -303,7 +304,9 @@ INSERT INTO key_prefixes (server_role_id
 
          (3, "",    0, "caching", "demo_table"),    /* ndb-caching role */
          (3, "t:",  0, "caching", "demo_tabs"),
-         (3, "b:",  0, "caching", "demo_ext")
+         (3, "b:",  0, "caching", "demo_ext"),
+         
+         (4, ""  ,  0, "ndb-test", "demo_ext");
 ;
 
 

=== modified file 'storage/ndb/memcache/scripts/update_to_1.2.sql'
--- a/storage/ndb/memcache/scripts/update_to_1.2.sql	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/scripts/update_to_1.2.sql	revid:mikael.ronstrom@stripped
@@ -48,6 +48,8 @@ CREATE TABLE IF NOT EXISTS `external_val
 
 INSERT INTO meta VALUES ("ndbmemcache", "1.2");
 
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("large", 4);
+
 UPDATE  containers 
   SET   expire_time_column = "expire_time",
         flags = "flags" 
@@ -67,6 +69,8 @@ INSERT INTO key_prefixes (server_role_id
           (0, "b:",  0, "ndb-test", "demo_ext"),
           (1, "b:",  0, "ndb-only", "demo_ext"),
           (3, "t:",  0, "caching", "demo_tabs"),
-          (3, "b:",  0, "caching", "demo_ext")     ;
+          (3, "b:",  0, "caching", "demo_ext"),
+          (4, ""  ,  0, "ndb-test", "demo_ext");
+
  
 

=== modified file 'storage/ndb/memcache/src/ClusterConnectionPool.cc'
--- a/storage/ndb/memcache/src/ClusterConnectionPool.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ClusterConnectionPool.cc	revid:mikael.ronstrom@stripped
@@ -59,7 +59,7 @@ void store_connection_pool_for_cluster(c
                                        ClusterConnectionPool *p) {
   DEBUG_ENTER();
   if(name == 0) name = "[default]";
-  int name_len = strlen(name);
+  // int name_len = strlen(name);
 
   if(pthread_mutex_lock(& conn_pool_map_lock) == 0) {
     if(conn_pool_map == 0) {

=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/Config_v1.cc	revid:mikael.ronstrom@stripped
@@ -463,7 +463,6 @@ bool config_v1::store_prefix(const char 
                              TableSpec *table, 
                              int cluster_id, 
                              char *cache_policy) {
-  DEBUG_PRINT("%s", name);
   KeyPrefix prefix(name);
   prefix_info_t * info_ptr;
   

=== modified file 'storage/ndb/memcache/src/ExpireTime.cc'
--- a/storage/ndb/memcache/src/ExpireTime.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ExpireTime.cc	revid:mikael.ronstrom@stripped
@@ -29,8 +29,8 @@
 
 ExpireTime::ExpireTime(workitem *i) :
   item(i),
-  is_expired(false),
-  ndb_expire_time(0)
+  ndb_expire_time(0),
+  is_expired(false)
 {
   SERVER_CORE_API * SERVER = item->pipeline->engine->server.core;
   current_time = SERVER->get_current_time();

=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ExternalValue.cc	revid:mikael.ronstrom@stripped
@@ -70,6 +70,42 @@ TableSpec * ExternalValue::createContain
 }
 
 
+/* This is called from FLUSH_ALL.
+   It returns the number of parts deleted.
+   It uses a memory pool, passed in, to allocate key buffers.
+*/
+int ExternalValue::do_delete(memory_pool *mpool, NdbTransaction *delTx, 
+                             QueryPlan *plan, Operation & op) {
+  Uint32 id, nparts = 0;
+  QueryPlan * extern_plan = plan->extern_store;
+  
+  if(extern_plan 
+     && ! (op.isNull(COL_STORE_EXT_SIZE) || op.isNull(COL_STORE_EXT_ID))) {
+
+    /* How many parts? */
+    Uint32 stripe_size = extern_plan->val_record->value_length;
+    Uint32 len = op.getIntValue(COL_STORE_EXT_SIZE);
+    id  = op.getIntValue(COL_STORE_EXT_ID);  
+    nparts = len / stripe_size;
+    if(len % stripe_size) nparts += 1;
+
+    /* Delete them */
+    int key_size = extern_plan->key_record->rec_size;
+    
+    for(Uint32 i = 0; i < nparts ; i++) {
+      Operation part_op(extern_plan);
+      part_op.key_buffer = (char *) memory_pool_alloc(mpool, key_size);
+      
+      part_op.clearKeyNullBits();
+      part_op.setKeyPartInt(COL_STORE_KEY + 0, id);
+      part_op.setKeyPartInt(COL_STORE_KEY + 1, i);    
+      part_op.deleteTuple(delTx);
+    }
+  }
+  return nparts;
+}
+
+
 inline bool ExternalValue::setupKey(workitem *item, Operation &op) {
   op.key_buffer = item->ndb_key_buffer;
   op.clearKeyNullBits();
@@ -209,15 +245,15 @@ void ExternalValue::append_after_read(Nd
 
 /* Constructor */
 ExternalValue::ExternalValue(workitem *item, NdbTransaction *t) :
+  old_hdr(item->plan->extern_store->val_record->value_length),  // (part size)
+  new_hdr(item->plan->extern_store->val_record->value_length),
+  expire_time(item),
   tx(t),
   wqitem(item),
-  expire_time(item),
   ext_plan(item->plan->extern_store),
-  old_hdr(item->plan->extern_store->val_record->value_length),  // (part size)
-  new_hdr(item->plan->extern_store->val_record->value_length),
   value(0),
-  stored_cas(0),
-  value_size_in_header(item->plan->row_record->value_length)
+  value_size_in_header(item->plan->row_record->value_length),
+  stored_cas(0)
 {
   do_server_cas = (item->prefix_info.has_cas_col && item->cas);
   wqitem->ext_val = this;

=== modified file 'storage/ndb/memcache/src/QueryPlan.cc'
--- a/storage/ndb/memcache/src/QueryPlan.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/QueryPlan.cc	revid:mikael.ronstrom@stripped
@@ -31,9 +31,7 @@
 #include "ExternalValue.h"
 
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
-extern "C" {
-  size_t global_max_item_size;
-}
+size_t global_max_item_size;
 
 /* For each pair [TableSpec,NDB Object], we can cache some dictionary 
    lookups, acccess path information, NdbRecords, etc.
@@ -78,11 +76,11 @@ inline const NdbDictionary::Column *get_
 QueryPlan::QueryPlan(Ndb *my_ndb, const TableSpec *my_spec, PlanOpts opts)  : 
   initialized(0), 
   dup_numbers(false),
-  db(my_ndb),
-  spec(my_spec), 
   is_scan(false),
-  static_flags(spec->static_flags)
-{  
+  spec(my_spec),
+  static_flags(spec->static_flags),
+  db(my_ndb)
+{
   const NdbDictionary::Column *col;
   bool op_ok = false; 
   bool last_value_col_is_int = false;
@@ -139,7 +137,7 @@ QueryPlan::QueryPlan(Ndb *my_ndb, const 
   /* Key Columns */
   for(int i = 0; i < spec->nkeycols ; i++) {
     col = get_ndb_col(spec, table, spec->key_columns[i]);
-    int this_col_id = col->getColumnNo();
+    // int this_col_id = col->getColumnNo();
     key_record->addColumn(COL_STORE_KEY, col);
     row_record->addColumn(COL_STORE_KEY, col);
   }

=== modified file 'storage/ndb/memcache/src/Record.cc'
--- a/storage/ndb/memcache/src/Record.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/Record.cc	revid:mikael.ronstrom@stripped
@@ -31,9 +31,11 @@
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
 
 Record::Record(int ncol) : ncolumns(ncol), rec_size(0), nkeys(0), nvalues(0),  
-                           index(0), n_nullable(0),
-                           start_of_nullmap(0), size_of_nullmap(0), 
                            value_length(0),
+                           index(0),
+                           n_nullable(0),
+                           start_of_nullmap(0),
+                           size_of_nullmap(0),
                            handlers(new DataTypeHandler *[ncol]),
                            specs(new NdbDictionary::RecordSpecification[ncol])
 {

=== modified file 'storage/ndb/memcache/src/ndb_configuration.cc'
--- a/storage/ndb/memcache/src/ndb_configuration.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_configuration.cc	revid:mikael.ronstrom@stripped
@@ -31,6 +31,7 @@
 #include "NdbInstance.h"
 #include "thread_identifier.h"
 #include "Scheduler.h"
+#include "ExternalValue.h"
 
 /* A static global variable */
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
@@ -158,9 +159,10 @@ void disconnect_all() {
 /* This function has C linkage */
 void print_debug_startup_info() {
   int sz[4];
-  DEBUG_PRINT("  sizeof Ndb        : %lu", sz[0]=sizeof(Ndb));
-  DEBUG_PRINT("  sizeof NdbInstance: %lu", sz[1]=sizeof(NdbInstance));
-  DEBUG_PRINT("  sizeof workitem: %lu", sizeof(workitem));
+  DEBUG_PRINT("  sizeof Ndb           : %lu", sz[0]=sizeof(Ndb));
+  DEBUG_PRINT("  sizeof NdbInstance   : %lu", sz[1]=sizeof(NdbInstance));
+  DEBUG_PRINT("  sizeof workitem      : %lu", sizeof(workitem));
+  DEBUG_PRINT("  sizeof ExternalValue : %lu", sizeof(ExternalValue));
 }
 
 

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_engine.c	revid:mikael.ronstrom@stripped
@@ -38,10 +38,10 @@
 #include "Scheduler.h"
 #include "thread_identifier.h"
 #include "timing.h"
+#include "ndb_error_logger.h"
 
 /* Global variables */
 EXTENSION_LOGGER_DESCRIPTOR *logger;
-size_t global_max_item_size;
 
 /* Static and local to this file */
 const char * set_ops[] = { "","add","set","replace","append","prepend","cas" };
@@ -206,6 +206,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
   fetch_core_settings(ndb_eng, def_eng);
   nthreads = ndb_eng->server_options.nthreads;
 
+  /* Initialize the error handler */
+  ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
+
   logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
   logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
   timing_point(& pump_time);
@@ -225,7 +228,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
   ndb_eng->pipelines  = malloc(nthreads * sizeof(void *));
   ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
   for(i = 0 ; i < nthreads ; i++) {
-    ndb_eng->pipelines[i] = get_request_pipeline();
+    ndb_eng->pipelines[i] = get_request_pipeline(i);
     ndb_eng->schedulers[i] = 
       initialize_scheduler(ndb_eng->startup_options.scheduler, nthreads, i);
     if(ndb_eng->schedulers[i] == 0) {
@@ -790,6 +793,9 @@ int fetch_core_settings(struct ndb_engin
     { .key = "num_threads",
       .datatype = DT_SIZE,
       .value.dt_size = &engine->server_options.nthreads },
+    { .key = "verbosity",
+      .datatype = DT_SIZE,
+      .value.dt_size = &engine->server_options.verbose },
     { .key = NULL }
   };
   

=== added file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc	revid:mikael.ronstrom@stripped
@@ -0,0 +1,188 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+/* System headers */
+#include <pthread.h>
+#include <time.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include "memcached/extension_loggers.h"
+#include "memcached/server_api.h"
+
+#include "ndb_error_logger.h"
+
+
+/* ***********************************************************************
+   ndb_error_logger 
+
+   Log NDB error messages, but try to avoid flooding the logfile with them. 
+   *********************************************************************** 
+*/
+
+
+/* Memcached externals */
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+SERVER_CORE_API * core_api;
+size_t verbose_logging;
+
+/* Internal Static Globals and Declarations */
+#define ERROR_HASH_TABLE_SIZE 251
+pthread_mutex_t error_table_lock;
+class ErrorEntry;
+ErrorEntry * error_hash_table[ERROR_HASH_TABLE_SIZE];
+
+/* Prototypes */
+void manage_error(const NdbError &, const char * mesg, rel_time_t interval);
+
+
+
+/********* PUBLIC API *************************************/
+/* Initialize the NDB Error Logger */
+void ndb_error_logger_init(SERVER_CORE_API * api, size_t level) {
+  int r = pthread_mutex_init(& error_table_lock, NULL);
+  if(r) logger->log(LOG_WARNING,0, "CANNOT INIT ERROR MUTEX: %d\n", r);
+  core_api = api;
+  verbose_logging = level;
+  
+  for(int i = 0; i < ERROR_HASH_TABLE_SIZE; i++) 
+    error_hash_table[i] = 0;
+}
+
+
+int log_ndb_error(const NdbError &error) {
+  switch(error.status) {
+    case ndberror_st_success:
+      break;
+
+    case ndberror_st_temporary:
+      manage_error(error, "NDB Temporary Error", 10);
+      break;
+
+    case ndberror_st_permanent:
+    case ndberror_st_unknown:
+      manage_error(error, "NDB Error", 10);
+      break;
+  }
+  /* NDB classifies "Out Of Memory" (827) errors as permanent errors, but we 
+     reclassify them to temporary */
+  if(error.classification == NdbError::InsufficientSpace)
+    return ERR_TEMP;
+  return error.status;
+}
+
+
+/********* IMPLEMENTATION *******************************/
+
+class ErrorEntry {
+public:
+  int error_code;
+  rel_time_t first;
+  rel_time_t time[2];   /* odd and even timestamps */
+  Uint32 count;
+  ErrorEntry *next;
+  
+  ErrorEntry(int code, rel_time_t tm) :
+    error_code(code), first(tm), count(1), next(0) 
+  { 
+    time[0] = 0;
+    time[1] = tm;
+  };
+};
+
+
+class Lock {
+public:
+  pthread_mutex_t *mutex;
+  int status;
+  Lock(pthread_mutex_t *m) : mutex(m) { status = pthread_mutex_lock(mutex); }
+  ~Lock()                             { pthread_mutex_unlock(mutex); }
+};
+
+ErrorEntry * error_table_lookup(int code, rel_time_t now);
+
+
+/* Lock the error table and look up an error. 
+   If found, increment the count and set either the odd or even timestamp.
+   If not found, create.
+*/
+ErrorEntry * error_table_lookup(int code, rel_time_t now) {
+  int hash_val = code % ERROR_HASH_TABLE_SIZE;
+  Lock lock(& error_table_lock);
+  ErrorEntry *sym;
+  
+  for(sym = error_hash_table[hash_val] ; sym != 0 ; sym = sym->next) {
+    if(sym->error_code == code) {
+      sym->time[(++(sym->count)) % 2] = now;
+      return sym;
+    }
+  }
+
+  /* Create */
+  sym = new ErrorEntry(code, now);
+  sym->next = error_hash_table[hash_val];
+  error_hash_table[hash_val] = sym;
+  return sym;
+}
+
+
+/* Record the error message, and possibly log it. */
+void manage_error(const NdbError & error, const char *type_mesg, rel_time_t interval) {
+  char note[256];
+  ErrorEntry *entry = 0;
+  bool first_ever, interval_passed, flood = false;
+  int current = 0, prior = 0;  // array indexes
+
+  if(verbose_logging == 0) { 
+    entry = error_table_lookup(error.code, core_api->get_current_time());
+
+    if((entry->count | 1) == entry->count)
+      current = 1;  // odd count
+    else
+      prior   = 1;  // even count
+
+    /* We have four pieces of information: the first timestamp, the two 
+       most recent timestamps, and the error count. When to write a log message?
+       (A) On the first occurrence of an error. 
+       (B) If a time > interval has passed since the previous message.
+       (C) At certain count numbers in error flood situations
+    */
+    first_ever = (entry->count == 1);
+    interval_passed = (entry->time[current] - entry->time[prior] > interval);
+    if(! interval_passed) 
+      for(Uint32 i = 10 ; i <= entry->count ; i *= 10) 
+        if(entry->count < (10 * i) && (entry->count % i == 0))
+          { flood = true; break; }
+  }
+  
+  if(verbose_logging || first_ever || interval_passed || flood) 
+  {
+    if(flood) 
+      snprintf(note, 256, "[occurrence %d of this error]", entry->count);
+    else
+      note[0] = '\0';
+    logger->log(LOG_WARNING, 0, "%s %d: %s %s\n", 
+                type_mesg, error.code, error.message, note);
+  }
+}
+  

=== added file 'storage/ndb/memcache/src/ndb_flush.cc'
--- a/storage/ndb/memcache/src/ndb_flush.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_flush.cc	revid:mikael.ronstrom@stripped
@@ -0,0 +1,290 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+/* System headers */
+#define __STDC_FORMAT_MACROS 
+#include <assert.h>
+#include <ctype.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB headers */
+#include "NdbApi.hpp"
+
+/* NDB Memcache headers */
+#include "ndbmemcache_global.h"
+#include "Configuration.h"
+#include "ExternalValue.h"
+#include "debug.h"
+#include "Operation.h"
+#include "NdbInstance.h"
+#include "ndb_pipeline.h"
+#include "ndb_error_logger.h"
+#include "ndb_worker.h"
+
+/* Extern pointers */
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+/* Scan helpers */
+
+// nextResult() return values:
+enum { 
+  fetchError         = -1,
+  fetchOK            =  0,
+  fetchScanFinished  =  1,
+  fetchCacheEmpty    =  2
+};
+
+enum { fetchFromThisBatch = false, fetchNewBatchFromKernel = true };
+enum { SendImmediate = true, sendDeferred = false };
+
+bool scan_delete(NdbInstance *, QueryPlan *);
+bool scan_delete_ext_val(ndb_pipeline *, NdbInstance *, QueryPlan *);
+
+
+/*************** SYNCHRONOUS IMPLEMENTATION OF "FLUSH ALL" ******************/
+
+/* Flush all is a fully synchronous operation -- 
+ the memcache server is waiting for a response, and the thread is blocked.
+ */
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
+  DEBUG_ENTER();
+  bool r;
+  const Configuration &conf = get_Configuration();
+  
+  DEBUG_PRINT(" %d prefixes", conf.nprefixes);
+  for(unsigned int i = 0 ; i < conf.nprefixes ; i++) {
+    const KeyPrefix *pfx = conf.getPrefix(i);
+    if(pfx->info.use_ndb && pfx->info.do_db_flush) {
+      ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
+      Ndb_cluster_connection *conn = pool->getMainConnection();
+      NdbInstance inst(conn, 128);
+      QueryPlan plan(inst.db, pfx->table);
+      if(plan.pk_access) {
+        // To flush, scan the table and delete every row
+        if(plan.canHaveExternalValue()) {
+          DEBUG_PRINT("prefix %d - doing ExternalValue delete", i);
+          r = scan_delete_ext_val(pipeline, &inst, &plan);
+        }
+        else {
+          DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
+          r = scan_delete(&inst, &plan);
+        }
+        if(! r) logger->log(LOG_WARNING, 0, "-- FLUSH_ALL Failed.\n");
+      }
+      else DEBUG_PRINT("prefix %d - not scanning table %s -- access path "
+                       "is not primary key", i, pfx->table->table_name);
+    }
+    else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
+                     i, pfx->table ? pfx->table->table_name : "",
+                     pfx->info.use_ndb, pfx->info.do_db_flush);
+  }
+  
+  return ENGINE_SUCCESS;
+}
+
+
+bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
+  DEBUG_ENTER();
+  bool rescan, fetch_option;
+  int rFetch, rExec, rDel, batch_size, rows_deleted;
+  int error_status = 0;
+  const unsigned int max_errors = 100000;
+  
+  struct {
+    unsigned int errors;
+    unsigned int rows;
+    unsigned short scans;
+    unsigned short commit_batches;
+  } stats = {0, 0, 0, 0 };
+  
+  /* The outer loop performs an initial table scan and then possibly some 
+     rescans, which are triggered whenever some rows have been scanned but, 
+     due to an error condition, have not been deleted.
+   */
+  do {
+    batch_size = 1;   /* slow start. */
+    stats.scans += 1;
+    rescan = false;
+    
+    NdbTransaction *scanTx = inst->db->startTransaction();
+    NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
+    
+    /* Express intent to read with exclusive lock; execute NoCommit */
+    scan->readTuplesExclusive();
+    rExec = scanTx->execute(NdbTransaction::NoCommit);
+    if(rExec != 0) {
+      stats.errors++;
+      error_status = log_ndb_error(scanTx->getNdbError());
+      break;
+    }
+    
+    /* Within a scan, this loop iterates over batches.
+       Batches are committed whenever the batch_size has been reached.
+       Batch size starts at 1 and doubles when a batch is successful, 
+       until it reaches the result cache size.
+     */
+    while(1) {
+      stats.commit_batches++;
+      NdbTransaction *delTx = inst->db->startTransaction();
+      rows_deleted = 0;
+      fetch_option = fetchNewBatchFromKernel; 
+      bool fetch_more;
+      
+      /* The inner loop iterates over rows within a batch */      
+      do {
+        fetch_more = false;
+        rFetch = scan->nextResult(fetch_option, SendImmediate);
+        switch(rFetch) {
+          case fetchError:
+            stats.errors++;
+            error_status = log_ndb_error(scan->getNdbError());
+            break; 
+            
+          case fetchOK:
+            rDel = scan->deleteCurrentTuple(delTx);
+            if(rDel == 0) {
+              fetch_more = ( ++rows_deleted < batch_size);
+              fetch_option = fetchFromThisBatch;
+            }
+            else {
+              stats.errors++;
+              error_status = log_ndb_error(delTx->getNdbError());
+            }
+            break;
+            
+          case fetchScanFinished:        
+          case fetchCacheEmpty:
+          default:
+            break;        
+        }
+      } while(fetch_more); /* break out of the inner loop to here */
+      
+      /* Quit now if errors were serious */
+      if(error_status > ERR_TEMP)
+        break;
+
+      /* Execute the batch */
+      rExec = delTx->execute(NdbTransaction::Commit, NdbOperation::AbortOnError, 1);
+      if(rExec == 0) {
+        stats.rows += rows_deleted;
+        if(rFetch != fetchCacheEmpty) 
+          batch_size *= 2;
+      }
+      else {
+        stats.errors++;
+        error_status = log_ndb_error(delTx->getNdbError());
+        if(batch_size > 1) 
+          batch_size /= 2;
+        rescan = true;
+      }
+      
+      delTx->close();
+      
+      if(rFetch == fetchScanFinished || (stats.errors > max_errors))
+        break;
+    } /* break out of the batch loop to here */
+    
+    scanTx->close();
+  } while(rescan && (error_status < ERR_PERM) && stats.errors < max_errors);
+  
+  logger->log(LOG_WARNING, 0, "Flushed rows from %s.%s: "
+              "Scans: %d  Batches: %d  Rows: %d  Errors: %d",
+              plan->spec->schema_name, plan->spec->table_name, 
+              stats.scans, stats.commit_batches, stats.rows, stats.errors);
+  
+  return (stats.rows || ! stats.errors);  
+}
+
+
+/* External Values require a different version of FLUSH_ALL, which preserves
+   the referential integrity between the main table and the parts table
+   while deleting.   This one uses the NdbRecord variant of a scan and commits
+   once for each row of the main table.
+*/
+bool scan_delete_ext_val(ndb_pipeline *pipeline, NdbInstance *inst, 
+                         QueryPlan *plan) {
+  DEBUG_ENTER();
+  int r, ext_rows, error_status = 0;
+  bool fetch_more;
+  struct {
+    Uint32 main_rows;
+    Uint32 ext_rows;
+    Uint32 errors;
+  } stats = {0, 0, 0 };
+
+  /* Need KeyInfo when performing scanning delete */
+  NdbScanOperation::ScanOptions opts;
+  opts.optionsPresent=NdbScanOperation::ScanOptions::SO_SCANFLAGS;
+  opts.scan_flags=NdbScanOperation::SF_KeyInfo;
+  
+  memory_pool * pool = pipeline_create_memory_pool(pipeline);
+  NdbTransaction *scanTx = inst->db->startTransaction();
+  Operation op(plan, OP_SCAN);  
+  op.readSelectedColumns();
+  op.readColumn(COL_STORE_EXT_SIZE);
+  op.readColumn(COL_STORE_EXT_ID);
+    
+  NdbScanOperation *scan = op.scanTable(scanTx, NdbOperation::LM_Exclusive, &opts);
+  r = scanTx->execute(NdbTransaction::NoCommit); 
+  
+  if(r == 0) {   /* Here's the scan loop */
+    do {
+      fetch_more = false;
+      r = scan->nextResult((const char **) & op.buffer, true, true);
+      if(r == fetchOK) {
+        fetch_more = true;      
+        NdbTransaction * delTx = inst->db->startTransaction();
+
+        op.deleteCurrentTuple(scan, delTx);                        // main row
+        ext_rows = ExternalValue::do_delete(pool, delTx, plan, op);  // parts
+
+        r = delTx->execute(NdbTransaction::Commit, 
+                           NdbOperation::AbortOnError, 
+                           SendImmediate);
+        if(r)
+          error_status = log_ndb_error(delTx->getNdbError()), stats.errors++;
+        else 
+          stats.main_rows++, stats.ext_rows += ext_rows;
+   
+        memory_pool_free(pool);
+        delTx->close();
+      }
+      else {
+        break;
+      }
+    } while(fetch_more && (error_status < ERR_PERM));
+  }
+  
+  memory_pool_destroy(pool);
+  scanTx->close();
+
+  logger->log(LOG_WARNING, 0, "Flushed %d rows from %s plus "
+              "%d rows from %s.  Errors: %d\n",  
+              stats.main_rows, plan->spec->table_name, 
+              stats.ext_rows, plan->extern_store->spec->table_name,
+              stats.errors);
+  
+  return (stats.main_rows || ! stats.errors);
+}
+
+

=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	revid:mikael.ronstrom@stripped
@@ -73,7 +73,7 @@ void init_pool_header(allocation_referen
 /* initialize a new pipeline for an NDB engine thread */
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
   bool did_inc;
-  int id;
+  unsigned int id;
   thread_identifier * tid;
 
   /* Get my pipeline id */
@@ -85,15 +85,14 @@ ndb_pipeline * ndb_pipeline_initialize(s
   /* Fetch the partially initialized pipeline */
   ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
   
-  /* Set my id */
-  self->id = id;
-    
+  assert(self->id == id);
+  
   /* Set the pointer back to the engine */
   self->engine = engine;
   
   /* And the thread id */
   self->engine_thread_id = pthread_self(); 
-    
+
   /* Create and set a thread identity */
   tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
   tid->pipeline = self;
@@ -109,13 +108,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
 
 
 /* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline() { 
+ndb_pipeline * get_request_pipeline(int thd_id) { 
   /* Allocate the pipeline */
   ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline)); 
   
-  /* Initialize */
+  /* Initialize */  
   self->engine = 0;
-  self->id = 0;  
+  self->id = thd_id;
   self->nworkitems = 0;
 
   /* Say hi to the alligator */  

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/ndb_worker.cc	revid:mikael.ronstrom@stripped
@@ -54,6 +54,7 @@
 #include "ndb_engine.h"
 #include "hash_item_util.h"
 #include "ndb_worker.h"
+#include "ndb_error_logger.h"
 
 /**********************************************************
   Schedduler::schedule()
@@ -122,7 +123,6 @@ worker_step worker_finalize_write;
 /* Misc utility functions */
 void worker_set_cas(ndb_pipeline *, uint64_t *);
 int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
-bool scan_delete(NdbInstance *, QueryPlan *);
 void build_hash_item(workitem *, Operation &,  ExpireTime &);
 
 /* Extern pointers */
@@ -166,6 +166,9 @@ status_block status_block_idx_insert = 
 status_block status_block_too_big = 
   { ENGINE_E2BIG, "Value too large"                   };
 
+status_block status_block_no_mem =
+  { ENGINE_ENOMEM, "NDB out of data memory"           }; 
+
 void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {  
   /* Be careful here --  ndbmc_atomic32_t might be a signed type.
      Shitfting of signed types behaves differently. */
@@ -676,7 +679,7 @@ op_status_t WorkerStep1::do_math() {
 
 void callback_main(int, NdbTransaction *tx, void *itemptr) {
   workitem *wqitem = (workitem *) itemptr;
-  ndb_pipeline * & pipeline = wqitem->pipeline;
+  // ndb_pipeline * & pipeline = wqitem->pipeline;
     
   /************** Error handling ***********/  
   /* No Error */
@@ -723,10 +726,21 @@ void callback_main(int, NdbTransaction *
   else if(tx->getNdbError().code == 897) {
     wqitem->status = & status_block_idx_insert;
   }
+  /* Out of memory */
+  else if(tx->getNdbError().code == 827) {
+    log_ndb_error(tx->getNdbError());
+    wqitem->status = & status_block_no_mem;
+  }
+  /* 284: Table not defined in TC (stale definition) */
+  else if(tx->getNdbError().code == 284) {
+    /* TODO: find a way to handle this error, after an ALTER TABLE */
+    log_ndb_error(tx->getNdbError());
+    wqitem->status = & status_block_misc_error;
+  }
+  
   /* Some other error */
   else  {
-    DEBUG_PRINT("[%d]: %s", 
-                       tx->getNdbError().code, tx->getNdbError().message);
+    log_ndb_error(tx->getNdbError());
     wqitem->status = & status_block_misc_error;
   }
 
@@ -736,7 +750,7 @@ void callback_main(int, NdbTransaction *
 
 void callback_incr(int result, NdbTransaction *tx, void *itemptr) {
   workitem *wqitem = (workitem *) itemptr;
-  ndb_pipeline * & pipeline = wqitem->pipeline;
+  // ndb_pipeline * & pipeline = wqitem->pipeline;
   
   /*  read  insert  update cr_flag response
    ------------------------------------------------------------------------
@@ -1105,151 +1119,3 @@ int build_cas_routine(NdbInterpretedCode
   return r->finalise();                      // resolve the label/branch
 }
 
-
-/*************** SYNCHRONOUS IMPLEMENTATION OF "FLUSH ALL" ******************/
-
-/* Flush all is a fully synchronous operation -- 
-   the memcache server is waiting for a response, and the thread is blocked.
-*/
-ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
-  DEBUG_ENTER();
-  const Configuration &conf = get_Configuration();
-  
-  DEBUG_PRINT(" %d prefixes", conf.nprefixes);
-  for(unsigned int i = 0 ; i < conf.nprefixes ; i++) {
-    const KeyPrefix *pfx = conf.getPrefix(i);
-    if(pfx->info.use_ndb && pfx->info.do_db_flush) {
-      ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
-      Ndb_cluster_connection *conn = pool->getMainConnection();
-      NdbInstance inst(conn, 128);
-      QueryPlan plan(inst.db, pfx->table);
-      if(plan.pk_access) {
-        // To flush, scan the table and delete every row
-        DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
-        scan_delete(&inst, &plan);
-        // If there is an external store, also delete from the large value table
-        if(plan.canHaveExternalValue()) {
-          DEBUG_PRINT("prefix %d - deleting from %s", i, 
-                      plan.extern_store->spec->table_name);
-          scan_delete(&inst, plan.extern_store);
-        }
-      }
-      else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path "
-                       "is not primary key", i, pfx->table->table_name);
-    }
-    else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
-                     i, pfx->table ? pfx->table->table_name : "",
-                     pfx->info.use_ndb, pfx->info.do_db_flush);
-  }
-  
-  return ENGINE_SUCCESS;
-}
-
-
-bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
-  DEBUG_ENTER();
-  int check;
-  bool rescan;
-  int res = 0;
-  const int max_batch_size = 1000;
-  int batch_size = 1;
-  int delTxRowCount = 0;
-  int force_send = 1;
-  struct {
-    unsigned short scans;
-    unsigned short errors;
-    unsigned short rows;
-    unsigned short commit_batches;
-  } stats = {0, 0, 0, 0 };
-  
-  /* To securely scan a whole table, use an outer transaction only for the scan, 
-     but take over each lock in an inner transaction (with a row count) that 
-     deletes 1000 rows per transaction 
-  */  
-  do {
-    stats.scans += 1;
-    rescan = false;
-    NdbTransaction *scanTx = inst->db->startTransaction();
-    NdbTransaction *delTx = inst->db->startTransaction();
-    NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
-    scan->readTuplesExclusive();
-    
-    /* execute NoCommit */
-    if((res = scanTx->execute(NdbTransaction::NoCommit)) != 0) 
-      logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", 
-                  scanTx->getNdbError().message);
-    
-    /* scan and delete.  delTx takes over the lock. */
-    while(scan->nextResult(true) == 0) {
-      do {
-        if((res = scan->deleteCurrentTuple(delTx)) == 0) {
-          delTxRowCount += 1;
-        }
-        else {      
-          logger->log(LOG_WARNING, 0, "deleteCurrentTuple(): %s\n", 
-                      scanTx->getNdbError().message);
-        }
-       } while((check = scan->nextResult(false)) == 0);
-      
-      /* execute a batch (NoCommit) */
-      if(check != -1) {
-        res = delTx->execute(NdbTransaction::NoCommit,
-                             NdbOperation::AbortOnError, force_send);
-        if(res != 0) {
-          stats.errors += 1;
-          if(delTx->getNdbError().code == 827) { 
-            /* DataMemory is full, and the kernel could not create a Copy Tuple
-               for a deleted row.  Rollback this batch, turn off force-send 
-               (for throttling), make batches smalller, and trigger a
-               rescan to clean up these rows. */
-            rescan = true;
-            delTx->execute(NdbTransaction::Rollback);
-            delTx->close();
-            delTx = inst->db->startTransaction();
-            delTxRowCount = 0;
-            if(batch_size > 1) batch_size /= 2;
-            force_send = 0;
-          }
-          else {
-            logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", 
-                        delTx->getNdbError().message);
-          }
-        }
-      }
-      
-      /* Execute & commit a batch */
-      if(delTxRowCount >= batch_size) {
-        stats.commit_batches += 1;
-        res = delTx->execute(NdbTransaction::Commit, 
-                             NdbOperation::AbortOnError, force_send);
-        if(res != 0) {
-          stats.errors++;
-          logger->log(LOG_WARNING, 0, "execute(Commit): %s\n", 
-                      delTx->getNdbError().message);
-        }
-        stats.rows += delTxRowCount;
-        delTxRowCount = 0;
-        delTx->close();
-        delTx = inst->db->startTransaction();
-        batch_size *= 2;
-        if(batch_size > max_batch_size) {
-          batch_size = max_batch_size;
-          force_send = 1;
-        }
-      }
-    }
-    /* Final execute & commit */
-    res = delTx->execute(NdbTransaction::Commit);
-    delTx->close();
-    scanTx->close();
-
-  } while(rescan);
-  
-  logger->log(EXTENSION_LOG_INFO, 0, "Flushed all rows from %s.%s: "
-              "Scans: %d  Batches: %d  Rows: %d  Errors: %d",
-              plan->spec->schema_name, plan->spec->table_name, 
-              stats.scans, stats.commit_batches, stats.rows, stats.errors);
-
-  return (res == 0);
-}
-

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	revid:mikael.ronstrom@stripped
@@ -125,7 +125,7 @@ void S::SchedulerGlobal::reconfigure(Con
 
 void S::SchedulerGlobal::shutdown() {
   if(running) {
-    logger->log(LOG_WARNING, 0, "Shutting down scheduler.");
+    logger->log(LOG_INFO, 0, "Shutting down scheduler.");
 
     /* First shut down each WorkerConnection */
     for(int i = 0; i < nclusters ; i++) {
@@ -249,8 +249,6 @@ void S::SchedulerGlobal::add_stats(const
 void S::SchedulerWorker::init(int my_thread, 
                               int nthreads, 
                               const char * config_string) {
-  
-  DEBUG_ENTER_METHOD("S::SchedulerWorker::init");
   /* On the first call in, initialize the SchedulerGlobal.
    * This will start the send & poll threads for each connection.
    */
@@ -298,6 +296,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
     pthread_rwlock_unlock(& reconf_lock);
   }
   else {
+    logger->log(LOG_INFO, 0, "S Scheduler could not acquire read lock");
     return ENGINE_TMPFAIL;
   }
   /* READ LOCK RELEASED */
@@ -316,6 +315,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
      all we can do is return an error. 
      (Or, alternately, the scheduler may be shutting down.)
      */
+    logger->log(LOG_INFO, 0, "No free NDB instances.");
     return ENGINE_TMPFAIL;
   }
   
@@ -426,9 +426,9 @@ void S::SchedulerWorker::add_stats(const
 
 /* Cluster methods */
 S::Cluster::Cluster(SchedulerGlobal *global, int _id) : 
-  cluster_id(_id), 
-  nreferences(0),
-  threads_started(false)
+  threads_started(false),
+  cluster_id(_id),
+  nreferences(0)
 {
   DEBUG_PRINT("%d", cluster_id);
   
@@ -514,7 +514,6 @@ void S::Cluster::add_stats(const char *s
 
 S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                       int thd_id, int cluster_id) {
-  DEBUG_ENTER_METHOD("S::WorkerConnection::WorkerConnection");
   S::Cluster *cl = global->clusters[cluster_id];  
   Configuration *conf = global->conf;
 

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	revid:mikael.ronstrom@stripped
@@ -88,7 +88,7 @@ void Scheduler_stockholm::init(int my_th
      TODO? Start one tx on each data node.
   */
   QueryPlan *plan;
-  const KeyPrefix *default_prefix = conf.getDefaultPrefix();  // TODO: something
+//  const KeyPrefix *default_prefix = conf.getDefaultPrefix();  // TODO: something
   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
     const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL); 
     if(prefix) {

=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/memcache/unit/alloc.cc	revid:mikael.ronstrom@stripped
@@ -28,7 +28,7 @@
 #include "all_tests.h"
 
 int run_allocator_test(QueryPlan *, Ndb *, int v) {
-  struct request_pipeline *p = get_request_pipeline();
+  struct request_pipeline *p = get_request_pipeline(0);
   
   memory_pool *p1 = pipeline_create_memory_pool(p);
   int sz = 13;

=== modified file 'storage/ndb/src/common/util/testMysqlUtils/mysql_utils_test.cpp'
--- a/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_test.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_test.cpp	revid:mikael.ronstrom@stripped
@@ -23,6 +23,7 @@
 #include <stdlib.h> // not using namespaces yet
 #include <assert.h> // not using namespaces yet
 
+#include "dbug_utils.hpp"
 #include "decimal_utils.hpp"
 #include "CharsetMap.hpp"
 
@@ -30,6 +31,48 @@
 #include "my_sys.h"
 #include "mysql.h"
 
+void test_dbug_utils()
+{
+    const int DBUG_BUF_SIZE = 1024;
+    char buffer[DBUG_BUF_SIZE];
+
+    const char * s0 = "";
+    const char * s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s0));
+
+    s1 = dbugExplain(NULL, DBUG_BUF_SIZE);
+    assert(s1 == NULL);
+
+    s1 = dbugExplain(buffer, 0);
+    assert(s1 == NULL);
+
+    s0 = "t";
+    dbugSet(s0);
+    s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s0));
+
+    dbugSet(NULL);
+    s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s0));
+
+    const char * s2 = "d,jointx:o,/tmp/jointx";
+    dbugPush(s2);
+    s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s2));
+
+    dbugPush(NULL);
+    s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s2));
+
+    dbugPop();
+    s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s0));
+
+    dbugPush(NULL);
+    s1 = dbugExplain(buffer, DBUG_BUF_SIZE);
+    assert(!strcmp(s1, s0));
+}
+
 void test_decimal(const char *s, int prec, int scale, int expected_rv) 
 {
     char bin_buff[128], str_buff[128];
@@ -51,14 +94,15 @@ void test_decimal(const char *s, int pre
     }
 }
 
-
 int main()
 {
-    printf("==== init MySQL lib ====\n");
+    printf("==== BEGIN: MySQL Utils Unit Test ====\n");
+
+    printf("\n==== init MySQL lib, CharsetMap ====\n");
     my_init();
     CharsetMap::init();
 
-    printf("==== decimal_str2bin() / decimal_bin2str() ====\n");
+    printf("\n==== decimal_str2bin() / decimal_bin2str() ====\n");
     
     test_decimal("100", 3, -1, E_DEC_BAD_SCALE); 
     test_decimal("3.3", 2, 1, E_DEC_OK);
@@ -128,7 +172,7 @@ int main()
     lengths[1] = 32;
     CharsetMap::RecodeStatus rr1 = csmap.recode(lengths, utf8_num, latin1_num, 
                                                 my_word_utf8, result_buff_1);
-    printf("Recode Test 1 - UTF-8 to Latin-1: %d %ld %ld \"%s\" => \"%s\" \n", 
+    printf("Recode Test 1 - UTF-8 to Latin-1: %d %d %d \"%s\" => \"%s\" \n", 
            rr1, lengths[0], lengths[1], my_word_utf8, result_buff_1);
     assert(rr1 == CharsetMap::RECODE_OK);
     assert(lengths[0] == 7);
@@ -140,7 +184,7 @@ int main()
     lengths[1] = 32;
     CharsetMap::RecodeStatus rr2 = csmap.recode(lengths, latin1_num, utf8_num,
                                                 my_word_latin1, result_buff_2);
-    printf("Recode Test 2 - Latin-1 to UTF-8: %d %ld %ld \"%s\" => \"%s\" \n", 
+    printf("Recode Test 2 - Latin-1 to UTF-8: %d %d %d \"%s\" => \"%s\" \n", 
            rr2, lengths[0], lengths[1], my_word_latin1, result_buff_2);
     assert(rr2 == CharsetMap::RECODE_OK);
     assert(lengths[0] == 6);
@@ -152,7 +196,7 @@ int main()
     lengths[1] = 4;
     CharsetMap::RecodeStatus rr3 = csmap.recode(lengths, latin1_num, utf8_num,
                                                 my_word_latin1, result_buff_too_small);
-    printf("Recode Test 3 - too-small buffer: %d %ld %ld \"%s\" => \"%s\" \n", 
+    printf("Recode Test 3 - too-small buffer: %d %d %d \"%s\" => \"%s\" \n", 
            rr3, lengths[0], lengths[1], my_word_latin1, result_buff_too_small);
     assert(rr3 == CharsetMap::RECODE_BUFF_TOO_SMALL);
     assert(lengths[0] == 3);
@@ -201,7 +245,11 @@ int main()
     // If there is not at least one of each, then something is probably wrong
     assert(nNull && nSingle && nMulti);
   
+    printf("\n==== DBUG Utilities ====\n");    
+    test_dbug_utils();    
     
-    
+    printf("\n==== unload CharsetMap ====\n");
     CharsetMap::unload();
+
+    printf("\n==== END: MySQL Utils Unit Test ====\n");
 }

=== modified file 'storage/ndb/src/common/util/testMysqlUtils/mysql_utils_unit_tests-t'
--- a/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_unit_tests-t	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/common/util/testMysqlUtils/mysql_utils_unit_tests-t	revid:mikael.ronstrom@stripped
@@ -39,7 +39,7 @@ echo "running test from directory:" >> "
 echo "  $script_dir" >> "$log" 2>&1
 
 test_name="test_mysql_utils"
-script_name="$script_dir/$test_mysql_utils.sh"
+script_name="$script_dir/$test_name.sh"
 
 if [ ! -x "$script_name" ];
 then

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp	revid:mikael.ronstrom@stripped
@@ -27,8 +27,6 @@ LocalProxy::LocalProxy(BlockNumber block
   for (i = 0; i < MaxWorkers; i++)
     c_worker[i] = 0;
 
-  c_ssIdSeq = 0;
-
   c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
   c_masterNodeId = ZNIL;
 

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	revid:mikael.ronstrom@stripped
@@ -192,11 +192,17 @@ protected:
 
   template <class Ss>
   Ss& ssSeize() {
-    const Uint32 base = SsIdBase;
-    const Uint32 mask = ~base;
-    const Uint32 ssId = base | c_ssIdSeq;
-    c_ssIdSeq = (c_ssIdSeq + 1) & mask;
-    return ssSeize<Ss>(ssId);
+    SsPool<Ss>& sp = Ss::pool(this);
+    Ss* ssptr = ssSearch<Ss>(0);
+    ndbrequire(ssptr != 0);
+    // Use position in array as ssId
+    UintPtr pos = ssptr - sp.m_pool;
+    Uint32 ssId = Uint32(pos) + 1;
+    new (ssptr) Ss;
+    ssptr->m_ssId = ssId;
+    sp.m_usage++;
+    D("ssSeize()" << V(sp.m_usage) << hex << V(ssId) << " " << Ss::name());
+    return *ssptr;
   }
 
   template <class Ss>

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	revid:mikael.ronstrom@stripped
@@ -1886,7 +1886,8 @@ private:
   } m_local_lcp_state;
 
   // MT LQH
-  Uint32 c_fragments_per_node;
+  Uint32 c_fragments_per_node_;
+  Uint32 getFragmentsPerNode();
   Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
     ndbrequire(!tFragPtr.isNull());
     Uint32 log_part_id = tFragPtr.p->m_log_part_id;

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	revid:mikael.ronstrom@stripped
@@ -319,7 +319,7 @@ Dbdih::Dbdih(Block_context& ctx):
   nodeGroupRecord = 0;
   nodeRecord = 0;
   c_nextNodeGroup = 0;
-  c_fragments_per_node = 1;
+  c_fragments_per_node_ = 0;
   bzero(c_node_groups, sizeof(c_node_groups));
   if (globalData.ndbMtTcThreads == 0)
   {

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	revid:mikael.ronstrom@stripped
@@ -1312,14 +1312,13 @@ void Dbdih::execREAD_CONFIG_REQ(Signal* 
   if (isNdbMtLqh())
   {
     jam();
-    c_fragments_per_node = getLqhWorkers();
+    c_fragments_per_node_ = 0;
     // try to get some LQH workers which initially handle no fragments
     if (ERROR_INSERTED(7215)) {
-      c_fragments_per_node = 1;
+      c_fragments_per_node_ = 1;
+      ndbout_c("Using %u fragments per node", c_fragments_per_node_);
     }
   }
-  ndbout_c("Using %u fragments per node", c_fragments_per_node);
-  
   ndb_mgm_get_int_parameter(p, CFG_DB_LCP_TRY_LOCK_TIMEOUT, 
                             &c_lcpState.m_lcp_trylock_timeout);
 
@@ -7545,6 +7544,42 @@ static Uint32 find_min_index(const Uint3
   return m;
 }
 
+Uint32
+Dbdih::getFragmentsPerNode()
+{
+  jam();
+  if (c_fragments_per_node_ != 0)
+  {
+    return c_fragments_per_node_;
+  }
+
+  c_fragments_per_node_ = getLqhWorkers();
+  if (c_fragments_per_node_ == 0)
+    c_fragments_per_node_ = 1; // ndbd
+
+  NodeRecordPtr nodePtr;
+  nodePtr.i = cfirstAliveNode;
+  do
+  {
+    jam();
+    ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
+    Uint32 workers = getNodeInfo(nodePtr.i).m_lqh_workers;
+    if (workers == 0) // ndbd
+      workers = 1;
+
+    c_fragments_per_node_ = MIN(workers, c_fragments_per_node_);
+    nodePtr.i = nodePtr.p->nextNode;
+  } while (nodePtr.i != RNIL);
+
+  if (c_fragments_per_node_ == 0)
+  {
+    ndbassert(false);
+    c_fragments_per_node_ = 1;
+  }
+  ndbout_c("Using %u fragments per node", c_fragments_per_node_);
+  return c_fragments_per_node_;
+}
+
 void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal)
 {
   Uint16 node_group_id[MAX_NDB_PARTITIONS];
@@ -7561,11 +7596,9 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
   const Uint32 flags = req->requestInfo;
 
   Uint32 err = 0;
-  const Uint32 defaultFragments = 
-    c_fragments_per_node * cnoOfNodeGroups * cnoReplicas;
-  const Uint32 maxFragments =
-    MAX_FRAG_PER_LQH * (getLqhWorkers() ? getLqhWorkers() : 1) *
-    cnoOfNodeGroups * cnoReplicas;
+  const Uint32 defaultFragments =
+    getFragmentsPerNode() * cnoOfNodeGroups * cnoReplicas;
+  const Uint32 maxFragments = MAX_FRAG_PER_LQH * defaultFragments;
 
   do {
     NodeGroupRecordPtr NGPtr;
@@ -16190,8 +16223,8 @@ void Dbdih::execCHECKNODEGROUPSREQ(Signa
   case CheckNodeGroups::GetDefaultFragments:
     jam();
     ok = true;
-    sd->output = (cnoOfNodeGroups + sd->extraNodeGroups) 
-      * c_fragments_per_node * cnoReplicas;
+    sd->output = (cnoOfNodeGroups + sd->extraNodeGroups)
+      * getFragmentsPerNode() * cnoReplicas;
     break;
   }
   ndbrequire(ok);

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	revid:mikael.ronstrom@stripped
@@ -4397,6 +4397,7 @@ MgmtSrvr::request_events(NdbNodeBitmask 
                          Vector<SimpleSignal>& events)
 {
   int nodes_counter[MAX_NDB_NODES];
+  NdbNodeBitmask save = nodes;
   SignalSender ss(theFacade);
   ss.lock();
 
@@ -4447,6 +4448,10 @@ MgmtSrvr::request_events(NdbNodeBitmask 
       if (!nodes.get(nodeid))
       {
         // The reporting node was not expected
+#ifndef NDEBUG
+        ndbout_c("nodeid: %u", nodeid);
+        ndbout_c("save: %s", BaseString::getPrettyText(save).c_str());
+#endif
         assert(false);
         return false;
       }

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp	revid:mikael.ronstrom@stripped
@@ -369,6 +369,7 @@ NdbIndexStat::clean_cache()
 void
 NdbIndexStat::get_cache_info(CacheInfo& info, CacheType type) const
 {
+  NdbMutex_Lock(m_impl.m_query_mutex);
   const NdbIndexStatImpl::Cache* c = 0;
   switch (type) {
   case CacheBuild:
@@ -387,6 +388,7 @@ NdbIndexStat::get_cache_info(CacheInfo& 
   info.m_totalBytes = 0;
   info.m_save_time = 0;
   info.m_sort_time = 0;
+  info.m_ref_count = 0;
   while (c != 0)
   {
     info.m_count += 1;
@@ -395,10 +397,12 @@ NdbIndexStat::get_cache_info(CacheInfo& 
     info.m_totalBytes += c->m_keyBytes + c->m_valueBytes + c->m_addrBytes;
     info.m_save_time += c->m_save_time;
     info.m_sort_time += c->m_sort_time;
+    info.m_ref_count += c->m_ref_count;
     c = c->m_nextClean;
   }
   // build and query cache have at most one instance
   require(type == CacheClean || info.m_count <= 1);
+  NdbMutex_Unlock(m_impl.m_query_mutex);
 }
 
 // read

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	revid:mikael.ronstrom@stripped
@@ -23,6 +23,7 @@
 #include <NdbSqlUtil.hpp>
 #include <NdbRecord.hpp>
 #include <NdbEventOperation.hpp>
+#include <NdbSleep.h>
 #include "NdbIndexStatImpl.hpp"
 
 #undef min
@@ -1460,6 +1461,8 @@ NdbIndexStatImpl::Cache::Cache()
   // performance
   m_save_time = 0;
   m_sort_time = 0;
+  // in use by query_stat
+  m_ref_count = 0;
 }
 
 int
@@ -2002,6 +2005,21 @@ NdbIndexStatImpl::convert_range(Range& r
       return -1;
     }
   }
+
+#ifdef VM_TRACE
+  {
+    const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_RANGE_ERROR", (char*)0, 0);
+    if (p != 0 && strchr("1Y", p[0]) != 0)
+    {
+      if (rand() % 10 == 0)
+      {
+        setError(InternalError, __LINE__, NdbIndexStat::InternalError);
+        return -1;
+      }
+    }
+  }
+#endif
+
   return 0;
 }
 
@@ -2033,23 +2051,41 @@ int
 NdbIndexStatImpl::query_stat(const Range& range, Stat& stat)
 {
   NdbMutex_Lock(m_query_mutex);
-  const Cache* cacheTmp = m_cacheQuery;
-  NdbMutex_Unlock(m_query_mutex);
-
-  if (unlikely(cacheTmp == 0))
+  if (unlikely(m_cacheQuery == 0))
   {
+    NdbMutex_Unlock(m_query_mutex);
     setError(UsageError, __LINE__);
     return -1;
   }
-  const Cache& c = *cacheTmp;
+  const Cache& c = *m_cacheQuery;
   if (unlikely(!c.m_valid))
   {
+    NdbMutex_Unlock(m_query_mutex);
     setError(InvalidCache, __LINE__);
     return -1;
   }
+  c.m_ref_count++;
+  NdbMutex_Unlock(m_query_mutex);
+
+#ifdef VM_TRACE
+  {
+    const char* p = NdbEnv_GetEnv("NDB_INDEX_STAT_SLOW_QUERY", (char*)0, 0);
+    if (p != 0 && strchr("1Y", p[0]) != 0)
+    {
+      int ms = 1 + rand() % 20;
+      NdbSleep_MilliSleep(ms);
+    }
+  }
+#endif
 
+  // clients run these in parallel
   query_interpolate(c, range, stat);
   query_normalize(c, stat.m_value);
+
+  NdbMutex_Lock(m_query_mutex);
+  assert(c.m_ref_count != 0);
+  c.m_ref_count--;
+  NdbMutex_Unlock(m_query_mutex);
   return 0;
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	revid:mikael.ronstrom@stripped
@@ -70,7 +70,7 @@ public:
   Cache* m_cacheBuild;
   Cache* m_cacheQuery;
   Cache* m_cacheClean;
-  // mutex for query cache switch, memory barrier would do
+  // mutex for query cache switch and reference count
   NdbMutex* m_query_mutex;
   NdbEventOperation* m_eventOp;
   Mem* m_mem_handler;
@@ -185,6 +185,8 @@ public:
     // performance
     mutable Uint64 m_save_time;
     mutable Uint64 m_sort_time;
+    // in use by query_stat
+    mutable uint m_ref_count;
     Cache();
     // pos is index < sampleCount, addr is offset in keyArray
     uint get_keyaddr(uint pos) const;

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbapi/ndberror.c	revid:mikael.ronstrom@stripped
@@ -552,6 +552,11 @@ ErrorBundle ErrorCodes[] = {
   { 4718, DMEC, IE, "Index stats samples data or memory cache is invalid" },
   { 4719, DMEC, IE, "Index stats internal error" },
   { 4720, DMEC, AE, "Index stats sys tables " NDB_INDEX_STAT_PREFIX " partly missing or invalid" },
+  { 4721, DMEC, IE, "Mysqld: index stats thread not open for requests" },
+  { 4722, DMEC, IE, "Mysqld: index stats entry unexpectedly not found" },
+  { 4723, DMEC, AE, "Mysqld: index stats request ignored due to recent error" },
+  { 4724, DMEC, AE, "Mysqld: index stats request aborted by stats thread" },
+  { 4725, DMEC, AE, "Index stats were deleted by another process" },
   
   /**
    * Still uncategorized

=== modified file 'storage/ndb/src/ndbjtie/MysqlUtilsWrapper.hpp'
--- a/storage/ndb/src/ndbjtie/MysqlUtilsWrapper.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbjtie/MysqlUtilsWrapper.hpp	revid:mikael.ronstrom@stripped
@@ -24,6 +24,7 @@
 // API to wrap
 #include "CharsetMap.hpp"
 #include "decimal_utils.hpp"
+#include "dbug_utils.hpp"
 
 struct MysqlUtilsWrapper {
 
@@ -100,7 +101,42 @@ struct MysqlUtilsWrapper {
     {
         return ::decimal_bin2str(p0, p1, p2, p3, p4, p5);
     }
-    
+
+    static void
+    dbugPush
+    ( const char * p0 )
+    {
+        ::dbugPush(p0);
+    }
+
+    static void
+    dbugPop
+    ()
+    {
+        ::dbugPop();
+    }
+
+    static void
+    dbugSet
+    ( const char * p0 )
+    {
+        ::dbugSet(p0);
+    }
+
+    static const char *
+    dbugExplain
+    ( char * p0, int p1 )
+    {
+        return ::dbugExplain(p0, p1);
+    }
+
+    static void
+    dbugPrint
+    ( const char * p0, const char * p1 )
+    {
+        ::dbugPrint(p0, p1);
+    }
+
 // ---------------------------------------------------------------------------
 
 };

=== modified file 'storage/ndb/src/ndbjtie/com/mysql/ndbjtie/mysql/Utils.java'
--- a/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/mysql/Utils.java	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/mysql/Utils.java	revid:mikael.ronstrom@stripped
@@ -1,20 +1,20 @@
 /*
-  Copyright 2010 Sun Microsystems, Inc.
-  All rights reserved. Use is subject to license terms.
-
-  This program is free software; you can redistribute it and/or modify
-  it under the terms of the GNU General Public License as published by
-  the Free Software Foundation; version 2 of the License.
-
-  This program is distributed in the hope that it will be useful,
-  but WITHOUT ANY WARRANTY; without even the implied warranty of
-  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-  GNU General Public License for more details.
+ *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
 
-  You should have received a copy of the GNU General Public License
-  along with this program; if not, write to the Free Software
-  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
-*/
 /*
  * Utils.java
  */
@@ -36,4 +36,9 @@ public class Utils
     //static public final int E_DEC_FATAL_ERROR = 30; // MMM not used at this time?
     static public final native int decimal_str2bin(ByteBuffer/*_const char *_*/ str, int str_len, int prec, int scale, ByteBuffer/*_void *_*/ dest, int buf_len);
     static public final native int decimal_bin2str(ByteBuffer/*_const void *_*/ bin, int bin_len, int prec, int scale, ByteBuffer/*_char *_*/ dest, int buf_len);
+    static public final native void dbugPush(String state);
+    static public final native void dbugPop();
+    static public final native void dbugSet(String state);
+    static public final native String dbugExplain(ByteBuffer buffer, int length);
+    static public final native void dbugPrint(String keyword, String message);
 }

=== modified file 'storage/ndb/src/ndbjtie/mysql_utils_jtie.hpp'
--- a/storage/ndb/src/ndbjtie/mysql_utils_jtie.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/src/ndbjtie/mysql_utils_jtie.hpp	revid:mikael.ronstrom@stripped
@@ -243,6 +243,86 @@ Java_com_mysql_ndbjtie_mysql_Utils_decim
 #endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
 }
 
+/*
+ * Class:     com_mysql_ndbjtie_mysql_Utils
+ * Method:    dbugPush
+ * Signature: (Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugPush(JNIEnv * env, jclass cls, jstring p0)
+{
+    TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugPush(JNIEnv *, jclass, jstring)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+    gcall_fv< ttrait_char_cp_jutf8null, &::dbugPush >(env, cls, p0);
+#else
+    gcall_fv< ttrait_char_cp_jutf8null, &MysqlUtilsWrapper::dbugPush >(env, cls, p0);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class:     com_mysql_ndbjtie_mysql_Utils
+ * Method:    dbugPop
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugPop(JNIEnv * env, jclass cls)
+{
+    TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugPop(JNIEnv *, jclass)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+    gcall_fv< &::dbugPop >(env, cls);
+#else
+    gcall_fv< &MysqlUtilsWrapper::dbugPop >(env, cls);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class:     com_mysql_ndbjtie_mysql_Utils
+ * Method:    dbugSet
+ * Signature: (Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugSet(JNIEnv * env, jclass cls, jstring p0)
+{
+    TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugSet(JNIEnv *, jclass, jstring)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+    gcall_fv< ttrait_char_cp_jutf8null, &::dbugSet >(env, cls, p0);
+#else
+    gcall_fv< ttrait_char_cp_jutf8null, &MysqlUtilsWrapper::dbugSet >(env, cls, p0);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class:     com_mysql_ndbjtie_mysql_Utils
+ * Method:    dbugExplain
+ * Signature: (Ljava/nio/ByteBuffer;I)Ljava/lang/String;
+ */
+JNIEXPORT jstring JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugExplain(JNIEnv * env, jclass cls, jobject p0, jint p1)
+{
+    TRACE("jstring Java_com_mysql_ndbjtie_mysql_Utils_dbugExplain(JNIEnv *, jclass, jobject, jint)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+    return gcall_fr< ttrait_char_cp_jutf8null, ttrait_char_0p_bb, ttrait_int, &::dbugExplain >(env, cls, p0, p1);
+#else
+    return gcall_fr< ttrait_char_cp_jutf8null, ttrait_char_0p_bb, ttrait_int, &MysqlUtilsWrapper::dbugExplain >(env, cls, p0, p1);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class:     com_mysql_ndbjtie_mysql_Utils
+ * Method:    dbugPrint
+ * Signature: (Ljava/lang/String;Ljava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_com_mysql_ndbjtie_mysql_Utils_dbugPrint(JNIEnv * env, jclass cls, jstring p0, jstring p1)
+{
+    TRACE("void Java_com_mysql_ndbjtie_mysql_Utils_dbugPrint(JNIEnv *, jclass, jstring, jstring)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+    gcall_fv< ttrait_char_cp_jutf8null, ttrait_char_cp_jutf8null, &::dbugPrint >(env, cls, p0, p1);
+#else
+    gcall_fv< ttrait_char_cp_jutf8null, ttrait_char_cp_jutf8null, &MysqlUtilsWrapper::dbugPrint >(env, cls, p0, p1);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
 // ---------------------------------------------------------------------------
 
 } // extern "C"

=== modified file 'storage/ndb/test/include/NDBT_Test.hpp'
--- a/storage/ndb/test/include/NDBT_Test.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/include/NDBT_Test.hpp	revid:mikael.ronstrom@stripped
@@ -100,6 +100,13 @@ public:
    */
   void sync_down(const char * key);
   void sync_up_and_wait(const char * key, Uint32 count = 0);
+
+  /**
+   * safety for slow machines...
+   * 0 means no safety
+   */
+  bool closeToTimeout(int safety_percent = 0);
+
 private:
   friend class NDBT_Step;
   friend class NDBT_TestSuite;
@@ -120,6 +127,9 @@ private:
   Properties props;
   NdbMutex* propertyMutexPtr;
   NdbCondition* propertyCondPtr;
+
+  int m_env_timeout;
+  Uint64 m_test_start_time;
 };
 
 typedef int (NDBT_TESTFUNC)(NDBT_Context*, NDBT_Step*);

=== modified file 'storage/ndb/test/include/NdbRestarts.hpp'
--- a/storage/ndb/test/include/NdbRestarts.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/include/NdbRestarts.hpp	revid:mikael.ronstrom@stripped
@@ -56,7 +56,8 @@ public:
   };
 
   struct NdbRestart {
-    typedef int (restartFunc)(NDBT_Context*, NdbRestarter&, const NdbRestart*);
+    typedef int (restartFunc)(NDBT_Context*, NdbRestarter&, const NdbRestart*,
+                              int safety);
     
     NdbRestart(const char* _name,
 	       NdbRestartType _type,
@@ -74,13 +75,16 @@ public:
 
   int getNumRestarts();
 
-  int executeRestart(NDBT_Context*, int _num, unsigned int _to = 120);
-  int executeRestart(NDBT_Context*, const char* _name, unsigned int _to = 120);
+  int executeRestart(NDBT_Context*, int _num,
+                     unsigned int _to = 120, int safety = 0);
+  int executeRestart(NDBT_Context*, const char* _name,
+                     unsigned int _to = 120, int safety = 0);
 
   void listRestarts();
   void listRestarts(NdbRestartType _type);
 private:
-  int executeRestart(NDBT_Context*, const NdbRestart*, unsigned int _timeout);
+  int executeRestart(NDBT_Context*, const NdbRestart*,
+                     unsigned int _timeout, int safety);
 
   struct NdbErrorInsert {
     NdbErrorInsert(const char* _name,

=== modified file 'storage/ndb/test/ndbapi/testDict.cpp'
--- a/storage/ndb/test/ndbapi/testDict.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/ndbapi/testDict.cpp	revid:mikael.ronstrom@stripped
@@ -9007,9 +9007,10 @@ runIndexStatCreate(NDBT_Context* ctx, ND
        * OK
        */
     }
-    else if (! (is.getNdbError().code == 721 ||
-                is.getNdbError().code == 4244 ||
-                is.getNdbError().code == 4009)) // no connection
+    else if (! (is.getNdbError().code == 701  || // timeout
+                is.getNdbError().code == 721  || // already exists
+                is.getNdbError().code == 4244 || // already exists
+                is.getNdbError().code == 4009))  // no connection
     {
       ndbout << is.getNdbError() << endl;
       return NDBT_FAILED;

=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp	revid:mikael.ronstrom@stripped
@@ -450,13 +450,20 @@ int runRestarts(NDBT_Context* ctx, NDBT_
   int i = 0;
   int timeout = 240;
 
-  while(i<loops && result != NDBT_FAILED && !ctx->isTestStopped()){
-    
-    if(restarts.executeRestart(ctx, pCase->getName(), timeout) != 0){
+  while (i<loops && result != NDBT_FAILED && !ctx->isTestStopped())
+  {
+    int safety = 0;
+    if (i > 0)
+      safety = 15;
+
+    if (ctx->closeToTimeout(safety))
+      break;
+
+    if(restarts.executeRestart(ctx, pCase->getName(), timeout, safety) != 0){
       g_err << "Failed to executeRestart(" <<pCase->getName() <<")" << endl;
       result = NDBT_FAILED;
       break;
-    }    
+    }
     i++;
   }
   ctx->stopTest();

=== modified file 'storage/ndb/test/run-test/atrt.hpp'
--- a/storage/ndb/test/run-test/atrt.hpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/atrt.hpp	revid:mikael.ronstrom@stripped
@@ -118,9 +118,15 @@ struct atrt_testcase 
   bool m_report;
   bool m_run_all;
   time_t m_max_time;
-  BaseString m_command;
-  BaseString m_args;
   BaseString m_name;
+  BaseString m_mysqld_options;
+
+  struct Command
+  {
+    atrt_process::Type m_cmd_type;
+    BaseString m_exe;
+    BaseString m_args;
+  } m_cmd; // Todo make array of these...
 };
 
 extern Logger g_logger;
@@ -155,6 +161,9 @@ bool do_command(atrt_config& config);
 bool start_process(atrt_process & proc);
 bool stop_process(atrt_process & proc);
 
+bool connect_mysqld(atrt_process & proc);
+bool disconnect_mysqld(atrt_process & proc);
+
 /**
  * check configuration if any changes has been 
  *   done for the duration of the latest running test

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	revid:mikael.ronstrom@stripped
@@ -12,7 +12,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
-max-time: 900
+max-time: 1800
 cmd: testIndex
 args: -n DeferredError
 
@@ -20,7 +20,7 @@ max-time: 900
 cmd: testIndex
 args: -n DeferredMixedLoad T1 T6 T13
 
-max-time: 1800
+max-time: 2000
 cmd: testIndex
 args: -n DeferredMixedLoadError T1 T6 T13
 
@@ -28,7 +28,7 @@ max-time: 900
 cmd: testIndex
 args: -n NF_DeferredMixed T1 T6 T13
 
-max-time: 900
+max-time: 1800
 cmd: testIndex
 args: -n NF_Mixed T1 T6 T13
 
@@ -1751,7 +1751,7 @@ max-time: 300
 cmd: testBasic
 args: -n UnlockUpdateBatch T3
 
-max-time: 300
+max-time: 600
 cmd: testNodeRestart
 args: -n MixReadUnlockRestart T1
 

=== modified file 'storage/ndb/test/run-test/daily-perf-tests.txt'
--- a/storage/ndb/test/run-test/daily-perf-tests.txt	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/daily-perf-tests.txt	revid:mikael.ronstrom@stripped
@@ -79,62 +79,72 @@ type: bench
 
 # sql
 max-time: 600
-client: ndb-sql-perf-create-table.sh
-args: t1
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-create-table.sh t1
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-select.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-select.sh t1 1 64
 mysqld: --ndb-cluster-connection-pool=1
 type: bench
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-select.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-select.sh t1 1 64
 mysqld: --ndb-cluster-connection-pool=4
 type: bench
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-update.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-update.sh t1 1 64
 mysqld: --ndb-cluster-connection-pool=1
 type: bench
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-update.sh
-args: t1 1 64
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-update.sh t1 1 64
 mysqld: --ndb-cluster-connection-pool=4
 type: bench
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-drop-table.sh
-args: t1
-mysqld:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-drop-table.sh t1
+cmd-type: mysql
 
 # sql join
 max-time: 600
-client: ndb-sql-perf-load-tpcw.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-load-tpcw.sh
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-tpcw-getBestSeller.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-tpcw-getBestSeller.sh
+cmd-type: mysql
 type: bench
 
 max-time: 600
-client: ndb-sql-perf-drop-tpcw.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-drop-tpcw.sh
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-load-music-store.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-load-music-store.sh
+cmd-type: mysql
 
 max-time: 600
-client: ndb-sql-perf-select-music-store.sh
-args:
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-select-music-store.sh
+cmd-type: mysql
 type: bench
 
 max-time: 600
-client: ndb-sql-perf-drop-music-store.sh
-args:
-
+cmd: ndb-sql-perf.sh
+args: ndb-sql-perf-drop-music-store.sh
+cmd-type: mysql

=== modified file 'storage/ndb/test/run-test/db.cpp'
--- a/storage/ndb/test/run-test/db.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/db.cpp	revid:mikael.ronstrom@stripped
@@ -18,7 +18,6 @@
 #include "atrt.hpp"
 #include <NdbSleep.h>
 
-static bool connect_mysqld(atrt_process* proc);
 static bool populate_db(atrt_config&, atrt_process*);
 static bool setup_repl(atrt_config&);
 
@@ -132,7 +131,7 @@ setup_db(atrt_config& config)
     atrt_process * proc = config.m_processes[i];
     if (proc->m_type == atrt_process::AP_MYSQLD)
     {
-      if (!connect_mysqld(config.m_processes[i]))
+      if (!connect_mysqld(* config.m_processes[i]))
 	return false;
     }
   }    
@@ -177,16 +176,16 @@ find(atrt_process* proc, const char * ke
 }
 
 bool
-connect_mysqld(atrt_process* proc)
+connect_mysqld(atrt_process& proc)
 {
-  if ( !mysql_init(&proc->m_mysql))
+  if ( !mysql_init(&proc.m_mysql))
   {
     g_logger.error("Failed to init mysql");
     return false;
   }
 
-  const char * port = find(proc, "--port=");
-  const char * socket = find(proc, "--socket=");
+  const char * port = find(&proc, "--port=");
+  const char * socket = find(&proc, "--socket=");
   if (port == 0 && socket == 0)
   {
     g_logger.error("Neither socket nor port specified...cant connect to mysql");
@@ -198,10 +197,10 @@ connect_mysqld(atrt_process* proc)
     if (port)
     {
       mysql_protocol_type val = MYSQL_PROTOCOL_TCP;
-      mysql_options(&proc->m_mysql, MYSQL_OPT_PROTOCOL, &val);
+      mysql_options(&proc.m_mysql, MYSQL_OPT_PROTOCOL, &val);
     }
-    if (mysql_real_connect(&proc->m_mysql,
-			   proc->m_host->m_hostname.c_str(),
+    if (mysql_real_connect(&proc.m_mysql,
+			   proc.m_host->m_hostname.c_str(),
 			   "root", "", "test",
 			   port ? atoi(port) : 0,
 			   socket,
@@ -210,17 +209,24 @@ connect_mysqld(atrt_process* proc)
       return true;
     }
     g_logger.info("Retrying connect to %s:%u 3s",
-		  proc->m_host->m_hostname.c_str(),atoi(port));
-    NdbSleep_SecSleep(3); 
+		  proc.m_host->m_hostname.c_str(),atoi(port));
+    NdbSleep_SecSleep(3);
   }
-  
+
   g_logger.error("Failed to connect to mysqld err: >%s< >%s:%u:%s<",
-		 mysql_error(&proc->m_mysql),
-		 proc->m_host->m_hostname.c_str(), port ? atoi(port) : 0,
+		 mysql_error(&proc.m_mysql),
+		 proc.m_host->m_hostname.c_str(), port ? atoi(port) : 0,
 		 socket ? socket : "<null>");
   return false;
 }
 
+bool
+disconnect_mysqld(atrt_process& proc)
+{
+  mysql_close(&proc.m_mysql);
+  return true;
+}
+
 void
 BINDI(MYSQL_BIND& bind, int * i)
 {

=== modified file 'storage/ndb/test/run-test/main.cpp'
--- a/storage/ndb/test/run-test/main.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/main.cpp	revid:mikael.ronstrom@stripped
@@ -422,17 +422,17 @@ main(int argc, char ** argv)
     if(!read_test_case(g_test_case_file, test_case, lineno))
       goto end;
     
-    g_logger.info("#%d - %s %s", 
+    g_logger.info("#%d - %s",
 		  test_no,
-		  test_case.m_command.c_str(), test_case.m_args.c_str());
-    
+		  test_case.m_name.c_str());
+
     // Assign processes to programs
-    if(!setup_test_case(g_config, test_case))
+    if (!setup_test_case(g_config, test_case))
     {
       g_logger.critical("Failed to setup test case");
       goto end;
     }
-    
+
     if(!start_processes(g_config, p_clients))
     {
       g_logger.critical("Failed to start client processes");
@@ -1091,6 +1091,11 @@ stop_process(atrt_process & proc){
     return true;
   }
 
+  if (proc.m_type == atrt_process::AP_MYSQLD)
+  {
+    disconnect_mysqld(proc);
+  }
+
   {
     Properties reply;
     if(proc.m_host->m_cpcd->stop_process(proc.m_proc.m_id, reply) != 0){
@@ -1248,24 +1253,24 @@ read_test_case(FILE * file, atrt_testcas
       tmp.trim(" \t\n\r");
       Vector<BaseString> split;
       tmp.split(split, " ", 2);
-      tc.m_command = split[0];
+      tc.m_cmd.m_exe = split[0];
       if(split.size() == 2)
-	tc.m_args = split[1];
+	tc.m_cmd.m_args = split[1];
       else
-	tc.m_args = "";
+	tc.m_cmd.m_args = "";
       tc.m_max_time = 60000;
       return true;
     }
     return false;
   }
 
-  if(!p.get("cmd", tc.m_command)){
+  if(!p.get("cmd", tc.m_cmd.m_exe)){
     g_logger.critical("Invalid test file: cmd is missing near line: %d", line);
     return false;
   }
   
-  if(!p.get("args", tc.m_args))
-    tc.m_args = "";
+  if(!p.get("args", tc.m_cmd.m_args))
+    tc.m_cmd.m_args = "";
 
   const char * mt = 0;
   if(!p.get("max-time", &mt))
@@ -1283,17 +1288,33 @@ read_test_case(FILE * file, atrt_testcas
   else
     tc.m_run_all= false;
 
+  const char * str;
+  if (p.get("mysqld", &str))
+  {
+    tc.m_mysqld_options.assign(str);
+  }
+  else
+  {
+    tc.m_mysqld_options.assign("");
+  }
+
+  tc.m_cmd.m_cmd_type = atrt_process::AP_NDB_API;
+  if (p.get("cmd-type", &str) && strcmp(str, "mysql") == 0)
+  {
+    tc.m_cmd.m_cmd_type = atrt_process::AP_CLIENT;
+  }
+
   if (!p.get("name", &mt))
   {
     tc.m_name.assfmt("%s %s", 
-		     tc.m_command.c_str(),
-		     tc.m_args.c_str());
+		     tc.m_cmd.m_exe.c_str(),
+		     tc.m_cmd.m_args.c_str());
   }
   else
   {
     tc.m_name.assign(mt);
   }
-  
+
   return true;
 }
 
@@ -1306,46 +1327,87 @@ setup_test_case(atrt_config& config, con
     return false;
   }
 
-  size_t i = 0;
-  for(; i<config.m_processes.size(); i++)
+  for (size_t i = 0; i<config.m_processes.size(); i++)
   {
-    atrt_process & proc = *config.m_processes[i]; 
-    if(proc.m_type == atrt_process::AP_NDB_API || 
-       proc.m_type == atrt_process::AP_CLIENT)
+    atrt_process & proc = *config.m_processes[i];
+    if (proc.m_type == atrt_process::AP_NDB_API ||
+        proc.m_type == atrt_process::AP_CLIENT)
     {
-      BaseString cmd;
-      char * p = find_bin_path(tc.m_command.c_str());
-      if (p == 0)
-      {
-        g_logger.critical("Failed to locate '%s'", tc.m_command.c_str());
-        return false;
-      }
-      cmd.assign(p);
-      free(p);
+      proc.m_proc.m_path.assign("");
+      proc.m_proc.m_args.assign("");
+    }
+  }
 
+  BaseString cmd;
+  char * p = find_bin_path(tc.m_cmd.m_exe.c_str());
+  if (p == 0)
+  {
+    g_logger.critical("Failed to locate '%s'", tc.m_cmd.m_exe.c_str());
+    return false;
+  }
+  cmd.assign(p);
+  free(p);
+
+  for (size_t i = 0; i<config.m_processes.size(); i++)
+  {
+    atrt_process & proc = *config.m_processes[i];
+    if (proc.m_type == tc.m_cmd.m_cmd_type &&
+        proc.m_proc.m_path == "")
+    {
+      proc.m_save.m_proc = proc.m_proc;
+      proc.m_save.m_saved = true;
+
+      proc.m_proc.m_env.appfmt(" ATRT_TIMEOUT=%ld", tc.m_max_time);
       if (0) // valgrind
       {
         proc.m_proc.m_path = "/usr/bin/valgrind";
-        proc.m_proc.m_args.appfmt("%s %s", cmd.c_str(), tc.m_args.c_str());
+        proc.m_proc.m_args.appfmt("%s %s", cmd.c_str(),
+                                  tc.m_cmd.m_args.c_str());
       }
       else
       {
         proc.m_proc.m_path = cmd;
-        proc.m_proc.m_args.assign(tc.m_args);
+        proc.m_proc.m_args.assign(tc.m_cmd.m_args.c_str());
       }
-      if(!tc.m_run_all)
+      if (!tc.m_run_all)
         break;
     }
   }
-  for(i++; i<config.m_processes.size(); i++){
-    atrt_process & proc = *config.m_processes[i]; 
-    if(proc.m_type == atrt_process::AP_NDB_API || 
-       proc.m_type == atrt_process::AP_CLIENT)
+
+  if (tc.m_mysqld_options != "")
+  {
+    g_logger.info("restarting mysqld with extra options: %s",
+                  tc.m_mysqld_options.c_str());
+
+    /**
+     * Apply testcase specific mysqld options
+     */
+    for (size_t i = 0; i<config.m_processes.size(); i++)
     {
-      proc.m_proc.m_path.assign("");
-      proc.m_proc.m_args.assign("");
+      atrt_process & proc = *config.m_processes[i];
+      if (proc.m_type == atrt_process::AP_MYSQLD)
+      {
+        proc.m_save.m_proc = proc.m_proc;
+        proc.m_save.m_saved = true;
+        proc.m_proc.m_args.appfmt(" %s", tc.m_mysqld_options.c_str());
+        if (!stop_process(proc))
+        {
+          return false;
+        }
+
+        if (!start_process(proc))
+        {
+          return false;
+        }
+
+        if (!connect_mysqld(proc))
+        {
+          return false;
+        }
+      }
     }
   }
+
   return true;
 }
 
@@ -1562,10 +1624,14 @@ reset_config(atrt_config & config)
     atrt_process & proc = *config.m_processes[i]; 
     if (proc.m_save.m_saved)
     {
-      if (!stop_process(proc))
-        return false;
-      
-      changed = true;
+      if (proc.m_proc.m_status == "running")
+      {
+        if (!stop_process(proc))
+          return false;
+
+        changed = true;
+      }
+
       proc.m_save.m_saved = false;
       proc.m_proc = proc.m_save.m_proc;
       proc.m_proc.m_id = -1;

=== modified file 'storage/ndb/test/run-test/test-tests.txt'
--- a/storage/ndb/test/run-test/test-tests.txt	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/run-test/test-tests.txt	revid:mikael.ronstrom@stripped
@@ -1,4 +1,17 @@
 max-time: 600
 cmd: testBasic
 args: -n PkRead T1
+type: bench
+
+max-time: 60
+cmd: mysql
+args: -u root information_schema -e show\ tables
+cmd-type: mysql
+type: bench
+mysqld: --ndb-cluster-connection-pool=4
+
+max-time: 6000
+cmd: testBasic
+args: -n PkRead T1
+type: bench
 

=== modified file 'storage/ndb/test/src/NDBT_Test.cpp'
--- a/storage/ndb/test/src/NDBT_Test.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/src/NDBT_Test.cpp	revid:mikael.ronstrom@stripped
@@ -20,6 +20,7 @@
 
 #include "NDBT.hpp"
 #include "NDBT_Test.hpp"
+#include <portlib/NdbEnv.h>
 
 static int opt_stop_on_error = 0;
 
@@ -34,9 +35,10 @@ NDBT_Context::NDBT_Context(Ndb_cluster_c
   stopped = false;
   propertyMutexPtr = NdbMutex_Create();
   propertyCondPtr = NdbCondition_Create();
+  m_env_timeout = 0;
+  m_test_start_time = NdbTick_CurrentMillisecond();
 }
 
-
 NDBT_Context::~NDBT_Context(){
   NdbCondition_Destroy(propertyCondPtr);
   NdbMutex_Destroy(propertyMutexPtr);
@@ -318,7 +320,6 @@ NDBT_Step::tearDown(){
   m_ndb = NULL;
 }
 
-
 Ndb* NDBT_Step::getNdb() const {
   assert(m_ndb != NULL);
   return m_ndb;
@@ -970,8 +971,8 @@ NDBT_TestSuite::executeOneCtx(Ndb_cluste
       if (opt_stop_on_error != 0 && numTestsFail > 0)
         break;
 
-      if (_testname != NULL && 
-	      strcasecmp(tests[t]->getName(), _testname) != 0)
+      if (_testname != NULL &&
+          strcasecmp(tests[t]->getName(), _testname) != 0)
         continue;
 
       tests[t]->initBeforeTest();
@@ -1648,6 +1649,42 @@ NDBT_Context::sync_up_and_wait(const cha
   getPropertyWait(key, (unsigned)0);
 }
 
+bool
+NDBT_Context::closeToTimeout(int safety)
+{
+  if (safety == 0)
+    return false;
+
+  if (m_env_timeout == 0)
+  {
+    char buf[1024];
+    const char * p = NdbEnv_GetEnv("ATRT_TIMEOUT", buf, sizeof(buf));
+    if (p)
+    {
+      m_env_timeout = atoi(p);
+      ndbout_c("FOUND ATRT_TIMEOUT: %d", m_env_timeout);
+    }
+    else
+    {
+      m_env_timeout = -1;
+    }
+  }
+
+  if (m_env_timeout < 0)
+    return false;
+
+  Uint64 to = (1000 * m_env_timeout * (100 - safety)) / 100;
+  Uint64 now = NdbTick_CurrentMillisecond();
+  if (now >= m_test_start_time + to)
+  {
+    ndbout_c("closeToTimeout(%d) => true env(timeout): %d",
+             safety, m_env_timeout);
+    return true;
+  }
+
+  return false;
+}
+
 template class Vector<NDBT_TestCase*>;
 template class Vector<NDBT_TestCaseResult*>;
 template class Vector<NDBT_Step*>;

=== modified file 'storage/ndb/test/src/NdbRestarts.cpp'
--- a/storage/ndb/test/src/NdbRestarts.cpp	revid:mikael@dator9-20111215085458-u8nu3tgg2vn5xarq
+++ b/storage/ndb/test/src/NdbRestarts.cpp	revid:mikael.ronstrom@stripped
@@ -24,7 +24,7 @@
 #include <NdbEnv.h>
 #include <NDBT_Test.hpp>
 
-#define F_ARGS NDBT_Context* ctx, NdbRestarter& _restarter, const NdbRestarts::NdbRestart* _restart
+#define F_ARGS NDBT_Context* ctx, NdbRestarter& _restarter, const NdbRestarts::NdbRestart* _restart, int
 
 int restartRandomNodeGraceful(F_ARGS);
 int restartRandomNodeAbort(F_ARGS);
@@ -242,7 +242,8 @@ const NdbRestarts::NdbRestart* NdbRestar
 
 int NdbRestarts::executeRestart(NDBT_Context* ctx,
                                 const NdbRestarts::NdbRestart* _restart,
-				unsigned int _timeout){
+				unsigned int _timeout,
+                                int safety){
   // Check that there are enough nodes in the cluster
   // for this test
   NdbRestarter restarter;
@@ -258,7 +259,7 @@ int NdbRestarts::executeRestart(NDBT_Con
     return NDBT_FAILED;
   }
   
-  int res = _restart->m_restartFunc(ctx, restarter, _restart);
+  int res = _restart->m_restartFunc(ctx, restarter, _restart, safety);
 
   // Sleep a little waiting for nodes to react to command
   NdbSleep_SecSleep(2);
@@ -280,23 +281,25 @@ int NdbRestarts::executeRestart(NDBT_Con
 
 int NdbRestarts::executeRestart(NDBT_Context* ctx,
                                 int _num,
-				unsigned int _timeout){
+				unsigned int _timeout,
+                                int safety){
   const NdbRestarts::NdbRestart* r = getRestart(_num);
   if (r == NULL)
     return NDBT_FAILED;
 
-  int res = executeRestart(ctx, r, _timeout);
+  int res = executeRestart(ctx, r, _timeout, safety);
   return res;
 }
 
 int NdbRestarts::executeRestart(NDBT_Context* ctx,
                                 const char* _name,
-				unsigned int _timeout){
+				unsigned int _timeout,
+                                int safety){
   const NdbRestarts::NdbRestart* r = getRestart(_name);
   if (r == NULL)
     return NDBT_FAILED;
 
-  int res = executeRestart(ctx, r, _timeout);
+  int res = executeRestart(ctx, r, _timeout, safety);
   return res;
 }
 
@@ -658,12 +661,12 @@ NFDuringNR_codes[] = {
   5002
 };
 
-int restartNFDuringNR(F_ARGS){
+int restartNFDuringNR(F_ARGS safety){
 
   myRandom48Init((long)NdbTick_CurrentMillisecond());
   int i;
   const int sz = sizeof(NFDuringNR_codes)/sizeof(NFDuringNR_codes[0]);
-  for(i = 0; i<sz; i++){
+  for(i = 0; i<sz && !ctx->closeToTimeout(safety); i++){
     int randomId = myRandom48(_restarter.getNumDbNodes());
     int nodeId = _restarter.getDbNodeId(randomId);
     int error = NFDuringNR_codes[i];
@@ -708,7 +711,7 @@ int restartNFDuringNR(F_ARGS){
   if(NdbEnv_GetEnv("USER", buf, 256) == 0 || strcmp(buf, "ejonore") != 0)
     return NDBT_OK;
   
-  for(i = 0; i<sz && !ctx->isTestStopped(); i++){
+  for(i = 0; i<sz && !ctx->isTestStopped() && !ctx->closeToTimeout(safety);i++){
     const int randomId = myRandom48(_restarter.getNumDbNodes());
     int nodeId = _restarter.getDbNodeId(randomId);
     const int error = NFDuringNR_codes[i];
@@ -786,7 +789,7 @@ NRDuringLCP_NonMaster_codes[] = {
   7018  // Crash in !master when changing state to LCP_TAB_SAVED
 };
 
-int restartNodeDuringLCP(F_ARGS) {
+int restartNodeDuringLCP(F_ARGS safety) {
   int i;
   // Master
   int val = DumpStateOrd::DihMinTimeBetweenLCP;
@@ -794,8 +797,8 @@ int restartNodeDuringLCP(F_ARGS) {
 	"Failed to set LCP to min value"); // Set LCP to min val
   int sz = sizeof(NRDuringLCP_Master_codes)/
            sizeof(NRDuringLCP_Master_codes[0]);
-  for(i = 0; i<sz; i++) {
-
+  for(i = 0; i<sz && !ctx->closeToTimeout(safety); i++)
+  {
     int error = NRDuringLCP_Master_codes[i];
     int masterNodeId = _restarter.getMasterNodeId();
 
@@ -832,7 +835,7 @@ int restartNodeDuringLCP(F_ARGS) {
   // NON-Master
   sz = sizeof(NRDuringLCP_NonMaster_codes)/
        sizeof(NRDuringLCP_NonMaster_codes[0]);
-  for(i = 0; i<sz; i++) {
+  for(i = 0; i<sz && !ctx->closeToTimeout(safety) ; i++) {
 
     int error = NRDuringLCP_NonMaster_codes[i];
     int nodeId = getRandomNodeId(_restarter);

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3660) Mikael Ronstrom21 Dec