List: Commits
From: magnus.blaudd
Date: November 21 2011 12:54pm
Subject: bzr push into mysql-trunk-cluster branch (magnus.blaudd:3414 to 3415)
 3415 magnus.blaudd@stripped	2011-11-21 [merge]
      Merge 7.2 -> trunk-cluster

    added:
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/blocks/trpman.hpp
    modified:
      mysql-test/suite/ndb/r/ndb_index_stat.result
      mysql-test/suite/ndb/t/ndb_index_stat.test
      mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result
      mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test
      scripts/mysql_system_tables.sql
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_binlog.cc
      sql/log_event.cc
      sql/mysqld.h
      sql/ndb_event_data.cc
      sql/ndb_event_data.h
      sql/ndb_schema_dist.cc
      sql/ndb_schema_dist.h
      sql/ndb_share.cc
      sql/ndb_share.h
      sql/sys_vars.cc
      storage/ndb/include/kernel/ndb_limits.h
      storage/ndb/src/common/debugger/EventLogger.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
      storage/ndb/src/kernel/blocks/trix/Trix.cpp
      storage/ndb/src/kernel/ndbd.cpp
      storage/ndb/src/kernel/vm/Ndbinfo.hpp
      storage/ndb/tools/ndbinfo_sql.cpp
 3414 Martin Skold	2011-11-17 [merge]
      Null merge

    modified:
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-11-16 09:29:49 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-11-21 12:53:01 +0000
@@ -548,6 +548,23 @@ SELECT count(*) as Count FROM t1 WHERE L
 Count
 256
 drop table t1;
+create table t1 (
+a int unsigned not null,
+b char(180) not null,
+primary key using hash (a),
+index (b)
+) engine=ndb charset=binary;
+insert into t1 values (1,'a'),(2,'b'),(3,'c');
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+drop table t1;
 set @is_enable = @is_enable_default;
 set @is_enable = NULL;
 # is_enable_on=0 is_enable_off=1

=== modified file 'mysql-test/suite/ndb/t/ndb_index_stat.test'
--- a/mysql-test/suite/ndb/t/ndb_index_stat.test	2011-09-02 06:43:38 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat.test	2011-11-19 07:56:25 +0000
@@ -374,5 +374,20 @@ SELECT count(*) as Count FROM t1 WHERE L
 
 drop table t1;
 
+# bug#13407848
+# signed char in compute length bytes caused ndbrequire in Trix.cpp
+
+create table t1 (
+  a int unsigned not null,
+  b char(180) not null,
+  primary key using hash (a),
+  index (b)
+) engine=ndb charset=binary;
+insert into t1 values (1,'a'),(2,'b'),(3,'c');
+analyze table t1;
+analyze table t1;
+analyze table t1;
+drop table t1;
+
 set @is_enable = @is_enable_default;
 source ndb_index_stat_enable.inc;

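[Editor's note: bug#13407848 above is a sign-extension problem: a char(180) key in a binary charset needs length values larger than 127, the biggest value a signed 8-bit char can hold, so a length-byte computation done in plain 'char' wraps negative and trips the ndbrequire in Trix.cpp. A minimal standalone sketch of the failure mode, with hypothetical variable names, not the actual Trix code:]

  #include <cstdio>

  int main() {
    // A char(180) attribute needs length values above 127. Plain 'char'
    // is signed on most common ABIs, so the computed length wraps
    // negative; an unsigned type holds the value correctly.
    char broken_len = (char)180;      // typically wraps to -76
    unsigned char fixed_len = 180;    // always 180
    printf("signed char length  : %d\n", (int)broken_len);
    printf("unsigned char length: %u\n", (unsigned)fixed_len);
    return 0;
  }
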
=== modified file 'mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result'
--- a/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result	2011-11-15 14:25:58 +0000
+++ b/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result	2011-11-21 12:53:01 +0000
@@ -3,22 +3,10 @@ include/master-slave.inc
 show variables like 'slave_allow_batching';
 Variable_name	Value
 slave_allow_batching	OFF
-Show that slave_allow_batching cannot be changed while slave is running
-set global slave_allow_batching=ON;
-ERROR HY000: This operation cannot be performed with a running slave; run STOP SLAVE first
-show warnings;
-Level	Code	Message
-Error	1198	This operation cannot be performed with a running slave; run STOP SLAVE first
-show variables like 'slave_allow_batching';
-Variable_name	Value
-slave_allow_batching	OFF
-Now stop slave and change it
-stop slave;
 set global slave_allow_batching=ON;
 show variables like 'slave_allow_batching';
 Variable_name	Value
 slave_allow_batching	ON
-start slave;
 Now the normal test
 CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1)) ENGINE = 'INNODB'  ;
 SELECT * FROM t1;
@@ -501,6 +489,7 @@ SET GLOBAL SLAVE_TYPE_CONVERSIONS = @sav
 call mtr.add_suppression("Slave SQL.*Table definition on master and slave does not match: Column 1 size mismatch.* Error_code: 1535");
 call mtr.add_suppression("Slave SQL.*Could not execute Delete_rows event on table test.t1.* Error_code: 1032");
 call mtr.add_suppression("Slave SQL.*Column 1 of table .test.t.. cannot be converted from type.*, Error_code: 1677");
+call mtr.add_suppression("The slave coordinator and worker threads are stopped, possibly leaving data in inconsistent state");
 include/rpl_reset.inc
 [expecting slave to replicate correctly]
 INSERT INTO t4 VALUES (1, "", 1);

=== modified file 'mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test'
--- a/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test	2011-11-15 14:25:58 +0000
+++ b/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test	2011-11-18 09:46:57 +0000
@@ -4,18 +4,9 @@
 --connection slave
 show variables like 'slave_allow_batching';
 
---echo Show that slave_allow_batching cannot be changed while slave is running
---error ER_SLAVE_MUST_STOP
 set global slave_allow_batching=ON;
-show warnings;
 show variables like 'slave_allow_batching';
 
---echo Now stop slave and change it
-stop slave;
-set global slave_allow_batching=ON;
-show variables like 'slave_allow_batching';
-start slave;
-
 --echo Now the normal test
 --connection master
 

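[Editor's note: behaviourally, this merge drops the requirement that the slave be stopped before toggling slave_allow_batching; the ER_SLAVE_MUST_STOP error no longer occurs, presumably via the change to sql/sys_vars.cc in the modified-file list. A hypothetical model of the guard that was lifted, not the actual MySQL sys-var API:]

  #include <cstdio>
  #include <stdexcept>

  struct SlaveState { bool running; bool allow_batching; };

  // Old behaviour: refuse the change while the slave is running.
  void set_allow_batching_old(SlaveState& s, bool on) {
    if (s.running)
      throw std::runtime_error("This operation cannot be performed with "
                               "a running slave; run STOP SLAVE first");
    s.allow_batching = on;
  }

  // New behaviour: the value may be toggled at any time.
  void set_allow_batching_new(SlaveState& s, bool on) {
    s.allow_batching = on;
  }

  int main() {
    SlaveState s = { true, false };
    set_allow_batching_new(s, true);   // succeeds with the slave running
    printf("allow_batching: %d\n", (int)s.allow_batching);
    return 0;
  }
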
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql	2011-11-15 13:13:38 +0000
+++ b/scripts/mysql_system_tables.sql	2011-11-21 12:53:01 +0000
@@ -1837,7 +1837,7 @@ EXECUTE stmt;
 DROP PREPARE stmt;
 
 # ndbinfo.counters
-SET @str=IF(@have_ndbinfo,'CREATE OR REPLACE DEFINER=`root@localhost` SQL SECURITY INVOKER VIEW `ndbinfo`.`counters` AS SELECT node_id, b.block_name, block_instance, counter_id, CASE counter_id  WHEN 1 THEN "ATTRINFO"  WHEN 2 THEN "TRANSACTIONS"  WHEN 3 THEN "COMMITS"  WHEN 4 THEN "READS"  WHEN 5 THEN "SIMPLE_READS"  WHEN 6 THEN "WRITES"  WHEN 7 THEN "ABORTS"  WHEN 8 THEN "TABLE_SCANS"  WHEN 9 THEN "RANGE_SCANS"  WHEN 10 THEN "OPERATIONS"  WHEN 11 THEN "READS_RECEIVED"  WHEN 12 THEN "LOCAL_READS_SENT"  WHEN 13 THEN "REMOTE_READS_SENT"  WHEN 14 THEN "READS_NOT_FOUND"  WHEN 15 THEN "TABLE_SCANS_RECEIVED"  WHEN 16 THEN "LOCAL_TABLE_SCANS_SENT"  WHEN 17 THEN "RANGE_SCANS_RECEIVED"  WHEN 18 THEN "LOCAL_RANGE_SCANS_SENT"  WHEN 19 THEN "REMOTE_RANGE_SCANS_SENT"  WHEN 20 THEN "SCAN_BATCHES_RETURNED"  WHEN 21 THEN "SCAN_ROWS_RETURNED"  WHEN 22 THEN "PRUNED_RANGE_SCANS_RECEIVED"  WHEN 23 THEN "CONST_PRUNED_RANGE_SCANS_RECEIVED"  ELSE "<unknown>"  END AS counter_name, val FROM `ndbinfo`.`ndb$counters` c LEFT JOIN `ndbinfo`.blocks b ON c.block_number = b.block_number','SET @dummy = 0');
+SET @str=IF(@have_ndbinfo,'CREATE OR REPLACE DEFINER=`root@localhost` SQL SECURITY INVOKER VIEW `ndbinfo`.`counters` AS SELECT node_id, b.block_name, block_instance, counter_id, CASE counter_id  WHEN 1 THEN "ATTRINFO"  WHEN 2 THEN "TRANSACTIONS"  WHEN 3 THEN "COMMITS"  WHEN 4 THEN "READS"  WHEN 5 THEN "SIMPLE_READS"  WHEN 6 THEN "WRITES"  WHEN 7 THEN "ABORTS"  WHEN 8 THEN "TABLE_SCANS"  WHEN 9 THEN "RANGE_SCANS"  WHEN 10 THEN "OPERATIONS"  WHEN 11 THEN "READS_RECEIVED"  WHEN 12 THEN "LOCAL_READS_SENT"  WHEN 13 THEN "REMOTE_READS_SENT"  WHEN 14 THEN "READS_NOT_FOUND"  WHEN 15 THEN "TABLE_SCANS_RECEIVED"  WHEN 16 THEN "LOCAL_TABLE_SCANS_SENT"  WHEN 17 THEN "RANGE_SCANS_RECEIVED"  WHEN 18 THEN "LOCAL_RANGE_SCANS_SENT"  WHEN 19 THEN "REMOTE_RANGE_SCANS_SENT"  WHEN 20 THEN "SCAN_BATCHES_RETURNED"  WHEN 21 THEN "SCAN_ROWS_RETURNED"  WHEN 22 THEN "PRUNED_RANGE_SCANS_RECEIVED"  WHEN 23 THEN "CONST_PRUNED_RANGE_SCANS_RECEIVED"  WHEN 24 THEN "LOCAL_READS"  ELSE "<unknown>"  END AS counter_name, val FROM `ndbinfo`.`ndb$counters` c LEFT JOIN `ndbinfo`.blocks b ON c.block_number = b.block_number','SET @dummy = 0');
 PREPARE stmt FROM @str;
 EXECUTE stmt;
 DROP PREPARE stmt;

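[Editor's note: the ndbinfo.counters view simply maps the numeric counter ids reported by the data nodes to readable names; this merge extends the CASE with id 24, LOCAL_READS. A hedged C++ sketch of the same id-to-name mapping, illustrative only; the real counter ids presumably live in the kernel sources such as Ndbinfo.hpp, also touched by this merge:]

  #include <cstdio>

  // Illustrative mapping of ndbinfo counter ids to names, mirroring
  // the CASE expression in the ndbinfo.counters view above.
  static const char* counter_name(int id) {
    switch (id) {
      case 1:  return "ATTRINFO";
      case 2:  return "TRANSACTIONS";
      // ... ids 3..23 as listed in the view above ...
      case 23: return "CONST_PRUNED_RANGE_SCANS_RECEIVED";
      case 24: return "LOCAL_READS";   // the entry added by this merge
      default: return "<unknown>";
    }
  }

  int main() {
    printf("%s\n", counter_name(24));  // prints LOCAL_READS
    return 0;
  }
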
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-11-16 09:29:49 +0000
+++ b/sql/ha_ndbcluster.cc	2011-11-21 12:53:01 +0000
@@ -11011,6 +11011,7 @@ static
 void
 delete_table_drop_share(NDB_SHARE* share, const char * path)
 {
+  DBUG_ENTER("delete_table_drop_share");
   if (share)
   {
     pthread_mutex_lock(&ndbcluster_mutex);
@@ -11042,6 +11043,7 @@ do_drop:
     }
     pthread_mutex_unlock(&ndbcluster_mutex);
   }
+  DBUG_VOID_RETURN;
 }
 
 /* static version which does not need a handler */
@@ -11052,7 +11054,7 @@ ha_ndbcluster::drop_table_impl(THD *thd,
                                const char *db,
                                const char *table_name)
 {
-  DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+  DBUG_ENTER("ha_ndbcluster::drop_table_impl");
   NDBDICT *dict= ndb->getDictionary();
   int ndb_table_id= 0;
   int ndb_table_version= 0;
@@ -11189,8 +11191,7 @@ int ha_ndbcluster::delete_table(const ch
 {
   THD *thd= current_thd;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb;
-  int error= 0;
+
   DBUG_ENTER("ha_ndbcluster::delete_table");
   DBUG_PRINT("enter", ("name: %s", name));
 
@@ -11202,6 +11203,7 @@ int ha_ndbcluster::delete_table(const ch
       dropped inside ndb.
       Just drop local files.
     */
+    DBUG_PRINT("info", ("Table is already dropped in NDB"));
     delete_table_drop_share(0, name);
     DBUG_RETURN(handler::delete_table(name));
   }
@@ -11217,16 +11219,12 @@ int ha_ndbcluster::delete_table(const ch
 
   if (check_ndb_connection(thd))
   {
-    error= HA_ERR_NO_CONNECTION;
-    goto err;
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
-  ndb= thd_ndb->ndb;
-
   if (!thd_ndb->has_required_global_schema_lock("ha_ndbcluster::delete_table"))
   {
-    error= HA_ERR_NO_CONNECTION;
-    goto err;
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
   /*
@@ -11234,6 +11232,8 @@ int ha_ndbcluster::delete_table(const ch
     If it was already gone it might have been dropped
     remotely, give a warning and then drop .ndb file.
    */
+  int error;
+  Ndb* ndb= thd_ndb->ndb;
   if (!(error= drop_table_impl(thd, this, ndb, name,
                                m_dbname, m_tabname)) ||
       error == HA_ERR_NO_SUCH_TABLE)
@@ -11244,7 +11244,6 @@ int ha_ndbcluster::delete_table(const ch
       error= error1;
   }
 
-err:
   DBUG_RETURN(error);
 }
 
@@ -13379,38 +13378,15 @@ static uchar *ndbcluster_get_key(NDB_SHA
 
 #ifndef DBUG_OFF
 
-static void print_share(const char* where, NDB_SHARE* share)
-{
-  fprintf(DBUG_FILE,
-          "%s %s.%s: use_count: %u, commit_count: %lu\n",
-          where, share->db, share->table_name, share->use_count,
-          (ulong) share->commit_count);
-  fprintf(DBUG_FILE,
-          "  - key: %s, key_length: %d\n",
-          share->key, share->key_length);
-
-  Ndb_event_data *event_data= 0;
-  if (share->event_data)
-    event_data= share->event_data;
-  else if (share->op)
-    event_data= (Ndb_event_data *) share->op->getCustomData();
-  if (event_data)
-  {
-    fprintf(DBUG_FILE,
-            "  - event_data->shadow_table: %p %s.%s\n",
-            event_data->shadow_table, event_data->shadow_table->s->db.str,
-            event_data->shadow_table->s->table_name.str);
-  }
-}
-
-
 static void print_ndbcluster_open_tables()
 {
   DBUG_LOCK_FILE;
   fprintf(DBUG_FILE, ">ndbcluster_open_tables\n");
   for (uint i= 0; i < ndbcluster_open_tables.records; i++)
-    print_share("",
-                (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i));
+  {
+    NDB_SHARE* share= (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i);
+    share->print("", DBUG_FILE);
+  }
   fprintf(DBUG_FILE, "<ndbcluster_open_tables\n");
   DBUG_UNLOCK_FILE;
 }
@@ -13425,7 +13401,7 @@ static void print_ndbcluster_open_tables
 #define dbug_print_share(t, s)                  \
   DBUG_LOCK_FILE;                               \
   DBUG_EXECUTE("info",                          \
-               print_share((t), (s)););         \
+               (s)->print((t), DBUG_FILE););    \
   DBUG_UNLOCK_FILE;
 
 
@@ -13572,6 +13548,7 @@ int ndbcluster_undo_rename_share(THD *th
   return 0;
 }
 
+
 int ndbcluster_rename_share(THD *thd, NDB_SHARE *share)
 {
   NDB_SHARE *tmp;
@@ -13620,11 +13597,7 @@ int ndbcluster_rename_share(THD *thd, ND
   ha_ndbcluster::set_tabname(share->new_key, share->table_name);
 
   dbug_print_share("ndbcluster_rename_share:", share);
-  Ndb_event_data *event_data= 0;
-  if (share->event_data)
-    event_data= share->event_data;
-  else if (share->op)
-    event_data= (Ndb_event_data *) share->op->getCustomData();
+  Ndb_event_data *event_data= share->get_event_data_ptr();
   if (event_data && event_data->shadow_table)
   {
     if (!IS_TMP_PREFIX(share->table_name))
@@ -13673,6 +13646,75 @@ NDB_SHARE *ndbcluster_get_share(NDB_SHAR
 }
 
 
+
+NDB_SHARE*
+NDB_SHARE::create(const char* key, size_t key_length,
+                  TABLE* table, const char* db_name, const char* table_name)
+{
+  NDB_SHARE* share;
+  if (!(share= (NDB_SHARE*) my_malloc(sizeof(*share),
+                                      MYF(MY_WME | MY_ZEROFILL))))
+    return NULL;
+
+  MEM_ROOT **root_ptr=
+    my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+  MEM_ROOT *old_root= *root_ptr;
+
+  init_sql_alloc(&share->mem_root, 1024, 0);
+  *root_ptr= &share->mem_root; // remember to reset before return
+  share->flags= 0;
+  share->state= NSS_INITIAL;
+  /* Allocate enough space for key, db, and table_name */
+  share->key= (char*) alloc_root(*root_ptr, 2 * (key_length + 1));
+  share->key_length= key_length;
+  strmov(share->key, key);
+  share->db= share->key + key_length + 1;
+  strmov(share->db, db_name);
+  share->table_name= share->db + strlen(share->db) + 1;
+  strmov(share->table_name, table_name);
+  thr_lock_init(&share->lock);
+  pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
+  share->commit_count= 0;
+  share->commit_count_lock= 0;
+
+#ifdef HAVE_NDB_BINLOG
+  share->m_cfn_share= NULL;
+#endif
+
+  share->op= 0;
+  share->new_op= 0;
+  share->event_data= 0;
+
+  {
+    // Create array of bitmap for keeping track of subscribed nodes
+    // NOTE! Only the NDB_SHARE for ndb_schema really needs this
+    int no_nodes= g_ndb_cluster_connection->no_db_nodes();
+    share->subscriber_bitmap= (MY_BITMAP*)
+      alloc_root(&share->mem_root, no_nodes * sizeof(MY_BITMAP));
+    for (int i= 0; i < no_nodes; i++)
+    {
+      bitmap_init(&share->subscriber_bitmap[i],
+                  (Uint32*)alloc_root(&share->mem_root, max_ndb_nodes/8),
+                  max_ndb_nodes, FALSE);
+      bitmap_clear_all(&share->subscriber_bitmap[i]);
+    }
+  }
+
+  if (ndbcluster_binlog_init_share(current_thd, share, table))
+  {
+    DBUG_PRINT("error", ("get_share: %s could not init share", key));
+    free_root(&share->mem_root, MYF(0));
+    my_free(share, 0);
+    *root_ptr= old_root;
+    return NULL;
+  }
+
+  *root_ptr= old_root;
+
+  return share;
+}
+
+
 static inline
 NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table,
                                 bool create_if_not_exists)
@@ -13693,50 +13735,30 @@ NDB_SHARE *ndbcluster_get_share(const ch
       DBUG_PRINT("error", ("get_share: %s does not exist", key));
       DBUG_RETURN(0);
     }
-    if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
-                                       MYF(MY_WME | MY_ZEROFILL))))
-    {
-      MEM_ROOT **root_ptr=
-        my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
-      MEM_ROOT *old_root= *root_ptr;
-      init_sql_alloc(&share->mem_root, 1024, 0);
-      *root_ptr= &share->mem_root; // remember to reset before return
-      share->flags= 0;
-      share->state= NSS_INITIAL;
-      /* enough space for key, db, and table_name */
-      share->key= (char*) alloc_root(*root_ptr, 2 * (length + 1));
-      share->key_length= length;
-      strmov(share->key, key);
-      if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share))
-      {
-        free_root(&share->mem_root, MYF(0));
-        my_free((uchar*) share, 0);
-        *root_ptr= old_root;
-        DBUG_RETURN(0);
-      }
-      thr_lock_init(&share->lock);
-      pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
-      share->commit_count= 0;
-      share->commit_count_lock= 0;
-      share->db= share->key + length + 1;
-      ha_ndbcluster::set_dbname(key, share->db);
-      share->table_name= share->db + strlen(share->db) + 1;
-      ha_ndbcluster::set_tabname(key, share->table_name);
-      if (ndbcluster_binlog_init_share(current_thd, share, table))
-      {
-        DBUG_PRINT("error", ("get_share: %s could not init share", key));
-        ndbcluster_real_free_share(&share);
-        *root_ptr= old_root;
-        DBUG_RETURN(0);
-      }
-      *root_ptr= old_root;
-    }
-    else
+
+    /*
+      Extract db and table name from key (to avoid making NDB_SHARE
+      depend on ha_ndbcluster)
+    */
+    char db_name_buf[FN_HEADLEN];
+    char table_name_buf[FN_HEADLEN];
+    ha_ndbcluster::set_dbname(key, db_name_buf);
+    ha_ndbcluster::set_tabname(key, table_name_buf);
+
+    if (!(share= NDB_SHARE::create(key, length, table,
+                                   db_name_buf, table_name_buf)))
     {
       DBUG_PRINT("error", ("get_share: failed to alloc share"));
       my_error(ER_OUTOFMEMORY, MYF(0), static_cast<int>(sizeof(*share)));
       DBUG_RETURN(0);
     }
+
+    // Insert the new share in list of open shares
+    if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share))
+    {
+      NDB_SHARE::destroy(share);
+      DBUG_RETURN(0);
+    }
   }
   share->use_count++;
   if (opt_ndb_extra_logging > 9)

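[Editor's note: the refactoring above pulls raw share construction out of ndbcluster_get_share() into the new NDB_SHARE::create(), leaving the caller responsible only for inserting the share into the open-tables hash and calling NDB_SHARE::destroy() if the insert fails. A minimal sketch of that create/register split, with hypothetical types, not the actual NDB_SHARE API:]

  #include <cstdio>
  #include <map>
  #include <string>

  struct Share {
    std::string key;
    unsigned use_count = 0;

    // Factory: allocate and fully initialize, or return nullptr.
    static Share* create(const char* key) {
      if (key == nullptr || *key == '\0')
        return nullptr;
      Share* s = new Share();
      s->key = key;
      return s;
    }
    static void destroy(Share* s) { delete s; }
  };

  // Registry of open shares, standing in for ndbcluster_open_tables.
  static std::map<std::string, Share*> open_shares;

  // Caller-side logic mirroring ndbcluster_get_share(): construction
  // is delegated to the factory; the caller only registers the new
  // share and destroys it if registration fails.
  Share* get_share(const char* key) {
    auto it = open_shares.find(key);
    if (it != open_shares.end()) {
      it->second->use_count++;
      return it->second;
    }
    Share* share = Share::create(key);
    if (share == nullptr)
      return nullptr;
    if (!open_shares.emplace(share->key, share).second) {
      Share::destroy(share);
      return nullptr;
    }
    share->use_count++;
    return share;
  }

  int main() {
    Share* s = get_share("./test/t1");
    printf("%s use_count=%u\n", s->key.c_str(), s->use_count);
    return 0;
  }
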
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-11-15 13:13:38 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-11-21 12:53:01 +0000
@@ -342,6 +342,13 @@ ndb_binlog_open_shadow_table(THD *thd, N
   /* We can't use 'use_all_columns()' as the file object is not setup yet */
   shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
                                              &shadow_table->s->all_set);
+
+  if (shadow_table->s->primary_key == MAX_KEY)
+   share->flags|= NSF_HIDDEN_PK;
+
+  if (shadow_table->s->blob_fields != 0)
+    share->flags|= NSF_BLOB_FLAG;
+
 #ifndef DBUG_OFF
   dbug_print_table("table", shadow_table);
 #endif
@@ -355,48 +362,9 @@ ndb_binlog_open_shadow_table(THD *thd, N
 */
 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
 {
-  MEM_ROOT *mem_root= &share->mem_root;
-  int do_event_op= ndb_binlog_running;
-  int error= 0;
   DBUG_ENTER("ndbcluster_binlog_init_share");
 
-#ifdef HAVE_NDB_BINLOG
-  share->m_cfn_share= NULL;
-#endif
-
-  share->op= 0;
-  share->new_op= 0;
-  share->event_data= 0;
-
-  if (!ndb_schema_share &&
-      strcmp(share->db, NDB_REP_DB) == 0 &&
-      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
-    do_event_op= 1;
-  else if (!ndb_apply_status_share &&
-           strcmp(share->db, NDB_REP_DB) == 0 &&
-           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
-    do_event_op= 1;
-
-  if (Ndb_dist_priv_util::is_distributed_priv_table(share->db,
-                                                    share->table_name))
-  {
-    do_event_op= 0;
-  }
-
-  {
-    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
-    share->subscriber_bitmap= (MY_BITMAP*)
-      alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
-    for (i= 0; i < no_nodes; i++)
-    {
-      bitmap_init(&share->subscriber_bitmap[i],
-                  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
-                  max_ndb_nodes, FALSE);
-      bitmap_clear_all(&share->subscriber_bitmap[i]);
-    }
-  }
-
-  if (!do_event_op)
+  if (!share->need_events(ndb_binlog_running))
   {
     if (_table)
     {
@@ -409,19 +377,10 @@ int ndbcluster_binlog_init_share(THD *th
     {
       share->flags|= NSF_NO_BINLOG;
     }
-    DBUG_RETURN(error);
-  }
-  while (1) 
-  {
-    if ((error= ndb_binlog_open_shadow_table(thd, share)))
-      break;
-    if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
-      share->flags|= NSF_HIDDEN_PK;
-    if (share->event_data->shadow_table->s->blob_fields != 0)
-      share->flags|= NSF_BLOB_FLAG;
-    break;
+    DBUG_RETURN(0);
   }
-  DBUG_RETURN(error);
+
+  DBUG_RETURN(ndb_binlog_open_shadow_table(thd, share));
 }
 
 /*****************************************************************
@@ -2011,19 +1970,21 @@ ndb_handle_schema_change(THD *thd, Ndb *
                          const Ndb_event_data *event_data)
 {
   DBUG_ENTER("ndb_handle_schema_change");
-  NDB_SHARE *share= event_data->share;
-  TABLE *shadow_table= event_data->shadow_table;
-  const char *tabname= shadow_table->s->table_name.str;
-  const char *dbname= shadow_table->s->db.str;
-  bool do_close_cached_tables= FALSE;
-  bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
 
   if (pOp->getEventType() == NDBEVENT::TE_ALTER)
   {
+    DBUG_PRINT("exit", ("Event type is TE_ALTER"));
     DBUG_RETURN(0);
   }
+
+  DBUG_ASSERT(event_data);
   DBUG_ASSERT(pOp->getEventType() == NDBEVENT::TE_DROP ||
               pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE);
+
+  NDB_SHARE *share= event_data->share;
+  TABLE *shadow_table= event_data->shadow_table;
+  const char *tabname= shadow_table->s->table_name.str;
+  const char *dbname= shadow_table->s->db.str;
   {
     Thd_ndb *thd_ndb= get_thd_ndb(thd);
     Ndb *ndb= thd_ndb->ndb;
@@ -2049,10 +2010,10 @@ ndb_handle_schema_change(THD *thd, Ndb *
   {
     share->op= 0;
   }
-  // either just us or drop table handling as well
-      
-  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
   pthread_mutex_unlock(&share->mutex);
+
+  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
+  DBUG_PRINT("info", ("signal that drop is done"));
   (void) pthread_cond_signal(&injector_cond);
 
   pthread_mutex_lock(&ndbcluster_mutex);
@@ -2060,6 +2021,9 @@ ndb_handle_schema_change(THD *thd, Ndb *
   DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                            share->key, share->use_count));
   free_share(&share, TRUE);
+
+  bool do_close_cached_tables= FALSE;
+  bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
   if (is_remote_change && share && share->state != NSS_DROPPED)
   {
     DBUG_PRINT("info", ("remote change"));
@@ -2083,15 +2047,13 @@ ndb_handle_schema_change(THD *thd, Ndb *
     share= 0;
   pthread_mutex_unlock(&ndbcluster_mutex);
 
-  if (event_data)
-  {
-    delete event_data;
-    pOp->setCustomData(NULL);
-  }
+  DBUG_PRINT("info", ("Deleting event_data"));
+  delete event_data;
+  pOp->setCustomData(NULL);
 
+  DBUG_PRINT("info", ("Dropping event operation"));
   pthread_mutex_lock(&injector_mutex);
   is_ndb->dropEventOperation(pOp);
-  pOp= 0;
   pthread_mutex_unlock(&injector_mutex);
 
   if (do_close_cached_tables)
@@ -2531,21 +2493,23 @@ class Ndb_schema_event_handler {
 
 
   void
-  log_after_epoch(Ndb_schema_op* schema)
+  handle_after_epoch(Ndb_schema_op* schema)
   {
-    DBUG_ENTER("log_after_epoch");
+    DBUG_ENTER("handle_after_epoch");
+    DBUG_PRINT("info", ("Pushing Ndb_schema_op on list to be "
+                        "handled after epoch"));
     assert(!is_post_epoch()); // Only before epoch
-    m_post_epoch_log_list.push_back(schema, m_mem_root);
+    m_post_epoch_handle_list.push_back(schema, m_mem_root);
     DBUG_VOID_RETURN;
   }
 
 
   void
-  unlock_after_epoch(Ndb_schema_op* schema)
+  ack_after_epoch(Ndb_schema_op* schema)
   {
-    DBUG_ENTER("unlock_after_epoch");
+    DBUG_ENTER("ack_after_epoch");
     assert(!is_post_epoch()); // Only before epoch
-    m_post_epoch_unlock_list.push_back(schema, m_mem_root);
+    m_post_epoch_ack_list.push_back(schema, m_mem_root);
     DBUG_VOID_RETURN;
   }
 
@@ -2559,18 +2523,21 @@ class Ndb_schema_event_handler {
   void
   ndbapi_invalidate_table(const char* db_name, const char* table_name) const
   {
+    DBUG_ENTER("ndbapi_invalidate_table");
     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
     Ndb *ndb= thd_ndb->ndb;
 
     ndb->setDatabaseName(db_name);
     Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
     ndbtab_g.invalidate();
+    DBUG_VOID_RETURN;
   }
 
 
   void
   mysqld_close_cached_table(const char* db_name, const char* table_name) const
   {
+    DBUG_ENTER("mysqld_close_cached_table");
      // Just mark table as "need reopen"
     const bool wait_for_refresh = false;
     // Not waiting -> no timeout needed
@@ -2583,11 +2550,57 @@ class Ndb_schema_event_handler {
 
     close_cached_tables(m_thd, &table_list,
                         wait_for_refresh, timeout);
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  mysqld_write_frm_from_ndb(const char* db_name,
+                            const char* table_name) const
+  {
+    DBUG_ENTER("mysqld_write_frm_from_ndb");
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Ndb *ndb= thd_ndb->ndb;
+    Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
+    const NDBTAB *ndbtab= ndbtab_g.get_table();
+
+    char key[FN_REFLEN];
+    build_table_filename(key, sizeof(key)-1,
+                         db_name, table_name, NullS, 0);
+
+    uchar *data= 0, *pack_data= 0;
+    size_t length, pack_length;
+
+    if (readfrm(key, &data, &length) == 0 &&
+        packfrm(data, length, &pack_data, &pack_length) == 0 &&
+        cmp_frm(ndbtab, pack_data, pack_length))
+    {
+      DBUG_PRINT("info", ("Detected frm change of table %s.%s",
+                          db_name, table_name));
+
+      DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
+                        ndbtab->getFrmLength());
+      my_free(data);
+      data= NULL;
+
+      int error;
+      if ((error= unpackfrm(&data, &length,
+                            (const uchar*) ndbtab->getFrmData())) ||
+          (error= writefrm(key, data, length)))
+      {
+        sql_print_error("NDB: Failed write frm for %s.%s, error %d",
+                        db_name, table_name, error);
+      }
+    }
+    my_free(data);
+    my_free(pack_data);
+    DBUG_VOID_RETURN;
   }
 
 
   NDB_SHARE* get_share(Ndb_schema_op* schema) const
   {
+    DBUG_ENTER("get_share(Ndb_schema_op*)");
     char key[FN_REFLEN + 1];
     build_table_filename(key, sizeof(key) - 1,
                          schema->db, schema->name, "", 0);
@@ -2597,7 +2610,7 @@ class Ndb_schema_event_handler {
       DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                share->key, share->use_count));
     }
-    return share;
+    DBUG_RETURN(share);
   }
 
 
@@ -2630,18 +2643,17 @@ class Ndb_schema_event_handler {
   }
 
 
+  bool is_local_table(const char* db_name, const char* table_name) const
+  {
+    return ndbcluster_check_if_local_table(db_name, table_name);
+  }
+
+
   void handle_clear_slock(Ndb_schema_op* schema)
   {
-    if (!is_post_epoch())
-    {
-      /*
-        handle slock after epoch is completed to ensure that
-        schema events get inserted in the binlog after any data
-        events
-      */
-      log_after_epoch(schema);
-      return;
-    }
+    DBUG_ENTER("handle_clear_slock");
+
+    assert(is_post_epoch());
 
     char key[FN_REFLEN + 1];
     build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
@@ -2654,7 +2666,7 @@ class Ndb_schema_event_handler {
       if (opt_ndb_extra_logging > 19)
         sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
                               key, schema->id, schema->version);
-      return;
+      DBUG_VOID_RETURN;
     }
 
     if (ndb_schema_object->table_id != schema->id ||
@@ -2670,7 +2682,7 @@ class Ndb_schema_event_handler {
                               schema->id,
                               schema->version);
       ndb_free_schema_object(&ndb_schema_object);
-      return;
+      DBUG_VOID_RETURN;
     }
 
     /*
@@ -2699,17 +2711,20 @@ class Ndb_schema_event_handler {
 
     /* Wake up the waiter */
     pthread_cond_signal(&injector_cond);
-    return;
+
+    DBUG_VOID_RETURN;
   }
 
 
   void
   handle_offline_alter_table_commit(Ndb_schema_op* schema)
   {
+    DBUG_ENTER("handle_offline_alter_table_commit");
+
     assert(is_post_epoch()); // Always after epoch
 
     if (schema->node_id == own_nodeid())
-      return;
+      DBUG_VOID_RETURN;
 
     write_schema_op_to_binlog(m_thd, schema);
     ndbapi_invalidate_table(schema->db, schema->name);
@@ -2749,7 +2764,7 @@ class Ndb_schema_event_handler {
       free_share(&share);
     }
 
-    if (ndbcluster_check_if_local_table(schema->db, schema->name) &&
+    if (is_local_table(schema->db, schema->name) &&
        !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
                                                       schema->name))
     {
@@ -2757,13 +2772,14 @@ class Ndb_schema_event_handler {
                       "from binlog schema event '%s' from node %d.",
                       schema->db, schema->name, schema->query,
                       schema->node_id);
-      return;
+      DBUG_VOID_RETURN;
     }
 
     if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
     {
       print_could_not_discover_error(m_thd, schema);
     }
+    DBUG_VOID_RETURN;
   }
 
 
@@ -2775,96 +2791,63 @@ class Ndb_schema_event_handler {
     ndbapi_invalidate_table(schema->db, schema->name);
     mysqld_close_cached_table(schema->db, schema->name);
 
-    int error= 0;
-    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
-    Ndb *ndb= thd_ndb->ndb;
-    Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
-    const NDBTAB *ndbtab= ndbtab_g.get_table();
     if (schema->node_id != own_nodeid())
     {
-      char key[FN_REFLEN];
-      uchar *data= 0, *pack_data= 0;
-      size_t length, pack_length;
-
-      DBUG_PRINT("info", ("Detected frm change of table %s.%s",
-                          schema->db, schema->name));
       write_schema_op_to_binlog(m_thd, schema);
-      build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
-      /*
-        If the there is no local table shadowing the altered table and
-        it has an frm that is different than the one on disk then
-        overwrite it with the new table definition
-      */
-      if (!ndbcluster_check_if_local_table(schema->db, schema->name) &&
-          readfrm(key, &data, &length) == 0 &&
-          packfrm(data, length, &pack_data, &pack_length) == 0 &&
-          cmp_frm(ndbtab, pack_data, pack_length))
-      {
-        DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
-                  ndbtab->getFrmLength());
-        my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-        data= NULL;
-        if ((error= unpackfrm(&data, &length,
-                              (const uchar*) ndbtab->getFrmData())) ||
-            (error= writefrm(key, data, length)))
-        {
-          sql_print_error("NDB: Failed write frm for %s.%s, error %d",
-                          schema->db, schema->name, error);
-        }
+      if (!is_local_table(schema->db, schema->name))
+      {
+        mysqld_write_frm_from_ndb(schema->db, schema->name);
       }
-      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-      my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
     }
     NDB_SHARE *share= get_share(schema);
     if (share)
     {
       if (opt_ndb_extra_logging > 9)
-        sql_print_information("NDB Binlog: handeling online alter/rename");
+        sql_print_information("NDB Binlog: handling online alter/rename");
 
       pthread_mutex_lock(&share->mutex);
       ndb_binlog_close_shadow_table(share);
 
-      if ((error= ndb_binlog_open_shadow_table(m_thd, share)))
+      if (ndb_binlog_open_shadow_table(m_thd, share))
+      {
         sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
                         schema->db, schema->name);
-      if (error)
         pthread_mutex_unlock(&share->mutex);
-    }
-    if (!error && share)
-    {
-      if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
-        share->flags|= NSF_HIDDEN_PK;
-      /*
-        Refresh share->flags to handle added BLOB columns
-      */
-      if (share->event_data->shadow_table->s->blob_fields != 0)
-        share->flags|= NSF_BLOB_FLAG;
-
-      /*
-        Start subscribing to data changes to the new table definition
-      */
-      String event_name(INJECTOR_EVENT_LEN);
-      ndb_rep_event_name(&event_name, schema->db, schema->name,
-                         get_binlog_full(share));
-      NdbEventOperation *tmp_op= share->op;
-      share->new_op= 0;
-      share->op= 0;
-
-      if (ndbcluster_create_event_ops(m_thd, share, ndbtab, event_name.c_ptr()))
-      {
-        sql_print_error("NDB Binlog:"
-                        "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
-                        event_name.c_ptr());
       }
       else
       {
-        share->new_op= share->op;
-      }
-      share->op= tmp_op;
-      pthread_mutex_unlock(&share->mutex);
+        /*
+          Start subscribing to data changes to the new table definition
+        */
+        String event_name(INJECTOR_EVENT_LEN);
+        ndb_rep_event_name(&event_name, schema->db, schema->name,
+                           get_binlog_full(share));
+        NdbEventOperation *tmp_op= share->op;
+        share->new_op= 0;
+        share->op= 0;
 
-      if (opt_ndb_extra_logging > 9)
-        sql_print_information("NDB Binlog: handeling online alter/rename done");
+        Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+        Ndb *ndb= thd_ndb->ndb;
+        Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
+        const NDBTAB *ndbtab= ndbtab_g.get_table();
+        if (ndbcluster_create_event_ops(m_thd, share, ndbtab,
+                                        event_name.c_ptr()))
+        {
+          sql_print_error("NDB Binlog:"
+                          "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
+                          event_name.c_ptr());
+        }
+        else
+        {
+          share->new_op= share->op;
+        }
+        share->op= tmp_op;
+        pthread_mutex_unlock(&share->mutex);
+
+        if (opt_ndb_extra_logging > 9)
+          sql_print_information("NDB Binlog: handling online "
+                                "alter/rename done");
+      }
     }
     if (share)
     {
@@ -2904,6 +2887,314 @@ class Ndb_schema_event_handler {
   }
 
 
+  void
+  handle_drop_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_drop_table");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      /* Table exists as a local table, print error and leave it */
+      sql_print_error("NDB Binlog: Skipping dropping locally "
+                      "defined table '%s.%s' from binlog schema "
+                      "event '%s' from node %d. ",
+                      schema->db, schema->name, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[2]=
+      {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    NDB_SHARE *share= get_share(schema);
+    // invalidation already handled by binlog thread
+    if (!share || !share->op)
+    {
+      ndbapi_invalidate_table(schema->db, schema->name);
+      mysqld_close_cached_table(schema->db, schema->name);
+    }
+    if (share)
+      free_share(&share);
+
+    ndbapi_invalidate_table(schema->db, schema->name);
+    mysqld_close_cached_table(schema->db, schema->name);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_rename_table_prepare(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_rename_table_prepare");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    NDB_SHARE *share= get_share(schema);
+    if (share)
+    {
+      ndbcluster_prepare_rename_share(share, schema->query);
+      free_share(&share);
+    }
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_rename_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_rename_table");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      /* Table exists as a local table, print error and leave it */
+      sql_print_error("NDB Binlog: Skipping renaming locally "
+                      "defined table '%s.%s' from binlog schema "
+                      "event '%s' from node %d. ",
+                      schema->db, schema->name, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[2]=
+      {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    NDB_SHARE *share= get_share(schema);
+    // invalidation already handled by binlog thread
+    if (!share || !share->op)
+    {
+      ndbapi_invalidate_table(schema->db, schema->name);
+      mysqld_close_cached_table(schema->db, schema->name);
+    }
+    if (share)
+      free_share(&share);
+
+    share= get_share(schema);
+    if (share)
+    {
+      ndbcluster_rename_share(m_thd, share);
+      free_share(&share);
+    }
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_drop_db(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_drop_db");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    // Set NO_LOCK_SCHEMA_OP before 'check_if_local_tables_indb'
+    // until ndbcluster_find_files does not take GSL
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+
+    if (check_if_local_tables_in_db(schema->db))
+    {
+      /* Database contains local tables, print error and leave it */
+      sql_print_error("NDB Binlog: Skipping drop database '%s' since "
+                      "it contained local tables "
+                      "binlog schema event '%s' from node %d. ",
+                      schema->db, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    const int no_print_error[1]= {0};
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_truncate_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_truncate_table");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    NDB_SHARE *share= get_share(schema);
+    // invalidation already handled by binlog thread
+    if (!share || !share->op)
+    {
+      ndbapi_invalidate_table(schema->db, schema->name);
+      mysqld_close_cached_table(schema->db, schema->name);
+    }
+    if (share)
+      free_share(&share);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      sql_print_error("NDB Binlog: Skipping locally defined table "
+                      "'%s.%s' from binlog schema event '%s' from "
+                      "node %d. ",
+                      schema->db, schema->name, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
+    {
+      print_could_not_discover_error(m_thd, schema);
+    }
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_create_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_create_table");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
+                          "binlog schema event '%s' from node %d. ",
+                          schema->db, schema->name, schema->query,
+                          schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
+    {
+      print_could_not_discover_error(m_thd, schema);
+    }
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_create_db(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_create_db");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[1]= {0};
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_alter_db(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_alter_db");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[1]= {0};
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_grant_op(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_grant_op");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (opt_ndb_extra_logging > 9)
+      sql_print_information("Got dist_priv event: %s, "
+                            "flushing privileges",
+                            get_schema_type_name(schema->type));
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[1]= {0};
+    char *cmd= (char *) "flush privileges";
+    run_query(m_thd, cmd,
+              cmd + strlen(cmd),
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
   int
   handle_schema_op(Ndb_schema_op* schema)
   {
@@ -2937,179 +3228,75 @@ class Ndb_schema_event_handler {
       switch (schema_type)
       {
       case SOT_CLEAR_SLOCK:
-        handle_clear_slock(schema);
+        /*
+          handle slock after epoch is completed to ensure that
+          schema events get inserted in the binlog after any data
+          events
+        */
+        handle_after_epoch(schema);
         DBUG_RETURN(0);
 
       case SOT_ALTER_TABLE_COMMIT:
       case SOT_RENAME_TABLE_PREPARE:
       case SOT_ONLINE_ALTER_TABLE_PREPARE:
       case SOT_ONLINE_ALTER_TABLE_COMMIT:
-        log_after_epoch(schema);
-        unlock_after_epoch(schema);
+      case SOT_RENAME_TABLE:
+      case SOT_DROP_TABLE:
+      case SOT_DROP_DB:
+        handle_after_epoch(schema);
+        ack_after_epoch(schema);
         DBUG_RETURN(0);
 
-      default:
+      case SOT_TRUNCATE_TABLE:
+        handle_truncate_table(schema);
         break;
-      }
 
-      if (schema->node_id != own_nodeid())
-      {
-        THD* thd= m_thd; // Code compatibility
-        Thd_ndb *thd_ndb= get_thd_ndb(thd);
-        Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+      case SOT_CREATE_TABLE:
+        handle_create_table(schema);
+        break;
 
-        int post_epoch_unlock= 0;
- 
-        switch (schema_type)
-        {
-        case SOT_RENAME_TABLE:
-        case SOT_RENAME_TABLE_NEW:
-        case SOT_DROP_TABLE:
-          if (! ndbcluster_check_if_local_table(schema->db, schema->name))
-          {
-            thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-            const int no_print_error[2]=
-              {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
-            run_query(thd, schema->query,
-                      schema->query + schema->query_length,
-                      no_print_error);
-            /* binlog dropping table after any table operations */
-            log_after_epoch(schema);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-          }
-          else
-          {
-            /* Tables exists as a local table, print error and leave it */
-            DBUG_PRINT("info", ("Found local table '%s.%s', leaving it",
-                                schema->db, schema->name));
-            sql_print_error("NDB Binlog: Skipping %sing locally "
-                            "defined table '%s.%s' from binlog schema "
-                            "event '%s' from node %d. ",
-                            (schema_type == SOT_DROP_TABLE ? "dropp" : "renam"),
-                            schema->db, schema->name, schema->query,
-                            schema->node_id);
-            write_schema_op_to_binlog(thd, schema);
-          }
-          // Fall through
-	case SOT_TRUNCATE_TABLE:
-        {
-          NDB_SHARE *share= get_share(schema);
-          // invalidation already handled by binlog thread
-          if (!share || !share->op)
-          {
-            ndbapi_invalidate_table(schema->db, schema->name);
-            mysqld_close_cached_table(schema->db, schema->name);
-          }
-          if (share)
-            free_share(&share);
-        }
-        if (schema_type != SOT_TRUNCATE_TABLE)
-          break;
-        // fall through
-        case SOT_CREATE_TABLE:
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          if (ndbcluster_check_if_local_table(schema->db, schema->name))
-          {
-            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
-                                schema->db, schema->name));
-            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
-                            "binlog schema event '%s' from node %d. ",
-                            schema->db, schema->name, schema->query,
-                            schema->node_id);
-          }
-          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
-          {
-            print_could_not_discover_error(thd, schema);
-          }
-          write_schema_op_to_binlog(thd, schema);
-          break;
+      case SOT_CREATE_DB:
+        handle_create_db(schema);
+        break;
 
-        case SOT_DROP_DB:
-          /* Drop the database locally if it only contains ndb tables */
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          if (!check_if_local_tables_in_db(schema->db))
-          {
-            const int no_print_error[1]= {0};
-            run_query(thd, schema->query,
-                      schema->query + schema->query_length,
-                      no_print_error);
-            /* binlog dropping database after any table operations */
-            log_after_epoch(schema);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-          }
-          else
-          {
-            /* Database contained local tables, leave it */
-            sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
-                            "binlog schema event '%s' from node %d. ",
-                            schema->db, schema->query,
-                            schema->node_id);
-            write_schema_op_to_binlog(thd, schema);
-          }
-          break;
+      case SOT_ALTER_DB:
+        handle_alter_db(schema);
+        break;
 
-        case SOT_CREATE_DB:
-        case SOT_ALTER_DB:
-        {
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          const int no_print_error[1]= {0};
-          run_query(thd, schema->query,
-                    schema->query + schema->query_length,
-                    no_print_error);
-          write_schema_op_to_binlog(thd, schema);
-          break;
-        }
+      case SOT_CREATE_USER:
+      case SOT_DROP_USER:
+      case SOT_RENAME_USER:
+      case SOT_GRANT:
+      case SOT_REVOKE:
+        handle_grant_op(schema);
+        break;
 
-        case SOT_CREATE_USER:
-        case SOT_DROP_USER:
-        case SOT_RENAME_USER:
-        case SOT_GRANT:
-        case SOT_REVOKE:
-        {
-          if (opt_ndb_extra_logging > 9)
-            sql_print_information("Got dist_priv event: %s, "
-                                  "flushing privileges",
-                                  get_schema_type_name(schema_type));
-
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          const int no_print_error[1]= {0};
-          char *cmd= (char *) "flush privileges";
-          run_query(thd, cmd,
-                    cmd + strlen(cmd),
-                    no_print_error);
-          write_schema_op_to_binlog(thd, schema);
-	  break;
-        }
-
-        case SOT_TABLESPACE:
-        case SOT_LOGFILE_GROUP:
-          write_schema_op_to_binlog(thd, schema);
+      case SOT_TABLESPACE:
+      case SOT_LOGFILE_GROUP:
+        if (schema->node_id == own_nodeid())
           break;
+        write_schema_op_to_binlog(m_thd, schema);
+        break;
 
-        case SOT_ALTER_TABLE_COMMIT:
-        case SOT_RENAME_TABLE_PREPARE:
-        case SOT_ONLINE_ALTER_TABLE_PREPARE:
-        case SOT_ONLINE_ALTER_TABLE_COMMIT:
-        case SOT_CLEAR_SLOCK:
-          // Impossible to come here, the above types has already
-          // been handled and caused the function to return 
-          abort();
-          break;
+      case SOT_RENAME_TABLE_NEW:
+        /*
+          Only very old MySQL Server connected to the cluster may
+          send this schema operation, ignore it
+        */
+        sql_print_error("NDB schema: Skipping old schema operation"
+                        "(RENAME_TABLE_NEW) on %s.%s",
+                        schema->db, schema->name);
+        DBUG_ASSERT(false);
+        break;
 
-        }
+      }
 
-        /* signal that schema operation has been handled */
-        DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
-        if (bitmap_is_set(&schema->slock, own_nodeid()))
-        {
-          if (post_epoch_unlock)
-            unlock_after_epoch(schema);
-          else
-            ack_schema_op(schema->db, schema->name,
-                          schema->id, schema->version);
-        }
+      /* signal that schema operation has been handled */
+      DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
+      if (bitmap_is_set(&schema->slock, own_nodeid()))
+      {
+        ack_schema_op(schema->db, schema->name,
+                      schema->id, schema->version);
       }
     }
     DBUG_RETURN(0);
@@ -3120,7 +3307,6 @@ class Ndb_schema_event_handler {
   handle_schema_op_post_epoch(Ndb_schema_op* schema)
   {
     DBUG_ENTER("handle_schema_op_post_epoch");
-    THD* thd = m_thd; // Code compatibility
     DBUG_PRINT("enter", ("%s.%s: query: '%s'  type: %d",
                          schema->db, schema->name,
                          schema->query, schema->type));
@@ -3140,39 +3326,20 @@ class Ndb_schema_event_handler {
         break;
 
       case SOT_DROP_DB:
-        write_schema_op_to_binlog(thd, schema);
+        handle_drop_db(schema);
         break;
 
       case SOT_DROP_TABLE:
-        write_schema_op_to_binlog(thd, schema);
-        ndbapi_invalidate_table(schema->db, schema->name);
-        mysqld_close_cached_table(schema->db, schema->name);
+        handle_drop_table(schema);
         break;
 
-      case SOT_RENAME_TABLE:
-      {
-        write_schema_op_to_binlog(thd, schema);
-        NDB_SHARE *share= get_share(schema);
-        if (share)
-        {
-          ndbcluster_rename_share(thd, share);
-          free_share(&share);
-        }
+      case SOT_RENAME_TABLE_PREPARE:
+        handle_rename_table_prepare(schema);
         break;
-      }
 
-      case SOT_RENAME_TABLE_PREPARE:
-      {
-        if (schema->node_id == own_nodeid())
-          break;
-        NDB_SHARE *share= get_share(schema);
-        if (share)
-        {
-          ndbcluster_prepare_rename_share(share, schema->query);
-          free_share(&share);
-        }
+      case SOT_RENAME_TABLE:
+        handle_rename_table(schema);
         break;
-      }
 
       case SOT_ALTER_TABLE_COMMIT:
         handle_offline_alter_table_commit(schema);
@@ -3186,46 +3353,6 @@ class Ndb_schema_event_handler {
         handle_online_alter_table_commit(schema);
         break;
 
-      case SOT_RENAME_TABLE_NEW:
-      {
-        write_schema_op_to_binlog(thd, schema);
-        NDB_SHARE *share= get_share(schema);
-        if (ndb_binlog_running && (!share || !share->op))
-        {
-          /*
-            we need to free any share here as command below
-            may need to call handle_trailing_share
-          */
-          if (share)
-          {
-            /* ndb_share reference temporary free */
-            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
-                                     share->key, share->use_count));
-            free_share(&share);
-            share= 0;
-          }
-
-          if (ndbcluster_check_if_local_table(schema->db, schema->name))
-          {
-            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
-                                schema->db, schema->name));
-            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
-                            "binlog schema event '%s' from node %d. ",
-                            schema->db, schema->name, schema->query,
-                            schema->node_id);
-          }
-          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
-          {
-            print_could_not_discover_error(thd, schema);
-          }
-        }
-        if (share)
-        {
-          free_share(&share);
-        }
-        break;
-      }
-
       default:
         DBUG_ASSERT(FALSE);
       }
@@ -3234,39 +3361,6 @@ class Ndb_schema_event_handler {
     DBUG_VOID_RETURN;
   }
 
-
-  /*
-    process any operations that should be done after
-    the epoch is complete
-  */
-  void
-  handle_schema_log_post_epoch(List<Ndb_schema_op> *log_list)
-  {
-    DBUG_ENTER("handle_schema_log_post_epoch");
-
-    Ndb_schema_op* schema;
-    while ((schema= log_list->pop()))
-    {
-      handle_schema_op_post_epoch(schema);
-    }
-    DBUG_VOID_RETURN;
-  }
-
-
-  void
-  handle_schema_unlock_post_epoch(List<Ndb_schema_op> *unlock_list)
-  {
-    DBUG_ENTER("handle_schema_unlock_post_epoch");
-
-    Ndb_schema_op *schema;
-    while ((schema= unlock_list->pop()))
-    {
-      ack_schema_op(schema->db, schema->name,
-                    schema->id, schema->version);
-    }
-    DBUG_VOID_RETURN;
-  }
-
   THD* m_thd;
   MEM_ROOT* m_mem_root;
   uint m_own_nodeid;
@@ -3274,8 +3368,8 @@ class Ndb_schema_event_handler {
 
   bool is_post_epoch(void) const { return m_post_epoch; };
 
-  List<Ndb_schema_op> m_post_epoch_log_list;
-  List<Ndb_schema_op> m_post_epoch_unlock_list;
+  List<Ndb_schema_op> m_post_epoch_handle_list;
+  List<Ndb_schema_op> m_post_epoch_ack_list;
 
 public:
   Ndb_schema_event_handler(); // Not implemented
@@ -3291,8 +3385,8 @@ public:
   ~Ndb_schema_event_handler()
   {
     // There should be no work left todo...
-    DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
-    DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+    DBUG_ASSERT(m_post_epoch_handle_list.elements == 0);
+    DBUG_ASSERT(m_post_epoch_ack_list.elements == 0);
   }
 
 
@@ -3436,23 +3530,39 @@ public:
 
   void post_epoch()
   {
-    if (m_post_epoch_log_list.elements > 0)
+    if (unlikely(m_post_epoch_handle_list.elements > 0))
     {
       // Set the flag used to check that functions are called at correct time
       m_post_epoch= true;
 
-      handle_schema_log_post_epoch(&m_post_epoch_log_list);
-      // NOTE post_epoch_unlock_list may not be handled!
-      handle_schema_unlock_post_epoch(&m_post_epoch_unlock_list);
+      /*
+       process any operations that should be done after
+       the epoch is complete
+      */
+      Ndb_schema_op* schema;
+      while ((schema= m_post_epoch_handle_list.pop()))
+      {
+        handle_schema_op_post_epoch(schema);
+      }
+
+      /*
+       process any operations that should be unlocked/acked after
+       the epoch is complete
+      */
+      while ((schema= m_post_epoch_ack_list.pop()))
+      {
+        ack_schema_op(schema->db, schema->name,
+                      schema->id, schema->version);
+      }
     }
     // There should be no work left todo...
-    DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
-    DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+    DBUG_ASSERT(m_post_epoch_handle_list.elements == 0);
+    DBUG_ASSERT(m_post_epoch_ack_list.elements == 0);
   }
 };
 
 /*********************************************************************
-  Internal helper functions for handeling of the cluster replication tables
+  Internal helper functions for handling of the cluster replication tables
   - ndb_binlog_index
   - ndb_apply_status
 *********************************************************************/
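
The post_epoch() hunk above folds the removed helpers handle_schema_log_post_epoch() and handle_schema_unlock_post_epoch() into the method itself: the handle list is drained first, then the ack list, and the asserts check that nothing is left. A minimal standalone sketch of the same defer-then-drain pattern, using a hypothetical Op type and std::list in place of the real Ndb_schema_op machinery:

    #include <list>

    struct Op { int id; };

    class EpochHandler {
      std::list<Op*> m_handle_list; // ops handled once the epoch completes
      std::list<Op*> m_ack_list;    // ops acked/unlocked once the epoch completes
    public:
      void defer_handle(Op* op) { m_handle_list.push_back(op); }
      void defer_ack(Op* op) { m_ack_list.push_back(op); }

      void post_epoch() {
        // Same order as in the patch: handle first, then ack
        while (!m_handle_list.empty()) {
          Op* op = m_handle_list.front();
          m_handle_list.pop_front();
          (void)op; // handle_schema_op_post_epoch(op) would run here
        }
        while (!m_ack_list.empty()) {
          Op* op = m_ack_list.front();
          m_ack_list.pop_front();
          (void)op; // ack_schema_op(...) would run here
        }
        // Both lists are now empty, matching the DBUG_ASSERTs above
      }
    };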
@@ -5081,8 +5191,9 @@ ndbcluster_check_if_local_table(const ch
   char ndb_file[FN_REFLEN + 1];
 
   DBUG_ENTER("ndbcluster_check_if_local_table");
-  build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
-  build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
+  build_table_filename(key, sizeof(key)-1, dbname, tabname, reg_ext, 0);
+  build_table_filename(ndb_file, sizeof(ndb_file)-1,
+                       dbname, tabname, ha_ndb_ext, 0);
   /* Check that any defined table is an ndb table */
   DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
   if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
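
The two build_table_filename() calls above passed FN_LEN-1 as the buffer size although the buffers are declared as char key[FN_REFLEN + 1] and char ndb_file[FN_REFLEN + 1]; deriving the length from sizeof(buf)-1 ties the argument to the actual declaration. A small sketch of the hazard, with a hypothetical fill_name() standing in for build_table_filename():

    #include <cstddef>
    #include <cstdio>

    // Hypothetical stand-in: writes at most 'size' characters plus a
    // terminating NUL into 'buf', like build_table_filename().
    static void fill_name(char* buf, size_t size, const char* name)
    {
      snprintf(buf, size + 1, "%s", name);
    }

    int main()
    {
      char key[512 + 1];  // think FN_REFLEN + 1
      // Fragile: an unrelated constant may be smaller (or larger) than
      // the buffer declared here, silently truncating or overflowing.
      fill_name(key, 256 - 1, "some/long/database/table/path");
      // Robust: the size follows the buffer declaration automatically.
      fill_name(key, sizeof(key) - 1, "some/long/database/table/path");
      printf("%s\n", key);
      return 0;
    }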
@@ -5107,7 +5218,6 @@ int ndbcluster_create_binlog_setup(THD *
                                    const char *table_name,
                                    TABLE * table)
 {
-  int do_event_op= ndb_binlog_running;
   DBUG_ENTER("ndbcluster_create_binlog_setup");
   DBUG_PRINT("enter",("key: %s  key_len: %d  %s.%s",
                       key, key_len, db, table_name));
@@ -5131,25 +5241,7 @@ int ndbcluster_create_binlog_setup(THD *
     DBUG_RETURN(0); // replication already setup, or should not
   }
 
-  if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
-  {
-    // The distributed privilege tables are distributed by writing
-    // the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
-    // to listen to events from this table
-    DBUG_PRINT("info", ("Skipping binlogging of table %s/%s", db, table_name));
-    do_event_op= 0;
-  }
-
-  if (!ndb_schema_share &&
-      strcmp(share->db, NDB_REP_DB) == 0 &&
-      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
-    do_event_op= 1;
-  else if (!ndb_apply_status_share &&
-           strcmp(share->db, NDB_REP_DB) == 0 &&
-           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
-    do_event_op= 1;
-
-  if (!do_event_op)
+  if (!share->need_events(ndb_binlog_running))
   {
     set_binlog_nologging(share);
     pthread_mutex_unlock(&share->mutex);
@@ -5256,11 +5348,8 @@ ndbcluster_create_event(THD *thd, Ndb *n
                       ndbtab->getName(), ndbtab->getObjectVersion(),
                       event_name, share ? share->key : "(nil)"));
   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
-  if (!share)
-  {
-    DBUG_PRINT("info", ("share == NULL"));
-    DBUG_RETURN(0);
-  }
+  DBUG_ASSERT(share);
+
   if (get_binlog_nologging(share))
   {
     if (opt_ndb_extra_logging && ndb_binlog_running)
@@ -5440,8 +5529,7 @@ ndbcluster_create_event_ops(THD *thd, ND
   DBUG_ENTER("ndbcluster_create_event_ops");
   DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
-
-  DBUG_ASSERT(share != 0);
+  DBUG_ASSERT(share);
 
   if (get_binlog_nologging(share))
   {
@@ -5455,7 +5543,6 @@ ndbcluster_create_event_ops(THD *thd, ND
   assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
                                                         share->table_name));
 
-  Ndb_event_data *event_data= share->event_data;
   int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
 #ifdef HAVE_NDB_BINLOG
   uint len= (int)strlen(share->table_name);
@@ -5480,6 +5567,10 @@ ndbcluster_create_event_ops(THD *thd, ND
     DBUG_RETURN(0);
   }
 
+  // Check that the share agrees
+  DBUG_ASSERT(share->need_events(ndb_binlog_running));
+
+  Ndb_event_data *event_data= share->event_data;
   if (share->op)
   {
     event_data= (Ndb_event_data *) share->op->getCustomData();

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-11-16 09:29:49 +0000
+++ b/sql/log_event.cc	2011-11-21 12:53:01 +0000
@@ -9036,7 +9036,7 @@ int Rows_log_event::do_apply_event(Relay
       Note that unlike the other thd options set here, this one
       comes from a global, and not from the incoming event.
     */
-    if (slave_allow_batching)
+    if (opt_slave_allow_batching)
       thd->variables.option_bits|= OPTION_ALLOW_BATCH;
     else
       thd->variables.option_bits&= ~OPTION_ALLOW_BATCH;
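
Only the variable name changes here; the pattern stays as described in the comment above: the global is read at event-apply time and mirrored into the session's option bits, so toggling it takes effect on the next row event rather than on a new connection. A small sketch of that mirror pattern, with hypothetical OPT_ALLOW_BATCH and SessionVars standing in for the real option bit and THD:

    #include <cstdint>

    static const uint64_t OPT_ALLOW_BATCH = 1ULL << 5; // hypothetical bit

    struct SessionVars { uint64_t option_bits; };

    // Copy the global toggle into the session before applying an event.
    void mirror_batching(bool global_allow_batching, SessionVars* vars)
    {
      if (global_allow_batching)
        vars->option_bits |= OPT_ALLOW_BATCH;
      else
        vars->option_bits &= ~OPT_ALLOW_BATCH;
    }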

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2011-11-16 09:29:49 +0000
+++ b/sql/mysqld.h	2011-11-21 12:53:01 +0000
@@ -179,7 +179,9 @@ extern ulong slow_launch_threads, slow_l
 extern ulong table_cache_size, table_def_size;
 extern MYSQL_PLUGIN_IMPORT ulong max_connections;
 extern ulong max_connect_errors, connect_timeout;
-extern my_bool slave_allow_batching;
+#ifndef MCP_WL3733
+extern my_bool opt_slave_allow_batching;
+#endif
 extern my_bool allow_slave_start;
 extern LEX_CSTRING reason_slave_blocked;
 extern ulong slave_trans_retries;

=== modified file 'sql/ndb_event_data.cc'
--- a/sql/ndb_event_data.cc	2011-10-29 09:02:21 +0000
+++ b/sql/ndb_event_data.cc	2011-11-10 08:21:36 +0000
@@ -42,3 +42,13 @@ Ndb_event_data::~Ndb_event_data()
   */
   my_free(ndb_value[0]);
 }
+
+
+void Ndb_event_data::print(const char* where, FILE* file) const
+{
+  fprintf(file,
+          "%s shadow_table: %p '%s.%s'\n",
+          where,
+          shadow_table, shadow_table->s->db.str,
+          shadow_table->s->table_name.str);
+}

=== modified file 'sql/ndb_event_data.h'
--- a/sql/ndb_event_data.h	2011-10-29 09:02:21 +0000
+++ b/sql/ndb_event_data.h	2011-11-10 08:21:36 +0000
@@ -34,6 +34,8 @@ public:
   struct TABLE *shadow_table;
   struct NDB_SHARE *share;
   union NdbValue *ndb_value[2];
+
+  void print(const char* where, FILE* file) const;
 };
 
 #endif

=== modified file 'sql/ndb_schema_dist.cc'
--- a/sql/ndb_schema_dist.cc	2011-11-02 09:28:48 +0000
+++ b/sql/ndb_schema_dist.cc	2011-11-10 08:29:32 +0000
@@ -26,8 +26,6 @@ get_schema_type_name(uint type)
     return "DROP_TABLE";
   case SOT_CREATE_TABLE:
     return "CREATE_TABLE";
-  case SOT_RENAME_TABLE_NEW:
-    return "RENAME_TABLE_NEW";
   case SOT_ALTER_TABLE_COMMIT:
     return "ALTER_TABLE_COMMIT";
   case SOT_DROP_DB:

=== modified file 'sql/ndb_schema_dist.h'
--- a/sql/ndb_schema_dist.h	2011-11-02 09:28:48 +0000
+++ b/sql/ndb_schema_dist.h	2011-11-10 08:29:32 +0000
@@ -40,7 +40,7 @@ enum SCHEMA_OP_TYPE
 {
   SOT_DROP_TABLE= 0,
   SOT_CREATE_TABLE= 1,
-  SOT_RENAME_TABLE_NEW= 2,
+  SOT_RENAME_TABLE_NEW= 2, // Unused, but still reserved
   SOT_ALTER_TABLE_COMMIT= 3,
   SOT_DROP_DB= 4,
   SOT_CREATE_DB= 5,

=== modified file 'sql/ndb_share.cc'
--- a/sql/ndb_share.cc	2011-11-07 19:00:35 +0000
+++ b/sql/ndb_share.cc	2011-11-10 08:21:36 +0000
@@ -17,6 +17,10 @@
 
 #include "ndb_share.h"
 #include "ndb_event_data.h"
+#include "ndb_dist_priv_util.h"
+#include "ha_ndbcluster_tables.h"
+
+#include <ndbapi/NdbEventOperation.hpp>
 
 #include <my_sys.h>
 
@@ -37,12 +41,109 @@ NDB_SHARE::destroy(NDB_SHARE* share)
   }
 #endif
   share->new_op= 0;
-  if (share->event_data)
+  Ndb_event_data* event_data = share->event_data;
+  if (event_data)
   {
-    delete share->event_data;
-    share->event_data= 0;
+    delete event_data;
+    event_data= 0;
   }
   free_root(&share->mem_root, MYF(0));
   my_free(share);
 }
 
+
+bool
+NDB_SHARE::need_events(bool default_on) const
+{
+  DBUG_ENTER("NDB_SHARE::need_events");
+  DBUG_PRINT("enter", ("db: %s, table_name: %s",
+                        db, table_name));
+
+  if (default_on)
+  {
+    // Events are on by default, check if it should be turned off
+
+    if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
+    {
+      /*
+        The distributed privilege tables are distributed by writing
+        the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
+        to listen to events from those tables
+      */
+      DBUG_PRINT("exit", ("no events for dist priv table"));
+      DBUG_RETURN(false);
+    }
+
+    DBUG_PRINT("exit", ("need events(the default for this mysqld)"));
+    DBUG_RETURN(true);
+  }
+
+  // Events are off by default, check if it should be turned on
+  if (strcmp(db, NDB_REP_DB) == 0)
+  {
+    // The table is in "mysql" database
+    if (strcmp(table_name, NDB_SCHEMA_TABLE) == 0)
+    {
+      DBUG_PRINT("exit", ("need events for " NDB_SCHEMA_TABLE));
+      DBUG_RETURN(true);
+    }
+
+    if (strcmp(table_name, NDB_APPLY_TABLE) == 0)
+    {
+      DBUG_PRINT("exit", ("need events for " NDB_APPLY_TABLE));
+      DBUG_RETURN(true);
+    }
+  }
+
+  DBUG_PRINT("exit", ("no events(the default for this mysqld)"));
+  DBUG_RETURN(false);
+}
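
need_events() centralizes the per-table decision that ndbcluster_create_binlog_setup() made inline through its removed do_event_op local (see the sql/ha_ndbcluster_binlog.cc hunk earlier in this diff). A sketch of the resulting call-site shape; the identifiers are the ones used in this patch, only the surrounding control flow is abbreviated:

    if (!share->need_events(ndb_binlog_running))
    {
      // This mysqld does not need events for the table: mark the share
      // as not binlogged and skip event setup entirely.
      set_binlog_nologging(share);
      DBUG_RETURN(0);
    }
    // ...otherwise fall through and create the event and event operations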
+
+
+Ndb_event_data* NDB_SHARE::get_event_data_ptr() const
+{
+  if (event_data)
+  {
+    // The event_data pointer is only used before
+    // creating the NdbEventOperation -> check no op yet
+    assert(!op);
+
+    return event_data;
+  }
+
+  if (op)
+  {
+    // The event_data should now be empty since it's been moved to
+    // op's custom data
+    assert(!event_data);
+
+    // Check that op has custom data
+    assert(op->getCustomData());
+
+    return (Ndb_event_data*)op->getCustomData();
+  }
+
+  return NULL;
+}
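
get_event_data_ptr() hides which owner currently holds the event data: the share itself before the NdbEventOperation is created, or the operation's custom data afterwards. A caller no longer needs to know the state; for example (a sketch, not a call site from this patch):

    Ndb_event_data* event_data = share->get_event_data_ptr();
    if (event_data)
      event_data->print("dump", stderr);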
+
+
+void NDB_SHARE::print(const char* where, FILE* file) const
+{
+  fprintf(file, "%s %s.%s: use_count: %u\n",
+          where, db, table_name, use_count);
+  fprintf(file, "  - key: '%s', key_length: %d\n", key, key_length);
+  fprintf(file, "  - commit_count: %llu\n", commit_count);
+  if (new_key)
+    fprintf(file, "  - new_key: %p, '%s'\n",
+            new_key, new_key);
+  if (event_data)
+    fprintf(file, "  - event_data: %p\n", event_data);
+  if (op)
+    fprintf(file, "  - op: %p\n", op);
+  if (new_op)
+    fprintf(file, "  - new_op: %p\n", new_op);
+
+  Ndb_event_data *event_data_ptr= get_event_data_ptr();
+  if (event_data_ptr)
+    event_data_ptr->print("  -", file);
+}
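
Together with the ndb_share.h hunk below, which declares print() with FILE* file = stderr as the default, a debugging dump can be a one-liner; trace_file in the second call is a hypothetical caller-owned file, not something this patch opens:

    share->print("handle_rename_table");             // dump to stderr
    share->print("handle_rename_table", trace_file); // or redirect the dump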

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2011-11-05 11:35:57 +0000
+++ b/sql/ndb_share.h	2011-11-10 08:21:36 +0000
@@ -188,7 +188,20 @@ struct NDB_SHARE {
   MY_BITMAP *subscriber_bitmap;
   class NdbEventOperation *new_op;
 
+  static NDB_SHARE* create(const char* key, size_t key_length,
+                           struct TABLE* table, const char* db_name,
+                           const char* table_name);
   static void destroy(NDB_SHARE* share);
+
+  class Ndb_event_data* get_event_data_ptr() const;
+
+  void print(const char* where, FILE* file = stderr) const;
+
+  /*
+    Returns true if this share needs to subscribe to
+    events from the table.
+  */
+  bool need_events(bool default_on) const;
 };
 
 

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2011-11-16 09:29:49 +0000
+++ b/sql/sys_vars.cc	2011-11-21 12:53:01 +0000
@@ -2819,40 +2819,6 @@ static Sys_var_ulong Sys_profiling_histo
        VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1));
 #endif
 
-#ifndef MCP_WL3733
-#ifndef EMBEDDED_LIBRARY
-my_bool slave_allow_batching;
-
-/*
-  Take Active MI lock while checking/updating slave_allow_batching
-  to give atomicity w.r.t. slave state changes
-*/
-static PolyLock_mutex PLock_active_mi(&LOCK_active_mi);
-
-static bool slave_allow_batching_check(sys_var *self, THD *thd, set_var *var)
-{
-  /* Only allow a change if the slave SQL thread is currently stopped */
-  bool slave_sql_running = active_mi->rli->slave_running;
-
-  if (slave_sql_running)
-  {
-    my_error(ER_SLAVE_MUST_STOP, MYF(0));
-    return true;
-  }
-
-  return false;
-}
-
-static Sys_var_mybool Sys_slave_allow_batching(
-       "slave_allow_batching", "Allow slave to batch requests",
-       GLOBAL_VAR(slave_allow_batching),
-       CMD_LINE(OPT_ARG), DEFAULT(FALSE),
-       &PLock_active_mi,
-       NOT_IN_BINLOG,
-       ON_CHECK(slave_allow_batching_check));
-#endif
-#endif
-
 static Sys_var_harows Sys_select_limit(
        "sql_select_limit",
        "The maximum number of rows to return from SELECT statements",
@@ -3384,6 +3350,14 @@ static Sys_var_mybool Sys_relay_log_reco
        "processed",
        GLOBAL_VAR(relay_log_recovery), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
 
+#ifndef MCP_WL3733
+my_bool opt_slave_allow_batching;
+static Sys_var_mybool Sys_slave_allow_batching(
+       "slave_allow_batching", "Allow slave to batch requests",
+       GLOBAL_VAR(opt_slave_allow_batching),
+       CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+#endif
+
 static Sys_var_charptr Sys_slave_load_tmpdir(
        "slave_load_tmpdir", "The location where the slave should put "
        "its temporary files when replicating a LOAD DATA INFILE command",

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2011-11-16 08:17:17 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2011-11-18 06:47:23 +0000
@@ -194,6 +194,7 @@
 #define NDBMT_BLOCK_MASK ((1 << NDBMT_BLOCK_BITS) - 1)
 #define NDBMT_BLOCK_INSTANCE_BITS 7
 
+#define NDB_DEFAULT_LOG_PARTS 4
 #define NDB_MAX_LOG_PARTS     4
 #define MAX_NDBMT_LQH_WORKERS NDB_MAX_LOG_PARTS
 #define MAX_NDBMT_LQH_THREADS NDB_MAX_LOG_PARTS

=== modified file 'storage/ndb/src/common/debugger/EventLogger.cpp'
--- a/storage/ndb/src/common/debugger/EventLogger.cpp	2011-10-21 12:36:44 +0000
+++ b/storage/ndb/src/common/debugger/EventLogger.cpp	2011-11-18 06:47:23 +0000
@@ -527,23 +527,47 @@ void getTextTransReportCounters(QQQQ) {
   // -------------------------------------------------------------------  
   // Report information about transaction activity once per 10 seconds.
   // ------------------------------------------------------------------- 
-  BaseString::snprintf(m_text, m_text_len, 
-		       "Trans. Count = %u, Commit Count = %u, "
-		       "Read Count = %u, Simple Read Count = %u, "
-		       "Write Count = %u, AttrInfo Count = %u, "
-		       "Concurrent Operations = %u, Abort Count = %u"
-		       " Scans = %u Range scans = %u", 
-		       theData[1], 
-		       theData[2], 
-		       theData[3], 
-		       theData[4],
-		       theData[5], 
-		       theData[6], 
-		       theData[7], 
-		       theData[8],
-		       theData[9],
-		       theData[10]);
+  if (len <= 11)
+  {
+    BaseString::snprintf(m_text, m_text_len,
+                         "Trans. Count = %u, Commit Count = %u, "
+                         "Read Count = %u, Simple Read Count = %u, "
+                         "Write Count = %u, AttrInfo Count = %u, "
+                         "Concurrent Operations = %u, Abort Count = %u"
+                         " Scans = %u Range scans = %u",
+                         theData[1],
+                         theData[2],
+                         theData[3],
+                         theData[4],
+                         theData[5],
+                         theData[6],
+                         theData[7],
+                         theData[8],
+                         theData[9],
+                         theData[10]);
+  }
+  else
+  {
+    BaseString::snprintf(m_text, m_text_len,
+                         "Trans. Count = %u, Commit Count = %u, "
+                         "Read Count = %u, Simple Read Count = %u, "
+                         "Write Count = %u, AttrInfo Count = %u, "
+                         "Concurrent Operations = %u, Abort Count = %u"
+                         " Scans = %u Range scans = %u, Local Read Count = %u",
+                         theData[1],
+                         theData[2],
+                         theData[3],
+                         theData[4],
+                         theData[5],
+                         theData[6],
+                         theData[7],
+                         theData[8],
+                         theData[9],
+                         theData[10],
+                         theData[11]);
+  }
 }
+
 void getTextOperationReportCounters(QQQQ) {
   BaseString::snprintf(m_text, m_text_len,
 		       "Operations=%u",

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2011-11-16 09:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2011-11-21 12:53:01 +0000
@@ -207,7 +207,8 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
     const Uint32 tab = signal->theData[1];
     const Uint32 ver = signal->theData[2];
     TableRecordPtr tabRecPtr;
-    c_tableRecordPool.getPtr(tabRecPtr, tab);
+    bool ok = find_object(tabRecPtr, tab);
+    ndbrequire(ok);
     DropTableReq * req = (DropTableReq*)signal->getDataPtr();
     req->senderData = 1225;
     req->senderRef = numberToRef(1,1);
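
This is the first of many Dbdict.cpp hunks replacing unchecked c_tableRecordPool.getPtr(ptr, id) calls with find_object(ptr, id). The function's definition is not part of this diff, but judging from its use throughout the patch it resolves a schema object id to a live record and returns false rather than handing back a possibly stale pool slot. Callers then either ndbrequire(ok) on paths where the object must exist, or report an error, as later hunks do:

    TableRecordPtr tabPtr;
    bool ok = find_object(tabPtr, tableId);
    if (!ok)
    {
      jam();
      setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
      return;
    }
    // tabPtr.p is only dereferenced after the lookup succeeded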
@@ -252,8 +253,8 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
   {
     RSS_AP_SNAPSHOT_SAVE(c_rope_pool);
     RSS_AP_SNAPSHOT_SAVE(c_attributeRecordPool);
-    RSS_AP_SNAPSHOT_SAVE(c_tableRecordPool);
-    RSS_AP_SNAPSHOT_SAVE(c_triggerRecordPool);
+    RSS_AP_SNAPSHOT_SAVE(c_tableRecordPool_);
+    RSS_AP_SNAPSHOT_SAVE(c_triggerRecordPool_);
     RSS_AP_SNAPSHOT_SAVE(c_obj_pool);
     RSS_AP_SNAPSHOT_SAVE(c_hash_map_pool);
     RSS_AP_SNAPSHOT_SAVE(g_hash_map);
@@ -263,8 +264,8 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
   {
     RSS_AP_SNAPSHOT_CHECK(c_rope_pool);
     RSS_AP_SNAPSHOT_CHECK(c_attributeRecordPool);
-    RSS_AP_SNAPSHOT_CHECK(c_tableRecordPool);
-    RSS_AP_SNAPSHOT_CHECK(c_triggerRecordPool);
+    RSS_AP_SNAPSHOT_CHECK(c_tableRecordPool_);
+    RSS_AP_SNAPSHOT_CHECK(c_triggerRecordPool_);
     RSS_AP_SNAPSHOT_CHECK(c_obj_pool);
     RSS_AP_SNAPSHOT_CHECK(c_hash_map_pool);
     RSS_AP_SNAPSHOT_CHECK(g_hash_map);
@@ -296,16 +297,16 @@ void Dbdict::execDBINFO_SCANREQ(Signal *
         c_attributeRecordPool.getUsedHi(),
         { CFG_DB_NO_ATTRIBUTES,0,0,0 }},
       { "Table Record",
-        c_tableRecordPool.getUsed(),
+        c_tableRecordPool_.getUsed(),
         c_noOfMetaTables,
-        c_tableRecordPool.getEntrySize(),
-        c_tableRecordPool.getUsedHi(),
+        c_tableRecordPool_.getEntrySize(),
+        c_tableRecordPool_.getUsedHi(),
         { CFG_DB_NO_TABLES,0,0,0 }},
       { "Trigger Record",
-        c_triggerRecordPool.getUsed(),
-        c_triggerRecordPool.getSize(),
-        c_triggerRecordPool.getEntrySize(),
-        c_triggerRecordPool.getUsedHi(),
+        c_triggerRecordPool_.getUsed(),
+        c_triggerRecordPool_.getSize(),
+        c_triggerRecordPool_.getEntrySize(),
+        c_triggerRecordPool_.getUsedHi(),
         { CFG_DB_NO_TRIGGERS,0,0,0 }},
       { "FS Connect Record",
         c_fsConnectRecordPool.getUsed(),
@@ -512,8 +513,8 @@ void Dbdict::packTableIntoPages(Signal* 
   case DictTabInfo::OrderedIndex:{
     jam();
     TableRecordPtr tablePtr;
-    c_tableRecordPool.getPtr(tablePtr, tableId);
-    if (tablePtr.p->m_obj_ptr_i == RNIL)
+    bool ok = find_object(tablePtr, tableId);
+    if (!ok)
     {
       jam();
       sendGET_TABINFOREF(signal, &req_copy,
@@ -698,7 +699,15 @@ Dbdict::packTableIntoPages(SimplePropert
   {
     jam();
     TableRecordPtr primTab;
-    c_tableRecordPool.getPtr(primTab, tablePtr.p->primaryTableId);
+    bool ok = find_object(primTab, tablePtr.p->primaryTableId);
+    if (!ok)
+    {
+      jam();
+      ndbrequire(signal != NULL);
+      Uint32 err = CreateFragmentationRef::InvalidPrimaryTable;
+      signal->theData[0] = err;
+      return;
+    }
     ConstRope r2(c_rope_pool, primTab.p->tableName);
     r2.copy(tableName);
     w.add(DictTabInfo::PrimaryTable, tableName);
@@ -893,8 +902,6 @@ Dbdict::execCREATE_FRAGMENTATION_REQ(Sig
     return;
   }
 
-  TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, req->primaryTableId);
   XSchemaFile * xsf = &c_schemaFile[SchemaRecord::NEW_SCHEMA_FILE];
   SchemaFile::TableEntry * te = getTableEntry(xsf, req->primaryTableId);
   if (te->m_tableState != SchemaFile::SF_CREATE)
@@ -911,8 +918,9 @@ Dbdict::execCREATE_FRAGMENTATION_REQ(Sig
   }
 
   DictObjectPtr obj_ptr;
-  c_obj_pool.getPtr(obj_ptr, tablePtr.p->m_obj_ptr_i);
-
+  TableRecordPtr tablePtr;
+  bool ok = find_object(obj_ptr, tablePtr, req->primaryTableId);
+  ndbrequire(ok);
   SchemaOpPtr op_ptr;
   findDictObjectOp(op_ptr, obj_ptr);
   ndbrequire(!op_ptr.isNull());
@@ -2233,8 +2241,6 @@ void Dbdict::initRecords()
 {
   initNodeRecords();
   initPageRecords();
-  initTableRecords();
-  initTriggerRecords();
 }//Dbdict::initRecords()
 
 void Dbdict::initSendSchemaRecord()
@@ -2315,27 +2321,12 @@ void Dbdict::initPageRecords()
   c_schemaRecord.oldSchemaPage = NDB_SF_MAX_PAGES;
 }//Dbdict::initPageRecords()
 
-void Dbdict::initTableRecords()
-{
-  TableRecordPtr tablePtr;
-  while (1) {
-    jam();
-    refresh_watch_dog();
-    c_tableRecordPool.seize(tablePtr);
-    if (tablePtr.i == RNIL) {
-      jam();
-      break;
-    }//if
-    initialiseTableRecord(tablePtr);
-  }//while
-}//Dbdict::initTableRecords()
-
-void Dbdict::initialiseTableRecord(TableRecordPtr tablePtr)
+void Dbdict::initialiseTableRecord(TableRecordPtr tablePtr, Uint32 tableId)
 {
   new (tablePtr.p) TableRecord();
   tablePtr.p->filePtr[0] = RNIL;
   tablePtr.p->filePtr[1] = RNIL;
-  tablePtr.p->tableId = tablePtr.i;
+  tablePtr.p->tableId = tableId;
   tablePtr.p->tableVersion = (Uint32)-1;
   tablePtr.p->fragmentType = DictTabInfo::AllNodesSmallTable;
   tablePtr.p->gciTableCreated = 0;
@@ -2375,29 +2366,15 @@ void Dbdict::initialiseTableRecord(Table
   tablePtr.p->m_obj_ptr_i = RNIL;
 }//Dbdict::initialiseTableRecord()
 
-void Dbdict::initTriggerRecords()
-{
-  TriggerRecordPtr triggerPtr;
-  while (1) {
-    jam();
-    refresh_watch_dog();
-    c_triggerRecordPool.seize(triggerPtr);
-    if (triggerPtr.i == RNIL) {
-      jam();
-      break;
-    }//if
-    initialiseTriggerRecord(triggerPtr);
-  }//while
-}
-
-void Dbdict::initialiseTriggerRecord(TriggerRecordPtr triggerPtr)
+void Dbdict::initialiseTriggerRecord(TriggerRecordPtr triggerPtr, Uint32 triggerId)
 {
   new (triggerPtr.p) TriggerRecord();
   triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
-  triggerPtr.p->triggerId = RNIL;
+  triggerPtr.p->triggerId = triggerId;
   triggerPtr.p->tableId = RNIL;
   triggerPtr.p->attributeMask.clear();
   triggerPtr.p->indexId = RNIL;
+  triggerPtr.p->m_obj_ptr_i = RNIL;
 }
 
 Uint32 Dbdict::getFsConnRecord()
@@ -2438,40 +2415,81 @@ Uint32 Dbdict::getFreeObjId(bool both)
   return RNIL;
 }
 
-Uint32 Dbdict::getFreeTableRecord()
+bool Dbdict::seizeTableRecord(TableRecordPtr& tablePtr, Uint32& schemaFileId)
 {
-  Uint32 i = getFreeObjId();
-  if (i == RNIL) {
+  if (schemaFileId == RNIL)
+  {
     jam();
-    return RNIL;
+    schemaFileId = getFreeObjId();
+  }
+  if (schemaFileId == RNIL)
+  {
+    jam();
+    return false;
   }
-  if (i >= c_noOfMetaTables) {
+  if (schemaFileId >= c_noOfMetaTables)
+  {
     jam();
-    return RNIL;
+    return false;
   }
 
-  TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, i);
-  initialiseTableRecord(tablePtr);
-  return i;
+  c_tableRecordPool_.seize(tablePtr);
+  if (tablePtr.isNull())
+  {
+    jam();
+    return false;
+  }
+  initialiseTableRecord(tablePtr, schemaFileId);
+  return true;
 }
 
 Uint32 Dbdict::getFreeTriggerRecord()
 {
-  const Uint32 size = c_triggerRecordPool.getSize();
+  const Uint32 size = c_triggerRecordPool_.getSize();
   TriggerRecordPtr triggerPtr;
-  for (triggerPtr.i = 0; triggerPtr.i < size; triggerPtr.i++) {
+  for (Uint32 id = 0; id < size; id++) {
     jam();
-    c_triggerRecordPool.getPtr(triggerPtr);
-    if (triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED) {
+    bool ok = find_object(triggerPtr, id);
+    if (!ok)
+    {
       jam();
-      initialiseTriggerRecord(triggerPtr);
-      return triggerPtr.i;
+      return id;
     }
   }
   return RNIL;
 }
 
+bool Dbdict::seizeTriggerRecord(TriggerRecordPtr& triggerPtr, Uint32 triggerId)
+{
+  if (triggerId == RNIL)
+  {
+    triggerId = getFreeTriggerRecord();
+  }
+  else
+  {
+    TriggerRecordPtr ptr;
+    bool ok = find_object(ptr, triggerId);
+    if (ok)
+    { // triggerId already in use
+      jam();
+      return false;
+    }
+  }
+  if (triggerId == RNIL)
+  {
+    jam();
+    return false;
+  }
+  c_triggerRecordPool_.seize(triggerPtr);
+  if (triggerPtr.isNull())
+  {
+    jam();
+    return false;
+  }
+  initialiseTriggerRecord(triggerPtr, triggerId);
+  return true;
+}
+
 Uint32
 Dbdict::check_read_obj(Uint32 objId, Uint32 transId)
 {
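
seizeTableRecord() and seizeTriggerRecord() replace the startup loops (initTableRecords()/initTriggerRecords()) that used to pre-initialise every slot; records are now seized on demand. The caller either supplies a known id or passes RNIL to have a free one allocated, which gives the two call shapes seen later in handleTabInfoInit():

    // New table from the API: let DICT pick a free schema file id.
    Uint32 schemaFileId = RNIL;
    TableRecordPtr tablePtr;
    bool ok = seizeTableRecord(tablePtr, schemaFileId);
    tabRequire(ok, CreateTableRef::NoMoreTableRecords);

    // Restart path: the id is already known and must be honoured.
    schemaFileId = c_tableDesc.TableId;
    ok = seizeTableRecord(tablePtr, schemaFileId);
    ndbrequire(ok); // fails if the id is in use or the pool is exhausted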
@@ -2654,9 +2672,9 @@ void Dbdict::execREAD_CONFIG_REQ(Signal*
   c_nodes.setSize(MAX_NDB_NODES);
   c_pageRecordArray.setSize(ZNUMBER_OF_PAGES);
   c_schemaPageRecordArray.setSize(2 * NDB_SF_MAX_PAGES);
-  c_tableRecordPool.setSize(c_noOfMetaTables);
+  c_tableRecordPool_.setSize(c_noOfMetaTables);
   g_key_descriptor_pool.setSize(c_noOfMetaTables);
-  c_triggerRecordPool.setSize(c_maxNoOfTriggers);
+  c_triggerRecordPool_.setSize(c_maxNoOfTriggers);
 
   Record_info ri;
   OpSectionBuffer::createRecordInfo(ri, RT_DBDICT_OP_SECTION_BUFFER);
@@ -2933,9 +2951,9 @@ Dbdict::initSchemaFile_conf(Signal* sign
 }
 
 void
-Dbdict::activateIndexes(Signal* signal, Uint32 i)
+Dbdict::activateIndexes(Signal* signal, Uint32 id)
 {
-  if (i == 0)
+  if (id == 0)
     D("activateIndexes start");
 
   Uint32 requestFlags = 0;
@@ -2961,12 +2979,16 @@ Dbdict::activateIndexes(Signal* signal, 
   }
 
   TableRecordPtr indexPtr;
-  indexPtr.i = i;
-  for (; indexPtr.i < c_noOfMetaTables; indexPtr.i++)
+  for (; id < c_noOfMetaTables; id++)
   {
-    c_tableRecordPool.getPtr(indexPtr);
+    bool ok = find_object(indexPtr, id);
+    if (!ok)
+    {
+      jam();
+      continue;
+    }
 
-    if (check_read_obj(indexPtr.i))
+    if (check_read_obj(id))
     {
       continue;
     }
@@ -2984,7 +3006,7 @@ Dbdict::activateIndexes(Signal* signal, 
     }
 
     // wl3600_todo use simple schema trans when implemented
-    D("activateIndexes i=" << indexPtr.i);
+    D("activateIndexes id=" << id);
 
     TxHandlePtr tx_ptr;
     seizeTxHandle(tx_ptr);
@@ -3026,8 +3048,10 @@ Dbdict::activateIndex_fromBeginTrans(Sig
   ndbrequire(!tx_ptr.isNull());
 
   TableRecordPtr indexPtr;
-  indexPtr.i = tx_ptr.p->m_userData;
-  c_tableRecordPool.getPtr(indexPtr);
+  c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
+  ndbrequire(!indexPtr.isNull());
+  DictObjectPtr index_obj_ptr;
+  c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
 
   AlterIndxReq* req = (AlterIndxReq*)signal->getDataPtrSend();
 
@@ -3040,7 +3064,7 @@ Dbdict::activateIndex_fromBeginTrans(Sig
   req->transId = tx_ptr.p->m_transId;
   req->transKey = tx_ptr.p->m_transKey;
   req->requestInfo = requestInfo;
-  req->indexId = indexPtr.i;
+  req->indexId = index_obj_ptr.p->m_id;
   req->indexVersion = indexPtr.p->tableVersion;
 
   Callback c = {
@@ -3091,13 +3115,13 @@ Dbdict::activateIndex_fromEndTrans(Signa
   ndbrequire(!tx_ptr.isNull());
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, tx_ptr.p->m_userData);
+  c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
+  DictObjectPtr index_obj_ptr;
+  c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
 
   char indexName[MAX_TAB_NAME_SIZE];
   {
-    DictObjectPtr obj_ptr;
-    c_obj_pool.getPtr(obj_ptr, indexPtr.p->m_obj_ptr_i);
-    LocalRope name(c_rope_pool, obj_ptr.p->m_name);
+    LocalRope name(c_rope_pool, index_obj_ptr.p->m_name);
     name.copy(indexName);
   }
 
@@ -3106,38 +3130,43 @@ Dbdict::activateIndex_fromEndTrans(Signa
   {
     jam();
     infoEvent("DICT: activate index %u done (%s)",
-	      indexPtr.i, indexName);
+             index_obj_ptr.p->m_id, indexName);
   }
   else
   {
     jam();
     warningEvent("DICT: activate index %u error: code=%u line=%u node=%u (%s)",
-		 indexPtr.i,
+                index_obj_ptr.p->m_id,
 		 error.errorCode, error.errorLine, error.errorNodeId,
 		 indexName);
   }
 
+  Uint32 id = index_obj_ptr.p->m_id;
   releaseTxHandle(tx_ptr);
-  activateIndexes(signal, indexPtr.i + 1);
+  activateIndexes(signal, id + 1);
 }
 
 void
-Dbdict::rebuildIndexes(Signal* signal, Uint32 i)
+Dbdict::rebuildIndexes(Signal* signal, Uint32 id)
 {
-  if (i == 0)
+  if (id == 0)
     D("rebuildIndexes start");
 
   TableRecordPtr indexPtr;
-  indexPtr.i = i;
-  for (; indexPtr.i < c_noOfMetaTables; indexPtr.i++) {
-    c_tableRecordPool.getPtr(indexPtr);
-    if (check_read_obj(indexPtr.i))
+  for (; id < c_noOfMetaTables; id++) {
+    bool ok = find_object(indexPtr, id);
+    if (!ok)
+    {
+      jam();
+      continue;
+    }
+    if (check_read_obj(id))
       continue;
     if (!indexPtr.p->isIndex())
       continue;
 
     // wl3600_todo use simple schema trans when implemented
-    D("rebuildIndexes i=" << indexPtr.i);
+    D("rebuildIndexes id=" << id);
 
     TxHandlePtr tx_ptr;
     seizeTxHandle(tx_ptr);
@@ -3176,8 +3205,9 @@ Dbdict::rebuildIndex_fromBeginTrans(Sign
   ndbrequire(!tx_ptr.isNull());
 
   TableRecordPtr indexPtr;
-  indexPtr.i = tx_ptr.p->m_userData;
-  c_tableRecordPool.getPtr(indexPtr);
+  c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
+  DictObjectPtr index_obj_ptr;
+  c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
 
   BuildIndxReq* req = (BuildIndxReq*)signal->getDataPtrSend();
 
@@ -3193,7 +3223,7 @@ Dbdict::rebuildIndex_fromBeginTrans(Sign
   req->buildId = 0;
   req->buildKey = 0;
   req->tableId = indexPtr.p->primaryTableId;
-  req->indexId = indexPtr.i;
+  req->indexId = index_obj_ptr.p->m_id;
   req->indexType = indexPtr.p->tableType;
   req->parallelism = 16;
 
@@ -3245,7 +3275,7 @@ Dbdict::rebuildIndex_fromEndTrans(Signal
   ndbrequire(!tx_ptr.isNull());
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, tx_ptr.p->m_userData);
+  c_tableRecordPool_.getPtr(indexPtr, tx_ptr.p->m_userData);
 
   const char* actionName;
   {
@@ -3254,10 +3284,11 @@ Dbdict::rebuildIndex_fromEndTrans(Signal
     actionName = !noBuild ? "rebuild" : "online";
   }
 
+  DictObjectPtr obj_ptr;
+  c_obj_pool.getPtr(obj_ptr, indexPtr.p->m_obj_ptr_i);
+
   char indexName[MAX_TAB_NAME_SIZE];
   {
-    DictObjectPtr obj_ptr;
-    c_obj_pool.getPtr(obj_ptr, indexPtr.p->m_obj_ptr_i);
     LocalRope name(c_rope_pool, obj_ptr.p->m_name);
     name.copy(indexName);
   }
@@ -3267,20 +3298,20 @@ Dbdict::rebuildIndex_fromEndTrans(Signal
     jam();
     infoEvent(
         "DICT: %s index %u done (%s)",
-        actionName, indexPtr.i, indexName);
+        actionName, obj_ptr.p->m_id, indexName);
   } else {
     jam();
     warningEvent(
         "DICT: %s index %u error: code=%u line=%u node=%u (%s)",
         actionName,
-        indexPtr.i, error.errorCode, error.errorLine, error.errorNodeId,
+        obj_ptr.p->m_id, error.errorCode, error.errorLine, error.errorNodeId,
         indexName);
   }
 
-  Uint32 i = tx_ptr.p->m_userData;
+  Uint32 id = obj_ptr.p->m_id;
   releaseTxHandle(tx_ptr);
 
-  rebuildIndexes(signal, i + 1);
+  rebuildIndexes(signal, id + 1);
 }
 
 /* **************************************************************** */
@@ -4693,13 +4724,13 @@ Dbdict::release_object(Uint32 obj_ptr_i,
   LocalRope name(c_rope_pool, obj_name);
   name.erase();
 
+  jam();
   c_obj_name_hash.remove(ptr);
-  if (!DictTabInfo::isTrigger(obj_ptr_p->m_type))
-  {
-    jam();
-    c_obj_id_hash.remove(ptr);
-  }
+  jam();
+  c_obj_id_hash.remove(ptr);
+  jam();
   c_obj_pool.release(ptr);
+  jam();
 }
 
 void
@@ -4774,19 +4805,16 @@ void Dbdict::handleTabInfoInit(Signal * 
   }
 
   TableRecordPtr tablePtr;
+  Uint32 schemaFileId;
   switch (parseP->requestType) {
   case DictTabInfo::CreateTableFromAPI: {
     jam();
   }
   case DictTabInfo::AlterTableFromAPI:{
     jam();
-    tablePtr.i = getFreeTableRecord();
-    /* ---------------------------------------------------------------- */
-    // Check if no free tables existed.
-    /* ---------------------------------------------------------------- */
-    tabRequire(tablePtr.i != RNIL, CreateTableRef::NoMoreTableRecords);
-
-    c_tableRecordPool.getPtr(tablePtr);
+    schemaFileId = RNIL;
+    bool ok = seizeTableRecord(tablePtr, schemaFileId);
+    tabRequire(ok, CreateTableRef::NoMoreTableRecords);
     break;
   }
   case DictTabInfo::AddTableFromDict:
@@ -4796,20 +4824,16 @@ void Dbdict::handleTabInfoInit(Signal * 
 /* ---------------------------------------------------------------- */
 // Get table id and check that table doesn't already exist
 /* ---------------------------------------------------------------- */
-    tablePtr.i = c_tableDesc.TableId;
-
     if (parseP->requestType == DictTabInfo::ReadTableFromDiskSR) {
-      ndbrequire(tablePtr.i == c_restartRecord.activeTable);
+      ndbrequire(c_tableDesc.TableId == c_restartRecord.activeTable);
     }//if
     if (parseP->requestType == DictTabInfo::GetTabInfoConf) {
-      ndbrequire(tablePtr.i == c_restartRecord.activeTable);
+      ndbrequire(c_tableDesc.TableId == c_restartRecord.activeTable);
     }//if
 
-    c_tableRecordPool.getPtr(tablePtr);
-
-    //Uint32 oldTableVersion = tablePtr.p->tableVersion;
-    initialiseTableRecord(tablePtr);
-
+    schemaFileId = c_tableDesc.TableId;
+    bool ok = seizeTableRecord(tablePtr, schemaFileId);
+    ndbrequire(ok); // Already exists or out of memory
 /* ---------------------------------------------------------------- */
 // Set table version
 /* ---------------------------------------------------------------- */
@@ -4835,7 +4859,7 @@ void Dbdict::handleTabInfoInit(Signal * 
     DictObjectPtr obj_ptr;
     ndbrequire(c_obj_pool.seize(obj_ptr));
     new (obj_ptr.p) DictObject;
-    obj_ptr.p->m_id = tablePtr.i;
+    obj_ptr.p->m_id = schemaFileId;
     obj_ptr.p->m_type = c_tableDesc.TableType;
     obj_ptr.p->m_name = tablePtr.p->tableName;
     obj_ptr.p->m_ref_count = 0;
@@ -4847,11 +4871,11 @@ void Dbdict::handleTabInfoInit(Signal * 
     {
       g_eventLogger->info("Dbdict: %u: create name=%s,id=%u,obj_ptr_i=%d",__LINE__,
                           c_tableDesc.TableName,
-                          tablePtr.i, tablePtr.p->m_obj_ptr_i);
+                          schemaFileId, tablePtr.p->m_obj_ptr_i);
     }
     send_event(signal, trans_ptr,
                NDB_LE_CreateSchemaObject,
-               tablePtr.i,
+               schemaFileId,
                tablePtr.p->tableVersion,
                c_tableDesc.TableType);
   }
@@ -4995,12 +5019,12 @@ void Dbdict::handleTabInfoInit(Signal * 
         ndbrequire(c_tableDesc.UpdateTriggerId != RNIL);
         ndbrequire(c_tableDesc.DeleteTriggerId != RNIL);
         ndbout_c("table: %u UPGRADE saving (%u/%u/%u)",
-                 tablePtr.i,
+                 schemaFileId,
                  c_tableDesc.InsertTriggerId,
                  c_tableDesc.UpdateTriggerId,
                  c_tableDesc.DeleteTriggerId);
         infoEvent("table: %u UPGRADE saving (%u/%u/%u)",
-                  tablePtr.i,
+                  schemaFileId,
                   c_tableDesc.InsertTriggerId,
                   c_tableDesc.UpdateTriggerId,
                   c_tableDesc.DeleteTriggerId);
@@ -5059,66 +5083,78 @@ Dbdict::upgrade_seizeTrigger(TableRecord
    * The insert trigger will be "main" trigger so
    *   it does not need any special treatment
    */
-  const Uint32 size = c_triggerRecordPool.getSize();
+  const Uint32 size = c_triggerRecordPool_.getSize();
   ndbrequire(updateTriggerId == RNIL || updateTriggerId < size);
   ndbrequire(deleteTriggerId == RNIL || deleteTriggerId < size);
 
+  DictObjectPtr tab_obj_ptr;
+  c_obj_pool.getPtr(tab_obj_ptr, tabPtr.p->m_obj_ptr_i);
+
   TriggerRecordPtr triggerPtr;
   if (updateTriggerId != RNIL)
   {
     jam();
-    c_triggerRecordPool.getPtr(triggerPtr, updateTriggerId);
-    if (triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED)
+    bool ok = find_object(triggerPtr, updateTriggerId);
+    if (!ok)
     {
       jam();
-      initialiseTriggerRecord(triggerPtr);
+      bool ok = seizeTriggerRecord(triggerPtr, updateTriggerId);
+      if (!ok)
+      {
+        jam();
+        ndbrequire(ok);
+      }
       triggerPtr.p->triggerState = TriggerRecord::TS_FAKE_UPGRADE;
-      triggerPtr.p->triggerId = triggerPtr.i;
       triggerPtr.p->tableId = tabPtr.p->primaryTableId;
-      triggerPtr.p->indexId = tabPtr.i;
+      triggerPtr.p->indexId = tab_obj_ptr.p->m_id;
       TriggerInfo::packTriggerInfo(triggerPtr.p->triggerInfo,
                                    g_hashIndexTriggerTmpl[0].triggerInfo);
 
       char buf[256];
       BaseString::snprintf(buf, sizeof(buf),
-                           "UPG_UPD_NDB$INDEX_%u_UI", tabPtr.i);
+                           "UPG_UPD_NDB$INDEX_%u_UI", tab_obj_ptr.p->m_id);
       {
         LocalRope name(c_rope_pool, triggerPtr.p->triggerName);
         name.assign(buf);
       }
 
       DictObjectPtr obj_ptr;
-      bool ok = c_obj_pool.seize(obj_ptr);
+      ok = c_obj_pool.seize(obj_ptr);
       ndbrequire(ok);
       new (obj_ptr.p) DictObject();
 
       obj_ptr.p->m_name = triggerPtr.p->triggerName;
       obj_ptr.p->m_ref_count = 0;
 
-      triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
       obj_ptr.p->m_id = triggerPtr.p->triggerId;
       obj_ptr.p->m_type =TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
+      link_object(obj_ptr, triggerPtr);
       c_obj_name_hash.add(obj_ptr);
+      c_obj_id_hash.add(obj_ptr);
     }
   }
 
   if (deleteTriggerId != RNIL)
   {
     jam();
-    c_triggerRecordPool.getPtr(triggerPtr, deleteTriggerId);
-    if (triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED)
+    bool ok = find_object(triggerPtr, deleteTriggerId); // TODO: msundell seizeTriggerRecord
+    if (!ok)
     {
       jam();
-      initialiseTriggerRecord(triggerPtr);
+      bool ok = seizeTriggerRecord(triggerPtr, deleteTriggerId);
+      if (!ok)
+      {
+        jam();
+        ndbrequire(ok);
+      }
       triggerPtr.p->triggerState = TriggerRecord::TS_FAKE_UPGRADE;
-      triggerPtr.p->triggerId = triggerPtr.i;
       triggerPtr.p->tableId = tabPtr.p->primaryTableId;
-      triggerPtr.p->indexId = tabPtr.i;
+      triggerPtr.p->indexId = tab_obj_ptr.p->m_id;
       TriggerInfo::packTriggerInfo(triggerPtr.p->triggerInfo,
                                    g_hashIndexTriggerTmpl[0].triggerInfo);
       char buf[256];
       BaseString::snprintf(buf, sizeof(buf),
-                           "UPG_DEL_NDB$INDEX_%u_UI", tabPtr.i);
+                           "UPG_DEL_NDB$INDEX_%u_UI", tab_obj_ptr.p->m_id);
 
       {
         LocalRope name(c_rope_pool, triggerPtr.p->triggerName);
@@ -5126,17 +5162,18 @@ Dbdict::upgrade_seizeTrigger(TableRecord
       }
 
       DictObjectPtr obj_ptr;
-      bool ok = c_obj_pool.seize(obj_ptr);
+      ok = c_obj_pool.seize(obj_ptr);
       ndbrequire(ok);
       new (obj_ptr.p) DictObject();
 
       obj_ptr.p->m_name = triggerPtr.p->triggerName;
       obj_ptr.p->m_ref_count = 0;
 
-      triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
       obj_ptr.p->m_id = triggerPtr.p->triggerId;
       obj_ptr.p->m_type =TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
+      link_object(obj_ptr, triggerPtr);
       c_obj_name_hash.add(obj_ptr);
+      c_obj_id_hash.add(obj_ptr);
     }
   }
 }
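
In both trigger branches above, the manual triggerPtr.p->m_obj_ptr_i assignment is replaced by link_object(), and the DictObject is now added to the id hash as well as the name hash, so triggers become reachable through find_object() just like tables. The registration sequence, condensed from the hunk above:

    DictObjectPtr obj_ptr;
    ndbrequire(c_obj_pool.seize(obj_ptr));
    new (obj_ptr.p) DictObject();
    obj_ptr.p->m_name = triggerPtr.p->triggerName;
    obj_ptr.p->m_id = triggerPtr.p->triggerId;
    link_object(obj_ptr, triggerPtr);  // assumed to set triggerPtr.p->m_obj_ptr_i
    c_obj_name_hash.add(obj_ptr);
    c_obj_id_hash.add(obj_ptr);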
@@ -5787,20 +5824,20 @@ Dbdict::createTable_parse(Signal* signal
     TableRecordPtr tabPtr = parseRecord.tablePtr;
 
     // link operation to object seized in handleTabInfoInit
+    DictObjectPtr obj_ptr;
     {
-      DictObjectPtr obj_ptr;
       Uint32 obj_ptr_i = tabPtr.p->m_obj_ptr_i;
       bool ok = findDictObject(op_ptr, obj_ptr, obj_ptr_i);
       ndbrequire(ok);
     }
 
     {
-      Uint32 version = getTableEntry(tabPtr.i)->m_tableVersion;
+      Uint32 version = getTableEntry(obj_ptr.p->m_id)->m_tableVersion;
       tabPtr.p->tableVersion = create_obj_inc_schema_version(version);
     }
 
     // fill in table id and version
-    impl_req->tableId = tabPtr.i;
+    impl_req->tableId = obj_ptr.p->m_id;
     impl_req->tableVersion = tabPtr.p->tableVersion;
 
     if (ERROR_INSERTED(6202) ||
@@ -5939,7 +5976,13 @@ Dbdict::createTable_parse(Signal* signal
   }
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, tableId);
+  bool ok = find_object(tabPtr, tableId);
+  if (!ok)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+    return;
+  }
   tabPtr.p->packedSize = tabInfoPtr.sz;
   // wl3600_todo verify version on slave
   tabPtr.p->tableVersion = tableVersion;
@@ -6032,7 +6075,7 @@ Dbdict::createTable_prepare(Signal* sign
 
   Uint32 tableId = createTabPtr.p->m_request.tableId;
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, tableId);
+  bool ok = find_object(tabPtr, tableId);
 
   Callback cb;
   cb.m_callbackData = op_ptr.p->op_key;
@@ -6048,6 +6091,7 @@ Dbdict::createTable_prepare(Signal* sign
     return;
   }
 
+  ndbrequire(ok);
   bool savetodisk = !(tabPtr.p->m_bits & TableRecord::TR_Temporary);
   if (savetodisk)
   {
@@ -6111,7 +6155,8 @@ Dbdict::createTab_local(Signal* signal,
   createTabPtr.p->m_callback = * c;
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+  bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+  ndbrequire(ok);
 
   /**
    * Start by creating table in LQH
@@ -6141,7 +6186,7 @@ Dbdict::createTab_local(Signal* signal,
    * Create KeyDescriptor
    */
   {
-    KeyDescriptor* desc= g_key_descriptor_pool.getPtr(tabPtr.i);
+    KeyDescriptor* desc= g_key_descriptor_pool.getPtr(createTabPtr.p->m_request.tableId);
     new (desc) KeyDescriptor();
 
     if (tabPtr.p->primaryTableId == RNIL)
@@ -6225,7 +6270,8 @@ Dbdict::execCREATE_TAB_CONF(Signal* sign
   createTabPtr.p->m_lqhFragPtr = conf->lqhConnectPtr;
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+  bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+  ndbrequire(ok);
   sendLQHADDATTRREQ(signal, op_ptr, tabPtr.p->m_attributes.firstItem);
 }
 
@@ -6240,7 +6286,8 @@ Dbdict::sendLQHADDATTRREQ(Signal* signal
   getOpRec(op_ptr, createTabPtr);
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+  bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+  ndbrequire(ok);
 
   const bool isHashIndex = tabPtr.p->isHashIndex();
 
@@ -6368,8 +6415,8 @@ Dbdict::createTab_dih(Signal* signal, Sc
   D("createTab_dih" << *op_ptr.p);
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
-
+  bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+  ndbrequire(ok);
 
   /**
    * NOTE: use array access here...
@@ -6380,7 +6427,7 @@ Dbdict::createTab_dih(Signal* signal, Sc
 
   DiAddTabReq * req = (DiAddTabReq*)signal->getDataPtrSend();
   req->connectPtr = op_ptr.p->op_key;
-  req->tableId = tabPtr.i;
+  req->tableId = createTabPtr.p->m_request.tableId;
   req->fragType = tabPtr.p->fragmentType;
   req->kValue = tabPtr.p->kValue;
   req->noOfReplicas = 0;
@@ -6480,7 +6527,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal)
   TableRecordPtr tabPtr;
   if (AlterTableReq::getAddFragFlag(changeMask))
   {
-    c_tableRecordPool.getPtr(tabPtr, tableId);
+    bool ok = find_object(tabPtr, tableId);
+    ndbrequire(ok);
     if (DictTabInfo::isTable(tabPtr.p->tableType))
     {
       jam();
@@ -6507,7 +6555,8 @@ Dbdict::execADD_FRAGREQ(Signal* signal)
     findSchemaOp(op_ptr, createTabPtr, senderData);
     ndbrequire(!op_ptr.isNull());
     createTabPtr.p->m_dihAddFragPtr = dihPtr;
-    c_tableRecordPool.getPtr(tabPtr, tableId);
+    bool ok = find_object(tabPtr, tableId);
+    ndbrequire(ok);
   }
 
 #if 0
@@ -6575,7 +6624,8 @@ Dbdict::execLQHFRAGCONF(Signal * signal)
     jam();
     SchemaOpPtr op_ptr;
     TableRecordPtr tabPtr;
-    c_tableRecordPool.getPtr(tabPtr, tableId);
+    bool ok = find_object(tabPtr, tableId);
+    ndbrequire(ok);
     if (DictTabInfo::isTable(tabPtr.p->tableType))
     {
       AlterTableRecPtr alterTabPtr;
@@ -6628,7 +6678,8 @@ Dbdict::execLQHFRAGREF(Signal * signal)
     jam();
     SchemaOpPtr op_ptr;
     TableRecordPtr tabPtr;
-    c_tableRecordPool.getPtr(tabPtr, tableId);
+    bool ok = find_object(tabPtr, tableId);
+    ndbrequire(ok);
     if (DictTabInfo::isTable(tabPtr.p->tableType))
     {
       jam();
@@ -6716,13 +6767,14 @@ Dbdict::execTAB_COMMITCONF(Signal* signa
   //const CreateTabReq* impl_req = &createTabPtr.p->m_request;
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, createTabPtr.p->m_request.tableId);
+  bool ok = find_object(tabPtr, createTabPtr.p->m_request.tableId);
+  ndbrequire(ok);
 
   if (refToBlock(signal->getSendersBlockRef()) == DBLQH) {
     jam();
     // prepare table in DBTC
     TcSchVerReq * req = (TcSchVerReq*)signal->getDataPtr();
-    req->tableId = tabPtr.i;
+    req->tableId = createTabPtr.p->m_request.tableId;
     req->tableVersion = tabPtr.p->tableVersion;
     req->tableLogged = (Uint32)!!(tabPtr.p->m_bits & TableRecord::TR_Logged);
     req->senderRef = reference();
@@ -6736,7 +6788,8 @@ Dbdict::execTAB_COMMITCONF(Signal* signa
     {
       jam();
       TableRecordPtr basePtr;
-      c_tableRecordPool.getPtr(basePtr, tabPtr.p->primaryTableId);
+      bool ok = find_object(basePtr, tabPtr.p->primaryTableId);
+      ndbrequire(ok);
       req->userDefinedPartition = (basePtr.p->fragmentType == DictTabInfo::UserDefined);
     }
 
@@ -6750,7 +6803,7 @@ Dbdict::execTAB_COMMITCONF(Signal* signa
     // commit table in DBTC
     signal->theData[0] = op_ptr.p->op_key;
     signal->theData[1] = reference();
-    signal->theData[2] = tabPtr.i;
+    signal->theData[2] = createTabPtr.p->m_request.tableId;
 
     sendSignal(DBTC_REF, GSN_TAB_COMMITREQ, signal, 3, JBB);
     return;
@@ -6815,7 +6868,8 @@ Dbdict::createTable_commit(Signal* signa
 
   Uint32 tableId = createTabPtr.p->m_request.tableId;
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, tableId);
+  bool ok = find_object(tabPtr, tableId);
+  ndbrequire(ok);
 
   D("createTable_commit" << *op_ptr.p);
 
@@ -6827,9 +6881,10 @@ Dbdict::createTable_commit(Signal* signa
   if (DictTabInfo::isIndex(tabPtr.p->tableType))
   {
     TableRecordPtr basePtr;
-    c_tableRecordPool.getPtr(basePtr, tabPtr.p->primaryTableId);
+    bool ok = find_object(basePtr, tabPtr.p->primaryTableId);
+    ndbrequire(ok);
 
-    LocalTableRecord_list list(c_tableRecordPool, basePtr.p->m_indexes);
+    LocalTableRecord_list list(c_tableRecordPool_, basePtr.p->m_indexes);
     list.add(tabPtr);
   }
 }
@@ -6875,7 +6930,8 @@ Dbdict::createTab_alterComplete(Signal* 
   const CreateTabReq* impl_req = &createTabPtr.p->m_request;
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, impl_req->tableId);
+  bool ok = find_object(tabPtr, impl_req->tableId);
+  ndbrequire(ok);
 
   D("createTab_alterComplete" << *op_ptr.p);
 
@@ -6937,13 +6993,17 @@ Dbdict::createTable_abortParse(Signal* s
     }
 
     TableRecordPtr tabPtr;
-    c_tableRecordPool.getPtr(tabPtr, tableId);
+    bool ok = find_object(tabPtr, tableId);
 
     // any link was to a new object
     if (hasDictObject(op_ptr)) {
       jam();
       unlinkDictObject(op_ptr);
-      releaseTableObject(tableId, true);
+      if (ok)
+      {
+        jam();
+        releaseTableObject(tabPtr.i, true);
+      }
     }
   } while (0);
 
@@ -6962,13 +7022,14 @@ Dbdict::createTable_abortPrepare(Signal*
   D("createTable_abortPrepare" << *op_ptr.p);
 
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, impl_req->tableId);
+  bool ok = find_object(tabPtr, impl_req->tableId);
+  ndbrequire(ok);
 
   // create drop table operation  wl3600_todo must pre-allocate
 
   SchemaOpPtr oplnk_ptr;
   DropTableRecPtr dropTabPtr;
-  bool ok = seizeLinkedSchemaOp(op_ptr, oplnk_ptr, dropTabPtr);
+  ok = seizeLinkedSchemaOp(op_ptr, oplnk_ptr, dropTabPtr);
   ndbrequire(ok);
 
   DropTabReq* aux_impl_req = &dropTabPtr.p->m_request;
@@ -7024,10 +7085,12 @@ Dbdict::createTable_abortLocalConf(Signa
   Uint32 tableId = impl_req->tableId;
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, tableId);
-
-  releaseTableObject(tableId);
-
+  bool ok = find_object(tablePtr, tableId);
+  if (ok)
+  {
+    jam();
+    releaseTableObject(tablePtr.i);
+  }
   createTabPtr.p->m_abortPrepareDone = true;
   sendTransConf(signal, op_ptr);
 }
@@ -7069,10 +7132,10 @@ void Dbdict::execCREATE_TABLE_REF(Signal
   handleDictRef(signal, ref);
 }
 
-void Dbdict::releaseTableObject(Uint32 tableId, bool removeFromHash)
+void Dbdict::releaseTableObject(Uint32 table_ptr_i, bool removeFromHash)
 {
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, tableId);
+  c_tableRecordPool_.getPtr(tablePtr, table_ptr_i);
   if (removeFromHash)
   {
     jam();
@@ -7124,9 +7187,12 @@ void Dbdict::releaseTableObject(Uint32 t
       {
         jam();
         TriggerRecordPtr triggerPtr;
-        c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-        triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
-        release_object(triggerPtr.p->m_obj_ptr_i);
+        bool ok = find_object(triggerPtr, triggerId);
+        if (ok)
+        {
+          release_object(triggerPtr.p->m_obj_ptr_i);
+          c_triggerRecordPool_.release(triggerPtr);
+        }
       }
 
       triggerId = tablePtr.p->m_upgrade_trigger_handling.deleteTriggerId;
@@ -7134,13 +7200,16 @@ void Dbdict::releaseTableObject(Uint32 t
       {
         jam();
         TriggerRecordPtr triggerPtr;
-        c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-        triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
-        release_object(triggerPtr.p->m_obj_ptr_i);
+        bool ok = find_object(triggerPtr, triggerId);
+        if (ok)
+        {
+          release_object(triggerPtr.p->m_obj_ptr_i);
+          c_triggerRecordPool_.release(triggerPtr);
+        }
       }
     }
   }
-
+  c_tableRecordPool_.release(tablePtr);
 }//releaseTableObject()
 
 // CreateTable: END
@@ -7252,6 +7321,7 @@ Dbdict::dropTable_parse(Signal* signal, 
   getOpRec(op_ptr, dropTabPtr);
   DropTabReq* impl_req = &dropTabPtr.p->m_request;
   Uint32 tableId = impl_req->tableId;
+  Uint32 err;
 
   TableRecordPtr tablePtr;
   if (!(tableId < c_noOfMetaTables)) {
@@ -7259,7 +7329,23 @@ Dbdict::dropTable_parse(Signal* signal, 
     setError(error, DropTableRef::NoSuchTable, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+
+  err = check_read_obj(impl_req->tableId, trans_ptr.p->m_transId);
+  if (err)
+  {
+    jam();
+    setError(error, err, __LINE__);
+    return;
+  }
+
+  bool ok = find_object(tablePtr, impl_req->tableId);
+  if (!ok)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+    return;
+  }
+
 
   // check version first (api will retry)
   if (tablePtr.p->tableVersion != impl_req->tableVersion) {
@@ -7275,7 +7361,7 @@ Dbdict::dropTable_parse(Signal* signal, 
     return;
   }
 
-  if (check_write_obj(tablePtr.i,
+  if (check_write_obj(impl_req->tableId,
                       trans_ptr.p->m_transId,
                       SchemaFile::SF_DROP, error))
   {
@@ -7301,7 +7387,7 @@ Dbdict::dropTable_parse(Signal* signal, 
   SchemaFile::TableEntry te; te.init();
   te.m_tableState = SchemaFile::SF_DROP;
   te.m_transId = trans_ptr.p->m_transId;
-  Uint32 err = trans_log_schema_op(op_ptr, tableId, &te);
+  err = trans_log_schema_op(op_ptr, tableId, &te);
   if (err)
   {
     jam();
@@ -7398,7 +7484,8 @@ Dbdict::dropTable_backup_mutex_locked(Si
   const DropTabReq* impl_req = &dropTabPtr.p->m_request;
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+  bool ok = find_object(tablePtr, impl_req->tableId);
+  ndbrequire(ok);
 
   Mutex mutex(signal, c_mutexMgr, dropTabPtr.p->m_define_backup_mutex);
   mutex.unlock(); // ignore response
@@ -7426,7 +7513,8 @@ Dbdict::dropTable_commit(Signal* signal,
   D("dropTable_commit" << *op_ptr.p);
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, dropTabPtr.p->m_request.tableId);
+  bool ok = find_object(tablePtr, dropTabPtr.p->m_request.tableId);
+  ndbrequire(ok);
 
   if (tablePtr.p->m_tablespace_id != RNIL)
   {
@@ -7441,22 +7529,22 @@ Dbdict::dropTable_commit(Signal* signal,
     char buf[1024];
     LocalRope name(c_rope_pool, tablePtr.p->tableName);
     name.copy(buf);
-    g_eventLogger->info("Dbdict: drop name=%s,id=%u,obj_id=%u", buf, tablePtr.i,
+    g_eventLogger->info("Dbdict: drop name=%s,id=%u,obj_id=%u", buf, dropTabPtr.p->m_request.tableId,
                         tablePtr.p->m_obj_ptr_i);
   }
 
   send_event(signal, trans_ptr,
              NDB_LE_DropSchemaObject,
-             tablePtr.i,
+             dropTabPtr.p->m_request.tableId,
              tablePtr.p->tableVersion,
              tablePtr.p->tableType);
 
   if (DictTabInfo::isIndex(tablePtr.p->tableType))
   {
     TableRecordPtr basePtr;
-    c_tableRecordPool.getPtr(basePtr, tablePtr.p->primaryTableId);
-
-    LocalTableRecord_list list(c_tableRecordPool, basePtr.p->m_indexes);
+    bool ok = find_object(basePtr, tablePtr.p->primaryTableId);
+    ndbrequire(ok);
+    LocalTableRecord_list list(c_tableRecordPool_, basePtr.p->m_indexes);
     list.remove(tablePtr);
   }
   dropTabPtr.p->m_block = 0;
@@ -7602,9 +7690,6 @@ Dbdict::dropTable_complete(Signal* signa
   DropTableRecPtr dropTabPtr;
   getOpRec(op_ptr, dropTabPtr);
 
-  TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, dropTabPtr.p->m_request.tableId);
-
   dropTabPtr.p->m_block = 0;
   dropTabPtr.p->m_blockNo[0] = DBTC;
   dropTabPtr.p->m_blockNo[1] = DBLQH; // wait usage + LCP
@@ -7631,9 +7716,6 @@ Dbdict::dropTable_complete_nextStep(Sign
    */
   ndbrequire(!hasError(op_ptr.p->m_error));
 
-  TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
-
   Uint32 block = dropTabPtr.p->m_block;
   Uint32 blockNo = dropTabPtr.p->m_blockNo[block];
   D("dropTable_complete_nextStep" << hex << V(blockNo) << *op_ptr.p);
@@ -7716,7 +7798,12 @@ Dbdict::dropTable_complete_done(Signal* 
   Uint32 tableId = dropTabPtr.p->m_request.tableId;
 
   unlinkDictObject(op_ptr);
-  releaseTableObject(tableId);
+  TableRecordPtr tablePtr;
+  bool ok = find_object(tablePtr, tableId);
+  if (ok)
+  {
+    releaseTableObject(tablePtr.i);
+  }
 
   // inform SUMA
   {
@@ -7925,6 +8012,7 @@ Dbdict::alterTable_parse(Signal* signal,
   AlterTableRecPtr alterTabPtr;
   getOpRec(op_ptr, alterTabPtr);
   AlterTabReq* impl_req = &alterTabPtr.p->m_request;
+  Uint32 err;
 
   if (AlterTableReq::getReorgSubOp(impl_req->changeMask))
   {
@@ -7948,7 +8036,14 @@ Dbdict::alterTable_parse(Signal* signal,
     setError(error, AlterTableRef::NoSuchTable, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+
+  bool ok = find_object(tablePtr, impl_req->tableId);
+  if (!ok)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+    return;
+  }
 
   if (tablePtr.p->m_read_locked)
   {
@@ -7957,7 +8052,7 @@ Dbdict::alterTable_parse(Signal* signal,
     return;
   }
 
-  if (check_write_obj(tablePtr.i, trans_ptr.p->m_transId,
+  if (check_write_obj(impl_req->tableId, trans_ptr.p->m_transId,
                       SchemaFile::SF_ALTER, error))
   {
     jam();
@@ -8319,7 +8414,7 @@ Dbdict::alterTable_parse(Signal* signal,
   te.m_gcp = 0;
   te.m_transId = trans_ptr.p->m_transId;
 
-  Uint32 err = trans_log_schema_op(op_ptr, impl_req->tableId, &te);
+  err = trans_log_schema_op(op_ptr, impl_req->tableId, &te);
   if (err)
   {
     jam();
@@ -8438,8 +8533,9 @@ Dbdict::alterTable_subOps(Signal* signal
       jam();
       TableRecordPtr tabPtr;
       TableRecordPtr indexPtr;
-      c_tableRecordPool.getPtr(tabPtr, impl_req->tableId);
-      LocalTableRecord_list list(c_tableRecordPool, tabPtr.p->m_indexes);
+      bool ok = find_object(tabPtr, impl_req->tableId);
+      ndbrequire(ok);
+      LocalTableRecord_list list(c_tableRecordPool_, tabPtr.p->m_indexes);
       Uint32 ptrI = alterTabPtr.p->m_sub_add_frag_index_ptr;
 
       if (ptrI == RNIL)
@@ -8586,7 +8682,8 @@ Dbdict::alterTable_toAlterIndex(Signal* 
   SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, alterTabPtr.p->m_sub_add_frag_index_ptr);
+  c_tableRecordPool_.getPtr(indexPtr, alterTabPtr.p->m_sub_add_frag_index_ptr);
+  ndbrequire(!indexPtr.isNull());
 
   AlterIndxReq* req = (AlterIndxReq*)signal->getDataPtrSend();
   req->clientRef = reference();
@@ -8713,8 +8810,6 @@ Dbdict::alterTable_toCreateTrigger(Signa
   AlterTableRecPtr alterTablePtr;
   getOpRec(op_ptr, alterTablePtr);
   const AlterTabReq* impl_req = &alterTablePtr.p->m_request;
-  TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
 
   const TriggerTmpl& triggerTmpl = g_reorgTriggerTmpl[0];
 
@@ -8776,7 +8871,8 @@ Dbdict::alterTable_toCopyData(Signal* si
   getOpRec(op_ptr, alterTablePtr);
   const AlterTabReq* impl_req = &alterTablePtr.p->m_request;
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+  bool ok = find_object(tablePtr, impl_req->tableId);
+  ndbrequire(ok);
 
   CopyDataReq* req = (CopyDataReq*)signal->getDataPtrSend();
 
@@ -8951,7 +9047,8 @@ Dbdict::alterTable_backup_mutex_locked(S
   const AlterTabReq* impl_req = &alterTabPtr.p->m_request;
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+  bool ok = find_object(tablePtr, impl_req->tableId);
+  ndbrequire(ok);
 
   Mutex mutex(signal, c_mutexMgr, alterTabPtr.p->m_define_backup_mutex);
   mutex.unlock(); // ignore response
@@ -9152,7 +9249,8 @@ Dbdict::alterTable_commit(Signal* signal
   D("alterTable_commit" << *op_ptr.p);
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+  bool ok = find_object(tablePtr, impl_req->tableId);
+  ndbrequire(ok);
 
   if (op_ptr.p->m_sections)
   {
@@ -9398,7 +9496,8 @@ Dbdict::alterTable_fromCommitComplete(Si
 
   const Uint32 tableId = impl_req->tableId;
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, tableId);
+  bool ok = find_object(tablePtr, tableId);
+  ndbrequire(ok);
 
   // inform Suma so it can send events to any subscribers of the table
   {
@@ -9783,7 +9882,7 @@ void Dbdict::execGET_TABLEDID_REQ(Signal
   }
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, obj_ptr_p->m_id);
+  c_tableRecordPool_.getPtr(tablePtr, obj_ptr_p->m_object_ptr_i);
 
   GetTableIdConf * conf = (GetTableIdConf *)req;
   conf->tableId = tablePtr.p->tableId;
@@ -10112,19 +10211,19 @@ void Dbdict::sendOLD_LIST_TABLES_CONF(Si
 
     TableRecordPtr tablePtr;
     if (DictTabInfo::isTable(type) || DictTabInfo::isIndex(type)){
-      c_tableRecordPool.getPtr(tablePtr, iter.curr.p->m_id);
+      c_tableRecordPool_.getPtr(tablePtr, iter.curr.p->m_object_ptr_i);
 
       if(reqListIndexes && (reqTableId != tablePtr.p->primaryTableId))
 	continue;
 
       conf->tableData[pos] = 0;
-      conf->setTableId(pos, tablePtr.i); // id
+      conf->setTableId(pos, iter.curr.p->m_id); // id
       conf->setTableType(pos, type); // type
       // state
 
       if(DictTabInfo::isTable(type))
       {
-        SchemaFile::TableEntry * te = getTableEntry(xsf, tablePtr.i);
+        SchemaFile::TableEntry * te = getTableEntry(xsf, iter.curr.p->m_id);
         switch(te->m_tableState){
         case SchemaFile::SF_CREATE:
           jam();
@@ -10192,24 +10291,30 @@ void Dbdict::sendOLD_LIST_TABLES_CONF(Si
     }
     if(DictTabInfo::isTrigger(type)){
       TriggerRecordPtr triggerPtr;
-      c_triggerRecordPool.getPtr(triggerPtr, iter.curr.p->m_id);
-
+      bool ok = find_object(triggerPtr, iter.curr.p->m_id);
       conf->tableData[pos] = 0;
-      conf->setTableId(pos, triggerPtr.i);
+      conf->setTableId(pos, iter.curr.p->m_id);
       conf->setTableType(pos, type);
-      switch (triggerPtr.p->triggerState) {
-      case TriggerRecord::TS_DEFINING:
-	conf->setTableState(pos, DictTabInfo::StateBuilding);
-	break;
-      case TriggerRecord::TS_OFFLINE:
-	conf->setTableState(pos, DictTabInfo::StateOffline);
-	break;
-      case TriggerRecord::TS_ONLINE:
-	conf->setTableState(pos, DictTabInfo::StateOnline);
-	break;
-      default:
-	conf->setTableState(pos, DictTabInfo::StateBroken);
-	break;
+      if (!ok)
+      {
+        conf->setTableState(pos, DictTabInfo::StateBroken);
+      }
+      else
+      {
+        switch (triggerPtr.p->triggerState) {
+        case TriggerRecord::TS_DEFINING:
+          conf->setTableState(pos, DictTabInfo::StateBuilding);
+          break;
+        case TriggerRecord::TS_OFFLINE:
+          conf->setTableState(pos, DictTabInfo::StateOffline);
+          break;
+        case TriggerRecord::TS_ONLINE:
+          conf->setTableState(pos, DictTabInfo::StateOnline);
+          break;
+        default:
+          conf->setTableState(pos, DictTabInfo::StateBroken);
+          break;
+        }
       }
       conf->setTableStore(pos, DictTabInfo::StoreNotLogged);
       pos++;
@@ -10335,18 +10440,18 @@ void Dbdict::sendLIST_TABLES_CONF(Signal
 
     TableRecordPtr tablePtr;
     if (DictTabInfo::isTable(type) || DictTabInfo::isIndex(type)){
-      c_tableRecordPool.getPtr(tablePtr, iter.curr.p->m_id);
+      c_tableRecordPool_.getPtr(tablePtr, iter.curr.p->m_object_ptr_i);
 
       if(reqListIndexes && (reqTableId != tablePtr.p->primaryTableId))
 	goto flush;
 
       ltd.requestData = 0; // clear
-      ltd.setTableId(tablePtr.i); // id
+      ltd.setTableId(iter.curr.p->m_id); // id
       ltd.setTableType(type); // type
       // state
 
       if(DictTabInfo::isTable(type)){
-        SchemaFile::TableEntry * te = getTableEntry(xsf, tablePtr.i);
+        SchemaFile::TableEntry * te = getTableEntry(xsf, iter.curr.p->m_id);
         switch(te->m_tableState){
         case SchemaFile::SF_CREATE:
           jam();
@@ -10413,24 +10518,31 @@ void Dbdict::sendLIST_TABLES_CONF(Signal
     }
     if(DictTabInfo::isTrigger(type)){
       TriggerRecordPtr triggerPtr;
-      c_triggerRecordPool.getPtr(triggerPtr, iter.curr.p->m_id);
+      bool ok = find_object(triggerPtr, iter.curr.p->m_id);
 
       ltd.requestData = 0;
-      ltd.setTableId(triggerPtr.i);
+      ltd.setTableId(iter.curr.p->m_id);
       ltd.setTableType(type);
-      switch (triggerPtr.p->triggerState) {
-      case TriggerRecord::TS_DEFINING:
-	ltd.setTableState(DictTabInfo::StateBuilding);
-	break;
-      case TriggerRecord::TS_OFFLINE:
-	ltd.setTableState(DictTabInfo::StateOffline);
-	break;
-      case TriggerRecord::TS_ONLINE:
-	ltd.setTableState(DictTabInfo::StateOnline);
-	break;
-      default:
-	ltd.setTableState(DictTabInfo::StateBroken);
-	break;
+      if (!ok)
+      {
+        ltd.setTableState(DictTabInfo::StateBroken);
+      }
+      else
+      {
+        switch (triggerPtr.p->triggerState) {
+        case TriggerRecord::TS_DEFINING:
+          ltd.setTableState(DictTabInfo::StateBuilding);
+          break;
+        case TriggerRecord::TS_OFFLINE:
+          ltd.setTableState(DictTabInfo::StateOffline);
+          break;
+        case TriggerRecord::TS_ONLINE:
+          ltd.setTableState(DictTabInfo::StateOnline);
+          break;
+        default:
+          ltd.setTableState(DictTabInfo::StateBroken);
+          break;
+        }
       }
       ltd.setTableStore(DictTabInfo::StoreNotLogged);
     }
@@ -10760,16 +10872,16 @@ Dbdict::createIndex_parse(Signal* signal
       setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
       return;
     }
-    c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+    bool ok = find_object(tablePtr, impl_req->tableId);
+    if (!ok || !tablePtr.p->isTable()) {
 
-    if (!tablePtr.p->isTable()) {
       jam();
       setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
       return;
     }
 
     Uint32 err;
-    if ((err = check_read_obj(tablePtr.i, trans_ptr.p->m_transId)))
+    if ((err = check_read_obj(impl_req->tableId, trans_ptr.p->m_transId)))
     {
       jam();
       setError(error, err, __LINE__);
@@ -10975,8 +11087,8 @@ Dbdict::createIndex_toCreateTable(Signal
   getOpRec(op_ptr, createIndexPtr);
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, createIndexPtr.p->m_request.tableId);
-  ndbrequire(tablePtr.i == tablePtr.p->tableId);
+  bool ok = find_object(tablePtr, createIndexPtr.p->m_request.tableId);
+  ndbrequire(ok);
 
   // signal data writer
   Uint32* wbuffer = &c_indexPage.word[0];
@@ -11448,21 +11560,30 @@ Dbdict::dropIndex_parse(Signal* signal, 
                         SectionHandle& handle, ErrorInfo& error)
 {
   D("dropIndex_parse" << V(op_ptr.i) << *op_ptr.p);
+  jam();
 
   SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
   DropIndexRecPtr dropIndexPtr;
   getOpRec(op_ptr, dropIndexPtr);
   DropIndxImplReq* impl_req = &dropIndexPtr.p->m_request;
 
-  TableRecordPtr indexPtr;
   if (!(impl_req->indexId < c_noOfMetaTables)) {
     jam();
     setError(error, DropIndxRef::IndexNotFound, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
 
-  if (!indexPtr.p->isIndex())
+  Uint32 err = check_read_obj(impl_req->indexId, trans_ptr.p->m_transId);
+  if (err)
+  {
+    jam();
+    setError(error, err, __LINE__);
+    return;
+  }
+
+  TableRecordPtr indexPtr;
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  if (!ok || !indexPtr.p->isIndex())
   {
     jam();
     setError(error, DropIndxRef::NotAnIndex, __LINE__);
@@ -11476,16 +11597,21 @@ Dbdict::dropIndex_parse(Signal* signal, 
     return;
   }
 
-  if (check_write_obj(indexPtr.i, trans_ptr.p->m_transId,
+  if (check_write_obj(impl_req->indexId, trans_ptr.p->m_transId,
                       SchemaFile::SF_DROP, error))
   {
     jam();
     return;
   }
 
-  ndbrequire(indexPtr.p->primaryTableId != RNIL);
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, indexPtr.p->primaryTableId);
+  ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+  if (!ok)
+  {
+    jam();
+    setError(error, CreateIndxRef::InvalidPrimaryTable, __LINE__);
+    return;
+  }
 
   // master sets primary table, slave verifies it agrees
   if (master)
@@ -11942,7 +12068,21 @@ Dbdict::alterIndex_parse(Signal* signal,
     setError(error, AlterIndxRef::IndexNotFound, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  if (check_read_obj(impl_req->indexId, trans_ptr.p->m_transId) == GetTabInfoRef::TableNotDefined)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+    return;
+  }
+  jam();
+
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  if (!ok)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+    return;
+  }
 
   // get name for system index check later
   char indexName[MAX_TAB_NAME_SIZE];
@@ -11959,7 +12099,7 @@ Dbdict::alterIndex_parse(Signal* signal,
     return;
   }
 
-  if (check_write_obj(indexPtr.i, trans_ptr.p->m_transId,
+  if (check_write_obj(impl_req->indexId, trans_ptr.p->m_transId,
                       SchemaFile::SF_ALTER, error))
   {
     jam();
@@ -11995,7 +12135,8 @@ Dbdict::alterIndex_parse(Signal* signal,
 
   ndbrequire(indexPtr.p->primaryTableId != RNIL);
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, indexPtr.p->primaryTableId);
+  ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+  ndbrequire(ok); // TODO:msundell set error
 
   // master sets primary table, participant verifies it agrees
   if (master)
@@ -12122,7 +12263,9 @@ void
 Dbdict::set_index_stat_frag(Signal* signal, TableRecordPtr indexPtr)
 {
   jam();
-  const Uint32 indexId = indexPtr.i;
+  DictObjectPtr index_obj_ptr;
+  c_obj_pool.getPtr(index_obj_ptr, indexPtr.p->m_obj_ptr_i);
+  const Uint32 indexId = index_obj_ptr.p->m_id;
   Uint32 err = get_fragmentation(signal, indexId);
   ndbrequire(err == 0);
   // format: R F { fragId node1 .. nodeR } x { F }
@@ -12133,7 +12276,7 @@ Dbdict::set_index_stat_frag(Signal* sign
   ndbrequire(noOfFragments != 0 && noOfReplicas != 0);
 
   // distribute by table and index id
-  const Uint32 value = indexPtr.p->primaryTableId + indexPtr.i;
+  const Uint32 value = indexPtr.p->primaryTableId + indexId;
   const Uint32 fragId = value % noOfFragments;
   const Uint32 fragIndex = 2 + (1 + noOfReplicas) * fragId;
   const Uint32 nodeIndex = value % noOfReplicas;
@@ -12153,8 +12296,6 @@ Dbdict::alterIndex_subOps(Signal* signal
   getOpRec(op_ptr, alterIndexPtr);
   const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
   Uint32 requestType = impl_req->requestType;
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
 
   // ops to create or drop triggers
   if (alterIndexPtr.p->m_sub_trigger == false)
@@ -12206,7 +12347,10 @@ Dbdict::alterIndex_subOps(Signal* signal
     return true;
   }
 
-  if (indexPtr.p->isOrderedIndex() &&
+  TableRecordPtr indexPtr;
+  bool ok = find_object(indexPtr, impl_req->indexId);
+
+  if (ok && indexPtr.p->isOrderedIndex() &&
       (!alterIndexPtr.p->m_sub_index_stat_dml ||
        !alterIndexPtr.p->m_sub_index_stat_mon)) {
     jam();
@@ -12232,7 +12376,8 @@ Dbdict::alterIndex_toCreateTrigger(Signa
   getOpRec(op_ptr, alterIndexPtr);
   const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   const TriggerTmpl& triggerTmpl = alterIndexPtr.p->m_triggerTmpl[0];
 
@@ -12326,7 +12471,8 @@ Dbdict::alterIndex_toDropTrigger(Signal*
   const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   //const TriggerTmpl& triggerTmpl = alterIndexPtr.p->m_triggerTmpl[0];
 
@@ -12489,7 +12635,8 @@ Dbdict::alterIndex_toIndexStat(Signal* s
   DictSignal::addRequestFlagsGlobal(requestInfo, op_ptr.p->m_requestInfo);
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   req->clientRef = reference();
   req->clientData = op_ptr.p->op_key;
@@ -12593,7 +12740,8 @@ Dbdict::alterIndex_prepare(Signal* signa
   Uint32 requestType = impl_req->requestType;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   D("alterIndex_prepare" << *op_ptr.p);
 
@@ -12653,7 +12801,8 @@ Dbdict::alterIndex_toCreateLocal(Signal*
   const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   D("alterIndex_toCreateLocal" << *op_ptr.p);
 
@@ -12686,9 +12835,6 @@ Dbdict::alterIndex_toDropLocal(Signal* s
   getOpRec(op_ptr, alterIndexPtr);
   const AlterIndxImplReq* impl_req = &alterIndexPtr.p->m_request;
 
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
   D("alterIndex_toDropLocal" << *op_ptr.p);
 
   DropIndxImplReq* req = (DropIndxImplReq*)signal->getDataPtrSend();
@@ -12865,7 +13011,12 @@ Dbdict::alterIndex_abortParse(Signal* si
     }
 
     TableRecordPtr indexPtr;
-    c_tableRecordPool.getPtr(indexPtr, indexId);
+    bool ok = find_object(indexPtr, indexId);
+    if (!ok)
+    {
+      jam();
+      break;
+    }
 
     switch (requestType) {
     case AlterIndxImplReq::AlterIndexOnline:
@@ -13105,20 +13256,30 @@ Dbdict::buildIndex_parse(Signal* signal,
                          SchemaOpPtr op_ptr,
                          SectionHandle& handle, ErrorInfo& error)
 {
-  D("buildIndex_parse");
+   D("buildIndex_parse");
 
+  SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
   BuildIndexRecPtr buildIndexPtr;
   getOpRec(op_ptr, buildIndexPtr);
   BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
+  Uint32 err;
 
   // get index
   TableRecordPtr indexPtr;
-  if (!(impl_req->indexId < c_noOfMetaTables)) {
+  err = check_read_obj(impl_req->indexId, trans_ptr.p->m_transId);
+  if (err)
+  {
     jam();
-    setError(error, BuildIndxRef::IndexNotFound, __LINE__);
+    setError(error, err, __LINE__);
+    return;
+  }
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  if (!ok)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
 
   ndbrequire(indexPtr.p->primaryTableId == impl_req->tableId);
 
@@ -13129,7 +13290,13 @@ Dbdict::buildIndex_parse(Signal* signal,
     setError(error, BuildIndxRef::IndexNotFound, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(tablePtr, impl_req->tableId);
+  ok = find_object(tablePtr, impl_req->tableId);
+  if (!ok)
+  {
+    jam();
+    setError(error, GetTabInfoRef::TableNotDefined, __LINE__);
+    return;
+  }
 
   // set attribute lists
   getIndexAttrList(indexPtr, buildIndexPtr.p->m_indexKeyList);
@@ -13401,9 +13568,6 @@ Dbdict::buildIndex_toDropConstraint(Sign
   getOpRec(op_ptr, buildIndexPtr);
   const BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
 
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
   const TriggerTmpl& triggerTmpl = buildIndexPtr.p->m_triggerTmpl[0];
 
   DropTrigReq* req = (DropTrigReq*)signal->getDataPtrSend();
@@ -13488,9 +13652,6 @@ Dbdict::buildIndex_reply(Signal* signal,
 
   D("buildIndex_reply" << V(impl_req->indexId));
 
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
   if (!hasError(error)) {
     BuildIndxConf* conf = (BuildIndxConf*)signal->getDataPtrSend();
     conf->senderRef = reference();
@@ -13553,7 +13714,8 @@ Dbdict:: buildIndex_toLocalBuild(Signal*
   SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   D("buildIndex_toLocalBuild");
 
@@ -13654,7 +13816,8 @@ Dbdict::buildIndex_toLocalOnline(Signal*
   const BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   D("buildIndex_toLocalOnline");
 
@@ -13705,7 +13868,8 @@ Dbdict::buildIndex_fromLocalOnline(Signa
   const BuildIndxImplReq* impl_req = &buildIndexPtr.p->m_request;
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   D("buildIndex_fromLocalOnline");
 
@@ -13880,20 +14044,23 @@ Dbdict::indexStat_parse(Signal* signal, 
 {
   D("indexStat_parse");
 
+  SchemaTransPtr trans_ptr = op_ptr.p->m_trans_ptr;
   IndexStatRecPtr indexStatPtr;
   getOpRec(op_ptr, indexStatPtr);
   IndexStatImplReq* impl_req = &indexStatPtr.p->m_request;
+  Uint32 err;
 
   // get index
   TableRecordPtr indexPtr;
-  if (!(impl_req->indexId < c_noOfMetaTables)) {
+  err = check_read_obj(impl_req->indexId, trans_ptr.p->m_transId);
+  if (err)
+  {
     jam();
-    setError(error, IndexStatRef::InvalidIndex, __LINE__);
+    setError(error, err, __LINE__);
     return;
   }
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
-  if (!indexPtr.p->isOrderedIndex()) {
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  if (!ok || !indexPtr.p->isOrderedIndex()) {
     jam();
     setError(error, IndexStatRef::InvalidIndex, __LINE__);
     return;
@@ -14023,7 +14190,8 @@ Dbdict::indexStat_toIndexStat(Signal* si
   DictSignal::addRequestFlagsGlobal(requestInfo, op_ptr.p->m_requestInfo);
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok);
 
   req->clientRef = reference();
   req->clientData = op_ptr.p->op_key;
@@ -14084,9 +14252,6 @@ Dbdict::indexStat_reply(Signal* signal, 
 
   D("indexStat_reply" << V(impl_req->indexId));
 
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-
   if (!hasError(error)) {
     IndexStatConf* conf = (IndexStatConf*)signal->getDataPtrSend();
     conf->senderRef = reference();
@@ -14143,8 +14308,8 @@ Dbdict::indexStat_toLocalStat(Signal* si
   D("indexStat_toLocalStat");
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
-  ndbrequire(indexPtr.p->isOrderedIndex());
+  bool ok = find_object(indexPtr, impl_req->indexId);
+  ndbrequire(ok && indexPtr.p->isOrderedIndex());
 
   Callback c = {
     safe_cast(&Dbdict::indexStat_fromLocalStat),
@@ -14352,7 +14517,12 @@ Dbdict::execINDEX_STAT_REP(Signal* signa
     jam();
     return;
   }
-  c_tableRecordPool.getPtr(indexPtr, rep->indexId);
+  bool ok = find_object(indexPtr, rep->indexId);
+  if (!ok)
+  {
+    jam();
+    return;
+  }
   if (rep->indexVersion != 0 &&
       rep->indexVersion != indexPtr.p->tableVersion) {
     jam();
@@ -14370,7 +14540,7 @@ Dbdict::execINDEX_STAT_REP(Signal* signa
   D("index stat: " << copyRope<MAX_TAB_NAME_SIZE>(indexPtr.p->tableName)
     << " request type:" << rep->requestType);
 
-  infoEvent("DICT: index %u stats auto-update requested", indexPtr.i);
+  infoEvent("DICT: index %u stats auto-update requested", rep->indexId);
   indexPtr.p->indexStatBgRequest = rep->requestType;
 }
 
@@ -14400,8 +14570,8 @@ Dbdict::indexStatBg_process(Signal* sign
       jam();
       continue;
     }
-    c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
-    if (!indexPtr.p->isOrderedIndex()) {
+    bool ok = find_object(indexPtr, c_indexStatBgId);
+    if (!ok || !indexPtr.p->isOrderedIndex()) {
       jam();
       continue;
     }
@@ -14437,15 +14607,16 @@ Dbdict::indexStatBg_fromBeginTrans(Signa
   findTxHandle(tx_ptr, tx_key);
   ndbrequire(!tx_ptr.isNull());
 
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
-
   if (ret != 0) {
     jam();
     indexStatBg_sendContinueB(signal);
     return;
   }
 
+  TableRecordPtr indexPtr;
+  bool ok = find_object(indexPtr, c_indexStatBgId);
+  ndbrequire(ok);
+
   Callback c = {
     safe_cast(&Dbdict::indexStatBg_fromIndexStat),
     tx_ptr.p->tx_key
@@ -14475,13 +14646,10 @@ Dbdict::indexStatBg_fromIndexStat(Signal
   findTxHandle(tx_ptr, tx_key);
   ndbrequire(!tx_ptr.isNull());
 
-  TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
-
   if (ret != 0) {
     jam();
     setError(tx_ptr.p->m_error, ret, __LINE__);
-    warningEvent("DICT: index %u stats auto-update error: %d", indexPtr.i, ret);
+    warningEvent("DICT: index %u stats auto-update error: %d", c_indexStatBgId, ret);
   }
 
   Callback c = {
@@ -14506,17 +14674,18 @@ Dbdict::indexStatBg_fromEndTrans(Signal*
   ndbrequire(!tx_ptr.isNull());
 
   TableRecordPtr indexPtr;
-  c_tableRecordPool.getPtr(indexPtr, c_indexStatBgId);
+  bool ok = find_object(indexPtr, c_indexStatBgId);
 
   if (ret != 0) {
     jam();
     // skip over but leave the request on
-    warningEvent("DICT: index %u stats auto-update error: %d", indexPtr.i, ret);
+    warningEvent("DICT: index %u stats auto-update error: %d", c_indexStatBgId, ret);
   } else {
     jam();
+    ndbrequire(ok);
     // mark request done
     indexPtr.p->indexStatBgRequest = 0;
-    infoEvent("DICT: index %u stats auto-update done", indexPtr.i);
+    infoEvent("DICT: index %u stats auto-update done", c_indexStatBgId);
   }
 
   releaseTxHandle(tx_ptr);
@@ -14701,7 +14870,8 @@ Dbdict::copyData_prepare(Signal* signal,
   Uint32 tmp[MAX_ATTRIBUTES_IN_TABLE];
   bool tabHasDiskCols = false;
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, impl_req->srcTableId);
+  bool ok = find_object(tabPtr, impl_req->srcTableId);
+  ndbrequire(ok);
   {
     LocalAttributeRecord_list alist(c_attributeRecordPool,
                                            tabPtr.p->m_attributes);
@@ -14802,7 +14972,8 @@ Dbdict::copyData_complete(Signal* signal
   Uint32 tmp[MAX_ATTRIBUTES_IN_TABLE];
   bool tabHasDiskCols = false;
   TableRecordPtr tabPtr;
-  c_tableRecordPool.getPtr(tabPtr, impl_req->srcTableId);
+  bool ok = find_object(tabPtr, impl_req->srcTableId);
+  ndbrequire(ok);
   {
     LocalAttributeRecord_list alist(c_attributeRecordPool,
                                            tabPtr.p->m_attributes);
@@ -15073,7 +15244,7 @@ Dbdict::prepareTransactionEventSysTable 
 
   ndbrequire(opj_ptr_p != 0);
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, opj_ptr_p->m_id);
+  c_tableRecordPool_.getPtr(tablePtr, opj_ptr_p->m_object_ptr_i);
   ndbrequire(tablePtr.i != RNIL); // system table must exist
 
   Uint32 tableId = tablePtr.p->tableId; /* System table */
@@ -15691,7 +15862,7 @@ void Dbdict::executeTransEventSysTable(C
 
   ndbrequire(opj_ptr_p != 0);
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, opj_ptr_p->m_id);
+  c_tableRecordPool_.getPtr(tablePtr, opj_ptr_p->m_object_ptr_i);
   ndbrequire(tablePtr.i != RNIL); // system table must exist
 
   Uint32 noAttr = tablePtr.p->noOfAttributes;
@@ -15871,7 +16042,7 @@ void Dbdict::parseReadEventSys(Signal* s
 
   ndbrequire(opj_ptr_p != 0);
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, opj_ptr_p->m_id);
+  c_tableRecordPool_.getPtr(tablePtr, opj_ptr_p->m_object_ptr_i);
   ndbrequire(tablePtr.i != RNIL); // system table must exist
 
   Uint32 noAttr = tablePtr.p->noOfAttributes;
@@ -15961,7 +16132,7 @@ void Dbdict::createEventUTIL_EXECUTE(Sig
       }
 
       TableRecordPtr tablePtr;
-      c_tableRecordPool.getPtr(tablePtr, obj_ptr_p->m_id);
+      c_tableRecordPool_.getPtr(tablePtr, obj_ptr_p->m_object_ptr_i);
       evntRec->m_request.setTableId(tablePtr.p->tableId);
       evntRec->m_request.setTableVersion(tablePtr.p->tableVersion);
 
@@ -17735,28 +17906,33 @@ Dbdict::createTrigger_parse(Signal* sign
       impl_req->triggerId = getFreeTriggerRecord();
       if (impl_req->triggerId == RNIL)
       {
-	jam();
-	setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
-	return;
+        jam();
+        setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
+        return;
+      }
+      bool ok = find_object(triggerPtr, impl_req->triggerId);
+      if (ok)
+      {
+        jam();
+        setError(error, CreateTrigRef::TriggerExists, __LINE__);
+        return;
       }
-      c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
-      ndbrequire(triggerPtr.p->triggerState == TriggerRecord::TS_NOT_DEFINED);
       D("master allocated triggerId " << impl_req->triggerId);
     }
     else
     {
-      if (!(impl_req->triggerId < c_triggerRecordPool.getSize()))
+      if (!(impl_req->triggerId < c_triggerRecordPool_.getSize()))
       {
 	jam();
 	setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
 	return;
       }
-      c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
-      if (triggerPtr.p->triggerState != TriggerRecord::TS_NOT_DEFINED)
+      bool ok = find_object(triggerPtr, impl_req->triggerId);
+      if (ok)
       {
-	jam();
-	setError(error, CreateTrigRef::TriggerExists, __LINE__);
-	return;
+        jam();
+        setError(error, CreateTrigRef::TriggerExists, __LINE__);
+        return;
       }
       D("master forced triggerId " << impl_req->triggerId);
     }
@@ -17765,14 +17941,14 @@ Dbdict::createTrigger_parse(Signal* sign
   {
     jam();
     // slave receives trigger id from master
-    if (! (impl_req->triggerId < c_triggerRecordPool.getSize()))
+    if (! (impl_req->triggerId < c_triggerRecordPool_.getSize()))
     {
       jam();
       setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
       return;
     }
-    c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
-    if (triggerPtr.p->triggerState != TriggerRecord::TS_NOT_DEFINED)
+    bool ok = find_object(triggerPtr, impl_req->triggerId);
+    if (ok)
     {
       jam();
       setError(error, CreateTrigRef::TriggerExists, __LINE__);
@@ -17781,15 +17957,20 @@ Dbdict::createTrigger_parse(Signal* sign
     D("slave allocated triggerId " << hex << impl_req->triggerId);
   }
 
-  initialiseTriggerRecord(triggerPtr);
-
-  triggerPtr.p->triggerId = impl_req->triggerId;
+  bool ok = seizeTriggerRecord(triggerPtr, impl_req->triggerId);
+  if (!ok)
+  {
+    jam();
+    setError(error, CreateTrigRef::TooManyTriggers, __LINE__);
+    return;
+  }
   triggerPtr.p->tableId = impl_req->tableId;
   triggerPtr.p->indexId = RNIL; // feedback method connects to index
   triggerPtr.p->triggerInfo = impl_req->triggerInfo;
   triggerPtr.p->receiverRef = impl_req->receiverRef;
   triggerPtr.p->triggerState = TriggerRecord::TS_DEFINING;
 
+  // TODO:msundell on failure below, leak of TriggerRecord
   if (handle.m_cnt >= 2)
   {
     jam();
@@ -17829,12 +18010,13 @@ Dbdict::createTrigger_parse(Signal* sign
   // connect to new DictObject
   {
     DictObjectPtr obj_ptr;
-    seizeDictObject(op_ptr, obj_ptr, triggerPtr.p->triggerName);
+    seizeDictObject(op_ptr, obj_ptr, triggerPtr.p->triggerName); // added to c_obj_name_hash
 
     obj_ptr.p->m_id = impl_req->triggerId; // wl3600_todo id
     obj_ptr.p->m_type =
       TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo);
-    triggerPtr.p->m_obj_ptr_i = obj_ptr.i;
+    link_object(obj_ptr, triggerPtr);
+    c_obj_id_hash.add(obj_ptr);
   }
 
   {
@@ -17875,7 +18057,8 @@ Dbdict::createTrigger_parse(Signal* sign
   if (impl_req->indexId != RNIL)
   {
     TableRecordPtr indexPtr;
-    c_tableRecordPool.getPtr(indexPtr, impl_req->indexId);
+    bool ok = find_object(indexPtr, impl_req->indexId);
+    ndbrequire(ok);
     triggerPtr.p->indexId = impl_req->indexId;
     indexPtr.p->triggerId = impl_req->triggerId;
   }
@@ -17911,7 +18094,12 @@ Dbdict::createTrigger_parse_endpoint(Sig
   }
 
   TriggerRecordPtr triggerPtr;
-  c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
+  bool ok = find_object(triggerPtr, impl_req->triggerId);
+  if (!ok)
+  {
+    jam();
+    return;
+  }
   switch(TriggerInfo::getTriggerType(triggerPtr.p->triggerInfo)){
   case TriggerType::REORG_TRIGGER:
     jam();
@@ -18226,8 +18414,8 @@ Dbdict::createTrigger_commit(Signal* sig
 
     Uint32 triggerId = impl_req->triggerId;
     TriggerRecordPtr triggerPtr;
-    c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-
+    bool ok = find_object(triggerPtr, triggerId);
+    ndbrequire(ok);
     triggerPtr.p->triggerState = TriggerRecord::TS_ONLINE;
     unlinkDictObject(op_ptr);
   }
@@ -18285,26 +18473,30 @@ Dbdict::createTrigger_abortParse(Signal*
     jam();
 
     TriggerRecordPtr triggerPtr;
-    if (! (triggerId < c_triggerRecordPool.getSize()))
+    if (! (triggerId < c_triggerRecordPool_.getSize()))
     {
       jam();
       goto done;
     }
 
-    c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-
-    if (triggerPtr.p->triggerState == TriggerRecord::TS_DEFINING)
+    bool ok = find_object(triggerPtr, triggerId);
+    if (ok)
     {
       jam();
-      triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
-    }
 
-    if (triggerPtr.p->indexId != RNIL)
-    {
-      TableRecordPtr indexPtr;
-      c_tableRecordPool.getPtr(indexPtr, triggerPtr.p->indexId);
-      triggerPtr.p->indexId = RNIL;
-      indexPtr.p->triggerId = RNIL;
+      if (triggerPtr.p->indexId != RNIL)
+      {
+        TableRecordPtr indexPtr;
+        bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+        if (ok)
+        {
+          jam();
+          indexPtr.p->triggerId = RNIL;
+        }
+        triggerPtr.p->indexId = RNIL;
+      }
+
+      c_triggerRecordPool_.release(triggerPtr);
     }
 
     // ignore Feedback for now (referencing object will be dropped too)
@@ -18390,8 +18582,8 @@ Dbdict::send_create_trig_req(Signal* sig
   const CreateTrigImplReq* impl_req = &createTriggerPtr.p->m_request;
 
   TriggerRecordPtr triggerPtr;
-  c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
-
+  bool ok = find_object(triggerPtr, impl_req->triggerId);
+  ndbrequire(ok);
   D("send_create_trig_req");
 
   CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
@@ -18421,7 +18613,8 @@ Dbdict::send_create_trig_req(Signal* sig
     if (triggerPtr.p->indexId != RNIL)
     {
       jam();
-      c_tableRecordPool.getPtr(indexPtr, triggerPtr.p->indexId);
+      bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+      ndbrequire(ok);
       if (indexPtr.p->m_upgrade_trigger_handling.m_upgrade)
       {
         jam();
@@ -18636,12 +18829,18 @@ Dbdict::dropTrigger_parse(Signal* signal
   // check trigger id from user or via name
   TriggerRecordPtr triggerPtr;
   {
-    if (!(impl_req->triggerId < c_triggerRecordPool.getSize())) {
+    if (!(impl_req->triggerId < c_triggerRecordPool_.getSize())) {
+      jam();
+      setError(error, DropTrigImplRef::TriggerNotFound, __LINE__);
+      return;
+    }
+    bool ok = find_object(triggerPtr, impl_req->triggerId);
+    if (!ok)
+    {
       jam();
       setError(error, DropTrigImplRef::TriggerNotFound, __LINE__);
       return;
     }
-    c_triggerRecordPool.getPtr(triggerPtr, impl_req->triggerId);
     // wl3600_todo state check
   }
 
@@ -18952,19 +19151,24 @@ Dbdict::dropTrigger_commit(Signal* signa
     Uint32 triggerId = dropTriggerPtr.p->m_request.triggerId;
 
     TriggerRecordPtr triggerPtr;
-    c_triggerRecordPool.getPtr(triggerPtr, triggerId);
-
+    bool ok = find_object(triggerPtr, triggerId);
+    ndbrequire(ok);
     if (triggerPtr.p->indexId != RNIL)
     {
+      jam();
       TableRecordPtr indexPtr;
-      c_tableRecordPool.getPtr(indexPtr, triggerPtr.p->indexId);
+      bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+      if (ok)
+      {
+        jam();
+        indexPtr.p->triggerId = RNIL;
+      }
       triggerPtr.p->indexId = RNIL;
-      indexPtr.p->triggerId = RNIL;
     }
 
     // remove trigger
+    c_triggerRecordPool_.release(triggerPtr);
     releaseDictObject(op_ptr);
-    triggerPtr.p->triggerState = TriggerRecord::TS_NOT_DEFINED;
 
     sendTransConf(signal, op_ptr);
     return;
@@ -19139,7 +19343,8 @@ Dbdict::getIndexAttr(TableRecordPtr inde
   TableRecordPtr tablePtr;
   AttributeRecordPtr attrPtr;
 
-  c_tableRecordPool.getPtr(tablePtr, indexPtr.p->primaryTableId);
+  bool ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+  ndbrequire(ok);
   AttributeRecord* iaRec = c_attributeRecordPool.getPtr(itAttr);
   {
     ConstRope tmp(c_rope_pool, iaRec->attributeName);
@@ -20403,10 +20608,14 @@ Dbdict::execBACKUP_LOCK_TAB_REQ(Signal* 
   Uint32 lock = req->m_lock_unlock;
 
   TableRecordPtr tablePtr;
-  c_tableRecordPool.getPtr(tablePtr, tableId, true);
-
+  bool ok = find_object(tablePtr, tableId);
   Uint32 err = 0;
-  if(lock == BackupLockTab::LOCK_TABLE)
+  if (!ok)
+  {
+    jam();
+    err = GetTabInfoRef::InvalidTableId;
+  }
+  else if(lock == BackupLockTab::LOCK_TABLE)
   {
     jam();
     if ((err = check_write_obj(tableId)) == 0)
@@ -29005,7 +29214,7 @@ Dbdict::check_consistency()
       tablePtr.i++) {
     if (check_read_obj(tablePtr.i,
 
-    c_tableRecordPool.getPtr(tablePtr);
+    c_tableRecordPool_.getPtr(tablePtr);
 
     switch (tablePtr.p->tabState) {
     case TableRecord::NOT_DEFINED:
@@ -29019,10 +29228,11 @@ Dbdict::check_consistency()
 
   // triggers // should be in schema file
   TriggerRecordPtr triggerPtr;
-  for (triggerPtr.i = 0;
-      triggerPtr.i < c_triggerRecordPool.getSize();
-      triggerPtr.i++) {
-    c_triggerRecordPool.getPtr(triggerPtr);
+  for (Uint32 id = 0;
+      id < c_triggerRecordPool_.getSize();
+      id++) {
+    bool ok = find_object(triggerPtr, id);
+    if (!ok) continue;
     switch (triggerPtr.p->triggerState) {
     case TriggerRecord::TS_NOT_DEFINED:
       continue;
@@ -29069,7 +29279,6 @@ void
 Dbdict::check_consistency_table(TableRecordPtr tablePtr)
 {
   D("table " << copyRope<SZ>(tablePtr.p->tableName));
-  ndbrequire(tablePtr.p->tableId == tablePtr.i);
 
   switch (tablePtr.p->tableType) {
   case DictTabInfo::SystemTable: // should just be "Table"
@@ -29111,9 +29320,8 @@ Dbdict::check_consistency_index(TableRec
   }
 
   TableRecordPtr tablePtr;
-  tablePtr.i = indexPtr.p->primaryTableId;
-  ndbrequire(tablePtr.i != RNIL);
-  c_tableRecordPool.getPtr(tablePtr);
+  bool ok = find_object(tablePtr, indexPtr.p->primaryTableId);
+  ndbrequire(ok);
   check_consistency_table(tablePtr);
 
   bool is_unique_index = false;
@@ -29131,13 +29339,10 @@ Dbdict::check_consistency_index(TableRec
   }
 
   TriggerRecordPtr triggerPtr;
-  triggerPtr.i = indexPtr.p->triggerId;
-  ndbrequire(triggerPtr.i != RNIL);
-  c_triggerRecordPool.getPtr(triggerPtr);
-
+  ok = find_object(triggerPtr, indexPtr.p->triggerId);
+  ndbrequire(ok);
   ndbrequire(triggerPtr.p->tableId == tablePtr.p->tableId);
   ndbrequire(triggerPtr.p->indexId == indexPtr.p->tableId);
-  ndbrequire(triggerPtr.p->triggerId == triggerPtr.i);
 
   check_consistency_trigger(triggerPtr);
 
@@ -29163,21 +29368,19 @@ Dbdict::check_consistency_trigger(Trigge
   {
     ndbrequire(triggerPtr.p->triggerState == TriggerRecord::TS_ONLINE);
   }
-  ndbrequire(triggerPtr.p->triggerId == triggerPtr.i);
 
   TableRecordPtr tablePtr;
-  tablePtr.i = triggerPtr.p->tableId;
-  ndbrequire(tablePtr.i != RNIL);
-  c_tableRecordPool.getPtr(tablePtr);
+  bool ok = find_object(tablePtr, triggerPtr.p->tableId);
+  ndbrequire(ok);
   check_consistency_table(tablePtr);
 
   if (triggerPtr.p->indexId != RNIL)
   {
     jam();
     TableRecordPtr indexPtr;
-    indexPtr.i = triggerPtr.p->indexId;
-    c_tableRecordPool.getPtr(indexPtr);
-    ndbrequire(check_read_obj(indexPtr.i) == 0);
+    ndbrequire(check_read_obj(triggerPtr.p->indexId) == 0);
+    bool ok = find_object(indexPtr, triggerPtr.p->indexId);
+    ndbrequire(ok);
     ndbrequire(indexPtr.p->indexState == TableRecord::IS_ONLINE);
     TriggerInfo ti;
     TriggerInfo::unpackTriggerInfo(triggerPtr.p->triggerInfo, ti);
@@ -29185,7 +29388,7 @@ Dbdict::check_consistency_trigger(Trigge
     case TriggerEvent::TE_CUSTOM:
       if (! (triggerPtr.p->triggerState == TriggerRecord::TS_FAKE_UPGRADE))
       {
-        ndbrequire(triggerPtr.i == indexPtr.p->triggerId);
+        ndbrequire(triggerPtr.p->triggerId == indexPtr.p->triggerId);
       }
       break;
     default:
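
Note for reviewers: the recurring change throughout the Dbdict.cpp hunks above
is that direct c_tableRecordPool.getPtr(ptr, id) calls, which treated a schema
id as a pool index, are replaced by id-keyed lookups through find_object(),
failing soft (TableNotDefined) in parse paths and hard (ndbrequire) where the
object must already exist. A minimal standalone sketch of the lookup idea,
using stand-in types rather than the real DICT structures:

    #include <cstdio>
    #include <unordered_map>
    #include <vector>

    struct TableRecord { unsigned tableId; };

    // Stand-ins for the object pool and c_obj_id_hash:
    // schema id -> pool slot (m_object_ptr_i in the real code).
    std::vector<TableRecord> pool;
    std::unordered_map<unsigned, unsigned> id_hash;

    bool find_object(TableRecord*& rec, unsigned id)
    {
      std::unordered_map<unsigned, unsigned>::iterator it = id_hash.find(id);
      if (it == id_hash.end()) { rec = 0; return false; } // stale/unknown id
      rec = &pool[it->second];
      return true;
    }

    int main()
    {
      pool.push_back(TableRecord{7});
      id_hash[7] = 0;            // table id 7 lives in pool slot 0

      TableRecord* t;
      if (find_object(t, 7))     // caller picks the policy: setError or ndbrequire
        std::printf("found table %u\n", t->tableId);
      if (!find_object(t, 42))
        std::printf("id 42 -> TableNotDefined\n");
      return 0;
    }

The schema id and the pool slot are now distinct values connected only through
the hash, which is what lets table ids grow independently of the pool size.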

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2011-11-11 13:31:19 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2011-11-19 16:53:16 +0000
@@ -437,9 +437,9 @@ public:
     Uint32 indexStatBgRequest;
   };
 
-  TableRecord_pool c_tableRecordPool;
-  RSS_AP_SNAPSHOT(c_tableRecordPool);
-  TableRecord_pool& get_pool(TableRecordPtr) { return c_tableRecordPool; }
+  TableRecord_pool c_tableRecordPool_;
+  RSS_AP_SNAPSHOT(c_tableRecordPool_);
+  TableRecord_pool& get_pool(TableRecordPtr) { return c_tableRecordPool_; }
 
   /**  Node Group and Tablespace id+version + range or list data.
     *  This is only stored temporarily in DBDICT during an ongoing
@@ -459,6 +459,7 @@ public:
    */
   struct TriggerRecord {
     TriggerRecord() {}
+    static bool isCompatible(Uint32 type) { return DictTabInfo::isTrigger(type); }
 
     /** Trigger state */
     enum TriggerState {
@@ -505,8 +506,9 @@ public:
   typedef ArrayPool<TriggerRecord> TriggerRecord_pool;
 
   Uint32 c_maxNoOfTriggers;
-  TriggerRecord_pool c_triggerRecordPool;
-  RSS_AP_SNAPSHOT(c_triggerRecordPool);
+  TriggerRecord_pool c_triggerRecordPool_;
+  TriggerRecord_pool& get_pool(TriggerRecordPtr) { return c_triggerRecordPool_;}
+  RSS_AP_SNAPSHOT(c_triggerRecordPool_);
 
   /**
    * Information for each FS connection.
@@ -771,6 +773,17 @@ public:
     return find_object(obj, object, id);
   }
 
+  bool find_object(DictObjectPtr& obj, Ptr<TriggerRecord>& object, Uint32 id)
+  {
+    if (!find_trigger_object(obj, id))
+    {
+      object.setNull();
+      return false;
+    }
+    get_pool(object).getPtr(object, obj.p->m_object_ptr_i);
+    return !object.isNull();
+  }
+
   bool find_object(DictObjectPtr& object, Uint32 id)
   {
     DictObject key;
@@ -780,6 +793,15 @@ public:
     return ok;
   }
 
+  bool find_trigger_object(DictObjectPtr& object, Uint32 id)
+  {
+    DictObject key;
+    key.m_id = id;
+    key.m_type = DictTabInfo::HashIndexTrigger; // A trigger type
+    bool ok = c_obj_id_hash.find(object, key);
+    return ok;
+  }
+
   template<typename T> bool link_object(DictObjectPtr obj, Ptr<T> object)
   {
     if (!T::isCompatible(obj.p->m_type))
@@ -3728,14 +3750,17 @@ private:
   /* ------------------------------------------------------------ */
   // Drop Table Handling
   /* ------------------------------------------------------------ */
-  void releaseTableObject(Uint32 tableId, bool removeFromHash = true);
+  void releaseTableObject(Uint32 table_ptr_i, bool removeFromHash = true);
 
   /* ------------------------------------------------------------ */
   // General Stuff
   /* ------------------------------------------------------------ */
   Uint32 getFreeObjId(bool both = false);
   Uint32 getFreeTableRecord();
+  bool seizeTableRecord(TableRecordPtr& tableRecord, Uint32& schemaFileId);
   Uint32 getFreeTriggerRecord();
+  bool seizeTriggerRecord(TriggerRecordPtr& triggerRecord, Uint32 triggerId);
+  void releaseTriggerObject(Uint32 trigger_ptr_i);
   bool getNewAttributeRecord(TableRecordPtr tablePtr,
 			     AttributeRecordPtr & attrPtr);
   void packTableIntoPages(Signal* signal);
@@ -3988,10 +4013,8 @@ private:
   void initWriteSchemaRecord();
 
   void initNodeRecords();
-  void initTableRecords();
-  void initialiseTableRecord(TableRecordPtr tablePtr);
-  void initTriggerRecords();
-  void initialiseTriggerRecord(TriggerRecordPtr triggerPtr);
+  void initialiseTableRecord(TableRecordPtr tablePtr, Uint32 tableId);
+  void initialiseTriggerRecord(TriggerRecordPtr triggerPtr, Uint32 triggerId);
   void initPageRecords();
 
   Uint32 getFsConnRecord();
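
Note for reviewers: the header gives TriggerRecord the same id-to-slot
indirection. Records are seized on create and released on drop/abort, and
find_object() resolves a trigger id through the DictObject hash, so trigger
ids no longer double as pool indexes. A standalone sketch of that lifecycle,
under the assumption that seizing an already-used id fails (mapping to
TriggerExists in the real code):

    #include <cstdio>
    #include <map>

    struct TriggerRecord { unsigned triggerId; };

    // Stand-in for c_obj_id_hash + c_triggerRecordPool_: id-keyed records.
    std::map<unsigned, TriggerRecord> triggers;

    bool seizeTriggerRecord(TriggerRecord*& p, unsigned id)
    {
      std::pair<std::map<unsigned, TriggerRecord>::iterator, bool> r =
        triggers.insert(std::make_pair(id, TriggerRecord()));
      if (!r.second) return false;      // id in use -> TriggerExists
      r.first->second.triggerId = id;
      p = &r.first->second;
      return true;
    }

    bool find_object(TriggerRecord*& p, unsigned id)
    {
      std::map<unsigned, TriggerRecord>::iterator it = triggers.find(id);
      if (it == triggers.end()) return false;
      p = &it->second;
      return true;
    }

    void releaseTrigger(unsigned id) { triggers.erase(id); } // drop/abort path

    int main()
    {
      TriggerRecord* t;
      std::printf("seize 3: %d\n", (int)seizeTriggerRecord(t, 3));       // 1
      std::printf("seize 3 again: %d\n", (int)seizeTriggerRecord(t, 3)); // 0
      releaseTrigger(3);
      std::printf("find after drop: %d\n", (int)find_object(t, 3));      // 0
      return 0;
    }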

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-11-16 09:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-11-21 12:53:01 +0000
@@ -7486,7 +7486,8 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ
   const Uint32 defaultFragments = 
     c_fragments_per_node * cnoOfNodeGroups * cnoReplicas;
   const Uint32 maxFragments =
-    MAX_FRAG_PER_LQH * getLqhWorkers() * cnoOfNodeGroups * cnoReplicas;
+    MAX_FRAG_PER_LQH * (getLqhWorkers() ? getLqhWorkers() : 1) *
+    cnoOfNodeGroups * cnoReplicas;
 
   do {
     NodeGroupRecordPtr NGPtr;
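
Note for reviewers: getLqhWorkers() returns 0 on single-threaded ndbd, which
would have made maxFragments 0 here; the new expression clamps 0 workers to 1.
Checking the arithmetic standalone (the constants are illustrative, not the
real limits):

    #include <cstdio>

    int main()
    {
      const unsigned MAX_FRAG_PER_LQH = 8; // illustrative value only
      unsigned lqhWorkers = 0;             // single-threaded ndbd reports 0
      unsigned nodeGroups = 2, replicas = 2;

      unsigned maxFragments =
        MAX_FRAG_PER_LQH * (lqhWorkers ? lqhWorkers : 1) *
        nodeGroups * replicas;

      std::printf("maxFragments = %u (0 without the clamp)\n", maxFragments);
      return 0;
    }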

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-11-16 05:47:02 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2011-11-18 06:47:23 +0000
@@ -36,7 +36,9 @@ void Dblqh::initData() 
   clcpFileSize = ZNO_CONCURRENT_LCP;
   clfoFileSize = 0;
   clogFileFileSize = 0;
-  clogPartFileSize = 0; // Not valid until READ_CONFIG
+
+  NdbLogPartInfo lpinfo(instance());
+  clogPartFileSize = lpinfo.partCount;
 
   cpageRefFileSize = ZPAGE_REF_FILE_SIZE;
   cscanrecFileSize = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-11-16 09:29:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-11-21 12:53:01 +0000
@@ -1220,36 +1220,31 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* 
     m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
 
-  clogPartFileSize = 4;
 
-  Uint32 nodeLogParts = 4;
-  ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS,
-                            &nodeLogParts);
-  globalData.ndbLogParts = nodeLogParts;
-  ndbrequire(nodeLogParts <= NDB_MAX_LOG_PARTS);
-  {
-    NdbLogPartInfo lpinfo(instance());
-    clogPartFileSize = lpinfo.partCount; // How many are this instance responsible for...
-  }
-
-  if (globalData.ndbMtLqhWorkers > nodeLogParts)
+  /**
+   * TODO move check of log-parts vs. ndbMtLqhWorkers to a better place
+   * (Configuration.cpp ??)
+   */
+  ndbrequire(globalData.ndbLogParts <= NDB_MAX_LOG_PARTS);
+  if (globalData.ndbMtLqhWorkers > globalData.ndbLogParts)
   {
     char buf[255];
     BaseString::snprintf(buf, sizeof(buf),
       "Trying to start %d LQH workers with only %d log parts, try initial"
       " node restart to be able to use more LQH workers.",
-      globalData.ndbMtLqhWorkers, nodeLogParts);
+      globalData.ndbMtLqhWorkers, globalData.ndbLogParts);
     progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
   }
-  if (nodeLogParts != 4 &&
-      nodeLogParts != 8 &&
-      nodeLogParts != 16)
+
+  if (globalData.ndbLogParts != 4 &&
+      globalData.ndbLogParts != 8 &&
+      globalData.ndbLogParts != 16)
   {
     char buf[255];
     BaseString::snprintf(buf, sizeof(buf),
       "Trying to start with %d log parts, number of log parts can"
       " only be set to 4, 8 or 16.",
-      nodeLogParts);
+      globalData.ndbLogParts);
     progError(__LINE__, NDBD_EXIT_INVALID_CONFIG, buf);
   }
 
@@ -1280,7 +1275,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* 
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TABLE, &ctabrecFileSize));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_TC_CONNECT, 
 					&ctcConnectrecFileSize));
-  clogFileFileSize       = 4 * cnoLogFiles;
+  clogFileFileSize = clogPartFileSize * cnoLogFiles;
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
   cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
 
@@ -15926,6 +15921,7 @@ void Dblqh::initLogpage(Signal* signal) 
   logPagePtr.p->logPageWord[ZPOS_VERSION] = NDB_VERSION;
   logPagePtr.p->logPageWord[ZPOS_NO_LOG_FILES] = logPartPtr.p->noLogFiles;
   logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = ZPAGE_HEADER_SIZE;
+  logPagePtr.p->logPageWord[ZPOS_NO_LOG_PARTS]= globalData.ndbLogParts;
   ilpTcConnectptr.i = logPartPtr.p->firstLogTcrec;
   if (ilpTcConnectptr.i != RNIL) {
     jam();
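
Note for reviewers: clogPartFileSize is now set once in initData() from
NdbLogPartInfo, and execREAD_CONFIG_REQ validates the globally configured
log-part count instead of re-reading the parameter. A standalone sketch of
the two sanity checks, with the error messages paraphrased:

    #include <cstdio>

    bool validLogParts(unsigned logParts, unsigned lqhWorkers)
    {
      if (lqhWorkers > logParts) {
        std::printf("config error: %u LQH workers, only %u log parts\n",
                    lqhWorkers, logParts);
        return false;
      }
      if (logParts != 4 && logParts != 8 && logParts != 16) {
        std::printf("config error: log parts must be 4, 8 or 16, got %u\n",
                    logParts);
        return false;
      }
      return true;
    }

    int main()
    {
      validLogParts(4, 8);                           // too many workers
      validLogParts(6, 4);                           // unsupported part count
      std::printf("%d\n", (int)validLogParts(8, 8)); // 1
      return 0;
    }

Note also that clogFileFileSize now scales with the per-instance part count
(clogPartFileSize * cnoLogFiles) instead of the hard-coded factor of 4.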

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2011-11-18 06:47:23 +0000
@@ -1769,6 +1769,7 @@ private:
     Uint64 cabortCount;
     Uint64 c_scan_count;
     Uint64 c_range_scan_count;
+    Uint64 clocalReadCount;
 
     // Resource usage counter(not monotonic)
     Uint32 cconcurrentOp;
@@ -1783,6 +1784,7 @@ private:
      cabortCount(0),
      c_scan_count(0),
      c_range_scan_count(0),
+     clocalReadCount(0),
      cconcurrentOp(0) {}
 
     Uint32 build_event_rep(Signal* signal)
@@ -1800,6 +1802,7 @@ private:
       const Uint32 abortCount =       diff(signal, 13, cabortCount);
       const Uint32 scan_count =       diff(signal, 15, c_scan_count);
       const Uint32 range_scan_count = diff(signal, 17, c_range_scan_count);
+      const Uint32 localread_count = diff(signal, 19, clocalReadCount);
 
       signal->theData[0] = NDB_LE_TransReportCounters;
       signal->theData[1] = transCount;
@@ -1812,7 +1815,8 @@ private:
       signal->theData[8] = abortCount;
       signal->theData[9] = scan_count;
       signal->theData[10] = range_scan_count;
-      return 11;
+      signal->theData[11] = localread_count;
+      return 12;
     }
 
     Uint32 build_continueB(Signal* signal) const
@@ -1821,7 +1825,9 @@ private:
       const Uint64* vars[] = {
         &cattrinfoCount, &ctransCount, &ccommitCount,
         &creadCount, &csimpleReadCount, &cwriteCount,
-        &cabortCount, &c_scan_count, &c_range_scan_count };
+        &cabortCount, &c_scan_count, &c_range_scan_count,
+        &clocalReadCount
+      };
       const size_t num = sizeof(vars)/sizeof(vars[0]);
 
       for (size_t i = 0; i < num; i++)
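
Note for reviewers: clocalReadCount follows the block's existing counter
protocol: a monotonically growing Uint64, a 32-bit delta emitted in the
NDB_LE_TransReportCounters report, and a continueB snapshot serving as the
baseline for the next report. A standalone sketch of that delta scheme (the
names are mine; diff() in the real code diffs against the value carried in
the continueB signal):

    #include <cstdint>
    #include <cstdio>

    struct Counter {
      uint64_t value;     // monotonic, like clocalReadCount
      uint64_t baseline;  // snapshot carried between reports

      Counter() : value(0), baseline(0) {}

      uint32_t take_delta()
      {
        uint32_t d = (uint32_t)(value - baseline); // truncating, wrap-safe
        baseline = value;
        return d;
      }
    };

    int main()
    {
      Counter c;
      c.value += 5;
      std::printf("first report: %u\n", c.take_delta()); // 5
      c.value += 3;
      std::printf("next report:  %u\n", c.take_delta()); // 3
      return 0;
    }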

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2011-11-16 08:17:17 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2011-11-18 06:47:23 +0000
@@ -3356,7 +3356,10 @@ void Dbtc::tckeyreq050Lab(Signal* signal
     jam();
     regTcPtr->lastReplicaNo = 0;
     regTcPtr->noOfNodes = 1;
-  } 
+
+    if (regTcPtr->tcNodedata[0] == getOwnNodeId())
+      c_counters.clocalReadCount++;
+  }
   else if (Toperation == ZUNLOCK)
   {
     regTcPtr->m_special_op_flags &= ~TcConnectRecord::SOF_REORG_MOVING;
@@ -13260,7 +13263,8 @@ void Dbtc::execDBINFO_SCANREQ(Signal *si
       { Ndbinfo::WRITES_COUNTER, c_counters.cwriteCount },
       { Ndbinfo::ABORTS_COUNTER, c_counters.cabortCount },
       { Ndbinfo::TABLE_SCANS_COUNTER, c_counters.c_scan_count },
-      { Ndbinfo::RANGE_SCANS_COUNTER, c_counters.c_range_scan_count }
+      { Ndbinfo::RANGE_SCANS_COUNTER, c_counters.c_range_scan_count },
+      { Ndbinfo::LOCAL_READ_COUNTER, c_counters.clocalReadCount }
     };
     const size_t num_counters = sizeof(counters) / sizeof(counters[0]);
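
Note for reviewers: as I read the tckeyreq050Lab hunk above, the counter ticks
only for single-replica committed reads whose chosen data node is the TC's own
node, i.e. reads served without leaving the node; it is also exported to
ndbinfo as LOCAL_READ_COUNTER. The condition in isolation (values
hypothetical):

    #include <cstdio>

    int main()
    {
      unsigned ownNodeId = 3;
      unsigned tcNodedata0 = 3;        // first replica picked for the read
      unsigned long localReads = 0;

      if (tcNodedata0 == ownNodeId)    // same test as in tckeyreq050Lab
        localReads++;

      std::printf("localReads = %lu\n", localReads);
      return 0;
    }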
 

=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp	2011-11-18 06:47:23 +0000
@@ -275,6 +275,22 @@ Ndbfs::execREAD_CONFIG_REQ(Signal* signa
   Uint32 noIdleFiles = 27;
 
   ndb_mgm_get_int_parameter(p, CFG_DB_INITIAL_OPEN_FILES, &noIdleFiles);
+
+  {
+    /**
+     * Each log part keeps up to 3 log files open at any given
+     * time (an upper bound); make sure noIdleFiles is at least
+     * 4 times the number of log parts.
+     */
+    Uint32 logParts = NDB_DEFAULT_LOG_PARTS;
+    ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS, &logParts);
+    Uint32 logfiles = 4 * logParts;
+    if (noIdleFiles < logfiles)
+    {
+      noIdleFiles = logfiles;
+    }
+  }
+
   // Make sure at least "noIdleFiles" files can be created
   if (noIdleFiles > m_maxFiles && m_maxFiles != 0)
     m_maxFiles = noIdleFiles;
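
Note for reviewers: each log part keeps a few redo files open at any time, so
the old default of 27 initially-open files starves configurations with many
log parts; the block now floors noIdleFiles at 4 files per log part. The
arithmetic standalone:

    #include <cstdio>

    int main()
    {
      unsigned noIdleFiles = 27;       // CFG_DB_INITIAL_OPEN_FILES default
      unsigned logParts = 16;          // e.g. 16 redo log parts
      unsigned logfiles = 4 * logParts;
      if (noIdleFiles < logfiles)
        noIdleFiles = logfiles;
      std::printf("noIdleFiles = %u\n", noIdleFiles); // 64
      return 0;
    }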

=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2011-11-19 16:53:16 +0000
@@ -2462,7 +2462,7 @@ Trix::statCleanExecute(Signal* signal, S
   ndbrequire(data.m_indexVersion == av[1]);
   data.m_sampleVersion = av[2];
   data.m_statKey = &av[3];
-  const char* kp = (const char*)data.m_statKey;
+  const unsigned char* kp = (const unsigned char*)data.m_statKey;
   const Uint32 kb = kp[0] + (kp[1] << 8);
   // key is not empty
   ndbrequire(kb != 0);
@@ -2633,8 +2633,8 @@ Trix::statScanExecute(Signal* signal, St
   ::copy(av, ptr1);
   data.m_statKey = &av[0];
   data.m_statValue = &av[kz];
-  const char* kp = (const char*)data.m_statKey;
-  const char* vp = (const char*)data.m_statValue;
+  const unsigned char* kp = (const unsigned char*)data.m_statKey;
+  const unsigned char* vp = (const unsigned char*)data.m_statValue;
   const Uint32 kb = kp[0] + (kp[1] << 8);
   const Uint32 vb = vp[0] + (vp[1] << 8);
   // key and value are not empty
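
Note for reviewers: this is the bug#13407848 fix that the new ndb_index_stat
test case (a char(180) column) exercises. With plain char, a length byte of
180 is sign-extended, so kp[0] + (kp[1] << 8) goes negative and produces a
garbage length that trips an ndbrequire in Trix.cpp (per the new test's
comment). Demonstrated standalone, on a platform where char is signed:

    #include <cstdio>
    typedef unsigned int Uint32;

    int main()
    {
      unsigned char raw[2] = { 180, 0 };        // length bytes of a 180-byte key

      const char* sp = (const char*)raw;        // old code
      const Uint32 bad = sp[0] + (sp[1] << 8);  // 180 -> -76 -> 0xFFFFFFB4

      const unsigned char* up = raw;            // fixed code
      const Uint32 good = up[0] + (up[1] << 8); // 180

      std::printf("signed: 0x%08X  unsigned: %u\n", bad, good);
      return 0;
    }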

=== added file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	2011-11-16 15:38:25 +0000
@@ -0,0 +1,648 @@
+/*
+  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+  This program is free software; you can redistribute it and/or modify
+  it under the terms of the GNU General Public License as published by
+  the Free Software Foundation; version 2 of the License.
+
+  This program is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+  GNU General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "trpman.hpp"
+#include <TransporterRegistry.hpp>
+#include <signaldata/CloseComReqConf.hpp>
+#include <signaldata/DisconnectRep.hpp>
+#include <signaldata/EnableCom.hpp>
+#include <signaldata/RouteOrd.hpp>
+#include <signaldata/DumpStateOrd.hpp>
+
+Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
+  SimulatedBlock(TRPMAN, ctx, instanceno)
+{
+  BLOCK_CONSTRUCTOR(Trpman);
+
+  addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
+  addRecSignal(GSN_OPEN_COMREQ, &Trpman::execOPEN_COMREQ);
+  addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
+  addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
+  addRecSignal(GSN_CONNECT_REP, &Trpman::execCONNECT_REP);
+  addRecSignal(GSN_ROUTE_ORD, &Trpman::execROUTE_ORD);
+
+  addRecSignal(GSN_NDB_TAMPER, &Trpman::execNDB_TAMPER, true);
+  addRecSignal(GSN_DUMP_STATE_ORD, &Trpman::execDUMP_STATE_ORD);
+  addRecSignal(GSN_DBINFO_SCANREQ, &Trpman::execDBINFO_SCANREQ);
+}
+
+Trpman::~Trpman()
+{
+}
+
+BLOCK_FUNCTIONS(Trpman)
+
+#ifdef ERROR_INSERT
+NodeBitmask c_error_9000_nodes_mask;
+extern Uint32 MAX_RECEIVED_SIGNALS;
+#endif
+
+void
+Trpman::execOPEN_COMREQ(Signal* signal)
+{
+  // Connect to the specified NDB node; so far only QMGR is allowed
+  // to communicate with the node
+
+  const BlockReference userRef = signal->theData[0];
+  Uint32 tStartingNode = signal->theData[1];
+  Uint32 tData2 = signal->theData[2];
+  jamEntry();
+
+  const Uint32 len = signal->getLength();
+  if (len == 2)
+  {
+#ifdef ERROR_INSERT
+    if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
+	   && c_error_9000_nodes_mask.get(tStartingNode)))
+#endif
+    {
+      globalTransporterRegistry.do_connect(tStartingNode);
+      globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
+
+      //-----------------------------------------------------
+      // Report that the connection to the node is opened
+      //-----------------------------------------------------
+      signal->theData[0] = NDB_LE_CommunicationOpened;
+      signal->theData[1] = tStartingNode;
+      sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+      //-----------------------------------------------------
+    }
+  }
+  else
+  {
+    for(unsigned int i = 1; i < MAX_NODES; i++ )
+    {
+      jam();
+      if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2)
+      {
+	jam();
+
+#ifdef ERROR_INSERT
+	if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
+	    && c_error_9000_nodes_mask.get(i))
+	  continue;
+#endif
+	globalTransporterRegistry.do_connect(i);
+	globalTransporterRegistry.setIOState(i, HaltIO);
+
+	signal->theData[0] = NDB_LE_CommunicationOpened;
+	signal->theData[1] = i;
+	sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+      }
+    }
+  }
+
+  if (userRef != 0)
+  {
+    jam();
+    signal->theData[0] = tStartingNode;
+    signal->theData[1] = tData2;
+    sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA);
+  }
+}
+
+void
+Trpman::execCONNECT_REP(Signal *signal)
+{
+  const Uint32 hostId = signal->theData[0];
+  jamEntry();
+
+  const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type;
+  ndbrequire(type != NodeInfo::INVALID);
+  globalData.m_nodeInfo[hostId].m_version = 0;
+  globalData.m_nodeInfo[hostId].m_mysql_version = 0;
+
+  /**
+   * Inform QMGR that client has connected
+   */
+  signal->theData[0] = hostId;
+  if (ERROR_INSERTED(9005))
+  {
+    sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1);
+  }
+  else
+  {
+    sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA);
+  }
+
+  /* Automatically subscribe events for MGM nodes.
+   */
+  if (type == NodeInfo::MGM)
+  {
+    jam();
+    globalTransporterRegistry.setIOState(hostId, NoHalt);
+  }
+
+  //------------------------------------------
+  // Also report this event to the Event handler
+  //------------------------------------------
+  signal->theData[0] = NDB_LE_Connected;
+  signal->theData[1] = hostId;
+  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+}
+
+void
+Trpman::execCLOSE_COMREQ(Signal* signal)
+{
+  // Close communication with the node and halt input/output from
+  // other blocks than QMGR
+
+  CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0];
+
+  const BlockReference userRef = closeCom->xxxBlockRef;
+  Uint32 requestType = closeCom->requestType;
+  Uint32 failNo = closeCom->failNo;
+//  Uint32 noOfNodes = closeCom->noOfNodes;
+
+  jamEntry();
+  for (unsigned i = 0; i < MAX_NODES; i++)
+  {
+    if (NodeBitmask::get(closeCom->theNodes, i))
+    {
+      jam();
+
+      //-----------------------------------------------------
+      // Report that the connection to the node is closed
+      //-----------------------------------------------------
+      signal->theData[0] = NDB_LE_CommunicationClosed;
+      signal->theData[1] = i;
+      sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+
+      globalTransporterRegistry.setIOState(i, HaltIO);
+      globalTransporterRegistry.do_disconnect(i);
+    }
+  }
+
+  if (requestType != CloseComReqConf::RT_NO_REPLY)
+  {
+    ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) ||
+              ((requestType == CloseComReqConf::RT_NODE_FAILURE) &&
+               (failNo != 0)));
+    jam();
+    CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend();
+    closeComConf->xxxBlockRef = userRef;
+    closeComConf->requestType = requestType;
+    closeComConf->failNo = failNo;
+
+    /* Note the assumption that noOfNodes and the theNodes bitmap
+     * have not been trampled above.
+     */
+    sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
+  }
+}
+
+void
+Trpman::execENABLE_COMREQ(Signal* signal)
+{
+  jamEntry();
+  const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr();
+
+  /* Copy out the signal data so sendSignal() does not clobber it. */
+  Uint32 senderRef = enableComReq->m_senderRef;
+  Uint32 senderData = enableComReq->m_senderData;
+  Uint32 nodes[NodeBitmask::Size];
+  MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size);
+
+  /* Enable communication with all our NDB blocks to these nodes. */
+  Uint32 search_from = 0;
+  for (;;)
+  {
+    Uint32 tStartingNode = NodeBitmask::find(nodes, search_from);
+    if (tStartingNode == NodeBitmask::NotFound)
+      break;
+    search_from = tStartingNode + 1;
+
+    globalTransporterRegistry.setIOState(tStartingNode, NoHalt);
+    setNodeInfo(tStartingNode).m_connected = true;
+
+    //-----------------------------------------------------
+    // Report the version of the connected node
+    //-----------------------------------------------------
+    signal->theData[0] = NDB_LE_ConnectedApiVersion;
+    signal->theData[1] = tStartingNode;
+    signal->theData[2] = getNodeInfo(tStartingNode).m_version;
+    signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version;
+
+    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB);
+    //-----------------------------------------------------
+  }
+
+  EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend();
+  enableComConf->m_senderRef = reference();
+  enableComConf->m_senderData = senderData;
+  MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size);
+  sendSignal(senderRef, GSN_ENABLE_COMCONF, signal,
+             EnableComConf::SignalLength, JBA);
+}
+
+void
+Trpman::execDISCONNECT_REP(Signal *signal)
+{
+  const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
+  const Uint32 hostId = rep->nodeId;
+  jamEntry();
+
+  setNodeInfo(hostId).m_connected = false;
+  setNodeInfo(hostId).m_connectCount++;
+  const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
+  ndbrequire(type != NodeInfo::INVALID);
+
+  sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
+             DisconnectRep::SignalLength, JBA);
+
+  signal->theData[0] = hostId;
+  sendSignal(CMVMI_REF, GSN_CANCEL_SUBSCRIPTION_REQ, signal, 1, JBB);
+
+  signal->theData[0] = NDB_LE_Disconnected;
+  signal->theData[1] = hostId;
+  sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+}
+
+/**
+ * execROUTE_ORD
+ * Allows other blocks to route signals as if they
+ * came from TRPMAN.
+ * Useful in ndbmtd for synchronising signals w.r.t.
+ * external signals received from other nodes, which
+ * arrive on the same thread that runs TRPMAN.
+ */
+void
+Trpman::execROUTE_ORD(Signal* signal)
+{
+  jamEntry();
+  if (!assembleFragments(signal))
+  {
+    jam();
+    return;
+  }
+
+  SectionHandle handle(this, signal);
+
+  RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+  Uint32 dstRef = ord->dstRef;
+  Uint32 srcRef = ord->srcRef;
+  Uint32 gsn = ord->gsn;
+  /* ord->cnt ignored */
+
+  Uint32 nodeId = refToNode(dstRef);
+
+  if (likely((nodeId == 0) ||
+             getNodeInfo(nodeId).m_connected))
+  {
+    jam();
+    Uint32 secCount = handle.m_cnt;
+    ndbrequire(secCount >= 1 && secCount <= 3);
+
+    jamLine(secCount);
+
+    /**
+     * Put section 0 in signal->theData
+     */
+    Uint32 sigLen = handle.m_ptr[0].sz;
+    ndbrequire(sigLen <= 25);
+    copy(signal->theData, handle.m_ptr[0]);
+
+    SegmentedSectionPtr save = handle.m_ptr[0];
+    for (Uint32 i = 0; i < secCount - 1; i++)
+      handle.m_ptr[i] = handle.m_ptr[i+1];
+    handle.m_cnt--;
+
+    sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle);
+
+    handle.m_cnt = 1;
+    handle.m_ptr[0] = save;
+    releaseSections(handle);
+    return;
+  }
+
+  releaseSections(handle);
+  warningEvent("Unable to route GSN: %d from %x to %x",
+	       gsn, srcRef, dstRef);
+}
+
+void
+Trpman::execDBINFO_SCANREQ(Signal *signal)
+{
+  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+  const Ndbinfo::ScanCursor* cursor =
+    CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
+  Ndbinfo::Ratelimit rl;
+
+  jamEntry();
+
+  switch(req.tableId){
+  case Ndbinfo::TRANSPORTERS_TABLEID:
+  {
+    jam();
+    Uint32 rnode = cursor->data[0];
+    if (rnode == 0)
+      rnode++; // Skip node 0
+
+    while (rnode < MAX_NODES)
+    {
+      switch(getNodeInfo(rnode).m_type)
+      {
+      default:
+      {
+        jam();
+        Ndbinfo::Row row(signal, req);
+        row.write_uint32(getOwnNodeId()); // Node id
+        row.write_uint32(rnode); // Remote node id
+        row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State
+        ndbinfo_send_row(signal, req, row, rl);
+        break;
+      }
+
+      case NodeInfo::INVALID:
+        jam();
+        break;
+      }
+
+      rnode++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, rnode);
+        return;
+      }
+    }
+    break;
+  }
+
+  default:
+    break;
+  }
+
+  ndbinfo_send_scan_conf(signal, req, rl);
+}
+
+void
+Trpman::execNDB_TAMPER(Signal* signal)
+{
+  jamEntry();
+#ifdef ERROR_INSERT
+  if (signal->theData[0] == 9003)
+  {
+    if (MAX_RECEIVED_SIGNALS < 1024)
+    {
+      MAX_RECEIVED_SIGNALS = 1024;
+    }
+    else
+    {
+      MAX_RECEIVED_SIGNALS = 1 + (rand() % 128);
+    }
+    ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS);
+    CLEAR_ERROR_INSERT_VALUE;
+  }
+#endif
+}//execNDB_TAMPER()
+
+void
+Trpman::execDUMP_STATE_ORD(Signal* signal)
+{
+  DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
+  Uint32 arg = dumpState->args[0]; (void)arg;
+
+#ifdef ERROR_INSERT
+  if (arg == 9000 || arg == 9002)
+  {
+    SET_ERROR_INSERT_VALUE(arg);
+    for (Uint32 i = 1; i<signal->getLength(); i++)
+      c_error_9000_nodes_mask.set(signal->theData[i]);
+  }
+
+  if (arg == 9001)
+  {
+    CLEAR_ERROR_INSERT_VALUE;
+    if (signal->getLength() == 1 || signal->theData[1])
+    {
+      for (Uint32 i = 0; i<MAX_NODES; i++)
+      {
+	if (c_error_9000_nodes_mask.get(i))
+	{
+	  signal->theData[0] = 0;
+	  signal->theData[1] = i;
+          execOPEN_COMREQ(signal);
+	}
+      }
+    }
+    c_error_9000_nodes_mask.clear();
+  }
+
+  if (arg == 9004 && signal->getLength() == 2)
+  {
+    SET_ERROR_INSERT_VALUE(9004);
+    c_error_9000_nodes_mask.clear();
+    c_error_9000_nodes_mask.set(signal->theData[1]);
+  }
+
+  if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004))
+  {
+    Uint32 db = signal->theData[1];
+    Uint32 i = c_error_9000_nodes_mask.find(0);
+    signal->theData[0] = i;
+    sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
+    ndbout_c("stopping %u using %u", i, db);
+    CLEAR_ERROR_INSERT_VALUE;
+  }
+#endif
+
+#ifdef ERROR_INSERT
+  /* <Target NodeId> dump 9992 <NodeId list>
+   * On Target NodeId, block receiving signals from NodeId list
+   *
+   * <Target NodeId> dump 9993 <NodeId list>
+   * On Target NodeId, resume receiving signals from NodeId list
+   *
+   * <Target NodeId> dump 9991
+   * On Target NodeId, resume receiving signals from any blocked node
+   *
+   * See also code in QMGR for blocking receive from nodes based
+   * on HB roles.
+   */
+  if((arg == 9993) ||  /* Unblock recv from nodeid */
+     (arg == 9992))    /* Block recv from nodeid */
+  {
+    bool block = (arg == 9992);
+    for (Uint32 n = 1; n < signal->getLength(); n++)
+    {
+      Uint32 nodeId = signal->theData[n];
+
+      if ((nodeId > 0) &&
+          (nodeId < MAX_NODES))
+      {
+        if (block)
+        {
+          ndbout_c("TRPMAN : Blocking receive from node %u", nodeId);
+
+          globalTransporterRegistry.blockReceive(nodeId);
+        }
+        else
+        {
+          ndbout_c("TRPMAN : Unblocking receive from node %u", nodeId);
+
+          globalTransporterRegistry.unblockReceive(nodeId);
+        }
+      }
+      else
+      {
+        ndbout_c("TRPMAN : Ignoring dump %u for node %u",
+                 arg, nodeId);
+      }
+    }
+  }
+  if (arg == 9990) /* Block recv from all ndbd matching pattern */
+  {
+    Uint32 pattern = 0;
+    if (signal->getLength() > 1)
+    {
+      pattern = signal->theData[1];
+      ndbout_c("TRPMAN : Blocking receive from all ndbds matching pattern -%s-",
+               ((pattern == 1)? "Other side":"Unknown"));
+    }
+
+    for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
+    {
+      if (globalTransporterRegistry.is_connected(node))
+      {
+        if (getNodeInfo(node).m_type == NodeInfo::DB)
+        {
+          if (!globalTransporterRegistry.isBlocked(node))
+          {
+            switch (pattern)
+            {
+            case 1:
+            {
+              /* Match if given node is on 'other side' of
+               * 2-replica cluster
+               */
+              if ((getOwnNodeId() & 1) != (node & 1))
+              {
+                /* Node is on the 'other side', match */
+                break;
+              }
+              /* Node is on 'my side', don't match */
+              continue;
+            }
+            default:
+              break;
+            }
+            ndbout_c("TRPMAN : Blocking receive from node %u", node);
+            globalTransporterRegistry.blockReceive(node);
+          }
+        }
+      }
+    }
+  }
+  if (arg == 9991) /* Unblock recv from all blocked */
+  {
+    for (Uint32 node = 0; node < MAX_NODES; node++)
+    {
+      if (globalTransporterRegistry.isBlocked(node))
+      {
+        ndbout_c("TRPMAN : Unblocking receive from node %u", node);
+        globalTransporterRegistry.unblockReceive(node);
+      }
+    }
+  }
+#endif
+}
+
+TrpmanProxy::TrpmanProxy(Block_context & ctx) :
+  LocalProxy(TRPMAN, ctx)
+{
+  addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
+  addRecSignal(GSN_OPEN_COMREQ, &TrpmanProxy::execOPEN_COMREQ);
+  addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
+  addRecSignal(GSN_DISCONNECT_REP, &TrpmanProxy::execDISCONNECT_REP);
+  addRecSignal(GSN_CONNECT_REP, &TrpmanProxy::execCONNECT_REP);
+  addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
+}
+
+TrpmanProxy::~TrpmanProxy()
+{
+}
+
+SimulatedBlock*
+TrpmanProxy::newWorker(Uint32 instanceNo)
+{
+  return new Trpman(m_ctx, instanceNo);
+}
+
+BLOCK_FUNCTIONS(TrpmanProxy)
+
+/**
+ * TODO TrpmanProxy need to have operation records
+ *      to support splicing a request onto several Trpman-instances
+ *      according to how receive-threads are assigned to instances
+ */
+void
+TrpmanProxy::execOPEN_COMREQ(Signal* signal)
+{
+  jamEntry();
+  SectionHandle handle(this, signal);
+  sendSignal(workerRef(0), GSN_OPEN_COMREQ, signal,
+             signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execCONNECT_REP(Signal *signal)
+{
+  jamEntry();
+  SectionHandle handle(this, signal);
+  sendSignal(workerRef(0), GSN_CONNECT_REP, signal,
+             signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
+{
+  jamEntry();
+  SectionHandle handle(this, signal);
+  sendSignal(workerRef(0), GSN_CLOSE_COMREQ, signal,
+             signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execENABLE_COMREQ(Signal* signal)
+{
+  jamEntry();
+  SectionHandle handle(this, signal);
+  sendSignal(workerRef(0), GSN_ENABLE_COMREQ, signal,
+             signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execDISCONNECT_REP(Signal *signal)
+{
+  jamEntry();
+  SectionHandle handle(this, signal);
+  sendSignal(workerRef(0), GSN_DISCONNECT_REP, signal,
+             signal->getLength(), JBB, &handle);
+}
+
+void
+TrpmanProxy::execROUTE_ORD(Signal* signal)
+{
+  jamEntry();
+  SectionHandle handle(this, signal);
+  sendSignal(workerRef(0), GSN_ROUTE_ORD, signal,
+             signal->getLength(), JBB, &handle);
+}

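A note on the trpman.cpp diff above: execROUTE_ORD() expects the routed signal packed into sections; section 0 carries the words that become theData at the destination (at most 25), and up to two further sections are forwarded untouched. A minimal sender-side sketch, assuming the caller already holds a SectionHandle; TRPMAN_REF and RouteOrd::SignalLength are assumptions here, not shown in this diff:

  // Hedged sketch only; field names are taken from execROUTE_ORD() above.
  RouteOrd* ord = (RouteOrd*)signal->getDataPtrSend();
  ord->srcRef = reference();  // apparent source of the routed signal
  ord->dstRef = dstRef;       // final destination block reference
  ord->gsn    = gsn;          // GSN delivered at dstRef
  // handle: section 0 = payload (<= 25 words), sections 1..2 optional
  sendSignal(TRPMAN_REF, GSN_ROUTE_ORD, signal,
             RouteOrd::SignalLength, JBB, &handle);

The ERROR_INSERT dump codes documented in execDUMP_STATE_ORD() map directly onto mgm client commands, e.g. "3 DUMP 9992 4 5" to make node 3 drop receives from nodes 4 and 5, and "3 DUMP 9991" to unblock everything again (node ids illustrative).
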
=== added file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp	2011-11-16 15:38:25 +0000
@@ -0,0 +1,66 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef TRPMAN_H
+#define TRPMAN_H
+
+#include <pc.hpp>
+#include <SimulatedBlock.hpp>
+#include <LocalProxy.hpp>
+
+class Trpman : public SimulatedBlock
+{
+public:
+  Trpman(Block_context& ctx, Uint32 instanceNumber = 0);
+  virtual ~Trpman();
+  BLOCK_DEFINES(Trpman);
+
+  void execCLOSE_COMREQ(Signal *signal);
+  void execOPEN_COMREQ(Signal *signal);
+  void execENABLE_COMREQ(Signal *signal);
+  void execDISCONNECT_REP(Signal *signal);
+  void execCONNECT_REP(Signal *signal);
+  void execROUTE_ORD(Signal* signal);
+
+  void execDBINFO_SCANREQ(Signal*);
+
+  void execNDB_TAMPER(Signal*);
+  void execDUMP_STATE_ORD(Signal*);
+protected:
+
+};
+
+class TrpmanProxy : public LocalProxy
+{
+public:
+  TrpmanProxy(Block_context& ctx);
+  virtual ~TrpmanProxy();
+  BLOCK_DEFINES(TrpmanProxy);
+
+  void execCLOSE_COMREQ(Signal *signal);
+  void execOPEN_COMREQ(Signal *signal);
+  void execENABLE_COMREQ(Signal *signal);
+  void execDISCONNECT_REP(Signal *signal);
+  void execCONNECT_REP(Signal *signal);
+  void execROUTE_ORD(Signal* signal);
+
+  void execNDB_TAMPER(Signal*);
+  void execDUMP_STATE_ORD(Signal*);
+protected:
+  virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+};
+#endif

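The header above follows the usual block/proxy split: Trpman is a plain SimulatedBlock, while TrpmanProxy is a LocalProxy whose newWorker() (see trpman.cpp earlier in this push) creates one Trpman per worker instance. A sketch of how a caller might choose between them; createTrpman() is hypothetical, and the real wiring lives in block-registration code not shown in this diff:

  // Hypothetical factory, for illustration only.
  SimulatedBlock* createTrpman(Block_context& ctx, bool multithreaded)
  {
    if (multithreaded)
      return new TrpmanProxy(ctx); // proxy fans signals out to workers
    return new Trpman(ctx);        // ndbd: one instance (instanceNumber 0)
  }
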
=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp	2011-10-07 13:15:08 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp	2011-11-16 11:05:46 +0000
@@ -161,9 +161,13 @@ init_global_memory_manager(EmulatorData 
     ed.m_mem_manager->set_resource_limit(rl);
   }
 
-  Uint32 maxopen = 4 * 4; // 4 redo parts, max 4 files per part
+  Uint32 logParts = NDB_DEFAULT_LOG_PARTS;
+  ndb_mgm_get_int_parameter(p, CFG_DB_NO_REDOLOG_PARTS, &logParts);
+
+  Uint32 maxopen = logParts * 4; // max 4 files per redo log part
   Uint32 filebuffer = NDB_FILE_BUFFER_SIZE;
   Uint32 filepages = (filebuffer / GLOBAL_PAGE_SIZE) * maxopen;
+  globalData.ndbLogParts = logParts;
 
   {
     /**

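Worked numbers for the ndbd.cpp hunk above, with illustrative constants (the real values come from ndb_limits.h and the cluster configuration): assuming logParts = 4, a 256 KiB file buffer and 32 KiB global pages,

  Uint32 maxopen   = 4 * 4;                        // 16 open redo files
  Uint32 filepages = (256*1024 / (32*1024)) * 16;  // 8 pages/file -> 128

so configuring more redo log parts via CFG_DB_NO_REDOLOG_PARTS now scales the file-page reservation linearly instead of leaving it pinned at the old hard-coded four parts.
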
=== modified file 'storage/ndb/src/kernel/vm/Ndbinfo.hpp'
--- a/storage/ndb/src/kernel/vm/Ndbinfo.hpp	2011-10-20 19:52:11 +0000
+++ b/storage/ndb/src/kernel/vm/Ndbinfo.hpp	2011-11-18 06:47:23 +0000
@@ -194,7 +194,8 @@ public:
     SPJ_SCAN_BATCHES_RETURNED_COUNTER = 20,
     SPJ_SCAN_ROWS_RETURNED_COUNTER = 21,
     SPJ_PRUNED_RANGE_SCANS_RECEIVED_COUNTER = 22,
-    SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER = 23
+    SPJ_CONST_PRUNED_RANGE_SCANS_RECEIVED_COUNTER = 23,
+    LOCAL_READ_COUNTER = 24
   };
 
   struct counter_entry {

=== modified file 'storage/ndb/tools/ndbinfo_sql.cpp'
--- a/storage/ndb/tools/ndbinfo_sql.cpp	2011-11-16 08:17:17 +0000
+++ b/storage/ndb/tools/ndbinfo_sql.cpp	2011-11-18 06:47:23 +0000
@@ -134,6 +134,7 @@ struct view {
     "  WHEN 21 THEN \"SCAN_ROWS_RETURNED\""
     "  WHEN 22 THEN \"PRUNED_RANGE_SCANS_RECEIVED\""
     "  WHEN 23 THEN \"CONST_PRUNED_RANGE_SCANS_RECEIVED\""
+    "  WHEN 24 THEN \"LOCAL_READS\""
     "  ELSE \"<unknown>\" "
     " END AS counter_name, "
     "val "

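Together with the Ndbinfo.hpp change above, this exposes counter id 24 through the generated counters view as LOCAL_READS; once the views are recreated it should be readable with an ordinary query along the lines of SELECT counter_name, val FROM ndbinfo.counters WHERE counter_name = 'LOCAL_READS' (exact column set per the surrounding view definition, which is not shown in this excerpt).
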
No bundle (reason: useless for push emails).