List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 14 2011 2:26pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3581 to 3582)
View as plain text  
 3582 Ole John Aske	2011-11-14 [merge]
      Merge mysql-5.1-telco-7.0 -> mysql-5.1-telco-7.0-spj-scan-scan

    added:
      sql/ndb_component.cc
      sql/ndb_component.h
      sql/ndb_util_thread.h
    modified:
      libmysqld/Makefile.am
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
      sql/Makefile.am
      sql/ha_ndb_index_stat.cc
      sql/ha_ndb_index_stat.h
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ha_ndbcluster_cond.cc
      sql/ha_ndbcluster_cond.h
      storage/ndb/CMakeLists.txt
      storage/ndb/include/kernel/ndb_limits.h
      storage/ndb/include/mgmapi/mgmapi_config_parameters.h
      storage/ndb/include/ndb_version.h.in
      storage/ndb/src/common/portlib/NdbThread.c
      storage/ndb/src/common/util/NdbPack.cpp
      storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
      storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
      storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp
      storage/ndb/src/kernel/vm/DLFifoList.hpp
      storage/ndb/src/kernel/vm/DLHashTable.hpp
      storage/ndb/src/kernel/vm/GlobalData.hpp
      storage/ndb/src/kernel/vm/SimulatedBlock.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/pc.hpp
      storage/ndb/test/ndbapi/flexAsynch.cpp
 3581 Ole John Aske	2011-11-09 [merge]
      Merge telco-7.0 -> telco-7.0-spj-scan-scan

    modified:
      mysql-test/suite/ndb/r/ndb_index_stat.result
      sql/ha_ndb_index_stat.cc
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
      storage/ndb/src/ndbapi/NdbRecord.hpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2011-09-09 13:13:52 +0000
+++ b/libmysqld/Makefile.am	2011-11-14 14:25:20 +0000
@@ -49,7 +49,8 @@ sqlsources = derror.cc field.cc field_co
 	     ha_ndbcluster.cc ha_ndbcluster_cond.cc ha_ndbcluster_push.cc \
 	ha_ndbcluster_connection.cc ha_ndbinfo.cc \
 	ha_ndb_index_stat.cc \
-	ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ha_partition.cc \
+	ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ndb_component.cc \
+        ha_partition.cc \
 	handler.cc sql_handler.cc \
 	hostname.cc init.cc password.c \
 	item.cc item_buff.cc item_cmpfunc.cc item_create.cc \

=== modified file 'mysql-test/suite/ndb/r/ndb_condition_pushdown.result'
--- a/mysql-test/suite/ndb/r/ndb_condition_pushdown.result	2011-11-09 09:53:04 +0000
+++ b/mysql-test/suite/ndb/r/ndb_condition_pushdown.result	2011-11-14 14:25:20 +0000
@@ -2148,6 +2148,24 @@ id	select_type	table	type	possible_keys	
 select * from t where x not like "aa?";
 pk	x
 0	a
+select * from t where x like "%a%";
+pk	x
+0	a
+select * from t where x not like "%b%";
+pk	x
+0	a
+select * from t where x like replace(concat("%", "b%"),"b","a");
+pk	x
+0	a
+select * from t where x not like replace(concat("%", "a%"),"a","b");
+pk	x
+0	a
+select * from t where x like concat("%", replace("b%","b","a"));
+pk	x
+0	a
+select * from t where x not like concat("%", replace("a%","a","b"));
+pk	x
+0	a
 drop table t;
 create table t (pk int primary key, x int) engine = ndb;
 insert into t values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5);
@@ -2351,6 +2369,22 @@ select * from mytable where s like conca
 i	s
 0	Text Hej
 1	xText aaja
+select * from mytable where s like replace(concat("%Xext","%"),"X", "T") order by i;
+i	s
+0	Text Hej
+1	xText aaja
+select * from mytable where s not like replace(concat("%Text","%"),"T", "X") order by i;
+i	s
+0	Text Hej
+1	xText aaja
+select * from mytable where s like concat(replace("%Xext","X", "T"),"%") order by i;
+i	s
+0	Text Hej
+1	xText aaja
+select * from mytable where s not like concat(replace("%Text","T", "X"),"%") order by i;
+i	s
+0	Text Hej
+1	xText aaja
 drop table mytable;
 set engine_condition_pushdown = @old_ecpd;
 DROP TABLE t1,t2,t3,t4,t5;

=== modified file 'mysql-test/suite/ndb/t/ndb_condition_pushdown.test'
--- a/mysql-test/suite/ndb/t/ndb_condition_pushdown.test	2011-11-09 09:53:04 +0000
+++ b/mysql-test/suite/ndb/t/ndb_condition_pushdown.test	2011-11-14 14:25:20 +0000
@@ -2257,6 +2257,12 @@ explain select * from t where x like "aa
 select * from t where x like "aa?";
 explain select * from t where x not like "aa?";
 select * from t where x not like "aa?";
+select * from t where x like "%a%";
+select * from t where x not like "%b%";
+select * from t where x like replace(concat("%", "b%"),"b","a");
+select * from t where x not like replace(concat("%", "a%"),"a","b");
+select * from t where x like concat("%", replace("b%","b","a"));
+select * from t where x not like concat("%", replace("a%","a","b"));
 drop table t;
 
 # Bug#57735 BETWEEN in pushed condition cause garbage to be read in ::unpack_record() 
@@ -2392,6 +2398,12 @@ set engine_condition_pushdown=1;
  select * from mytable where s like concat("%Text","%") and s not like "%Text%" order by i;
  select * from mytable where s like concat("%Text","%") and s not like "%Text1%" order by i;
 
+ select * from mytable where s like replace(concat("%Xext","%"),"X", "T") order by i;
+ select * from mytable where s not like replace(concat("%Text","%"),"T", "X") order by i;
+ select * from mytable where s like concat(replace("%Xext","X", "T"),"%") order by i;
+ select * from mytable where s not like concat(replace("%Text","T", "X"),"%") order by i;
+
+
 drop table mytable;
 
 set engine_condition_pushdown = @old_ecpd;

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2011-09-09 13:13:52 +0000
+++ b/sql/Makefile.am	2011-11-14 14:25:20 +0000
@@ -65,6 +65,8 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndb_index_stat.h \
                         ndb_mi.h \
 			ndb_conflict_trans.h \
+                        ndb_component.h \
+                        ndb_util_thread.h \
 			ha_partition.h rpl_constants.h \
 			debug_sync.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
@@ -144,7 +146,8 @@ libndb_la_SOURCES=	ha_ndbcluster.cc \
 			ha_ndb_index_stat.cc \
 			ha_ndbinfo.cc \
 			ndb_mi.cc \
-			ndb_conflict_trans.cc
+			ndb_conflict_trans.cc \
+                        ndb_component.cc
 
 gen_lex_hash_SOURCES =	gen_lex_hash.cc
 gen_lex_hash_LDFLAGS =  @NOINST_LDFLAGS@

=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	2011-11-08 21:43:36 +0000
+++ b/sql/ha_ndb_index_stat.cc	2011-11-10 10:35:09 +0000
@@ -24,6 +24,16 @@
 #include <mysql/plugin.h>
 #include <ctype.h>
 
+/* from other files */
+extern struct st_ndb_status g_ndb_status;
+extern pthread_mutex_t ndbcluster_mutex;
+
+/* these have to live in ha_ndbcluster.cc */
+extern bool ndb_index_stat_get_enable(THD *thd);
+extern const char* g_ndb_status_index_stat_status;
+extern long g_ndb_status_index_stat_cache_query;
+extern long g_ndb_status_index_stat_cache_clean;
+
 // Do we have waiter...
 static bool ndb_index_stat_waiter= false;
 
@@ -40,6 +50,28 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb)
 typedef NdbDictionary::Table NDBTAB;
 typedef NdbDictionary::Index NDBINDEX;
 
+/** ndb_index_stat_thread */
+Ndb_index_stat_thread::Ndb_index_stat_thread()
+  : running(-1)
+{
+  pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND, NULL);
+  pthread_cond_init(&COND_ready, NULL);
+  pthread_mutex_init(&list_mutex, MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&stat_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&stat_cond, NULL);
+}
+
+Ndb_index_stat_thread::~Ndb_index_stat_thread()
+{
+  pthread_mutex_destroy(&LOCK);
+  pthread_cond_destroy(&COND);
+  pthread_cond_destroy(&COND_ready);
+  pthread_mutex_destroy(&list_mutex);
+  pthread_mutex_destroy(&stat_mutex);
+  pthread_cond_destroy(&stat_cond);
+}
+
 struct Ndb_index_stat {
   enum {
     LT_Undef= 0,
@@ -912,8 +944,8 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
 
   pthread_mutex_lock(&share->mutex);
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   time_t now= ndb_index_stat_time();
   err_out= 0;
 
@@ -950,8 +982,8 @@ ndb_index_stat_get_share(NDB_SHARE *shar
   }
   while (0);
 
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   pthread_mutex_unlock(&share->mutex);
   return st;
 }
@@ -966,7 +998,7 @@ ndb_index_stat_free(Ndb_index_stat *st)
 {
   DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   NDB_SHARE *share= st->share;
   assert(share != 0);
 
@@ -1003,10 +1035,10 @@ ndb_index_stat_free(Ndb_index_stat *st)
   assert(found);
   share->index_stat_list= st_head;
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   DBUG_VOID_RETURN;
 }
 
@@ -1015,7 +1047,7 @@ ndb_index_stat_free(NDB_SHARE *share)
 {
   DBUG_ENTER("ndb_index_stat_free");
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   Ndb_index_stat *st;
   while ((st= share->index_stat_list) != 0)
   {
@@ -1028,10 +1060,10 @@ ndb_index_stat_free(NDB_SHARE *share)
     assert(!st->to_delete);
     st->to_delete= true;
   }
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   DBUG_VOID_RETURN;
 }
 
@@ -1042,7 +1074,7 @@ ndb_index_stat_find_entry(int index_id, 
 {
   DBUG_ENTER("ndb_index_stat_find_entry");
   pthread_mutex_lock(&ndbcluster_mutex);
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
                             index_id, index_version, table_id));
 
@@ -1055,7 +1087,7 @@ ndb_index_stat_find_entry(int index_id, 
       if (st->index_id == index_id &&
           st->index_version == index_version)
       {
-        pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+        pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
         pthread_mutex_unlock(&ndbcluster_mutex);
         DBUG_RETURN(st);
       }
@@ -1063,7 +1095,7 @@ ndb_index_stat_find_entry(int index_id, 
     }
   }
 
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
   pthread_mutex_unlock(&ndbcluster_mutex);
   DBUG_RETURN(0);
 }
@@ -1137,7 +1169,7 @@ void
 ndb_index_stat_proc_new(Ndb_index_stat_proc &pr)
 {
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   const int lt= Ndb_index_stat::LT_New;
   Ndb_index_stat_list &list= ndb_index_stat_list[lt];
 
@@ -1151,10 +1183,10 @@ ndb_index_stat_proc_new(Ndb_index_stat_p
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
   }
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
 }
 
 void
@@ -1189,9 +1221,9 @@ ndb_index_stat_proc_update(Ndb_index_sta
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
     // db op so update status after each
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     cnt++;
   }
   if (cnt == batch)
@@ -1204,7 +1236,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   NdbIndexStat::Head head;
   if (st->is->read_stat(pr.ndb) == -1)
   {
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     ndb_index_stat_error(st, "read_stat", __LINE__);
     const bool force_update= st->force_update;
     ndb_index_stat_force_update(st, false);
@@ -1221,12 +1253,12 @@ ndb_index_stat_proc_read(Ndb_index_stat_
       pr.lt= Ndb_index_stat::LT_Error;
     }
 
-    pthread_cond_broadcast(&ndb_index_stat_stat_cond);
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     return;
   }
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   pr.now= ndb_index_stat_time();
   st->is->get_head(head);
   st->load_time= head.m_loadTime;
@@ -1239,8 +1271,8 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   ndb_index_stat_cache_move(st);
   st->cache_clean= false;
   pr.lt= Ndb_index_stat::LT_Idle;
-  pthread_cond_broadcast(&ndb_index_stat_stat_cond);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_cond_broadcast(&ndb_index_stat_thread.stat_cond);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1263,9 +1295,9 @@ ndb_index_stat_proc_read(Ndb_index_stat_
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
     // db op so update status after each
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     cnt++;
   }
   if (cnt == batch)
@@ -1329,7 +1361,7 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
   uint batch= opt.get(Ndb_index_stat_opt::Iidle_batch);
   {
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     const Ndb_index_stat_glob &glob= ndb_index_stat_glob;
     const int lt_update= Ndb_index_stat::LT_Update;
     const Ndb_index_stat_list &list_update= ndb_index_stat_list[lt_update];
@@ -1338,7 +1370,7 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
       // probably there is a force update waiting on Idle list
       batch= ~0;
     }
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   }
   // entry may be moved to end of this list
   if (batch > list.count)
@@ -1358,9 +1390,9 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
     cnt++;
   }
   // full batch does not set pr.busy
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1417,9 +1449,9 @@ ndb_index_stat_proc_check(Ndb_index_stat
     assert(pr.lt != lt);
     ndb_index_stat_list_move(st, pr.lt);
     // db op so update status after each
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
     cnt++;
   }
   if (cnt == batch)
@@ -1453,9 +1485,9 @@ ndb_index_stat_proc_evict(Ndb_index_stat
   ndb_index_stat_cache_move(st);
   ndb_index_stat_cache_clean(st);
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 bool
@@ -1564,10 +1596,10 @@ ndb_index_stat_proc_delete(Ndb_index_sta
     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
 
     // adjust global counters at drop
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     ndb_index_stat_force_update(st, false);
     ndb_index_stat_no_stats(st, false);
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
     ndb_index_stat_proc_evict(pr, st);
     ndb_index_stat_list_remove(st);
@@ -1578,9 +1610,9 @@ ndb_index_stat_proc_delete(Ndb_index_sta
   if (cnt == batch)
     pr.busy= true;
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1640,9 +1672,9 @@ ndb_index_stat_proc_error(Ndb_index_stat
     cnt++;
   }
   // full batch does not set pr.busy
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 void
@@ -1715,9 +1747,9 @@ ndb_index_stat_proc_event(Ndb_index_stat
       glob.event_miss++;
     }
   }
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 }
 
 /* Control options */
@@ -1731,11 +1763,11 @@ ndb_index_stat_proc_control(Ndb_index_st
   /* Request to zero accumulating counters */
   if (opt.get(Ndb_index_stat_opt::Izero_total) == true)
   {
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.zero_total();
     glob.set_status();
     opt.set(Ndb_index_stat_opt::Izero_total, false);
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   }
 }
 
@@ -1829,10 +1861,10 @@ ndb_index_stat_list_verify(int lt)
 void
 ndb_index_stat_list_verify()
 {
-  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.list_mutex);
   for (int lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
     ndb_index_stat_list_verify(lt);
-  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.list_mutex);
 }
 
 void
@@ -1891,9 +1923,9 @@ ndb_index_stat_end()
    * in LT_Delete.  The first two steps here should be unnecessary.
    */
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   ndb_index_stat_allow(0);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
   int lt;
   for (lt= 1; lt < Ndb_index_stat::LT_Count; lt++)
@@ -2029,14 +2061,13 @@ ndb_index_stat_stop_listener(Ndb_index_s
   DBUG_RETURN(0);
 }
 
