List:Commits« Previous MessageNext Message »
From:magnus.blaudd Date:November 21 2011 8:19am
Subject:bzr push into mysql-5.5-cluster branch (magnus.blaudd:3654 to 3657)
View as plain text  
 3657 magnus.blaudd@stripped	2011-11-21 [merge]
      Merge 7.1 -> 5.5-cluster

 3656 magnus.blaudd@stripped	2011-11-21 [merge]
      Merge schema dist to 7.2

    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_binlog.cc
      sql/ndb_event_data.cc
      sql/ndb_event_data.h
      sql/ndb_schema_dist.cc
      sql/ndb_schema_dist.h
      sql/ndb_share.cc
      sql/ndb_share.h
 3655 magnus.blaudd@stripped	2011-11-21 [merge]
      Merge

    modified:
      mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result
      mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test
      sql/log_event.cc
      sql/mysqld.h
      sql/sys_vars.cc
 3654 jonas oreland	2011-11-19 [merge]
      ndb - merge 71 to 72

    modified:
      mysql-test/suite/ndb/r/ndb_index_stat.result
      mysql-test/suite/ndb/t/ndb_index_stat.test
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
      storage/ndb/src/kernel/blocks/trix/Trix.cpp
=== modified file 'mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result'
--- a/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result	2011-11-15 14:25:58 +0000
+++ b/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result	2011-11-18 09:46:57 +0000
@@ -3,22 +3,10 @@ include/master-slave.inc
 show variables like 'slave_allow_batching';
 Variable_name	Value
 slave_allow_batching	OFF
-Show that slave_allow_batching cannot be changed while slave is running
-set global slave_allow_batching=ON;
-ERROR HY000: This operation cannot be performed with a running slave; run STOP SLAVE first
-show warnings;
-Level	Code	Message
-Error	1198	This operation cannot be performed with a running slave; run STOP SLAVE first
-show variables like 'slave_allow_batching';
-Variable_name	Value
-slave_allow_batching	OFF
-Now stop slave and change it
-stop slave;
 set global slave_allow_batching=ON;
 show variables like 'slave_allow_batching';
 Variable_name	Value
 slave_allow_batching	ON
-start slave;
 Now the normal test
 CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1)) ENGINE = 'INNODB'  ;
 SELECT * FROM t1;

=== modified file 'mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test'
--- a/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test	2011-11-15 14:25:58 +0000
+++ b/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test	2011-11-18 09:46:57 +0000
@@ -4,18 +4,9 @@
 --connection slave
 show variables like 'slave_allow_batching';
 
---echo Show that slave_allow_batching cannot be changed while slave is running
---error ER_SLAVE_MUST_STOP
 set global slave_allow_batching=ON;
-show warnings;
 show variables like 'slave_allow_batching';
 
---echo Now stop slave and change it
-stop slave;
-set global slave_allow_batching=ON;
-show variables like 'slave_allow_batching';
-start slave;
-
 --echo Now the normal test
 --connection master
 

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-11-16 08:17:17 +0000
+++ b/sql/ha_ndbcluster.cc	2011-11-21 07:53:04 +0000
@@ -11007,6 +11007,7 @@ static
 void
 delete_table_drop_share(NDB_SHARE* share, const char * path)
 {
+  DBUG_ENTER("delete_table_drop_share");
   if (share)
   {
     pthread_mutex_lock(&ndbcluster_mutex);
@@ -11038,6 +11039,7 @@ do_drop:
     }
     pthread_mutex_unlock(&ndbcluster_mutex);
   }
+  DBUG_VOID_RETURN;
 }
 
 /* static version which does not need a handler */
