List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:November 10 2011 2:24pm
Subject:bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4330 to 4331)
View as plain text  
 4331 Jonas Oreland	2011-11-10 [merge]
      ndb - merge 70 to 71

    added:
      sql/ndb_component.cc
      sql/ndb_component.h
      sql/ndb_util_thread.h
    modified:
      libmysqld/Makefile.am
      mysql-test/suite/ndb/r/ndb_index_stat.result
      sql/Makefile.am
      sql/ha_ndb_index_stat.cc
      sql/ha_ndb_index_stat.h
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      storage/ndb/CMakeLists.txt
      storage/ndb/include/kernel/signaldata/ScanFrag.hpp
      storage/ndb/include/kernel/signaldata/TupKey.hpp
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
      storage/ndb/src/ndbapi/NdbRecord.hpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/test/ndbapi/flexAsynch.cpp
 4330 jonas oreland	2011-11-08
      ndb - bump version to 7.1.18

    modified:
      configure.in
      storage/ndb/ndb_configure.m4
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2011-09-08 06:22:07 +0000
+++ b/libmysqld/Makefile.am	2011-11-10 14:23:53 +0000
@@ -49,7 +49,8 @@ sqlsources = derror.cc field.cc field_co
 	     ha_ndbcluster.cc ha_ndbcluster_cond.cc \
 	ha_ndbcluster_connection.cc ha_ndbinfo.cc \
 	ha_ndb_index_stat.cc \
-	ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ha_partition.cc \
+	ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ndb_component.cc \
+        ha_partition.cc \
 	handler.cc sql_handler.cc \
 	hostname.cc init.cc password.c \
 	item.cc item_buff.cc item_cmpfunc.cc item_create.cc \

=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-10-08 16:56:43 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-11-09 08:27:32 +0000
@@ -21,18 +21,18 @@ Variable_name	Value
 ndb_index_stat_enable	ON
 show global variables like 'ndb_index_stat_option';
 Variable_name	Value
-ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
 set @save_option = @@global.ndb_index_stat_option;
 set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
 set @@global.ndb_index_stat_option = 'cache_lowpct=85,evict_delay=55';
 set @@global.ndb_index_stat_option = 'check_delay=234s';
 show global variables like 'ndb_index_stat_option';
 Variable_name	Value
-ndb_index_stat_option	loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85
+ndb_index_stat_option	loop_enable=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=234s,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85,zero_total=0
 set @@global.ndb_index_stat_option = @save_option;
 show global variables like 'ndb_index_stat_option';
 Variable_name	Value
-ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+ndb_index_stat_option	loop_enable=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=8,check_delay=10m,delete_batch=8,clean_delay=1m,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90,zero_total=0
 create table t1 (
 a1 int unsigned not null,
 b1 int unsigned not null,

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2011-09-08 06:22:07 +0000
+++ b/sql/Makefile.am	2011-11-10 14:23:53 +0000
@@ -64,6 +64,8 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndb_index_stat.h \
                         ndb_mi.h \
 			ndb_conflict_trans.h \
+                        ndb_component.h \
+                        ndb_util_thread.h \
 			ha_partition.h rpl_constants.h \
 			debug_sync.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
@@ -141,7 +143,8 @@ libndb_la_SOURCES=	ha_ndbcluster.cc \
 			ha_ndb_index_stat.cc \
 			ha_ndbinfo.cc \
 			ndb_mi.cc \
-			ndb_conflict_trans.cc
+			ndb_conflict_trans.cc \
+                        ndb_component.cc
 
 gen_lex_hash_SOURCES =	gen_lex_hash.cc
 gen_lex_hash_LDFLAGS =  @NOINST_LDFLAGS@

=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	2011-10-20 16:18:28 +0000
+++ b/sql/ha_ndb_index_stat.cc	2011-11-10 10:35:09 +0000
@@ -24,6 +24,16 @@
 #include <mysql/plugin.h>
 #include <ctype.h>
 
+/* from other files */
+extern struct st_ndb_status g_ndb_status;
+extern pthread_mutex_t ndbcluster_mutex;
+
+/* these have to live in ha_ndbcluster.cc */
+extern bool ndb_index_stat_get_enable(THD *thd);
+extern const char* g_ndb_status_index_stat_status;
+extern long g_ndb_status_index_stat_cache_query;
+extern long g_ndb_status_index_stat_cache_clean;
+
 // Do we have waiter...
 static bool ndb_index_stat_waiter= false;
 
@@ -40,6 +50,28 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
 typedef NdbDictionary::Table NDBTAB;
 typedef NdbDictionary::Index NDBINDEX;
 
+/** ndb_index_stat_thread */
+Ndb_index_stat_thread::Ndb_index_stat_thread()
+  : running(-1)
+{
+  pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND, NULL);
+  pthread_cond_init(&COND_ready, NULL);
+  pthread_mutex_init(&list_mutex, MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&stat_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&stat_cond, NULL);
+}
+
+Ndb_index_stat_thread::~Ndb_index_stat_thread()
+{
+  pthread_mutex_destroy(&LOCK);
+  pthread_cond_destroy(&COND);
+  pthread_cond_destroy(&COND_ready);
+  pthread_mutex_destroy(&list_mutex);
+  pthread_mutex_destroy(&stat_mutex);
+  pthread_cond_destroy(&stat_cond);
+}
+
 struct Ndb_index_stat {
   enum {
     LT_Undef= 0,
@@ -75,6 +107,7 @@ struct Ndb_index_stat {
   struct Ndb_index_stat *list_next;
   struct Ndb_index_stat *list_prev;
   struct NDB_SHARE *share;
+  bool to_delete;       /* detached from share and marked for delete */
   Ndb_index_stat();
 };
 
@@ -134,7 +167,8 @@ struct Ndb_index_stat_opt {
     Umsec = 4
   };
   enum Flag {
-    Freadonly = (1 << 0)
+    Freadonly = (1 << 0),
+    Fcontrol = (1 << 1)
   };
   struct Val {
     const char* name;
@@ -161,7 +195,8 @@ struct Ndb_index_stat_opt {
     Ievict_delay = 13,
     Icache_limit = 14,
     Icache_lowpct = 15,
-    Imax = 16
+    Izero_total = 16,
+    Imax = 17
   };
   Val val[Imax];
   /* Options in string format (SYSVAR ndb_index_stat_option) */
@@ -171,6 +206,10 @@ struct Ndb_index_stat_opt {
     assert(i < Imax);
     return val[i].val;
   }
+  void set(Idx i, uint the_val) {
+    assert(i < Imax);
+    val[i].val = the_val;
+  }
 };
 
 Ndb_index_stat_opt::Ndb_index_stat_opt(char* buf) :
@@ -197,8 +236,9 @@ Ndb_index_stat_opt::Ndb_index_stat_opt(c
   ival(error_delay, 60, 0, ~0, Utime, 0);
   ival(evict_batch, 8, 1, ~0, Usize, 0);
   ival(evict_delay, 60, 0, ~0, Utime, 0);
-  ival(cache_limit, 32*1024*1024, 1024*1024, ~0, Usize, 0);
+  ival(cache_limit, 32*1024*1024, 0, ~0, Usize, 0);
   ival(cache_lowpct, 90, 0, 100, Usize, 0);
+  ival(zero_total, 0, 0, 1, Ubool, Fcontrol);
 #undef ival
 
   ndb_index_stat_opt2str(*this, option);
@@ -234,9 +274,9 @@ ndb_index_stat_opt2str(const Ndb_index_s
       {
         DBUG_ASSERT(v.val == 0 || v.val == 1);
         if (v.val == 0)
-          my_snprintf(ptr, sz, "%s%s=OFF", sep, v.name);
+          my_snprintf(ptr, sz, "%s%s=0", sep, v.name);
         else
-          my_snprintf(ptr, sz, "%s%s=ON", sep, v.name);
+          my_snprintf(ptr, sz, "%s%s=1", sep, v.name);
       }
       break;
 
@@ -308,12 +348,14 @@ ndb_index_stat_option_parse(char* p, Ndb
   if (*r == 0)
     DBUG_RETURN(-1);
 
+  bool found= false;
   const uint imax= Ndb_index_stat_opt::Imax;
   for (uint i= 0; i < imax; i++)
   {
     Ndb_index_stat_opt::Val& v= opt.val[i];
     if (strcmp(p, v.name) != 0)
       continue;
+    found= true;
 
     char *s;
     for (s= r; *s != 0; s++)
@@ -400,6 +442,9 @@ ndb_index_stat_option_parse(char* p, Ndb
       break;
     }
   }
+
+  if (!found)
+    DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
 
@@ -503,15 +548,23 @@ struct Ndb_index_stat_glob {
   uint wait_update;
   uint no_stats;
   uint wait_stats;
+  /* Accumulating counters */
+  uint analyze_count;
+  uint analyze_error;
+  uint query_count;
+  uint query_no_stats;
+  uint query_error;
   uint event_ok;          /* Events received for known index */
   uint event_miss;        /* Events received for unknown index */
-  char status[2][512];
-  uint status_i;
+  /* Cache */
   uint cache_query_bytes; /* In use */
   uint cache_clean_bytes; /* Obsolete versions not yet removed */
+  char status[2][512];
+  uint status_i;
 
   Ndb_index_stat_glob();
   void set_status();
+  void zero_total();
 };
 
 Ndb_index_stat_glob::Ndb_index_stat_glob()
@@ -524,12 +577,17 @@ Ndb_index_stat_glob::Ndb_index_stat_glob
   wait_update= 0;
   no_stats= 0;
   wait_stats= 0;
+  analyze_count= 0;
+  analyze_error= 0;
+  query_count= 0;
+  query_no_stats= 0;
+  query_error= 0;
   event_ok= 0;
   event_miss= 0;
-  memset(status, 0, sizeof(status));
-  status_i= 0;
   cache_query_bytes= 0;
   cache_clean_bytes= 0;
+  memset(status, 0, sizeof(status));
+  status_i= 0;
 }
 
 /* Update status variable (must hold stat_mutex) */
@@ -541,7 +599,7 @@ Ndb_index_stat_glob::set_status()
 
   // stats thread
   th_allow= ndb_index_stat_allow();
-  sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%ums",
+  sprintf(p, "allow:%d,enable:%d,busy:%d,loop:%u",
              th_allow, th_enable, th_busy, th_loop);
   p+= strlen(p);
 
@@ -562,11 +620,19 @@ Ndb_index_stat_glob::set_status()
   // special counters
   sprintf(p, ",analyze:(queue:%u,wait:%u)", force_update, wait_update);
   p+= strlen(p);
-  sprintf(p, ",stats:(none:%u,wait:%u)", no_stats, wait_stats);
+  sprintf(p, ",stats:(nostats:%u,wait:%u)", no_stats, wait_stats);
   p+= strlen(p);
 
-  // events
-  sprintf(p, ",events:(ok:%u,miss:%u)", event_ok, event_miss);
+  // accumulating counters
+  sprintf(p, ",total:(");
+  p+= strlen(p);
+  sprintf(p, "analyze:(all:%u,error:%u)", analyze_count, analyze_error);
+  p+= strlen(p);
+  sprintf(p, ",query:(all:%u,nostats:%u,error:%u)", query_count, query_no_stats, query_error);
+  p+= strlen(p);
+  sprintf(p, ",event:(ok:%u,miss:%u)", event_ok, event_miss);
+  p+= strlen(p);
+  sprintf(p, ")");
   p+= strlen(p);
 
   // cache size
@@ -575,7 +641,7 @@ Ndb_index_stat_glob::set_status()
   double cache_pct= (double)0.0;
   if (cache_limit != 0)
     cache_pct= (double)100.0 * (double)cache_total / (double)cache_limit;
-  sprintf(p, ",cache:(query:%u,clean:%u,total:%.2f%%)",
+  sprintf(p, ",cache:(query:%u,clean:%u,totalpct:%.2f)",
              cache_query_bytes, cache_clean_bytes, cache_pct);
   p+= strlen(p);
 
@@ -588,6 +654,19 @@ Ndb_index_stat_glob::set_status()
   pthread_mutex_unlock(&LOCK_global_system_variables);
 }
 
+/* Zero accumulating counters */
+void
+Ndb_index_stat_glob::zero_total()
+{
+  analyze_count= 0;
+  analyze_error= 0;
+  query_count= 0;
+  query_no_stats= 0;
+  query_error= 0;
+  event_ok= 0;
+  event_miss= 0;
+}
+
 Ndb_index_stat_glob ndb_index_stat_glob;
 
 /* Shared index entries */
@@ -616,6 +695,7 @@ Ndb_index_stat::Ndb_index_stat()
   list_next= 0;
   list_prev= 0;
   share= 0;
+  to_delete= false;
 }
 
 void
@@ -864,8 +944,8 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
 
   pthread_mutex_lock(&share->mutex);
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   time_t now= ndb_index_stat_time();
   err_out= 0;
 
@@ -902,17 +982,23 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   }
   while (0);
 
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   pthread_mutex_unlock(&share->mutex);
   return st;
 }
 
+/*
+  Prepare to delete index stat entry.  Remove it from per-share
+  list and set "to_delete" flag.  Stats thread does real delete.
+*/
+
 void
 ndb_index_stat_free(Ndb_index_stat *st)
 {
+  DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   NDB_SHARE *share= st->share;
   assert(share != 0);
 
@@ -924,10 +1010,13 @@ ndb_index_stat_free(Ndb_index_stat *st)
   {
     if (st == st_loop)
     {
+      DBUG_PRINT("index_stat", ("st %s stat free one", st->id));
+      st->share_next= 0;
       st->share= 0;
       assert(st->lt != 0);
       assert(st->lt != Ndb_index_stat::LT_Delete);
-      ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+      assert(!st->to_delete);
+      st->to_delete= true;
       st_loop= st_loop->share_next;
       assert(!found);
       found++;
@@ -946,30 +1035,36 @@ ndb_index_stat_free(Ndb_index_stat *st)
   assert(found);
   share->index_stat_list= st_head;
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+  DBUG_VOID_RETURN;
 }
 
 void
 ndb_index_stat_free(NDB_SHARE *share)
 {
+  DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   Ndb_index_stat *st;
   while ((st= share->index_stat_list) != 0)
   {
+    DBUG_PRINT("index_stat", ("st %s stat free all", st->id));
     share->index_stat_list= st->share_next;
+    st->share_next= 0;
     st->share= 0;
     assert(st->lt != 0);
     assert(st->lt != Ndb_index_stat::LT_Delete);
-    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Delete);
+    assert(!st->to_delete);
+    st->to_delete= true;
   }
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
+  DBUG_VOID_RETURN;
 }
 
 /* Find entry across shares */
@@ -979,7 +1074,7 @@ ndb_index_stat_find_entry(int index_id,
 {
   DBUG_ENTER("ndb_index_stat_find_entry");
   pthread_mutex_lock(&ndbcluster_mutex);
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
                             index_id, index_version, table_id));
 
@@ -992,7 +1087,7 @@ ndb_index_stat_find_entry(int index_id,
       if (st->index_id == index_id &&
           st->index_version == index_version)
       {
-        pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+        pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
         pthread_mutex_unlock(&ndbcluster_mutex);
         DBUG_RETURN(st);
       }
@@ -1000,7 +1095,7 @@ ndb_index_stat_find_entry(int index_id,
     }
   }
 
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   pthread_mutex_unlock(&ndbcluster_mutex);
   DBUG_RETURN(0);
 }
@@ -1074,7 +1169,7 @@ void
 ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
 {
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   const int lt= Ndb_index_stat::LT_New;
   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
 
@@ -1088,10 +1183,10 @@ ndb_index_stat_proc_new(Ndb_index_stat_p
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
   }
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
 }
 
 void
@@ -1126,9 +1221,9 @@ ndb_index_stat_proc_update(Ndb_index_sta
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
     // db op so update status after each
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     cnt++;
   }
   if (cnt == batch)
@@ -1141,7 +1236,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   NdbIndexStat::Head head;
   if (st->is->read_stat(pr.ndb) == -1)
   {
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     ndb_index_stat_error(st, "read_stat", __LINE__);
     const bool force_update= st->force_update;
     ndb_index_stat_force_update(st, false);
@@ -1158,12 +1253,12 @@ ndb_index_stat_proc_read(Ndb_index_stat_
       pr.lt= Ndb_index_stat::LT_Error;
     }
 
-    pthread_cond_broadcast(&ndb_index_stat_stat_cond);
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     return;
   }
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   pr.now= ndb_index_stat_time();
   st->is->get_head(head);
   st->load_time= head.m_loadTime;
@@ -1176,8 +1271,8 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   ndb_index_stat_cache_move(st);
   st->cache_clean= false;
   pr.lt= Ndb_index_stat::LT_Idle;
-  pthread_cond_broadcast(&ndb_index_stat_stat_cond);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1200,9 +1295,9 @@ ndb_index_stat_proc_read(Ndb_index_stat_
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
     // db op so update status after each
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     cnt++;
   }
   if (cnt == batch)
@@ -1221,9 +1316,15 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
     st->check_time == 0 ? 0 : st->check_time + check_delay - pr.now;
 
   DBUG_PRINT("index_stat", ("st %s check wait:%lds force update:%u"
-                            " clean wait:%lds cache clean:%d",
+                            " clean wait:%lds cache clean:%d to delete:%d",
                             st->id, (long)check_wait, st->force_update,
-                            (long)clean_wait, st->cache_clean));
+                            (long)clean_wait, st->cache_clean, st->to_delete));
+
+  if (st->to_delete)
+  {
+    pr.lt= Ndb_index_stat::LT_Delete;
+    return;
+  }
 
   if (!st->cache_clean && clean_wait <= 0)
   {
@@ -1260,7 +1361,7 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
   uint batch= opt.get(Ndb_index_stat_opt::Iidle_batch);
   {
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
     const int lt_update= Ndb_index_stat::LT_Update;
     const Ndb_index_stat_list &list_update= ndb_index_stat_list[lt_update];
@@ -1269,7 +1370,7 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
       // probably there is a force update waiting on Idle list
       batch= ~0;
     }
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   }
   // entry may be moved to end of this list
   if (batch > list.count)