-pthread_handler_t
-ndb_index_stat_thread_func(void *arg __attribute__((unused)))
+void
+Ndb_index_stat_thread::do_run()
 {
   THD *thd; /* needs to be first for thread_stack */
   struct timespec abstime;
   Thd_ndb *thd_ndb= NULL;
 
-  my_thread_init();
   DBUG_ENTER("ndb_index_stat_thread_func");
 
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
@@ -2046,18 +2077,16 @@ ndb_index_stat_thread_func(void *arg __a
   have_listener= false;
 
   // wl4124_todo remove useless stuff copied from utility thread
- 
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
 
   thd= new THD; /* note that contructor of THD uses DBUG_ */
   if (thd == NULL)
   {
     my_errno= HA_ERR_OUT_OF_MEM;
-    DBUG_RETURN(NULL);
+    DBUG_VOID_RETURN;
   }
   THD_CHECK_SENTRY(thd);
-  pthread_detach_this_thread();
-  ndb_index_stat_thread= pthread_self();
 
   thd->thread_stack= (char*)&thd; /* remember where our stack is */
   if (thd->store_globals())
@@ -2080,9 +2109,9 @@ ndb_index_stat_thread_func(void *arg __a
   thd->update_charset();
 
   /* Signal successful initialization */
-  ndb_index_stat_thread_running= 1;
-  pthread_cond_signal(&COND_ndb_index_stat_ready);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  ndb_index_stat_thread.running= 1;
+  pthread_cond_signal(&ndb_index_stat_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
   /*
     wait for mysql server to start
@@ -2096,7 +2125,7 @@ ndb_index_stat_thread_func(void *arg __a
     if (ndbcluster_terminating)
     {
       mysql_mutex_unlock(&LOCK_server_started);
-      pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+      pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
       goto ndb_index_stat_thread_end;
     }
   }
@@ -2105,21 +2134,21 @@ ndb_index_stat_thread_func(void *arg __a
   /*
     Wait for cluster to start
   */
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
   while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
   {
     /* ndb not connected yet */
-    pthread_cond_wait(&COND_ndb_index_stat_thread, &LOCK_ndb_index_stat_thread);
+    pthread_cond_wait(&ndb_index_stat_thread.COND, &ndb_index_stat_thread.LOCK);
     if (ndbcluster_terminating)
       goto ndb_index_stat_thread_end;
   }
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
   /* Get instance used for sys objects check and create */
   if (!(pr.is_util= new NdbIndexStat))
   {
     sql_print_error("Could not allocate NdbIndexStat is_util object");
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     goto ndb_index_stat_thread_end;
   }
 
@@ -2127,7 +2156,7 @@ ndb_index_stat_thread_func(void *arg __a
   if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb(thd)))
   {
     sql_print_error("Could not allocate Thd_ndb object");
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     goto ndb_index_stat_thread_end;
   }
   set_thd_ndb(thd, thd_ndb);
@@ -2136,19 +2165,19 @@ ndb_index_stat_thread_func(void *arg __a
   {
     sql_print_error("Could not change index stats thd_ndb database to %s",
                     NDB_INDEX_STAT_DB);
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     goto ndb_index_stat_thread_end;
   }
   pr.ndb= thd_ndb->ndb;
 
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   ndb_index_stat_allow(1);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
   /* Fill in initial status variable */
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   glob.set_status();
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
 
   bool enable_ok;
   enable_ok= false;
@@ -2156,10 +2185,10 @@ ndb_index_stat_thread_func(void *arg __a
   set_timespec(abstime, 0);
   for (;;)
   {
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     if (!ndbcluster_terminating && ndb_index_stat_waiter == false) {
-      int ret= pthread_cond_timedwait(&COND_ndb_index_stat_thread,
-                                      &LOCK_ndb_index_stat_thread,
+      int ret= pthread_cond_timedwait(&ndb_index_stat_thread.COND,
+                                      &ndb_index_stat_thread.LOCK,
                                       &abstime);
       const char* reason= ret == ETIMEDOUT ? "timed out" : "wake up";
       (void)reason; // USED
@@ -2168,7 +2197,7 @@ ndb_index_stat_thread_func(void *arg __a
     if (ndbcluster_terminating) /* Shutting down server */
       goto ndb_index_stat_thread_end;
     ndb_index_stat_waiter= false;
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
     /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
     const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
@@ -2229,9 +2258,9 @@ ndb_index_stat_thread_func(void *arg __a
     glob.th_enable= enable_ok;
     glob.th_busy= pr.busy;
     glob.th_loop= msecs;
-    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
     glob.set_status();
-    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+    pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   }
 
 ndb_index_stat_thread_end:
@@ -2257,15 +2286,12 @@ ndb_index_stat_thread_fail:
   delete thd;
   
   /* signal termination */
-  ndb_index_stat_thread_running= 0;
-  pthread_cond_signal(&COND_ndb_index_stat_ready);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  ndb_index_stat_thread.running= 0;
+  pthread_cond_signal(&ndb_index_stat_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
   DBUG_PRINT("exit", ("ndb_index_stat_thread"));
 
   DBUG_LEAVE;
-  my_thread_end();
-  pthread_exit(0);
-  return NULL;
 }
 
 /* Optimizer queries */
@@ -2291,7 +2317,7 @@ ndb_index_stat_wait(Ndb_index_stat *st,
   DBUG_ENTER("ndb_index_stat_wait");
 
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_lock(&ndb_index_stat_thread.stat_mutex);
   int err= 0;
   uint count= 0;
   struct timespec abstime;
@@ -2338,13 +2364,13 @@ ndb_index_stat_wait(Ndb_index_stat *st,
       break;
     DBUG_PRINT("index_stat", ("st %s wait count:%u",
                               st->id, ++count));
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     ndb_index_stat_waiter= true;
-    pthread_cond_signal(&COND_ndb_index_stat_thread);
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
     set_timespec(abstime, 1);
-    ret= pthread_cond_timedwait(&ndb_index_stat_stat_cond,
-                                &ndb_index_stat_stat_mutex,
+    ret= pthread_cond_timedwait(&ndb_index_stat_thread.stat_cond,
+                                &ndb_index_stat_thread.stat_mutex,
                                 &abstime);
     if (ret != 0 && ret != ETIMEDOUT)
     {
@@ -2362,7 +2388,7 @@ ndb_index_stat_wait(Ndb_index_stat *st,
     assert(glob.wait_update != 0);
     glob.wait_update--;
   }
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
+  pthread_mutex_unlock(&ndb_index_stat_thread.stat_mutex);
   if (err != 0)
   {
     DBUG_PRINT("index_stat", ("st %s wait error: %d",
@@ -2408,9 +2434,9 @@ ha_ndbcluster::ndb_index_stat_query(uint
   if (st->read_time == 0)
   {
     DBUG_PRINT("index_stat", ("no index stats"));
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
-    pthread_cond_signal(&COND_ndb_index_stat_thread);
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
     DBUG_RETURN(NdbIndexStat::NoIndexStats);
   }
 

=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h	2011-10-08 16:54:19 +0000
+++ b/sql/ha_ndb_index_stat.h	2011-11-10 10:35:09 +0000
@@ -15,22 +15,43 @@
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 */
 
-/* provides declarations only to index_stat.cc */
+#ifndef HA_NDB_INDEX_STAT_H
+#define HA_NDB_INDEX_STAT_H
 
-extern struct st_ndb_status g_ndb_status;
+#include "ndb_component.h"
 
-extern pthread_mutex_t ndbcluster_mutex;
+/* for NdbIndexScanOperation::IndexBound */
+#include <ndbapi/NdbIndexScanOperation.hpp>
 
-extern pthread_t ndb_index_stat_thread;
-extern pthread_cond_t COND_ndb_index_stat_thread;
-extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-
-/* protect entry lists where needed */
-extern pthread_mutex_t ndb_index_stat_list_mutex;
-
-/* protect and signal changes in stats entries */
-extern pthread_mutex_t ndb_index_stat_stat_mutex;
-extern pthread_cond_t ndb_index_stat_stat_cond;
+/* forward declarations */
+struct st_key_range;
+typedef struct st_key_range key_range;
+struct st_key;
+typedef struct st_key KEY;
+
+class Ndb_index_stat_thread : public Ndb_component
+{
+public:
+  Ndb_index_stat_thread();
+  virtual ~Ndb_index_stat_thread();
+
+  int running;
+  pthread_mutex_t LOCK;
+  pthread_cond_t COND;
+  pthread_cond_t COND_ready;
+
+  /* protect entry lists where needed */
+  pthread_mutex_t list_mutex;
+
+  /* protect and signal changes in stats entries */
+  pthread_mutex_t stat_mutex;
+  pthread_cond_t stat_cond;
+
+private:
+  virtual int do_init() { return 0;}
+  virtual void do_run();
+  virtual int do_deinit() { return 0;}
+};
 
 /* these have to live in ha_ndbcluster.cc */
 extern bool ndb_index_stat_get_enable(THD *thd);
@@ -38,7 +59,7 @@ extern const char* g_ndb_status_index_st
 extern long g_ndb_status_index_stat_cache_query;
 extern long g_ndb_status_index_stat_cache_clean;
 
-void 
+void
 compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
                      const KEY *key_info,
                      const key_range *start_key, const key_range *end_key,
@@ -54,3 +75,5 @@ compute_index_bounds(NdbIndexScanOperati
 
 /* request on stats entry with recent error was ignored */
 #define Ndb_index_stat_error_HAS_ERROR          9003
+
+#endif

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-10-28 09:04:10 +0000
+++ b/sql/ha_ndbcluster.cc	2011-11-14 14:25:20 +0000
@@ -50,6 +50,8 @@
 #include <ndb_version.h>
 #include "ndb_mi.h"
 #include "ndb_conflict_trans.h"
+#include "ndb_component.h"
+#include "ndb_util_thread.h"
 
 #ifdef ndb_dynamite
 #undef assert
@@ -440,25 +442,7 @@ static int ndb_get_table_statistics(THD 
 
 THD *injector_thd= 0;
 
-// Util thread variables
-pthread_t ndb_util_thread;
-int ndb_util_thread_running= 0;
-pthread_mutex_t LOCK_ndb_util_thread;
-pthread_cond_t COND_ndb_util_thread;
-pthread_cond_t COND_ndb_util_ready;
-pthread_handler_t ndb_util_thread_func(void *arg);
-
 // Index stats thread variables
-pthread_t ndb_index_stat_thread;
-int ndb_index_stat_thread_running= 0;
-pthread_mutex_t LOCK_ndb_index_stat_thread;
-pthread_cond_t COND_ndb_index_stat_thread;
-pthread_cond_t COND_ndb_index_stat_ready;
-pthread_mutex_t ndb_index_stat_list_mutex;
-pthread_mutex_t ndb_index_stat_stat_mutex;
-pthread_cond_t ndb_index_stat_stat_cond;
-pthread_handler_t ndb_index_stat_thread_func(void *arg);
-
 extern void ndb_index_stat_free(NDB_SHARE *share);
 extern void ndb_index_stat_end();
 
@@ -12767,7 +12751,7 @@ ndbcluster_find_files(handlerton *hton, 
 /* Call back after cluster connect */
 static int connect_callback()
 {
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
   update_status_variables(NULL, &g_ndb_status,
                           g_ndb_cluster_connection);
 
@@ -12777,8 +12761,8 @@ static int connect_callback()
   while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
     g_node_id_map[node_id]= i++;
 
-  pthread_cond_signal(&COND_ndb_util_thread);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  pthread_cond_signal(&ndb_util_thread.COND);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
   return 0;
 }
 
@@ -12824,6 +12808,12 @@ int(*ndb_wait_setup_func)(ulong) = 0;
 #endif
 extern int ndb_dictionary_is_mysqld;
 
+/**
+ * Components
+ */
+Ndb_util_thread ndb_util_thread;
+Ndb_index_stat_thread ndb_index_stat_thread;
+
 static int ndbcluster_init(void *p)
 {
   DBUG_ENTER("ndbcluster_init");
@@ -12836,20 +12826,11 @@ static int ndbcluster_init(void *p)
   assert(DependencyTracker::InvalidTransactionId ==
          Ndb_binlog_extra_row_info::InvalidTransactionId);
 #endif
+  ndb_util_thread.init();
+  ndb_index_stat_thread.init();
 
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
-  pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&COND_ndb_util_thread, NULL);
-  pthread_cond_init(&COND_ndb_util_ready, NULL);
   pthread_cond_init(&COND_ndb_setup_complete, NULL);
-  ndb_util_thread_running= -1;
-  pthread_mutex_init(&LOCK_ndb_index_stat_thread, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&COND_ndb_index_stat_thread, NULL);
-  pthread_cond_init(&COND_ndb_index_stat_ready, NULL);
-  pthread_mutex_init(&ndb_index_stat_list_mutex, MY_MUTEX_INIT_FAST);
-  pthread_mutex_init(&ndb_index_stat_stat_mutex, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&ndb_index_stat_stat_cond, NULL);
-  ndb_index_stat_thread_running= -1;
   ndbcluster_terminating= 0;
   ndb_dictionary_is_mysqld= 1;
   ndb_setup_complete= 0;
@@ -12914,72 +12895,53 @@ static int ndbcluster_init(void *p)
   }
 
   // Create utility thread
-  pthread_t tmp;
-  if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
+  if (ndb_util_thread.start())
   {
     DBUG_PRINT("error", ("Could not create ndb utility thread"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_ready);
     pthread_cond_destroy(&COND_ndb_setup_complete);
     ndbcluster_global_schema_lock_deinit();
     goto ndbcluster_init_error;
   }
 
   /* Wait for the util thread to start */
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
-  while (ndb_util_thread_running < 0)
-    pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
+  while (ndb_util_thread.running < 0)
+    pthread_cond_wait(&ndb_util_thread.COND_ready, &ndb_util_thread.LOCK);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
   
-  if (!ndb_util_thread_running)
+  if (!ndb_util_thread.running)
   {
     DBUG_PRINT("error", ("ndb utility thread exited prematurely"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_thread);
-    pthread_cond_destroy(&COND_ndb_util_ready);
     pthread_cond_destroy(&COND_ndb_setup_complete);
     ndbcluster_global_schema_lock_deinit();
     goto ndbcluster_init_error;
   }
 
   // Create index statistics thread
-  pthread_t tmp2;
-  if (pthread_create(&tmp2, &connection_attrib, ndb_index_stat_thread_func, 0))
+  if (ndb_index_stat_thread.start())
   {
     DBUG_PRINT("error", ("Could not create ndb index statistics thread"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_ready);
-    pthread_mutex_destroy(&ndb_index_stat_list_mutex);
-    pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
-    pthread_cond_destroy(&ndb_index_stat_stat_cond);
     goto ndbcluster_init_error;
   }
 
   /* Wait for the index statistics thread to start */
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
-  while (ndb_index_stat_thread_running < 0)
-    pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
-  
-  if (!ndb_index_stat_thread_running)
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
+  while (ndb_index_stat_thread.running < 0)
+    pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+                      &ndb_index_stat_thread.LOCK);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
+
+  if (!ndb_index_stat_thread.running)
   {
     DBUG_PRINT("error", ("ndb index statistics thread exited prematurely"));
     my_hash_free(&ndbcluster_open_tables);
     pthread_mutex_destroy(&ndbcluster_mutex);
-    pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_thread);
-    pthread_cond_destroy(&COND_ndb_index_stat_ready);
-    pthread_mutex_destroy(&ndb_index_stat_list_mutex);
-    pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
-    pthread_cond_destroy(&ndb_index_stat_stat_cond);
     goto ndbcluster_init_error;
   }
 
@@ -12993,6 +12955,8 @@ static int ndbcluster_init(void *p)
   DBUG_RETURN(FALSE);
 
 ndbcluster_init_error:
+  ndb_util_thread.deinit();
+  ndb_index_stat_thread.deinit();
   /* disconnect from cluster and free connection resources */
   ndbcluster_disconnect();
   ndbcluster_hton->state= SHOW_OPTION_DISABLED;               // If we couldn't use handler
@@ -13030,12 +12994,13 @@ static int ndbcluster_end(handlerton *ht
 
   /* wait for index stat thread to finish */
   sql_print_information("Stopping Cluster Index Statistics thread");
-  pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+  pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
   ndbcluster_terminating= 1;
-  pthread_cond_signal(&COND_ndb_index_stat_thread);
-  while (ndb_index_stat_thread_running > 0)
-    pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
-  pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+  pthread_cond_signal(&ndb_index_stat_thread.COND);
+  while (ndb_index_stat_thread.running > 0)
+    pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+                      &ndb_index_stat_thread.LOCK);
+  pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
 
   /* wait for util and binlog thread to finish */
   ndbcluster_binlog_end(NULL);
@@ -13064,17 +13029,14 @@ static int ndbcluster_end(handlerton *ht
   ndb_index_stat_end();
   ndbcluster_disconnect();
 
+  ndb_util_thread.deinit();
+  ndb_index_stat_thread.deinit();
+
   // cleanup ndb interface
   ndb_end_internal();
 
   pthread_mutex_destroy(&ndbcluster_mutex);
-  pthread_mutex_destroy(&LOCK_ndb_util_thread);
-  pthread_cond_destroy(&COND_ndb_util_thread);
-  pthread_cond_destroy(&COND_ndb_util_ready);
   pthread_cond_destroy(&COND_ndb_setup_complete);
-  pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
-  pthread_cond_destroy(&COND_ndb_index_stat_thread);
-  pthread_cond_destroy(&COND_ndb_index_stat_ready);
   ndbcluster_global_schema_lock_deinit();
   DBUG_RETURN(0);
 }
@@ -15749,7 +15711,24 @@ ha_ndbcluster::update_table_comment(
 /**
   Utility thread main loop.
 */
-pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
+Ndb_util_thread::Ndb_util_thread()
+  : running(-1)
+{
+  pthread_mutex_init(&LOCK, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND, NULL);
+  pthread_cond_init(&COND_ready, NULL);
+}
+
+Ndb_util_thread::~Ndb_util_thread()
+{
+  assert(running <= 0);
+  pthread_mutex_destroy(&LOCK);
+  pthread_cond_destroy(&COND);
+  pthread_cond_destroy(&COND_ready);
+}
+
+void
+Ndb_util_thread::do_run()
 {
   THD *thd; /* needs to be first for thread_stack */
   struct timespec abstime;
@@ -15757,21 +15736,18 @@ pthread_handler_t ndb_util_thread_func(v
   uint share_list_size= 0;
   NDB_SHARE **share_list= NULL;
 
-  my_thread_init();
   DBUG_ENTER("ndb_util_thread");
   DBUG_PRINT("enter", ("cache_check_time: %lu", opt_ndb_cache_check_time));
- 
-   pthread_mutex_lock(&LOCK_ndb_util_thread);
+
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
 
   thd= new THD; /* note that contructor of THD uses DBUG_ */
   if (thd == NULL)
   {
     my_errno= HA_ERR_OUT_OF_MEM;
-    DBUG_RETURN(NULL);
+    DBUG_VOID_RETURN;
   }
   THD_CHECK_SENTRY(thd);
-  pthread_detach_this_thread();
-  ndb_util_thread= pthread_self();
 
   thd->thread_stack= (char*)&thd; /* remember where our stack is */
   if (thd->store_globals())
@@ -15794,9 +15770,9 @@ pthread_handler_t ndb_util_thread_func(v
   thd->update_charset();
 
   /* Signal successful initialization */
-  ndb_util_thread_running= 1;
-  pthread_cond_signal(&COND_ndb_util_ready);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  ndb_util_thread.running= 1;
+  pthread_cond_signal(&ndb_util_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
 
   /*
     wait for mysql server to start
@@ -15810,7 +15786,7 @@ pthread_handler_t ndb_util_thread_func(v
     if (ndbcluster_terminating)
     {
       mysql_mutex_unlock(&LOCK_server_started);
-      pthread_mutex_lock(&LOCK_ndb_util_thread);
+      pthread_mutex_lock(&ndb_util_thread.LOCK);
       goto ndb_util_thread_end;
     }
   }
@@ -15819,21 +15795,21 @@ pthread_handler_t ndb_util_thread_func(v
   /*
     Wait for cluster to start
   */
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
   while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
   {
     /* ndb not connected yet */
-    pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread);
+    pthread_cond_wait(&ndb_util_thread.COND, &ndb_util_thread.LOCK);
     if (ndbcluster_terminating)
       goto ndb_util_thread_end;
   }
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
 
   /* Get thd_ndb for this thread */
   if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb(thd)))
   {
     sql_print_error("Could not allocate Thd_ndb object");
-    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    pthread_mutex_lock(&ndb_util_thread.LOCK);
     goto ndb_util_thread_end;
   }
   set_thd_ndb(thd, thd_ndb);
@@ -15845,14 +15821,14 @@ pthread_handler_t ndb_util_thread_func(v
   set_timespec(abstime, 0);
   for (;;)
   {
-    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    pthread_mutex_lock(&ndb_util_thread.LOCK);
     if (!ndbcluster_terminating)
-      pthread_cond_timedwait(&COND_ndb_util_thread,
-                             &LOCK_ndb_util_thread,
+      pthread_cond_timedwait(&ndb_util_thread.COND,
+                             &ndb_util_thread.LOCK,
                              &abstime);
     if (ndbcluster_terminating) /* Shutting down server */
       goto ndb_util_thread_end;
-    pthread_mutex_unlock(&LOCK_ndb_util_thread);
+    pthread_mutex_unlock(&ndb_util_thread.LOCK);
 #ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
     DBUG_PRINT("ndb_util_thread", ("Started, cache_check_time: %lu",
                                    opt_ndb_cache_check_time));
@@ -16005,7 +15981,7 @@ next:
     set_timespec_nsec(abstime, opt_ndb_cache_check_time * 1000000ULL);
   }
 
-  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  pthread_mutex_lock(&ndb_util_thread.LOCK);
 
 ndb_util_thread_end:
   net_end(&thd->net);
@@ -16021,15 +15997,12 @@ ndb_util_thread_fail:
   delete thd;
   
   /* signal termination */
-  ndb_util_thread_running= 0;
-  pthread_cond_signal(&COND_ndb_util_ready);
-  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+  ndb_util_thread.running= 0;
+  pthread_cond_signal(&ndb_util_thread.COND_ready);
+  pthread_mutex_unlock(&ndb_util_thread.LOCK);
   DBUG_PRINT("exit", ("ndb_util_thread"));
 
   DBUG_LEAVE;                               // Must match DBUG_ENTER()
-  my_thread_end();
-  pthread_exit(0);
-  return NULL;                              // Avoid compiler warnings
 }
 
 /*

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-10-20 12:51:03 +0000
+++ b/sql/ha_ndbcluster.h	2011-11-14 14:25:20 +0000
@@ -1172,7 +1172,9 @@ void ndbcluster_print_error(int error, c
 static const char ndbcluster_hton_name[]= "ndbcluster";
 static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
 extern int ndbcluster_terminating;
-extern int ndb_util_thread_running;
-extern pthread_cond_t COND_ndb_util_ready;
-extern int ndb_index_stat_thread_running;
-extern pthread_cond_t COND_ndb_index_stat_ready;
+
+#include "ndb_util_thread.h"
+extern Ndb_util_thread ndb_util_thread;
+
+#include "ha_ndb_index_stat.h"
+extern Ndb_index_stat_thread ndb_index_stat_thread;

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-10-20 16:18:28 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-11-10 10:35:09 +0000
@@ -812,7 +812,7 @@ int ndbcluster_binlog_end(THD *thd)
 {
   DBUG_ENTER("ndbcluster_binlog_end");
 
-  if (ndb_util_thread_running > 0)
+  if (ndb_util_thread.running > 0)
   {
     /*
       Wait for util thread to die (as this uses the injector mutex)
@@ -822,33 +822,34 @@ int ndbcluster_binlog_end(THD *thd)
       be called before ndb_cluster_end().
     */
     sql_print_information("Stopping Cluster Utility thread");
-    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    pthread_mutex_lock(&ndb_util_thread.LOCK);
     /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
-    ndb_util_thread_running++;
+    ndb_util_thread.running++;
     ndbcluster_terminating= 1;
-    pthread_cond_signal(&COND_ndb_util_thread);
-    while (ndb_util_thread_running > 1)
-      pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
-    ndb_util_thread_running--;
-    pthread_mutex_unlock(&LOCK_ndb_util_thread);
+    pthread_cond_signal(&ndb_util_thread.COND);
+    while (ndb_util_thread.running > 1)
+      pthread_cond_wait(&ndb_util_thread.COND_ready, &ndb_util_thread.LOCK);
+    ndb_util_thread.running--;
+    pthread_mutex_unlock(&ndb_util_thread.LOCK);
   }
 
-  if (ndb_index_stat_thread_running > 0)
+  if (ndb_index_stat_thread.running > 0)
   {
     /*
       Index stats thread blindly imitates util thread.  Following actually
       fixes some "[Warning] Plugin 'ndbcluster' will be forced to shutdown".
     */
     sql_print_information("Stopping Cluster Index Stats thread");
-    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    pthread_mutex_lock(&ndb_index_stat_thread.LOCK);
     /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
-    ndb_index_stat_thread_running++;
+    ndb_index_stat_thread.running++;
     ndbcluster_terminating= 1;
-    pthread_cond_signal(&COND_ndb_index_stat_thread);
-    while (ndb_index_stat_thread_running > 1)
-      pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
-    ndb_index_stat_thread_running--;
-    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
+    pthread_cond_signal(&ndb_index_stat_thread.COND);
+    while (ndb_index_stat_thread.running > 1)
+      pthread_cond_wait(&ndb_index_stat_thread.COND_ready,
+                        &ndb_index_stat_thread.LOCK);
+    ndb_index_stat_thread.running--;
+    pthread_mutex_unlock(&ndb_index_stat_thread.LOCK);
   }
 
   if (ndbcluster_binlog_inited)

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-10-20 16:18:28 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-11-10 10:35:09 +0000
@@ -191,10 +191,6 @@ void ndbcluster_global_schema_lock_init(
 void ndbcluster_global_schema_lock_deinit();
 
 extern unsigned char g_node_id_map[max_ndb_nodes];
-extern pthread_mutex_t LOCK_ndb_util_thread;
-extern pthread_cond_t COND_ndb_util_thread;
-extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-extern pthread_cond_t COND_ndb_index_stat_thread;
 extern pthread_mutex_t ndbcluster_mutex;
 extern HASH ndbcluster_open_tables;
 

=== modified file 'sql/ha_ndbcluster_cond.cc'
--- a/sql/ha_ndbcluster_cond.cc	2011-11-09 09:53:04 +0000
+++ b/sql/ha_ndbcluster_cond.cc	2011-11-14 14:25:20 +0000
@@ -1217,7 +1217,10 @@ ha_ndbcluster_cond::build_scan_filter_pr
       uint32 len= value->save_in_field(field);
       char buff[MAX_FIELD_WIDTH];
       String str(buff,sizeof(buff),field->get_field_charset());
-      field->get_field_val_str(&str);
+      if (len > field->get_field()->field_length)
+        str.set(value->get_val(), len, field->get_field_charset());
+      else
+        field->get_field_val_str(&str);
       const char *val=
         (value->is_const_func() && is_string)?
         str.ptr()
@@ -1245,7 +1248,10 @@ ha_ndbcluster_cond::build_scan_filter_pr
       uint32 len= value->save_in_field(field);
       char buff[MAX_FIELD_WIDTH];
       String str(buff,sizeof(buff),field->get_field_charset());
-      field->get_field_val_str(&str);
+      if (len > field->get_field()->field_length)
+        str.set(value->get_val(), len, field->get_field_charset());
+      else
+        field->get_field_val_str(&str);
       const char *val=
         (value->is_const_func() && is_string)?
         str.ptr()

=== modified file 'sql/ha_ndbcluster_cond.h'
--- a/sql/ha_ndbcluster_cond.h	2011-11-04 08:33:56 +0000
+++ b/sql/ha_ndbcluster_cond.h	2011-11-10 15:08:40 +0000
@@ -250,16 +250,7 @@ public:
     const Item *item= value.item;
     if (item && field)
     {
-      DBUG_PRINT("info", ("item length %u, field length %u",
-                          item->max_length, field->field_length));
-      if (item->max_length > field->field_length)
-      {
-        DBUG_PRINT("info", ("Comparing field with longer value"));
-        DBUG_PRINT("info", ("Field can store %u", field->field_length));
-        length= field->field_length;
-      }
-      else
-        length= item->max_length;
+      length= item->max_length;
       my_bitmap_map *old_map=
         dbug_tmp_use_all_columns(field->table, field->table->write_set);
       ((Item *)item)->save_in_field(field, FALSE);

=== added file 'sql/ndb_component.cc'
--- a/sql/ndb_component.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_component.cc	2011-11-10 08:16:52 +0000
@@ -0,0 +1,143 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_component.h"
+
+Ndb_component::Ndb_component()
+  : m_thread_state(TS_UNINIT)
+{
+}
+
+Ndb_component::~Ndb_component()
+{
+
+}
+
+int
+Ndb_component::init()
+{
+  assert(m_thread_state == TS_UNINIT);
+
+  pthread_mutex_init(&m_start_stop_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&m_start_stop_cond, NULL);
+
+  int res= do_init();
+  if (res == 0)
+  {
+    m_thread_state= TS_INIT;
+  }
+  return res;
+}
+
+void *
+Ndb_component_run_C(void * arg)
+{
+  my_thread_init();
+  Ndb_component * self = reinterpret_cast<Ndb_component*>(arg);
+  self->run_impl();
+  my_thread_end();
+  pthread_exit(0);
+  return NULL;                              // Avoid compiler warnings
+}
+
+extern pthread_attr_t connection_attrib; // mysql global pthread attr
+
+int
+Ndb_component::start()
+{
+  assert(m_thread_state == TS_INIT);
+  pthread_mutex_lock(&m_start_stop_mutex);
+  m_thread_state= TS_STARTING;
+  int res= pthread_create(&m_thread, &connection_attrib, Ndb_component_run_C,
+                          this);
+
+  if (res == 0)
+  {
+    while (m_thread_state == TS_STARTING)
+    {
+      pthread_cond_wait(&m_start_stop_cond, &m_start_stop_mutex);
+    }
+    pthread_mutex_unlock(&m_start_stop_mutex);
+    return m_thread_state == TS_RUNNING ? 0 : 1;
+  }
+
+  pthread_mutex_unlock(&m_start_stop_mutex);
+  return res;
+}
+
+void
+Ndb_component::run_impl()
+{
+  pthread_detach_this_thread();
+  pthread_mutex_lock(&m_start_stop_mutex);
+  if (m_thread_state == TS_STARTING)
+  {
+    m_thread_state= TS_RUNNING;
+    pthread_cond_signal(&m_start_stop_cond);
+    pthread_mutex_unlock(&m_start_stop_mutex);
+    do_run();
+    pthread_mutex_lock(&m_start_stop_mutex);
+  }
+  m_thread_state = TS_STOPPED;
+  pthread_cond_signal(&m_start_stop_cond);
+  pthread_mutex_unlock(&m_start_stop_mutex);
+}
+
+bool
+Ndb_component::is_stop_requested()
+{
+  bool res = false;
+  pthread_mutex_lock(&m_start_stop_mutex);
+  res = m_thread_state != TS_RUNNING;
+  pthread_mutex_unlock(&m_start_stop_mutex);
+  return res;
+}
+
+int
+Ndb_component::stop()
+{
+  pthread_mutex_lock(&m_start_stop_mutex);
+  assert(m_thread_state == TS_RUNNING ||
+         m_thread_state == TS_STOPPING ||
+         m_thread_state == TS_STOPPED);
+
+  if (m_thread_state == TS_RUNNING)
+  {
+    m_thread_state= TS_STOPPING;
+  }
+
+  if (m_thread_state == TS_STOPPING)
+  {
+    while (m_thread_state != TS_STOPPED)
+    {
+      pthread_cond_signal(&m_start_stop_cond);
+      pthread_cond_wait(&m_start_stop_cond, &m_start_stop_mutex);
+    }
+  }
+  pthread_mutex_unlock(&m_start_stop_mutex);
+
+  return 0;
+}
+
+int
+Ndb_component::deinit()
+{
+  assert(m_thread_state == TS_STOPPED);
+  pthread_mutex_destroy(&m_start_stop_mutex);
+  pthread_cond_destroy(&m_start_stop_cond);
+  return do_deinit();
+}

=== added file 'sql/ndb_component.h'
--- a/sql/ndb_component.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_component.h	2011-11-10 08:16:52 +0000
@@ -0,0 +1,82 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef HA_NDBCLUSTER_COMPONENT_H
+#define HA_NDBCLUSTER_COMPONENT_H
+
+#include <my_global.h>
+#include <my_pthread.h>
+
+extern "C" void * Ndb_component_run_C(void *);
+
+class Ndb_component
+{
+public:
+  virtual int init();
+  virtual int start();
+  virtual int stop();
+  virtual int deinit();
+
+protected:
+  /**
+   * Constructor/destructor are protected so that each sub-class must provide its own
+   */
+  Ndb_component();
+  virtual ~Ndb_component();
+
+  /**
+   * Component init function
+   */
+  virtual int do_init() = 0;
+
+  /**
+   * Component run function
+   */
+  virtual void do_run() = 0;
+
+  /**
+   * Component deinit function
+   */
+  virtual int do_deinit() = 0;
+
+  /**
+   * For usage in threads main loop
+   */
+  bool is_stop_requested();
+
+private:
+
+  enum ThreadState
+  {
+    TS_UNINIT   = 0,
+    TS_INIT     = 1,
+    TS_STARTING = 2,
+    TS_RUNNING  = 3,
+    TS_STOPPING = 4,
+    TS_STOPPED  = 5
+  };
+
+  ThreadState m_thread_state;
+  pthread_t m_thread;
+  pthread_mutex_t m_start_stop_mutex;
+  pthread_cond_t m_start_stop_cond;
+
+  void run_impl();
+  friend void * Ndb_component_run_C(void *);
+};
+
+#endif

=== added file 'sql/ndb_util_thread.h'
--- a/sql/ndb_util_thread.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_util_thread.h	2011-11-10 08:16:52 +0000
@@ -0,0 +1,40 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_UTIL_THREAD_H
+#define NDB_UTIL_THREAD_H
+
+#include "ndb_component.h"
+
+class Ndb_util_thread : public Ndb_component
+{
+public:
+  Ndb_util_thread();
+  virtual ~Ndb_util_thread();
+
+  int running;
+  pthread_mutex_t LOCK;
+  pthread_cond_t COND;
+  pthread_cond_t COND_ready;
+
+private:
+  virtual int do_init() { return 0;}
+  virtual void do_run();
+  virtual int do_deinit() { return 0;}
+};
+
+#endif

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-09-09 13:13:52 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-11-14 14:25:20 +0000
@@ -150,7 +150,8 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ha_ndb_index_stat.cc
   ../../sql/ha_ndbinfo.cc
   ../../sql/ndb_mi.cc
-  ../../sql/ndb_conflict_trans.cc)
+  ../../sql/ndb_conflict_trans.cc
+  ../../sql/ndb_component.cc)
 INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
 
 IF(EXISTS ${CMAKE_SOURCE_DIR}/storage/mysql_storage_engine.cmake)

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2011-10-13 20:08:25 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2011-11-14 14:25:20 +0000
@@ -194,8 +194,9 @@
 #define NDBMT_BLOCK_MASK ((1 << NDBMT_BLOCK_BITS) - 1)
 #define NDBMT_BLOCK_INSTANCE_BITS 7
 
-#define MAX_NDBMT_LQH_WORKERS 4
-#define MAX_NDBMT_LQH_THREADS 4
+#define NDB_MAX_LOG_PARTS     4
+#define MAX_NDBMT_LQH_WORKERS NDB_MAX_LOG_PARTS
+#define MAX_NDBMT_LQH_THREADS NDB_MAX_LOG_PARTS
 #define MAX_NDBMT_TC_THREADS  2
 
 #define NDB_FILE_BUFFER_SIZE (256*1024)

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-10-07 16:12:13 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-11-14 12:02:56 +0000
@@ -68,6 +68,7 @@
 
 #define CFG_DB_FILESYSTEM_PATH        125
 #define CFG_DB_NO_REDOLOG_FILES       126
+#define CFG_DB_NO_REDOLOG_PARTS       632
 #define CFG_DB_REDOLOG_FILE_SIZE      140
 
 #define CFG_DB_LCP_DISC_PAGES_TUP     127
@@ -198,6 +199,7 @@
 #define CFG_DB_MT_THREAD_CONFIG          628
 
 #define CFG_DB_CRASH_ON_CORRUPTED_TUPLE  629
+/* 632 used for CFG_DB_NO_REDOLOG_PARTS */
 
 #define CFG_NODE_ARBIT_RANK           200
 #define CFG_NODE_ARBIT_DELAY          201

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2011-07-09 11:16:31 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2011-11-14 14:25:20 +0000
@@ -693,4 +693,25 @@ ndbd_get_config_supported(Uint32 x)
   return x >= NDBD_GET_CONFIG_SUPPORT_71;
 }
 
+#define NDBD_CONFIGURABLE_LOG_PARTS_70 NDB_MAKE_VERSION(7,0,29)
+#define NDBD_CONFIGURABLE_LOG_PARTS_71 NDB_MAKE_VERSION(7,1,18)
+#define NDBD_CONFIGURABLE_LOG_PARTS_72 NDB_MAKE_VERSION(7,2,3)
+
+static
+inline
+int
+ndb_configurable_log_parts(Uint32 x)
+{
+  const Uint32 major = (x >> 16) & 0xFF;
+  const Uint32 minor = (x >>  8) & 0xFF;
+
+  if (major == 7 && minor < 2)
+  {
+    if (minor == 0)
+      return x >= NDBD_CONFIGURABLE_LOG_PARTS_70;
+    else if (minor == 1)
+      return x >= NDBD_CONFIGURABLE_LOG_PARTS_71;
+  }
+  return x >= NDBD_CONFIGURABLE_LOG_PARTS_72;
+}
 #endif

=== modified file 'storage/ndb/src/common/portlib/NdbThread.c'
--- a/storage/ndb/src/common/portlib/NdbThread.c	2011-10-21 08:59:23 +0000
+++ b/storage/ndb/src/common/portlib/NdbThread.c	2011-11-10 15:06:03 +0000
@@ -540,11 +540,17 @@ NdbThread_End()
   {
     NdbMutex_Destroy(g_ndb_thread_mutex);
   }
-  
+
   if (g_ndb_thread_condition)
   {
     NdbCondition_Destroy(g_ndb_thread_condition);
   }
+
+  if (g_main_thread)
+  {
+    NdbMem_Free((char *)g_main_thread);
+    g_main_thread = 0;
+  }
 }
 
 int

=== modified file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp	2011-08-09 15:37:45 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp	2011-11-11 08:38:00 +0000
@@ -930,7 +930,6 @@ const char*
 NdbPack::Data::print(char* buf, Uint32 bufsz) const
 {
   Print p(buf, bufsz);
-  char* ptr = buf;
   if (m_varBytes != 0)
   {
     p.print("varBytes:");
@@ -1291,6 +1290,7 @@ Tdata::create()
     Uint8 xbuf[Tspec::MaxBuf];
     Uint64 xbuf_align;
   };
+  (void)xbuf_align; // silence unused-variable compiler warning
   memset(xbuf, 0x3f, sizeof(xbuf));
   m_xsize = 0;
   m_xnulls = 0;
@@ -1830,7 +1830,7 @@ testdesc(const Tdata& tdata)
   const NdbPack::Data& data = tdata.m_data;
   const Uint8* buf_old = (const Uint8*)data.get_full_buf();
   const Uint32 varBytes = data.get_var_bytes();
-  const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
+  // const Uint32 nullMaskLen = tspec.m_spec.get_nullmask_len(false);
   const Uint32 dataLen = data.get_data_len();
   const Uint32 fullLen = data.get_full_len();
   const Uint32 cnt = data.get_cnt();

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2011-11-14 09:18:48 +0000
@@ -624,8 +624,8 @@ struct ScanRec {
 /* TABREC                                                                            */
 /* --------------------------------------------------------------------------------- */
 struct Tabrec {
-  Uint32 fragholder[MAX_FRAG_PER_NODE];
-  Uint32 fragptrholder[MAX_FRAG_PER_NODE];
+  Uint32 fragholder[MAX_FRAG_PER_LQH];
+  Uint32 fragptrholder[MAX_FRAG_PER_LQH];
   Uint32 tabUserPtr;
   BlockReference tabUserRef;
   Uint32 tabUserGsn;

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2011-11-14 09:18:48 +0000
@@ -481,7 +481,7 @@ void Dbacc::initialiseTableRec(Signal* s
   for (tabptr.i = 0; tabptr.i < ctablesize; tabptr.i++) {
     refresh_watch_dog();
     ptrAss(tabptr, tabrec);
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragholder); i++) {
       tabptr.p->fragholder[i] = RNIL;
       tabptr.p->fragptrholder[i] = RNIL;
     }//for
@@ -653,7 +653,7 @@ Dbacc::execDROP_FRAG_REQ(Signal* signal)
   tabPtr.p->tabUserPtr = req->senderData;
   tabPtr.p->tabUserGsn = GSN_DROP_FRAG_REQ;
 
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabPtr.p->fragholder); i++)
   {
     jam();
     if (tabPtr.p->fragholder[i] == req->fragId)
@@ -677,7 +677,7 @@ void Dbacc::releaseRootFragResources(Sig
   if (tabPtr.p->tabUserGsn == GSN_DROP_TAB_REQ)
   {
     jam();
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabPtr.p->fragholder); i++)
     {
       jam();
       if (tabPtr.p->fragholder[i] != RNIL)
@@ -857,7 +857,7 @@ void Dbacc::releaseFragRecord(Signal* si
 /* -------------------------------------------------------------------------- */
 bool Dbacc::addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fid) 
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragholder); i++) {
     jam();
     if (tabptr.p->fragholder[i] == RNIL) {
       jam();
@@ -2493,7 +2493,7 @@ void Dbacc::execACC_LOCKREQ(Signal* sign
     ptrCheckGuard(tabptr, ctablesize, tabrec);
     // find fragment (TUX will know it)
     if (req->fragPtrI == RNIL) {
-      for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+      for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragholder); i++) {
         jam();
         if (tabptr.p->fragholder[i] == req->fragId){
 	  jam();
@@ -7590,7 +7590,7 @@ void Dbacc::takeOutReadyScanQueue(Signal
 
 bool Dbacc::getfragmentrec(Signal* signal, FragmentrecPtr& rootPtr, Uint32 fid) 
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragholder); i++) {
     jam();
     if (tabptr.p->fragholder[i] == fid) {
       jam();

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2011-11-09 09:53:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2011-11-14 14:25:20 +0000
@@ -226,9 +226,9 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
 
   if (signal->theData[0] == 1227)
   {
-    DictObject_hash::Iterator iter;
-    bool ok = c_obj_hash.first(iter);
-    for(; ok; ok = c_obj_hash.next(iter))
+    DictObjectName_hash::Iterator iter;
+    bool ok = c_obj_name_hash.first(iter);
+    for (; ok; ok = c_obj_name_hash.next(iter))
     {
       LocalRope name(c_rope_pool, iter.curr.p->m_name);
       char buf[1024];
@@ -297,7 +297,7 @@ void Dbdict::execDBINFO_SCANREQ(Signal *
         { CFG_DB_NO_ATTRIBUTES,0,0,0 }},
       { "Table Record",
         c_tableRecordPool.getUsed(),
-        c_tableRecordPool.getSize(),
+        c_noOfMetaTables,
         c_tableRecordPool.getEntrySize(),
         c_tableRecordPool.getUsedHi(),
         { CFG_DB_NO_TABLES,0,0,0 }},
@@ -543,7 +543,7 @@ void Dbdict::packTableIntoPages(Signal* 
   case DictTabInfo::Tablespace:
   case DictTabInfo::LogfileGroup:{
     FilegroupPtr fg_ptr;
-    ndbrequire(c_filegroup_hash.find(fg_ptr, tableId));
+    ndbrequire(find_object(fg_ptr, tableId));
     const Uint32 free_hi= signal->theData[4];
     const Uint32 free_lo= signal->theData[5];
     packFilegroupIntoPages(w, fg_ptr, free_hi, free_lo);
@@ -551,20 +551,20 @@ void Dbdict::packTableIntoPages(Signal* 
   }
   case DictTabInfo::Datafile:{
     FilePtr fg_ptr;
-    ndbrequire(c_file_hash.find(fg_ptr, tableId));
+    ndbrequire(find_object(fg_ptr, tableId));
     const Uint32 free_extents= signal->theData[4];
     packFileIntoPages(w, fg_ptr, free_extents);
     break;
   }
   case DictTabInfo::Undofile:{
     FilePtr fg_ptr;
-    ndbrequire(c_file_hash.find(fg_ptr, tableId));
+    ndbrequire(find_object(fg_ptr, tableId));
     packFileIntoPages(w, fg_ptr, 0);
     break;
   }
   case DictTabInfo::HashMap:{
     HashMapRecordPtr hm_ptr;
-    ndbrequire(c_hash_map_hash.find(hm_ptr, tableId));
+    ndbrequire(find_object(hm_ptr, tableId));
     packHashMapIntoPages(w, hm_ptr);
     break;
   }
@@ -656,7 +656,7 @@ Dbdict::packTableIntoPages(SimplePropert
   if (tablePtr.p->hashMapObjectId != RNIL)
   {
     HashMapRecordPtr hm_ptr;
-    ndbrequire(c_hash_map_hash.find(hm_ptr, tablePtr.p->hashMapObjectId));
+    ndbrequire(find_object(hm_ptr, tablePtr.p->hashMapObjectId));
     w.add(DictTabInfo::HashMapVersion, hm_ptr.p->m_object_version);
   }
 
@@ -731,7 +731,7 @@ Dbdict::packTableIntoPages(SimplePropert
   {
     w.add(DictTabInfo::TablespaceId, tablePtr.p->m_tablespace_id);
     FilegroupPtr tsPtr;
-    ndbrequire(c_filegroup_hash.find(tsPtr, tablePtr.p->m_tablespace_id));
+    ndbrequire(find_object(tsPtr, tablePtr.p->m_tablespace_id));
     w.add(DictTabInfo::TablespaceVersion, tsPtr.p->m_version);
   }
 
@@ -830,7 +830,7 @@ Dbdict::packFilegroupIntoPages(SimplePro
     fg.TS_ExtentSize = fg_ptr.p->m_tablespace.m_extent_size;
     fg.TS_LogfileGroupId = fg_ptr.p->m_tablespace.m_default_logfile_group_id;
     FilegroupPtr lfg_ptr;
-    ndbrequire(c_filegroup_hash.find(lfg_ptr, fg.TS_LogfileGroupId));
+    ndbrequire(find_object(lfg_ptr, fg.TS_LogfileGroupId));
     fg.TS_LogfileGroupVersion = lfg_ptr.p->m_version;
     break;
   case DictTabInfo::LogfileGroup:
@@ -869,7 +869,7 @@ Dbdict::packFileIntoPages(SimpleProperti
   f.FileVersion = f_ptr.p->m_version;
 
   FilegroupPtr lfg_ptr;
-  ndbrequire(c_filegroup_hash.find(lfg_ptr, f.FilegroupId));
+  ndbrequire(find_object(lfg_ptr, f.FilegroupId));
   f.FilegroupVersion = lfg_ptr.p->m_version;
 
   SimpleProperties::UnpackStatus s;
@@ -1816,7 +1816,7 @@ void Dbdict::closeReadSchemaConf(Signal*
       ndbrequire(c_writeSchemaRecord.inUse == false);
       XSchemaFile * xsf = &c_schemaFile[c_schemaRecord.oldSchemaPage != 0 ];
       Uint32 noOfPages =
-        (c_tableRecordPool.getSize() + NDB_SF_PAGE_ENTRIES - 1) /
+        (c_noOfMetaTables + NDB_SF_PAGE_ENTRIES - 1) /
         NDB_SF_PAGE_ENTRIES;
       resizeSchemaFile(xsf, noOfPages);
 
@@ -1946,15 +1946,13 @@ Dbdict::convertSchemaFileTo_6_4(XSchemaF
 Dbdict::Dbdict(Block_context& ctx):
   SimulatedBlock(DBDICT, ctx),
   c_attributeRecordHash(c_attributeRecordPool),
-  c_file_hash(c_file_pool),
-  c_filegroup_hash(c_filegroup_pool),
-  c_obj_hash(c_obj_pool),
+  c_obj_name_hash(c_obj_pool),
+  c_obj_id_hash(c_obj_pool),
   c_schemaOpHash(c_schemaOpPool),
   c_schemaTransHash(c_schemaTransPool),
   c_schemaTransList(c_schemaTransPool),
   c_schemaTransCount(0),
   c_txHandleHash(c_txHandlePool),
-  c_hash_map_hash(c_hash_map_pool),
   c_opCreateEvent(c_opRecordPool),
   c_opSubEvent(c_opRecordPool),
   c_opDropEvent(c_opRecordPool),
@@ -2374,6 +2372,7 @@ void Dbdict::initialiseTableRecord(Table
   tablePtr.p->indexStatFragId = ZNIL;
   tablePtr.p->indexStatNodeId = ZNIL;
   tablePtr.p->indexStatBgRequest = 0;
+  tablePtr.p->m_obj_ptr_i = RNIL;
 }//Dbdict::initialiseTableRecord()
 
 void Dbdict::initTriggerRecords()
@@ -2416,12 +2415,12 @@ Uint32 Dbdict::getFsConnRecord()
  * Search schemafile for free entry.  Its index is used as 'logical id'
  * of new disk-stored object.
  */
-Uint32 Dbdict::getFreeObjId(Uint32 minId, bool both)
+Uint32 Dbdict::getFreeObjId(bool both)
 {
   const XSchemaFile * newxsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
   const XSchemaFile * oldxsf = &c_schemaFile[SchemaRecord::OLD_SCHEMA_FILE];
   const Uint32 noOfEntries = newxsf->noOfPages * NDB_SF_PAGE_ENTRIES;
-  for (Uint32 i = minId; i<noOfEntries; i++)
+  for (Uint32 i = 0; i<noOfEntries; i++)
   {
     const SchemaFile::TableEntry * oldentry = getTableEntry(oldxsf, i);
     const SchemaFile::TableEntry * newentry = getTableEntry(newxsf, i);
@@ -2441,12 +2440,12 @@ Uint32 Dbdict::getFreeObjId(Uint32 minId
 
 Uint32 Dbdict::getFreeTableRecord()
 {
-  Uint32 i = getFreeObjId(0);
+  Uint32 i = getFreeObjId();
   if (i == RNIL) {
     jam();
     return RNIL;
   }
-  if (i >= c_tableRecordPool.getSize()) {
+  if (i >= c_noOfMetaTables) {
     jam();
     return RNIL;
   }
@@ -2632,11 +2631,11 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
     m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
 
-  Uint32 attributesize, tablerecSize;
+  Uint32 attributesize;
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_TRIGGERS,
 					&c_maxNoOfTriggers));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_ATTRIBUTE,&attributesize));
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &tablerecSize));
+  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE, &c_noOfMetaTables));
   c_indexStatAutoCreate = 0;
   ndb_mgm_get_int_parameter(p, CFG_DB_INDEX_STAT_AUTO_CREATE,
                             &c_indexStatAutoCreate);
@@ -2655,8 +2654,8 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
   c_nodes.setSize(MAX_NDB_NODES);
   c_pageRecordArray.setSize(ZNUMBER_OF_PAGES);
   c_schemaPageRecordArray.setSize(2 * NDB_SF_MAX_PAGES);
-  c_tableRecordPool.setSize(tablerecSize);
-  g_key_descriptor_pool.setSize(tablerecSize);
+  c_tableRecordPool.setSize(c_noOfMetaTables);
+  g_key_descriptor_pool.setSize(c_noOfMetaTables);
   c_triggerRecordPool.setSize(c_maxNoOfTriggers);
 
   Record_info ri;
@@ -2669,13 +2668,11 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
   c_txHandlePool.setSize(2);
   c_txHandleHash.setSize(2);
 
-  c_obj_pool.setSize(tablerecSize+c_maxNoOfTriggers);
-  c_obj_hash.setSize((tablerecSize+c_maxNoOfTriggers+1)/2);
+  c_obj_pool.setSize(c_noOfMetaTables+c_maxNoOfTriggers);
+  c_obj_name_hash.setSize((c_noOfMetaTables+c_maxNoOfTriggers+1)/2);
+  c_obj_id_hash.setSize((c_noOfMetaTables+c_maxNoOfTriggers+1)/2);
   m_dict_lock_pool.setSize(MAX_NDB_NODES);
 
-  c_file_hash.setSize(16);
-  c_filegroup_hash.setSize(16);
-
   c_file_pool.init(RT_DBDICT_FILE, pc);
   c_filegroup_pool.init(RT_DBDICT_FILEGROUP, pc);
 
@@ -2704,7 +2701,6 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
   c_copyDataRecPool.arena_pool_init(&c_arenaAllocator, RT_DBDICT_COPY_DATA, pc);
   c_schemaOpPool.arena_pool_init(&c_arenaAllocator, RT_DBDICT_SCHEMA_OPERATION, pc);
 
-  c_hash_map_hash.setSize(4);
   c_hash_map_pool.setSize(32);
   g_hash_map.setSize(32);
 
@@ -2726,7 +2722,7 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
   c_schemaFile[1].noOfPages = 0;
 
   Uint32 rps = 0;
-  rps += tablerecSize * (MAX_TAB_NAME_SIZE + MAX_FRM_DATA_SIZE);
+  rps += c_noOfMetaTables * (MAX_TAB_NAME_SIZE + MAX_FRM_DATA_SIZE);
   rps += attributesize * (MAX_ATTR_NAME_SIZE + MAX_ATTR_DEFAULT_VALUE_SIZE);
   rps += c_maxNoOfTriggers * MAX_TAB_NAME_SIZE;
   rps += (10 + 10) * MAX_TAB_NAME_SIZE;
@@ -2896,7 +2892,7 @@ void Dbdict::execREAD_NODESCONF(Signal* 
 void Dbdict::initSchemaFile(Signal* signal)
 {
   XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
-  xsf->noOfPages = (c_tableRecordPool.getSize() + NDB_SF_PAGE_ENTRIES - 1)
+  xsf->noOfPages = (c_noOfMetaTables + NDB_SF_PAGE_ENTRIES - 1)
                    / NDB_SF_PAGE_ENTRIES;
   initSchemaFile(xsf, 0, xsf->noOfPages, true);
   // init alt copy too for INR
@@ -2966,7 +2962,7 @@ Dbdict::activateIndexes(Signal* signal, 
 
   TableRecordPtr indexPtr;
   indexPtr.i = i;
-  for (; indexPtr.i < c_tableRecordPool.getSize(); indexPtr.i++)
+  for (; indexPtr.i < c_noOfMetaTables; indexPtr.i++)
   {
     c_tableRecordPool.getPtr(indexPtr);
 
@@ -3133,7 +3129,7 @@ Dbdict::rebuildIndexes(Signal* signal, U
 
   TableRecordPtr indexPtr;
   indexPtr.i = i;
-  for (; indexPtr.i < c_tableRecordPool.getSize(); indexPtr.i++) {
+  for (; indexPtr.i < c_noOfMetaTables; indexPtr.i++) {
     c_tableRecordPool.getPtr(indexPtr);
     if (check_read_obj(indexPtr.i))
       continue;
@@ -3656,7 +3652,7 @@ void Dbdict::checkSchemaStatus(Signal* s
     SchemaFile::EntryState ownState =
       (SchemaFile::EntryState)ownEntry->m_tableState;
 
-    if (c_restartRecord.activeTable >= c_tableRecordPool.getSize())
+    if (c_restartRecord.activeTable >= c_noOfMetaTables)
     {
       jam();
       ndbrequire(masterState == SchemaFile::SF_UNUSED);
@@ -4150,7 +4146,7 @@ Dbdict::execGET_TABINFO_CONF(Signal* sig
     {
       jam();
       FilePtr fg_ptr;
-      ndbrequire(c_file_hash.find(fg_ptr, conf->tableId));
+      ndbrequire(find_object(fg_ptr, conf->tableId));
       const Uint32 free_extents= conf->freeExtents;
       const Uint32 id= conf->tableId;
       const Uint32 type= conf->tableType;
@@ -4168,7 +4164,7 @@ Dbdict::execGET_TABINFO_CONF(Signal* sig
     {
       jam();
       FilegroupPtr fg_ptr;
-      ndbrequire(c_filegroup_hash.find(fg_ptr, conf->tableId));
+      ndbrequire(find_object(fg_ptr, conf->tableId));
       const Uint32 free_hi= conf->freeWordsHi;
       const Uint32 free_lo= conf->freeWordsLo;
       const Uint32 id= conf->tableId;
@@ -4643,8 +4639,8 @@ void Dbdict::execINCL_NODEREQ(Signal* si
 inline
 void Dbdict::printTables()
 {
-  DictObject_hash::Iterator iter;
-  bool moreTables = c_obj_hash.first(iter);
+  DictObjectName_hash::Iterator iter;
+  bool moreTables = c_obj_name_hash.first(iter);
   printf("OBJECTS IN DICT:\n");
   char name[PATH_MAX];
   while (moreTables) {
@@ -4652,7 +4648,7 @@ void Dbdict::printTables()
     ConstRope r(c_rope_pool, tablePtr.p->m_name);
     r.copy(name);
     printf("%s ", name);
-    moreTables = c_obj_hash.next(iter);
+    moreTables = c_obj_name_hash.next(iter);
   }
   printf("\n");
 }
@@ -4685,16 +4681,25 @@ Dbdict::get_object(DictObjectPtr& obj_pt
   key.m_key.m_name_len = len;
   key.m_key.m_pool = &c_rope_pool;
   key.m_name.m_hash = hash;
-  return c_obj_hash.find(obj_ptr, key);
+  return c_obj_name_hash.find(obj_ptr, key);
 }
 
 void
 Dbdict::release_object(Uint32 obj_ptr_i, DictObject* obj_ptr_p){
-  LocalRope name(c_rope_pool, obj_ptr_p->m_name);
+  jam();
+  RopeHandle obj_name = obj_ptr_p->m_name;
+  DictObjectPtr ptr = { obj_ptr_p, obj_ptr_i };
+
+  LocalRope name(c_rope_pool, obj_name);
   name.erase();
 
-  DictObjectPtr ptr = { obj_ptr_p, obj_ptr_i };
-  c_obj_hash.release(ptr);
+  c_obj_name_hash.remove(ptr);
+  if (!DictTabInfo::isTrigger(obj_ptr_p->m_type))
+  {
+    jam();
+    c_obj_id_hash.remove(ptr);
+  }
+  c_obj_pool.release(ptr);
 }
 
 void
@@ -4824,21 +4829,23 @@ void Dbdict::handleTabInfoInit(Signal * 
 	       CreateTableRef::OutOfStringBuffer);
   }
 
-  DictObjectPtr obj_ptr;
   if (parseP->requestType != DictTabInfo::AlterTableFromAPI) {
     jam();
-    ndbrequire(c_obj_hash.seize(obj_ptr));
+
+    DictObjectPtr obj_ptr;
+    ndbrequire(c_obj_pool.seize(obj_ptr));
     new (obj_ptr.p) DictObject;
     obj_ptr.p->m_id = tablePtr.i;
     obj_ptr.p->m_type = c_tableDesc.TableType;
     obj_ptr.p->m_name = tablePtr.p->tableName;
     obj_ptr.p->m_ref_count = 0;
-    c_obj_hash.add(obj_ptr);
-    tablePtr.p->m_obj_ptr_i = obj_ptr.i;
+    ndbrequire(link_object(obj_ptr, tablePtr));
+    c_obj_id_hash.add(obj_ptr);
+    c_obj_name_hash.add(obj_ptr);
 
     if (g_trace)
     {
-      g_eventLogger->info("Dbdict: create name=%s,id=%u,obj_ptr_i=%d",
+      g_eventLogger->info("Dbdict: %u: create name=%s,id=%u,obj_ptr_i=%d",__LINE__,
                           c_tableDesc.TableName,
                           tablePtr.i, tablePtr.p->m_obj_ptr_i);
     }
@@ -4909,7 +4916,7 @@ void Dbdict::handleTabInfoInit(Signal * 
     {
       jam();
       HashMapRecordPtr hm_ptr;
-      ndbrequire(c_hash_map_hash.find(hm_ptr, dictObj->m_id));
+      ndbrequire(find_object(hm_ptr, dictObj->m_id));
       tablePtr.p->hashMapObjectId = hm_ptr.p->m_object_id;
       tablePtr.p->hashMapVersion = hm_ptr.p->m_object_version;
     }
@@ -4919,7 +4926,7 @@ void Dbdict::handleTabInfoInit(Signal * 
   {
     jam();
     HashMapRecordPtr hm_ptr;
-    tabRequire(c_hash_map_hash.find(hm_ptr, tablePtr.p->hashMapObjectId),
+    tabRequire(find_object(hm_ptr, tablePtr.p->hashMapObjectId),
                CreateTableRef::InvalidHashMap);
 
     tabRequire(hm_ptr.p->m_object_version ==  tablePtr.p->hashMapVersion,
@@ -5037,7 +5044,7 @@ void Dbdict::handleTabInfoInit(Signal * 
      * Increase ref count
      */
     FilegroupPtr ptr;
-    ndbrequire(c_filegroup_hash.find(ptr, tablePtr.p->m_tablespace_id));
+    ndbrequire(find_object(ptr, tablePtr.p->m_tablespace_id));
     increase_ref_count(ptr.p->m_obj_ptr_i);
   }
 }//handleTabInfoInit()
@@ -5081,17 +5088,17 @@ Dbdict::upgrade_seizeTrigger(TableRecord
       }
 
       DictObjectPtr obj_ptr;
-      bool ok = c_obj_hash.seize(obj_ptr);
+      bool ok = c_obj_pool.seize(obj_ptr);
       ndbrequire(ok);
       new (obj_ptr.p) DictObject();
 
       obj_ptr.p->m_name = triggerPtr.p->triggerName;
-      c_obj_hash.add(obj_ptr);
       obj_ptr.p->m_ref_count = 0;
 
       triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
       obj_ptr.p->m_id = triggerPtr.p->triggerId;
       obj_ptr.p->m_type =TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
+      c_obj_name_hash.add(obj_ptr);
     }
   }
 
@@ -5119,17 +5126,17 @@ Dbdict::upgrade_seizeTrigger(TableRecord
       }
 
       DictObjectPtr obj_ptr;
-      bool ok = c_obj_hash.seize(obj_ptr);
+      bool ok = c_obj_pool.seize(obj_ptr);
       ndbrequire(ok);
       new (obj_ptr.p) DictObject();
 
       obj_ptr.p->m_name = triggerPtr.p->triggerName;
-      c_obj_hash.add(obj_ptr);
       obj_ptr.p->m_ref_count = 0;
 
       triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
       obj_ptr.p->m_id = triggerPtr.p->triggerId;
       obj_ptr.p->m_type =TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
+      c_obj_name_hash.add(obj_ptr);
     }
   }
 }
@@ -5421,7 +5428,7 @@ void Dbdict::handleTabInfo(SimplePropert
   if(tablePtr.p->m_tablespace_id != RNIL || counts[3] || counts[4])
   {
     FilegroupPtr tablespacePtr;
-    if(!c_filegroup_hash.find(tablespacePtr, tablePtr.p->m_tablespace_id))
+    if (!find_object(tablespacePtr, tablePtr.p->m_tablespace_id))
     {
       tabRequire(false, CreateTableRef::InvalidTablespace);
     }
@@ -5657,7 +5664,7 @@ Dbdict::create_fragmentation(Signal* sig
   {
     jam();
     HashMapRecordPtr hm_ptr;
-    ndbrequire(c_hash_map_hash.find(hm_ptr, tabPtr.p->hashMapObjectId));
+    ndbrequire(find_object(hm_ptr, tabPtr.p->hashMapObjectId));
     frag_req->map_ptr_i = hm_ptr.p->m_map_ptr_i;
   }
   else
@@ -6388,7 +6395,7 @@ Dbdict::createTab_dih(Signal* signal, Sc
   if (tabPtr.p->hashMapObjectId != RNIL)
   {
     HashMapRecordPtr hm_ptr;
-    ndbrequire(c_hash_map_hash.find(hm_ptr, tabPtr.p->hashMapObjectId));
+    ndbrequire(find_object(hm_ptr, tabPtr.p->hashMapObjectId));
     req->hashMapPtrI = hm_ptr.p->m_map_ptr_i;
   }
   else
@@ -6992,7 +6999,7 @@ Dbdict::createTable_abortPrepare(Signal*
 
   if (tabPtr.p->m_tablespace_id != RNIL) {
     FilegroupPtr ptr;
-    ndbrequire(c_filegroup_hash.find(ptr, tabPtr.p->m_tablespace_id));
+    ndbrequire(find_object(ptr, tabPtr.p->m_tablespace_id));
     decrease_ref_count(ptr.p->m_obj_ptr_i);
   }
 }
@@ -7069,11 +7076,13 @@ void Dbdict::releaseTableObject(Uint32 t
   if (removeFromHash)
   {
     jam();
+    ndbrequire(tablePtr.p->m_obj_ptr_i != RNIL);
     release_object(tablePtr.p->m_obj_ptr_i);
     tablePtr.p->m_obj_ptr_i = RNIL;
   }
   else
   {
+    ndbrequire(tablePtr.p->m_obj_ptr_i == RNIL);
     LocalRope tmp(c_rope_pool, tablePtr.p->tableName);
     tmp.erase();
   }
@@ -7245,7 +7254,7 @@ Dbdict::dropTable_parse(Signal* signal, 
   Uint32 tableId = impl_req->tableId;
 
   TableRecordPtr tablePtr;
-  if (!(tableId < c_tableRecordPool.getSize())) {
+  if (!(tableId < c_noOfMetaTables)) {
     jam();
     setError(error, DropTableRef::NoSuchTable, __LINE__);
     return;
@@ -7422,7 +7431,7 @@ Dbdict::dropTable_commit(Signal* signal,
   if (tablePtr.p->m_tablespace_id != RNIL)
   {
     FilegroupPtr ptr;
-    ndbrequire(c_filegroup_hash.find(ptr, tablePtr.p->m_tablespace_id));
+    ndbrequire(find_object(ptr, tablePtr.p->m_tablespace_id));
     decrease_ref_count(ptr.p->m_obj_ptr_i);
   }
 
@@ -7934,7 +7943,7 @@ Dbdict::alterTable_parse(Signal* signal,
 
   // get table definition
   TableRecordPtr tablePtr;
-  if (!(impl_req->tableId < c_tableRecordPool.getSize())) {
+  if (!(impl_req->tableId < c_noOfMetaTables)) {
     jam();
     setError(error, AlterTableRef::NoSuchTable, __LINE__);
     return;
@@ -8329,10 +8338,10 @@ Dbdict::check_supported_reorg(Uint32 org
   }
 
   HashMapRecordPtr orgmap_ptr;
-  ndbrequire(c_hash_map_hash.find(orgmap_ptr, org_map_id));
+  ndbrequire(find_object(orgmap_ptr, org_map_id));
 
   HashMapRecordPtr newmap_ptr;
-  ndbrequire(c_hash_map_hash.find(newmap_ptr, new_map_id));
+  ndbrequire(find_object(newmap_ptr, new_map_id));
 
   Ptr<Hash2FragmentMap> orgptr;
   g_hash_map.getPtr(orgptr, orgmap_ptr.p->m_map_ptr_i);
@@ -9072,7 +9081,7 @@ Dbdict::alterTable_toLocal(Signal* signa
     {
       jam();
       HashMapRecordPtr hm_ptr;
-      ndbrequire(c_hash_map_hash.find(hm_ptr,
+      ndbrequire(find_object(hm_ptr,
                                       alterTabPtr.p->m_newTablePtr.p->hashMapObjectId));
       req->new_map_ptr_i = hm_ptr.p->m_map_ptr_i;
     }
@@ -9177,7 +9186,7 @@ Dbdict::alterTable_commit(Signal* signal
       c_obj_pool.getPtr(obj_ptr, tablePtr.p->m_obj_ptr_i);
 
       // remove old name from hash
-      c_obj_hash.remove(obj_ptr);
+      c_obj_name_hash.remove(obj_ptr);
 
       // save old name and replace it by new
       bool ok =
@@ -9187,7 +9196,7 @@ Dbdict::alterTable_commit(Signal* signal
 
       // add new name to object hash
       obj_ptr.p->m_name = tablePtr.p->tableName;
-      c_obj_hash.add(obj_ptr);
+      c_obj_name_hash.add(obj_ptr);
     }
 
     if (AlterTableReq::getFrmFlag(changeMask))
@@ -10091,9 +10100,9 @@ void Dbdict::sendOLD_LIST_TABLES_CONF(Si
   conf->counter = 0;
   Uint32 pos = 0;
 
-  DictObject_hash::Iterator iter;
-  bool ok = c_obj_hash.first(iter);
-  for(; ok; ok = c_obj_hash.next(iter)){
+  DictObjectName_hash::Iterator iter;
+  bool ok = c_obj_name_hash.first(iter);
+  for (; ok; ok = c_obj_name_hash.next(iter)){
     Uint32 type = iter.curr.p->m_type;
     if ((reqTableType != (Uint32)0) && (reqTableType != type))
       continue;
@@ -10279,8 +10288,8 @@ void Dbdict::sendLIST_TABLES_CONF(Signal
   XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
   NodeReceiverGroup rg(senderRef);
 
-  DictObject_hash::Iterator iter;
-  bool done = !c_obj_hash.first(iter);
+  DictObjectName_hash::Iterator iter;
+  bool done = !c_obj_name_hash.first(iter);
 
   if (done)
   {
@@ -10464,7 +10473,7 @@ flush:
     Uint32 tableDataWords = tableDataWriter.getWordsUsed();
     Uint32 tableNameWords = tableNamesWriter.getWordsUsed();
 
-    done = !c_obj_hash.next(iter);
+    done = !c_obj_name_hash.next(iter);
     if ((tableDataWords + tableNameWords) > fragSize || done)
     {
       jam();
@@ -10746,7 +10755,7 @@ Dbdict::createIndex_parse(Signal* signal
   // check primary table
   TableRecordPtr tablePtr;
   {
-    if (!(impl_req->tableId < c_tableRecordPool.getSize())) {
+    if (!(impl_req->tableId < c_noOfMetaTables)) {
       jam();
       setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
       return;
@@ -10898,7 +10907,7 @@ Dbdict::createIndex_parse(Signal* signal
   if (master)
   {
     jam();
-    impl_req->indexId = getFreeObjId(0);
+    impl_req->indexId = getFreeObjId();
   }
 
   if (impl_req->indexId == RNIL)
@@ -10908,7 +10917,7 @@ Dbdict::createIndex_parse(Signal* signal
     return;
   }
 
-  if (impl_req->indexId >= c_tableRecordPool.getSize())
+  if (impl_req->indexId >= c_noOfMetaTables)
   {
     jam();
     setError(error, CreateTableRef::NoMoreTableRecords, __LINE__);
@@ -11446,7 +11455,7 @@ Dbdict::dropIndex_parse(Signal* signal, 
   DropIndxImplReq* impl_req = &dropIndexPtr.p->m_request;
 
   TableRecordPtr indexPtr;
-  if (!(impl_req->indexId < c_tableRecordPool.getSize())) {
+  if (!(impl_req->indexId < c_noOfMetaTables)) {
     jam();
     setError(error, DropIndxRef::IndexNotFound, __LINE__);
     return;
@@ -11928,7 +11937,7 @@ Dbdict::alterIndex_parse(Signal* signal,
   AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
 
   TableRecordPtr indexPtr;
-  if (!(impl_req->indexId < c_tableRecordPool.getSize())) {
+  if (!(impl_req->indexId < c_noOfMetaTables)) {
     jam();
     setError(error, AlterIndxRef::IndexNotFound, __LINE__);
     return;
@@ -12849,7 +12858,7 @@ Dbdict::alterIndex_abortParse(Signal* si
   D("alterIndex_abortParse" << *op_ptr.p);
 
   do {
-    if (!(impl_req->indexId < c_tableRecordPool.getSize())) {
+    if (!(impl_req->indexId < c_noOfMetaTables)) {
       jam();
       D("invalid index id" << V(indexId));
       break;
@@ -13104,7 +13113,7 @@ Dbdict::buildIndex_parse(Signal* signal,
 
   // get index
   TableRecordPtr indexPtr;
-  if (!(impl_req->indexId < c_tableRecordPool.getSize())) {
+  if (!(impl_req->indexId < c_noOfMetaTables)) {
     jam();
     setError(error, BuildIndxRef::IndexNotFound, __LINE__);
     return;
@@ -13115,7 +13124,7 @@ Dbdict::buildIndex_parse(Signal* signal,
 
   // get primary table
   TableRecordPtr tablePtr;
-  if (!(impl_req->tableId < c_tableRecordPool.getSize())) {
+  if (!(impl_req->tableId < c_noOfMetaTables)) {
     jam();
     setError(error, BuildIndxRef::IndexNotFound, __LINE__);
     return;
@@ -13877,7 +13886,7 @@ Dbdict::indexStat_parse(Signal* signal, 
 
   // get index
   TableRecordPtr indexPtr;
-  if (!(impl_req->indexId < c_tableRecordPool.getSize())) {
+  if (!(impl_req->indexId < c_noOfMetaTables)) {
     jam();
     setError(error, IndexStatRef::InvalidIndex, __LINE__);
     return;
@@ -14333,7 +14342,7 @@ Dbdict::execINDEX_STAT_REP(Signal* signa
 
   // check
   TableRecordPtr indexPtr;
-  if (rep->indexId >= c_tableRecordPool.getSize()) {
+  if (rep->indexId >= c_noOfMetaTables) {
     jam();
     return;
   }
@@ -14381,7 +14390,7 @@ Dbdict::indexStatBg_process(Signal* sign
   uint loop;
   for (loop = 0; loop < maxloop; loop++, c_indexStatBgId++) {
     jam();
-    c_indexStatBgId %= c_tableRecordPool.getSize();
+    c_indexStatBgId %= c_noOfMetaTables;
 
     // check
     TableRecordPtr indexPtr;
@@ -17648,7 +17657,7 @@ Dbdict::createTrigger_parse(Signal* sign
   // check the table
   {
     const Uint32 tableId = impl_req->tableId;
-    if (! (tableId < c_tableRecordPool.getSize()))
+    if (! (tableId < c_noOfMetaTables))
     {
       jam();
       setError(error, CreateTrigRef::InvalidTable, __LINE__);
@@ -20669,7 +20678,7 @@ Dbdict::createFile_parse(Signal* signal,
 
   // Get Filegroup
   FilegroupPtr fg_ptr;
-  if(!c_filegroup_hash.find(fg_ptr, f.FilegroupId))
+  if (!find_object(fg_ptr, f.FilegroupId))
   {
     jam();
     setError(error, CreateFileRef::NoSuchFilegroup, __LINE__, f.FileName);
@@ -20773,7 +20782,7 @@ Dbdict::createFile_parse(Signal* signal,
   {
     jam();
 
-    Uint32 objId = getFreeObjId(0);
+    Uint32 objId = getFreeObjId();
     if (objId == RNIL)
     {
       jam();
@@ -20853,6 +20862,8 @@ Dbdict::createFile_parse(Signal* signal,
   obj_ptr.p->m_type = f.FileType;
   obj_ptr.p->m_ref_count = 0;
 
+  ndbrequire(link_object(obj_ptr, filePtr));
+
   {
     SchemaFile::TableEntry te; te.init();
     te.m_tableState = SchemaFile::SF_CREATE;
@@ -20871,8 +20882,8 @@ Dbdict::createFile_parse(Signal* signal,
     }
   }
 
-  c_obj_hash.add(obj_ptr);
-  c_file_hash.add(filePtr);
+  c_obj_name_hash.add(obj_ptr);
+  c_obj_id_hash.add(obj_ptr);
 
   // save sections to DICT memory
   saveOpSection(op_ptr, handle, 0);
@@ -20900,8 +20911,8 @@ Dbdict::createFile_parse(Signal* signal,
 
   if (g_trace)
   {
-    g_eventLogger->info("Dbdict: create name=%s,id=%u,obj_ptr_i=%d,"
-                        "type=%s,bytes=%llu,warn=0x%x",
+    g_eventLogger->info("Dbdict: %u: create name=%s,id=%u,obj_ptr_i=%d,"
+                        "type=%s,bytes=%llu,warn=0x%x",__LINE__,
                         f.FileName,
                         impl_req->file_id,
                         filePtr.p->m_obj_ptr_i,
@@ -20944,8 +20955,8 @@ Dbdict::createFile_abortParse(Signal* si
   {
     FilePtr f_ptr;
     FilegroupPtr fg_ptr;
-    ndbrequire(c_file_hash.find(f_ptr, impl_req->file_id));
-    ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+    ndbrequire(find_object(f_ptr, impl_req->file_id));
+    ndbrequire(find_object(fg_ptr, f_ptr.p->m_filegroup_id));
     if (f_ptr.p->m_type == DictTabInfo::Datafile)
     {
       jam();
@@ -20959,7 +20970,7 @@ Dbdict::createFile_abortParse(Signal* si
     }
 
     release_object(f_ptr.p->m_obj_ptr_i);
-    c_file_hash.release(f_ptr);
+    c_file_pool.release(f_ptr);
   }
 
   sendTransConf(signal, op_ptr);
@@ -21062,8 +21073,8 @@ Dbdict::createFile_fromWriteObjInfo(Sign
   FilePtr f_ptr;
   FilegroupPtr fg_ptr;
 
-  ndbrequire(c_file_hash.find(f_ptr, impl_req->file_id));
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+  ndbrequire(find_object(f_ptr, impl_req->file_id));
+  ndbrequire(find_object(fg_ptr, f_ptr.p->m_filegroup_id));
 
   req->senderData = op_ptr.p->op_key;
   req->senderRef = reference();
@@ -21122,8 +21133,8 @@ Dbdict::createFile_abortPrepare(Signal* 
   getOpRec(op_ptr, createFileRecPtr);
   CreateFileImplReq* impl_req = &createFileRecPtr.p->m_request;
 
-  ndbrequire(c_file_hash.find(f_ptr, impl_req->file_id));
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+  ndbrequire(find_object(f_ptr, impl_req->file_id));
+  ndbrequire(find_object(fg_ptr, f_ptr.p->m_filegroup_id));
 
   req->senderData = op_ptr.p->op_key;
   req->senderRef = reference();
@@ -21178,8 +21189,8 @@ Dbdict::createFile_commit(Signal* signal
   FilegroupPtr fg_ptr;
 
   jam();
-  ndbrequire(c_file_hash.find(f_ptr, impl_req->file_id));
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+  ndbrequire(find_object(f_ptr, impl_req->file_id));
+  ndbrequire(find_object(fg_ptr, f_ptr.p->m_filegroup_id));
 
   req->senderData = op_ptr.p->op_key;
   req->senderRef = reference();
@@ -21467,7 +21478,7 @@ Dbdict::createFilegroup_parse(Signal* si
     fg_ptr.p->m_tablespace.m_default_logfile_group_id = fg.TS_LogfileGroupId;
 
     FilegroupPtr lg_ptr;
-    if (!c_filegroup_hash.find(lg_ptr, fg.TS_LogfileGroupId))
+    if (!find_object(lg_ptr, fg.TS_LogfileGroupId))
     {
       jam();
       setError(error, CreateFilegroupRef::NoSuchLogfileGroup, __LINE__);
@@ -21512,7 +21523,7 @@ Dbdict::createFilegroup_parse(Signal* si
   {
     jam();
 
-    Uint32 objId = getFreeObjId(0);
+    Uint32 objId = getFreeObjId();
     if (objId == RNIL)
     {
       jam();
@@ -21532,7 +21543,6 @@ Dbdict::createFilegroup_parse(Signal* si
   }
 
   fg_ptr.p->key = impl_req->filegroup_id;
-  fg_ptr.p->m_obj_ptr_i = obj_ptr.i;
   fg_ptr.p->m_type = fg.FilegroupType;
   fg_ptr.p->m_version = impl_req->filegroup_version;
   fg_ptr.p->m_name = obj_ptr.p->m_name;
@@ -21541,6 +21551,8 @@ Dbdict::createFilegroup_parse(Signal* si
   obj_ptr.p->m_type = fg.FilegroupType;
   obj_ptr.p->m_ref_count = 0;
 
+  ndbrequire(link_object(obj_ptr, fg_ptr));
+
   if (master)
   {
     jam();
@@ -21570,8 +21582,8 @@ Dbdict::createFilegroup_parse(Signal* si
     }
   }
 
-  c_obj_hash.add(obj_ptr);
-  c_filegroup_hash.add(fg_ptr);
+  c_obj_name_hash.add(obj_ptr);
+  c_obj_id_hash.add(obj_ptr);
 
   // save sections to DICT memory
   saveOpSection(op_ptr, handle, 0);
@@ -21584,7 +21596,7 @@ Dbdict::createFilegroup_parse(Signal* si
   createFilegroupPtr.p->m_parsed = true;
 
 #if defined VM_TRACE || defined ERROR_INSERT
-  ndbout_c("Dbdict: create name=%s,id=%u,obj_ptr_i=%d",
+  ndbout_c("Dbdict: %u: create name=%s,id=%u,obj_ptr_i=%d",__LINE__,
            fg.FilegroupName, impl_req->filegroup_id, fg_ptr.p->m_obj_ptr_i);
 #endif
 
@@ -21617,19 +21629,19 @@ Dbdict::createFilegroup_abortParse(Signa
     CreateFilegroupImplReq* impl_req = &createFilegroupPtr.p->m_request;
 
     FilegroupPtr fg_ptr;
-    ndbrequire(c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id));
+    ndbrequire(find_object(fg_ptr, impl_req->filegroup_id));
 
     if (fg_ptr.p->m_type == DictTabInfo::Tablespace)
     {
       jam();
       FilegroupPtr lg_ptr;
-      ndbrequire(c_filegroup_hash.find
+      ndbrequire(find_object
                  (lg_ptr, fg_ptr.p->m_tablespace.m_default_logfile_group_id));
       decrease_ref_count(lg_ptr.p->m_obj_ptr_i);
     }
 
     release_object(fg_ptr.p->m_obj_ptr_i);
-    c_filegroup_hash.release(fg_ptr);
+    c_filegroup_pool.release(fg_ptr);
   }
 
   sendTransConf(signal, op_ptr);
@@ -21740,7 +21752,8 @@ Dbdict::createFilegroup_fromWriteObjInfo
   req->filegroup_version = impl_req->filegroup_version;
 
   FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id));
+
+  ndbrequire(find_object(fg_ptr, impl_req->filegroup_id));
 
   Uint32 ref= 0;
   Uint32 len= 0;
@@ -21956,7 +21969,7 @@ Dbdict::dropFile_parse(Signal* signal, b
   DropFileImplReq* impl_req = &dropFileRecPtr.p->m_request;
 
   FilePtr f_ptr;
-  if (!c_file_hash.find(f_ptr, impl_req->file_id))
+  if (!find_object(f_ptr, impl_req->file_id))
   {
     jam();
     setError(error, DropFileRef::NoSuchFile, __LINE__);
@@ -22131,11 +22144,11 @@ Dbdict::dropFile_complete(Signal* signal
   FilegroupPtr fg_ptr;
 
   jam();
-  ndbrequire(c_file_hash.find(f_ptr, impl_req->file_id));
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+  ndbrequire(find_object(f_ptr, impl_req->file_id));
+  ndbrequire(find_object(fg_ptr, f_ptr.p->m_filegroup_id));
   decrease_ref_count(fg_ptr.p->m_obj_ptr_i);
   release_object(f_ptr.p->m_obj_ptr_i);
-  c_file_hash.release(f_ptr);
+  c_file_pool.release(f_ptr);
 
   sendTransConf(signal, op_ptr);
 }
@@ -22186,8 +22199,8 @@ Dbdict::send_drop_file(Signal* signal, U
   FilegroupPtr fg_ptr;
 
   jam();
-  ndbrequire(c_file_hash.find(f_ptr, fileId));
-  ndbrequire(c_filegroup_hash.find(fg_ptr, f_ptr.p->m_filegroup_id));
+  ndbrequire(find_object(f_ptr, fileId));
+  ndbrequire(find_object(fg_ptr, f_ptr.p->m_filegroup_id));
 
   req->senderData = op_key;
   req->senderRef = reference();
@@ -22314,7 +22327,7 @@ Dbdict::dropFilegroup_parse(Signal* sign
   DropFilegroupImplReq* impl_req = &dropFilegroupRecPtr.p->m_request;
 
   FilegroupPtr fg_ptr;
-  if (!c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id))
+  if (!find_object(fg_ptr, impl_req->filegroup_id))
   {
     jam();
     setError(error, DropFilegroupRef::NoSuchFilegroup, __LINE__);
@@ -22437,7 +22450,7 @@ Dbdict::dropFilegroup_prepare(Signal* si
                DropFilegroupImplReq::Prepare);
 
   FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id));
+  ndbrequire(find_object(fg_ptr, impl_req->filegroup_id));
 
   if (fg_ptr.p->m_type == DictTabInfo::LogfileGroup)
   {
@@ -22475,7 +22488,7 @@ Dbdict::dropFilegroup_abortPrepare(Signa
                DropFilegroupImplReq::Abort);
 
   FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id));
+  ndbrequire(find_object(fg_ptr, impl_req->filegroup_id));
 
   if (fg_ptr.p->m_type == DictTabInfo::LogfileGroup)
   {
@@ -22516,7 +22529,7 @@ Dbdict::dropFilegroup_commit(Signal* sig
                DropFilegroupImplReq::Commit);
 
   FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id));
+  ndbrequire(find_object(fg_ptr, impl_req->filegroup_id));
 
   if (fg_ptr.p->m_type == DictTabInfo::LogfileGroup)
   {
@@ -22539,7 +22552,6 @@ Dbdict::dropFilegroup_commit(Signal* sig
       entry->m_transId = 0;
 
       release_object(objPtr.i, objPtr.p);
-      c_file_hash.remove(filePtr);
     }
     list.release();
   }
@@ -22547,8 +22559,7 @@ Dbdict::dropFilegroup_commit(Signal* sig
   {
     jam();
     FilegroupPtr lg_ptr;
-    ndbrequire(c_filegroup_hash.
-	       find(lg_ptr,
+    ndbrequire(find_object(lg_ptr,
 		    fg_ptr.p->m_tablespace.m_default_logfile_group_id));
 
     decrease_ref_count(lg_ptr.p->m_obj_ptr_i);
@@ -22568,10 +22579,10 @@ Dbdict::dropFilegroup_complete(Signal* s
   DropFilegroupImplReq* impl_req = &dropFilegroupRecPtr.p->m_request;
 
   FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, impl_req->filegroup_id));
+  ndbrequire(find_object(fg_ptr, impl_req->filegroup_id));
 
   release_object(fg_ptr.p->m_obj_ptr_i);
-  c_filegroup_hash.release(fg_ptr);
+  c_filegroup_pool.release(fg_ptr);
 
   sendTransConf(signal, op_ptr);
 }
@@ -22621,7 +22632,7 @@ Dbdict::send_drop_fg(Signal* signal, Uin
   DropFilegroupImplReq* req = (DropFilegroupImplReq*)signal->getDataPtrSend();
 
   FilegroupPtr fg_ptr;
-  ndbrequire(c_filegroup_hash.find(fg_ptr, filegroupId));
+  ndbrequire(find_object(fg_ptr, filegroupId));
 
   req->senderData = op_key;
   req->senderRef = reference();
@@ -24318,12 +24329,12 @@ Dbdict::seizeDictObject(SchemaOpPtr op_p
 {
   D("seizeDictObject" << *op_ptr.p);
 
-  bool ok = c_obj_hash.seize(obj_ptr);
+  bool ok = c_obj_pool.seize(obj_ptr);
   ndbrequire(ok);
   new (obj_ptr.p) DictObject();
 
   obj_ptr.p->m_name = name;
-  c_obj_hash.add(obj_ptr);
+  c_obj_name_hash.add(obj_ptr);
   obj_ptr.p->m_ref_count = 0;
 
   linkDictObject(op_ptr, obj_ptr);
@@ -24576,7 +24587,7 @@ Dbdict::execSCHEMA_TRANS_BEGIN_REQ(Signa
     trans_ptr.p->m_clientRef = clientRef;
     trans_ptr.p->m_transId = transId;
     trans_ptr.p->m_requestInfo = requestInfo;
-    trans_ptr.p->m_obj_id = getFreeObjId(0);
+    trans_ptr.p->m_obj_id = getFreeObjId();
     if (localTrans)
     {
       /**
@@ -24586,7 +24597,7 @@ Dbdict::execSCHEMA_TRANS_BEGIN_REQ(Signa
        *   schema file so that we don't accidently allocate
        *   an objectId that should be used to recreate an object
        */
-      trans_ptr.p->m_obj_id = getFreeObjId(0, true);
+      trans_ptr.p->m_obj_id = getFreeObjId(true);
     }
 
     if (!localTrans)
@@ -26895,7 +26906,7 @@ Dbdict::execSCHEMA_TRANS_IMPL_REQ(Signal
     if (signal->getLength() < SchemaTransImplReq::SignalLengthStart)
     {
       jam();
-      reqCopy.start.objectId = getFreeObjId(0);
+      reqCopy.start.objectId = getFreeObjId();
     }
     slave_run_start(signal, req);
     return;
@@ -27044,9 +27055,9 @@ Dbdict::slave_run_start(Signal *signal, 
   SchemaTransPtr trans_ptr;
   const Uint32 trans_key = req->transKey;
 
-  Uint32 objId = getFreeObjId(req->start.objectId);
-  if (objId != req->start.objectId)
-  {
+  Uint32 objId = req->start.objectId;
+  if (check_read_obj(objId,0) == 0)
+  { /* schema file id already in use */
     jam();
     setError(error, CreateTableRef::NoMoreTableRecords, __LINE__);
     goto err;
@@ -28442,7 +28453,7 @@ Dbdict::createHashMap_parse(Signal* sign
     }
 
     HashMapRecordPtr hm_ptr;
-    ndbrequire(c_hash_map_hash.find(hm_ptr, objptr->m_id));
+    ndbrequire(find_object(hm_ptr, objptr->m_id));
 
     impl_req->objectId = objptr->m_id;
     impl_req->objectVersion = hm_ptr.p->m_object_version;
@@ -28496,7 +28507,7 @@ Dbdict::createHashMap_parse(Signal* sign
       goto error;
     }
 
-    objId = impl_req->objectId = getFreeObjId(0);
+    objId = impl_req->objectId = getFreeObjId();
     if (objId == RNIL)
     {
       jam();
@@ -28538,7 +28549,8 @@ Dbdict::createHashMap_parse(Signal* sign
   obj_ptr.p->m_type = DictTabInfo::HashMap;
   obj_ptr.p->m_ref_count = 0;
   obj_ptr.p->m_name = name;
-  c_obj_hash.add(obj_ptr);
+  c_obj_name_hash.add(obj_ptr);
+  c_obj_id_hash.add(obj_ptr);
 
   if (ERROR_INSERTED(6209))
   {
@@ -28577,9 +28589,8 @@ Dbdict::createHashMap_parse(Signal* sign
   hm_ptr.p->m_object_id = objId;
   hm_ptr.p->m_object_version = objVersion;
   hm_ptr.p->m_name = name;
-  hm_ptr.p->m_obj_ptr_i = obj_ptr.i;
   hm_ptr.p->m_map_ptr_i = map_ptr.i;
-  c_hash_map_hash.add(hm_ptr);
+  link_object(obj_ptr, hm_ptr);
 
   /**
    * pack is stupid...and requires bytes!
@@ -28627,7 +28638,7 @@ Dbdict::createHashMap_parse(Signal* sign
   handle.m_cnt = 1;
 
 #if defined VM_TRACE || defined ERROR_INSERT
-  ndbout_c("Dbdict: create name=%s,id=%u,obj_ptr_i=%d",
+  ndbout_c("Dbdict: %u: create name=%s,id=%u,obj_ptr_i=%d",__LINE__,
            hm.HashMapName, objId, hm_ptr.p->m_obj_ptr_i);
 #endif
 
@@ -28639,7 +28650,7 @@ error:
   if (!hm_ptr.isNull())
   {
     jam();
-    c_hash_map_hash.release(hm_ptr);
+    c_hash_map_pool.release(hm_ptr);
   }
 
   if (!map_ptr.isNull())
@@ -28681,11 +28692,11 @@ Dbdict::createHashMap_abortParse(Signal*
     jam();
 
     HashMapRecordPtr hm_ptr;
-    ndbrequire(c_hash_map_hash.find(hm_ptr, impl_req->objectId));
+    ndbrequire(find_object(hm_ptr, impl_req->objectId));
 
     release_object(hm_ptr.p->m_obj_ptr_i);
     g_hash_map.release(hm_ptr.p->m_map_ptr_i);
-    c_hash_map_hash.release(hm_ptr);
+    c_hash_map_pool.release(hm_ptr);
   }
 
   // wl3600_todo probably nothing..
@@ -28990,7 +29001,7 @@ Dbdict::check_consistency()
   // schema file entries // mis-named "tables"
   TableRecordPtr tablePtr;
   for (tablePtr.i = 0;
-      tablePtr.i < c_tableRecordPool.getSize();
+      tablePtr.i < c_noOfMetaTables;
       tablePtr.i++) {
     if (check_read_obj(tablePtr.i,
 

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2011-11-03 08:40:19 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2011-11-11 07:49:30 +0000
@@ -167,6 +167,7 @@ struct sysTab_NDBEVENTS_0 {
  */
 class Dbdict: public SimulatedBlock {
 public:
+
   /*
    *   2.3 RECORD AND FILESIZES
    */
@@ -229,7 +230,7 @@ public:
   };
   typedef Ptr<AttributeRecord> AttributeRecordPtr;
   typedef ArrayPool<AttributeRecord> AttributeRecord_pool;
-  typedef DLHashTable<AttributeRecord,AttributeRecord,AttributeRecord_pool> AttributeRecord_hash;
+  typedef DLMHashTable<AttributeRecord_pool, AttributeRecord> AttributeRecord_hash;
   typedef DLFifoList<AttributeRecord,AttributeRecord,AttributeRecord_pool> AttributeRecord_list;
   typedef LocalDLFifoList<AttributeRecord,AttributeRecord,AttributeRecord_pool> LocalAttributeRecord_list;
 
@@ -249,6 +250,8 @@ public:
 
   struct TableRecord {
     TableRecord(){ m_upgrade_trigger_handling.m_upgrade = false;}
+    static bool isCompatible(Uint32 type) { return DictTabInfo::isTable(type) || DictTabInfo::isIndex(type); }
+
     Uint32 maxRowsLow;
     Uint32 maxRowsHigh;
     Uint32 minRowsLow;
@@ -436,6 +439,7 @@ public:
 
   TableRecord_pool c_tableRecordPool;
   RSS_AP_SNAPSHOT(c_tableRecordPool);
+  TableRecord_pool& get_pool(TableRecordPtr) { return c_tableRecordPool; }
 
   /**  Node Group and Tablespace id+version + range or list data.
     *  This is only stored temporarily in DBDICT during an ongoing
@@ -604,6 +608,7 @@ public:
 
   struct File {
     File() {}
+    static bool isCompatible(Uint32 type) { return DictTabInfo::isFile(type); }
 
     Uint32 key;
     Uint32 m_magic;
@@ -620,19 +625,15 @@ public:
       Uint32 prevList;
       Uint32 nextPool;
     };
-    Uint32 nextHash, prevHash;
-
-    Uint32 hashValue() const { return key;}
-    bool equal(const File& obj) const { return key == obj.key;}
   };
   typedef Ptr<File> FilePtr;
   typedef RecordPool<File, RWPool> File_pool;
   typedef DLListImpl<File_pool, File> File_list;
   typedef LocalDLListImpl<File_pool, File> Local_file_list;
-  typedef KeyTableImpl<File_pool, File> File_hash;
 
   struct Filegroup {
     Filegroup(){}
+    static bool isCompatible(Uint32 type) { return DictTabInfo::isFilegroup(type); }
 
     Uint32 key;
     Uint32 m_obj_ptr_i;
@@ -657,25 +658,34 @@ public:
     union {
       Uint32 nextPool;
       Uint32 nextList;
-      Uint32 nextHash;
     };
-    Uint32 prevHash;
-
-    Uint32 hashValue() const { return key;}
-    bool equal(const Filegroup& obj) const { return key == obj.key;}
   };
   typedef Ptr<Filegroup> FilegroupPtr;
   typedef RecordPool<Filegroup, RWPool> Filegroup_pool;
-  typedef KeyTableImpl<Filegroup_pool, Filegroup> Filegroup_hash;
 
   File_pool c_file_pool;
   Filegroup_pool c_filegroup_pool;
-  File_hash c_file_hash;
-  Filegroup_hash c_filegroup_hash;
+
+  File_pool& get_pool(FilePtr) { return c_file_pool; }
+  Filegroup_pool& get_pool(FilegroupPtr) { return c_filegroup_pool; }
 
   RopePool c_rope_pool;
   RSS_AP_SNAPSHOT(c_rope_pool);
 
+  template <typename T, typename U = T> struct HashedById {
+    static Uint32& nextHash(U& t) { return t.nextHash_by_id; }
+    static Uint32& prevHash(U& t) { return t.prevHash_by_id; }
+    static Uint32 hashValue(T const& t) { return t.hashValue_by_id(); }
+    static bool equal(T const& lhs, T const& rhs) { return lhs.equal_by_id(rhs); }
+  };
+
+  template <typename T, typename U = T> struct HashedByName {
+    static Uint32& nextHash(U& t) { return t.nextHash_by_name; }
+    static Uint32& prevHash(U& t) { return t.prevHash_by_name; }
+    static Uint32 hashValue(T const& t) { return t.hashValue_by_name(); }
+    static bool equal(T const& lhs, T const& rhs) { return lhs.equal_by_name(rhs); }
+  };
+
   struct DictObject {
     DictObject() {
       m_trans_key = 0;
@@ -683,6 +693,7 @@ public:
     };
     Uint32 m_id;
     Uint32 m_type;
+    Uint32 m_object_ptr_i;
     Uint32 m_ref_count;
     RopeHandle m_name;
     union {
@@ -694,21 +705,34 @@ public:
       Uint32 nextPool;
       Uint32 nextList;
     };
-    Uint32 nextHash;
-    Uint32 prevHash;
 
-    Uint32 hashValue() const { return m_name.hashValue();}
-    bool equal(const DictObject& obj) const {
-      if(obj.hashValue() == hashValue()){
+    // SchemaOp -> DictObject -> SchemaTrans
+    Uint32 m_trans_key;
+    Uint32 m_op_ref_count;
+
+    // HashedById
+    Uint32 nextHash_by_id;
+    Uint32 prevHash_by_id;
+    Uint32 hashValue_by_id() const { return m_id; }
+    bool equal_by_id(DictObject const& obj) const {
+      bool isTrigger = DictTabInfo::isTrigger(m_type);
+      bool objIsTrigger = DictTabInfo::isTrigger(obj.m_type);
+      return (isTrigger == objIsTrigger) &&
+             (obj.m_id == m_id);
+    }
+
+    // HashedByName
+    Uint32 nextHash_by_name;
+    Uint32 prevHash_by_name;
+    Uint32 hashValue_by_name() const { return m_name.hashValue(); }
+    bool equal_by_name(DictObject const& obj) const {
+      if(obj.hashValue_by_name() == hashValue_by_name()){
 	ConstRope r(* m_key.m_pool, obj.m_name);
 	return r.compare(m_key.m_name_ptr, m_key.m_name_len) == 0;
       }
       return false;
     }
 
-    // SchemaOp -> DictObject -> SchemaTrans
-    Uint32 m_trans_key;
-    Uint32 m_op_ref_count;
 #ifdef VM_TRACE
     void print(NdbOut&) const;
 #endif
@@ -716,13 +740,57 @@ public:
 
   typedef Ptr<DictObject> DictObjectPtr;
   typedef ArrayPool<DictObject> DictObject_pool;
-  typedef DLHashTable<DictObject,DictObject,DictObject_pool> DictObject_hash;
+  typedef DLMHashTable<DictObject_pool, DictObject, HashedByName<DictObject> > DictObjectName_hash;
+  typedef DLMHashTable<DictObject_pool, DictObject, HashedById<DictObject> > DictObjectId_hash;
   typedef SLList<DictObject> DictObject_list;
 
-  DictObject_hash c_obj_hash; // Name
+  DictObjectName_hash c_obj_name_hash; // Name (not temporary TableRecords)
+  DictObjectId_hash c_obj_id_hash; // Schema file id / Trigger id
   DictObject_pool c_obj_pool;
   RSS_AP_SNAPSHOT(c_obj_pool);
 
+  template<typename T> bool find_object(DictObjectPtr& obj, Ptr<T>& object, Uint32 id)
+  {
+    if (!find_object(obj, id))
+    {
+      object.setNull();
+      return false;
+    }
+    if (!T::isCompatible(obj.p->m_type))
+    {
+      object.setNull();
+      return false;
+    }
+    get_pool(object).getPtr(object, obj.p->m_object_ptr_i);
+    return !object.isNull();
+  }
+
+  template<typename T> bool find_object(Ptr<T>& object, Uint32 id)
+  {
+    DictObjectPtr obj;
+    return find_object(obj, object, id);
+  }
+
+  bool find_object(DictObjectPtr& object, Uint32 id)
+  {
+    DictObject key;
+    key.m_id = id;
+    key.m_type = 0; // Not a trigger atleast
+    bool ok = c_obj_id_hash.find(object, key);
+    return ok;
+  }
+
+  template<typename T> bool link_object(DictObjectPtr obj, Ptr<T> object)
+  {
+    if (!T::isCompatible(obj.p->m_type))
+    {
+      return false;
+    }
+    obj.p->m_object_ptr_i = object.i;
+    object.p->m_obj_ptr_i = obj.i;
+    return true;
+  }
+
   // 1
   DictObject * get_object(const char * name){
     return get_object(name, Uint32(strlen(name) + 1));
@@ -1612,7 +1680,7 @@ private:
   };
 
   typedef RecordPool<SchemaOp,ArenaPool> SchemaOp_pool;
-  typedef DLHashTable<SchemaOp,SchemaOp,SchemaOp_pool> SchemaOp_hash;
+  typedef DLMHashTable<SchemaOp_pool, SchemaOp> SchemaOp_hash;
   typedef DLFifoList<SchemaOp,SchemaOp,SchemaOp_pool>::Head  SchemaOp_head;
   typedef LocalDLFifoList<SchemaOp,SchemaOp,SchemaOp_pool> LocalSchemaOp_list;
 
@@ -1857,7 +1925,7 @@ private:
       assert(false);
       return -1;
     }
-    // DLHashTable
+    // DLMHashTable
     Uint32 trans_key;
     Uint32 nextHash;
     Uint32 prevHash;
@@ -1975,7 +2043,7 @@ private:
   Uint32 check_write_obj(Uint32, Uint32, SchemaFile::EntryState, ErrorInfo&);
 
   typedef RecordPool<SchemaTrans,ArenaPool> SchemaTrans_pool;
-  typedef DLHashTable<SchemaTrans,SchemaTrans,SchemaTrans_pool> SchemaTrans_hash;
+  typedef DLMHashTable<SchemaTrans_pool, SchemaTrans> SchemaTrans_hash;
   typedef DLFifoList<SchemaTrans,SchemaTrans,SchemaTrans_pool> SchemaTrans_list;
 
   SchemaTrans_pool c_schemaTransPool;
@@ -2194,7 +2262,7 @@ private:
     // ArrayPool
     Uint32 nextPool;
 
-    // DLHashTable
+    // DLMHashTable
     Uint32 tx_key;
     Uint32 nextHash;
     Uint32 prevHash;
@@ -2246,7 +2314,7 @@ private:
   };
 
   typedef ArrayPool<TxHandle> TxHandle_pool;
-  typedef DLHashTable<TxHandle,TxHandle,TxHandle_pool> TxHandle_hash;
+  typedef DLMHashTable<TxHandle_pool, TxHandle> TxHandle_hash;
 
   TxHandle_pool c_txHandlePool;
   TxHandle_hash c_txHandleHash;
@@ -2929,6 +2997,7 @@ private:
 
   struct HashMapRecord {
     HashMapRecord(){}
+    static bool isCompatible(Uint32 type) { return DictTabInfo::isHashMap(type); }
 
     /* Table id (array index in DICT and other blocks) */
     union {
@@ -2944,24 +3013,15 @@ private:
      * ptr.i, in g_hash_map
      */
     Uint32 m_map_ptr_i;
-    union {
-      Uint32 nextPool;
-      Uint32 nextHash;
-    };
-    Uint32 prevHash;
-
-    Uint32 hashValue() const { return key;}
-    bool equal(const HashMapRecord& obj) const { return key == obj.key;}
-
+    Uint32 nextPool;
   };
   typedef Ptr<HashMapRecord> HashMapRecordPtr;
   typedef ArrayPool<HashMapRecord> HashMapRecord_pool;
-  typedef KeyTableImpl<HashMapRecord_pool, HashMapRecord> HashMapRecord_hash;
 
   HashMapRecord_pool c_hash_map_pool;
-  HashMapRecord_hash c_hash_map_hash;
   RSS_AP_SNAPSHOT(c_hash_map_pool);
   RSS_AP_SNAPSHOT(g_hash_map);
+  HashMapRecord_pool& get_pool(HashMapRecordPtr) { return c_hash_map_pool; }
 
   struct CreateHashMapRec;
   typedef RecordPool<CreateHashMapRec,ArenaPool> CreateHashMapRec_pool;
@@ -3673,7 +3733,7 @@ private:
   /* ------------------------------------------------------------ */
   // General Stuff
   /* ------------------------------------------------------------ */
-  Uint32 getFreeObjId(Uint32 minId, bool both = false);
+  Uint32 getFreeObjId(bool both = false);
   Uint32 getFreeTableRecord();
   Uint32 getFreeTriggerRecord();
   bool getNewAttributeRecord(TableRecordPtr tablePtr,
@@ -4037,6 +4097,7 @@ protected:
   virtual bool getParam(const char * param, Uint32 * retVal);
 private:
   ArenaAllocator c_arenaAllocator;
+  Uint32 c_noOfMetaTables;
 };
 
 inline bool

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2011-11-14 09:18:48 +0000
@@ -520,7 +520,7 @@ public:
 // Each entry in this array contains a reference to 16 fragment records in a
 // row. Thus finding the correct record is very quick provided the fragment id.
 //-----------------------------------------------------------------------------
-    Uint32 startFid[MAX_NDB_NODES * MAX_FRAG_PER_NODE / NO_OF_FRAGS_PER_CHUNK];
+    Uint32 startFid[MAX_NDB_NODES * MAX_FRAG_PER_LQH / NO_OF_FRAGS_PER_CHUNK];
 
     Uint32 tabFile[2];
     Uint32 connectrec;                                    

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-10-31 10:01:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-11-14 14:25:20 +0000
@@ -7485,6 +7485,8 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
   Uint32 err = 0;
   const Uint32 defaultFragments = 
     c_fragments_per_node * cnoOfNodeGroups * cnoReplicas;
+  const Uint32 maxFragments =
+    MAX_FRAG_PER_LQH * getLqhWorkers() * cnoOfNodeGroups * cnoReplicas;
 
   do {
     NodeGroupRecordPtr NGPtr;
@@ -7506,11 +7508,15 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
       case DictTabInfo::AllNodesMediumTable:
         jam();
         noOfFragments = 2 * defaultFragments;
+        if (noOfFragments > maxFragments)
+          noOfFragments = maxFragments;
         set_default_node_groups(signal, noOfFragments);
         break;
       case DictTabInfo::AllNodesLargeTable:
         jam();
         noOfFragments = 4 * defaultFragments;
+        if (noOfFragments > maxFragments)
+          noOfFragments = maxFragments;
         set_default_node_groups(signal, noOfFragments);
         break;
       case DictTabInfo::SingleFragment:
@@ -7863,7 +7869,7 @@ void Dbdih::execDIADDTABREQ(Signal* sign
   }
 
   union {
-    Uint16 fragments[2 + MAX_FRAG_PER_NODE*MAX_REPLICAS*MAX_NDB_NODES];
+    Uint16 fragments[2 + MAX_FRAG_PER_LQH*MAX_REPLICAS*MAX_NDB_NODES];
     Uint32 align;
   };
   (void)align; // kill warning

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2011-10-28 13:45:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2011-11-14 14:25:20 +0000
@@ -111,6 +111,9 @@ class Lgman;
 #define ZPOS_PREV_PAGE_NO 19
 #define ZPOS_IN_FREE_LIST 20
 
+/* Specify number of log parts used to enable use of more LQH threads */
+#define ZPOS_NO_LOG_PARTS 21
+
 /* ------------------------------------------------------------------------- */
 /*       CONSTANTS FOR THE VARIOUS REPLICA AND NODE TYPES.                   */
 /* ------------------------------------------------------------------------- */
@@ -1929,8 +1932,8 @@ public:
       ,TABLE_READ_ONLY = 9
     };
     
-    UintR fragrec[MAX_FRAG_PER_NODE];
-    Uint16 fragid[MAX_FRAG_PER_NODE];
+    UintR fragrec[MAX_FRAG_PER_LQH];
+    Uint16 fragid[MAX_FRAG_PER_LQH];
     /**
      * Status of the table 
      */
@@ -2834,7 +2837,6 @@ private:
   UintR cfirstfreeLcpLoc;
   UintR clcpFileSize;
 
-#define ZLOG_PART_FILE_SIZE 4
   LogPartRecord *logPartRecord;
   LogPartRecordPtr logPartPtr;
   UintR clogPartFileSize;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.cpp	2011-11-14 12:02:56 +0000
@@ -20,6 +20,7 @@
 
 NdbLogPartInfo::NdbLogPartInfo(Uint32 instanceNo)
 {
+  LogParts = globalData.ndbLogParts;
   lqhWorkers = globalData.ndbMtLqhWorkers;
   partCount = 0;
   partMask.clear();

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhCommon.hpp	2011-11-14 12:02:56 +0000
@@ -22,7 +22,10 @@
 #include <Bitmask.hpp>
 
 /*
- * Log part id is from DBDIH.  Number of log parts is fixed as 4.
+ * Log part id is from DBDIH.  Number of log parts is configurable with a
+ * maximum setting and minimum of 4 parts. The below description assumes
+ * 4 parts.
+ *
  * A log part is identified by log part number (0-3)
  *
  *   log part number = log part id % 4
@@ -38,12 +41,12 @@
  * instance (main instance 0 or worker instances 1-4).
  */
 struct NdbLogPartInfo {
-  enum { LogParts = 4 };
+  Uint32 LogParts;
   NdbLogPartInfo(Uint32 instanceNo);
   Uint32 lqhWorkers;
   Uint32 partCount;
-  Uint16 partNo[LogParts];
-  Bitmask<(LogParts+31)/32> partMask;
+  Uint16 partNo[NDB_MAX_LOG_PARTS];
+  Bitmask<(NDB_MAX_LOG_PARTS+31)/32> partMask;
   Uint32 partNoFromId(Uint32 lpid) const;
   bool partNoOwner(Uint32 lpno) const;
   bool partNoOwner(Uint32 tabId, Uint32 fragId);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-11-14 12:02:56 +0000
@@ -36,9 +36,7 @@ void Dblqh::initData() 
   clcpFileSize = ZNO_CONCURRENT_LCP;
   clfoFileSize = 0;
   clogFileFileSize = 0;
-
-  NdbLogPartInfo lpinfo(instance());
-  clogPartFileSize = lpinfo.partCount;
+  clogPartFileSize = 0; // Not valid until READ_CONFIG
 
   cpageRefFileSize = ZPAGE_REF_FILE_SIZE;
   cscanrecFileSize = 0;
@@ -117,7 +115,7 @@ void Dblqh::initRecords() 
 
   logPartRecord = (LogPartRecord*)allocRecord("LogPartRecord",
 					      sizeof(LogPartRecord), 
-					      clogPartFileSize);
+					      NDB_MAX_LOG_PARTS);
 
   logFileRecord = (LogFileRecord*)allocRecord("LogFileRecord",
 					      sizeof(LogFileRecord),

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-11-09 13:38:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-11-14 14:25:20 +0000
@@ -1219,7 +1219,34 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* 
   const ndb_mgm_configuration_iterator * p = 
     m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
-  
+
+  clogPartFileSize = 4;
+  ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS,
+                            &clogPartFileSize);
+  globalData.ndbLogParts = clogPartFileSize;
+  ndbrequire(clogPartFileSize <= NDB_MAX_LOG_PARTS);
+
+  if (globalData.ndbMtLqhWorkers > clogPartFileSize)
+  {
+    char buf[255];
+    BaseString::snprintf(buf, sizeof(buf),
+      "Trying to start %d LQH workers with only %d log parts, try initial"
+      " node restart to be able to use more LQH workers.",
+      globalData.ndbMtLqhWorkers, clogPartFileSize);
+    progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
+  }
+  if (clogPartFileSize != 4 &&
+      clogPartFileSize != 8 &&
+      clogPartFileSize != 16)
+  {
+    char buf[255];
+    BaseString::snprintf(buf, sizeof(buf),
+      "Trying to start with %d log parts, number of log parts can"
+      " only be set to 4, 8 or 16.",
+      clogPartFileSize);
+    progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
+  }
+
   cnoLogFiles = 8;
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_FILES, 
 					&cnoLogFiles));
@@ -1889,7 +1916,7 @@ void Dblqh::execLQHFRAGREQ(Signal* signa
     ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
     FragrecordPtr tFragPtr;
     tFragPtr.i = RNIL;
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tTablePtr.p->fragid); i++) {
       if (tTablePtr.p->fragid[i] == fragptr.p->fragId) {
         jam();
         tFragPtr.i = tTablePtr.p->fragrec[i];
@@ -2633,7 +2660,7 @@ void Dblqh::removeTable(Uint32 tableId)
   tabptr.i = tableId;
   ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
   
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragid); i++) {
     jam();
     if (tabptr.p->fragid[i] != ZNIL) {
       jam();
@@ -2778,7 +2805,7 @@ Dblqh::wait_reorg_suma_filter_enabled(Si
 void
 Dblqh::commit_reorg(TablerecPtr tablePtr)
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tablePtr.p->fragrec); i++)
   {
     jam();
     Ptr<Fragrecord> fragPtr;
@@ -14644,7 +14671,7 @@ void Dblqh::initGcpRecLab(Signal* signal
   }//for
   // initialize un-used part
   Uint32 Ti;
-  for (Ti = clogPartFileSize; Ti < ZLOG_PART_FILE_SIZE; Ti++) {
+  for (Ti = clogPartFileSize; Ti < NDB_MAX_LOG_PARTS; Ti++) {
     gcpPtr.p->gcpFilePtr[Ti] = ZNIL;
     gcpPtr.p->gcpPageNo[Ti] = ZNIL;
     gcpPtr.p->gcpSyncReady[Ti] = FALSE;
@@ -15698,7 +15725,10 @@ void Dblqh::initWriteEndLab(Signal* sign
 /*---------------------------------------------------------------------------*/
 /* PAGE ZERO IN FILE ZERO MUST SET LOG LAP TO ONE SINCE IT HAS STARTED       */
 /* WRITING TO THE LOG, ALSO GLOBAL CHECKPOINTS ARE SET TO ZERO.              */
+/* Set number of log parts used to ensure we use correct number of log parts */
+/* at system restart. Was previously hardcoded to 4.                         */
 /*---------------------------------------------------------------------------*/
+    logPagePtr.p->logPageWord[ZPOS_NO_LOG_PARTS]= clogPartFileSize;
     logPagePtr.p->logPageWord[ZPOS_LOG_LAP] = 1;
     logPagePtr.p->logPageWord[ZPOS_MAX_GCI_STARTED] = 0;
     logPagePtr.p->logPageWord[ZPOS_MAX_GCI_COMPLETED] = 0;
@@ -15881,6 +15911,8 @@ void Dblqh::initLogpage(Signal* signal) 
 {
   TcConnectionrecPtr ilpTcConnectptr;
 
+  /* Ensure all non-used header words are zero */
+  bzero(logPagePtr.p, sizeof(Uint32) * ZPAGE_HEADER_SIZE);
   logPagePtr.p->logPageWord[ZPOS_LOG_LAP] = logPartPtr.p->logLap;
   logPagePtr.p->logPageWord[ZPOS_MAX_GCI_COMPLETED] = 
         logPartPtr.p->logPartNewestCompletedGCI;
@@ -16423,6 +16455,35 @@ void Dblqh::openSrFrontpageLab(Signal* s
  * -------------------------------------------------------------------------- */
 void Dblqh::readSrFrontpageLab(Signal* signal) 
 {
+  Uint32 num_parts_used;
+  if (!ndb_configurable_log_parts(logPagePtr.p->logPageWord[ZPOS_VERSION])) {
+    jam();
+    num_parts_used= 4;
+  }
+  else
+  {
+    jam();
+    num_parts_used = logPagePtr.p->logPageWord[ZPOS_NO_LOG_PARTS];
+  }
+  /* Verify that number of log parts >= number of LQH workers */
+  if (globalData.ndbMtLqhWorkers > num_parts_used) {
+    char buf[255];
+    BaseString::snprintf(buf, sizeof(buf),
+      "Trying to start %d LQH workers with only %d log parts, try initial"
+      " node restart to be able to use more LQH workers.",
+      globalData.ndbMtLqhWorkers, num_parts_used);
+    progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
+  }
+  if (num_parts_used != clogPartFileSize)
+  {
+    char buf[255];
+    BaseString::snprintf(buf, sizeof(buf),
+      "Can only change NoOfLogParts through initial node restart, old"
+      " value of NoOfLogParts = %d, tried using %d",
+      num_parts_used, clogPartFileSize);
+    progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
+  }
+
   Uint32 fileNo = logPagePtr.p->logPageWord[ZPAGE_HEADER_SIZE + ZPOS_FILE_NO];
   if (fileNo == 0) {
     jam();
@@ -20050,7 +20111,7 @@ void Dblqh::deleteFragrec(Uint32 fragId)
 {
   Uint32 indexFound= RNIL;
   fragptr.i = RNIL;
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragid); i++) {
     jam();
     if (tabptr.p->fragid[i] == fragId) {
       fragptr.i = tabptr.p->fragrec[i];
@@ -20265,7 +20326,7 @@ Dblqh::getFirstInLogQueue(Signal* signal
 /* ---------------------------------------------------------------- */
 bool Dblqh::getFragmentrec(Signal* signal, Uint32 fragId) 
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragid); i++) {
     jam();
     if (tabptr.p->fragid[i] == fragId) {
       fragptr.i = tabptr.p->fragrec[i];
@@ -20328,7 +20389,7 @@ void Dblqh::initialiseGcprec(Signal* sig
   if (cgcprecFileSize != 0) {
     for (gcpPtr.i = 0; gcpPtr.i < cgcprecFileSize; gcpPtr.i++) {
       ptrAss(gcpPtr, gcpRecord);
-      for (tigpIndex = 0; tigpIndex < ZLOG_PART_FILE_SIZE; tigpIndex++) {
+      for (tigpIndex = 0; tigpIndex < NDB_MAX_LOG_PARTS; tigpIndex++) {
         gcpPtr.p->gcpLogPartState[tigpIndex] = ZIDLE;
         gcpPtr.p->gcpSyncReady[tigpIndex] = ZFALSE;
       }//for
@@ -20616,7 +20677,7 @@ void Dblqh::initialiseTabrec(Signal* sig
       tabptr.p->tableStatus = Tablerec::NOT_DEFINED;
       tabptr.p->usageCountR = 0;
       tabptr.p->usageCountW = 0;
-      for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+      for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragid); i++) {
         tabptr.p->fragid[i] = ZNIL;
         tabptr.p->fragrec[i] = RNIL;
       }//for
@@ -20886,7 +20947,7 @@ bool Dblqh::insertFragrec(Signal* signal
     terrorCode = ZNO_FREE_FRAGMENTREC;
     return false;
   }
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabptr.p->fragid); i++) {
     jam();
     if (tabptr.p->fragid[i] == ZNIL) {
       jam();
@@ -22579,7 +22640,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
 		  i, tabPtr.p->tableStatus,
                   tabPtr.p->usageCountR, tabPtr.p->usageCountW);
 
-	for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++)
+	for (Uint32 j = 0; j<NDB_ARRAY_SIZE(tabPtr.p->fragrec); j++)
 	{
 	  FragrecordPtr fragPtr;
 	  if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL)

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-11-09 13:38:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-11-14 14:25:20 +0000
@@ -1532,7 +1532,17 @@ Dbspj::releaseNodeRows(Ptr<Request> requ
       releaseRow(requestPtr, pos);
       cnt++;
     }
-    treeNodePtr.p->m_row_map.init();
+
+    // Release the (now empty) RowMap
+    RowMap& map = treeNodePtr.p->m_row_map;
+    if (!map.isNull())
+    {
+      jam();
+      RowRef ref;
+      map.copyto(ref);
+      releaseRow(requestPtr, ref);  // Map was allocated in row memory
+      map.init();
+    }
     DEBUG("RowMapIterator: released " << cnt << " rows!");
   }
 }

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2011-10-13 20:08:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2011-11-14 14:25:20 +0000
@@ -1135,8 +1135,8 @@ ArrayPool<TupTriggerData> c_triggerPool;
     // List of ordered indexes
     DLList<TupTriggerData> tuxCustomTriggers;
     
-    Uint32 fragid[MAX_FRAG_PER_NODE];
-    Uint32 fragrec[MAX_FRAG_PER_NODE];
+    Uint32 fragid[MAX_FRAG_PER_LQH];
+    Uint32 fragrec[MAX_FRAG_PER_LQH];
 
     union {
       struct {

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp	2011-11-14 09:18:48 +0000
@@ -1546,7 +1546,7 @@ Dbtup::disk_restart_undo(Signal* signal,
     Ptr<Tablerec> tabPtr;
     tabPtr.i= rec->m_table;
     ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
-    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
+    for(Uint32 i = 0; i<NDB_ARRAY_SIZE(tabPtr.p->fragrec); i++)
       if (tabPtr.p->fragrec[i] != RNIL)
 	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
 			      Fragrecord::UC_CREATE, 0);
@@ -1566,7 +1566,7 @@ Dbtup::disk_restart_undo(Signal* signal,
     Ptr<Tablerec> tabPtr;
     tabPtr.i= rec->m_table;
     ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
-    for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
+    for(Uint32 i = 0; i<NDB_ARRAY_SIZE(tabPtr.p->fragrec); i++)
       if (tabPtr.p->fragrec[i] != RNIL)
 	disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i], 
 			      Fragrecord::UC_CREATE, 0);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2011-10-13 20:08:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2011-11-14 14:25:20 +0000
@@ -3920,7 +3920,7 @@ Dbtup::validate_page(Tablerec* regTabPtr
   if(mm_vars == 0)
     return;
   
-  for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
+  for(Uint32 F= 0; F<NDB_ARRAY_SIZE(regTabPtr->fragrec); F++)
   {
     FragrecordPtr fragPtr;
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2011-10-07 16:12:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2011-11-14 09:18:48 +0000
@@ -43,9 +43,10 @@ extern EventLogger * g_eventLogger;
 
 void Dbtup::initData() 
 {
-  cnoOfFragrec = MAX_FRAG_PER_NODE;
-  cnoOfFragoprec = MAX_FRAG_PER_NODE;
-  cnoOfAlterTabOps = MAX_FRAG_PER_NODE;
+  TablerecPtr tablePtr;
+  cnoOfFragrec = NDB_ARRAY_SIZE(tablePtr.p->fragrec);
+  cnoOfFragoprec = NDB_ARRAY_SIZE(tablePtr.p->fragrec);
+  cnoOfAlterTabOps = NDB_ARRAY_SIZE(tablePtr.p->fragrec);
   c_maxTriggersPerTable = ZDEFAULT_MAX_NO_TRIGGERS_PER_TABLE;
   c_noOfBuildIndexRec = 32;
 
@@ -772,7 +773,7 @@ void Dbtup::initializeTablerec() 
 void
 Dbtup::initTab(Tablerec* const regTabPtr)
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr->fragid); i++) {
     regTabPtr->fragid[i] = RNIL;
     regTabPtr->fragrec[i] = RNIL;
   }//for
@@ -870,7 +871,7 @@ void Dbtup::execTUPSEIZEREQ(Signal* sign
   return;
 }//Dbtup::execTUPSEIZEREQ()
 
-#define printFragment(t){ for(Uint32 i = 0; i < MAX_FRAG_PER_NODE;i++){\
+#define printFragment(t){ for(Uint32 i = 0; i < NDB_ARRAY_SIZE(t.p->fragid);i++){ \
   ndbout_c("table = %d fragid[%d] = %d fragrec[%d] = %d", \
            t.i, t.p->fragid[i], i, t.p->fragrec[i]); }}
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp	2011-11-14 09:18:48 +0000
@@ -552,14 +552,14 @@ Dbtup::buildIndex(Signal* signal, Uint32
   do {
     // get fragment
     FragrecordPtr fragPtr;
-    if (buildPtr.p->m_fragNo == MAX_FRAG_PER_NODE) {
+    if (buildPtr.p->m_fragNo == NDB_ARRAY_SIZE(tablePtr.p->fragrec)) {
       jam();
       // build ready
       buildIndexReply(signal, buildPtr.p);
       c_buildIndexList.release(buildPtr);
       return;
     }
-    ndbrequire(buildPtr.p->m_fragNo < MAX_FRAG_PER_NODE);
+    ndbrequire(buildPtr.p->m_fragNo < NDB_ARRAY_SIZE(tablePtr.p->fragrec));
     fragPtr.i= tablePtr.p->fragrec[buildPtr.p->m_fragNo];
     if (fragPtr.i == RNIL) {
       jam();
@@ -809,7 +809,8 @@ Dbtup::execALTER_TAB_CONF(Signal* signal
   else
   {
     jam();
-    ndbrequire(buildPtr.p->m_fragNo >= MAX_FRAG_PER_NODE);
+    TablerecPtr tablePtr;
+    ndbrequire(buildPtr.p->m_fragNo >= NDB_ARRAY_SIZE(tablePtr.p->fragid));
     buildIndexReply(signal, buildPtr.p);
     c_buildIndexList.release(buildPtr);
     return;
@@ -830,7 +831,7 @@ Dbtup::buildIndexOffline_table_readonly(
   tablePtr.i= buildReq->tableId;
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
-  for (;buildPtr.p->m_fragNo < MAX_FRAG_PER_NODE;
+  for (;buildPtr.p->m_fragNo < NDB_ARRAY_SIZE(tablePtr.p->fragrec);
        buildPtr.p->m_fragNo++)
   {
     jam();
@@ -906,7 +907,7 @@ Dbtup::mt_scan_init(Uint32 tableId, Uint
 
   FragrecordPtr fragPtr;
   fragPtr.i = RNIL;
-  for (Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
+  for (Uint32 i = 0; i<NDB_ARRAY_SIZE(tablePtr.p->fragid); i++)
   {
     if (tablePtr.p->fragid[i] == fragId)
     {
@@ -1011,8 +1012,10 @@ Dbtup::execBUILD_INDX_IMPL_REF(Signal* s
   ndbrequire(buildPtr.p->m_outstanding);
   buildPtr.p->m_outstanding--;
 
+  TablerecPtr tablePtr;
   buildPtr.p->m_errorCode = (BuildIndxImplRef::ErrorCode)err;
-  buildPtr.p->m_fragNo = MAX_FRAG_PER_NODE; // No point in starting any more
+  // No point in starting any more
+  buildPtr.p->m_fragNo = NDB_ARRAY_SIZE(tablePtr.p->fragrec);
   buildIndexOffline_table_readonly(signal, ptr);
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2011-09-01 18:42:31 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2011-11-14 09:18:48 +0000
@@ -910,7 +910,7 @@ bool Dbtup::addfragtotab(Tablerec* const
                          Uint32 fragId,
                          Uint32 fragIndex)
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr->fragid); i++) {
     jam();
     if (regTabPtr->fragid[i] == RNIL) {
       jam();
@@ -926,7 +926,7 @@ void Dbtup::getFragmentrec(FragrecordPtr
                            Uint32 fragId,
                            Tablerec* const regTabPtr)
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr->fragid); i++) {
     jam();
     if (regTabPtr->fragid[i] == fragId) {
       jam();
@@ -1015,7 +1015,7 @@ Dbtup::execALTER_TAB_REQ(Signal *signal)
   case AlterTabReq::AlterTableSumaEnable:
   {
     FragrecordPtr regFragPtr;
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr.p->fragrec); i++)
     {
       jam();
       if ((regFragPtr.i = regTabPtr.p->fragrec[i]) != RNIL)
@@ -1044,7 +1044,7 @@ Dbtup::execALTER_TAB_REQ(Signal *signal)
     Uint32 gci = signal->theData[signal->getLength() - 1];
     regTabPtr.p->m_reorg_suma_filter.m_gci_hi = gci;
     FragrecordPtr regFragPtr;
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr.p->fragrec); i++)
     {
       jam();
       if ((regFragPtr.i = regTabPtr.p->fragrec[i]) != RNIL)
@@ -1320,7 +1320,7 @@ Dbtup::handleAlterTableCommit(Signal *si
   if (AlterTableReq::getReorgFragFlag(req->changeMask))
   {
     FragrecordPtr regFragPtr;
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr->fragrec); i++)
     {
       jam();
       if ((regFragPtr.i = regTabPtr->fragrec[i]) != RNIL)
@@ -1363,7 +1363,7 @@ Dbtup::handleAlterTableComplete(Signal *
   if (AlterTableReq::getReorgCompleteFlag(req->changeMask))
   {
     FragrecordPtr regFragPtr;
-    for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+    for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr->fragrec); i++)
     {
       jam();
       if ((regFragPtr.i = regTabPtr->fragrec[i]) != RNIL)
@@ -1892,7 +1892,7 @@ void Dbtup::releaseAlterTabOpRec(AlterTa
 
 void Dbtup::deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId) 
 {
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(regTabPtr->fragid); i++) {
     jam();
     if (regTabPtr->fragid[i] == fragId) {
       jam();
@@ -1991,7 +1991,7 @@ void Dbtup::releaseFragment(Signal* sign
   Uint32 fragIndex = RNIL;
   Uint32 fragId = RNIL;
   Uint32 i = 0;
-  for (i = 0; i < MAX_FRAG_PER_NODE; i++) {
+  for (i = 0; i < NDB_ARRAY_SIZE(tabPtr.p->fragid); i++) {
     jam();
     if (tabPtr.p->fragid[i] != RNIL) {
       jam();
@@ -2464,11 +2464,11 @@ Dbtup::drop_fragment_fsremove_done(Signa
   Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id ;
 
   Uint32 i;
-  for(i= 0; i<MAX_FRAG_PER_NODE; i++)
+  for(i= 0; i<NDB_ARRAY_SIZE(tabPtr.p->fragrec); i++)
     if(tabPtr.p->fragrec[i] == fragPtr.i)
       break;
 
-  ndbrequire(i != MAX_FRAG_PER_NODE);
+  ndbrequire(i != NDB_ARRAY_SIZE(tabPtr.p->fragrec));
   tabPtr.p->fragid[i]= RNIL;
   tabPtr.p->fragrec[i]= RNIL;
   releaseFragrec(fragPtr);
@@ -2694,7 +2694,7 @@ Dbtup::execDROP_FRAG_REQ(Signal* signal)
   tabPtr.p->m_dropTable.tabUserPtr = req->senderData;
 
   Uint32 fragIndex = RNIL;
-  for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++)
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(tabPtr.p->fragid); i++)
   {
     jam();
     if (tabPtr.p->fragid[i] == req->fragId)

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-10-13 09:02:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp	2011-11-14 09:18:48 +0000
@@ -120,7 +120,7 @@ public:
 
 private:
   // sizes are in words (Uint32)
-  STATIC_CONST( MaxIndexFragments = MAX_FRAG_PER_NODE );
+  STATIC_CONST( MaxIndexFragments = MAX_FRAG_PER_LQH );
   STATIC_CONST( MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX );
   STATIC_CONST( MaxAttrDataSize = 2 * MAX_ATTRIBUTES_IN_INDEX + MAX_KEY_SIZE_IN_WORDS );
   STATIC_CONST( MaxXfrmDataSize = MaxAttrDataSize * MAX_XFRM_MULTIPLY);

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-10-13 09:02:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp	2011-11-11 08:42:31 +0000
@@ -173,8 +173,8 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal
   c_scanOpPool.getPtr(scanPtr);
   ScanOp& scan = *scanPtr.p;
   const Index& index = *c_indexPool.getPtr(scan.m_indexId);
-  const DescHead& descHead = getDescHead(index);
-  const KeyType* keyTypes = getKeyTypes(descHead);
+  // compiler warning unused: const DescHead& descHead = getDescHead(index);
+  // compiler warning unused: const KeyType* keyTypes = getKeyTypes(descHead);
   // data passed in Signal
   const Uint32* const boundData = &req->data[0];
   Uint32 boundLen = req->boundAiLength;

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxStat.cpp	2011-11-11 08:42:31 +0000
@@ -123,7 +123,7 @@ Dbtux::getEntriesBeforeOrAfter(Frag& fra
   Uint16 path[MaxTreeDepth + 1];
   unsigned depth = getPathToNode(node, path);
   ndbrequire(depth != 0 && depth <= MaxTreeDepth);
-  TreeHead& tree = frag.m_tree;
+  // compiler warning unused: TreeHead& tree = frag.m_tree;
   Uint32 cnt = 0;
   Uint32 tot = (Uint32)frag.m_entryCount;
   unsigned i = 0;

=== modified file 'storage/ndb/src/kernel/vm/DLFifoList.hpp'
--- a/storage/ndb/src/kernel/vm/DLFifoList.hpp	2011-10-07 11:46:40 +0000
+++ b/storage/ndb/src/kernel/vm/DLFifoList.hpp	2011-11-11 07:47:19 +0000
@@ -21,6 +21,7 @@
 
 #include <ndb_global.h>
 #include <kernel_types.h>
+#include "ArrayPool.hpp"
 #include "Pool.hpp"
 
 /**

=== modified file 'storage/ndb/src/kernel/vm/DLHashTable.hpp'
--- a/storage/ndb/src/kernel/vm/DLHashTable.hpp	2011-10-13 09:25:13 +0000
+++ b/storage/ndb/src/kernel/vm/DLHashTable.hpp	2011-11-11 07:46:17 +0000
@@ -1,6 +1,5 @@
 /*
-   Copyright (c) 2003-2006, 2008 MySQL AB, 2009, 2010 Sun Microsystems, Inc.
-   Use is subject to license terms.
+   Copyright (c) 2003-2006, 2008, 2011, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -20,30 +19,38 @@
 #define DL_HASHTABLE_HPP
 
 #include <ndb_global.h>
-#include "ArrayPool.hpp"
 
 /**
- * DLHashTable implements a hashtable using chaining
+ * DLMHashTable implements a hashtable using chaining
  *   (with a double linked list)
  *
- * The entries in the hashtable must have the following methods:
- *  -# bool U::equal(const class U &) const;
- *     Which should return equal if the to objects have the same key
- *  -# Uint32 U::hashValue() const;
- *     Which should return a 32 bit hashvalue
+ * The entries in the (uninstansiated) meta class passed to the
+ * hashtable must have the following methods:
  *
- * and the following members:
- *  -# Uint32 U::nextHash;
- *  -# Uint32 U::prevHash;
+ *  -# nextHash(T&) returning a reference to the next link
+ *  -# prevHash(T&) returning a reference to the prev link
+ *  -# bool equal(T const&,T const&) returning equality of the objects keys
+ *  -# hashValue(T) calculating the hash value
  */
 
-template <typename P, typename T, typename U = T>
-class DLHashTableImpl 
+template <typename T, typename U = T> struct DLHashTableDefaultMethods {
+static Uint32& nextHash(U& t) { return t.nextHash; }
+static Uint32& prevHash(U& t) { return t.prevHash; }
+static Uint32 hashValue(T const& t) { return t.hashValue(); }
+static bool equal(T const& lhs, T const& rhs) { return lhs.equal(rhs); }
+};
+
+template <typename P, typename T, typename M = DLHashTableDefaultMethods<T> >
+class DLMHashTable
 {
 public:
-  DLHashTableImpl(P & thePool);
-  ~DLHashTableImpl();
-  
+  explicit DLMHashTable(P & thePool);
+  ~DLMHashTable();
+private:
+  DLMHashTable(const DLMHashTable&);
+  DLMHashTable&  operator=(const DLMHashTable&);
+
+public:
   /**
    * Set the no of bucket in the hashtable
    *
@@ -63,9 +70,9 @@ public:
    * Add an object to the hashtable
    */
   void add(Ptr<T> &);
-  
+
   /**
-   * Find element key in hashtable update Ptr (i & p) 
+   * Find element key in hashtable update Ptr (i & p)
    *   (using key.equal(...))
    * @return true if found and false otherwise
    */
@@ -108,7 +115,7 @@ public:
    * Remove all elements, but dont return them to pool
    */
   void removeAll();
-  
+
   /**
    * Remove element and return to pool
    */
@@ -118,7 +125,7 @@ public:
    * Remove element and return to pool
    */
   void release(Ptr<T> &);
-  
+
   class Iterator {
   public:
     Ptr<T> curr;
@@ -136,7 +143,7 @@ public:
    * First element in bucket
    */
   bool first(Iterator & iter) const;
-  
+
   /**
    * Next Element
    *
@@ -151,16 +158,16 @@ public:
    * @param iter - An "uninitialized" iterator
    */
   bool next(Uint32 bucket, Iterator & iter) const;
-  
+
 private:
   Uint32 mask;
   Uint32 * hashValues;
   P & thePool;
 };
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
-DLHashTableImpl<P, T, U>::DLHashTableImpl(P & _pool)
+DLMHashTable<P, T, M>::DLMHashTable(P & _pool)
   : thePool(_pool)
 {
   // Require user defined constructor on T since we fiddle
@@ -171,23 +178,23 @@ DLHashTableImpl<P, T, U>::DLHashTableImp
   hashValues = 0;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
-DLHashTableImpl<P, T, U>::~DLHashTableImpl()
+DLMHashTable<P, T, M>::~DLMHashTable()
 {
-  if(hashValues != 0)
+  if (hashValues != 0)
     delete [] hashValues;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 bool
-DLHashTableImpl<P, T, U>::setSize(Uint32 size)
+DLMHashTable<P, T, M>::setSize(Uint32 size)
 {
   Uint32 i = 1;
-  while(i < size) i *= 2;
+  while (i < size) i *= 2;
 
-  if(mask == (i - 1))
+  if (mask == (i - 1))
   {
     /**
      * The size is already set to <b>size</b>
@@ -195,43 +202,43 @@ DLHashTableImpl<P, T, U>::setSize(Uint32
     return true;
   }
 
-  if(mask != 0)
+  if (mask != 0)
   {
     /**
      * The mask is already set
      */
     return false;
   }
-  
+
   mask = (i - 1);
   hashValues = new Uint32[i];
-  for(Uint32 j = 0; j<i; j++)
+  for (Uint32 j = 0; j<i; j++)
     hashValues[j] = RNIL;
-  
+
   return true;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 void
-DLHashTableImpl<P, T, U>::add(Ptr<T> & obj)
+DLMHashTable<P, T, M>::add(Ptr<T> & obj)
 {
-  const Uint32 hv = obj.p->U::hashValue() & mask;
+  const Uint32 hv = M::hashValue(*obj.p) & mask;
   const Uint32 i  = hashValues[hv];
-  
-  if(i == RNIL)
+
+  if (i == RNIL)
   {
     hashValues[hv] = obj.i;
-    obj.p->U::nextHash = RNIL;
-    obj.p->U::prevHash = RNIL;
-  } 
-  else 
+    M::nextHash(*obj.p) = RNIL;
+    M::prevHash(*obj.p) = RNIL;
+  }
+  else
   {
     T * tmp = thePool.getPtr(i);
-    tmp->U::prevHash = obj.i;
-    obj.p->U::nextHash = i;
-    obj.p->U::prevHash = RNIL;
-    
+    M::prevHash(*tmp) = obj.i;
+    M::nextHash(*obj.p) = i;
+    M::prevHash(*obj.p) = RNIL;
+
     hashValues[hv] = obj.i;
   }
 }
@@ -239,62 +246,62 @@ DLHashTableImpl<P, T, U>::add(Ptr<T> & o
 /**
  * First element
  */
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 bool
-DLHashTableImpl<P, T, U>::first(Iterator & iter) const 
+DLMHashTable<P, T, M>::first(Iterator & iter) const
 {
   Uint32 i = 0;
-  while(i <= mask && hashValues[i] == RNIL) i++;
-  if(i <= mask)
+  while (i <= mask && hashValues[i] == RNIL) i++;
+  if (i <= mask)
   {
     iter.bucket = i;
     iter.curr.i = hashValues[i];
     iter.curr.p = thePool.getPtr(iter.curr.i);
     return true;
   }
-  else 
+  else
   {
     iter.curr.i = RNIL;
   }
   return false;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 bool
-DLHashTableImpl<P, T, U>::next(Iterator & iter) const 
+DLMHashTable<P, T, M>::next(Iterator & iter) const
 {
-  if(iter.curr.p->U::nextHash == RNIL)
+  if (M::nextHash(*iter.curr.p) == RNIL)
   {
     Uint32 i = iter.bucket + 1;
-    while(i <= mask && hashValues[i] == RNIL) i++;
-    if(i <= mask)
+    while (i <= mask && hashValues[i] == RNIL) i++;
+    if (i <= mask)
     {
       iter.bucket = i;
       iter.curr.i = hashValues[i];
       iter.curr.p = thePool.getPtr(iter.curr.i);
       return true;
     }
-    else 
+    else
     {
       iter.curr.i = RNIL;
       return false;
     }
   }
-  
-  iter.curr.i = iter.curr.p->U::nextHash;
+
+  iter.curr.i = M::nextHash(*iter.curr.p);
   iter.curr.p = thePool.getPtr(iter.curr.i);
   return true;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 void
-DLHashTableImpl<P, T, U>::remove(Ptr<T> & ptr, const T & key)
+DLMHashTable<P, T, M>::remove(Ptr<T> & ptr, const T & key)
 {
-  const Uint32 hv = key.U::hashValue() & mask;  
-  
+  const Uint32 hv = M::hashValue(key) & mask;
+
   Uint32 i;
   T * p;
   Ptr<T> prev;
@@ -302,42 +309,42 @@ DLHashTableImpl<P, T, U>::remove(Ptr<T> 
   prev.i = RNIL;
 
   i = hashValues[hv];
-  while(i != RNIL)
+  while (i != RNIL)
   {
     p = thePool.getPtr(i);
-    if(key.U::equal(* p))
+    if (M::equal(key, * p))
     {
-      const Uint32 next = p->U::nextHash;
-      if(prev.i == RNIL)
+      const Uint32 next = M::nextHash(*p);
+      if (prev.i == RNIL)
       {
-	hashValues[hv] = next;
-      } 
-      else 
+        hashValues[hv] = next;
+      }
+      else
       {
-	prev.p->U::nextHash = next;
+        M::nextHash(*prev.p) = next;
       }
-      
-      if(next != RNIL)
+
+      if (next != RNIL)
       {
-	T * nextP = thePool.getPtr(next);
-	nextP->U::prevHash = prev.i;
+        T * nextP = thePool.getPtr(next);
+        M::prevHash(*nextP) = prev.i;
       }
-      
+
       ptr.i = i;
       ptr.p = p;
       return;
     }
     prev.p = p;
     prev.i = i;
-    i = p->U::nextHash;
+    i = M::nextHash(*p);
   }
   ptr.i = RNIL;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 void
-DLHashTableImpl<P, T, U>::remove(Uint32 i)
+DLMHashTable<P, T, M>::remove(Uint32 i)
 {
   Ptr<T> tmp;
   tmp.i = i;
@@ -345,10 +352,10 @@ DLHashTableImpl<P, T, U>::remove(Uint32 
   remove(tmp);
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 void
-DLHashTableImpl<P, T, U>::release(Uint32 i)
+DLMHashTable<P, T, M>::release(Uint32 i)
 {
   Ptr<T> tmp;
   tmp.i = i;
@@ -356,22 +363,22 @@ DLHashTableImpl<P, T, U>::release(Uint32
   release(tmp);
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
-void 
-DLHashTableImpl<P, T, U>::remove(Ptr<T> & ptr)
+void
+DLMHashTable<P, T, M>::remove(Ptr<T> & ptr)
 {
-  const Uint32 next = ptr.p->U::nextHash;
-  const Uint32 prev = ptr.p->U::prevHash;
+  const Uint32 next = M::nextHash(*ptr.p);
+  const Uint32 prev = M::prevHash(*ptr.p);
 
-  if(prev != RNIL)
+  if (prev != RNIL)
   {
     T * prevP = thePool.getPtr(prev);
-    prevP->U::nextHash = next;
-  } 
-  else 
+    M::nextHash(*prevP) = next;
+  }
+  else
   {
-    const Uint32 hv = ptr.p->U::hashValue() & mask;  
+    const Uint32 hv = M::hashValue(*ptr.p) & mask;
     if (hashValues[hv] == ptr.i)
     {
       hashValues[hv] = next;
@@ -382,30 +389,30 @@ DLHashTableImpl<P, T, U>::remove(Ptr<T> 
       assert(false);
     }
   }
-  
-  if(next != RNIL)
+
+  if (next != RNIL)
   {
     T * nextP = thePool.getPtr(next);
-    nextP->U::prevHash = prev;
+    M::prevHash(*nextP) = prev;
   }
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
-void 
-DLHashTableImpl<P, T, U>::release(Ptr<T> & ptr)
+void
+DLMHashTable<P, T, M>::release(Ptr<T> & ptr)
 {
-  const Uint32 next = ptr.p->U::nextHash;
-  const Uint32 prev = ptr.p->U::prevHash;
+  const Uint32 next = M::nextHash(*ptr.p);
+  const Uint32 prev = M::prevHash(*ptr.p);
 
-  if(prev != RNIL)
+  if (prev != RNIL)
   {
     T * prevP = thePool.getPtr(prev);
-    prevP->U::nextHash = next;
-  } 
-  else 
+    M::nextHash(*prevP) = next;
+  }
+  else
   {
-    const Uint32 hv = ptr.p->U::hashValue() & mask;  
+    const Uint32 hv = M::hashValue(*ptr.p) & mask;
     if (hashValues[hv] == ptr.i)
     {
       hashValues[hv] = next;
@@ -416,104 +423,104 @@ DLHashTableImpl<P, T, U>::release(Ptr<T>
       // Will add assert in 5.1
     }
   }
-  
-  if(next != RNIL)
+
+  if (next != RNIL)
   {
     T * nextP = thePool.getPtr(next);
-    nextP->U::prevHash = prev;
+    M::prevHash(*nextP) = prev;
   }
-  
+
   thePool.release(ptr);
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
-void 
-DLHashTableImpl<P, T, U>::removeAll()
+void
+DLMHashTable<P, T, M>::removeAll()
 {
-  for(Uint32 i = 0; i<=mask; i++)
+  for (Uint32 i = 0; i<=mask; i++)
     hashValues[i] = RNIL;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 bool
-DLHashTableImpl<P, T, U>::next(Uint32 bucket, Iterator & iter) const 
+DLMHashTable<P, T, M>::next(Uint32 bucket, Iterator & iter) const
 {
-  while (bucket <= mask && hashValues[bucket] == RNIL) 
-    bucket++; 
-  
-  if (bucket > mask) 
+  while (bucket <= mask && hashValues[bucket] == RNIL)
+    bucket++;
+
+  if (bucket > mask)
   {
     iter.bucket = bucket;
     iter.curr.i = RNIL;
     return false;
   }
-  
+
   iter.bucket = bucket;
   iter.curr.i = hashValues[bucket];
   iter.curr.p = thePool.getPtr(iter.curr.i);
   return true;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 bool
-DLHashTableImpl<P, T, U>::seize(Ptr<T> & ptr)
+DLMHashTable<P, T, M>::seize(Ptr<T> & ptr)
 {
-  if(thePool.seize(ptr)){
-    ptr.p->U::nextHash = ptr.p->U::prevHash = RNIL;
+  if (thePool.seize(ptr)){
+    M::nextHash(*ptr.p) = M::prevHash(*ptr.p) = RNIL;
     return true;
   }
   return false;
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 void
-DLHashTableImpl<P, T, U>::getPtr(Ptr<T> & ptr, Uint32 i) const 
+DLMHashTable<P, T, M>::getPtr(Ptr<T> & ptr, Uint32 i) const
 {
   ptr.i = i;
   ptr.p = thePool.getPtr(i);
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 void
-DLHashTableImpl<P, T, U>::getPtr(Ptr<T> & ptr) const 
+DLMHashTable<P, T, M>::getPtr(Ptr<T> & ptr) const
 {
   thePool.getPtr(ptr);
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
-T * 
-DLHashTableImpl<P, T, U>::getPtr(Uint32 i) const 
+T *
+DLMHashTable<P, T, M>::getPtr(Uint32 i) const
 {
   return thePool.getPtr(i);
 }
 
-template <typename P, typename T, typename U>
+template <typename P, typename T, typename M>
 inline
 bool
-DLHashTableImpl<P, T, U>::find(Ptr<T> & ptr, const T & key) const 
+DLMHashTable<P, T, M>::find(Ptr<T> & ptr, const T & key) const
 {
-  const Uint32 hv = key.U::hashValue() & mask;  
-  
+  const Uint32 hv = M::hashValue(key) & mask;
+
   Uint32 i;
   T * p;
 
   i = hashValues[hv];
-  while(i != RNIL)
+  while (i != RNIL)
   {
     p = thePool.getPtr(i);
-    if(key.U::equal(* p))
+    if (M::equal(key, * p))
     {
       ptr.i = i;
       ptr.p = p;
       return true;
     }
-    i = p->U::nextHash;
+    i = M::nextHash(*p);
   }
   ptr.i = RNIL;
   ptr.p = NULL;
@@ -522,11 +529,26 @@ DLHashTableImpl<P, T, U>::find(Ptr<T> & 
 
 // Specializations
 
+#include "ArrayPool.hpp"
+
+template <typename P, typename T, typename U = T >
+class DLHashTableImpl: public DLMHashTable<P, T, DLHashTableDefaultMethods<T, U> >
+{
+public:
+  explicit DLHashTableImpl(P & p): DLMHashTable<P, T, DLHashTableDefaultMethods<T, U> >(p) { }
+private:
+  DLHashTableImpl(const DLHashTableImpl&);
+  DLHashTableImpl&  operator=(const DLHashTableImpl&);
+};
+
 template <typename T, typename U = T, typename P = ArrayPool<T> >
-class DLHashTable : public DLHashTableImpl<P, T, U>
+class DLHashTable: public DLMHashTable<P, T, DLHashTableDefaultMethods<T, U> >
 {
 public:
-  DLHashTable(P & p) : DLHashTableImpl<P, T, U>(p) {}
+  explicit DLHashTable(P & p): DLMHashTable<P, T, DLHashTableDefaultMethods<T, U> >(p) { }
+private:
+  DLHashTable(const DLHashTable&);
+  DLHashTable&  operator=(const DLHashTable&);
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp	2011-09-15 20:21:59 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp	2011-11-14 12:02:56 +0000
@@ -75,6 +75,7 @@ struct GlobalData {
   Uint32     ndbMtLqhWorkers;
   Uint32     ndbMtLqhThreads;
   Uint32     ndbMtTcThreads;
+  Uint32     ndbLogParts;
   
   GlobalData(){ 
     theSignalId = 0; 
@@ -85,6 +86,7 @@ struct GlobalData {
     ndbMtLqhWorkers = 0;
     ndbMtLqhThreads = 0;
     ndbMtTcThreads = 0;
+    ndbLogParts = 0;
 #ifdef GCP_TIMER_HACK
     gcp_timer_limit = 0;
 #endif

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2011-10-13 20:08:25 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2011-11-14 14:25:20 +0000
@@ -507,7 +507,7 @@ protected:
     };
     Uint32 prevHash;
     
-    inline bool equal(FragmentInfo & p) const {
+    inline bool equal(FragmentInfo const & p) const {
       return m_senderRef == p.m_senderRef && m_fragmentId == p.m_fragmentId;
     }
     

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2011-10-13 20:08:25 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2011-11-14 14:25:20 +0000
@@ -76,7 +76,7 @@ static const Uint32 MAX_SIGNALS_BEFORE_W
 #define MAX_THREADS (NUM_MAIN_THREADS +       \
                      MAX_NDBMT_LQH_THREADS +  \
                      MAX_NDBMT_TC_THREADS + 1)
-#define MAX_BLOCK_INSTANCES (MAX_THREADS)
+#define MAX_BLOCK_INSTANCES (MAX_THREADS+1)
 
 /* If this is too small it crashes before first signal. */
 #define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)

=== modified file 'storage/ndb/src/kernel/vm/pc.hpp'
--- a/storage/ndb/src/kernel/vm/pc.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/pc.hpp	2011-11-14 09:18:48 +0000
@@ -165,7 +165,7 @@
 // need large value.
 /* ------------------------------------------------------------------------- */
 #define NO_OF_FRAG_PER_NODE 1
-#define MAX_FRAG_PER_NODE 8
+#define MAX_FRAG_PER_LQH 8
 
 /**
 * DIH allocates fragments in chunk for fast find of fragment record.

=== modified file 'storage/ndb/test/ndbapi/flexAsynch.cpp'
--- a/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/flexAsynch.cpp	2011-11-11 08:35:14 +0000
@@ -37,7 +37,7 @@
 #define MAX_SEEK 16 
 #define MAXSTRLEN 16 
 #define MAXATTR 64
-#define MAXTABLES 64
+#define MAXTABLES 1
 #define NDB_MAXTHREADS 128
 /*
   NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
@@ -59,6 +59,16 @@ enum StartType { 
   stStop 
 } ;
 
+enum RunType {
+  RunInsert,
+  RunRead,
+  RunUpdate,
+  RunDelete,
+  RunCreateTable,
+  RunDropTable,
+  RunAll
+};
+
 struct ThreadNdb
 {
   int NoOfOps;
@@ -70,6 +80,7 @@ extern "C" { static void* threadLoop(voi
 static void setAttrNames(void);
 static void setTableNames(void);
 static int readArguments(int argc, const char** argv);
+static void dropTables(Ndb* pMyNdb);
 static int createTables(Ndb*);
 static void defineOperation(NdbConnection* aTransObject, StartType aType, 
                             Uint32 base, Uint32 aIndex);
@@ -77,6 +88,8 @@ static void defineNdbRecordOperation(Thr
                             Uint32 base, Uint32 aIndex);
 static void execute(StartType aType);
 static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
+static bool executeTransLoop(ThreadNdb* pThread, StartType aType, Ndb* aNdbObject,
+                             unsigned int threadBase, int threadNo);
 static void executeCallback(int result, NdbConnection* NdbObject,
                             void* aObject);
 static bool error_handler(const NdbError & err);
@@ -92,9 +105,15 @@ ErrorData * flexAsynchErrorData;        
 static NdbThread*               threadLife[NDB_MAXTHREADS];
 static int                              tNodeId;
 static int                              ThreadReady[NDB_MAXTHREADS];
+static longlong                 ThreadExecutions[NDB_MAXTHREADS];
 static StartType                ThreadStart[NDB_MAXTHREADS];
 static char                             tableName[MAXTABLES][MAXSTRLEN+1];
 static char                             attrName[MAXATTR][MAXSTRLEN+1];
+static RunType                          tRunType = RunAll;
+static int                              tStdTableNum = 0;
+static int                              tWarmupTime = 10; //Seconds
+static int                              tExecutionTime = 30; //Seconds
+static int                              tCooldownTime = 10; //Seconds
 
 // Program Parameters
 static NdbRecord * g_record[MAXTABLES];
@@ -126,9 +145,10 @@ static int                              
 
 #define START_REAL_TIME
 #define STOP_REAL_TIME
-#define START_TIMER { NdbTimer timer; timer.doStart();
+#define DEFINE_TIMER NdbTimer timer
+#define START_TIMER timer.doStart();
 #define STOP_TIMER timer.doStop();
-#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; 
+#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans)
 
 NDBT_Stats a_i, a_u, a_d, a_r;
 
@@ -183,6 +203,7 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   ThreadNdb*            pThreadData;
   int                   tLoops=0;
   int                   returnValue = NDBT_OK;
+  DEFINE_TIMER;
 
   flexAsynchErrorData = new ErrorData;
   flexAsynchErrorData->resetErrorCounters();
@@ -201,7 +222,13 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   ndbout << "  " << tNoOfParallelTrans;
   ndbout << " number of parallel operation per thread " << endl;
   ndbout << "  " << tNoOfTransactions << " transaction(s) per round " << endl;
-  ndbout << "  " << tNoOfLoops << " iterations " << endl;
+  if (tRunType == RunAll){
+    ndbout << "  " << tNoOfLoops << " iterations " << endl;
+  } else if (tRunType == RunRead || tRunType == RunUpdate){
+    ndbout << "  Warmup time is " << tWarmupTime << endl;
+    ndbout << "  Execution time is " << tExecutionTime << endl;
+    ndbout << "  Cooldown time is " << tCooldownTime << endl;
+  }
   ndbout << "  " << "Load Factor is " << tLoadFactor << "%" << endl;
   ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
   ndbout << "  " << tAttributeSize;
@@ -262,10 +289,20 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
   if (pNdb->waitUntilReady(10000) != 0){
     ndbout << "NDB is not ready" << endl;
     ndbout << "Benchmark failed!" << endl;
-    returnValue = NDBT_FAILED;
+    return NDBT_ProgramExit(NDBT_FAILED);
   }
 
-  if(returnValue == NDBT_OK){
+  if (tRunType == RunCreateTable)
+  {
+    if (createTables(pNdb) != 0){
+      returnValue = NDBT_FAILED;
+    }
+  }
+  else if (tRunType == RunDropTable)
+  {
+    dropTables(pNdb);
+  }
+  else if(returnValue == NDBT_OK){
     if (createTables(pNdb) != 0){
       returnValue = NDBT_FAILED;
     }
@@ -282,14 +319,15 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
     }
   }
 
-  if(returnValue == NDBT_OK){
+  if(returnValue == NDBT_OK &&
+     tRunType != RunCreateTable &&
+     tRunType != RunDropTable){
     /****************************************************************
      *  Create NDB objects.                                   *
      ****************************************************************/
     resetThreads();
     for (Uint32 i = 0; i < tNoOfThreads ; i++) {
-      pThreadData[i].ThreadNo = i
-;
+      pThreadData[i].ThreadNo = i;
       threadLife[i] = NdbThread_Create(threadLoop,
                                        (void**)&pThreadData[i],
                                        32768,
@@ -312,76 +350,86 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
           
       failed = 0 ;
-
-      START_TIMER;
-      execute(stInsert);
-      STOP_TIMER;
-      a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int ci = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!" 
-                 << endl << endl;
-          ndbout << "Attempting to redo the failed transactions now..." 
-                 << endl ;
-          ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
-                 << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stInsert);
-          STOP_TIMER;
-          PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          ci++;
-        }
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        }else{
-          ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
-                 << endl;
+      if (tRunType == RunAll || tRunType == RunInsert){
+        ndbout << "Executing inserts" << endl;
+        START_TIMER;
+        execute(stInsert);
+        STOP_TIMER;
+      }
+      if (tRunType == RunAll){
+        a_i.addObservation((1000*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int ci = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!" 
+                   << endl << endl;
+            ndbout << "Attempting to redo the failed transactions now..." 
+                   << endl ;
+            ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
+                   << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stInsert);
+            STOP_TIMER;
+            PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            ci++;
+          }
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          }else{
+            ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
+                   << endl;
+          }//if
         }//if
-      }//if
-          
+      }//if  
       /****************************************************************
        * Perform read.                                                *
        ****************************************************************/
       
       failed = 0 ;
 
-      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-      {
-        START_TIMER;
-        execute(stRead);
-        STOP_TIMER;
-        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-      }
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cr = 1;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl;
-          ndbout <<"Redo attempt " << cr <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
+      if (tRunType == RunAll || tRunType == RunRead){
+        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+        {
+          ndbout << "Executing reads" << endl;
           START_TIMER;
           execute(stRead);
           STOP_TIMER;
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cr++ ;
-        }//while
-        if(0 == failed ) {
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-        }else{
-          ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+          if (tRunType == RunAll){
+            a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+          }//if
+        }//for
+      }//if
+
+      if (tRunType == RunAll){
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cr = 1;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl;
+            ndbout <<"Redo attempt " << cr <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stRead);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cr++ ;
+          }//while
+          if(0 == failed ) {
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+          }else{
+            ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
+          }//if
         }//if
       }//if
           
@@ -391,35 +439,40 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
       
       failed = 0 ;
-          
-      START_TIMER;
-      execute(stUpdate);
-      STOP_TIMER;
-      a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cu = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl <<"Redo attempt " << cu <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stUpdate);
-          STOP_TIMER;
-          PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cu++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        } else {
-          ndbout << endl;
-          ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+
+      if (tRunType == RunAll || tRunType == RunUpdate){
+        ndbout << "Executing updates" << endl;
+        START_TIMER;
+        execute(stUpdate);
+        STOP_TIMER;
+      }//if
+      if (tRunType == RunAll){
+        a_u.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cu = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl <<"Redo attempt " << cu <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stUpdate);
+            STOP_TIMER;
+            PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cu++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          } else {
+            ndbout << endl;
+            ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -428,38 +481,41 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
        ****************************************************************/
       
       failed = 0 ;
-          
-      for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
-      {
-        START_TIMER;
-        execute(stRead);
-        STOP_TIMER;
-        a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-        PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-      }        
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cr2 = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<<endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now..." << endl;
-          ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
+
+      if (tRunType == RunAll){
+        for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
+        {
+          ndbout << "Executing reads" << endl;
           START_TIMER;
           execute(stRead);
           STOP_TIMER;
+          a_r.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
           PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cr2++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl;
-        }else{
-          ndbout << endl;
-          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+        }        
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cr2 = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<<endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now..." << endl;
+            ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stRead);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cr2++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl;
+          }else{
+            ndbout << endl;
+            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -470,34 +526,39 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       
       failed = 0 ;
           
-      START_TIMER;
-      execute(stDelete);
-      STOP_TIMER;
-      a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
-      PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
-
-      if (0 < failed) {
-        int i = retry_opt ;
-        int cd = 1 ;
-        while (0 < failed && 0 < i){
-          ndbout << failed << " of the transactions returned errors!"<< endl ;
-          ndbout << endl;
-          ndbout <<"Attempting to redo the failed transactions now:" << endl ;
-          ndbout << endl <<"Redo attempt " << cd <<" out of ";
-          ndbout << retry_opt << endl << endl;
-          failed = 0 ;
-          START_TIMER;
-          execute(stDelete);
-          STOP_TIMER;
-          PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
-          i-- ;
-          cd++ ;
-        }//while
-        if(0 == failed ){
-          ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
-        }else{
-          ndbout << endl;
-          ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+      if (tRunType == RunAll || tRunType == RunDelete){
+        ndbout << "Executing deletes" << endl;
+        START_TIMER;
+        execute(stDelete);
+        STOP_TIMER;
+      }//if
+      if (tRunType == RunAll){
+        a_d.addObservation((1000 * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
+        PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
+
+        if (0 < failed) {
+          int i = retry_opt ;
+          int cd = 1 ;
+          while (0 < failed && 0 < i){
+            ndbout << failed << " of the transactions returned errors!"<< endl ;
+            ndbout << endl;
+            ndbout <<"Attempting to redo the failed transactions now:" << endl ;
+            ndbout << endl <<"Redo attempt " << cd <<" out of ";
+            ndbout << retry_opt << endl << endl;
+            failed = 0 ;
+            START_TIMER;
+            execute(stDelete);
+            STOP_TIMER;
+            PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
+            i-- ;
+            cd++ ;
+          }//while
+          if(0 == failed ){
+            ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
+          }else{
+            ndbout << endl;
+            ndbout << "Redo attempt failed, moving on now..." << endl << endl;
+          }//if
         }//if
       }//if
           
@@ -516,17 +577,61 @@ NDB_COMMAND(flexAsynch, "flexAsynch", "f
       NdbThread_WaitFor(threadLife[i], &tmp);
       NdbThread_Destroy(&threadLife[i]);
     }
-  } 
+  }
+
+  if (tRunType == RunAll)
+  {
+    dropTables(pNdb);
+  }
   delete [] pThreadData;
   delete pNdb;
 
-  //printing errorCounters
-  flexAsynchErrorData->printErrorCounters(ndbout);
-
-  print("insert", a_i);
-  print("update", a_u);
-  print("delete", a_d);
-  print("read  ", a_r);
+  if (tRunType == RunAll ||
+      tRunType == RunInsert ||
+      tRunType == RunDelete ||
+      tRunType == RunUpdate ||
+      tRunType == RunRead)
+  {
+    //printing errorCounters
+    flexAsynchErrorData->printErrorCounters(ndbout);
+    if (tRunType == RunAll) {
+      print("insert", a_i);
+      print("update", a_u);
+      print("delete", a_d);
+      print("read  ", a_r);
+    }
+  }
+  if (tRunType == RunInsert ||
+      tRunType == RunRead ||
+      tRunType == RunUpdate ||
+      tRunType == RunDelete)
+  {
+    longlong total_executions = 0;
+    longlong total_transactions;
+    longlong exec_time;
+
+    if (tRunType == RunInsert || tRunType == RunDelete) {
+      total_executions = (longlong)tNoOfTransactions;
+      total_executions *= (longlong)tNoOfThreads;
+    } else {
+      for (Uint32 i = 0; i < tNoOfThreads; i++){
+        total_executions += ThreadExecutions[i];
+      }
+    }
+    total_transactions = total_executions * tNoOfParallelTrans;
+    if (tRunType == RunInsert || tRunType == RunDelete) {
+      exec_time = (longlong)timer.elapsedTime();
+    } else {
+      exec_time = (longlong)tExecutionTime * 1000;
+    }
+    ndbout << "Total number of transactions is " << total_transactions;
+    ndbout << endl;
+    ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
+
+    total_transactions = (total_transactions * 1000) / exec_time;
+    int trans_per_sec = (int)total_transactions;
+    ndbout << "Total transactions per second " << trans_per_sec << endl;
+  }
 
   delete [] g_cluster_connection;
 
@@ -551,7 +656,7 @@ threadLoop(void* ThreadData)
   localNdb = new Ndb(g_cluster_connection+(threadNo % tConnections), "TEST_DB");
   localNdb->init(1024);
   localNdb->waitUntilReady(10000);
-  unsigned int threadBase = (threadNo << 16) + tNodeId ;
+  unsigned int threadBase = (threadNo << 16);
   
   for (;;){
     while (ThreadStart[threadNo] == stIdle) {
@@ -565,8 +670,14 @@ threadLoop(void* ThreadData)
 
     tType = ThreadStart[threadNo];
     ThreadStart[threadNo] = stIdle;
-    if(!executeThread(tabThread, tType, localNdb, threadBase)){
-      break;
+    if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
+      if(!executeThread(tabThread, tType, localNdb, threadBase)){
+        break;
+      }
+    } else {
+      if(!executeTransLoop(tabThread, tType, localNdb, threadBase, threadNo)){
+        break;
+      }
     }
     ThreadReady[threadNo] = 1;
   }//for
@@ -577,80 +688,131 @@ threadLoop(void* ThreadData)
   return NULL;
 }//threadLoop()
 
-static 
-bool
-executeThread(ThreadNdb* pThread, 
-	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+static int error_count = 0;
 
+static bool
+executeTrans(ThreadNdb* pThread,
+             StartType aType,
+             Ndb* aNdbObject,
+             unsigned int threadBase,
+             unsigned int i)
+{
   NdbConnection* tConArray[1024];
   unsigned int tBase;
   unsigned int tBase2;
 
-  unsigned int extraLoops= 0; // (aType == stRead) ? 100000 : 0;
-
-  for (unsigned int ex= 0; ex < (1 + extraLoops); ex++)
-  {
-    for (unsigned int i = 0; i < tNoOfTransactions; i++) {
-      if (tLocal == false) {
-        tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
-      } else {
-        tBase = i * tNoOfParallelTrans * MAX_SEEK;
-      }//if
-      START_REAL_TIME;
-      for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
-        if (tLocal == false) {
-          tBase2 = tBase + (j * tNoOfOpsPerTrans);
-        } else {
-          tBase2 = tBase + (j * MAX_SEEK);
-          tBase2 = getKey(threadBase, tBase2);
-        }//if
-        if (startTransGuess == true) {
-	  union {
-            Uint64 Tkey64;
-            Uint32 Tkey32[2];
-	  };
-          Tkey32[0] = threadBase;
-          Tkey32[1] = tBase2;
-          tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
-                                                      (const char*)&Tkey64, //Main PKey
-                                                      (Uint32)4);           //Key Length
-        } else {
-          tConArray[j] = aNdbObject->startTransaction();
-        }//if
-        if (tConArray[j] == NULL && 
-            !error_handler(aNdbObject->getNdbError()) ){
-          ndbout << endl << "Unable to recover! Quiting now" << endl ;
-          return false;
-        }//if
-        
-        for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
-          //-------------------------------------------------------
-          // Define the operation, but do not execute it yet.
-          //-------------------------------------------------------
-          if (tNdbRecord)
-            defineNdbRecordOperation(pThread, 
-                                     tConArray[j], aType, threadBase,(tBase2+k));
-          else
-            defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
-        }//for
-        
-        tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
-      }//for
-      STOP_REAL_TIME;
+  if (tLocal == false) {
+    tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
+  } else {
+    tBase = i * tNoOfParallelTrans * MAX_SEEK;
+  }//if
+  START_REAL_TIME;
+  for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
+    if (tLocal == false) {
+      tBase2 = tBase + (j * tNoOfOpsPerTrans);
+    } else {
+      tBase2 = tBase + (j * MAX_SEEK);
+      tBase2 = getKey(threadBase, tBase2);
+    }//if
+    if (startTransGuess == true) {
+      union {
+        Uint64 Tkey64;
+        Uint32 Tkey32[2];
+      };
+      Tkey32[0] = threadBase;
+      Tkey32[1] = tBase2;
+      tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
+                                                  (const char*)&Tkey64, //Main PKey
+                                                  (Uint32)4);           //Key Length
+    } else {
+      tConArray[j] = aNdbObject->startTransaction();
+    }//if
+    if (tConArray[j] == NULL){
+      error_handler(aNdbObject->getNdbError());
+      ndbout << endl << "Unable to recover! Quiting now" << endl ;
+      return false;
+    }//if
+    
+    for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
       //-------------------------------------------------------
-      // Now we have defined a set of operations, it is now time
-      // to execute all of them.
+      // Define the operation, but do not execute it yet.
       //-------------------------------------------------------
-      int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
-      while (unsigned(Tcomp) < tNoOfParallelTrans) {
-        int TlocalComp = aNdbObject->pollNdb(3000, 0);
-        Tcomp += TlocalComp;
-      }//while
-      for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
-        aNdbObject->closeTransaction(tConArray[j]);
-      }//for
+      if (tNdbRecord)
+        defineNdbRecordOperation(pThread, 
+                                 tConArray[j], aType, threadBase,(tBase2+k));
+      else
+        defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
     }//for
-  } // for
+    
+    tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
+  }//for
+  STOP_REAL_TIME;
+  //-------------------------------------------------------
+  // Now we have defined a set of operations, it is now time
+  // to execute all of them.
+  //-------------------------------------------------------
+  int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
+  while (unsigned(Tcomp) < tNoOfParallelTrans) {
+    int TlocalComp = aNdbObject->pollNdb(3000, 0);
+    Tcomp += TlocalComp;
+  }//while
+  for (unsigned int j = 0 ; j < tNoOfParallelTrans ; j++) {
+    if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
+      error_count++;
+      ndbout << "i = " << i << ", j = " << j << ", error = ";
+      ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
+      ndbout << hex << threadBase << endl;
+    }
+    aNdbObject->closeTransaction(tConArray[j]);
+  }//for
+  return true;
+}
+
+static 
+bool
+executeTransLoop(ThreadNdb* pThread, 
+                 StartType aType,
+                 Ndb* aNdbObject,
+                 unsigned int threadBase,
+                 int threadNo) {
+  bool continue_flag = true;
+  int time_expired;
+  longlong executions = 0;
+  unsigned int i = 0;
+  DEFINE_TIMER;
+
+  ThreadExecutions[threadNo] = 0;
+  START_TIMER;
+  do
+  {
+    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i++))
+      return false;
+    STOP_TIMER;
+    time_expired = (int)(timer.elapsedTime() / 1000);
+    if (time_expired < tWarmupTime)
+      ; //Do nothing
+    else if (time_expired < (tWarmupTime + tExecutionTime)){
+      executions++; //Count measurement
+    }
+    else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
+      ; //Do nothing
+    else
+      continue_flag = false; //Time expired
+    if (i == tNoOfTransactions) /* Make sure the record exists */
+      i = 0;
+  } while (continue_flag);
+  ThreadExecutions[threadNo] = executions;
+  return true;
+}//executeTransLoop()
+
+static 
+bool
+executeThread(ThreadNdb* pThread, 
+	      StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
+  for (unsigned int i = 0; i < tNoOfTransactions; i++) {
+    if (!executeTrans(pThread, aType, aNdbObject, threadBase, i))
+      return false;
+  }//for
   return true;
 }//executeThread()
 
@@ -880,8 +1042,20 @@ static void setTableNames()
       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", i, 
                (unsigned)(NdbTick_CurrentMillisecond()+rand()));
     } else {
-      BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
+      BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", tStdTableNum);
     }
+    ndbout << "Using table name " << tableName[0] << endl;
+  }
+}
+
+static void
+dropTables(Ndb* pMyNdb)
+{
+  int i;
+  for (i = 0; i < MAXTABLES; i++)
+  {
+    ndbout << "Dropping table " << tableName[i] << "..." << endl;
+    pMyNdb->getDictionary()->dropTable(tableName[i]);
   }
 }
 
@@ -893,8 +1067,8 @@ createTables(Ndb* pMyNdb){
   NdbSchemaOp           *MySchemaOp;
   int                   check;
 
-  if (theTableCreateFlag == 0) {
-    for(int i=0; i < 1 ;i++) {
+  if (theTableCreateFlag == 0 || tRunType == RunCreateTable) {
+    for(int i=0; i < MAXTABLES ;i++) {
       ndbout << "Creating " << tableName[i] << "..." << endl;
       MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
       
@@ -953,31 +1127,35 @@ createTables(Ndb* pMyNdb){
         return -1;
       
       NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
+    }
+  }
+  if (tNdbRecord)
+  {
+    for(int i=0; i < MAXTABLES ;i++) {
+      NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
+      const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
 
-      if (tNdbRecord)
+      if (pTab == NULL){
+        error_handler(pDict->getNdbError());
+        return -1;
+      }
+      int off = 0;
+      Vector<NdbDictionary::RecordSpecification> spec;
+      for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
       {
-	NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
-	const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
-	
-	int off = 0;
-	Vector<NdbDictionary::RecordSpecification> spec;
-	for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
-	{
-	  NdbDictionary::RecordSpecification r0;
-	  r0.column = pTab->getColumn(j);
-	  r0.offset = off;
-	  off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
-	  spec.push_back(r0);
-	}
-	g_record[i] = 
+        NdbDictionary::RecordSpecification r0;
+        r0.column = pTab->getColumn(j);
+        r0.offset = off;
+        off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
+        spec.push_back(r0);
+      }
+      g_record[i] = 
 	  pDict->createRecord(pTab, spec.getBase(), 
 			      spec.size(),
 			      sizeof(NdbDictionary::RecordSpecification));
-	assert(g_record[i]);
-      }
+      assert(g_record[i]);
     }
   }
-  
   return 0;
 }
 
@@ -996,6 +1174,14 @@ bool error_handler(const NdbError & err)
   return false ; // return false to abort
 }
 
+static void
+setAggregateRun(void)
+{
+  tNoOfLoops = 1;
+  tExtraReadLoop = 0;
+  theTableCreateFlag = 1;
+}
+
 static
 int 
 readArguments(int argc, const char** argv){
@@ -1110,6 +1296,43 @@ readArguments(int argc, const char** arg
       tExtraReadLoop = atoi(argv[i+1]);
     } else if (strcmp(argv[i], "-con") == 0){
       tConnections = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-insert") == 0){
+      setAggregateRun();
+      tRunType = RunInsert;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-read") == 0){
+      setAggregateRun();
+      tRunType = RunRead;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-update") == 0){
+      setAggregateRun();
+      tRunType = RunUpdate;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-delete") == 0){
+      setAggregateRun();
+      tRunType = RunDelete;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-create_table") == 0){
+      tRunType = RunCreateTable;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-drop_table") == 0){
+      tRunType = RunDropTable;
+      argc++;
+      i--;
+    } else if (strcmp(argv[i], "-warmup_time") == 0){
+      tWarmupTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-execution_time") == 0){
+      tExecutionTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-cooldown_time") == 0){
+      tCooldownTime = atoi(argv[i+1]);
+    } else if (strcmp(argv[i], "-table") == 0){
+      tStdTableNum = atoi(argv[i+1]);
+      theStdTableNameFlag = 1;
     } else {
       return -1;
     }
@@ -1131,7 +1354,6 @@ readArguments(int argc, const char** arg
 static
 void
 input_error(){
-  
   ndbout_c("FLEXASYNCH");
   ndbout_c("   Perform benchmark of insert, update and delete transactions");
   ndbout_c(" ");
@@ -1156,7 +1378,18 @@ input_error(){
   ndbout_c("   -force Force send when communicating");
   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
   ndbout_c("   -local Number of part, only use keys in one part out of 16");
-  ndbout_c("   -ndbrecord");
+  ndbout_c("   -ndbrecord Use NDB Record");
+  ndbout_c("   -r Number of extra loops");
+  ndbout_c("   -insert Only run inserts on standard table");
+  ndbout_c("   -read Only run reads on standard table");
+  ndbout_c("   -update Only run updates on standard table");
+  ndbout_c("   -delete Only run deletes on standard table");
+  ndbout_c("   -create_table Only run Create Table of standard table");
+  ndbout_c("   -drop_table Only run Drop Table on standard table");
+  ndbout_c("   -warmup_time Warmup Time before measurement starts");
+  ndbout_c("   -execution_time Execution Time where measurement is done");
+  ndbout_c("   -cooldown_time Cooldown time after measurement completed");
+  ndbout_c("   -table Number of standard table, default 0");
 }
   
 template class Vector<NdbDictionary::RecordSpecification>;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3581 to 3582) Ole John Aske14 Nov