@@ -11048,7 +11050,7 @@ ha_ndbcluster::drop_table_impl(THD *thd,
                                const char *db,
                                const char *table_name)
 {
-  DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+  DBUG_ENTER("ha_ndbcluster::drop_table_impl");
   NDBDICT *dict= ndb->getDictionary();
   int ndb_table_id= 0;
   int ndb_table_version= 0;
@@ -11185,8 +11187,7 @@ int ha_ndbcluster::delete_table(const ch
 {
   THD *thd= current_thd;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb;
-  int error= 0;
+
   DBUG_ENTER("ha_ndbcluster::delete_table");
   DBUG_PRINT("enter", ("name: %s", name));
 
@@ -11198,6 +11199,7 @@ int ha_ndbcluster::delete_table(const ch
       dropped inside ndb.
       Just drop local files.
     */
+    DBUG_PRINT("info", ("Table is already dropped in NDB"));
     delete_table_drop_share(0, name);
     DBUG_RETURN(handler::delete_table(name));
   }
@@ -11213,16 +11215,12 @@ int ha_ndbcluster::delete_table(const ch
 
   if (check_ndb_connection(thd))
   {
-    error= HA_ERR_NO_CONNECTION;
-    goto err;
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
-  ndb= thd_ndb->ndb;
-
   if (!thd_ndb->has_required_global_schema_lock("ha_ndbcluster::delete_table"))
   {
-    error= HA_ERR_NO_CONNECTION;
-    goto err;
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
   }
 
   /*
@@ -11230,6 +11228,8 @@ int ha_ndbcluster::delete_table(const ch
     If it was already gone it might have been dropped
     remotely, give a warning and then drop .ndb file.
    */
+  int error;
+  Ndb* ndb= thd_ndb->ndb;
   if (!(error= drop_table_impl(thd, this, ndb, name,
                                m_dbname, m_tabname)) ||
       error == HA_ERR_NO_SUCH_TABLE)
@@ -11240,7 +11240,6 @@ int ha_ndbcluster::delete_table(const ch
       error= error1;
   }
 
-err:
   DBUG_RETURN(error);
 }
 
@@ -13380,38 +13379,15 @@ static uchar *ndbcluster_get_key(NDB_SHA
 
 #ifndef DBUG_OFF
 
-static void print_share(const char* where, NDB_SHARE* share)
-{
-  fprintf(DBUG_FILE,
-          "%s %s.%s: use_count: %u, commit_count: %lu\n",
-          where, share->db, share->table_name, share->use_count,
-          (ulong) share->commit_count);
-  fprintf(DBUG_FILE,
-          "  - key: %s, key_length: %d\n",
-          share->key, share->key_length);
-
-  Ndb_event_data *event_data= 0;
-  if (share->event_data)
-    event_data= share->event_data;
-  else if (share->op)
-    event_data= (Ndb_event_data *) share->op->getCustomData();
-  if (event_data)
-  {
-    fprintf(DBUG_FILE,
-            "  - event_data->shadow_table: %p %s.%s\n",
-            event_data->shadow_table, event_data->shadow_table->s->db.str,
-            event_data->shadow_table->s->table_name.str);
-  }
-}
-
-
 static void print_ndbcluster_open_tables()
 {
   DBUG_LOCK_FILE;
   fprintf(DBUG_FILE, ">ndbcluster_open_tables\n");
   for (uint i= 0; i < ndbcluster_open_tables.records; i++)
-    print_share("",
-                (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i));
+  {
+    NDB_SHARE* share= (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i);
+    share->print("", DBUG_FILE);
+  }
   fprintf(DBUG_FILE, "<ndbcluster_open_tables\n");
   DBUG_UNLOCK_FILE;
 }
@@ -13426,7 +13402,7 @@ static void print_ndbcluster_open_tables
 #define dbug_print_share(t, s)                  \
   DBUG_LOCK_FILE;                               \
   DBUG_EXECUTE("info",                          \
-               print_share((t), (s)););         \
+               (s)->print((t), DBUG_FILE););    \
   DBUG_UNLOCK_FILE;
 
 
@@ -13573,6 +13549,7 @@ int ndbcluster_undo_rename_share(THD *th
   return 0;
 }
 
+
 int ndbcluster_rename_share(THD *thd, NDB_SHARE *share)
 {
   NDB_SHARE *tmp;
@@ -13621,11 +13598,7 @@ int ndbcluster_rename_share(THD *thd, ND
   ha_ndbcluster::set_tabname(share->new_key, share->table_name);
 
   dbug_print_share("ndbcluster_rename_share:", share);
-  Ndb_event_data *event_data= 0;
-  if (share->event_data)
-    event_data= share->event_data;
-  else if (share->op)
-    event_data= (Ndb_event_data *) share->op->getCustomData();
+  Ndb_event_data *event_data= share->get_event_data_ptr();
   if (event_data && event_data->shadow_table)
   {
     if (!IS_TMP_PREFIX(share->table_name))
@@ -13674,6 +13647,75 @@ NDB_SHARE *ndbcluster_get_share(NDB_SHAR
 }
 
 
+
+NDB_SHARE*
+NDB_SHARE::create(const char* key, size_t key_length,
+                  TABLE* table, const char* db_name, const char* table_name)
+{
+  NDB_SHARE* share;
+  if (!(share= (NDB_SHARE*) my_malloc(sizeof(*share),
+                                      MYF(MY_WME | MY_ZEROFILL))))
+    return NULL;
+
+  MEM_ROOT **root_ptr=
+    my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+  MEM_ROOT *old_root= *root_ptr;
+
+  init_sql_alloc(&share->mem_root, 1024, 0);
+  *root_ptr= &share->mem_root; // remember to reset before return
+  share->flags= 0;
+  share->state= NSS_INITIAL;
+  /* Allocate enough space for key, db, and table_name */
+  share->key= (char*) alloc_root(*root_ptr, 2 * (key_length + 1));
+  share->key_length= key_length;
+  strmov(share->key, key);
+  share->db= share->key + key_length + 1;
+  strmov(share->db, db_name);
+  share->table_name= share->db + strlen(share->db) + 1;
+  strmov(share->table_name, table_name);
+  thr_lock_init(&share->lock);
+  pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
+  share->commit_count= 0;
+  share->commit_count_lock= 0;
+
+#ifdef HAVE_NDB_BINLOG
+  share->m_cfn_share= NULL;
+#endif
+
+  share->op= 0;
+  share->new_op= 0;
+  share->event_data= 0;
+
+  {
+    // Create array of bitmap for keeping track of subscribed nodes
+    // NOTE! Only the NDB_SHARE for ndb_schema really needs this
+    int no_nodes= g_ndb_cluster_connection->no_db_nodes();
+    share->subscriber_bitmap= (MY_BITMAP*)
+      alloc_root(&share->mem_root, no_nodes * sizeof(MY_BITMAP));
+    for (int i= 0; i < no_nodes; i++)
+    {
+      bitmap_init(&share->subscriber_bitmap[i],
+                  (Uint32*)alloc_root(&share->mem_root, max_ndb_nodes/8),
+                  max_ndb_nodes, FALSE);
+      bitmap_clear_all(&share->subscriber_bitmap[i]);
+    }
+  }
+
+  if (ndbcluster_binlog_init_share(current_thd, share, table))
+  {
+    DBUG_PRINT("error", ("get_share: %s could not init share", key));
+    free_root(&share->mem_root, MYF(0));
+    my_free(share, 0);
+    *root_ptr= old_root;
+    return NULL;
+  }
+
+  *root_ptr= old_root;
+
+  return share;
+}
+
+
 static inline
 NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table,
                                 bool create_if_not_exists)
@@ -13694,50 +13736,30 @@ NDB_SHARE *ndbcluster_get_share(const ch
       DBUG_PRINT("error", ("get_share: %s does not exist", key));
       DBUG_RETURN(0);
     }
-    if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
-                                       MYF(MY_WME | MY_ZEROFILL))))
-    {
-      MEM_ROOT **root_ptr=
-        my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
-      MEM_ROOT *old_root= *root_ptr;
-      init_sql_alloc(&share->mem_root, 1024, 0);
-      *root_ptr= &share->mem_root; // remember to reset before return
-      share->flags= 0;
-      share->state= NSS_INITIAL;
-      /* enough space for key, db, and table_name */
-      share->key= (char*) alloc_root(*root_ptr, 2 * (length + 1));
-      share->key_length= length;
-      strmov(share->key, key);
-      if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share))
-      {
-        free_root(&share->mem_root, MYF(0));
-        my_free((uchar*) share, 0);
-        *root_ptr= old_root;
-        DBUG_RETURN(0);
-      }
-      thr_lock_init(&share->lock);
-      pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
-      share->commit_count= 0;
-      share->commit_count_lock= 0;
-      share->db= share->key + length + 1;
-      ha_ndbcluster::set_dbname(key, share->db);
-      share->table_name= share->db + strlen(share->db) + 1;
-      ha_ndbcluster::set_tabname(key, share->table_name);
-      if (ndbcluster_binlog_init_share(current_thd, share, table))
-      {
-        DBUG_PRINT("error", ("get_share: %s could not init share", key));
-        ndbcluster_real_free_share(&share);
-        *root_ptr= old_root;
-        DBUG_RETURN(0);
-      }
-      *root_ptr= old_root;
-    }
-    else
+
+    /*
+      Extract db and table name from key (to avoid that NDB_SHARE
+      dependens on ha_ndbcluster)
+    */
+    char db_name_buf[FN_HEADLEN];
+    char table_name_buf[FN_HEADLEN];
+    ha_ndbcluster::set_dbname(key, db_name_buf);
+    ha_ndbcluster::set_tabname(key, table_name_buf);
+
+    if (!(share= NDB_SHARE::create(key, length, table,
+                                   db_name_buf, table_name_buf)))
     {
       DBUG_PRINT("error", ("get_share: failed to alloc share"));
       my_error(ER_OUTOFMEMORY, MYF(0), static_cast<int>(sizeof(*share)));
       DBUG_RETURN(0);
     }
+
+    // Insert the new share in list of open shares
+    if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share))
+    {
+      NDB_SHARE::destroy(share);
+      DBUG_RETURN(0);
+    }
   }
   share->use_count++;
   if (opt_ndb_extra_logging > 9)

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-11-10 20:35:28 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-11-21 08:15:07 +0000
@@ -342,6 +342,13 @@ ndb_binlog_open_shadow_table(THD *thd, N
   /* We can't use 'use_all_columns()' as the file object is not setup yet */
   shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
                                              &shadow_table->s->all_set);
+
+  if (shadow_table->s->primary_key == MAX_KEY)
+   share->flags|= NSF_HIDDEN_PK;
+
+  if (shadow_table->s->blob_fields != 0)
+    share->flags|= NSF_BLOB_FLAG;
+
 #ifndef DBUG_OFF
   dbug_print_table("table", shadow_table);
 #endif