@@ -1289,9 +1390,9 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
     cnt++;
   }
   // full batch does not set pr.busy
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1348,9 +1449,9 @@ ndb_index_stat_proc_check(Ndb_index_stat
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
     // db op so update status after each
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     cnt++;
   }
   if (cnt == batch)
@@ -1384,9 +1485,9 @@ ndb_index_stat_proc_evict(Ndb_index_stat
   ndb_index_stat_cache_move(st);
   ndb_index_stat_cache_clean(st);
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 bool
@@ -1493,6 +1594,13 @@ ndb_index_stat_proc_delete(Ndb_index_sta
     Ndb_index_stat *st= st_loop;
     st_loop= st_loop->list_next;
     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
+
+    // adjust global counters at drop
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+    ndb_index_stat_force_update(st, false);
+    ndb_index_stat_no_stats(st, false);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+
     ndb_index_stat_proc_evict(pr, st);
     ndb_index_stat_list_remove(st);
     delete st->is;
@@ -1502,9 +1610,9 @@ ndb_index_stat_proc_delete(Ndb_index_sta
   if (cnt == batch)
     pr.busy= true;
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1514,6 +1622,12 @@ ndb_index_stat_proc_error(Ndb_index_stat
   const int error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
   const time_t error_wait= st->error_time + error_delay - pr.now;
 
+  if (st->to_delete)
+  {
+    pr.lt= Ndb_index_stat::LT_Delete;
+    return;
+  }
+
   if (error_wait <= 0 ||
       /* Analyze issued after previous error */
       st->force_update)
@@ -1558,9 +1672,9 @@ ndb_index_stat_proc_error(Ndb_index_stat
     cnt++;
   }
   // full batch does not set pr.busy
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1633,13 +1747,67 @@ ndb_index_stat_proc_event(Ndb_index_stat
       glob.event_miss++;
     }
   }
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+}
+
+/* Control options */
+
+void
+ndb_index_stat_proc_control(Ndb_index_stat_proc &pr)
+{
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  Ndb_index_stat_opt &opt= ndb_index_stat_opt;
+
+  /* Request to zero accumulating counters */
+  if (opt.get(Ndb_index_stat_opt::Izero_total) == true)
+  {
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
+    glob.zero_total();
+    glob.set_status();
+    opt.set(Ndb_index_stat_opt::Izero_total, false);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  }
 }
 
 #ifndef DBUG_OFF
 void
+ndb_index_stat_entry_verify(const Ndb_index_stat *st)
+{
+  const NDB_SHARE *share= st->share;
+  if (st->to_delete)
+  {
+    assert(st->share_next == 0);
+    assert(share == 0);
+  }
+  else
+  {
+    assert(share != 0);
+    const Ndb_index_stat *st2= share->index_stat_list;
+    assert(st2 != 0);
+    uint found= 0;
+    while (st2 != 0)
+    {
+      assert(st2->share == share);
+      const Ndb_index_stat *st3= st2->share_next;
+      uint guard= 0;
+      while (st3 != 0)
+      {
+        assert(st2 != st3);
+        guard++;
+        assert(guard <= 1000); // MAX_INDEXES
+        st3= st3->share_next;
+      }
+      if (st == st2)
+        found++;
+      st2= st2->share_next;
+    }
+    assert(found == 1);
+  }
+}
+
+void
 ndb_index_stat_list_verify(int lt)
 {
   const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
@@ -1684,6 +1852,7 @@ ndb_index_stat_list_verify(int lt)
       assert(guard <= list.count);
       st2= st2->list_next;
     }
+    ndb_index_stat_entry_verify(st);
     st= st->list_next;
   }
   assert(count == list.count);
@@ -1692,10 +1861,10 @@ ndb_index_stat_list_verify(int lt)
 void
 ndb_index_stat_list_verify()
 {
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   for (int lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
     ndb_index_stat_list_verify(lt);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
 }
 
 void
@@ -1717,6 +1886,9 @@ void
 ndb_index_stat_proc(Ndb_index_stat_proc &pr)
 {
   DBUG_ENTER("ndb_index_stat_proc");
+
+  ndb_index_stat_proc_control(pr);
+
 #ifndef DBUG_OFF
   ndb_index_stat_list_verify();
   Ndb_index_stat_glob old_glob= ndb_index_stat_glob;
@@ -1751,9 +1923,9 @@ ndb_index_stat_end()
    * in LT_Delete.  The first two steps here should be unnecessary.
    */
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   ndb_index_stat_allow(0);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
   int lt;
   for (lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
@@ -1889,14 +2061,13 @@ ndb_index_stat_stop_listener(Ndb_index_s
   DBUG_RETURN(0);
 }
 
-pthread_handler_t
-ndb_index_stat_thread_func(void *arg __attribute__((unused)))
+void
+Ndb_index_stat_thread::do_run()
 {
   THD *thd; /* needs to be first for thread_stack */
   struct timespec abstime;
   Thd_ndb *thd_ndb= NULL;
 
-  my_thread_init();
   DBUG_ENTER("ndb_index_stat_thread_func");
 
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
@@ -1906,18 +2077,16 @@ ndb_index_stat_thread_func(void *arg __a
   have_listener= false;
 
   // wl4124_todo remove useless stuff copied from utility thread
- 
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
 
   thd= new THD; /* note that contructor of THD uses DBUG_ */
   if (thd == NULL)
   {
     my_errno= HA_ERR_OUT_OF_MEM;
-    DBUG_RETURN(NULL);
+    DBUG_VOID_RETURN;
   }
   THD_CHECK_SENTRY(thd);
-  pthread_detach_this_thread();
-  ndb_index_stat_thread= pthread_self();
 
   thd->thread_stack= (char*)&thd; /* remember where our stack is */
   if (thd->store_globals())
@@ -1940,9 +2109,9 @@ ndb_index_stat_thread_func(void *arg __a
   thd->update_charset();
 
   /* Signal successful initialization */
-  ndb_index_stat_thread_running= 1;
-  pthread_cond_signal(&COND_ndb_index_stat_ready);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  ndb_index_stat_thread.running= 1;
+  pthread_cond_signal(&ndb_index_stat_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
   /*
     wait for mysql server to start
@@ -1956,7 +2125,7 @@ ndb_index_stat_thread_func(void *arg __a
     if (ndbcluster_terminating)
     {
       mysql_mutex_unlock(&LOCK_server_started);
-      pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+      pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
       goto ndb_index_stat_thread_end;
     }
   }
@@ -1965,21 +2134,21 @@ ndb_index_stat_thread_func(void *arg __a
   /*
     Wait for cluster to start
   */
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
   while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
   {
     /* ndb not connected yet */
-    pthread_cond_wait(&COND_ndb_index_stat_thread, &LOCK_ndb_index_stat_thread);
+    pthread_cond_wait(&ndb_index_stat_thread.COND, &ndb_index_stat_thread.LOCK);
     if (ndbcluster_terminating)
       goto ndb_index_stat_thread_end;
   }
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
   /* Get instance used for sys objects check and create */
   if (!(pr.is_util= new NdbIndexStat))
   {
     sql_print_error("Could not allocate NdbIndexStat is_util object");
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     goto ndb_index_stat_thread_end;
   }
 
@@ -1987,7 +2156,7 @@ ndb_index_stat_thread_func(void *arg __a
   if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb(thd)))
   {
     sql_print_error("Could not allocate Thd_ndb object");
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     goto ndb_index_stat_thread_end;
   }
   set_thd_ndb(thd, thd_ndb);
@@ -1996,19 +2165,19 @@ ndb_index_stat_thread_func(void *arg __a
   {
     sql_print_error("Could not change index stats thd_ndb database to %s",
                     NDB_INDEX_STAT_DB);
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     goto ndb_index_stat_thread_end;
   }
   pr.ndb= thd_ndb->ndb;
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   ndb_index_stat_allow(1);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
   /* Fill in initial status variable */
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
   bool enable_ok;
   enable_ok= false;
@@ -2016,10 +2185,10 @@ ndb_index_stat_thread_func(void *arg __a
   set_timespec(abstime, 0);
   for (;;)
   {
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     if (!ndbcluster_terminating && ndb_index_stat_waiter == false) {
-      int ret= pthread_cond_timedwait(&COND_ndb_index_stat_thread,
-                                      &LOCK_ndb_index_stat_thread,
+      int ret= pthread_cond_timedwait(&ndb_index_stat_thread.COND,
+                                      &ndb_index_stat_thread.LOCK,
                                       &abstime);
       const char* reason= ret == ETIMEDOUT ? "timed out" : "wake up";
       (void)reason; // USED
@@ -2028,7 +2197,7 @@ ndb_index_stat_thread_func(void *arg __a
     if (ndbcluster_terminating) /* Shutting down server */
       goto ndb_index_stat_thread_end;
     ndb_index_stat_waiter= false;
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
     /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
     const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
@@ -2089,9 +2258,9 @@ ndb_index_stat_thread_func(void *arg __a
     glob.th_enable= enable_ok;
     glob.th_busy= pr.busy;
     glob.th_loop= msecs;
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   }
 
 ndb_index_stat_thread_end:
@@ -2117,15 +2286,12 @@ ndb_index_stat_thread_fail:
   delete thd;
   
   /* signal termination */
-  ndb_index_stat_thread_running= 0;
-  pthread_cond_signal(&COND_ndb_index_stat_ready);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  ndb_index_stat_thread.running= 0;
+  pthread_cond_signal(&ndb_index_stat_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
   DBUG_PRINT("exit", ("ndb_index_stat_thread"));
 
   DBUG_LEAVE;
-  my_thread_end();
-  pthread_exit(0);
-  return NULL;
 }
 
 /* Optimizer queries */
@@ -2151,7 +2317,7 @@ ndb_index_stat_wait(Ndb_index_stat *st,
   DBUG_ENTER("ndb_index_stat_wait");
 
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   int err= 0;
   uint count= 0;
   struct timespec abstime;
@@ -2161,9 +2327,15 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     if (count == 0)
     {
       if (!from_analyze)
+      {
         glob.wait_stats++;
+        glob.query_count++;
+      }
       else
+      {
         glob.wait_update++;
+        glob.analyze_count++;
+      }
       if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
       {
         err= Ndb_index_stat_error_HAS_ERROR;
@@ -2175,25 +2347,30 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     {
       /* Have detected no stats now or before */
       err= NdbIndexStat::NoIndexStats;
+      glob.query_no_stats++;
       break;
     }
     if (st->error.code != 0)
     {
       /* A new error has occured */
       err= st->error.code;
+      if (!from_analyze)
+        glob.query_error++;
+      else
+        glob.analyze_error++;
       break;
     }
     if (st->sample_version > sample_version)
       break;
     DBUG_PRINT("index_stat", ("st %s wait count:%u",
                               st->id, ++count));
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     ndb_index_stat_waiter= true;
-    pthread_cond_signal(&COND_ndb_index_stat_thread);
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
     set_timespec(abstime, 1);
-    ret= pthread_cond_timedwait(&ndb_index_stat_stat_cond,
-                                &ndb_index_stat_stat_mutex,
+    ret= pthread_cond_timedwait(&ndb_index_stat_thread.stat_cond,
+                                &ndb_index_stat_thread.stat_mutex,
                                 &abstime);
     if (ret != 0 && ret != ETIMEDOUT)
     {
@@ -2211,7 +2388,7 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     assert(glob.wait_update != 0);
     glob.wait_update--;
   }
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   if (err != 0)
   {
     DBUG_PRINT("index_stat", ("st %s wait error: %d",
@@ -2257,9 +2434,9 @@ ha_ndbcluster::ndb_index_stat_query(uint
   if (st->read_time == 0)
   {
     DBUG_PRINT("index_stat", ("no index stats"));
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
-    pthread_cond_signal(&COND_ndb_index_stat_thread);
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
     DBUG_RETURN(NdbIndexStat::NoIndexStats);
   }
 

=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h	2011-10-08 16:54:19 +0000
+++ b/sql/ha_ndb_index_stat.h	2011-11-10 10:35:09 +0000
@@ -15,22 +15,43 @@
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 */
 
-/* provides declarations only to index_stat.cc */
+#ifndef HA_NDB_INDEX_STAT_H
+#define HA_NDB_INDEX_STAT_H
 
-extern struct st_ndb_status g_ndb_status;
+#include "ndb_component.h"
 
-extern pthread_mutex_t ndbcluster_mutex;
+/* for NdbIndexScanOperation::IndexBound */
+#include <ndbapi/NdbIndexScanOperation.hpp>
 
-extern pthread_t ndb_index_stat_thread;
-extern pthread_cond_t COND_ndb_index_stat_thread;
-extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-
-/* protect entry lists where needed */
-extern pthread_mutex_t ndb_index_stat_list_mutex;
-
-/* protect and signal changes in stats entries */
-extern pthread_mutex_t ndb_index_stat_stat_mutex;
-extern pthread_cond_t ndb_index_stat_stat_cond;
+/* forward declarations */
+struct st_key_range;
+typedef struct st_key_range key_range;
+struct st_key;
+typedef struct st_key KEY;
+
+class Ndb_index_stat_thread : public Ndb_component
+{
+public:
+  Ndb_index_stat_thread();
+  virtual ~Ndb_index_stat_thread();
+
+  int running;
+  pthread_mutex_t LOCK;
+  pthread_cond_t COND;
+  pthread_cond_t COND_ready;
+
+  /* protect entry lists where needed */
+  pthread_mutex_t list_mutex;
+
+  /* protect and signal changes in stats entries */
+  pthread_mutex_t stat_mutex;
+  pthread_cond_t stat_cond;
+
+private:
+  virtual int do_init() { return 0;}
+  virtual void do_run();
+  virtual int do_deinit() { return 0;}
+};
 
 /* these have to live in ha_ndbcluster.cc */
 extern bool ndb_index_stat_get_enable(THD *thd);
@@ -38,7 +59,7 @@ extern const char* g_ndb_status_index_st
 extern long g_ndb_status_index_stat_cache_query;
 extern long g_ndb_status_index_stat_cache_clean;
 
-void 
+void
 compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
                      const KEY *key_info,
                      const key_range *start_key, const key_range *end_key,
@@ -54,3 +75,5 @@ compute_index_bounds(NdbIndexScanOperati
 
 /* request on stats entry with recent error was ignored */
 #define Ndb_index_stat_error_HAS_ERROR          9003
+
+#endif

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-10-23 07:38:10 +0000
+++ b/sql/ha_ndbcluster.cc	2011-11-10 14:23:53 +0000
@@ -46,6 +46,8 @@
 #include <ndb_version.h>
 #include "ndb_mi.h"
 #include "ndb_conflict_trans.h"
+#include "ndb_component.h"
+#include "ndb_util_thread.h"
 
 #ifdef ndb_dynamite
 #undef assert
@@ -420,25 +422,7 @@ static int ndb_get_table_statistics(THD
 
 THD *injector_thd= 0;
 
-// Util thread variables
-pthread_t ndb_util_thread;
-int ndb_util_thread_running= 0;
-pthread_mutex_t LOCK_ndb_util_thread;
-pthread_cond_t COND_ndb_util_thread;
-pthread_cond_t COND_ndb_util_ready;
-pthread_handler_t ndb_util_thread_func(void *arg);
-
 // Index stats thread variables
-pthread_t ndb_index_stat_thread;
-int ndb_index_stat_thread_running= 0;
-pthread_mutex_t LOCK_ndb_index_stat_thread;
-pthread_cond_t COND_ndb_index_stat_thread;
-pthread_cond_t COND_ndb_index_stat_ready;
-pthread_mutex_t ndb_index_stat_list_mutex;
-pthread_mutex_t ndb_index_stat_stat_mutex;
-pthread_cond_t ndb_index_stat_stat_cond;
-pthread_handler_t ndb_index_stat_thread_func(void *arg);
-
 extern void ndb_index_stat_free(NDB_SHARE *share);
 extern void ndb_index_stat_end();
 
@@ -12251,7 +12235,7 @@ ndbcluster_find_files(handlerton *hton,
 /* Call back after cluster connect */
 static int connect_callback()
 {
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
   update_status_variables(NULL, &g_ndb_status,
                           g_ndb_cluster_connection);
 
@@ -12261,8 +12245,8 @@ static int connect_callback()
   while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
     g_node_id_map[node_id]= i++;
 
-  pthread_cond_signal(&COND_ndb_util_thread);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  pthread_cond_signal(&ndb_util_thread.COND);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
   return 0;
 }
 
@@ -12308,6 +12292,12 @@ int(*ndb_wait_setup_func)(ulong) = 0;
 #endif
 extern int ndb_dictionary_is_mysqld;
 
+/**
+ * Components
+ */
+Ndb_util_thread ndb_util_thread;
+Ndb_index_stat_thread ndb_index_stat_thread;
+
 static int ndbcluster_init(void *p)
 {
   DBUG_ENTER("ndbcluster_init");
@@ -12320,20 +12310,11 @@ static int ndbcluster_init(void *p)
   assert(DependencyTracker::InvalidTransactionId ==
          Ndb_binlog_extra_row_info::InvalidTransactionId);
 #endif
+  ndb_util_thread.init();
+  ndb_index_stat_thread.init();
 
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
-  pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&COND_ndb_util_thread, NULL);
-  pthread_cond_init(&COND_ndb_util_ready, NULL);
   pthread_cond_init(&COND_ndb_setup_complete, NULL);
-  ndb_util_thread_running= -1;
-  pthread_mutex_init(&LOCK_ndb_index_stat_thread, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&COND_ndb_index_stat_thread, NULL);
-  pthread_cond_init(&COND_ndb_index_stat_ready, NULL);
-  pthread_mutex_init(&ndb_index_stat_list_mutex, MY_MUTEX_INIT_FAST);
-  pthread_mutex_init(&ndb_index_stat_stat_mutex, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&ndb_index_stat_stat_cond, NULL);
-  ndb_index_stat_thread_running= -1;
   ndbcluster_terminating= 0;
   ndb_dictionary_is_mysqld= 1;
   ndb_setup_complete= 0;
@@ -12397,72 +12378,53 @@ static int ndbcluster_init(void *p)
   }
 
   // Create utility thread
-  pthread_t tmp;
-  if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
+  if (ndb_util_thread.start())
   {
     DBUG_PRINT("error", ("Could not create ndb utility thread"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_ready);
     pthread_cond_destroy(&COND_ndb_setup_complete);
     ndbcluster_global_schema_lock_deinit();
     goto ndbcluster_init_error;
   }
 
   /* Wait for the util thread to start */
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
-  while (ndb_util_thread_running < 0)
-    pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
+  while (ndb_util_thread.running < 0)
+    pthread_cond_wait(&ndb_util_thread.COND_ready, &ndb_util_thread.LOCK);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
   
-  if (!ndb_util_thread_running)
+  if (!ndb_util_thread.running)
   {
     DBUG_PRINT("error", ("ndb utility thread exited prematurely"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_ready);
     pthread_cond_destroy(&COND_ndb_setup_complete);
     ndbcluster_global_schema_lock_deinit();
     goto ndbcluster_init_error;
   }
 
   // Create index statistics thread
-  pthread_t tmp2;
-  if (pthread_create(&tmp2, &connection_attrib, ndb_index_stat_thread_func, 0))
+  if (ndb_index_stat_thread.start())
   {
     DBUG_PRINT("error", ("Could not create ndb index statistics thread"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_ready);
-    pthread_mutex_destroy(&ndb_index_stat_list_mutex);
-    pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
-    pthread_cond_destroy(&ndb_index_stat_stat_cond);
     goto ndbcluster_init_error;
   }
 
   /* Wait for the index statistics thread to start */
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
-  while (ndb_index_stat_thread_running < 0)
-    pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
-  
-  if (!ndb_index_stat_thread_running)
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+  while (ndb_index_stat_thread.running < 0)
+    pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+                      &ndb_index_stat_thread.LOCK);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
+
+  if (!ndb_index_stat_thread.running)
   {
     DBUG_PRINT("error", ("ndb index statistics thread exited prematurely"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_ready);
-    pthread_mutex_destroy(&ndb_index_stat_list_mutex);
-    pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
-    pthread_cond_destroy(&ndb_index_stat_stat_cond);
     goto ndbcluster_init_error;
   }
 
@@ -12476,6 +12438,8 @@ static int ndbcluster_init(void *p)
   DBUG_RETURN(FALSE);
 
 ndbcluster_init_error:
+  ndb_util_thread.deinit();
+  ndb_index_stat_thread.deinit();
   /* disconnect from cluster and free connection resources */
   ndbcluster_disconnect();
   ndbcluster_hton->state= SHOW_OPTION_DISABLED;               // If we couldn't use handler
@@ -12513,12 +12477,13 @@ static int ndbcluster_end(handlerton *ht
 
   /* wait for index stat thread to finish */
   sql_print_information("Stopping Cluster Index Statistics thread");
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
   ndbcluster_terminating= 1;
-  pthread_cond_signal(&COND_ndb_index_stat_thread);
-  while (ndb_index_stat_thread_running > 0)
-    pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  pthread_cond_signal(&ndb_index_stat_thread.COND);
+  while (ndb_index_stat_thread.running > 0)
+    pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+                      &ndb_index_stat_thread.LOCK);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
   /* wait for util and binlog thread to finish */
   ndbcluster_binlog_end(NULL);
@@ -12547,17 +12512,13 @@ static int ndbcluster_end(handlerton *ht
   ndb_index_stat_end();
   ndbcluster_disconnect();
 
+  ndb_util_thread.deinit();
+
   // cleanup ndb interface
   ndb_end_internal();
 
   pthread_mutex_destroy(&ndbcluster_mutex);
-  pthread_mutex_destroy(&LOCK_ndb_util_thread);
-  pthread_cond_destroy(&COND_ndb_util_thread);
-  pthread_cond_destroy(&COND_ndb_util_ready);
   pthread_cond_destroy(&COND_ndb_setup_complete);
-  pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
-  pthread_cond_destroy(&COND_ndb_index_stat_thread);
-  pthread_cond_destroy(&COND_ndb_index_stat_ready);
   ndbcluster_global_schema_lock_deinit();
   DBUG_RETURN(0);
 }
@@ -14776,7 +14737,24 @@ ha_ndbcluster::update_table_comment(
 /**
   Utility thread main loop.
 */
-pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
+Ndb_util_thread::Ndb_util_thread()
+  : running(-1)
+{
+  pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND, NULL);
+  pthread_cond_init(&COND_ready, NULL);
+}
+
+Ndb_util_thread::~Ndb_util_thread()
+{
+  assert(running <= 0);
+  pthread_mutex_destroy(&LOCK);
+  pthread_cond_destroy(&COND);
+  pthread_cond_destroy(&COND_ready);
+}
+
+void
+Ndb_util_thread::do_run()
 {
   THD *thd; /* needs to be first for thread_stack */
   struct timespec abstime;
@@ -14784,21 +14762,18 @@ pthread_handler_t ndb_util_thread_func(v
   uint share_list_size= 0;
   NDB_SHARE **share_list= NULL;
 
-  my_thread_init();
   DBUG_ENTER("ndb_util_thread");
   DBUG_PRINT("enter", ("cache_check_time: %lu", opt_ndb_cache_check_time));
- 
-   pthread_mutex_lock(&LOCK_ndb_util_thread);
+
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
 
   thd= new THD; /* note that contructor of THD uses DBUG_ */
   if (thd == NULL)
   {
     my_errno= HA_ERR_OUT_OF_MEM;
-    DBUG_RETURN(NULL);
+    DBUG_VOID_RETURN;
   }
   THD_CHECK_SENTRY(thd);
-  pthread_detach_this_thread();
-  ndb_util_thread= pthread_self();
 
   thd->thread_stack= (char*)&thd; /* remember where our stack is */
   if (thd->store_globals())
@@ -14821,9 +14796,9 @@ pthread_handler_t ndb_util_thread_func(v
   thd->update_charset();
 
   /* Signal successful initialization */
-  ndb_util_thread_running= 1;
-  pthread_cond_signal(&COND_ndb_util_ready);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  ndb_util_thread.running= 1;
+  pthread_cond_signal(&ndb_util_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
 
   /*
     wait for mysql server to start
@@ -14837,7 +14812,7 @@ pthread_handler_t ndb_util_thread_func(v
     if (ndbcluster_terminating)
     {
       mysql_mutex_unlock(&LOCK_server_started);
-      pthread_mutex_lock(&LOCK_ndb_util_thread);
+      pthread_mutex_lock(&ndb_util_thread.LOCK);
       goto ndb_util_thread_end;
     }
   }
@@ -14846,21 +14821,21 @@ pthread_handler_t ndb_util_thread_func(v
   /*
     Wait for cluster to start
   */
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
   while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
   {
     /* ndb not connected yet */
-    pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread);
+    pthread_cond_wait(&ndb_util_thread.COND, &ndb_util_thread.LOCK);
     if (ndbcluster_terminating)
       goto ndb_util_thread_end;
   }
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
 
   /* Get thd_ndb for this thread */
   if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb(thd)))
   {
     sql_print_error("Could not allocate Thd_ndb object");
-    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    pthread_mutex_lock(&ndb_util_thread.LOCK);
     goto ndb_util_thread_end;
   }
   set_thd_ndb(thd, thd_ndb);
@@ -14872,14 +14847,14 @@ pthread_handler_t ndb_util_thread_func(v
   set_timespec(abstime, 0);
   for (;;)
   {
-    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    pthread_mutex_lock(&ndb_util_thread.LOCK);
     if (!ndbcluster_terminating)
-      pthread_cond_timedwait(&COND_ndb_util_thread,
-                             &LOCK_ndb_util_thread,
+      pthread_cond_timedwait(&ndb_util_thread.COND,
+                             &ndb_util_thread.LOCK,
                              &abstime);
     if (ndbcluster_terminating) /* Shutting down server */
       goto ndb_util_thread_end;
-    pthread_mutex_unlock(&LOCK_ndb_util_thread);
+    pthread_mutex_unlock(&ndb_util_thread.LOCK);
 #ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
     DBUG_PRINT("ndb_util_thread", ("Started, cache_check_time: %lu",
                                    opt_ndb_cache_check_time));
@@ -15032,7 +15007,7 @@ next:
     set_timespec_nsec(abstime, opt_ndb_cache_check_time * 1000000ULL);
   }
 
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
 
 ndb_util_thread_end:
   net_end(&thd->net);
@@ -15048,15 +15023,12 @@ ndb_util_thread_fail:
   delete thd;
   
   /* signal termination */
-  ndb_util_thread_running= 0;
-  pthread_cond_signal(&COND_ndb_util_ready);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  ndb_util_thread.running= 0;
+  pthread_cond_signal(&ndb_util_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
   DBUG_PRINT("exit", ("ndb_util_thread"));
 
   DBUG_LEAVE;                               // Must match DBUG_ENTER()
-  my_thread_end();
-  pthread_exit(0);
-  return NULL;                              // Avoid compiler warnings
 }
 
 /*

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-10-17 16:46:12 +0000
+++ b/sql/ha_ndbcluster.h	2011-11-10 14:23:53 +0000
@@ -1090,7 +1090,9 @@ void ndbcluster_print_error(int error, c
 static const char ndbcluster_hton_name[]= "ndbcluster";
 static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
 extern int ndbcluster_terminating;
-extern int ndb_util_thread_running;
-extern pthread_cond_t COND_ndb_util_ready;
-extern int ndb_index_stat_thread_running;
-extern pthread_cond_t COND_ndb_index_stat_ready;
+
+#include "ndb_util_thread.h"
+extern Ndb_util_thread ndb_util_thread;
+
+#include "ha_ndb_index_stat.h"
+extern Ndb_index_stat_thread ndb_index_stat_thread;

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-10-20 19:18:20 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-11-10 14:23:53 +0000
@@ -812,7 +812,7 @@ int ndbcluster_binlog_end(THD *thd)
 {
   DBUG_ENTER("ndbcluster_binlog_end");
 
-  if (ndb_util_thread_running > 0)
+  if (ndb_util_thread.running > 0)
   {
     /*
       Wait for util thread to die (as this uses the injector mutex)
@@ -822,33 +822,34 @@ int ndbcluster_binlog_end(THD *thd)
       be called before ndb_cluster_end().
     */
     sql_print_information("Stopping Cluster Utility thread");
-    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    pthread_mutex_lock(&ndb_util_thread.LOCK);
     /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
-    ndb_util_thread_running++;
+    ndb_util_thread.running++;
     ndbcluster_terminating= 1;
-    pthread_cond_signal(&COND_ndb_util_thread);
-    while (ndb_util_thread_running > 1)
-      pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
-    ndb_util_thread_running--;
-    pthread_mutex_unlock(&LOCK_ndb_util_thread);
+    pthread_cond_signal(&ndb_util_thread.COND);
+    while (ndb_util_thread.running > 1)
+      pthread_cond_wait(&ndb_util_thread.COND_ready, &ndb_util_thread.LOCK);
+    ndb_util_thread.running--;
+    pthread_mutex_unlock(&ndb_util_thread.LOCK);
   }
 
-  if (ndb_index_stat_thread_running > 0)
+  if (ndb_index_stat_thread.running > 0)
   {
     /*
       Index stats thread blindly imitates util thread.  Following actually
       fixes some "[Warning] Plugin 'ndbcluster' will be forced to shutdown".
     */
     sql_print_information("Stopping Cluster Index Stats thread");
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
-    ndb_index_stat_thread_running++;
+    ndb_index_stat_thread.running++;
     ndbcluster_terminating= 1;
-    pthread_cond_signal(&COND_ndb_index_stat_thread);
-    while (ndb_index_stat_thread_running > 1)
-      pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
-    ndb_index_stat_thread_running--;
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    while (ndb_index_stat_thread.running > 1)
+      pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+                        &ndb_index_stat_thread.LOCK);
+    ndb_index_stat_thread.running--;
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
   }
 
   if (ndbcluster_binlog_inited)

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-10-20 19:18:20 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-11-10 14:23:53 +0000
@@ -191,10 +191,6 @@ void ndbcluster_global_schema_lock_init(
 void ndbcluster_global_schema_lock_deinit();
 
 extern unsigned char g_node_id_map[max_ndb_nodes];
-extern pthread_mutex_t LOCK_ndb_util_thread;
-extern pthread_cond_t COND_ndb_util_thread;
-extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-extern pthread_cond_t COND_ndb_index_stat_thread;
 extern pthread_mutex_t ndbcluster_mutex;
 extern HASH ndbcluster_open_tables;
 

=== added file 'sql/ndb_component.cc'
--- a/sql/ndb_component.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_component.cc	2011-11-10 08:16:52 +0000
@@ -0,0 +1,143 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_component.h"
+
+Ndb_component::Ndb_component()
+  : m_thread_state(TS_UNINIT)
+{
+}
+
+Ndb_component::~Ndb_component()
+{
+
+}
+
+int
+Ndb_component::init()
+{
+  assert(m_thread_state == TS_UNINIT);
+
+  pthread_mutex_init(&m_start_stop_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&m_start_stop_cond, NULL);
+
+  int res= do_init();
+  if (res == 0)
+  {
+    m_thread_state= TS_INIT;
+  }
+  return res;
+}
+
+void *
+Ndb_component_run_C(void * arg)
+{
+  my_thread_init();
+  Ndb_component * self = reinterpret_cast<Ndb_component*>(arg);
+  self->run_impl();
+  my_thread_end();
+  pthread_exit(0);
+  return NULL;                              // Avoid compiler warnings
+}
+
+extern pthread_attr_t connection_attrib; // mysql global pthread attr
+
+int
+Ndb_component::start()
+{
+  assert(m_thread_state == TS_INIT);
+  pthread_mutex_lock(&m_start_stop_mutex);
+  m_thread_state= TS_STARTING;
+  int res= pthread_create(&m_thread, &connection_attrib, Ndb_component_run_C,
+                          this);
+
+  if (res == 0)
+  {
+    while (m_thread_state == TS_STARTING)
+    {
+      pthread_cond_wait(&m_start_stop_cond, &m_start_stop_mutex);
+    }
+    pthread_mutex_unlock(&m_start_stop_mutex);
+    return m_thread_state == TS_RUNNING ? 0 : 1;
+  }
+
+  pthread_mutex_unlock(&m_start_stop_mutex);
+  return res;
+}
+
+void
+Ndb_component::run_impl()
+{
+  pthread_detach_this_thread();
+  pthread_mutex_lock(&m_start_stop_mutex);
+  if (m_thread_state == TS_STARTING)
+  {
+    m_thread_state= TS_RUNNING;
+    pthread_cond_signal(&m_start_stop_cond);
+    pthread_mutex_unlock(&m_start_stop_mutex);
+    do_run();
+    pthread_mutex_lock(&m_start_stop_mutex);
+  }
+  m_thread_state = TS_STOPPED;
+  pthread_cond_signal(&m_start_stop_cond);
+  pthread_mutex_unlock(&m_start_stop_mutex);
+}
+
+bool
+Ndb_component::is_stop_requested()
+{
+  bool res = false;
+  pthread_mutex_lock(&m_start_stop_mutex);
+  res = m_thread_state != TS_RUNNING;
+  pthread_mutex_unlock(&m_start_stop_mutex);
+  return res;
+}
+
+int
+Ndb_component::stop()
+{
+  pthread_mutex_lock(&m_start_stop_mutex);
+  assert(m_thread_state == TS_RUNNING ||
+         m_thread_state == TS_STOPPING ||
+         m_thread_state == TS_STOPPED);
+
+  if (m_thread_state == TS_RUNNING)
+  {
+    m_thread_state= TS_STOPPING;
+  }
+
+  if (m_thread_state == TS_STOPPING)
+  {
+    while (m_thread_state != TS_STOPPED)
+    {
+      pthread_cond_signal(&m_start_stop_cond);
+      pthread_cond_wait(&m_start_stop_cond, &m_start_stop_mutex);
+    }
+  }
+  pthread_mutex_unlock(&m_start_stop_mutex);
+
+  return 0;
+}
+
+int
+Ndb_component::deinit()
+{
+  assert(m_thread_state == TS_STOPPED);
+  pthread_mutex_destroy(&m_start_stop_mutex);
+  pthread_cond_destroy(&m_start_stop_cond);
+  return do_deinit();
+}

=== added file 'sql/ndb_component.h'
--- a/sql/ndb_component.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_component.h	2011-11-10 08:16:52 +0000
@@ -0,0 +1,82 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef HA_NDBCLUSTER_COMPONENT_H
+#define HA_NDBCLUSTER_COMPONENT_H
+
+#include <my_global.h>
+#include <my_pthread.h>
+
+extern "C" void * Ndb_component_run_C(void *);
+
+class Ndb_component
+{
+public:
+  virtual int init();
+  virtual int start();
+  virtual int stop();
+  virtual int deinit();
+
+protected:
+  /**
+   * Con/de-structor is protected...so that sub-class needs to provide own
+   */
+  Ndb_component();
+  virtual ~Ndb_component();
+
+  /**
+   * Component init function
+   */
+  virtual int do_init() = 0;
+
+  /**
+   * Component run function
+   */
+  virtual void do_run() = 0;
+
+  /**
+   * Component deinit function
+   */
+  virtual int do_deinit() = 0;
+
+  /**
+   * For usage in threads main loop
+   */
+  bool is_stop_requested();
+
+private:
+
+  enum ThreadState
+  {
+    TS_UNINIT   = 0,
+    TS_INIT     = 1,
+    TS_STARTING = 2,
+    TS_RUNNING  = 3,
+    TS_STOPPING = 4,
+    TS_STOPPED  = 5
+  };
+
+  ThreadState m_thread_state;
+  pthread_t m_thread;
+  pthread_mutex_t m_start_stop_mutex;
+  pthread_cond_t m_start_stop_cond;
+
+  void run_impl();
+  friend void * Ndb_component_run_C(void *);
+};
+
+#endif

=== added file 'sql/ndb_util_thread.h'
--- a/sql/ndb_util_thread.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_util_thread.h	2011-11-10 08:16:52 +0000
@@ -0,0 +1,40 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_UTIL_THREAD_H
+#define NDB_UTIL_THREAD_H
+
+#include "ndb_component.h"
+
+class Ndb_util_thread : public Ndb_component
+{
+public:
+  Ndb_util_thread();
+  virtual ~Ndb_util_thread();
+
+  int running;
+  pthread_mutex_t LOCK;
+  pthread_cond_t COND;
+  pthread_cond_t COND_ready;
+
+private:
+  virtual int do_init() { return 0;}
+  virtual void do_run();
+  virtual int do_deinit() { return 0;}
+};
+
+#endif

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-09-08 06:22:07 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-11-10 14:23:53 +0000
@@ -178,7 +178,8 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ha_ndb_index_stat.cc
   ../../sql/ha_ndbinfo.cc
   ../../sql/ndb_mi.cc
-  ../../sql/ndb_conflict_trans.cc)
+  ../../sql/ndb_conflict_trans.cc
+  ../../sql/ndb_component.cc)
 INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
 
 IF(EXISTS ${CMAKE_SOURCE_DIR}/storage/mysql_storage_engine.cmake)

=== modified file 'storage/ndb/include/kernel/signaldata/ScanFrag.hpp'
--- a/storage/ndb/include/kernel/signaldata/ScanFrag.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/ScanFrag.hpp	2011-11-09 13:10:53 +0000
@@ -177,7 +177,7 @@ public:
   Uint32 fragmentCompleted;
   Uint32 transId1;
   Uint32 transId2;
-  Uint32 total_len;
+  Uint32 total_len;  // Total #Uint32 returned as TRANSID_AI
 };
 
 class ScanFragRef {

=== modified file 'storage/ndb/include/kernel/signaldata/TupKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/TupKey.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/TupKey.hpp	2011-11-09 13:10:53 +0000
@@ -91,7 +91,7 @@ private:
    * DATA VARIABLES
    */
   Uint32 userPtr;
-  Uint32 readLength;
+  Uint32 readLength;  // Length in Uint32 words
   Uint32 writeLength;
   Uint32 noFiredTriggers;
   Uint32 lastRow;

=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-08-17 12:36:56 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-11-09 13:10:53 +0000
@@ -105,16 +105,13 @@ private:
 
   static
   void calculate_batch_size(const NdbImpl&,
-                            const NdbRecord *,
-                            const NdbRecAttr *first_rec_attr,
-                            Uint32, Uint32, Uint32&, Uint32&, Uint32&);
-
-  void calculate_batch_size(Uint32 key_size,
                             Uint32 parallelism,
                             Uint32& batch_size,
-                            Uint32& batch_byte_size,
-                            Uint32& first_batch_size,
-                            const NdbRecord *rec) const;
+                            Uint32& batch_byte_size);
+
+  void calculate_batch_size(Uint32 parallelism,
+                            Uint32& batch_size,
+                            Uint32& batch_byte_size) const;
 
   /*
     Set up buffers for receiving TRANSID_AI and KEYINFO20 signals

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-10-28 16:37:39 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-11-10 14:23:53 +0000
@@ -11205,7 +11205,7 @@ void Dblqh::scanTupkeyConfLab(Signal* si
     tdata4 += sendKeyinfo20(signal, scanptr.p, tcConnectptr.p);
   }//if
   ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
-  scanptr.p->m_curr_batch_size_bytes+= tdata4;
+  scanptr.p->m_curr_batch_size_bytes+= tdata4 * sizeof(Uint32);
   scanptr.p->m_curr_batch_size_rows = rows + 1;
   scanptr.p->m_last_row = tdata5;
   if (scanptr.p->check_scan_batch_completed() | tdata5){
@@ -11832,6 +11832,7 @@ void Dblqh::releaseScanrec(Signal* signa
 /* ------------------------------------------------------------------------
  * -------              SEND KEYINFO20 TO API                       ------- 
  *
+ * Return: Length in number of Uint32 words
  * ------------------------------------------------------------------------  */
 Uint32 Dblqh::sendKeyinfo20(Signal* signal, 
 			    ScanRecord * scanP, 
@@ -11968,7 +11969,9 @@ Uint32 Dblqh::sendKeyinfo20(Signal* sign
 void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) 
 {
   Uint32 completed_ops= scanptr.p->m_curr_batch_size_rows;
-  Uint32 total_len= scanptr.p->m_curr_batch_size_bytes;
+  Uint32 total_len= scanptr.p->m_curr_batch_size_bytes / sizeof(Uint32);
+  ndbassert((scanptr.p->m_curr_batch_size_bytes % sizeof(Uint32)) == 0);
+
   scanptr.p->scanTcWaiting = 0;
 
   if(ERROR_INSERTED(5037)){

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-10-03 07:08:19 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-11-10 14:23:53 +0000
@@ -871,6 +871,7 @@ public:
     Uint32 m_senderRef;
     Uint32 m_senderData;
     Uint32 m_rootResultData;
+    Uint32 m_rootFragId;
     Uint32 m_transId[2];
     TreeNode_list::Head m_nodes;
     TreeNodeCursor_list::Head m_cursor_nodes;

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-11-03 10:20:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-11-10 14:23:53 +0000
@@ -482,6 +482,7 @@ Dbspj::do_init(Request* requestP, const
   requestP->m_outstanding = 0;
   requestP->m_transId[0] = req->transId1;
   requestP->m_transId[1] = req->transId2;
+  requestP->m_rootFragId = LqhKeyReq::getFragmentId(req->fragmentData);
   bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
 #ifdef SPJ_TRACE_TIME
   requestP->m_cnt_batches = 0;
@@ -777,6 +778,7 @@ Dbspj::do_init(Request* requestP, const
   requestP->m_transId[0] = req->transId1;
   requestP->m_transId[1] = req->transId2;
   requestP->m_rootResultData = req->resultData;
+  requestP->m_rootFragId = req->fragmentNoKeyLen;
   bzero(requestP->m_lookup_node_data, sizeof(requestP->m_lookup_node_data));
 #ifdef SPJ_TRACE_TIME
   requestP->m_cnt_batches = 0;
@@ -2187,12 +2189,6 @@ Dbspj::storeRow(Ptr<Request> requestPtr,
     jam();
     return DbspjErr::OutOfRowMemory;
   }
-
-  row.m_type = RowPtr::RT_LINEAR;
-  row.m_row_data.m_linear.m_row_ref = ref;
-  row.m_row_data.m_linear.m_header = (RowPtr::Header*)(dstptr + linklen);
-  row.m_row_data.m_linear.m_data = dstptr + linklen + headlen;
-
   memcpy(dstptr + linklen, headptr, 4 * headlen);
   copy(dstptr + linklen + headlen, dataPtr);
 
@@ -2205,9 +2201,30 @@ Dbspj::storeRow(Ptr<Request> requestPtr,
   else
   {
     jam();
-    return add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
+    Uint32 error = add_to_map(requestPtr, treeNodePtr, row.m_src_correlation, ref);
+    if (unlikely(error))
+      return error;
+  }
+
+  /**
+   * Refetch pointer to alloc'ed row memory  before creating RowPtr 
+   * as above add_to_xxx may mave reorganized memory causing
+   * alloced row to be moved.
+   */
+  Uint32 * rowptr = 0;
+  if (ref.m_allocator == 0)
+  {
+    jam();
+    rowptr = get_row_ptr_stack(ref);
+  }
+  else
+  {
+    jam();
+    rowptr = get_row_ptr_var(ref);
   }
 
+//ndbrequire(rowptr==dstptr);  // It moved which we now do handle
+  setupRowPtr(treeNodePtr, row, ref, rowptr);
   return 0;
 }
 
@@ -4615,12 +4632,17 @@ Dbspj::execDIH_SCAN_TAB_CONF(Signal* sig
   Ptr<Request> requestPtr;
   m_request_pool.getPtr(requestPtr, treeNodePtr.p->m_requestPtrI);
 
+  // Add a skew in the fragment lists such that we don't scan 
+  // the same subset of frags fram all SPJ requests in case of
+  // the scan not being ' T_SCAN_PARALLEL'
+  Uint16 fragNoOffs = requestPtr.p->m_rootFragId % fragCount;
+
   Ptr<ScanFragHandle> fragPtr;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
   if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
   {
     jam();
-    fragPtr.p->init(0);
+    fragPtr.p->init(fragNoOffs);
     fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
     list.addLast(fragPtr);
   }
@@ -4686,10 +4708,11 @@ Dbspj::execDIH_SCAN_TAB_CONF(Signal* sig
     {
       jam();
       Ptr<ScanFragHandle> fragPtr;
+      Uint16 fragNo = (fragNoOffs+i) % fragCount;
       if (likely(m_scanfraghandle_pool.seize(requestPtr.p->m_arena, fragPtr)))
       {
         jam();
-        fragPtr.p->init(i);
+        fragPtr.p->init(fragNo);
         fragPtr.p->m_treeNodePtrI = treeNodePtr.i;
         list.addLast(fragPtr);
       }
@@ -5192,6 +5215,7 @@ Dbspj::scanIndex_send(Signal* signal,
   jam();
   ndbassert(bs_bytes > 0);
   ndbassert(bs_rows > 0);
+  ndbassert(bs_rows <= bs_bytes);
   /**
    * if (m_bits & prunemask):
    * - Range keys sliced out to each ScanFragHandle
@@ -5420,6 +5444,7 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
 
   Uint32 rows = conf->completedOps;
   Uint32 done = conf->fragmentCompleted;
+  Uint32 bytes = conf->total_len * sizeof(Uint32);
 
   Uint32 state = fragPtr.p->m_state;
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
@@ -5435,9 +5460,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
 
   requestPtr.p->m_rows += rows;
   data.m_totalRows += rows;
-  data.m_totalBytes += conf->total_len;
+  data.m_totalBytes += bytes;
   data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
-  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
+  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, bytes);
 
   if (!treeNodePtr.p->isLeaf())
   {
@@ -5532,37 +5557,43 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
         org->batch_size_rows / data.m_parallelism * (data.m_parallelism - 1)
         + data.m_totalRows;
       
-      // Number of rows that we can still fetch in this batch.
+      // Number of rows & bytes that we can still fetch in this batch.
       const Int32 remainingRows 
         = static_cast<Int32>(org->batch_size_rows - maxCorrVal);
-      
+      const Int32 remainingBytes 
+        = static_cast<Int32>(org->batch_size_bytes - data.m_totalBytes);
+
       if (remainingRows >= data.m_frags_not_started &&
+          remainingBytes >= data.m_frags_not_started &&
           /**
            * Check that (remaning row capacity)/(remaining fragments) is 
            * greater or equal to (rows read so far)/(finished fragments).
            */
           remainingRows * static_cast<Int32>(data.m_parallelism) >=
-          static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
-          (org->batch_size_bytes - data.m_totalBytes) * data.m_parallelism >=
-          data.m_totalBytes * data.m_frags_not_started)
+            static_cast<Int32>(data.m_totalRows * data.m_frags_not_started) &&
+          remainingBytes * static_cast<Int32>(data.m_parallelism) >=
+            static_cast<Int32>(data.m_totalBytes * data.m_frags_not_started))
       {
         jam();
         Uint32 batchRange = maxCorrVal;
+        Uint32 bs_rows  = remainingRows / data.m_frags_not_started;
+        Uint32 bs_bytes = remainingBytes / data.m_frags_not_started;
+
         DEBUG("::scanIndex_execSCAN_FRAGCONF() first batch was not full."
               " Asking for new batches from " << data.m_frags_not_started <<
               " fragments with " << 
-              remainingRows / data.m_frags_not_started 
-              <<" rows and " << 
-              (org->batch_size_bytes - data.m_totalBytes)
-              / data.m_frags_not_started 
-              << " bytes.");
+              bs_rows  <<" rows and " << 
+              bs_bytes << " bytes.");
+
+        if (unlikely(bs_rows > bs_bytes))
+          bs_rows = bs_bytes;
+
         scanIndex_send(signal,
                        requestPtr,
                        treeNodePtr,
                        data.m_frags_not_started,
-                       (org->batch_size_bytes - data.m_totalBytes)
-                       / data.m_frags_not_started,
-                       remainingRows / data.m_frags_not_started,
+                       bs_bytes,
+                       bs_rows,
                        batchRange);
         return;
       }

=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2011-10-21 10:58:09 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2011-11-10 14:23:53 +0000
@@ -6795,8 +6795,6 @@ NdbDictionaryImpl::initialiseColumnData(
   recCol->orgAttrSize= col->m_orgAttrSize;
   if (recCol->offset+recCol->maxSize > rec->m_row_size)
     rec->m_row_size= recCol->offset+recCol->maxSize;
-  /* Round data size to whole words + 4 bytes of AttributeHeader. */
-  rec->m_max_transid_ai_bytes+= (recCol->maxSize+7) & ~3;
   recCol->charset_info= col->m_cs;
   recCol->compare_function= NdbSqlUtil::getType(col->m_type).m_cmp;
   recCol->flags= 0;
@@ -6985,7 +6983,6 @@ NdbDictionaryImpl::createRecord(const Nd
   }
 
   rec->m_row_size= 0;
-  rec->m_max_transid_ai_bytes= 0;
   for (i= 0; i<length; i++)
   {
     const NdbDictionary::RecordSpecification *rs= &recSpec[i];

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-09-19 10:11:11 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-11-10 14:23:53 +0000
@@ -878,6 +878,11 @@ NdbIndexStatImpl::sys_read_head(Con& con
     return -1;
   if (sys_head_getvalue(con) == -1)
     return -1;
+  if (con.m_op->setAbortOption(NdbOperation::AbortOnError) == -1)
+  {
+    setError(con, __LINE__);
+    return -1;
+  }
   if (con.execute(commit) == -1)
   {
     setError(con, __LINE__);

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-10-28 13:38:36 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-11-09 13:10:53 +0000
@@ -3058,20 +3058,16 @@ NdbQueryImpl::doSend(int nodeId, bool la
     scanTabReq->transId2 = (Uint32) (transId >> 32);
 
     Uint32 batchRows = root.getMaxBatchRows();
-    Uint32 batchByteSize, firstBatchRows;
+    Uint32 batchByteSize;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
-                                      root.m_ndbRecord,
-                                      root.m_firstRecAttr,
-                                      0, // Key size.
                                       getRootFragCount(),
                                       batchRows,
-                                      batchByteSize,
-                                      firstBatchRows);
+                                      batchByteSize);
     assert(batchRows==root.getMaxBatchRows());
-    assert(batchRows==firstBatchRows);
+    assert(batchRows<=batchByteSize);
     ScanTabReq::setScanBatch(reqInfo, batchRows);
     scanTabReq->batch_byte_size = batchByteSize;
-    scanTabReq->first_batch_size = firstBatchRows;
+    scanTabReq->first_batch_size = batchRows;
 
     ScanTabReq::setViaSPJFlag(reqInfo, 1);
     ScanTabReq::setPassAllConfsFlag(reqInfo, 1);
@@ -4361,11 +4357,11 @@ NdbQueryOperationImpl
      * We must thus make sure that we do not set a batch size for the scan 
      * that exceeds what any of its scan descendants can use.
      *
-     * Ignore calculated 'batchByteSize' and 'firstBatchRows' 
+     * Ignore calculated 'batchByteSize' 
      * here - Recalculated when building signal after max-batchRows has been 
      * determined.
      */
-    Uint32 batchByteSize, firstBatchRows;
+    Uint32 batchByteSize;
     /**
      * myClosestScan->m_maxBatchRows may be zero to indicate that we
      * should use default values, or non-zero if the application had an 
@@ -4373,18 +4369,14 @@ NdbQueryOperationImpl
      */
     maxBatchRows = myClosestScan->m_maxBatchRows;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
-                                      m_ndbRecord,
-                                      m_firstRecAttr,
-                                      0, // Key size.
                                       getRoot().m_parallelism
-                                      == Parallelism_max ?
-                                      m_queryImpl.getRootFragCount() :
-                                      getRoot().m_parallelism,
+                                      == Parallelism_max
+                                      ? m_queryImpl.getRootFragCount()
+                                      : getRoot().m_parallelism,
                                       maxBatchRows,
-                                      batchByteSize,
-                                      firstBatchRows);
+                                      batchByteSize);
     assert(maxBatchRows > 0);
-    assert(firstBatchRows == maxBatchRows);
+    assert(maxBatchRows <= batchByteSize);
   }
 
   // Find the largest value that is acceptable to all lookup descendants.
@@ -4554,17 +4546,13 @@ NdbQueryOperationImpl::prepareAttrInfo(U
     Ndb& ndb = *m_queryImpl.getNdbTransaction().getNdb();
 
     Uint32 batchRows = getMaxBatchRows();
-    Uint32 batchByteSize, firstBatchRows;
+    Uint32 batchByteSize;
     NdbReceiver::calculate_batch_size(* ndb.theImpl,
-                                      m_ndbRecord,
-                                      m_firstRecAttr,
-                                      0, // Key size.
                                       m_queryImpl.getRootFragCount(),
                                       batchRows,
-                                      batchByteSize,
-                                      firstBatchRows);
-    assert(batchRows == firstBatchRows);
+                                      batchByteSize);
     assert(batchRows == getMaxBatchRows());
+    assert(batchRows <= batchByteSize);
     assert(m_parallelism == Parallelism_max ||
            m_parallelism == Parallelism_adaptive);
     if (m_parallelism == Parallelism_max)

=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-08-17 12:36:56 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-11-09 13:10:53 +0000
@@ -155,88 +155,57 @@ NdbReceiver::prepareRead(char *buf, Uint
   Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
   use, taking into account limits in the transporter, user preference, etc.
 
-  Hm, there are some magic overhead numbers (4 bytes/attr, 32 bytes/row) here,
-  would be nice with some explanation on how these numbers were derived.
+  It is the responsibility of the batch producer (LQH+TUP) to
+  stay within these 'batch_size' and 'batch_byte_size' limits.:
 
-  TODO : Check whether these numbers need to be revised w.r.t. read packed
+  - It should stay strictly within the 'batch_size' (#rows) limit.
+  - It is allowed to overallocate the 'batch_byte_size' (slightly)
+    in order to complete the current row when it hit the limit.
+
+  The client should be prepared to receive, and buffer, upto 
+  'batch_size' rows from each fragment.
+  ::ndbrecord_rowsize() might be usefull for calculating the
+  buffersize to allocate for this resultset.
 */
 //static
 void
 NdbReceiver::calculate_batch_size(const NdbImpl& theImpl,
-                                  const NdbRecord *record,
-                                  const NdbRecAttr *first_rec_attr,
-                                  Uint32 key_size,
                                   Uint32 parallelism,
                                   Uint32& batch_size,
-                                  Uint32& batch_byte_size,
-                                  Uint32& first_batch_size)
+                                  Uint32& batch_byte_size)
 {
   const NdbApiConfig & cfg = theImpl.get_ndbapi_config_parameters();
   const Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
   const Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
   const Uint32 max_batch_size= cfg.m_batch_size;
 
-  Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
-  if (record)
-  {
-    tot_size+= record->m_max_transid_ai_bytes;
-  }
-
-  const NdbRecAttr *rec_attr= first_rec_attr;
-  while (rec_attr != NULL) {
-    Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
-    attr_size= ((attr_size + 4 + 3) >> 2) << 2; //Even to word + overhead
-    tot_size+= attr_size;
-    rec_attr= rec_attr->next();
+  batch_byte_size= max_batch_byte_size;
+  if (batch_byte_size * parallelism > max_scan_batch_size) {
+    batch_byte_size= max_scan_batch_size / parallelism;
   }
 
-  tot_size+= 32; //include signal overhead
-
-  /**
-   * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
-   * bytes sent for each batch from each node. We do however ensure that
-   * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
-   * batch.
-   */
-  if (batch_size == 0)
-  {
-    batch_byte_size= max_batch_byte_size;
+  if (batch_size == 0 || batch_size > max_batch_size) {
+    batch_size= max_batch_size;
   }
-  else
-  {
-    batch_byte_size= batch_size * tot_size;
+  if (unlikely(batch_size > MAX_PARALLEL_OP_PER_SCAN)) {
+    batch_size= MAX_PARALLEL_OP_PER_SCAN;
   }
-  
-  if (batch_byte_size * parallelism > max_scan_batch_size) {
-    batch_byte_size= max_scan_batch_size / parallelism;
-  }
-  batch_size= batch_byte_size / tot_size;
-  if (batch_size == 0) {
-    batch_size= 1;
-  } else {
-    if (batch_size > max_batch_size) {
-      batch_size= max_batch_size;
-    } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
-      batch_size= MAX_PARALLEL_OP_PER_SCAN;
-    }
+  if (unlikely(batch_size > batch_byte_size)) {
+    batch_size= batch_byte_size;
   }
-  first_batch_size= batch_size;
+
   return;
 }
 
 void
-NdbReceiver::calculate_batch_size(Uint32 key_size,
-                                  Uint32 parallelism,
+NdbReceiver::calculate_batch_size(Uint32 parallelism,
                                   Uint32& batch_size,
-                                  Uint32& batch_byte_size,
-                                  Uint32& first_batch_size,
-                                  const NdbRecord *record) const
+                                  Uint32& batch_byte_size) const
 {
   calculate_batch_size(* m_ndb->theImpl,
-                       record,
-                       theFirstRecAttr,
-                       key_size, parallelism, batch_size, batch_byte_size,
-                       first_batch_size);
+                       parallelism,
+                       batch_size,
+                       batch_byte_size);
 }
 
 void

=== modified file 'storage/ndb/src/ndbapi/NdbRecord.hpp'
--- a/storage/ndb/src/ndbapi/NdbRecord.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbRecord.hpp	2011-11-09 13:10:53 +0000
@@ -189,8 +189,6 @@ public:
   Uint32 tableVersion;
   /* Copy of table->m_keyLenInWords. */
   Uint32 m_keyLenInWords;
-  /* Total maximum size of TRANSID_AI data (for computing batch size). */
-  Uint32 m_max_transid_ai_bytes;
   /**
    * Number of distribution keys (usually == number of primary keys).
    *

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-05-17 12:47:21 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2011-11-09 13:10:53 +0000
@@ -2284,16 +2284,13 @@ int NdbScanOperation::prepareSendScan(Ui
    */
   ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
   Uint32 batch_size = req->first_batch_size; // User specified
-  Uint32 batch_byte_size, first_batch_size;
-  theReceiver.calculate_batch_size(key_size,
-                                   theParallelism,
+  Uint32 batch_byte_size;
+  theReceiver.calculate_batch_size(theParallelism,
                                    batch_size,
-                                   batch_byte_size,
-                                   first_batch_size,
-                                   m_attribute_record);
+                                   batch_byte_size);
   ScanTabReq::setScanBatch(req->requestInfo, batch_size);
   req->batch_byte_size= batch_byte_size;
-  req->first_batch_size= first_batch_size;
+  req->first_batch_size= batch_size;
 
   /**
    * Set keyinfo, nodisk and distribution key flags in 

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-11-09 14:19:23 +0000
@@ -37,7 +37,7 @@
 #define MAX_SEEK 16 
 #define MAXSTRLEN 16 
 #define MAXATTR 64
-#define MAXTABLES 64
+#define MAXTABLES 1
 #define NDB_MAXTHREADS 128
 /*
   NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -59,6 +59,16 @@ enum StartType {
   stStop 
 } ;
 
+enum RunType {
+  RunInsert,
+  RunRead,
+  RunUpdate,
+  RunDelete,
+  RunCreateTable,
+  RunDropTable,
+  RunAll
+};
+
 struct ThreadNdb
 {
   int NoOfOps;
@@ -70,6 +80,7 @@ extern "C" { static void* threadLoop(voi
 static void setAttrNames(void);
 static void setTableNames(void);
 static int readArguments(int argc, const char** argv);
+static void dropTables(Ndb* pMyNdb);
 static int createTables(Ndb*);
 static void defineOperation(NdbConnection* aTransObject, StartType aType, 
                             Uint32 base, Uint32 aIndex);
@@ -77,6 +88,8 @@ static void defineNdbRecordOperation(Thr
                             Uint32 base, Uint32 aIndex);
 static void execute(StartType aType);
 static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeTransLoop(ThreadNdb* pThread, StartType aType, Ndb* aNdbObject,
+                             unsigned int threadBase, int threadNo);
 static void executeCallback(int result, NdbConnection* NdbObject,
                             void* aObject);
 static bool error_handler(const NdbError & err);
@@ -92,9 +105,15 @@ ErrorData * flexAsynchErrorData;
 static NdbThread*               threadLife[NDB_MAXTHREADS];
 static int                              tNodeId;
 static int                              ThreadReady[NDB_MAXTHREADS];
+static longlong                 ThreadExecutions[NDB_MAXTHREADS];
 static StartType                ThreadStart[NDB_MAXTHREADS];
 static char                             tableName[MAXTABLES][MAXSTRLEN+1];
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
+static RunType                          tRunType = RunAll;
+static int                              tStdTableNum = 0;
+static int                              tWarmupTime = 10; //Seconds
+static int                              tExecutionTime = 30; //Seconds
+static int                              tCooldownTime = 10; //Seconds
 
 // Program Parameters
 static NdbRecord * g_record[MAXTABLES];
@@ -126,9 +145,10 @@ static int
 
 #define START_REAL_TIME
 #define STOP_REAL_TIME
-#define START_TIMER { NdbTimer timer; timer.doStart();
+#define DEFINE_TIMER NdbTimer timer
+#define START_TIMER timer.doStart();
 #define STOP_TIMER timer.doStop();
-#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; 
+#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans)
 
 NDBT_Stats a_i, a_u, a_d, a_r;
 
@@ -183,6 +203,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   ThreadNdb*            pThreadData;
   int                   tLoops=0;
   int                   returnValue = NDBT_OK;
+  DEFINE_TIMER;
 
   flexAsynchErrorData = new ErrorData;
   flexAsynchErrorData->resetErrorCounters();
@@ -201,7 +222,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   ndbout << "  " << tNoOfParallelTrans;
   ndbout << " number of parallel operation per thread " << endl;
   ndbout << "  " << tNoOfTransactions << " transaction(s) per round " << endl;
-  ndbout << "  " << tNoOfLoops << " iterations " << endl;
+  if (tRunType == RunAll){
+    ndbout << "  " << tNoOfLoops << " iterations " << endl;
+  } else if (tRunType == RunRead || tRunType == RunUpdate){
+    ndbout << "  Warmup time is " << tWarmupTime << endl;
+    ndbout << "  Execution time is " << tExecutionTime << endl;
+    ndbout << "  Cooldown time is " << tCooldownTime << endl;
+  }
   ndbout << "  " << "Load Factor is " << tLoadFactor << "%" << endl;
   ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
   ndbout << "  " << tAttributeSize;
@@ -262,10 +289,20 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   if (pNdb->waitUntilReady(10000) != 0){
     ndbout << "NDB is not ready" << endl;
     ndbout << "Benchmark failed!" << endl;
-    returnValue = NDBT_FAILED;
+    return NDBT_ProgramExit(NDBT_FAILED);
   }
 
-  if(returnValue == NDBT_OK){
+  if (tRunType == RunCreateTable)
+  {
+    if (createTables(pNdb) != 0){
+      returnValue = NDBT_FAILED;
+    }
+  }
+  else if (tRunType == RunDropTable)
+  {
+    dropTables(pNdb);
+  }
+  else if(returnValue == NDBT_OK){
     if (createTables(pNdb) != 0){
       returnValue = NDBT_FAILED;
     }
@@ -282,14 +319,15 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
     }
   }
 
-  if(returnValue == NDBT_OK){
+  if(returnValue == NDBT_OK &&
+     tRunType != RunCreateTable &&
+     tRunType != RunDropTable){
     /****************************************************************
      *  Create NDB objects.                                   *
      ****************************************************************/
     resetThreads();
     for (Uint32 i = 0; i < tNoOfThreads ; i++) {
-      pThreadData[i].ThreadNo = i
-;
+      pThreadData[i].ThreadNo = i;
       threadLife[i] = NdbThread_Create(threadLoop,
                                        (void**)&pThreadData[i],
                                        32768,
@@ -312,76 +350,86 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
           
       failed = 0 ;
-
-      START_TIMER;
-      execute(stInsert);
-      STOP_TIMER;
-      a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int ci = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!" 
-                 << endl << endl;
-          ndbout << "Attempting to redo the failed transactions now..." 
-                 << endl ;
-          ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
-                 << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stInsert);
-          STOP_TIMER;
-          PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          ci++;
-        }
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        }else{
-          ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
-                 << endl;
+      if (tRunType == RunAll || tRunType == RunInsert){
+        ndbout << "Executing inserts" << endl;
+        START_TIMER;
+        execute(stInsert);
+        STOP_TIMER;
+      }
+      if (tRunType == RunAll){
+        a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int ci = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!" 
+                   << endl << endl;
+            ndbout << "Attempting to redo the failed transactions now..." 
+                   << endl ;
+            ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
+                   << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stInsert);
+            STOP_TIMER;
+            PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            ci++;
+          }
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          }else{
+            ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
+                   << endl;
+          }//if
         }//if
-      }//if
-          
+      }//if  
       /****************************************************************
        * Perform read.                                                *
        ****************************************************************/
       
       failed = 0 ;
 
-      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-      {
-        START_TIMER;
-        execute(stRead);
-        STOP_TIMER;
-        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-      }
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cr = 1;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl;
-          ndbout <<"Redo attempt " << cr <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
+      if (tRunType == RunAll || tRunType == RunRead){
+        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+        {
+          ndbout << "Executing reads" << endl;
           START_TIMER;
           execute(stRead);
           STOP_TIMER;
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cr++ ;
-        }//while
-        if(0 == failed ) {
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-        }else{
-          ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+          if (tRunType == RunAll){
+            a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+          }//if
+        }//for
+      }//if
+
+      if (tRunType == RunAll){
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cr = 1;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl;
+            ndbout <<"Redo attempt " << cr <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stRead);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cr++ ;
+          }//while
+          if(0 == failed ) {
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+          }else{
+            ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+          }//if
         }//if
       }//if
           
@@ -391,35 +439,40 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
       
       failed = 0 ;
-          
-      START_TIMER;
-      execute(stUpdate);
-      STOP_TIMER;
-      a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cu = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl <<"Redo attempt " << cu <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stUpdate);
-          STOP_TIMER;
-          PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cu++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        } else {
-          ndbout << endl;
-          ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+
+      if (tRunType == RunAll || tRunType == RunUpdate){
+        ndbout << "Executing updates" << endl;
+        START_TIMER;
+        execute(stUpdate);
+        STOP_TIMER;
+      }//if
+      if (tRunType == RunAll){
+        a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cu = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl <<"Redo attempt " << cu <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stUpdate);
+            STOP_TIMER;
+            PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cu++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          } else {
+            ndbout << endl;
+            ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -428,38 +481,41 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
       
       failed = 0 ;
-          
-      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-      {
-        START_TIMER;
-        execute(stRead);
-        STOP_TIMER;
-        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-      }        
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cr2 = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
+
+      if (tRunType == RunAll){
+        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+        {
+          ndbout << "Executing reads" << endl;
           START_TIMER;
           execute(stRead);
           STOP_TIMER;
+          a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
           PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cr2++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        }else{
-          ndbout << endl;
-          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+        }        
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cr2 = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stRead);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cr2++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          }else{
+            ndbout << endl;
+            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -470,34 +526,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       
       failed = 0 ;
           
-      START_TIMER;
-      execute(stDelete);
-      STOP_TIMER;
-      a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cd = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<< endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now:" << endl ;
-          ndbout << endl <<"Redo attempt " << cd <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stDelete);
-          STOP_TIMER;
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cd++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-        }else{
-          ndbout << endl;
-          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+      if (tRunType == RunAll || tRunType == RunDelete){
+        ndbout << "Executing deletes" << endl;
+        START_TIMER;
+        execute(stDelete);
+        STOP_TIMER;
+      }//if
+      if (tRunType == RunAll){
+        a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cd = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<< endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+            ndbout << endl <<"Redo attempt " << cd <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stDelete);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cd++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+          }else{
+            ndbout << endl;
+            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -516,17 +577,61 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       NdbThread_WaitFor(threadLife[i], &tmp);
       NdbThread_Destroy(&threadLife[i]);
     }
-  } 
+  }
+
+  if (tRunType == RunAll)
+  {
+    dropTables(pNdb);
+  }
   delete [] pThreadData;
   delete pNdb;
 
-  //printing errorCounters
-  flexAsynchErrorData->printErrorCounters(ndbout);
-
-  print("insert", a_i);
-  print("update", a_u);
-  print("delete", a_d);
-  print("read  ", a_r);
+  if (tRunType == RunAll ||
+      tRunType == RunInsert ||
+      tRunType == RunDelete ||
+      tRunType == RunUpdate ||
+      tRunType == RunRead)
+  {
+    //printing errorCounters
+    flexAsynchErrorData->printErrorCounters(ndbout);
+    if (tRunType == RunAll) {
+      print("insert", a_i);
+      print("update", a_u);
+      print("delete", a_d);
+      print("read  ", a_r);
+    }
+  }
+  if (tRunType == RunInsert ||
+      tRunType == RunRead ||
+      tRunType == RunUpdate ||
+      tRunType == RunDelete)
+  {
+    longlong total_executions = 0;
+    longlong total_transactions;
+    longlong exec_time;
+
+    if (tRunType == RunInsert || tRunType == RunDelete) {
+      total_executions = (longlong)tNoOfTransactions;
+      total_executions *= (longlong)tNoOfThreads;
+    } else {
+      for (Uint32 i = 0; i < tNoOfThreads; i++){
+        total_executions += ThreadExecutions[i];
+      }
+    }
+    total_transactions = total_executions * tNoOfParallelTrans;
+    if (tRunType == RunInsert || tRunType == RunDelete) {
+      exec_time = (longlong)timer.elapsedTime();
+    } else {
+      exec_time = (longlong)tExecutionTime * 1000;
+    }
+    ndbout << "Total number of transactions is " << total_transactions;
+    ndbout << endl;
+    ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+
+    total_transactions = (total_transactions * 1000) / exec_time;
+    int trans_per_sec = (int)total_transactions;
+    ndbout << "Total transactions per second " << trans_per_sec << endl;
+  }
 
   delete [] g_cluster_connection;
 
@@ -551,7 +656,7 @@ threadLoop(void* ThreadData)
   localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
   localNdb->init(1024);
   localNdb->waitUntilReady(10000);
-  unsigned int threadBase = (threadNo << 16) + tNodeId ;
+  unsigned int threadBase = (threadNo << 16);
   
   for (;;){
     while (ThreadStart[threadNo] == stIdle) {
@@ -565,8 +670,14 @@ threadLoop(void* ThreadData)
 
     tType = ThreadStart[threadNo];
     ThreadStart[threadNo] = stIdle;
-    if(!executeThread(tabThread, tType, localNdb, threadBase)){
-      break;
+    if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
+      if(!executeThread(tabThread, tType, localNdb, threadBase)){
+        break;
+      }
+    } else {
+      if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+        break;
+      }
     }
     ThreadReady[threadNo] = 1;
   }//for
@@ -577,80 +688,131 @@ threadLoop(void* ThreadData)
   return NULL;
 }//threadLoop()
 
-static 
-bool
-executeThread(ThreadNdb* pThread, 
-	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+static int error_count = 0;
 
+static bool
+executeTrans(ThreadNdb* pThread,
+             StartType aType,
+             Ndb* aNdbObject,
+             unsigned int threadBase,
+             unsigned int i)
+{
   NdbConnection* tConArray[1024];
   unsigned int tBase;
   unsigned int tBase2;
 
-  unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
-
-  for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
-  {
-    for (unsigned int i = 0; i < tNoOfTransactions; i++) {
-      if (tLocal == false) {
-        tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
-      } else {
-        tBase = i * tNoOfParallelTrans * MAX_SEEK;
-      }//if
-      START_REAL_TIME;
-      for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
-        if (tLocal == false) {
-          tBase2 = tBase + (j * tNoOfOpsPerTrans);
-        } else {
-          tBase2 = tBase + (j * MAX_SEEK);
-          tBase2 = getKey(threadBase, tBase2);
-        }//if
-        if (startTransGuess == true) {
-	  union {
-            Uint64 Tkey64;
-            Uint32 Tkey32[2];
-	  };
-          Tkey32[0] = threadBase;
-          Tkey32[1] = tBase2;
-          tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
-                                                      (const char*)&Tkey64, //Main PKey
-                                                      (Uint32)4);           //Key Length
-        } else {
-          tConArray[j] = aNdbObject->startTransaction();
-        }//if
-        if (tConArray[j] == NULL && 
-            !error_handler(aNdbObject->getNdbError()) ){
-          ndbout << endl << "Unable to recover! Quiting now" << endl ;
-          return false;
-        }//if
-        
-        for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
-          //-------------------------------------------------------
-          // Define the operation, but do not execute it yet.
-          //-------------------------------------------------------
-          if (tNdbRecord)
-            defineNdbRecordOperation(pThread, 
-                                     tConArray[j], aType, threadBase,(tBase2+k));
-          else
-            defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
-        }//for
-        
-        tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
-      }//for
-      STOP_REAL_TIME;
+  if (tLocal == false) {
+    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+  } else {
+    tBase = i * tNoOfParallelTrans * MAX_SEEK;
+  }//if
+  START_REAL_TIME;
+  for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+    if (tLocal == false) {
+      tBase2 = tBase + (j * tNoOfOpsPerTrans);
+    } else {
+      tBase2 = tBase + (j * MAX_SEEK);
+      tBase2 = getKey(threadBase, tBase2);
+    }//if
+    if (startTransGuess == true) {
+      union {
+        Uint64 Tkey64;
+        Uint32 Tkey32[2];
+      };
+      Tkey32[0] = threadBase;
+      Tkey32[1] = tBase2;
+      tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+                                                  (const char*)&Tkey64, //Main PKey
+                                                  (Uint32)4);           //Key Length
+    } else {
+      tConArray[j] = aNdbObject->startTransaction();
+    }//if
+    if (tConArray[j] == NULL){
+      error_handler(aNdbObject->getNdbError());
+      ndbout << endl << "Unable to recover! Quiting now" << endl ;
+      return false;
+    }//if
+    
+    for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
       //-------------------------------------------------------
-      // Now we have defined a set of operations, it is now time
-      // to execute all of them.
+      // Define the operation, but do not execute it yet.
       //-------------------------------------------------------
-      int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
-      while (unsigned(Tcomp) < tNoOfParallelTrans) {
-        int TlocalComp = aNdbObject->pollNdb(3000, 0);
-        Tcomp += TlocalComp;
-      }//while
-      for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
-        aNdbObject->closeTransaction(tConArray[j]);
-      }//for
+      if (tNdbRecord)
+        defineNdbRecordOperation(pThread, 
+                                 tConArray[j], aType, threadBase,(tBase2+k));
+      else
+        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
     }//for
-  } // for
+    
+    tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+  }//for
+  STOP_REAL_TIME;
+  //-------------------------------------------------------
+  // Now we have defined a set of operations, it is now time
+  // to execute all of them.
+  //-------------------------------------------------------
+  int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+  while (unsigned(Tcomp) < tNoOfParallelTrans) {
+    int TlocalComp = aNdbObject->pollNdb(3000, 0);
+    Tcomp += TlocalComp;
+  }//while
+  for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
+    if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+      error_count++;
+      ndbout << "i = " << i << ", j = " << j << ", error = ";
+      ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+      ndbout << hex << threadBase << endl;
+    }
+    aNdbObject->closeTransaction(tConArray[j]);
+  }//for
+  return true;
+}
+
+static 
+bool
+executeTransLoop(ThreadNdb* pThread, 
+                 StartType aType,
+                 Ndb* aNdbObject,
+                 unsigned int threadBase,
+                 int threadNo) {
+  bool continue_flag = true;
+  int time_expired;
+  longlong executions = 0;
+  unsigned int i = 0;
+  DEFINE_TIMER;
+
+  ThreadExecutions[threadNo] = 0;
+  START_TIMER;
+  do
+  {
+    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+      return false;
+    STOP_TIMER;
+    time_expired = timer.elapsedTime() / 1000;
+    if (time_expired < tWarmupTime)
+      ; //Do nothing
+    else if (time_expired < (tWarmupTime + tExecutionTime)){
+      executions++; //Count measurement
+    }
+    else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
+      ; //Do nothing
+    else
+      continue_flag = false; //Time expired
+    if (i == tNoOfTransactions) /* Make sure the record exists */
+      i = 0;
+  } while (continue_flag);
+  ThreadExecutions[threadNo] = executions;
+  return true;
+}//executeTransLoop()
+
+static 
+bool
+executeThread(ThreadNdb* pThread, 
+	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+  for (unsigned int i = 0; i < tNoOfTransactions; i++) {
+    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+      return false;
+  }//for
   return true;
 }//executeThread()
 
@@ -880,8 +1042,20 @@ static void setTableNames()
       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", i, 
                (unsigned)(NdbTick_CurrentMillisecond()+rand()));
     } else {
-      BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
+      BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", tStdTableNum);
     }
+    ndbout << "Using table name " << tableName[0] << endl;
+  }
+}
+
+static void
+dropTables(Ndb* pMyNdb)
+{
+  int i;
+  for (i = 0; i < MAXTABLES; i++)
+  {
+    ndbout << "Dropping table " << tableName[i] << "..." << endl;
+    pMyNdb->getDictionary()->dropTable(tableName[i]);
   }
 }
 
@@ -893,8 +1067,8 @@ createTables(Ndb* pMyNdb){
   NdbSchemaOp           *MySchemaOp;
   int                   check;
 
-  if (theTableCreateFlag == 0) {
-    for(int i=0; i < 1 ;i++) {
+  if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
+    for(int i=0; i < MAXTABLES ;i++) {
       ndbout << "Creating " << tableName[i] << "..." << endl;
       MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
       
@@ -953,31 +1127,35 @@ createTables(Ndb* pMyNdb){
         return -1;
       
       NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+    }
+  }
+  if (tNdbRecord)
+  {
+    for(int i=0; i < MAXTABLES ;i++) {
+      NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+      const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
 
-      if (tNdbRecord)
+      if (pTab == NULL){
+        error_handler(pDict->getNdbError());
+        return -1;
+      }
+      int off = 0;
+      Vector<NdbDictionary::RecordSpecification> spec;
+      for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
       {
-	NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
-	const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
-	
-	int off = 0;
-	Vector<NdbDictionary::RecordSpecification> spec;
-	for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
-	{
-	  NdbDictionary::RecordSpecification r0;
-	  r0.column = pTab->getColumn(j);
-	  r0.offset = off;
-	  off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
-	  spec.push_back(r0);
-	}
-	g_record[i] = 
+        NdbDictionary::RecordSpecification r0;
+        r0.column = pTab->getColumn(j);
+        r0.offset = off;
+        off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+        spec.push_back(r0);
+      }
+      g_record[i] = 
 	  pDict->createRecord(pTab, spec.getBase(), 
 			      spec.size(),
 			      sizeof(NdbDictionary::RecordSpecification));
-	assert(g_record[i]);
-      }
+      assert(g_record[i]);
     }
   }
-  
   return 0;
 }
 
@@ -996,6 +1174,14 @@ bool error_handler(const NdbError & err)
   return false ; // return false to abort
 }
 
+static void
+setAggregateRun(void)
+{
+  tNoOfLoops = 1;
+  tExtraReadLoop = 0;
+  theTableCreateFlag = 1;
+}
+
 static
 int 
 readArguments(int argc, const char** argv){
@@ -1110,6 +1296,43 @@ readArguments(int argc, const char** arg
       tExtraReadLoop = atoi(argv[i+1]);
     } else if (strcmp(argv[i], "-con") == 0){
       tConnections = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-insert") == 0){
+      setAggregateRun();
+      tRunType = RunInsert;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-read") == 0){
+      setAggregateRun();
+      tRunType = RunRead;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-update") == 0){
+      setAggregateRun();
+      tRunType = RunUpdate;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-delete") == 0){
+      setAggregateRun();
+      tRunType = RunDelete;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-create_table") == 0){
+      tRunType = RunCreateTable;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-drop_table") == 0){
+      tRunType = RunDropTable;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-warmup_time") == 0){
+      tWarmupTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-execution_time") == 0){
+      tExecutionTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-cooldown_time") == 0){
+      tCooldownTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-table") == 0){
+      tStdTableNum = atoi(argv[i+1]);
+      theStdTableNameFlag = 1;
     } else {
       return -1;
     }
@@ -1131,7 +1354,6 @@ readArguments(int argc, const char** arg
 static
 void
 input_error(){
-  
   ndbout_c("FLEXASYNCH");
   ndbout_c("   Perform benchmark of insert, update and delete transactions");
   ndbout_c(" ");
@@ -1156,7 +1378,18 @@ input_error(){
   ndbout_c("   -force Force send when communicating");
   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
   ndbout_c("   -local Number of part, only use keys in one part out of 16");
-  ndbout_c("   -ndbrecord");
+  ndbout_c("   -ndbrecord Use NDB Record");
+  ndbout_c("   -r Number of extra loops");
+  ndbout_c("   -insert Only run inserts on standard table");
+  ndbout_c("   -read Only run reads on standard table");
+  ndbout_c("   -update Only run updates on standard table");
+  ndbout_c("   -delete Only run deletes on standard table");
+  ndbout_c("   -create_table Only run Create Table of standard table");
+  ndbout_c("   -drop_table Only run Drop Table on standard table");
+  ndbout_c("   -warmup_time Warmup Time before measurement starts");
+  ndbout_c("   -execution_time Execution Time where measurement is done");
+  ndbout_c("   -cooldown_time Cooldown time after measurement completed");
+  ndbout_c("   -table Number of standard table, default 0");
 }
   
 template class Vector<NdbDictionary::RecordSpecification>;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4330 to 4331) Jonas Oreland11 Nov