@@ -355,48 +362,9 @@ ndb_binlog_open_shadow_table(THD *thd, N
 */
 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
 {
-  MEM_ROOT *mem_root= &share->mem_root;
-  int do_event_op= ndb_binlog_running;
-  int error= 0;
   DBUG_ENTER("ndbcluster_binlog_init_share");
 
-#ifdef HAVE_NDB_BINLOG
-  share->m_cfn_share= NULL;
-#endif
-
-  share->op= 0;
-  share->new_op= 0;
-  share->event_data= 0;
-
-  if (!ndb_schema_share &&
-      strcmp(share->db, NDB_REP_DB) == 0 &&
-      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
-    do_event_op= 1;
-  else if (!ndb_apply_status_share &&
-           strcmp(share->db, NDB_REP_DB) == 0 &&
-           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
-    do_event_op= 1;
-
-  if (Ndb_dist_priv_util::is_distributed_priv_table(share->db,
-                                                    share->table_name))
-  {
-    do_event_op= 0;
-  }
-
-  {
-    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
-    share->subscriber_bitmap= (MY_BITMAP*)
-      alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
-    for (i= 0; i < no_nodes; i++)
-    {
-      bitmap_init(&share->subscriber_bitmap[i],
-                  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
-                  max_ndb_nodes, FALSE);
-      bitmap_clear_all(&share->subscriber_bitmap[i]);
-    }
-  }
-
-  if (!do_event_op)
+  if (!share->need_events(ndb_binlog_running))
   {
     if (_table)
     {
@@ -409,19 +377,10 @@ int ndbcluster_binlog_init_share(THD *th
     {
       share->flags|= NSF_NO_BINLOG;
     }
-    DBUG_RETURN(error);
-  }
-  while (1) 
-  {
-    if ((error= ndb_binlog_open_shadow_table(thd, share)))
-      break;
-    if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
-      share->flags|= NSF_HIDDEN_PK;
-    if (share->event_data->shadow_table->s->blob_fields != 0)
-      share->flags|= NSF_BLOB_FLAG;
-    break;
+    DBUG_RETURN(0);
   }
-  DBUG_RETURN(error);
+
+  DBUG_RETURN(ndb_binlog_open_shadow_table(thd, share));
 }
 
 /*****************************************************************
@@ -2011,19 +1970,21 @@ ndb_handle_schema_change(THD *thd, Ndb *
                          const Ndb_event_data *event_data)
 {
   DBUG_ENTER("ndb_handle_schema_change");
-  NDB_SHARE *share= event_data->share;
-  TABLE *shadow_table= event_data->shadow_table;
-  const char *tabname= shadow_table->s->table_name.str;
-  const char *dbname= shadow_table->s->db.str;
-  bool do_close_cached_tables= FALSE;
-  bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
 
   if (pOp->getEventType() == NDBEVENT::TE_ALTER)
   {
+    DBUG_PRINT("exit", ("Event type is TE_ALTER"));
     DBUG_RETURN(0);
   }
+
+  DBUG_ASSERT(event_data);
   DBUG_ASSERT(pOp->getEventType() == NDBEVENT::TE_DROP ||
               pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE);
+
+  NDB_SHARE *share= event_data->share;
+  TABLE *shadow_table= event_data->shadow_table;
+  const char *tabname= shadow_table->s->table_name.str;
+  const char *dbname= shadow_table->s->db.str;
   {
     Thd_ndb *thd_ndb= get_thd_ndb(thd);
     Ndb *ndb= thd_ndb->ndb;
@@ -2049,10 +2010,10 @@ ndb_handle_schema_change(THD *thd, Ndb *
   {
     share->op= 0;
   }
-  // either just us or drop table handling as well
-      
-  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
   pthread_mutex_unlock(&share->mutex);
+
+  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
+  DBUG_PRINT("info", ("signal that drop is done"));
   (void) pthread_cond_signal(&injector_cond);
 
   pthread_mutex_lock(&ndbcluster_mutex);
@@ -2060,6 +2021,9 @@ ndb_handle_schema_change(THD *thd, Ndb *
   DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                            share->key, share->use_count));
   free_share(&share, TRUE);
+
+  bool do_close_cached_tables= FALSE;
+  bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
   if (is_remote_change && share && share->state != NSS_DROPPED)
   {
     DBUG_PRINT("info", ("remote change"));
@@ -2083,15 +2047,13 @@ ndb_handle_schema_change(THD *thd, Ndb *
     share= 0;
   pthread_mutex_unlock(&ndbcluster_mutex);
 
-  if (event_data)
-  {
-    delete event_data;
-    pOp->setCustomData(NULL);
-  }
+  DBUG_PRINT("info", ("Deleting event_data"));
+  delete event_data;
+  pOp->setCustomData(NULL);
 
+  DBUG_PRINT("info", ("Dropping event operation"));
   pthread_mutex_lock(&injector_mutex);
   is_ndb->dropEventOperation(pOp);
-  pOp= 0;
   pthread_mutex_unlock(&injector_mutex);
 
   if (do_close_cached_tables)
@@ -2559,18 +2521,21 @@ class Ndb_schema_event_handler {
   void
   ndbapi_invalidate_table(const char* db_name, const char* table_name) const
   {
+    DBUG_ENTER("ndbapi_invalidate_table");
     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
     Ndb *ndb= thd_ndb->ndb;
 
     ndb->setDatabaseName(db_name);
     Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
     ndbtab_g.invalidate();
+    DBUG_VOID_RETURN;
   }
 
 
   void
   mysqld_close_cached_table(const char* db_name, const char* table_name) const
   {
+    DBUG_ENTER("mysqld_close_cached_table");
      // Just mark table as "need reopen"
     const bool wait_for_refresh = false;
     // Not waiting -> no timeout needed
@@ -2583,11 +2548,57 @@ class Ndb_schema_event_handler {
 
     close_cached_tables(m_thd, &table_list,
                         wait_for_refresh, timeout);
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  mysqld_write_frm_from_ndb(const char* db_name,
+                            const char* table_name) const
+  {
+    DBUG_ENTER("mysqld_write_frm_from_ndb");
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Ndb *ndb= thd_ndb->ndb;
+    Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
+    const NDBTAB *ndbtab= ndbtab_g.get_table();
+
+    char key[FN_REFLEN];
+    build_table_filename(key, sizeof(key)-1,
+                         db_name, table_name, NullS, 0);
+
+    uchar *data= 0, *pack_data= 0;
+    size_t length, pack_length;
+
+    if (readfrm(key, &data, &length) == 0 &&
+        packfrm(data, length, &pack_data, &pack_length) == 0 &&
+        cmp_frm(ndbtab, pack_data, pack_length))
+    {
+      DBUG_PRINT("info", ("Detected frm change of table %s.%s",
+                          db_name, table_name));
+
+      DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
+                        ndbtab->getFrmLength());
+      my_free(data);
+      data= NULL;
+
+      int error;
+      if ((error= unpackfrm(&data, &length,
+                            (const uchar*) ndbtab->getFrmData())) ||
+          (error= writefrm(key, data, length)))
+      {
+        sql_print_error("NDB: Failed write frm for %s.%s, error %d",
+                        db_name, table_name, error);
+      }
+    }
+    my_free(data);
+    my_free(pack_data);
+    DBUG_VOID_RETURN;
   }
 
 
   NDB_SHARE* get_share(Ndb_schema_op* schema) const
   {
+    DBUG_ENTER("get_share(Ndb_schema_op*)");
     char key[FN_REFLEN + 1];
     build_table_filename(key, sizeof(key) - 1,
                          schema->db, schema->name, "", 0);
@@ -2597,7 +2608,7 @@ class Ndb_schema_event_handler {
       DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                share->key, share->use_count));
     }
-    return share;
+    DBUG_RETURN(share);
   }
 
 
@@ -2630,18 +2641,17 @@ class Ndb_schema_event_handler {
   }
 
 
+  bool is_local_table(const char* db_name, const char* table_name) const
+  {
+    return ndbcluster_check_if_local_table(db_name, table_name);
+  }
+
+
   void handle_clear_slock(Ndb_schema_op* schema)
   {
-    if (!is_post_epoch())
-    {
-      /*
-        handle slock after epoch is completed to ensure that
-        schema events get inserted in the binlog after any data
-        events
-      */
-      log_after_epoch(schema);
-      return;
-    }
+    DBUG_ENTER("handle_clear_slock");
+
+    assert(is_post_epoch());
 
     char key[FN_REFLEN + 1];
     build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
@@ -2654,7 +2664,7 @@ class Ndb_schema_event_handler {
       if (opt_ndb_extra_logging > 19)
         sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
                               key, schema->id, schema->version);
-      return;
+      DBUG_VOID_RETURN;
     }
 
     if (ndb_schema_object->table_id != schema->id ||
@@ -2670,7 +2680,7 @@ class Ndb_schema_event_handler {
                               schema->id,
                               schema->version);
       ndb_free_schema_object(&ndb_schema_object);
-      return;
+      DBUG_VOID_RETURN;
     }
 
     /*
@@ -2699,17 +2709,20 @@ class Ndb_schema_event_handler {
 
     /* Wake up the waiter */
     pthread_cond_signal(&injector_cond);
-    return;
+
+    DBUG_VOID_RETURN;
   }
 
 
   void
   handle_offline_alter_table_commit(Ndb_schema_op* schema)
   {
+    DBUG_ENTER("handle_offline_alter_table_commit");
+
     assert(is_post_epoch()); // Always after epoch
 
     if (schema->node_id == own_nodeid())
-      return;
+      DBUG_VOID_RETURN;
 
     write_schema_op_to_binlog(m_thd, schema);
     ndbapi_invalidate_table(schema->db, schema->name);
@@ -2749,7 +2762,7 @@ class Ndb_schema_event_handler {
       free_share(&share);
     }
 
-    if (ndbcluster_check_if_local_table(schema->db, schema->name) &&
+    if (is_local_table(schema->db, schema->name) &&
        !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
                                                       schema->name))
     {
@@ -2757,13 +2770,14 @@ class Ndb_schema_event_handler {
                       "from binlog schema event '%s' from node %d.",
                       schema->db, schema->name, schema->query,
                       schema->node_id);
-      return;
+      DBUG_VOID_RETURN;
     }
 
     if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
     {
       print_could_not_discover_error(m_thd, schema);
     }
+    DBUG_VOID_RETURN;
   }
 
 
@@ -2775,96 +2789,63 @@ class Ndb_schema_event_handler {
     ndbapi_invalidate_table(schema->db, schema->name);
     mysqld_close_cached_table(schema->db, schema->name);
 
-    int error= 0;
-    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
-    Ndb *ndb= thd_ndb->ndb;
-    Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
-    const NDBTAB *ndbtab= ndbtab_g.get_table();
     if (schema->node_id != own_nodeid())
     {
-      char key[FN_REFLEN];
-      uchar *data= 0, *pack_data= 0;
-      size_t length, pack_length;
-
-      DBUG_PRINT("info", ("Detected frm change of table %s.%s",
-                          schema->db, schema->name));
       write_schema_op_to_binlog(m_thd, schema);
-      build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
-      /*
-        If the there is no local table shadowing the altered table and
-        it has an frm that is different than the one on disk then
-        overwrite it with the new table definition
-      */
-      if (!ndbcluster_check_if_local_table(schema->db, schema->name) &&
-          readfrm(key, &data, &length) == 0 &&
-          packfrm(data, length, &pack_data, &pack_length) == 0 &&
-          cmp_frm(ndbtab, pack_data, pack_length))
-      {
-        DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
-                  ndbtab->getFrmLength());
-        my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-        data= NULL;
-        if ((error= unpackfrm(&data, &length,
-                              (const uchar*) ndbtab->getFrmData())) ||
-            (error= writefrm(key, data, length)))
-        {
-          sql_print_error("NDB: Failed write frm for %s.%s, error %d",
-                          schema->db, schema->name, error);
-        }
+      if (!is_local_table(schema->db, schema->name))
+      {
+        mysqld_write_frm_from_ndb(schema->db, schema->name);
       }
-      my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
-      my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
     }
     NDB_SHARE *share= get_share(schema);
     if (share)
     {
       if (opt_ndb_extra_logging > 9)
-        sql_print_information("NDB Binlog: handeling online alter/rename");
+        sql_print_information("NDB Binlog: handling online alter/rename");
 
       pthread_mutex_lock(&share->mutex);
       ndb_binlog_close_shadow_table(share);
 
-      if ((error= ndb_binlog_open_shadow_table(m_thd, share)))
+      if (ndb_binlog_open_shadow_table(m_thd, share))
+      {
         sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
                         schema->db, schema->name);
-      if (error)
         pthread_mutex_unlock(&share->mutex);
-    }
-    if (!error && share)
-    {
-      if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
-        share->flags|= NSF_HIDDEN_PK;
-      /*
-        Refresh share->flags to handle added BLOB columns
-      */
-      if (share->event_data->shadow_table->s->blob_fields != 0)
-        share->flags|= NSF_BLOB_FLAG;
-
-      /*
-        Start subscribing to data changes to the new table definition
-      */
-      String event_name(INJECTOR_EVENT_LEN);
-      ndb_rep_event_name(&event_name, schema->db, schema->name,
-                         get_binlog_full(share));
-      NdbEventOperation *tmp_op= share->op;
-      share->new_op= 0;
-      share->op= 0;
-
-      if (ndbcluster_create_event_ops(m_thd, share, ndbtab, event_name.c_ptr()))
-      {
-        sql_print_error("NDB Binlog:"
-                        "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
-                        event_name.c_ptr());
       }
       else
       {
-        share->new_op= share->op;
-      }
-      share->op= tmp_op;
-      pthread_mutex_unlock(&share->mutex);
+        /*
+          Start subscribing to data changes to the new table definition
+        */
+        String event_name(INJECTOR_EVENT_LEN);
+        ndb_rep_event_name(&event_name, schema->db, schema->name,
+                           get_binlog_full(share));
+        NdbEventOperation *tmp_op= share->op;
+        share->new_op= 0;
+        share->op= 0;
 
-      if (opt_ndb_extra_logging > 9)
-        sql_print_information("NDB Binlog: handeling online alter/rename done");
+        Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+        Ndb *ndb= thd_ndb->ndb;
+        Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
+        const NDBTAB *ndbtab= ndbtab_g.get_table();
+        if (ndbcluster_create_event_ops(m_thd, share, ndbtab,
+                                        event_name.c_ptr()))
+        {
+          sql_print_error("NDB Binlog:"
+                          "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
+                          event_name.c_ptr());
+        }
+        else
+        {
+          share->new_op= share->op;
+        }
+        share->op= tmp_op;
+        pthread_mutex_unlock(&share->mutex);
+
+        if (opt_ndb_extra_logging > 9)
+          sql_print_information("NDB Binlog: handling online "
+                                "alter/rename done");
+      }
     }
     if (share)
     {
@@ -2904,6 +2885,314 @@ class Ndb_schema_event_handler {
   }
 
 
+  void
+  handle_drop_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_drop_table");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      /* Tables exists as a local table, print error and leave it */
+      sql_print_error("NDB Binlog: Skipping dropping locally "
+                      "defined table '%s.%s' from binlog schema "
+                      "event '%s' from node %d. ",
+                      schema->db, schema->name, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[2]=
+      {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    NDB_SHARE *share= get_share(schema);
+    // invalidation already handled by binlog thread
+    if (!share || !share->op)
+    {
+      ndbapi_invalidate_table(schema->db, schema->name);
+      mysqld_close_cached_table(schema->db, schema->name);
+    }
+    if (share)
+      free_share(&share);
+
+    ndbapi_invalidate_table(schema->db, schema->name);
+    mysqld_close_cached_table(schema->db, schema->name);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_rename_table_prepare(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_rename_table_prepare");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    NDB_SHARE *share= get_share(schema);
+    if (share)
+    {
+      ndbcluster_prepare_rename_share(share, schema->query);
+      free_share(&share);
+    }
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_rename_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_rename_table");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      /* Tables exists as a local table, print error and leave it */
+      sql_print_error("NDB Binlog: Skipping renaming locally "
+                      "defined table '%s.%s' from binlog schema "
+                      "event '%s' from node %d. ",
+                      schema->db, schema->name, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[2]=
+      {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    NDB_SHARE *share= get_share(schema);
+    // invalidation already handled by binlog thread
+    if (!share || !share->op)
+    {
+      ndbapi_invalidate_table(schema->db, schema->name);
+      mysqld_close_cached_table(schema->db, schema->name);
+    }
+    if (share)
+      free_share(&share);
+
+    share= get_share(schema);
+    if (share)
+    {
+      ndbcluster_rename_share(m_thd, share);
+      free_share(&share);
+    }
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_drop_db(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_drop_db");
+
+    assert(is_post_epoch()); // Always after epoch
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    // Set NO_LOCK_SCHEMA_OP before 'check_if_local_tables_indb'
+    // until ndbcluster_find_files does not take GSL
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+
+    if (check_if_local_tables_in_db(schema->db))
+    {
+      /* Tables exists as a local table, print error and leave it */
+      sql_print_error("NDB Binlog: Skipping drop database '%s' since "
+                      "it contained local tables "
+                      "binlog schema event '%s' from node %d. ",
+                      schema->db, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    const int no_print_error[1]= {0};
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_truncate_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_truncate_table");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    NDB_SHARE *share= get_share(schema);
+    // invalidation already handled by binlog thread
+    if (!share || !share->op)
+    {
+      ndbapi_invalidate_table(schema->db, schema->name);
+      mysqld_close_cached_table(schema->db, schema->name);
+    }
+    if (share)
+      free_share(&share);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      sql_print_error("NDB Binlog: Skipping locally defined table "
+                      "'%s.%s' from binlog schema event '%s' from "
+                      "node %d. ",
+                      schema->db, schema->name, schema->query,
+                      schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
+    {
+      print_could_not_discover_error(m_thd, schema);
+    }
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_create_table(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_create_table");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (is_local_table(schema->db, schema->name))
+    {
+      sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
+                          "binlog schema event '%s' from node %d. ",
+                          schema->db, schema->name, schema->query,
+                          schema->node_id);
+      DBUG_VOID_RETURN;
+    }
+
+    if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
+    {
+      print_could_not_discover_error(m_thd, schema);
+    }
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_create_db(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_create_db");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[1]= {0};
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_alter_db(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_alter_db");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[1]= {0};
+    run_query(m_thd, schema->query,
+              schema->query + schema->query_length,
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  handle_grant_op(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_grant_op");
+
+    assert(!is_post_epoch()); // Always directly
+
+    if (schema->node_id == own_nodeid())
+      DBUG_VOID_RETURN;
+
+    write_schema_op_to_binlog(m_thd, schema);
+
+    if (opt_ndb_extra_logging > 9)
+      sql_print_information("Got dist_priv event: %s, "
+                            "flushing privileges",
+                            get_schema_type_name(schema->type));
+
+    Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
+    Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+    thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
+    const int no_print_error[1]= {0};
+    char *cmd= (char *) "flush privileges";
+    run_query(m_thd, cmd,
+              cmd + strlen(cmd),
+              no_print_error);
+
+    DBUG_VOID_RETURN;
+  }
+
+
   int
   handle_schema_op(Ndb_schema_op* schema)
   {
@@ -2937,179 +3226,64 @@ class Ndb_schema_event_handler {
       switch (schema_type)
       {
       case SOT_CLEAR_SLOCK:
-        handle_clear_slock(schema);
+        /*
+          handle slock after epoch is completed to ensure that
+          schema events get inserted in the binlog after any data
+          events
+        */
+        log_after_epoch(schema);
         DBUG_RETURN(0);
 
       case SOT_ALTER_TABLE_COMMIT:
       case SOT_RENAME_TABLE_PREPARE:
       case SOT_ONLINE_ALTER_TABLE_PREPARE:
       case SOT_ONLINE_ALTER_TABLE_COMMIT:
+      case SOT_RENAME_TABLE:
+      case SOT_DROP_TABLE:
+      case SOT_DROP_DB:
         log_after_epoch(schema);
         unlock_after_epoch(schema);
         DBUG_RETURN(0);
 
-      default:
+      case SOT_TRUNCATE_TABLE:
+        handle_truncate_table(schema);
         break;
-      }
-
-      if (schema->node_id != own_nodeid())
-      {
-        THD* thd= m_thd; // Code compatibility
-        Thd_ndb *thd_ndb= get_thd_ndb(thd);
-        Thd_ndb_options_guard thd_ndb_options(thd_ndb);
 
-        int post_epoch_unlock= 0;
- 
-        switch (schema_type)
-        {
-        case SOT_RENAME_TABLE:
-        case SOT_RENAME_TABLE_NEW:
-        case SOT_DROP_TABLE:
-          if (! ndbcluster_check_if_local_table(schema->db, schema->name))
-          {
-            thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-            const int no_print_error[2]=
-              {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
-            run_query(thd, schema->query,
-                      schema->query + schema->query_length,
-                      no_print_error);
-            /* binlog dropping table after any table operations */
-            log_after_epoch(schema);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-          }
-          else
-          {
-            /* Tables exists as a local table, print error and leave it */
-            DBUG_PRINT("info", ("Found local table '%s.%s', leaving it",
-                                schema->db, schema->name));
-            sql_print_error("NDB Binlog: Skipping %sing locally "
-                            "defined table '%s.%s' from binlog schema "
-                            "event '%s' from node %d. ",
-                            (schema_type == SOT_DROP_TABLE ? "dropp" : "renam"),
-                            schema->db, schema->name, schema->query,
-                            schema->node_id);
-            write_schema_op_to_binlog(thd, schema);
-          }
-          // Fall through
-	case SOT_TRUNCATE_TABLE:
-        {
-          NDB_SHARE *share= get_share(schema);
-          // invalidation already handled by binlog thread
-          if (!share || !share->op)
-          {
-            ndbapi_invalidate_table(schema->db, schema->name);
-            mysqld_close_cached_table(schema->db, schema->name);
-          }
-          if (share)
-            free_share(&share);
-        }
-        if (schema_type != SOT_TRUNCATE_TABLE)
-          break;
-        // fall through
-        case SOT_CREATE_TABLE:
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          if (ndbcluster_check_if_local_table(schema->db, schema->name))
-          {
-            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
-                                schema->db, schema->name));
-            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
-                            "binlog schema event '%s' from node %d. ",
-                            schema->db, schema->name, schema->query,
-                            schema->node_id);
-          }
-          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
-          {
-            print_could_not_discover_error(thd, schema);
-          }
-          write_schema_op_to_binlog(thd, schema);
-          break;
+      case SOT_CREATE_TABLE:
+        handle_create_table(schema);
+        break;
 
-        case SOT_DROP_DB:
-          /* Drop the database locally if it only contains ndb tables */
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          if (!check_if_local_tables_in_db(schema->db))
-          {
-            const int no_print_error[1]= {0};
-            run_query(thd, schema->query,
-                      schema->query + schema->query_length,
-                      no_print_error);
-            /* binlog dropping database after any table operations */
-            log_after_epoch(schema);
-            /* acknowledge this query _after_ epoch completion */
-            post_epoch_unlock= 1;
-          }
-          else
-          {
-            /* Database contained local tables, leave it */
-            sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
-                            "binlog schema event '%s' from node %d. ",
-                            schema->db, schema->query,
-                            schema->node_id);
-            write_schema_op_to_binlog(thd, schema);
-          }
-          break;
+      case SOT_CREATE_DB:
+        handle_create_db(schema);
+        break;
 
-        case SOT_CREATE_DB:
-        case SOT_ALTER_DB:
-        {
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          const int no_print_error[1]= {0};
-          run_query(thd, schema->query,
-                    schema->query + schema->query_length,
-                    no_print_error);
-          write_schema_op_to_binlog(thd, schema);
-          break;
-        }
+      case SOT_ALTER_DB:
+        handle_alter_db(schema);
+        break;
 
-        case SOT_CREATE_USER:
-        case SOT_DROP_USER:
-        case SOT_RENAME_USER:
-        case SOT_GRANT:
-        case SOT_REVOKE:
-        {
-          if (opt_ndb_extra_logging > 9)
-            sql_print_information("Got dist_priv event: %s, "
-                                  "flushing privileges",
-                                  get_schema_type_name(schema_type));
-
-          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          const int no_print_error[1]= {0};
-          char *cmd= (char *) "flush privileges";
-          run_query(thd, cmd,
-                    cmd + strlen(cmd),
-                    no_print_error);
-          write_schema_op_to_binlog(thd, schema);
-	  break;
-        }
-
-        case SOT_TABLESPACE:
-        case SOT_LOGFILE_GROUP:
-          write_schema_op_to_binlog(thd, schema);
-          break;
+      case SOT_CREATE_USER:
+      case SOT_DROP_USER:
+      case SOT_RENAME_USER:
+      case SOT_GRANT:
+      case SOT_REVOKE:
+        handle_grant_op(schema);
+        break;
 
-        case SOT_ALTER_TABLE_COMMIT:
-        case SOT_RENAME_TABLE_PREPARE:
-        case SOT_ONLINE_ALTER_TABLE_PREPARE:
-        case SOT_ONLINE_ALTER_TABLE_COMMIT:
-        case SOT_CLEAR_SLOCK:
-          // Impossible to come here, the above types has already
-          // been handled and caused the function to return 
-          abort();
+      case SOT_TABLESPACE:
+      case SOT_LOGFILE_GROUP:
+        if (schema->node_id == own_nodeid())
           break;
+        write_schema_op_to_binlog(m_thd, schema);
+        break;
 
-        }
+      }
 
-        /* signal that schema operation has been handled */
-        DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
-        if (bitmap_is_set(&schema->slock, own_nodeid()))
-        {
-          if (post_epoch_unlock)
-            unlock_after_epoch(schema);
-          else
-            ack_schema_op(schema->db, schema->name,
-                          schema->id, schema->version);
-        }
+      /* signal that schema operation has been handled */
+      DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
+      if (bitmap_is_set(&schema->slock, own_nodeid()))
+      {
+        ack_schema_op(schema->db, schema->name,
+                      schema->id, schema->version);
       }
     }
     DBUG_RETURN(0);
@@ -3140,39 +3314,20 @@ class Ndb_schema_event_handler {
         break;
 
       case SOT_DROP_DB:
-        write_schema_op_to_binlog(thd, schema);
+        handle_drop_db(schema);
         break;
 
       case SOT_DROP_TABLE:
-        write_schema_op_to_binlog(thd, schema);
-        ndbapi_invalidate_table(schema->db, schema->name);
-        mysqld_close_cached_table(schema->db, schema->name);
+        handle_drop_table(schema);
         break;
 
-      case SOT_RENAME_TABLE:
-      {
-        write_schema_op_to_binlog(thd, schema);
-        NDB_SHARE *share= get_share(schema);
-        if (share)
-        {
-          ndbcluster_rename_share(thd, share);
-          free_share(&share);
-        }
+      case SOT_RENAME_TABLE_PREPARE:
+        handle_rename_table_prepare(schema);
         break;
-      }
 
-      case SOT_RENAME_TABLE_PREPARE:
-      {
-        if (schema->node_id == own_nodeid())
-          break;
-        NDB_SHARE *share= get_share(schema);
-        if (share)
-        {
-          ndbcluster_prepare_rename_share(share, schema->query);
-          free_share(&share);
-        }
+      case SOT_RENAME_TABLE:
+        handle_rename_table(schema);
         break;
-      }
 
       case SOT_ALTER_TABLE_COMMIT:
         handle_offline_alter_table_commit(schema);
@@ -3186,46 +3341,6 @@ class Ndb_schema_event_handler {
         handle_online_alter_table_commit(schema);
         break;
 
-      case SOT_RENAME_TABLE_NEW:
-      {
-        write_schema_op_to_binlog(thd, schema);
-        NDB_SHARE *share= get_share(schema);
-        if (ndb_binlog_running && (!share || !share->op))
-        {
-          /*
-            we need to free any share here as command below
-            may need to call handle_trailing_share
-          */
-          if (share)
-          {
-            /* ndb_share reference temporary free */
-            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
-                                     share->key, share->use_count));
-            free_share(&share);
-            share= 0;
-          }
-
-          if (ndbcluster_check_if_local_table(schema->db, schema->name))
-          {
-            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
-                                schema->db, schema->name));
-            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
-                            "binlog schema event '%s' from node %d. ",
-                            schema->db, schema->name, schema->query,
-                            schema->node_id);
-          }
-          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
-          {
-            print_could_not_discover_error(thd, schema);
-          }
-        }
-        if (share)
-        {
-          free_share(&share);
-        }
-        break;
-      }
-
       default:
         DBUG_ASSERT(FALSE);
       }
@@ -3452,7 +3567,7 @@ public:
 };
 
 /*********************************************************************
-  Internal helper functions for handeling of the cluster replication tables
+  Internal helper functions for handling of the cluster replication tables
   - ndb_binlog_index
   - ndb_apply_status
 *********************************************************************/
@@ -5081,8 +5196,9 @@ ndbcluster_check_if_local_table(const ch
   char ndb_file[FN_REFLEN + 1];
 
   DBUG_ENTER("ndbcluster_check_if_local_table");
-  build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
-  build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
+  build_table_filename(key, sizeof(key)-1, dbname, tabname, reg_ext, 0);
+  build_table_filename(ndb_file, sizeof(ndb_file)-1,
+                       dbname, tabname, ha_ndb_ext, 0);
   /* Check that any defined table is an ndb table */
   DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
   if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
@@ -5107,7 +5223,6 @@ int ndbcluster_create_binlog_setup(THD *
                                    const char *table_name,
                                    TABLE * table)
 {
-  int do_event_op= ndb_binlog_running;
   DBUG_ENTER("ndbcluster_create_binlog_setup");
   DBUG_PRINT("enter",("key: %s  key_len: %d  %s.%s",
                       key, key_len, db, table_name));
@@ -5131,25 +5246,7 @@ int ndbcluster_create_binlog_setup(THD *
     DBUG_RETURN(0); // replication already setup, or should not
   }
 
-  if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
-  {
-    // The distributed privilege tables are distributed by writing
-    // the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
-    // to listen to events from this table
-    DBUG_PRINT("info", ("Skipping binlogging of table %s/%s", db, table_name));
-    do_event_op= 0;
-  }
-
-  if (!ndb_schema_share &&
-      strcmp(share->db, NDB_REP_DB) == 0 &&
-      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
-    do_event_op= 1;
-  else if (!ndb_apply_status_share &&
-           strcmp(share->db, NDB_REP_DB) == 0 &&
-           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
-    do_event_op= 1;
-
-  if (!do_event_op)
+  if (!share->need_events(ndb_binlog_running))
   {
     set_binlog_nologging(share);
     pthread_mutex_unlock(&share->mutex);
@@ -5256,11 +5353,8 @@ ndbcluster_create_event(THD *thd, Ndb *n
                       ndbtab->getName(), ndbtab->getObjectVersion(),
                       event_name, share ? share->key : "(nil)"));
   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
-  if (!share)
-  {
-    DBUG_PRINT("info", ("share == NULL"));
-    DBUG_RETURN(0);
-  }
+  DBUG_ASSERT(share);
+
   if (get_binlog_nologging(share))
   {
     if (opt_ndb_extra_logging && ndb_binlog_running)
@@ -5440,8 +5534,7 @@ ndbcluster_create_event_ops(THD *thd, ND
   DBUG_ENTER("ndbcluster_create_event_ops");
   DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
-
-  DBUG_ASSERT(share != 0);
+  DBUG_ASSERT(share);
 
   if (get_binlog_nologging(share))
   {
@@ -5455,7 +5548,6 @@ ndbcluster_create_event_ops(THD *thd, ND
   assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
                                                         share->table_name));
 
-  Ndb_event_data *event_data= share->event_data;
   int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
 #ifdef HAVE_NDB_BINLOG
   uint len= (int)strlen(share->table_name);
@@ -5480,6 +5572,10 @@ ndbcluster_create_event_ops(THD *thd, ND
     DBUG_RETURN(0);
   }
 
+  // Check that the share agrees
+  DBUG_ASSERT(share->need_events(ndb_binlog_running));
+
+  Ndb_event_data *event_data= share->event_data;
   if (share->op)
   {
     event_data= (Ndb_event_data *) share->op->getCustomData();

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-11-15 14:45:16 +0000
+++ b/sql/log_event.cc	2011-11-18 09:24:36 +0000
@@ -7960,7 +7960,7 @@ int Rows_log_event::do_apply_event(Relay
       Note that unlike the other thd options set here, this one
       comes from a global, and not from the incoming event.
     */
-    if (slave_allow_batching)
+    if (opt_slave_allow_batching)
       thd->variables.option_bits|= OPTION_ALLOW_BATCH;
     else
       thd->variables.option_bits&= ~OPTION_ALLOW_BATCH;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2011-11-15 13:32:16 +0000
+++ b/sql/mysqld.h	2011-11-18 09:24:36 +0000
@@ -166,7 +166,9 @@ extern ulong slow_launch_threads, slow_l
 extern ulong table_cache_size, table_def_size;
 extern MYSQL_PLUGIN_IMPORT ulong max_connections;
 extern ulong max_connect_errors, connect_timeout;
-extern my_bool slave_allow_batching;
+#ifndef MCP_WL3733
+extern my_bool opt_slave_allow_batching;
+#endif
 extern my_bool allow_slave_start;
 extern LEX_CSTRING reason_slave_blocked;
 extern ulong slave_trans_retries;

=== modified file 'sql/ndb_event_data.cc'
--- a/sql/ndb_event_data.cc	2011-10-29 09:02:21 +0000
+++ b/sql/ndb_event_data.cc	2011-11-10 08:21:36 +0000
@@ -42,3 +42,13 @@ Ndb_event_data::~Ndb_event_data()
   */
   my_free(ndb_value[0]);
 }
+
+
+void Ndb_event_data::print(const char* where, FILE* file) const
+{
+  fprintf(file,
+          "%s shadow_table: %p '%s.%s'\n",
+          where,
+          shadow_table, shadow_table->s->db.str,
+          shadow_table->s->table_name.str);
+}

=== modified file 'sql/ndb_event_data.h'
--- a/sql/ndb_event_data.h	2011-10-29 09:02:21 +0000
+++ b/sql/ndb_event_data.h	2011-11-10 08:21:36 +0000
@@ -34,6 +34,8 @@ public:
   struct TABLE *shadow_table;
   struct NDB_SHARE *share;
   union NdbValue *ndb_value[2];
+
+  void print(const char* where, FILE* file) const;
 };
 
 #endif

=== modified file 'sql/ndb_schema_dist.cc'
--- a/sql/ndb_schema_dist.cc	2011-11-02 09:28:48 +0000
+++ b/sql/ndb_schema_dist.cc	2011-11-10 08:29:32 +0000
@@ -26,8 +26,6 @@ get_schema_type_name(uint type)
     return "DROP_TABLE";
   case SOT_CREATE_TABLE:
     return "CREATE_TABLE";
-  case SOT_RENAME_TABLE_NEW:
-    return "RENAME_TABLE_NEW";
   case SOT_ALTER_TABLE_COMMIT:
     return "ALTER_TABLE_COMMIT";
   case SOT_DROP_DB:

=== modified file 'sql/ndb_schema_dist.h'
--- a/sql/ndb_schema_dist.h	2011-11-02 09:28:48 +0000
+++ b/sql/ndb_schema_dist.h	2011-11-10 08:29:32 +0000
@@ -40,7 +40,7 @@ enum SCHEMA_OP_TYPE
 {
   SOT_DROP_TABLE= 0,
   SOT_CREATE_TABLE= 1,
-  SOT_RENAME_TABLE_NEW= 2,
+  SOT_RENAME_TABLE_NEW= 2, // Unused, but still reserved
   SOT_ALTER_TABLE_COMMIT= 3,
   SOT_DROP_DB= 4,
   SOT_CREATE_DB= 5,

=== modified file 'sql/ndb_share.cc'
--- a/sql/ndb_share.cc	2011-11-07 19:00:35 +0000
+++ b/sql/ndb_share.cc	2011-11-10 08:21:36 +0000
@@ -17,6 +17,10 @@
 
 #include "ndb_share.h"
 #include "ndb_event_data.h"
+#include "ndb_dist_priv_util.h"
+#include "ha_ndbcluster_tables.h"
+
+#include <ndbapi/NdbEventOperation.hpp>
 
 #include <my_sys.h>
 
@@ -37,12 +41,109 @@ NDB_SHARE::destroy(NDB_SHARE* share)
   }
 #endif
   share->new_op= 0;
-  if (share->event_data)
+  Ndb_event_data* event_data = share->event_data;
+  if (event_data)
   {
-    delete share->event_data;
-    share->event_data= 0;
+    delete event_data;
+    event_data= 0;
   }
   free_root(&share->mem_root, MYF(0));
   my_free(share);
 }
 
+
+bool
+NDB_SHARE::need_events(bool default_on) const
+{
+  DBUG_ENTER("NDB_SHARE::need_events");
+  DBUG_PRINT("enter", ("db: %s, table_name: %s",
+                        db, table_name));
+
+  if (default_on)
+  {
+    // Events are on by default, check if it should be turned off
+
+    if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
+    {
+      /*
+        The distributed privilege tables are distributed by writing
+        the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
+        to listen to events from those table
+      */
+      DBUG_PRINT("exit", ("no events for dist priv table"));
+      DBUG_RETURN(false);
+    }
+
+    DBUG_PRINT("exit", ("need events(the default for this mysqld)"));
+    DBUG_RETURN(true);
+  }
+
+  // Events are off by default, check if it should be turned on
+  if (strcmp(db, NDB_REP_DB) == 0)
+  {
+    // The table is in "mysql" database
+    if (strcmp(table_name, NDB_SCHEMA_TABLE) == 0)
+    {
+      DBUG_PRINT("exit", ("need events for " NDB_SCHEMA_TABLE));
+      DBUG_RETURN(true);
+    }
+
+    if (strcmp(table_name, NDB_APPLY_TABLE) == 0)
+    {
+      DBUG_PRINT("exit", ("need events for " NDB_APPLY_TABLE));
+      DBUG_RETURN(true);
+    }
+  }
+
+  DBUG_PRINT("exit", ("no events(the default for this mysqld)"));
+  DBUG_RETURN(false);
+}
+
+
+Ndb_event_data* NDB_SHARE::get_event_data_ptr() const
+{
+  if (event_data)
+  {
+    // The event_data pointer is only used before
+    // creating the NdbEventoperation -> check no op yet
+    assert(!op);
+
+    return event_data;
+  }
+
+  if (op)
+  {
+    // The event_data should now be empty since it's been moved to
+    // op's custom data
+    assert(!event_data);
+
+    // Check that op has custom data
+    assert(op->getCustomData());
+
+    return (Ndb_event_data*)op->getCustomData();
+  }
+
+  return NULL;
+}
+
+
+void NDB_SHARE::print(const char* where, FILE* file) const
+{
+  fprintf(file, "%s %s.%s: use_count: %u\n",
+          where, db, table_name, use_count);
+  fprintf(file, "  - key: '%s', key_length: %d\n", key, key_length);
+  fprintf(file, "  - commit_count: %llu\n", commit_count);
+  if (new_key)
+    fprintf(file, "  - new_key: %p, '%s'\n",
+            new_key, new_key);
+  if (event_data)
+    fprintf(file, "  - event_data: %p\n", event_data);
+  if (op)
+    fprintf(file, "  - op: %p\n", op);
+  if (new_op)
+    fprintf(file, "  - new_op: %p\n", new_op);
+
+  Ndb_event_data *event_data_ptr= get_event_data_ptr();
+  if (event_data_ptr)
+    event_data_ptr->print("  -", file);
+}

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2011-11-05 11:35:57 +0000
+++ b/sql/ndb_share.h	2011-11-10 08:21:36 +0000
@@ -188,7 +188,20 @@ struct NDB_SHARE {
   MY_BITMAP *subscriber_bitmap;
   class NdbEventOperation *new_op;
 
+  static NDB_SHARE* create(const char* key, size_t key_length,
+                         struct TABLE* table, const char* db_name,
+                         const char* table_name);
   static void destroy(NDB_SHARE* share);
+
+  class Ndb_event_data* get_event_data_ptr() const;
+
+  void print(const char* where, FILE* file = stderr) const;
+
+  /*
+    Returns true if this share need to subscribe to
+    events from the table.
+  */
+  bool need_events(bool default_on) const;
 };
 
 

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2011-11-15 14:25:58 +0000
+++ b/sql/sys_vars.cc	2011-11-18 09:24:36 +0000
@@ -2470,40 +2470,6 @@ static Sys_var_ulong Sys_profiling_histo
        VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1));
 #endif
 
-#ifndef MCP_WL3733
-#ifndef EMBEDDED_LIBRARY
-my_bool slave_allow_batching;
-
-/*
-  Take Active MI lock while checking/updating slave_allow_batching
-  to give atomicity w.r.t. slave state changes
-*/
-static PolyLock_mutex PLock_active_mi(&LOCK_active_mi);
-
-static bool slave_allow_batching_check(sys_var *self, THD *thd, set_var *var)
-{
-  /* Only allow a change if the slave SQL thread is currently stopped */
-  bool slave_sql_running = active_mi->rli.slave_running;
-
-  if (slave_sql_running)
-  {
-    my_error(ER_SLAVE_MUST_STOP, MYF(0));
-    return true;
-  }
-
-  return false;
-}
-
-static Sys_var_mybool Sys_slave_allow_batching(
-       "slave_allow_batching", "Allow slave to batch requests",
-       GLOBAL_VAR(slave_allow_batching),
-       CMD_LINE(OPT_ARG), DEFAULT(FALSE),
-       &PLock_active_mi,
-       NOT_IN_BINLOG,
-       ON_CHECK(slave_allow_batching_check));
-#endif
-#endif
-
 static Sys_var_harows Sys_select_limit(
        "sql_select_limit",
        "The maximum number of rows to return from SELECT statements",
@@ -3041,6 +3007,14 @@ static Sys_var_mybool Sys_relay_log_reco
        "processed",
        GLOBAL_VAR(relay_log_recovery), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
 
+#ifndef MCP_WL3733
+my_bool opt_slave_allow_batching;
+static Sys_var_mybool Sys_slave_allow_batching(
+       "slave_allow_batching", "Allow slave to batch requests",
+       GLOBAL_VAR(opt_slave_allow_batching),
+       CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+#endif
+
 static Sys_var_charptr Sys_slave_load_tmpdir(
        "slave_load_tmpdir", "The location where the slave should put "
        "its temporary files when replicating a LOAD DATA INFILE command",

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster branch (magnus.blaudd:3654 to 3657) magnus.blaudd21 Nov